diff --git a/newbrt/brt-serialize.c b/newbrt/brt-serialize.c index a8a26a3eec2..69c622e3280 100644 --- a/newbrt/brt-serialize.c +++ b/newbrt/brt-serialize.c @@ -164,6 +164,7 @@ void toku_serialize_brtnode_to(int fd, DISKOFF off, DISKOFF size, BRTNODE node) })); } //printf("%s:%d check_local_fingerprint=%8x\n", __FILE__, __LINE__, check_local_fingerprint); + if (check_local_fingerprint!=node->local_fingerprint) printf("%s:%d node=%lld fingerprint expected=%08x actual=%08x\n", __FILE__, __LINE__, (long long)node->thisnodename, check_local_fingerprint, node->local_fingerprint); assert(check_local_fingerprint==node->local_fingerprint); } } else { diff --git a/newbrt/brt.c b/newbrt/brt.c index f3ce73323d2..ef9f10b6dbf 100644 --- a/newbrt/brt.c +++ b/newbrt/brt.c @@ -345,8 +345,31 @@ static int brtleaf_split (TOKULOGGER logger, FILENUM filenum, BRT t, BRTNODE nod return 0; } +#define MAX_PATHLEN_TO_ROOT 40 + +static int log_and_save_brtenq(TOKULOGGER logger, BRT t, BRTNODE node, int childnum, TXNID xid, int type, const char *key, int keylen, const char *data, int datalen, u_int32_t *fingerprint, DISKOFFARRAY path_to_parent) { + BYTESTRING keybs = {.len=keylen, .data=(char*)key}; + BYTESTRING databs = {.len=datalen, .data=(char*)data}; + u_int32_t old_fingerprint = *fingerprint; + u_int32_t fdiff=node->rand4fingerprint*toku_calccrc32_cmd(type, xid, key, keylen, data, datalen); + u_int32_t new_fingerprint = old_fingerprint + fdiff; + printf("%s:%d node=%lld fingerprint old=%08x new=%08x diff=%08x xid=%lld\n", __FILE__, __LINE__, (long long)node->thisnodename, old_fingerprint, new_fingerprint, fdiff, (long long)xid); + *fingerprint = new_fingerprint; + int r = toku_log_brtenq(logger, 0, toku_cachefile_filenum(t->cf), node->thisnodename, childnum, xid, type, keybs, databs, old_fingerprint, new_fingerprint); + if (r!=0) return r; + TOKUTXN txn; + if (0==toku_txnid2txn(logger, xid, &txn) && txn) { + DISKOFFARRAY path = path_to_parent; + path.array = toku_memdup(path.array, sizeof(path.array[0])*(1+path.len)); + if (path.array==0) return errno; + r = toku_logger_save_rollback_xactiontouchednonleaf(txn, toku_cachefile_filenum(t->cf), path, node->thisnodename); + if (r!=0) return r; + } + return 0; +} + /* Side effect: sets splitk->data pointer to a malloc'd value */ -static int brt_nonleaf_split (BRT t, BRTNODE node, BRTNODE *nodea, BRTNODE *nodeb, DBT *splitk, TOKULOGGER logger) { +static int brt_nonleaf_split (BRT t, BRTNODE node, BRTNODE *nodea, BRTNODE *nodeb, DBT *splitk, TOKULOGGER logger, DISKOFFARRAY path_to_parent) { int old_n_children = node->u.n.n_children; int n_children_in_a = old_n_children/2; int n_children_in_b = old_n_children-n_children_in_a; @@ -397,20 +420,19 @@ static int brt_nonleaf_split (BRT t, BRTNODE node, BRTNODE *nodea, BRTNODE *node BYTESTRING keybs = { .len = keylen, .data = (char*)key }; BYTESTRING databs = { .len = datalen, .data = (char*)data }; u_int32_t old_from_fingerprint = node->local_fingerprint; - u_int32_t old_to_fingerprint = B->local_fingerprint; u_int32_t delta = toku_calccrc32_cmd(type, xid, key, keylen, data, datalen); u_int32_t new_from_fingerprint = old_from_fingerprint - node->rand4fingerprint*delta; - u_int32_t new_to_fingerprint = old_to_fingerprint + B->rand4fingerprint *delta; if (r!=0) return r; r = toku_log_brtdeq(logger, 0, fnum, node->thisnodename, n_children_in_a, xid, type, keybs, databs, old_from_fingerprint, new_from_fingerprint); if (r!=0) return r; - r = toku_log_brtenq(logger, 0, fnum, B->thisnodename, targchild, xid, type, keybs, databs, old_to_fingerprint, new_to_fingerprint); + assert(path_to_parent.lenthisnodename; // Don't have to restore it since path_to_parent is passed by value. + r = log_and_save_brtenq(logger, t, B, targchild, xid, type, key, keylen, data, datalen, &B->local_fingerprint, path_to_parent); r = toku_fifo_enq(to_htab, key, keylen, data, datalen, type, xid); if (r!=0) return r; toku_fifo_deq(from_htab); // key and data will no longer be valid node->local_fingerprint = new_from_fingerprint; - B->local_fingerprint = new_to_fingerprint; B->u.n.n_bytes_in_buffers += n_bytes_moved; BNC_NBYTESINBUF(B, targchild) += n_bytes_moved; @@ -531,6 +553,8 @@ static int push_brt_cmd_down_only_if_it_wont_push_more_else_put_here (BRT t, BRT DBT againk; toku_init_dbt(&againk); //printf("%s:%d hello!\n", __FILE__, __LINE__); + assert(path_to_parent.lenthisnodename; r = brtnode_put_cmd(t, child, cmd, &again_split, &againa, &againb, &againk, 0, @@ -555,6 +579,8 @@ static int push_a_brt_cmd_down (BRT t, BRTNODE node, BRTNODE child, int childnum //printf("%s:%d hello!\n", __FILE__, __LINE__); assert(node->height>0); { + assert(path_to_parent.lenthisnodename; int r = brtnode_put_cmd(t, child, cmd, child_did_split, childa, childb, childsplitk, 0, @@ -769,7 +795,7 @@ static int handle_split_of_child (BRT t, BRTNODE node, int childnum, if (node->u.n.n_children>TREE_FANOUT) { //printf("%s:%d about to split having pushed %d out of %d keys\n", __FILE__, __LINE__, i, n_pairs); - r=brt_nonleaf_split(t, node, nodea, nodeb, splitk, logger); + r=brt_nonleaf_split(t, node, nodea, nodeb, splitk, logger, path_to_parent); if (r!=0) return r; //printf("%s:%d did split\n", __FILE__, __LINE__); split_count++; @@ -1050,9 +1076,12 @@ static int brt_nonleaf_put_cmd_child_node (BRT t, BRTNODE node, BRT_CMD cmd, child = child_v; child_did_split = 0; + assert(path_to_parent.lenthisnodename; r = brtnode_put_cmd(t, child, cmd, &child_did_split, &childa, &childb, &childsplitk, debug, logger, path_to_parent); + path_to_parent.len--; if (r != 0) { /* putting to the child failed for some reason, so unpin the child and return the error code */ int rr = toku_unpin_brtnode(t, child); @@ -1100,24 +1129,11 @@ static int brt_nonleaf_put_cmd_child (BRT t, BRTNODE node, BRT_CMD cmd, DBT *k = cmd->u.id.key; DBT *v = cmd->u.id.val; + int r = log_and_save_brtenq(logger, t, node, childnum, cmd->xid, type, k->data, k->size, v->data, v->size, &node->local_fingerprint, path_to_parent); + if (r!=0) return r; int diff = k->size + v->size + KEY_VALUE_OVERHEAD + BRT_CMD_OVERHEAD; - BYTESTRING keybs = { .len=k->size, .data=(char*)k->data }; - BYTESTRING databs = { .len=v->size, .data=(char*)v->data }; - u_int32_t newfingerprint = node->local_fingerprint + node->rand4fingerprint * toku_calccrc32_cmd(type, cmd->xid, k->data, k->size, v->data, v->size); - int r=toku_log_brtenq(logger, 0, toku_cachefile_filenum(t->cf), node->thisnodename, childnum, cmd->xid, type, keybs, databs, node->local_fingerprint, newfingerprint); - assert(r==0); - { - TOKUTXN txn; - if (0==toku_txnid2txn(logger,cmd->xid,&txn) && txn) { - DISKOFFARRAY path = path_to_parent; - path.array = toku_memdup(path.array, sizeof(path.array[0])*(1+path.len)); - r=toku_logger_save_rollback_xactiontouchednonleaf(txn, toku_cachefile_filenum(t->cf), path, node->thisnodename); - if (r!=0) return r; - } - } r=toku_fifo_enq(BNC_BUFFER(node,childnum), k->data, k->size, v->data, v->size, type, cmd->xid); assert(r==0); - node->local_fingerprint = newfingerprint; node->u.n.n_bytes_in_buffers += diff; BNC_NBYTESINBUF(node, childnum) += diff; node->dirty = 1; @@ -1777,8 +1793,6 @@ static int brt_root_put_cmd(BRT brt, BRT_CMD cmd, TOKULOGGER logger, DISKOFFARRA return result; } -#define MAX_PATHLEN_TO_ROOT 40 - int toku_brt_insert (BRT brt, DBT *key, DBT *val, TOKUTXN txn) { int r; BRT_CMD_S brtcmd = { BRT_INSERT, toku_txn_get_txnid(txn), .u.id={key,val}}; @@ -1974,11 +1988,13 @@ static int brt_search_child(BRT brt, BRTNODE node, int childnum, brt_search_t *s rr = toku_cachetable_get_and_pin(brt->cf, BNC_DISKOFF(node,childnum), &node_v, NULL, toku_brtnode_flush_callback, toku_brtnode_fetch_callback, brt); assert(rr == 0); + assert(path_to_parent.lenthisnodename; + for (;;) { BRTNODE childnode = node_v; BRT_SPLIT childsplit; brt_split_init(&childsplit); r = brt_search_node(brt, childnode, search, newkey, newval, &childsplit, logger, path_to_parent); - if (childsplit.did_split) { rr = handle_split_of_child(brt, node, childnum, childsplit.nodea, childsplit.nodeb, &childsplit.splitk, &split->did_split, &split->nodea, &split->nodeb, &split->splitk, logger, path_to_parent); @@ -2454,9 +2470,11 @@ struct callpair { static int note_removal (bytevec key, ITEMLEN keylen, bytevec data, ITEMLEN datalen, int type, TXNID xid, void*cpairv) { struct callpair *cpair = cpairv; BRTNODE node = cpair->node; + printf("%s:%d Removed %s,%s fingerprint was %08x ", __FILE__, __LINE__, (char*)key, (char*)data, node->local_fingerprint); int childnum = cpair->childnum; u_int32_t old_fingerprint = node->local_fingerprint; node->local_fingerprint = old_fingerprint = node->rand4fingerprint*toku_calccrc32_cmd(type, xid, key, keylen, data, datalen); + printf("is %08x (addr=%p)\n", node->local_fingerprint, &node->local_fingerprint); u_int32_t countdiff = keylen+datalen+KEY_VALUE_OVERHEAD+BRT_CMD_OVERHEAD; BNC_NBYTESINBUF(node,childnum) -= countdiff; node->u.n.n_bytes_in_buffers -= countdiff; @@ -2466,8 +2484,11 @@ static int note_removal (bytevec key, ITEMLEN keylen, bytevec data, ITEMLEN data int toku_brt_nonleaf_expunge_xaction(BRT brt, DISKOFF diskoff, TXNID xid) { void *node_v; int r = toku_cachetable_get_and_pin(brt->cf, diskoff, &node_v, NULL, toku_brtnode_flush_callback, toku_brtnode_fetch_callback, brt); + assert(r==0); if (r!=0) return r; BRTNODE node = node_v; + verify_local_fingerprint_nonleaf(node); + printf("%s:%d node->local_fingerprint=%08x\n", __FILE__, __LINE__, node->local_fingerprint); int i; r=0; for (i=0; iu.n.n_children; i++) { diff --git a/newbrt/log.c b/newbrt/log.c index f04a9bc836d..a98d3fb0cd5 100644 --- a/newbrt/log.c +++ b/newbrt/log.c @@ -701,6 +701,7 @@ int toku_logger_abort(TOKUTXN txn) { // Must undo everything. Must undo it all in reverse order. // Build the reverse list struct roll_entry *item; + printf("%s:%d abort\n", __FILE__, __LINE__); while ((item=txn->newest_logentry)) { txn->newest_logentry = item->prev; int r; diff --git a/newbrt/roll.c b/newbrt/roll.c index edc3dcaa1a3..179ae62fc05 100644 --- a/newbrt/roll.c +++ b/newbrt/roll.c @@ -70,5 +70,9 @@ int toku_rollback_xactiontouchednonleaf(FILENUM filenum, DISKOFFARRAY array __at assert(r==0); r = toku_brt_nonleaf_expunge_xaction(brt, diskoff, txn->txnid64); assert(r==0); + printf("%s:%d node=%lld has Rollback parents = {", __FILE__, __LINE__, (long long)diskoff); + int i; for (i=0; iabort(txn); CKERR(r); // Don't do a lookup on "hello7", because that will force things out of the buffer. + r=db->close(db, 0); CKERR(r); + r=db->open(db, txn, "foo.db", 0, DB_BTREE, 0, 0777); CKERR(r); r=env->txn_begin(env, 0, &txn, 0); assert(r==0); {