diff --git a/newbrt/brt.c b/newbrt/brt.c index aa982a61a37..356b381d83b 100644 --- a/newbrt/brt.c +++ b/newbrt/brt.c @@ -353,6 +353,7 @@ int toku_unpin_brtnode (BRT brt, BRTNODE node) { // //if (node->log_lsn.lsn>33320) printf("%s:%d node%lld lsn=%lld\n", __FILE__, __LINE__, node->thisnodename, node->log_lsn.lsn); // } VERIFY_NODE(brt,node); + if (node->height>0) { int i; for (i=0; i+1u.n.n_children; i++) assert(node->u.n.childkeys[i]); } return toku_cachetable_unpin(brt->cf, node->thisnodename, node->fullhash, node->dirty, brtnode_memory_size(node)); } @@ -1870,25 +1871,31 @@ maybe_merge_pinned_nonleaf_nodes (BRT t, BOOL *did_merge, struct kv_pair **splitk) { + assert(parent_splitk); int old_n_children = a->u.n.n_children; int new_n_children = old_n_children + b->u.n.n_children; XREALLOC_N(new_n_children, a->u.n.childinfos); memcpy(a->u.n.childinfos + old_n_children, b->u.n.childinfos, - b->u.n.n_children); + b->u.n.n_children*sizeof(b->u.n.childinfos[0])); XREALLOC_N(new_n_children-1, a->u.n.childkeys); a->u.n.childkeys[old_n_children-1] = parent_splitk; memcpy(a->u.n.childkeys + old_n_children, b->u.n.childkeys, - b->u.n.n_children-1); + (b->u.n.n_children-1)*sizeof(b->u.n.childkeys[0])); a->u.n.totalchildkeylens += b->u.n.totalchildkeylens; a->u.n.n_bytes_in_buffers += b->u.n.n_bytes_in_buffers; a->u.n.n_children = new_n_children; + + b->u.n.totalchildkeylens = 0; + b->u.n.n_children = 0; + b->u.n.n_bytes_in_buffers = 0; + fixup_child_fingerprint(parent, childnum_of_parent, a, t, logger); - abort(); // must deallocate b. - abort(); // don't forget to reuse blocknums +// abort(); // don't forget to reuse blocknums *did_merge = TRUE; *splitk = NULL; + { int i; for (i=0; i+1u.n.n_children; i++) assert(a->u.n.childkeys[i]); } return 0; } @@ -1902,11 +1909,22 @@ maybe_merge_pinned_nodes (BRT t, // For leaf nodes, we distribute the leafentries evenly. // For nonleaf nodes, we distribute the children evenly. That may leave one or both of the nodes overfull, but that's OK. // If we distribute, we set *splitk to a malloced pivot key. +// Parameters: +// t The BRT. +// parent The parent of the two nodes to be split. +// childnum_of_parent Which child of the parent is a? (b is the next child.) +// parent_splitk The pivot key between a and b. This is either free()'d or returned in *splitk. +// a The first node to merge. +// b The second node to merge. +// logger The logger. +// did_merge (OUT): Did the two nodes actually get merged? +// splitk (OUT): If the two nodes did not get merged, the new pivot key between the two nodes. { assert(a->height == b->height); - if (a->height == 0) + if (a->height == 0) { + toku_free(parent_splitk); // We don't need the parent_splitk any more. If we need a splitk, we'll malloc a new one. return maybe_merge_pinned_leaf_nodes(t, a, b, logger, did_merge, splitk); - else { + } else { return maybe_merge_pinned_nonleaf_nodes(t, parent, childnum_of_parent, parent_splitk, a, b, logger, did_merge, splitk); } } @@ -1974,17 +1992,24 @@ brt_merge_child (BRT t, BRTNODE node, int childnum_to_merge, BOOL *did_io, TOKUL } int r; + BOOL did_merge; { - BOOL did_merge; struct kv_pair *splitk_kvpair = 0; + struct kv_pair *old_split_key = node->u.n.childkeys[childnuma]; + unsigned int deleted_size = toku_brt_pivot_key_len(t, old_split_key); + printf("%s:%d Maybe merging pinned nodes\n", __FILE__, __LINE__); + if (childa->height>0) { int i; for (i=0; i+1u.n.n_children; i++) assert(childa->u.n.childkeys[i]); } + { int i; for (i=0; i+1u.n.n_children; i++) assert(node->u.n.childkeys[i]); } r = maybe_merge_pinned_nodes(t, node, childnuma, node->u.n.childkeys[childnuma], childa, childb, logger, &did_merge, &splitk_kvpair); + if (childa->height>0) { int i; for (i=0; i+1u.n.n_children; i++) assert(childa->u.n.childkeys[i]); } + //(toku_verify_counts(childa), toku_verify_estimates(t,childa)); + if (did_merge) assert(!splitk_kvpair); else assert(splitk_kvpair); if (r!=0) goto return_r; + + node->u.n.totalchildkeylens -= deleted_size; // The key was free()'d inside the maybe_merge_pinned_nodes. + if (did_merge) { - { - struct kv_pair *delete_this_key = node->u.n.childkeys[childnuma]; - node->u.n.totalchildkeylens -= toku_brt_pivot_key_len(t, delete_this_key); - toku_free(delete_this_key); - } + printf("%s:%d %s did_merge\n", __FILE__, __LINE__, __func__); toku_fifo_free(&BNC_BUFFER(node, childnumb)); node->u.n.n_children--; memmove(&node->u.n.childinfos[childnumb], @@ -1996,21 +2021,27 @@ brt_merge_child (BRT t, BRTNODE node, int childnum_to_merge, BOOL *did_io, TOKUL (node->u.n.n_children-childnumb)*sizeof(node->u.n.childkeys[0])); REALLOC_N(node->u.n.n_children-1, node->u.n.childkeys); fixup_child_fingerprint(node, childnuma, childa, t, logger); + assert(node->u.n.childinfos[childnuma].blocknum.b == childa->thisnodename.b); } else { - // If we didn't merge the nodes, then we may have mucked with the pivot. - if (splitk_kvpair) { - node->u.n.totalchildkeylens -= toku_brt_pivot_key_len(t, node->u.n.childkeys[childnuma]); - toku_free(node->u.n.childkeys[childnuma]); - node->u.n.childkeys[childnuma] = splitk_kvpair; - node->u.n.totalchildkeylens += toku_brt_pivot_key_len(t, node->u.n.childkeys[childnuma]); - } + assert(splitk_kvpair); + // If we didn't merge the nodes, then we need the correct pivot. + node->u.n.childkeys[childnuma] = splitk_kvpair; + node->u.n.totalchildkeylens += toku_brt_pivot_key_len(t, node->u.n.childkeys[childnuma]); } } return_r: // Unpin both, and return the first nonzero error code that is found + assert(node->dirty); { int rra = toku_unpin_brtnode(t, childa); - int rrb = toku_unpin_brtnode(t, childb); + int rrb; + if (did_merge) { + rrb = toku_cachetable_unpin_and_remove(t->cf, childb->thisnodename, node->fullhash, 0); + toku_brtnode_free(&childb); + } else { + rrb = toku_unpin_brtnode(t, childb); + } + if (rra) return rra; if (rrb) return rrb; } @@ -2019,6 +2050,7 @@ brt_merge_child (BRT t, BRTNODE node, int childnum_to_merge, BOOL *did_io, TOKUL static int brt_handle_maybe_reactive_child(BRT t, BRTNODE node, int childnum, enum reactivity re, BOOL *did_io, TOKULOGGER logger) { + { int i; for (i=0; i+1u.n.n_children; i++) assert(node->u.n.childkeys[i]); } switch (re) { case RE_STABLE: return 0; case RE_FISSIBLE: diff --git a/newbrt/cachetable.c b/newbrt/cachetable.c index 9c6c108bfa9..bf10788dac0 100644 --- a/newbrt/cachetable.c +++ b/newbrt/cachetable.c @@ -976,14 +976,12 @@ int toku_cachetable_close (CACHETABLE *tp) { return 0; } -#if 0 // this is broken. needs to wait for writebacks to complete -int toku_cachetable_remove (CACHEFILE cachefile, CACHEKEY key, int write_me) { +int toku_cachetable_upnin_and_remove (CACHEFILE cachefile, CACHEKEY key, u_int32_t fullhash, int write_me) { /* Removing something already present is OK. */ CACHETABLE t = cachefile->cachetable; PAIR p; int count = 0; - u_int32_t fullhash = toku_cachetable_hash(cachefile, key); cachetable_lock(t); for (p=t->table[fullhash&(t->table_size-1)]; p; p=p->hash_chain) { count++; @@ -999,7 +997,6 @@ int toku_cachetable_remove (CACHEFILE cachefile, CACHEKEY key, int write_me) { note_hash_count(count); return 0; } -#endif #if 0 static void flush_and_keep (PAIR flush_me) { diff --git a/newbrt/cachetable.h b/newbrt/cachetable.h index a129c19deaf..af3af16f7d6 100644 --- a/newbrt/cachetable.h +++ b/newbrt/cachetable.h @@ -105,7 +105,9 @@ int toku_cachetable_maybe_get_and_pin (CACHEFILE, CACHEKEY, u_int32_t /*fullhash // Returns: 0 if success, otherwise returns an error number. int toku_cachetable_unpin(CACHEFILE, CACHEKEY, u_int32_t fullhash, int dirty, long size); -int toku_cachetable_remove (CACHEFILE, CACHEKEY, int /*write_me*/); /* Removing something already present is OK. */ +int toku_cachetable_unpin_and_remove (CACHEFILE, CACHEKEY, u_int32_t fullhash, int /*write_me_if_dirty*/); /* Removing something already present is OK. */ +// Effect: Remove an object from the cachetable, writing it if the object is dirty. +// Requires: The object must be pinned exactly once. int toku_cachetable_assert_all_unpinned (CACHETABLE);