[t:4778] upgrade subtree estimates to stat64info using real code. fix #4778

refactor existing node deserialization code so that upgrading a header can
call deserialization code on the root node and get stat64info instead of
deserializing it manually.


git-svn-id: file:///svn/toku/tokudb@43459 c7de825b-a66e-492c-adef-691d508d4ae1
This commit is contained in:
Leif Walsh 2013-04-17 00:00:29 -04:00 committed by Yoni Fogel
parent ee6094ccce
commit 676654a13f

View file

@ -1684,7 +1684,8 @@ deserialize_brtnode_header_from_rbuf_if_small_enough (BRTNODE *brtnode,
static int static int
deserialize_and_upgrade_internal_node(BRTNODE node, deserialize_and_upgrade_internal_node(BRTNODE node,
struct rbuf *rb, struct rbuf *rb,
struct brtnode_fetch_extra* bfe) struct brtnode_fetch_extra* bfe,
STAT64INFO info)
{ {
int r = 0; int r = 0;
int version = node->layout_version_read_from_disk; int version = node->layout_version_read_from_disk;
@ -1700,10 +1701,17 @@ deserialize_and_upgrade_internal_node(BRTNODE node,
if (version == BRT_LAST_LAYOUT_VERSION_WITH_FINGERPRINT) { if (version == BRT_LAST_LAYOUT_VERSION_WITH_FINGERPRINT) {
(void) rbuf_int(rb); // 8. fingerprint (void) rbuf_int(rb); // 8. fingerprint
} }
(void) rbuf_ulonglong(rb); // 9. nkeys (ulonglong) uint64_t nkeys = rbuf_ulonglong(rb);
(void) rbuf_ulonglong(rb); // 10. ndata (ulonglong) uint64_t ndata = rbuf_ulonglong(rb);
(void) rbuf_ulonglong(rb); // 11. dsize (ulonglong) uint64_t dsize = rbuf_ulonglong(rb);
(void) rbuf_char(rb); // 12. exact (char) (void) rbuf_char(rb); // 12. exact (char)
invariant(nkeys == ndata);
if (info) {
// info is non-null if we're trying to upgrade old subtree
// estimates to stat64info
info->numrows += nkeys;
info->numbytes += dsize;
}
} }
node->childkeys = NULL; node->childkeys = NULL;
@ -1897,16 +1905,24 @@ deserialize_and_upgrade_internal_node(BRTNODE node,
static int static int
deserialize_and_upgrade_leaf_node(BRTNODE node, deserialize_and_upgrade_leaf_node(BRTNODE node,
struct rbuf *rb, struct rbuf *rb,
struct brtnode_fetch_extra* bfe) struct brtnode_fetch_extra* bfe,
STAT64INFO info)
{ {
int r = 0; int r = 0;
int version = node->layout_version_read_from_disk; int version = node->layout_version_read_from_disk;
// This is a leaf node, so the offsets in the buffer will be // This is a leaf node, so the offsets in the buffer will be
// different from the internal node offsets above. // different from the internal node offsets above.
(void) rbuf_ulonglong(rb); // 6. nkeys uint64_t nkeys = rbuf_ulonglong(rb); // 6. nkeys
(void) rbuf_ulonglong(rb); // 7. ndata uint64_t ndata = rbuf_ulonglong(rb); // 7. ndata
(void) rbuf_ulonglong(rb); // 8. dsize uint64_t dsize = rbuf_ulonglong(rb); // 8. dsize
invariant(nkeys == ndata);
if (info) {
// info is non-null if we're trying to upgrade old subtree
// estimates to stat64info
info->numrows += nkeys;
info->numbytes += dsize;
}
if (version == BRT_LAYOUT_VERSION_14) { if (version == BRT_LAYOUT_VERSION_14) {
(void) rbuf_int(rb); // 9. optimized_for_upgrade (void) rbuf_int(rb); // 9. optimized_for_upgrade
@ -2078,6 +2094,7 @@ deserialize_and_upgrade_brtnode(BRTNODE node,
BRTNODE_DISK_DATA* ndd, BRTNODE_DISK_DATA* ndd,
BLOCKNUM blocknum, BLOCKNUM blocknum,
struct brtnode_fetch_extra* bfe, struct brtnode_fetch_extra* bfe,
STAT64INFO info,
int fd) int fd)
{ {
int r = 0; int r = 0;
@ -2135,9 +2152,9 @@ deserialize_and_upgrade_brtnode(BRTNODE node,
// Check height to determine whether this is a leaf node or not. // Check height to determine whether this is a leaf node or not.
if (node->height > 0) { if (node->height > 0) {
r = deserialize_and_upgrade_internal_node(node, &rb, bfe); r = deserialize_and_upgrade_internal_node(node, &rb, bfe, info);
} else { } else {
r = deserialize_and_upgrade_leaf_node(node, &rb, bfe); r = deserialize_and_upgrade_leaf_node(node, &rb, bfe, info);
} }
*ndd = toku_xmalloc(node->n_children*sizeof(**ndd)); *ndd = toku_xmalloc(node->n_children*sizeof(**ndd));
@ -2160,6 +2177,7 @@ deserialize_brtnode_from_rbuf(
BLOCKNUM blocknum, BLOCKNUM blocknum,
u_int32_t fullhash, u_int32_t fullhash,
struct brtnode_fetch_extra* bfe, struct brtnode_fetch_extra* bfe,
STAT64INFO info,
struct rbuf *rb, struct rbuf *rb,
int fd int fd
) )
@ -2192,7 +2210,7 @@ deserialize_brtnode_from_rbuf(
// Check if we are reading in an older node version. // Check if we are reading in an older node version.
if (version <= BRT_LAYOUT_VERSION_14) { if (version <= BRT_LAYOUT_VERSION_14) {
// Perform the upgrade. // Perform the upgrade.
r = deserialize_and_upgrade_brtnode(node, ndd, blocknum, bfe, fd); r = deserialize_and_upgrade_brtnode(node, ndd, blocknum, bfe, info, fd);
if (r != 0) { if (r != 0) {
goto cleanup; goto cleanup;
} }
@ -2407,6 +2425,32 @@ toku_deserialize_bp_from_compressed(BRTNODE node, int childnum,
return e; return e;
} }
static enum deserialize_error_code
deserialize_brtnode_from_fd(int fd,
BLOCKNUM blocknum,
u_int32_t fullhash,
BRTNODE *brtnode,
BRTNODE_DISK_DATA *ndd,
struct brtnode_fetch_extra *bfe,
STAT64INFO info)
{
enum deserialize_error_code e;
struct rbuf rb = RBUF_INITIALIZER;
int r = 0;
r = read_block_from_fd_into_rbuf(fd, blocknum, bfe->h, &rb);
if (r != 0) {
e = DS_ERRNO;
goto cleanup;
} // if we were successful, then we are done.
e = deserialize_brtnode_from_rbuf(brtnode, ndd, blocknum, fullhash, bfe, info, &rb, fd);
if (e != DS_OK) {
dump_bad_block(rb.buf,rb.size);
}
cleanup:
toku_free(rb.buf);
return e;
}
// Read brt node from file into struct. Perform version upgrade if necessary. // Read brt node from file into struct. Perform version upgrade if necessary.
enum deserialize_error_code enum deserialize_error_code
@ -2415,7 +2459,8 @@ toku_deserialize_brtnode_from (int fd,
u_int32_t fullhash, u_int32_t fullhash,
BRTNODE *brtnode, BRTNODE *brtnode,
BRTNODE_DISK_DATA* ndd, BRTNODE_DISK_DATA* ndd,
struct brtnode_fetch_extra* bfe) struct brtnode_fetch_extra* bfe
)
// Effect: Read a node in. If possible, read just the header. // Effect: Read a node in. If possible, read just the header.
{ {
toku_trace("deserial start"); toku_trace("deserial start");
@ -2424,27 +2469,12 @@ toku_deserialize_brtnode_from (int fd,
read_brtnode_header_from_fd_into_rbuf_if_small_enough(fd, blocknum, bfe->h, &rb); read_brtnode_header_from_fd_into_rbuf_if_small_enough(fd, blocknum, bfe->h, &rb);
e = deserialize_brtnode_header_from_rbuf_if_small_enough(brtnode, ndd, blocknum, fullhash, bfe, &rb, fd); e = deserialize_brtnode_header_from_rbuf_if_small_enough(brtnode, ndd, blocknum, fullhash, bfe, &rb, fd);
if (e != DS_OK) { //<CER> ??? Change this to != DS_OK? if (e != DS_OK) {
e = DS_OK;
toku_free(rb.buf);
rb = RBUF_INITIALIZER;
// Something went wrong, go back to doing it the old way. // Something went wrong, go back to doing it the old way.
int r = 0; e = deserialize_brtnode_from_fd(fd, blocknum, fullhash, brtnode, ndd, bfe, NULL);
r = read_block_from_fd_into_rbuf(fd, blocknum, bfe->h, &rb);
if (r != 0) {
e = DS_ERRNO;
goto cleanup;
} // if we were successful, then we are done.
e = deserialize_brtnode_from_rbuf(brtnode, ndd, blocknum, fullhash, bfe, &rb, fd);
if (e != DS_OK) {
dump_bad_block(rb.buf,rb.size);
}
} }
toku_trace("deserial done"); toku_trace("deserial done");
cleanup:
toku_free(rb.buf); toku_free(rb.buf);
return e; return e;
} }
@ -2744,122 +2774,26 @@ exit:
} }
static enum deserialize_error_code static enum deserialize_error_code
upgrade_subtree_estimates_to_stat64info(int UU(fd), struct brt_header *h) upgrade_subtree_estimates_to_stat64info(int fd, struct brt_header *h)
{ {
int r; enum deserialize_error_code e;
enum deserialize_error_code e = DS_OK;
// 15 was the last version with subtree estimates // 15 was the last version with subtree estimates
invariant(h->layout_version_read_from_disk <= BRT_LAYOUT_VERSION_15); invariant(h->layout_version_read_from_disk <= BRT_LAYOUT_VERSION_15);
BLOCKNUM b = h->root_blocknum;
struct rbuf rb_s; BRTNODE unused_node = NULL;
struct rbuf *rb = &rb_s; BRTNODE_DISK_DATA unused_ndd = NULL;
rbuf_init(rb, NULL, 0); struct brtnode_fetch_extra bfe;
DISKOFF offset, size; fill_bfe_for_min_read(&bfe, h);
toku_translate_blocknum_to_offset_size(h->blocktable, b, &offset, &size); e = deserialize_brtnode_from_fd(fd, h->root_blocknum, 0, &unused_node, &unused_ndd,
{ &bfe, &h->on_disk_stats);
u_int8_t *XMALLOC_N(size, raw_block);
{
ssize_t rlen = pread(fd, raw_block, size, offset);
lazy_assert((DISKOFF)rlen == size);
}
{
// root node must be a leaf or nonleaf node
u_int8_t *magic = raw_block + uncompressed_magic_offset;
invariant(memcmp(magic, "tokuleaf", 8) == 0 || memcmp(magic, "tokunode", 8) == 0);
// root node cannot have a different version from the header, if
// the header needs to read its subtree estimates
u_int8_t *version = raw_block + uncompressed_version_offset;
int layout_version = toku_dtoh32(*(uint32_t*)version);
invariant(layout_version == h->layout_version_read_from_disk);
}
{
int n_sub_blocks = toku_dtoh32(*(u_int32_t*)&raw_block[node_header_overhead]);
invariant(0 <= n_sub_blocks && n_sub_blocks <= max_sub_blocks);
{
u_int32_t header_length = node_header_overhead + sub_block_header_size(n_sub_blocks);
invariant(header_length <= size);
u_int32_t xsum = x1764_memory(raw_block, header_length);
u_int32_t stored_xsum = toku_dtoh32(*(u_int32_t *)(raw_block + header_length));
if (xsum != stored_xsum) {
e = DS_XSUM_FAIL;
goto exit;
}
}
struct sub_block sub_block[n_sub_blocks];
u_int32_t *sub_block_header = (u_int32_t *) &raw_block[node_header_overhead + 4];
size_t uncompressed_size = 0;
for (int i = 0; i < n_sub_blocks; ++i) {
sub_block_init(&sub_block[i]);
int64_t csize = toku_dtoh32(sub_block_header[0]);
int64_t usize = toku_dtoh32(sub_block_header[1]);
invariant(0 <= csize && csize < (1<<30));
invariant(0 <= usize && usize < (1<<30));
sub_block[i].compressed_size = csize;
sub_block[i].uncompressed_size = usize;
sub_block[i].xsum = toku_dtoh32(sub_block_header[2]);
uncompressed_size += sub_block[i].uncompressed_size;
sub_block_header += 3;
}
unsigned char *buf = toku_xmalloc(node_header_overhead + uncompressed_size);
resource_assert(buf);
rbuf_init(rb, buf, node_header_overhead + uncompressed_size);
memcpy(rb->buf, raw_block, node_header_overhead);
unsigned char *compressed_data = raw_block + node_header_overhead + sub_block_header_size(n_sub_blocks) + sizeof(u_int32_t);
unsigned char *uncompressed_data = rb->buf + node_header_overhead;
r = decompress_all_sub_blocks(n_sub_blocks, sub_block, compressed_data, uncompressed_data, num_cores, brt_pool);
if (r != 0) {
fprintf(stderr, "%s:%d block %"PRId64" failed %d at %p size %"PRId64"\n", __FUNCTION__, __LINE__, b.b, r, raw_block, size);
dump_bad_block(raw_block, size);
fprintf(stderr, "Unknown failure while reading node in file %s.\n", toku_cachefile_fname_in_env(h->cf));
}
lazy_assert_zero(r);
rb->ndone = 0;
}
toku_free(raw_block);
}
resource_assert(rb->buf);
bytevec magic;
rbuf_literal_bytes(rb, &magic, 8);
int node_version = rbuf_int(rb);
invariant(node_version == h->layout_version_read_from_disk);
(void) rbuf_int(rb); // layout_version_original
(void) rbuf_int(rb); // build_id
(void) rbuf_int(rb); // nodesize
(void) rbuf_int(rb); // flags
int height = rbuf_int(rb);
if (node_version <= BRT_LAST_LAYOUT_VERSION_WITH_FINGERPRINT) {
(void) rbuf_int(rb); // rand4fingerprint
(void) rbuf_int(rb); // localfingerprint
(void) rbuf_int(rb); // another fingerprint (according to deserialize_brtnode_nonleaf_from_rbuf in 5.0.8)
}
h->on_disk_stats = ZEROSTATS;
if (height > 0) {
invariant(memcmp(magic, "tokunode", 8) == 0);
int n_children = rbuf_int(rb);
for (int i = 0; i < n_children; ++i) {
if (node_version <= BRT_LAST_LAYOUT_VERSION_WITH_FINGERPRINT) {
(void) rbuf_int(rb); // child fingerprint
}
u_int64_t nkeys = rbuf_ulonglong(rb);
u_int64_t ndata = rbuf_ulonglong(rb);
invariant(nkeys == ndata);
h->on_disk_stats.numrows += nkeys;
h->on_disk_stats.numbytes += rbuf_ulonglong(rb);
(void) rbuf_char(rb); // exact
}
} else {
invariant(memcmp(magic, "tokuleaf", 8) == 0);
u_int64_t nkeys = rbuf_ulonglong(rb);
u_int64_t ndata = rbuf_ulonglong(rb);
invariant(nkeys == ndata);
h->on_disk_stats.numrows += nkeys;
h->on_disk_stats.numbytes += rbuf_ulonglong(rb);
}
h->in_memory_stats = h->on_disk_stats; h->in_memory_stats = h->on_disk_stats;
// done, discard the rest if (unused_node) {
toku_free(rb->buf); toku_brtnode_free(&unused_node);
exit: }
if (unused_ndd) {
toku_free(unused_ndd);
}
return e; return e;
} }