Rework the way transactions ids are used in the log (xid's on messages are used when inserting something into a leaf.) Addresses #27.

git-svn-id: file:///svn/tokudb@2199 c7de825b-a66e-492c-adef-691d508d4ae1
This commit is contained in:
Bradley C. Kuszmaul 2008-02-08 19:54:00 +00:00
parent 90f58ae1ae
commit d9f3060f83
9 changed files with 265 additions and 297 deletions

View file

@ -396,7 +396,7 @@ int toku_deserialize_brtnode_from (int fd, DISKOFF off, BRTNODE *brtnode, int fl
}
if (n_in_buf > 0) {
u_int32_t actual_sum = 0;
r = toku_pma_bulk_insert((TOKUTXN)0, (FILENUM){0}, (DISKOFF)0, result->u.l.buffer, keys, vals, n_in_buf, result->rand4fingerprint, &actual_sum, 0);
r = toku_pma_bulk_insert((TOKULOGGER)0, (FILENUM){0}, (DISKOFF)0, result->u.l.buffer, keys, vals, n_in_buf, result->rand4fingerprint, &actual_sum, 0);
if (r!=0) goto died_21;
if (actual_sum!=result->local_fingerprint) {
//fprintf(stderr, "%s:%d Corrupted checksum stored=%08x rand=%08x actual=%08x height=%d n_keys=%d\n", __FILE__, __LINE__, result->rand4fingerprint, result->local_fingerprint, actual_sum, result->height, n_in_buf);

View file

@ -39,7 +39,7 @@
extern long long n_items_malloced;
static int malloc_diskblock (DISKOFF *res, BRT brt, int size, TOKUTXN);
static int malloc_diskblock (DISKOFF *res, BRT brt, int size, TOKULOGGER);
static void verify_local_fingerprint_nonleaf (BRTNODE node);
/* Frees a node, including all the stuff in the hash table. */
@ -74,13 +74,13 @@ static long brtnode_size(BRTNODE node) {
return size;
}
static void toku_update_brtnode_lsn(BRTNODE node, TOKUTXN txn) {
if (txn) {
node->log_lsn = toku_txn_get_last_lsn(txn);
static void toku_update_brtnode_loggerlsn(BRTNODE node, TOKULOGGER logger) {
if (logger) {
node->log_lsn = toku_logger_last_lsn(logger);
}
}
static void fixup_child_fingerprint(BRTNODE node, int childnum_of_node, BRTNODE child, BRT brt, TOKUTXN txn) {
static void fixup_child_fingerprint(BRTNODE node, int childnum_of_node, BRTNODE child, BRT brt, TOKULOGGER logger) {
u_int32_t old_fingerprint = BNC_SUBTREE_FINGERPRINT(node,childnum_of_node);
u_int32_t sum = child->local_fingerprint;
if (child->height>0) {
@ -93,11 +93,8 @@ static void fixup_child_fingerprint(BRTNODE node, int childnum_of_node, BRTNODE
// We only call this function if we have reason to believe that the child's fingerprint did change.
BNC_SUBTREE_FINGERPRINT(node,childnum_of_node)=sum;
node->dirty=1;
if (toku_txn_get_last_lsn(txn).lsn >= 917435 && toku_txn_get_last_lsn(txn).lsn < 917439) {
printf("%s:%d changing fingerprint\n", __FILE__, __LINE__);
}
toku_log_changechildfingerprint(txn, toku_txn_get_txnid(txn), toku_cachefile_filenum(brt->cf), node->thisnodename, childnum_of_node, old_fingerprint, sum);
toku_update_brtnode_lsn(node, txn);
toku_log_changechildfingerprint(logger, toku_cachefile_filenum(brt->cf), node->thisnodename, childnum_of_node, old_fingerprint, sum);
toku_update_brtnode_loggerlsn(node, logger);
}
// If you pass in data==0 then it only compares the key, not the data (even if is a DUPSORT database)
@ -224,16 +221,16 @@ int kvpair_compare (const void *av, const void *bv) {
#endif
/* Forgot to handle the case where there is something in the freelist. */
static int malloc_diskblock_header_is_in_memory (DISKOFF *res, BRT brt, int size, TOKUTXN txn) {
static int malloc_diskblock_header_is_in_memory (DISKOFF *res, BRT brt, int size, TOKULOGGER logger) {
DISKOFF result = brt->h->unused_memory;
brt->h->unused_memory+=size;
brt->h->dirty = 1;
int r = toku_log_changeunusedmemory(txn, toku_txn_get_txnid(txn), toku_cachefile_filenum(brt->cf), result, brt->h->unused_memory);
int r = toku_log_changeunusedmemory(logger, toku_cachefile_filenum(brt->cf), result, brt->h->unused_memory);
*res = result;
return r;
}
int malloc_diskblock (DISKOFF *res, BRT brt, int size, TOKUTXN txn) {
int malloc_diskblock (DISKOFF *res, BRT brt, int size, TOKULOGGER logger) {
#if 0
int r = read_and_pin_brt_header(brt->fd, &brt->h);
assert(r==0);
@ -244,7 +241,7 @@ int malloc_diskblock (DISKOFF *res, BRT brt, int size, TOKUTXN txn) {
return result;
}
#else
return malloc_diskblock_header_is_in_memory(res, brt,size, txn);
return malloc_diskblock_header_is_in_memory(res, brt,size, logger);
#endif
}
@ -288,11 +285,11 @@ static void initialize_brtnode (BRT t, BRTNODE n, DISKOFF nodename, int height)
}
}
static void create_new_brtnode (BRT t, BRTNODE *result, int height, TOKUTXN txn) {
static void create_new_brtnode (BRT t, BRTNODE *result, int height, TOKULOGGER logger) {
TAGMALLOC(BRTNODE, n);
int r;
DISKOFF name;
r = malloc_diskblock(&name, t, t->h->nodesize, txn);
r = malloc_diskblock(&name, t, t->h->nodesize, logger);
assert(r==0);
assert(n);
assert(t->h->nodesize>0);
@ -305,9 +302,9 @@ static void create_new_brtnode (BRT t, BRTNODE *result, int height, TOKUTXN txn)
r=toku_cachetable_put(t->cf, n->thisnodename, n, brtnode_size(n),
toku_brtnode_flush_callback, toku_brtnode_fetch_callback, t);
assert(r==0);
r=toku_log_newbrtnode(txn, toku_txn_get_txnid(txn), toku_cachefile_filenum(t->cf), n->thisnodename, height, n->nodesize, (t->flags&TOKU_DB_DUPSORT)!=0, n->rand4fingerprint);
r=toku_log_newbrtnode(logger, toku_cachefile_filenum(t->cf), n->thisnodename, height, n->nodesize, (t->flags&TOKU_DB_DUPSORT)!=0, n->rand4fingerprint);
assert(r==0);
toku_update_brtnode_lsn(n, txn);
toku_update_brtnode_loggerlsn(n, logger);
}
static int insert_to_buffer_in_nonleaf (BRTNODE node, int childnum, DBT *k, DBT *v, int type, TXNID xid) {
@ -322,11 +319,11 @@ static int insert_to_buffer_in_nonleaf (BRTNODE node, int childnum, DBT *k, DBT
}
static int brtleaf_split (TOKUTXN txn, FILENUM filenum, BRT t, BRTNODE node, BRTNODE *nodea, BRTNODE *nodeb, DBT *splitk) {
static int brtleaf_split (TOKULOGGER logger, FILENUM filenum, BRT t, BRTNODE node, BRTNODE *nodea, BRTNODE *nodeb, DBT *splitk) {
BRTNODE B;
assert(node->height==0);
assert(t->h->nodesize>=node->nodesize); /* otherwise we might be in trouble because the nodesize shrank. */
create_new_brtnode(t, &B, 0, txn);
create_new_brtnode(t, &B, 0, logger);
//printf("leaf_split %lld - %lld %lld\n", node->thisnodename, A->thisnodename, B->thisnodename);
//printf("%s:%d A PMA= %p\n", __FILE__, __LINE__, A->u.l.buffer);
//printf("%s:%d B PMA= %p\n", __FILE__, __LINE__, A->u.l.buffer);
@ -336,7 +333,7 @@ static int brtleaf_split (TOKUTXN txn, FILENUM filenum, BRT t, BRTNODE node, BRT
//printf("%s:%d B is at %lld nodesize=%d\n", __FILE__, __LINE__, B->thisnodename, B->nodesize);
assert(node->height>0 || node->u.l.buffer!=0);
int r;
r = toku_pma_split(txn, filenum,
r = toku_pma_split(logger, filenum,
node->thisnodename, node->u.l.buffer, &node->u.l.n_bytes_in_buffer, node->rand4fingerprint, &node->local_fingerprint, &node->log_lsn,
splitk,
B->thisnodename, B->u.l.buffer, &B->u.l.n_bytes_in_buffer, B->rand4fingerprint, &B->local_fingerprint, &B->log_lsn);
@ -353,17 +350,16 @@ static int brtleaf_split (TOKUTXN txn, FILENUM filenum, BRT t, BRTNODE node, BRT
}
/* Side effect: sets splitk->data pointer to a malloc'd value */
static int brt_nonleaf_split (BRT t, BRTNODE node, BRTNODE *nodea, BRTNODE *nodeb, DBT *splitk, TOKUTXN txn) {
static int brt_nonleaf_split (BRT t, BRTNODE node, BRTNODE *nodea, BRTNODE *nodeb, DBT *splitk, TOKULOGGER logger) {
int old_n_children = node->u.n.n_children;
int n_children_in_a = old_n_children/2;
int n_children_in_b = old_n_children-n_children_in_a;
BRTNODE B;
TXNID txnid = toku_txn_get_txnid(txn);
FILENUM fnum = toku_cachefile_filenum(t->cf);
assert(node->height>0);
assert(node->u.n.n_children>=2); // Otherwise, how do we split? We need at least two children to split. */
assert(t->h->nodesize>=node->nodesize); /* otherwise we might be in trouble because the nodesize shrank. */
create_new_brtnode(t, &B, node->height, txn);
create_new_brtnode(t, &B, node->height, logger);
B->u.n.n_children =n_children_in_b;
//printf("%s:%d %p (%lld) becomes %p and %p\n", __FILE__, __LINE__, node, node->thisnodename, A, B);
//printf("%s:%d A is at %lld\n", __FILE__, __LINE__, A->thisnodename);
@ -387,7 +383,7 @@ static int brt_nonleaf_split (BRT t, BRTNODE node, BRTNODE *nodea, BRTNODE *node
BNC_DISKOFF(B, targchild) = thischilddiskoff;
int r = toku_log_addchild(txn, txnid, fnum, B->thisnodename, targchild, thischilddiskoff, BNC_SUBTREE_FINGERPRINT(node, i));
int r = toku_log_addchild(logger, fnum, B->thisnodename, targchild, thischilddiskoff, BNC_SUBTREE_FINGERPRINT(node, i));
if (r!=0) return r;
while (1) {
@ -406,9 +402,9 @@ static int brt_nonleaf_split (BRT t, BRTNODE node, BRTNODE *nodea, BRTNODE *node
u_int32_t new_from_fingerprint = old_from_fingerprint - node->rand4fingerprint*delta;
u_int32_t new_to_fingerprint = old_to_fingerprint + B->rand4fingerprint *delta;
if (r!=0) return r;
r = toku_log_brtdeq(txn, xid, fnum, node->thisnodename, n_children_in_a, type, keybs, databs, old_from_fingerprint, new_from_fingerprint);
r = toku_log_brtdeq(logger, fnum, node->thisnodename, n_children_in_a, xid, type, keybs, databs, old_from_fingerprint, new_from_fingerprint);
if (r!=0) return r;
r = toku_log_brtenq(txn, xid, fnum, B->thisnodename, targchild, type, keybs, databs, old_to_fingerprint, new_to_fingerprint);
r = toku_log_brtenq(logger, fnum, B->thisnodename, targchild, xid, type, keybs, databs, old_to_fingerprint, new_to_fingerprint);
r = toku_fifo_enq(to_htab, key, keylen, data, datalen, type, xid);
if (r!=0) return r;
toku_fifo_deq(from_htab);
@ -429,10 +425,10 @@ static int brt_nonleaf_split (BRT t, BRTNODE node, BRTNODE *nodea, BRTNODE *node
BYTESTRING bs = { .len = kv_pair_keylen(node->u.n.childkeys[i-1]),
.data = kv_pair_key(node->u.n.childkeys[i-1]) };
assert(i>0);
r = toku_log_delchild(txn, txnid, fnum, node->thisnodename, n_children_in_a, thischilddiskoff, BNC_SUBTREE_FINGERPRINT(node, i), bs);
r = toku_log_delchild(logger, fnum, node->thisnodename, n_children_in_a, thischilddiskoff, BNC_SUBTREE_FINGERPRINT(node, i), bs);
if (r!=0) return r;
if (i>n_children_in_a) {
r = toku_log_setpivot(txn, txnid, fnum, B->thisnodename, targchild-1, bs);
r = toku_log_setpivot(logger, fnum, B->thisnodename, targchild-1, bs);
if (r!=0) return r;
B->u.n.childkeys[targchild-1] = node->u.n.childkeys[i-1];
B->u.n.totalchildkeylens += toku_brt_pivot_key_len(t, node->u.n.childkeys[i-1]);
@ -494,13 +490,13 @@ static int brtnode_put_cmd (BRT t, BRTNODE node, BRT_CMD cmd,
int *did_split, BRTNODE *nodea, BRTNODE *nodeb,
DBT *split,
int debug,
TOKUTXN txn);
TOKULOGGER);
/* key is not in the buffer. Either put the key-value pair in the child, or put it in the node. */
static int push_brt_cmd_down_only_if_it_wont_push_more_else_put_here (BRT t, BRTNODE node, BRTNODE child,
BRT_CMD cmd,
int childnum_of_node,
TOKUTXN txn) {
TOKULOGGER logger) {
assert(node->height>0); /* Not a leaf. */
DBT *k = cmd->u.id.key;
DBT *v = cmd->u.id.val;
@ -522,15 +518,15 @@ static int push_brt_cmd_down_only_if_it_wont_push_more_else_put_here (BRT t, BRT
toku_init_dbt(&againk);
//printf("%s:%d hello!\n", __FILE__, __LINE__);
r = brtnode_put_cmd(t, child, cmd,
&again_split, &againa, &againb, &againk,
0,
txn);
&again_split, &againa, &againb, &againk,
0,
logger);
if (r!=0) return r;
assert(again_split==0); /* I only did the insert if I knew it wouldn't push down, and hence wouldn't split. */
} else {
r=insert_to_buffer_in_nonleaf(node, childnum_of_node, k, v, cmd->type, cmd->xid);
}
fixup_child_fingerprint(node, childnum_of_node, child, t, txn);
fixup_child_fingerprint(node, childnum_of_node, child, t, logger);
return r;
}
@ -538,7 +534,7 @@ static int push_a_brt_cmd_down (BRT t, BRTNODE node, BRTNODE child, int childnum
BRT_CMD cmd,
int *child_did_split, BRTNODE *childa, BRTNODE *childb,
DBT *childsplitk,
TOKUTXN txn) {
TOKULOGGER logger) {
//if (debug) printf("%s:%d %*sinserting down\n", __FILE__, __LINE__, debug, "");
//printf("%s:%d hello!\n", __FILE__, __LINE__);
assert(node->height>0);
@ -546,7 +542,7 @@ static int push_a_brt_cmd_down (BRT t, BRTNODE node, BRTNODE child, int childnum
int r = brtnode_put_cmd(t, child, cmd,
child_did_split, childa, childb, childsplitk,
0,
txn);
logger);
if (r!=0) return r;
}
@ -566,15 +562,15 @@ static int push_a_brt_cmd_down (BRT t, BRTNODE node, BRTNODE child, int childnum
node->dirty = 1;
}
if (*child_did_split) {
fixup_child_fingerprint(node, childnum, *childa, t, txn);
fixup_child_fingerprint(node, childnum+1, *childb, t, txn);
fixup_child_fingerprint(node, childnum, *childa, t, logger);
fixup_child_fingerprint(node, childnum+1, *childb, t, logger);
} else {
fixup_child_fingerprint(node, childnum, child, t, txn);
fixup_child_fingerprint(node, childnum, child, t, logger);
}
return 0;
}
static int brtnode_maybe_push_down(BRT t, BRTNODE node, int *did_split, BRTNODE *nodea, BRTNODE *nodeb, DBT *splitk, int debug, TOKUTXN txn);
static int brtnode_maybe_push_down(BRT t, BRTNODE node, int *did_split, BRTNODE *nodea, BRTNODE *nodeb, DBT *splitk, int debug, TOKULOGGER logger);
static int split_count=0;
@ -590,7 +586,7 @@ static int handle_split_of_child (BRT t, BRTNODE node, int childnum,
DBT *childsplitk, /* the data in the childsplitk is alloc'd and is consumed by this call. */
int *did_split, BRTNODE *nodea, BRTNODE *nodeb,
DBT *splitk,
TOKUTXN txn) {
TOKULOGGER logger) {
assert(node->height>0);
assert(0 <= childnum && childnum < node->u.n.n_children);
FIFO old_h = BNC_BUFFER(node,childnum);
@ -615,11 +611,11 @@ static int handle_split_of_child (BRT t, BRTNODE node, int childnum,
for (cnum=node->u.n.n_children; cnum>childnum+1; cnum--) {
node->u.n.childinfos[cnum] = node->u.n.childinfos[cnum-1];
}
r = toku_log_addchild(txn, toku_txn_get_txnid(txn), toku_cachefile_filenum(t->cf), node->thisnodename, childnum+1, childb->thisnodename, 0);
r = toku_log_addchild(logger, toku_cachefile_filenum(t->cf), node->thisnodename, childnum+1, childb->thisnodename, 0);
assert(BNC_DISKOFF(node, childnum)==childa->thisnodename);
BNC_DISKOFF(node, childnum+1) = childb->thisnodename;
fixup_child_fingerprint(node, childnum, childa, t, txn);
fixup_child_fingerprint(node, childnum+1, childb, t, txn);
fixup_child_fingerprint(node, childnum, childa, t, logger);
fixup_child_fingerprint(node, childnum+1, childb, t, logger);
r=toku_fifo_create(&BNC_BUFFER(node,childnum)); assert(r==0); // ??? SHould handle this error case
r=toku_fifo_create(&BNC_BUFFER(node,childnum+1)); assert(r==0);
BNC_NBYTESINBUF(node, childnum) = 0;
@ -634,13 +630,13 @@ static int handle_split_of_child (BRT t, BRTNODE node, int childnum,
struct kv_pair *pivot = childsplitk->data;
BYTESTRING bs = { .len = childsplitk->size,
.data = kv_pair_key(pivot) };
r = toku_log_setpivot(txn, toku_txn_get_txnid(txn), toku_cachefile_filenum(t->cf), node->thisnodename, childnum, bs);
r = toku_log_setpivot(logger, toku_cachefile_filenum(t->cf), node->thisnodename, childnum, bs);
if (r!=0) return r;
for (cnum=node->u.n.n_children-1; cnum>childnum; cnum--) {
node->u.n.childkeys[cnum] = node->u.n.childkeys[cnum-1];
}
if (txn) assert((t->flags&TOKU_DB_DUPSORT)==0); // none of this works for dupsort databases. The size is wrong. The setpivot is wrong.
if (logger) assert((t->flags&TOKU_DB_DUPSORT)==0); // none of this works for dupsort databases. The size is wrong. The setpivot is wrong.
node->u.n.childkeys[childnum]= pivot;
node->u.n.totalchildkeylens += childsplitk->size;
}
@ -681,7 +677,7 @@ static int handle_split_of_child (BRT t, BRTNODE node, int childnum,
if (pusha) {
// If we already have something in the buffer, we must add the new command to the buffer so that commands don't get out of order.
if (toku_fifo_n_entries(BNC_BUFFER(node,childnum))==0) {
r=push_brt_cmd_down_only_if_it_wont_push_more_else_put_here(t, node, childa, &brtcmd, childnum, txn);
r=push_brt_cmd_down_only_if_it_wont_push_more_else_put_here(t, node, childa, &brtcmd, childnum, logger);
} else {
r=insert_to_buffer_in_nonleaf(node, childnum, &skd, &svd, type, xid);
}
@ -689,7 +685,7 @@ static int handle_split_of_child (BRT t, BRTNODE node, int childnum,
if (pushb) {
// If we already have something in the buffer, we must add the new command to the buffer so that commands don't get out of order.
if (toku_fifo_n_entries(BNC_BUFFER(node,childnum+1))==0) {
r=push_brt_cmd_down_only_if_it_wont_push_more_else_put_here(t, node, childb, &brtcmd, childnum+1, txn);
r=push_brt_cmd_down_only_if_it_wont_push_more_else_put_here(t, node, childb, &brtcmd, childnum+1, logger);
} else {
r=insert_to_buffer_in_nonleaf(node, childnum+1, &skd, &svd, type, xid);
}
@ -727,7 +723,7 @@ static int handle_split_of_child (BRT t, BRTNODE node, int childnum,
if (node->u.n.n_children>TREE_FANOUT) {
//printf("%s:%d about to split having pushed %d out of %d keys\n", __FILE__, __LINE__, i, n_pairs);
r=brt_nonleaf_split(t, node, nodea, nodeb, splitk, txn);
r=brt_nonleaf_split(t, node, nodea, nodeb, splitk, logger);
if (r!=0) return r;
//printf("%s:%d did split\n", __FILE__, __LINE__);
split_count++;
@ -747,7 +743,7 @@ static int handle_split_of_child (BRT t, BRTNODE node, int childnum,
if (toku_serialize_brtnode_size(node) > node->nodesize) {
/* lighten the node by pushing down its buffers. this may cause
the current node to split and go away */
r = brtnode_maybe_push_down(t, node, did_split, nodea, nodeb, splitk, 0, txn);
r = brtnode_maybe_push_down(t, node, did_split, nodea, nodeb, splitk, 0, logger);
assert(r == 0);
}
if (*did_split == 0) assert(toku_serialize_brtnode_size(node)<=node->nodesize);
@ -759,7 +755,7 @@ static int push_some_brt_cmds_down (BRT t, BRTNODE node, int childnum,
int *did_split, BRTNODE *nodea, BRTNODE *nodeb,
DBT *splitk,
int debug,
TOKUTXN txn) {
TOKULOGGER logger) {
void *childnode_v;
BRTNODE child;
int r;
@ -807,7 +803,7 @@ static int push_some_brt_cmds_down (BRT t, BRTNODE node, int childnum,
&brtcmd,
&child_did_split, &childa, &childb,
&childsplitk,
txn);
logger);
if (0){
unsigned int sum=0;
@ -825,7 +821,7 @@ static int push_some_brt_cmds_down (BRT t, BRTNODE node, int childnum,
r=handle_split_of_child (t, node, childnum,
childa, childb, &childsplitk,
did_split, nodea, nodeb, splitk,
txn);
logger);
//if (*did_split) {
// verify_local_fingerprint_nonleaf(*nodea);
// verify_local_fingerprint_nonleaf(*nodeb);
@ -848,7 +844,7 @@ static int debugp1 (int debug) {
return debug ? debug+1 : 0;
}
static int brtnode_maybe_push_down(BRT t, BRTNODE node, int *did_split, BRTNODE *nodea, BRTNODE *nodeb, DBT *splitk, int debug, TOKUTXN txn)
static int brtnode_maybe_push_down(BRT t, BRTNODE node, int *did_split, BRTNODE *nodea, BRTNODE *nodeb, DBT *splitk, int debug, TOKULOGGER logger)
/* If the buffer is too full, then push down. Possibly the child will split. That may make us split. */
{
assert(node->height>0);
@ -864,7 +860,7 @@ static int brtnode_maybe_push_down(BRT t, BRTNODE node, int *did_split, BRTNODE
find_heaviest_child(node, &childnum);
if (0) printf("%s:%d %*spush some down from %lld into %lld (child %d)\n", __FILE__, __LINE__, debug, "", node->thisnodename, BNC_DISKOFF(node, childnum), childnum);
assert(BNC_DISKOFF(node, childnum)!=0);
int r = push_some_brt_cmds_down(t, node, childnum, did_split, nodea, nodeb, splitk, debugp1(debug), txn);
int r = push_some_brt_cmds_down(t, node, childnum, did_split, nodea, nodeb, splitk, debugp1(debug), logger);
if (r!=0) return r;
assert(*did_split==0 || *did_split==1);
if (debug) printf("%s:%d %*sdid push_some_brt_cmds_down did_split=%d\n", __FILE__, __LINE__, debug, "", *did_split);
@ -897,7 +893,7 @@ static int brtnode_maybe_push_down(BRT t, BRTNODE node, int *did_split, BRTNODE
static int brt_leaf_put_cmd (BRT t, BRTNODE node, BRT_CMD cmd,
int *did_split, BRTNODE *nodea, BRTNODE *nodeb, DBT *splitk,
int debug,
TOKUTXN txn) {
TOKULOGGER logger) {
// toku_pma_verify_fingerprint(node->u.l.buffer, node->rand4fingerprint, node->subtree_fingerprint);
assert(node->height==0);
FILENUM filenum = toku_cachefile_filenum(t->cf);
@ -905,7 +901,10 @@ static int brt_leaf_put_cmd (BRT t, BRTNODE node, BRT_CMD cmd,
DBT *k = cmd->u.id.key;
DBT *v = cmd->u.id.val;
int replaced_v_size;
enum pma_errors pma_status = toku_pma_insert_or_replace(node->u.l.buffer, k, v, &replaced_v_size, txn, filenum, node->thisnodename, node->rand4fingerprint, &node->local_fingerprint, &node->log_lsn);
enum pma_errors pma_status = toku_pma_insert_or_replace(node->u.l.buffer,
k, v, &replaced_v_size,
logger, cmd->xid,
filenum, node->thisnodename, node->rand4fingerprint, &node->local_fingerprint, &node->log_lsn);
assert(pma_status==BRT_OK);
//printf("replaced_v_size=%d\n", replaced_v_size);
if (replaced_v_size>=0) {
@ -919,7 +918,7 @@ static int brt_leaf_put_cmd (BRT t, BRTNODE node, BRT_CMD cmd,
// If it doesn't fit, then split the leaf.
if (toku_serialize_brtnode_size(node) > node->nodesize) {
int r = brtleaf_split (txn, filenum, t, node, nodea, nodeb, splitk);
int r = brtleaf_split (logger, filenum, t, node, nodea, nodeb, splitk);
if (r!=0) return r;
//printf("%s:%d splitkey=%s\n", __FILE__, __LINE__, (char*)*splitkey);
split_count++;
@ -980,7 +979,7 @@ static unsigned int brtnode_right_child (BRTNODE node, DBT *k, DBT *data, BRT t)
/* put a cmd into a nodes child */
static int brt_nonleaf_put_cmd_child_node (BRT t, BRTNODE node, BRT_CMD cmd,
int *did_split, BRTNODE *nodea, BRTNODE *nodeb, DBT *splitk,
int debug, TOKUTXN txn, int childnum, int maybe) {
int debug, TOKULOGGER logger, int childnum, int maybe) {
int r;
void *child_v;
BRTNODE child;
@ -1002,7 +1001,7 @@ static int brt_nonleaf_put_cmd_child_node (BRT t, BRTNODE node, BRT_CMD cmd,
child_did_split = 0;
r = brtnode_put_cmd(t, child, cmd,
&child_did_split, &childa, &childb, &childsplitk, debug, txn);
&child_did_split, &childa, &childb, &childsplitk, debug, logger);
if (r != 0) {
/* putting to the child failed for some reason, so unpin the child and return the error code */
int rr = unpin_brtnode(t, child);
@ -1015,11 +1014,11 @@ static int brt_nonleaf_put_cmd_child_node (BRT t, BRTNODE node, BRT_CMD cmd,
r = handle_split_of_child(t, node, childnum,
childa, childb, &childsplitk,
did_split, nodea, nodeb, splitk,
txn);
logger);
assert(r == 0);
} else {
//verify_local_fingerprint_nonleaf(child);
fixup_child_fingerprint(node, childnum, child, t, txn);
fixup_child_fingerprint(node, childnum, child, t, logger);
int rr = unpin_brtnode(t, child);
assert(rr == 0);
}
@ -1031,12 +1030,12 @@ int toku_brt_do_push_cmd = 1;
/* put a cmd into a node at childnum */
static int brt_nonleaf_put_cmd_child (BRT t, BRTNODE node, BRT_CMD cmd,
int *did_split, BRTNODE *nodea, BRTNODE *nodeb, DBT *splitk,
int debug, TOKUTXN txn, unsigned int childnum, int can_push, int *do_push_down) {
int debug, TOKULOGGER logger, unsigned int childnum, int can_push, int *do_push_down) {
//verify_local_fingerprint_nonleaf(node);
/* try to push the cmd to the subtree if the buffer is empty and pushes are enabled */
if (BNC_NBYTESINBUF(node, childnum) == 0 && can_push && toku_brt_do_push_cmd) {
int r = brt_nonleaf_put_cmd_child_node(t, node, cmd, did_split, nodea, nodeb, splitk, debug, txn, childnum, 1);
int r = brt_nonleaf_put_cmd_child_node(t, node, cmd, did_split, nodea, nodeb, splitk, debug, logger, childnum, 1);
if (r == 0)
return r;
}
@ -1062,7 +1061,7 @@ static int brt_nonleaf_put_cmd_child (BRT t, BRTNODE node, BRT_CMD cmd,
static int brt_nonleaf_insert_cmd (BRT t, BRTNODE node, BRT_CMD cmd,
int *did_split, BRTNODE *nodea, BRTNODE *nodeb, DBT *splitk,
int debug, TOKUTXN txn) {
int debug, TOKULOGGER logger) {
//verify_local_fingerprint_nonleaf(node);
unsigned int childnum;
int r;
@ -1072,14 +1071,14 @@ static int brt_nonleaf_insert_cmd (BRT t, BRTNODE node, BRT_CMD cmd,
/* put the cmd in the subtree */
int do_push_down = 0;
r = brt_nonleaf_put_cmd_child(t, node, cmd, did_split, nodea, nodeb, splitk, debug, txn, childnum, 1, &do_push_down);
r = brt_nonleaf_put_cmd_child(t, node, cmd, did_split, nodea, nodeb, splitk, debug, logger, childnum, 1, &do_push_down);
if (r != 0) return r;
/* maybe push down */
if (do_push_down) {
if (debug) printf("%s:%d %*sDoing maybe_push_down\n", __FILE__, __LINE__, debug, "");
//verify_local_fingerprint_nonleaf(node);
r = brtnode_maybe_push_down(t, node, did_split, nodea, nodeb, splitk, debugp1(debug), txn);
r = brtnode_maybe_push_down(t, node, did_split, nodea, nodeb, splitk, debugp1(debug), logger);
if (r!=0) return r;
if (debug) printf("%s:%d %*sDid maybe_push_down\n", __FILE__, __LINE__, debug, "");
if (*did_split) {
@ -1109,7 +1108,7 @@ static int brt_nonleaf_insert_cmd (BRT t, BRTNODE node, BRT_CMD cmd,
static int brt_nonleaf_delete_cmd (BRT t, BRTNODE node, BRT_CMD cmd,
int *did_split, BRTNODE *nodea, BRTNODE *nodeb, DBT *splitk,
int debug,
TOKUTXN txn) {
TOKULOGGER logger) {
int r;
/* find all children that need a delete cmd */
@ -1141,7 +1140,7 @@ static int brt_nonleaf_delete_cmd (BRT t, BRTNODE node, BRT_CMD cmd,
/* issue the delete cmd to all of the children found previously */
int do_push_down = 0;
for (i=0; i<delidx; i++) {
r = brt_nonleaf_put_cmd_child(t, node, cmd, did_split, nodea, nodeb, splitk, debug, txn, delchild[i], delidx == 1, &do_push_down);
r = brt_nonleaf_put_cmd_child(t, node, cmd, did_split, nodea, nodeb, splitk, debug, logger, delchild[i], delidx == 1, &do_push_down);
assert(r == 0);
}
@ -1149,7 +1148,7 @@ static int brt_nonleaf_delete_cmd (BRT t, BRTNODE node, BRT_CMD cmd,
/* maybe push down */
if (debug) printf("%s:%d %*sDoing maybe_push_down\n", __FILE__, __LINE__, debug, "");
//verify_local_fingerprint_nonleaf(node);
r = brtnode_maybe_push_down(t, node, did_split, nodea, nodeb, splitk, debugp1(debug), txn);
r = brtnode_maybe_push_down(t, node, did_split, nodea, nodeb, splitk, debugp1(debug), logger);
if (r!=0) return r;
if (debug) printf("%s:%d %*sDid maybe_push_down\n", __FILE__, __LINE__, debug, "");
if (*did_split) {
@ -1178,11 +1177,11 @@ static int brt_nonleaf_put_cmd (BRT t, BRTNODE node, BRT_CMD cmd,
int *did_split, BRTNODE *nodea, BRTNODE *nodeb,
DBT *splitk,
int debug,
TOKUTXN txn) {
TOKULOGGER logger) {
if (cmd->type == BRT_INSERT || cmd->type == BRT_DELETE_BOTH) {
return brt_nonleaf_insert_cmd(t, node, cmd, did_split, nodea, nodeb, splitk, debug, txn);
return brt_nonleaf_insert_cmd(t, node, cmd, did_split, nodea, nodeb, splitk, debug, logger);
} else if (cmd->type == BRT_DELETE) {
return brt_nonleaf_delete_cmd(t, node, cmd, did_split, nodea, nodeb, splitk, debug, txn);
return brt_nonleaf_delete_cmd(t, node, cmd, did_split, nodea, nodeb, splitk, debug, logger);
} else
return EINVAL;
}
@ -1203,7 +1202,7 @@ static void verify_local_fingerprint_nonleaf (BRTNODE node) {
static int brtnode_put_cmd (BRT t, BRTNODE node, BRT_CMD cmd,
int *did_split, BRTNODE *nodea, BRTNODE *nodeb, DBT *splitk,
int debug,
TOKUTXN txn) {
TOKULOGGER logger) {
//static int counter=0; // FOO
//static int oldcounter=0;
//int tmpcounter;
@ -1213,12 +1212,12 @@ static int brtnode_put_cmd (BRT t, BRTNODE node, BRT_CMD cmd,
if (node->height==0) {
// toku_pma_verify_fingerprint(node->u.l.buffer, node->rand4fingerprint, node->subtree_fingerprint);
r = brt_leaf_put_cmd(t, node, cmd,
did_split, nodea, nodeb, splitk,
debug, txn);
did_split, nodea, nodeb, splitk,
debug, logger);
} else {
r = brt_nonleaf_put_cmd(t, node, cmd,
did_split, nodea, nodeb, splitk,
debug, txn);
did_split, nodea, nodeb, splitk,
debug, logger);
}
//oldcounter=tmpcounter;
// Watch out. If did_split then the original node is no longer allocated.
@ -1251,7 +1250,7 @@ int toku_brt_create_cachetable(CACHETABLE *ct, long cachesize, LSN initial_lsn,
return toku_create_cachetable(ct, cachesize, initial_lsn, logger);
}
static int setup_brt_root_node (BRT t, DISKOFF offset, TOKUTXN txn) {
static int setup_initial_brt_root_node (BRT t, DISKOFF offset, TOKULOGGER logger) {
int r;
TAGMALLOC(BRTNODE, node);
assert(node);
@ -1271,13 +1270,10 @@ static int setup_brt_root_node (BRT t, DISKOFF offset, TOKUTXN txn) {
toku_free(node);
return r;
}
if (node->thisnodename==20971520) {
printf("%s:%d created %lld\n", __FILE__, __LINE__, node->thisnodename);
}
toku_verify_counts(node);
// verify_local_fingerprint_nonleaf(node);
toku_log_newbrtnode(txn, toku_txn_get_txnid(txn), toku_cachefile_filenum(t->cf), offset, 0, t->h->nodesize, (t->flags&TOKU_DB_DUPSORT)!=0, node->rand4fingerprint);
toku_update_brtnode_lsn(node, txn);
toku_log_newbrtnode(logger, toku_cachefile_filenum(t->cf), offset, 0, t->h->nodesize, (t->flags&TOKU_DB_DUPSORT)!=0, node->rand4fingerprint);
toku_update_brtnode_loggerlsn(node, logger);
r=unpin_brtnode(t, node);
if (r!=0) {
toku_free(node);
@ -1425,7 +1421,7 @@ int toku_brt_open(BRT t, const char *fname, const char *fname_in_env, const char
t->h->roots=0;
}
if ((r=toku_logger_log_header(txn, toku_cachefile_filenum(t->cf), t->h))) { goto died6; }
if ((r=setup_brt_root_node(t, t->nodesize, txn))!=0) { died6: if (dbname) goto died5; else goto died2; }
if ((r=setup_initial_brt_root_node(t, t->nodesize, toku_txn_logger(txn)))!=0) { died6: if (dbname) goto died5; else goto died2; }
if ((r=toku_cachetable_put(t->cf, 0, t->h, 0, toku_brtheader_flush_callback, toku_brtheader_fetch_callback, 0))) { goto died6; }
}
else if (r!=0) {
@ -1451,10 +1447,10 @@ int toku_brt_open(BRT t, const char *fname, const char *fname_in_env, const char
t->h->n_named_roots++;
if ((t->h->names[t->h->n_named_roots-1] = toku_strdup(dbname)) == 0) { assert(errno==ENOMEM); r=ENOMEM; goto died_after_read_and_pin; }
//printf("%s:%d t=%p\n", __FILE__, __LINE__, t);
r = malloc_diskblock_header_is_in_memory(&t->h->roots[t->h->n_named_roots-1], t, t->h->nodesize, txn);
r = malloc_diskblock_header_is_in_memory(&t->h->roots[t->h->n_named_roots-1], t, t->h->nodesize, toku_txn_logger(txn));
if (r!=0) goto died_after_read_and_pin;
t->h->dirty = 1;
if ((r=setup_brt_root_node(t, t->h->roots[t->h->n_named_roots-1], txn))!=0) goto died_after_read_and_pin;
if ((r=setup_initial_brt_root_node(t, t->h->roots[t->h->n_named_roots-1], toku_txn_logger(txn)))!=0) goto died_after_read_and_pin;
}
} else {
if ((r = toku_read_and_pin_brt_header(t->cf, &t->h))!=0) goto died1;
@ -1591,22 +1587,22 @@ CACHEKEY* toku_calculate_root_offset_pointer (BRT brt) {
abort();
}
static int brt_init_new_root(BRT brt, BRTNODE nodea, BRTNODE nodeb, DBT splitk, CACHEKEY *rootp, TOKUTXN txn, BRTNODE *newrootp) {
static int brt_init_new_root(BRT brt, BRTNODE nodea, BRTNODE nodeb, DBT splitk, CACHEKEY *rootp, TOKULOGGER logger, BRTNODE *newrootp) {
TAGMALLOC(BRTNODE, newroot);
int r;
int new_height = nodea->height+1;
int new_nodesize = brt->h->nodesize;
DISKOFF newroot_diskoff;
r=malloc_diskblock(&newroot_diskoff, brt, new_nodesize, txn);
r=malloc_diskblock(&newroot_diskoff, brt, new_nodesize, logger);
assert(r==0);
assert(newroot);
if (brt->database_name==0) {
toku_log_changeunnamedroot(txn, toku_txn_get_txnid(txn), toku_cachefile_filenum(brt->cf), *rootp, newroot_diskoff);
toku_log_changeunnamedroot(logger, toku_cachefile_filenum(brt->cf), *rootp, newroot_diskoff);
} else {
BYTESTRING bs;
bs.len = 1+strlen(brt->database_name);
bs.data = brt->database_name;
toku_log_changenamedroot(txn, toku_txn_get_txnid(txn), toku_cachefile_filenum(brt->cf), bs, *rootp, newroot_diskoff);
toku_log_changenamedroot(logger, toku_cachefile_filenum(brt->cf), bs, *rootp, newroot_diskoff);
}
*rootp=newroot_diskoff;
brt->h->dirty=1;
@ -1623,20 +1619,20 @@ static int brt_init_new_root(BRT brt, BRTNODE nodea, BRTNODE nodeb, DBT splitk,
toku_verify_counts(newroot);
//verify_local_fingerprint_nonleaf(nodea);
//verify_local_fingerprint_nonleaf(nodeb);
r=toku_log_newbrtnode(txn, toku_txn_get_txnid(txn), toku_cachefile_filenum(brt->cf), newroot_diskoff, new_height, new_nodesize, (brt->flags&TOKU_DB_DUPSORT)!=0, newroot->rand4fingerprint);
r=toku_log_newbrtnode(logger, toku_cachefile_filenum(brt->cf), newroot_diskoff, new_height, new_nodesize, (brt->flags&TOKU_DB_DUPSORT)!=0, newroot->rand4fingerprint);
if (r!=0) return r;
r=toku_log_addchild(txn, toku_txn_get_txnid(txn), toku_cachefile_filenum(brt->cf), newroot_diskoff, 0, nodea->thisnodename, 0);
r=toku_log_addchild(logger, toku_cachefile_filenum(brt->cf), newroot_diskoff, 0, nodea->thisnodename, 0);
if (r!=0) return r;
r=toku_log_addchild(txn, toku_txn_get_txnid(txn), toku_cachefile_filenum(brt->cf), newroot_diskoff, 1, nodeb->thisnodename, 0);
r=toku_log_addchild(logger, toku_cachefile_filenum(brt->cf), newroot_diskoff, 1, nodeb->thisnodename, 0);
if (r!=0) return r;
fixup_child_fingerprint(newroot, 0, nodea, brt, txn);
fixup_child_fingerprint(newroot, 1, nodeb, brt, txn);
fixup_child_fingerprint(newroot, 0, nodea, brt, logger);
fixup_child_fingerprint(newroot, 1, nodeb, brt, logger);
{
BYTESTRING bs = { .len = kv_pair_keylen(newroot->u.n.childkeys[0]),
.data = kv_pair_key(newroot->u.n.childkeys[0]) };
r=toku_log_setpivot(txn, toku_txn_get_txnid(txn), toku_cachefile_filenum(brt->cf), newroot_diskoff, 0, bs);
r=toku_log_setpivot(logger, toku_cachefile_filenum(brt->cf), newroot_diskoff, 0, bs);
if (r!=0) return r;
toku_update_brtnode_lsn(newroot, txn);
toku_update_brtnode_loggerlsn(newroot, logger);
}
r=unpin_brtnode(brt, nodea);
if (r!=0) return r;
@ -1649,7 +1645,7 @@ static int brt_init_new_root(BRT brt, BRTNODE nodea, BRTNODE nodeb, DBT splitk,
return 0;
}
static int brt_root_put_cmd(BRT brt, BRT_CMD cmd, TOKUTXN txn) {
static int brt_root_put_cmd(BRT brt, BRT_CMD cmd, TOKULOGGER logger) {
void *node_v;
BRTNODE node;
CACHEKEY *rootp;
@ -1676,7 +1672,7 @@ static int brt_root_put_cmd(BRT brt, BRT_CMD cmd, TOKUTXN txn) {
result = brtnode_put_cmd(brt, node, cmd,
&did_split, &nodea, &nodeb, &splitk,
debug,
txn);
logger);
if (debug) printf("%s:%d did_insert\n", __FILE__, __LINE__);
if (did_split) {
// node is unpinned, so now we have to proceed to update the root with a new node.
@ -1685,7 +1681,7 @@ static int brt_root_put_cmd(BRT brt, BRT_CMD cmd, TOKUTXN txn) {
//printf("Did split, splitkey=%s\n", splitkey);
if (nodeb->height>0) assert(BNC_DISKOFF(nodeb,nodeb->u.n.n_children-1)!=0);
assert(nodeb->nodesize>0);
r = brt_init_new_root(brt, nodea, nodeb, splitk, rootp, txn, &node);
r = brt_init_new_root(brt, nodea, nodeb, splitk, rootp, logger, &node);
assert(r == 0);
} else {
if (node->height>0)
@ -1703,7 +1699,7 @@ int toku_brt_insert (BRT brt, DBT *key, DBT *val, TOKUTXN txn) {
int r;
BRT_CMD_S brtcmd = { BRT_INSERT, toku_txn_get_txnid(txn), .u.id={key,val}};
r = brt_root_put_cmd(brt, &brtcmd, txn);
r = brt_root_put_cmd(brt, &brtcmd, toku_txn_logger(txn));
return r;
}
@ -1726,14 +1722,14 @@ int toku_brt_delete(BRT brt, DBT *key, TOKUTXN txn) {
int r;
DBT val;
BRT_CMD_S brtcmd = { BRT_DELETE, toku_txn_get_txnid(txn), .u.id={key, toku_init_dbt(&val)}};
r = brt_root_put_cmd(brt, &brtcmd, txn);
r = brt_root_put_cmd(brt, &brtcmd, toku_txn_logger(txn));
return r;
}
int toku_brt_delete_both(BRT brt, DBT *key, DBT *val, TOKUTXN txn) {
int r;
BRT_CMD_S brtcmd = { BRT_DELETE_BOTH, toku_txn_get_txnid(txn), .u.id={key,val}};
r = brt_root_put_cmd(brt, &brtcmd, txn);
r = brt_root_put_cmd(brt, &brtcmd, toku_txn_logger(txn));
return r;
}
@ -1871,15 +1867,15 @@ static inline void brt_split_init(BRT_SPLIT *split) {
toku_init_dbt(&split->splitk);
}
static int brt_search_node(BRT brt, BRTNODE node, brt_search_t *search, DBT *newkey, DBT *newval, BRT_SPLIT *split, TOKUTXN txn);
static int brt_search_node(BRT brt, BRTNODE node, brt_search_t *search, DBT *newkey, DBT *newval, BRT_SPLIT *split, TOKULOGGER logger);
/* search in a node's child */
static int brt_search_child(BRT brt, BRTNODE node, int childnum, brt_search_t *search, DBT *newkey, DBT *newval, BRT_SPLIT *split, TOKUTXN txn) {
static int brt_search_child(BRT brt, BRTNODE node, int childnum, brt_search_t *search, DBT *newkey, DBT *newval, BRT_SPLIT *split, TOKULOGGER logger) {
int r, rr;
/* if the child's buffer is not empty then try to empty it */
if (BNC_NBYTESINBUF(node, childnum) > 0) {
rr = push_some_brt_cmds_down(brt, node, childnum, &split->did_split, &split->nodea, &split->nodeb, &split->splitk, 0, txn);
rr = push_some_brt_cmds_down(brt, node, childnum, &split->did_split, &split->nodea, &split->nodeb, &split->splitk, 0, logger);
assert(rr == 0);
/* push down may cause a child split, so childnum may not be appropriate, and the node itself may split, so retry */
return EAGAIN;
@ -1892,11 +1888,11 @@ static int brt_search_child(BRT brt, BRTNODE node, int childnum, brt_search_t *s
for (;;) {
BRTNODE childnode = node_v;
BRT_SPLIT childsplit; brt_split_init(&childsplit);
r = brt_search_node(brt, childnode, search, newkey, newval, &childsplit, txn);
r = brt_search_node(brt, childnode, search, newkey, newval, &childsplit, logger);
if (childsplit.did_split) {
rr = handle_split_of_child(brt, node, childnum, childsplit.nodea, childsplit.nodeb, &childsplit.splitk,
&split->did_split, &split->nodea, &split->nodeb, &split->splitk, txn);
&split->did_split, &split->nodea, &split->nodeb, &split->splitk, logger);
assert(rr == 0);
break;
} else {
@ -1911,7 +1907,7 @@ static int brt_search_child(BRT brt, BRTNODE node, int childnum, brt_search_t *s
return r;
}
static int brt_search_nonleaf_node(BRT brt, BRTNODE node, brt_search_t *search, DBT *newkey, DBT *newval, BRT_SPLIT *split, TOKUTXN txn) {
static int brt_search_nonleaf_node(BRT brt, BRTNODE node, brt_search_t *search, DBT *newkey, DBT *newval, BRT_SPLIT *split, TOKULOGGER logger) {
int r = DB_NOTFOUND;
int c;
@ -1929,7 +1925,7 @@ static int brt_search_nonleaf_node(BRT brt, BRTNODE node, brt_search_t *search,
if (search->compare(search,
toku_fill_dbt(&pivotkey, kv_pair_key(pivot), kv_pair_keylen(pivot)),
brt->flags & TOKU_DB_DUPSORT ? toku_fill_dbt(&pivotval, kv_pair_val(pivot), kv_pair_vallen(pivot)): 0)) {
r = brt_search_child(brt, node, child[c], search, newkey, newval, split, txn);
r = brt_search_child(brt, node, child[c], search, newkey, newval, split, logger);
if (r == 0 || r == EAGAIN)
break;
}
@ -1937,26 +1933,25 @@ static int brt_search_nonleaf_node(BRT brt, BRTNODE node, brt_search_t *search,
/* check the first (left) or last (right) node if nothing has been found */
if (r == DB_NOTFOUND && c == node->u.n.n_children-1)
r = brt_search_child(brt, node, child[c], search, newkey, newval, split, txn);
r = brt_search_child(brt, node, child[c], search, newkey, newval, split, logger);
return r;
}
static int brt_search_leaf_node(BRT brt, BRTNODE node, brt_search_t *search, DBT *newkey, DBT *newval, BRT_SPLIT *split, TOKUTXN txn) {
brt = brt; split = split; txn = txn;
static int brt_search_leaf_node(BRTNODE node, brt_search_t *search, DBT *newkey, DBT *newval) {
PMA pma = node->u.l.buffer;
int r = toku_pma_search(pma, search, newkey, newval);
return r;
}
static int brt_search_node(BRT brt, BRTNODE node, brt_search_t *search, DBT *newkey, DBT *newval, BRT_SPLIT *split, TOKUTXN txn) {
static int brt_search_node(BRT brt, BRTNODE node, brt_search_t *search, DBT *newkey, DBT *newval, BRT_SPLIT *split, TOKULOGGER logger) {
if (node->height > 0)
return brt_search_nonleaf_node(brt, node, search, newkey, newval, split, txn);
return brt_search_nonleaf_node(brt, node, search, newkey, newval, split, logger);
else
return brt_search_leaf_node(brt, node, search, newkey, newval, split, txn);
return brt_search_leaf_node(node, search, newkey, newval);
}
int toku_brt_search(BRT brt, brt_search_t *search, DBT *newkey, DBT *newval, TOKUTXN txn) {
int toku_brt_search(BRT brt, brt_search_t *search, DBT *newkey, DBT *newval, TOKULOGGER logger) {
int r, rr;
rr = toku_read_and_pin_brt_header(brt->cf, &brt->h);
@ -1973,7 +1968,7 @@ int toku_brt_search(BRT brt, brt_search_t *search, DBT *newkey, DBT *newval, TOK
for (;;) {
BRT_SPLIT split; brt_split_init(&split);
r = brt_search_node(brt, node, search, newkey, newval, &split, txn);
r = brt_search_node(brt, node, search, newkey, newval, &split, logger);
if (split.did_split) {
rr = brt_init_new_root(brt, split.nodea, split.nodeb, split.splitk, rootp, 0, &node);
@ -2062,7 +2057,7 @@ static int brt_cursor_compare_set(brt_search_t *search, DBT *x, DBT *y) {
return compare_kv_xy(brt, search->k, search->v, x, y) <= 0; /* return min xy: kv <= xy */
}
static int brt_cursor_current(BRT_CURSOR cursor, int op, DBT *outkey, DBT *outval, TOKUTXN txn) {
static int brt_cursor_current(BRT_CURSOR cursor, int op, DBT *outkey, DBT *outval, TOKULOGGER logger) {
if (brt_cursor_not_set(cursor))
return EINVAL;
if (op == DB_CURRENT) {
@ -2070,7 +2065,7 @@ static int brt_cursor_current(BRT_CURSOR cursor, int op, DBT *outkey, DBT *outva
DBT newval; toku_init_dbt(&newval);
brt_search_t search; brt_search_init(&search, brt_cursor_compare_set, BRT_SEARCH_LEFT, &cursor->key, &cursor->val, cursor->brt);
int r = toku_brt_search(cursor->brt, &search, &newkey, &newval, txn);
int r = toku_brt_search(cursor->brt, &search, &newkey, &newval, logger);
if (r != 0 || compare_kv_xy(cursor->brt, &cursor->key, &cursor->val, &newkey, &newval) != 0)
return DB_KEYEMPTY;
}
@ -2078,11 +2073,11 @@ static int brt_cursor_current(BRT_CURSOR cursor, int op, DBT *outkey, DBT *outva
}
/* search for the first kv pair that matches the search object */
static int brt_cursor_search(BRT_CURSOR cursor, brt_search_t *search, DBT *outkey, DBT *outval, TOKUTXN txn) {
static int brt_cursor_search(BRT_CURSOR cursor, brt_search_t *search, DBT *outkey, DBT *outval, TOKULOGGER logger) {
DBT newkey; toku_init_dbt(&newkey); newkey.flags = DB_DBT_MALLOC;
DBT newval; toku_init_dbt(&newval); newval.flags = DB_DBT_MALLOC;
int r = toku_brt_search(cursor->brt, search, &newkey, &newval, txn);
int r = toku_brt_search(cursor->brt, search, &newkey, &newval, logger);
if (r == 0) {
brt_cursor_set_key_val(cursor, &newkey, &newval);
r = brt_cursor_copyout(cursor, outkey, outval);
@ -2093,11 +2088,11 @@ static int brt_cursor_search(BRT_CURSOR cursor, brt_search_t *search, DBT *outke
}
/* search for the kv pair that matches the search object and is equal to kv */
static int brt_cursor_search_eq_kv_xy(BRT_CURSOR cursor, brt_search_t *search, DBT *outkey, DBT *outval, TOKUTXN txn) {
static int brt_cursor_search_eq_kv_xy(BRT_CURSOR cursor, brt_search_t *search, DBT *outkey, DBT *outval, TOKULOGGER logger) {
DBT newkey; toku_init_dbt(&newkey); newkey.flags = DB_DBT_MALLOC;
DBT newval; toku_init_dbt(&newval); newval.flags = DB_DBT_MALLOC;
int r = toku_brt_search(cursor->brt, search, &newkey, &newval, txn);
int r = toku_brt_search(cursor->brt, search, &newkey, &newval, logger);
if (r == 0) {
if (compare_kv_xy(cursor->brt, search->k, search->v, &newkey, &newval) == 0) {
brt_cursor_set_key_val(cursor, &newkey, &newval);
@ -2111,11 +2106,11 @@ static int brt_cursor_search_eq_kv_xy(BRT_CURSOR cursor, brt_search_t *search, D
}
/* search for the kv pair that matches the search object and is equal to k */
static int brt_cursor_search_eq_k_x(BRT_CURSOR cursor, brt_search_t *search, DBT *outkey, DBT *outval, TOKUTXN txn) {
static int brt_cursor_search_eq_k_x(BRT_CURSOR cursor, brt_search_t *search, DBT *outkey, DBT *outval, TOKULOGGER logger) {
DBT newkey; toku_init_dbt(&newkey); newkey.flags = DB_DBT_MALLOC;
DBT newval; toku_init_dbt(&newval); newval.flags = DB_DBT_MALLOC;
int r = toku_brt_search(cursor->brt, search, &newkey, &newval, txn);
int r = toku_brt_search(cursor->brt, search, &newkey, &newval, logger);
if (r == 0) {
if (compare_k_x(cursor->brt, search->k, &newkey) == 0) {
brt_cursor_set_key_val(cursor, &newkey, &newval);
@ -2133,14 +2128,14 @@ static int brt_cursor_compare_one(brt_search_t *search, DBT *x, DBT *y) {
return 1;
}
static int brt_cursor_first(BRT_CURSOR cursor, DBT *outkey, DBT *outval, TOKUTXN txn) {
static int brt_cursor_first(BRT_CURSOR cursor, DBT *outkey, DBT *outval, TOKULOGGER logger) {
brt_search_t search; brt_search_init(&search, brt_cursor_compare_one, BRT_SEARCH_LEFT, 0, 0, cursor->brt);
return brt_cursor_search(cursor, &search, outkey, outval, txn);
return brt_cursor_search(cursor, &search, outkey, outval, logger);
}
static int brt_cursor_last(BRT_CURSOR cursor, DBT *outkey, DBT *outval, TOKUTXN txn) {
static int brt_cursor_last(BRT_CURSOR cursor, DBT *outkey, DBT *outval, TOKULOGGER logger) {
brt_search_t search; brt_search_init(&search, brt_cursor_compare_one, BRT_SEARCH_RIGHT, 0, 0, cursor->brt);
return brt_cursor_search(cursor, &search, outkey, outval, txn);
return brt_cursor_search(cursor, &search, outkey, outval, logger);
}
static int brt_cursor_compare_next(brt_search_t *search, DBT *x, DBT *y) {
@ -2148,9 +2143,9 @@ static int brt_cursor_compare_next(brt_search_t *search, DBT *x, DBT *y) {
return compare_kv_xy(brt, search->k, search->v, x, y) < 0; /* return min xy: kv < xy */
}
static int brt_cursor_next(BRT_CURSOR cursor, DBT *outkey, DBT *outval, TOKUTXN txn) {
static int brt_cursor_next(BRT_CURSOR cursor, DBT *outkey, DBT *outval, TOKULOGGER logger) {
brt_search_t search; brt_search_init(&search, brt_cursor_compare_next, BRT_SEARCH_LEFT, &cursor->key, &cursor->val, cursor->brt);
return brt_cursor_search(cursor, &search, outkey, outval, txn);
return brt_cursor_search(cursor, &search, outkey, outval, logger);
}
static int brt_cursor_compare_next_nodup(brt_search_t *search, DBT *x, DBT *y) {
@ -2158,9 +2153,9 @@ static int brt_cursor_compare_next_nodup(brt_search_t *search, DBT *x, DBT *y) {
return compare_k_x(brt, search->k, x) < 0; /* return min x: k < x */
}
static int brt_cursor_next_nodup(BRT_CURSOR cursor, DBT *outkey, DBT *outval, TOKUTXN txn) {
static int brt_cursor_next_nodup(BRT_CURSOR cursor, DBT *outkey, DBT *outval, TOKULOGGER logger) {
brt_search_t search; brt_search_init(&search, brt_cursor_compare_next_nodup, BRT_SEARCH_LEFT, &cursor->key, &cursor->val, cursor->brt);
return brt_cursor_search(cursor, &search, outkey, outval, txn);
return brt_cursor_search(cursor, &search, outkey, outval, logger);
}
static int brt_cursor_compare_next_dup(brt_search_t *search, DBT *x, DBT *y) {
@ -2172,9 +2167,9 @@ static int brt_cursor_compare_next_dup(brt_search_t *search, DBT *x, DBT *y) {
return keycmp == 0 && compare_v_y(brt, search->v, y) < 0; /* return min xy: k <= x && v < y */
}
static int brt_cursor_next_dup(BRT_CURSOR cursor, DBT *outkey, DBT *outval, TOKUTXN txn) {
static int brt_cursor_next_dup(BRT_CURSOR cursor, DBT *outkey, DBT *outval, TOKULOGGER logger) {
brt_search_t search; brt_search_init(&search, brt_cursor_compare_next_dup, BRT_SEARCH_LEFT, &cursor->key, &cursor->val, cursor->brt);
return brt_cursor_search_eq_k_x(cursor, &search, outkey, outval, txn);
return brt_cursor_search_eq_k_x(cursor, &search, outkey, outval, logger);
}
static int brt_cursor_compare_get_both_range(brt_search_t *search, DBT *x, DBT *y) {
@ -2186,9 +2181,9 @@ static int brt_cursor_compare_get_both_range(brt_search_t *search, DBT *x, DBT *
return keycmp == 0 && compare_v_y(brt, search->v, y) <= 0; /* return min xy: k <= x && v <= y */
}
static int brt_cursor_get_both_range(BRT_CURSOR cursor, DBT *key, DBT *val, DBT *outkey, DBT *outval, TOKUTXN txn) {
static int brt_cursor_get_both_range(BRT_CURSOR cursor, DBT *key, DBT *val, DBT *outkey, DBT *outval, TOKULOGGER logger) {
brt_search_t search; brt_search_init(&search, brt_cursor_compare_get_both_range, BRT_SEARCH_LEFT, key, val, cursor->brt);
return brt_cursor_search_eq_k_x(cursor, &search, outkey, outval, txn);
return brt_cursor_search_eq_k_x(cursor, &search, outkey, outval, logger);
}
static int brt_cursor_compare_prev(brt_search_t *search, DBT *x, DBT *y) {
@ -2196,9 +2191,9 @@ static int brt_cursor_compare_prev(brt_search_t *search, DBT *x, DBT *y) {
return compare_kv_xy(brt, search->k, search->v, x, y) > 0; /* return max xy: kv > xy */
}
static int brt_cursor_prev(BRT_CURSOR cursor, DBT *outkey, DBT *outval, TOKUTXN txn) {
static int brt_cursor_prev(BRT_CURSOR cursor, DBT *outkey, DBT *outval, TOKULOGGER logger) {
brt_search_t search; brt_search_init(&search, brt_cursor_compare_prev, BRT_SEARCH_RIGHT, &cursor->key, &cursor->val, cursor->brt);
return brt_cursor_search(cursor, &search, outkey, outval, txn);
return brt_cursor_search(cursor, &search, outkey, outval, logger);
}
static int brt_cursor_compare_prev_nodup(brt_search_t *search, DBT *x, DBT *y) {
@ -2206,9 +2201,9 @@ static int brt_cursor_compare_prev_nodup(brt_search_t *search, DBT *x, DBT *y) {
return compare_k_x(brt, search->k, x) > 0; /* return max x: k > x */
}
static int brt_cursor_prev_nodup(BRT_CURSOR cursor, DBT *outkey, DBT *outval, TOKUTXN txn) {
static int brt_cursor_prev_nodup(BRT_CURSOR cursor, DBT *outkey, DBT *outval, TOKULOGGER logger) {
brt_search_t search; brt_search_init(&search, brt_cursor_compare_prev_nodup, BRT_SEARCH_RIGHT, &cursor->key, &cursor->val, cursor->brt);
return brt_cursor_search(cursor, &search, outkey, outval, txn);
return brt_cursor_search(cursor, &search, outkey, outval, logger);
}
#ifdef DB_PREV_DUP
@ -2234,83 +2229,84 @@ static int brt_cursor_compare_set_range(brt_search_t *search, DBT *x, DBT *y) {
return compare_kv_xy(brt, search->k, search->v, x, y) <= 0; /* return kv <= xy */
}
static int brt_cursor_set(BRT_CURSOR cursor, DBT *key, DBT *val, DBT *outkey, DBT *outval, TOKUTXN txn) {
static int brt_cursor_set(BRT_CURSOR cursor, DBT *key, DBT *val, DBT *outkey, DBT *outval, TOKULOGGER logger) {
brt_search_t search; brt_search_init(&search, brt_cursor_compare_set_range, BRT_SEARCH_LEFT, key, val, cursor->brt);
return brt_cursor_search_eq_kv_xy(cursor, &search, outkey, outval, txn);
return brt_cursor_search_eq_kv_xy(cursor, &search, outkey, outval, logger);
}
static int brt_cursor_set_range(BRT_CURSOR cursor, DBT *key, DBT *outkey, DBT *outval, TOKUTXN txn) {
static int brt_cursor_set_range(BRT_CURSOR cursor, DBT *key, DBT *outkey, DBT *outval, TOKULOGGER logger) {
brt_search_t search; brt_search_init(&search, brt_cursor_compare_set_range, BRT_SEARCH_LEFT, key, 0, cursor->brt);
return brt_cursor_search(cursor, &search, outkey, outval, txn);
return brt_cursor_search(cursor, &search, outkey, outval, logger);
}
int toku_brt_cursor_get (BRT_CURSOR cursor, DBT *key, DBT *val, int get_flags, TOKUTXN txn) {
int r;
int op = get_flags & DB_OPFLAGS_MASK;
TOKULOGGER logger = toku_txn_logger(txn);
if (get_flags & ~DB_OPFLAGS_MASK)
return EINVAL;
switch (op) {
case DB_CURRENT:
case DB_CURRENT_BINDING:
r = brt_cursor_current(cursor, op, key, val, txn);
r = brt_cursor_current(cursor, op, key, val, logger);
break;
case DB_FIRST:
r = brt_cursor_first(cursor, key, val, txn);
r = brt_cursor_first(cursor, key, val, logger);
break;
case DB_LAST:
r = brt_cursor_last(cursor, key, val, txn);
r = brt_cursor_last(cursor, key, val, logger);
break;
case DB_NEXT:
if (brt_cursor_not_set(cursor))
r = brt_cursor_first(cursor, key, val, txn);
r = brt_cursor_first(cursor, key, val, logger);
else
r = brt_cursor_next(cursor, key, val, txn);
r = brt_cursor_next(cursor, key, val, logger);
break;
case DB_NEXT_DUP:
if (brt_cursor_not_set(cursor))
r = EINVAL;
else
r = brt_cursor_next_dup(cursor, key, val, txn);
r = brt_cursor_next_dup(cursor, key, val, logger);
break;
case DB_NEXT_NODUP:
if (brt_cursor_not_set(cursor))
r = brt_cursor_first(cursor, key, val, txn);
r = brt_cursor_first(cursor, key, val, logger);
else
r = brt_cursor_next_nodup(cursor, key, val, txn);
r = brt_cursor_next_nodup(cursor, key, val, logger);
break;
case DB_PREV:
if (brt_cursor_not_set(cursor))
r = brt_cursor_last(cursor, key, val, txn);
r = brt_cursor_last(cursor, key, val, logger);
else
r = brt_cursor_prev(cursor, key, val, txn);
r = brt_cursor_prev(cursor, key, val, logger);
break;
#ifdef DB_PREV_DUP
case DB_PREV_DUP:
if (brt_cursor_not_set(cursor))
r = EINVAL;
else
r = brt_cursor_prev_dup(cursor, key, val, txn);
r = brt_cursor_prev_dup(cursor, key, val, logger);
break;
#endif
case DB_PREV_NODUP:
if (brt_cursor_not_set(cursor))
r = brt_cursor_last(cursor, key, val, txn);
r = brt_cursor_last(cursor, key, val, logger);
else
r = brt_cursor_prev_nodup(cursor, key, val, txn);
r = brt_cursor_prev_nodup(cursor, key, val, logger);
break;
case DB_SET:
r = brt_cursor_set(cursor, key, 0, 0, val, txn);
r = brt_cursor_set(cursor, key, 0, 0, val, logger);
break;
case DB_SET_RANGE:
r = brt_cursor_set_range(cursor, key, key, val, txn);
r = brt_cursor_set_range(cursor, key, key, val, logger);
break;
case DB_GET_BOTH:
r = brt_cursor_set(cursor, key, val, 0, 0, txn);
r = brt_cursor_set(cursor, key, val, 0, 0, logger);
break;
case DB_GET_BOTH_RANGE:
r = brt_cursor_get_both_range(cursor, key, val, 0, val, txn);
r = brt_cursor_get_both_range(cursor, key, val, 0, val, logger);
break;
default:
r = EINVAL;
@ -2326,7 +2322,7 @@ int toku_brt_cursor_delete(BRT_CURSOR cursor, int flags, TOKUTXN txn) {
return EINVAL;
int r = 0;
if (!(flags & DB_DELETE_ANY))
r = brt_cursor_current(cursor, DB_CURRENT, 0, 0, txn);
r = brt_cursor_current(cursor, DB_CURRENT, 0, 0, toku_txn_logger(txn));
if (r == 0)
r = toku_brt_delete_both(cursor->brt, &cursor->key, &cursor->val, txn);
return r;

View file

@ -214,7 +214,13 @@ int toku_logger_finish (TOKULOGGER logger, struct wbuf *wbuf) {
int toku_logger_commit (TOKUTXN txn, int nosync) {
// panic handled in log_commit
int r = toku_log_commit(txn, txn->txnid64, nosync);
int r = toku_log_commit(txn->logger, txn->txnid64);
if (r!=0) goto free_and_return;
if (txn->parent && !nosync) {
r = toku_logger_fsync(txn->logger);
if (r!=0) toku_logger_panic(txn->logger, r);
}
free_and_return: /*nothing*/;
struct log_entry *item;
while ((item=txn->oldest_logentry)) {
txn->oldest_logentry = item->next;
@ -257,7 +263,7 @@ int toku_logger_log_fcreate (TOKUTXN txn, const char *fname, int mode) {
BYTESTRING bs;
bs.len = strlen(fname);
bs.data = (char*)fname;
return toku_log_fcreate (txn, toku_txn_get_txnid(txn), bs, mode);
return toku_log_fcreate (txn->logger, toku_txn_get_txnid(txn), bs, mode);
}
/* fopen isn't really an action. It's just for bookkeeping. We need to know the filename that goes with a filenum. */
@ -267,7 +273,7 @@ int toku_logger_log_fopen (TOKUTXN txn, const char * fname, FILENUM filenum) {
BYTESTRING bs;
bs.len = strlen(fname);
bs.data = (char*)fname;
return toku_log_fopen (txn,toku_txn_get_txnid(txn), bs, filenum);
return toku_log_fopen (txn->logger, toku_txn_get_txnid(txn), bs, filenum);
}
@ -566,6 +572,15 @@ LSN toku_txn_get_last_lsn (TOKUTXN txn) {
if (txn==0) return (LSN){0};
return txn->last_lsn;
}
LSN toku_logger_last_lsn(TOKULOGGER logger) {
LSN result=logger->lsn;
result.lsn--;
return result;
}
TOKULOGGER toku_txn_logger (TOKUTXN txn) {
return txn ? txn->logger : 0;
}
int toku_abort_logentry_commit (struct logtype_commit *le __attribute__((__unused__)), TOKUTXN txn) {
toku_logger_panic(txn->logger, EINVAL);

View file

@ -18,6 +18,7 @@ int toku_logger_log_checkpoint (TOKULOGGER, LSN*);
void toku_logger_panic(TOKULOGGER, int/*err*/);
int toku_logger_panicked(TOKULOGGER /*logger*/);
int toku_logger_is_open(TOKULOGGER);
LSN toku_logger_last_lsn(TOKULOGGER);
int toku_logger_log_phys_add_or_delete_in_leaf (DB *db, TOKUTXN txn, DISKOFF diskoff, int is_add, const struct kv_pair *pair);
@ -62,6 +63,8 @@ int toku_read_and_print_logmagic (FILE *f, u_int32_t *version);
TXNID toku_txn_get_txnid (TOKUTXN);
LSN toku_txn_get_last_lsn (TOKUTXN);
TOKULOGGER toku_txn_logger (TOKUTXN txn);
static inline int toku_copy_FILENUM(FILENUM *target, FILENUM val) { *target = val; return 0; }
static inline void toku_free_FILENUM(FILENUM val __attribute__((__unused__))) {}

View file

@ -53,61 +53,52 @@ const struct logtype logtypes[] = {
{"FILENUM", "filenum", 0},
{"LOGGEDBRTHEADER", "header", 0},
NULLFIELD}},
{"newbrtnode", 'N', FA{{"TXNID", "txnid", 0},
{"FILENUM", "filenum", 0},
{"newbrtnode", 'N', FA{{"FILENUM", "filenum", 0},
{"DISKOFF", "diskoff", 0},
{"u_int32_t", "height", 0},
{"u_int32_t", "nodesize", 0},
{"u_int8_t", "is_dup_sort", 0},
{"u_int32_t", "rand4fingerprint", "%08x"},
NULLFIELD}},
{"changechildfingerprint", 'f', FA{{"TXNID", "txnid", 0},
{"FILENUM", "filenum", 0},
{"changechildfingerprint", 'f', FA{{"FILENUM", "filenum", 0},
{"DISKOFF", "diskoff", 0},
{"u_int32_t", "childnum", 0},
{"u_int32_t", "oldfingerprint", "%08x"},
{"u_int32_t", "newfingerprint", "%08x"},
NULLFIELD}},
{"changeunnamedroot", 'u', FA{{"TXNID", "txnid", 0},
{"FILENUM", "filenum", 0},
{"DISKOFF", "oldroot", 0},
{"DISKOFF", "newroot", 0},
NULLFIELD}},
{"changenamedroot", 'n', FA{{"TXNID", "txnid", 0},
{"FILENUM", "filenum", 0},
{"changeunnamedroot", 'u', FA{{"FILENUM", "filenum", 0},
{"DISKOFF", "oldroot", 0},
{"DISKOFF", "newroot", 0},
NULLFIELD}},
{"changenamedroot", 'n', FA{{"FILENUM", "filenum", 0},
{"BYTESTRING", "name", 0},
{"DISKOFF", "oldroot", 0},
{"DISKOFF", "newroot", 0},
NULLFIELD}},
{"changeunusedmemory", 'm', FA{{"TXNID", "txnid", 0},
{"FILENUM", "filenum", 0},
{"changeunusedmemory", 'm', FA{{"FILENUM", "filenum", 0},
{"DISKOFF", "oldunused", 0},
{"DISKOFF", "newunused", 0},
NULLFIELD}},
{"addchild", 'c', FA{{"TXNID", "txnid", 0},
{"FILENUM", "filenum", 0},
{"addchild", 'c', FA{{"FILENUM", "filenum", 0},
{"DISKOFF", "diskoff", 0},
{"u_int32_t", "childnum", 0}, // children scoot over
{"DISKOFF", "child", 0},
{"u_int32_t", "childfingerprint", "%08x"},
NULLFIELD}},
{"delchild", 'r', FA{{"TXNID", "txnid", 0},
{"FILENUM", "filenum", 0},
{"delchild", 'r', FA{{"FILENUM", "filenum", 0},
{"DISKOFF", "diskoff", 0},
{"u_int32_t", "childnum", 0}, // children scoot over
{"DISKOFF", "child", 0},
{"u_int32_t", "childfingerprint", "%08x"},
{"BYTESTRING", "pivotkey", 0},
NULLFIELD}},
{"setchild", 'i', FA{{"TXNID", "txnid", 0},
{"FILENUM", "filenum", 0},
{"setchild", 'i', FA{{"FILENUM", "filenum", 0},
{"DISKOFF", "diskoff", 0},
{"u_int32_t", "childnum", 0},
{"DISKOFF", "oldchild", 0},
{"DISKOFF", "newchild", 0},
NULLFIELD}},
{"setpivot", 'k', FA{{"TXNID", "txnid", 0},
{"FILENUM", "filenum", 0},
{"setpivot", 'k', FA{{"FILENUM", "filenum", 0},
{"DISKOFF", "diskoff", 0},
{"u_int32_t", "childnum", 0},
{"BYTESTRING", "pivotkey", 0},
@ -116,20 +107,20 @@ const struct logtype logtypes[] = {
{"BYTESTRING", "fname", 0},
{"FILENUM", "filenum", 0},
NULLFIELD}},
{"brtdeq", 'U', FA{{"TXNID", "txnid", 0},
{"FILENUM", "filenum", 0},
{"brtdeq", 'U', FA{{"FILENUM", "filenum", 0},
{"DISKOFF", "diskoff", 0},
{"u_int32_t", "childnum", 0},
{"TXNID", "xid", 0},
{"u_int32_t", "typ", 0},
{"BYTESTRING", "key", 0},
{"BYTESTRING", "data", 0},
{"u_int32_t", "oldfingerprint", "%08x"},
{"u_int32_t", "newfingerprint", "%08x"},
NULLFIELD}},
{"brtenq", 'Q', FA{{"TXNID", "txnid", 0},
{"FILENUM", "filenum", 0},
{"brtenq", 'Q', FA{{"FILENUM", "filenum", 0},
{"DISKOFF", "diskoff", 0},
{"u_int32_t", "childnum", 0},
{"TXNID", "xid", 0},
{"u_int32_t", "typ", 0},
{"BYTESTRING", "key", 0},
{"BYTESTRING", "data", 0},
@ -150,14 +141,12 @@ const struct logtype logtypes[] = {
{"BYTESTRING", "key", 0},
{"BYTESTRING", "data", 0},
NULLFIELD}},
{"resizepma", 'R', FA{{"TXNID", "txnid", 0},
{"FILENUM", "filenum", 0},
{"resizepma", 'R', FA{{"FILENUM", "filenum", 0},
{"DISKOFF", "diskoff", 0},
{"u_int32_t", "oldsize", 0},
{"u_int32_t", "newsize", 0},
NULLFIELD}},
{"pmadistribute", 'M', FA{{"TXNID", "txnid", 0},
{"FILENUM", "filenum", 0},
{"pmadistribute", 'M', FA{{"FILENUM", "filenum", 0},
{"DISKOFF", "old_diskoff", 0},
{"DISKOFF", "new_diskoff", 0},
{"INTPAIRARRAY", "fromto", 0},
@ -252,15 +241,12 @@ void generate_log_free(void) {
void generate_log_writer (void) {
DO_LOGTYPES(lt, ({
fprintf2(cf, hf, "int toku_log_%s (TOKUTXN txn", lt->name);
fprintf2(cf, hf, "int toku_log_%s (TOKULOGGER logger", lt->name);
DO_FIELDS(ft, lt,
fprintf2(cf, hf, ", %s %s", ft->type, ft->name));
if (lt->command=='C') {
fprintf2(cf,hf, ", int nosync");
}
fprintf(hf, ");\n");
fprintf(cf, ") {\n");
fprintf(cf, " if (txn==0) return 0;\n");
fprintf(cf, " if (logger==0) return 0;\n");
fprintf(cf, " const unsigned int buflen= (+4 // len at the beginning\n");
fprintf(cf, " +1 // log command\n");
fprintf(cf, " +8 // lsn\n");
@ -274,48 +260,15 @@ void generate_log_writer (void) {
fprintf(cf, " wbuf_init(&wbuf, buf, buflen);\n");
fprintf(cf, " wbuf_int(&wbuf, buflen);\n");
fprintf(cf, " wbuf_char(&wbuf, '%c');\n", lt->command);
fprintf(cf, " wbuf_LSN(&wbuf, txn->logger->lsn);\n");
fprintf(cf, " txn->last_lsn = txn->logger->lsn;\n");
fprintf(cf, " txn->logger->lsn.lsn++;\n");
fprintf(cf, " wbuf_LSN(&wbuf, logger->lsn);\n");
fprintf(cf, " logger->lsn.lsn++;\n");
DO_FIELDS(ft, lt,
fprintf(cf, " wbuf_%s(&wbuf, %s);\n", ft->type, ft->name));
fprintf(cf, " int r= toku_logger_finish(txn->logger, &wbuf);\n");
fprintf(cf, " int r= toku_logger_finish(logger, &wbuf);\n");
fprintf(cf, " assert(wbuf.ndone==buflen);\n");
fprintf(cf, " toku_free(buf);\n");
if (lt->command=='C') {
fprintf(cf, " if (r!=0) return r;\n");
fprintf(cf, " // commit has some extra work to do.\n");
fprintf(cf, " if (nosync) return 0;\n");
fprintf(cf, " if (txn->parent) { // do not fsync if there is a parent. Instead append the log entries onto the parent.\n");
fprintf(cf, " if (txn->parent->oldest_logentry) txn->parent->newest_logentry->next = txn->oldest_logentry;\n");
fprintf(cf, " else txn->parent->oldest_logentry = txn->oldest_logentry;\n");
fprintf(cf, " if (txn->newest_logentry) txn->parent->newest_logentry = txn->newest_logentry;\n");
fprintf(cf, " txn->newest_logentry = txn->oldest_logentry = 0;\n");
fprintf(cf, " } else {\n");
fprintf(cf, " r = toku_logger_fsync(txn->logger);\n");
fprintf(cf, " if (r!=0) toku_logger_panic(txn->logger, r);\n");
fprintf(cf, " }\n");
fprintf(cf, " return 0;\n");
} else {
int i=0;
fprintf(cf, " struct log_entry *MALLOC(lentry);\n");
fprintf(cf, " if (lentry==0) return errno;\n");
fprintf(cf, " if (0) { died0: toku_free(lentry); return r; }\n");
fprintf(cf, " lentry->cmd = %d;\n", lt->command);
fprintf(cf, " lentry->u.%s.lsn = toku_txn_get_last_lsn(txn);\n", lt->name);
DO_FIELDS(ft, lt,
({
fprintf(cf, " r=toku_copy_%s(&lentry->u.%s.%s, %s);\n", ft->type, lt->name, ft->name, ft->name);
fprintf(cf, " if (r!=0) { if(0) { died%d: toku_free_%s(lentry->u.%s.%s); } goto died%d; }\n", i+1, ft->type, lt->name, ft->name, i);
i++;
}));
fprintf(cf, " if (0) { goto died%d; }\n", i); // Need to use that label.
fprintf(cf, " lentry->next = 0;\n");
fprintf(cf, " if (txn->oldest_logentry==0) txn->oldest_logentry = lentry;\n");
fprintf(cf, " else txn->newest_logentry->next = lentry;\n");
fprintf(cf, " txn->newest_logentry = lentry;\n");
fprintf(cf, " return r;\n");
}
fprintf(cf, " return r;\n");
fprintf(cf, "}\n\n");
}));

View file

@ -28,9 +28,9 @@ struct pma {
int toku_pmainternal_count_region (struct kv_pair *pairs[], int lo, int hi);
void toku_pmainternal_calculate_parameters (PMA pma);
int toku_pmainternal_smooth_region (TOKUTXN, FILENUM, DISKOFF, struct kv_pair */*pairs*/[], int /*n*/, int /*idx*/, int /*base*/, PMA /*pma*/, int */*new_idx*/, LSN */*node_lsn*/);
int toku_pmainternal_smooth_region (TOKULOGGER, FILENUM, DISKOFF, struct kv_pair */*pairs*/[], int /*n*/, int /*idx*/, int /*base*/, PMA /*pma*/, int */*new_idx*/, LSN */*node_lsn*/);
int toku_pmainternal_printpairs (struct kv_pair *pairs[], int N);
int toku_pmainternal_make_space_at (TOKUTXN, FILENUM, DISKOFF, PMA pma, int idx, unsigned int *new_index, LSN *node_lsn);
int toku_pmainternal_make_space_at (TOKULOGGER, FILENUM, DISKOFF, PMA pma, int idx, unsigned int *new_index, LSN *node_lsn);
int toku_pmainternal_find (PMA pma, DBT *); // The DB is so the comparison fuction can be called.
void toku_print_pma (PMA pma); /* useful for debugging, so keep the name short. I.e., not pmainternal_print_pma() */

View file

@ -16,12 +16,14 @@
/* we use pma cursors for testing the pma_search function. otherwise, there are no pma cursors */
#include "pma-cursor.h"
static TOKULOGGER const null_logger = 0;
static TOKUTXN const null_txn = 0;
static DB * const null_db = 0;
static const DISKOFF null_diskoff = -1;
static const FILENUM null_filenum = {0};
static TXNID const null_txnid = 0;
#define NULL_ARGS null_txn, null_filenum, null_diskoff
#define NULL_ARGS null_logger, null_txnid, null_filenum, null_diskoff
void *skey=0, *sval=0;
@ -47,7 +49,7 @@ static void test_make_space_at (void) {
r=toku_pma_create(&pma, toku_default_compare_fun, null_db, null_filenum, 0);
assert(r==0);
assert(toku_pma_n_entries(pma)==0);
r=toku_pmainternal_make_space_at(null_txn, null_filenum, null_diskoff, pma, 2, &newi, (LSN*)0);
r=toku_pmainternal_make_space_at(null_logger, null_filenum, null_diskoff, pma, 2, &newi, (LSN*)0);
assert(r==0);
assert(toku_pma_index_limit(pma)==4);
assert((unsigned long)pma->pairs[toku_pma_index_limit(pma)]==0xdeadbeefL);
@ -55,7 +57,7 @@ static void test_make_space_at (void) {
pma->pairs[2] = key_A;
pma->n_pairs_present++;
r=toku_pmainternal_make_space_at(null_txn, null_filenum, null_diskoff, pma, 2, &newi, (LSN*)0);
r=toku_pmainternal_make_space_at(null_logger, null_filenum, null_diskoff, pma, 2, &newi, (LSN*)0);
assert(r==0);
if (verbose) printf("Requested space at 2, got space at %d\n", newi);
if (verbose) toku_print_pma(pma);
@ -69,7 +71,7 @@ static void test_make_space_at (void) {
pma->pairs[3] = 0;
pma->n_pairs_present=2;
if (verbose) toku_print_pma(pma);
toku_pmainternal_make_space_at(null_txn, null_filenum, null_diskoff, pma, 0, &newi, (LSN*)0);
toku_pmainternal_make_space_at(null_logger, null_filenum, null_diskoff, pma, 0, &newi, (LSN*)0);
assert(r==0);
if (verbose) printf("Requested space at 0, got space at %d\n", newi);
if (verbose) toku_print_pma(pma);
@ -86,7 +88,7 @@ static void test_make_space_at (void) {
pma->pairs[7] = 0;
pma->n_pairs_present=2;
if (verbose) toku_print_pma(pma);
r=toku_pmainternal_make_space_at(null_txn, null_filenum, null_diskoff, pma, 5, &newi, (LSN*)0);
r=toku_pmainternal_make_space_at(null_logger, null_filenum, null_diskoff, pma, 5, &newi, (LSN*)0);
assert(r==0);
if (verbose) toku_print_pma(pma);
if (verbose) printf("r=%d\n", newi);
@ -180,7 +182,7 @@ static void test_smooth_region_N (int N) {
}
}
if (verbose) { toku_pmainternal_printpairs(pairs, N); printf(" at %d becomes f", insertat); }
toku_pmainternal_smooth_region(null_txn, null_filenum, null_diskoff, pairs, N, insertat, 0, 0, &r, (LSN*)0);
toku_pmainternal_smooth_region(null_logger, null_filenum, null_diskoff, pairs, N, insertat, 0, 0, &r, (LSN*)0);
if (verbose) { toku_pmainternal_printpairs(pairs, N); printf(" at %d\n", r); }
assert(0<=r); assert(r<N);
assert(pairs[r]==0);
@ -222,7 +224,7 @@ static void test_smooth_region6 (void) {
pairs[1] = kv_pair_malloc(key, strlen(key)+1, 0, 0);
int r;
toku_pmainternal_smooth_region(null_txn, null_filenum, null_diskoff, pairs, N, 2, 0, 0, &r, (LSN*)0);
toku_pmainternal_smooth_region(null_logger, null_filenum, null_diskoff, pairs, N, 2, 0, 0, &r, (LSN*)0);
if (verbose) {
printf("{ ");
for (i=0; i<N; i++)
@ -921,10 +923,10 @@ static void test_pma_dup_split_n(int n, int dup_mode) {
/* split the pma */
DBT splitk;
r = toku_pma_split(null_txn, null_filenum,
null_diskoff, pmaa, 0, arand, &asum, (LSN*)0,
&splitk,
null_diskoff, pmac, 0, crand, &csum, (LSN*)0);
r = toku_pma_split(null_logger, null_filenum,
null_diskoff, pmaa, 0, arand, &asum, (LSN*)0,
&splitk,
null_diskoff, pmac, 0, crand, &csum, (LSN*)0);
assert(r == 0);
toku_pma_verify(pmaa);
@ -1044,10 +1046,10 @@ static void test_pma_split_varkey(void) {
if (verbose) { printf("a:"); toku_print_pma(pmaa); }
r = toku_pma_split(null_txn, null_filenum,
null_diskoff, pmaa, 0, arand, &asum, (LSN*)0,
0,
null_diskoff, pmac, 0, crand, &csum, (LSN*)0);
r = toku_pma_split(null_logger, null_filenum,
null_diskoff, pmaa, 0, arand, &asum, (LSN*)0,
0,
null_diskoff, pmac, 0, crand, &csum, (LSN*)0);
assert(r == 0);
toku_pma_verify(pmaa);
toku_pma_verify(pmac);
@ -1192,7 +1194,7 @@ static void test_pma_bulk_insert_n(int n) {
}
/* bulk insert n kv pairs */
r = toku_pma_bulk_insert(null_txn, null_filenum, (DISKOFF)0, pma, keys, vals, n, rand4fingerprint, &sum, 0);
r = toku_pma_bulk_insert(null_logger, null_filenum, (DISKOFF)0, pma, keys, vals, n, rand4fingerprint, &sum, 0);
assert(r == 0);
assert(sum==expect_fingerprint);
toku_pma_verify(pma);

View file

@ -27,9 +27,9 @@
/**************************** static functions forward declarations. *********************/
/* resize the pma array to asksize. zero all array entries starting from startx.*/
static int pma_resize_array(TOKUTXN, FILENUM, DISKOFF, PMA pma, int asksize, int startx, LSN *node_lsn);
static int pma_resize_array(TOKULOGGER, FILENUM, DISKOFF, PMA pma, int asksize, int startx, LSN *node_lsn);
static int old_pma_resize_array(PMA pma, int asksize, int startx) {
return pma_resize_array((TOKUTXN)0, (FILENUM){0}, (DISKOFF)0, pma, asksize, startx, (LSN*)0);
return pma_resize_array((TOKULOGGER)0, (FILENUM){0}, (DISKOFF)0, pma, asksize, startx, (LSN*)0);
}
/* extract pairs from the pma in the window delimited by lo and hi.*/
@ -351,7 +351,7 @@ static int distribute_data (struct kv_pair *destpairs[], int dcount,
}
}
static int pma_log_distribute (TOKUTXN txn, FILENUM filenum, DISKOFF old_diskoff, DISKOFF new_diskoff, int n_pairs, struct kv_pair_tag *pairs, LSN *oldnode_lsn, LSN*newnode_lsn) {
static int pma_log_distribute (TOKULOGGER logger, FILENUM filenum, DISKOFF old_diskoff, DISKOFF new_diskoff, int n_pairs, struct kv_pair_tag *pairs, LSN *oldnode_lsn, LSN*newnode_lsn) {
INTPAIRARRAY ipa;
MALLOC_N(n_pairs, ipa.array);
if (ipa.array==0) return errno;
@ -365,9 +365,9 @@ static int pma_log_distribute (TOKUTXN txn, FILENUM filenum, DISKOFF old_diskoff
}
}
ipa.size=j;
int r=toku_log_pmadistribute(txn, toku_txn_get_txnid(txn), filenum, old_diskoff, new_diskoff, ipa);
if (txn && oldnode_lsn) *oldnode_lsn = toku_txn_get_last_lsn(txn);
if (txn && newnode_lsn) *newnode_lsn = toku_txn_get_last_lsn(txn);
int r=toku_log_pmadistribute(logger, filenum, old_diskoff, new_diskoff, ipa);
if (logger && oldnode_lsn) *oldnode_lsn = toku_logger_last_lsn(logger);
if (logger && newnode_lsn) *newnode_lsn = toku_logger_last_lsn(logger);
// if (0 && pma) {
// printf("Pma state:\n");
// PMA_ITERATE_IDX (pma, pidx, key, keylen, data, datalen,
@ -379,7 +379,7 @@ static int pma_log_distribute (TOKUTXN txn, FILENUM filenum, DISKOFF old_diskoff
/* spread the non-empty pairs around. There are n of them. Create an empty slot just before the IDXth
element, and return that slot's index in the smoothed array. */
int toku_pmainternal_smooth_region (TOKUTXN txn, FILENUM filenum, DISKOFF diskoff, struct kv_pair *pairs[], int n, int idx, int base, PMA pma, int *new_idx, LSN *node_lsn) {
int toku_pmainternal_smooth_region (TOKULOGGER logger, FILENUM filenum, DISKOFF diskoff, struct kv_pair *pairs[], int n, int idx, int base, PMA pma, int *new_idx, LSN *node_lsn) {
int i;
int n_present=0;
for (i=0; i<n; i++) {
@ -414,7 +414,7 @@ int toku_pmainternal_smooth_region (TOKUTXN txn, FILENUM filenum, DISKOFF diskof
/* Now the tricky part. Distribute the data. */
newidx=distribute_data (pairs, n,
tmppairs, n_saved, pma);
int r = pma_log_distribute(txn, filenum, diskoff, diskoff,
int r = pma_log_distribute(logger, filenum, diskoff, diskoff,
n_saved,
tmppairs,
node_lsn, node_lsn);
@ -538,12 +538,12 @@ static int pma_resize_array_nolog(PMA pma, int asksize, int startz, unsigned int
return 0;
}
static int pma_resize_array(TOKUTXN txn, FILENUM filenum, DISKOFF offset, PMA pma, int asksize, int startz, LSN *node_lsn) {
static int pma_resize_array(TOKULOGGER logger, FILENUM filenum, DISKOFF offset, PMA pma, int asksize, int startz, LSN *node_lsn) {
unsigned int oldN, n;
int r = pma_resize_array_nolog(pma, asksize, startz, &oldN, &n);
if (r!=0) return r;
toku_log_resizepma (txn, toku_txn_get_txnid(txn), filenum, offset, oldN, n);
if (txn && node_lsn) *node_lsn = toku_txn_get_last_lsn(txn);
toku_log_resizepma (logger, filenum, offset, oldN, n);
if (logger && node_lsn) *node_lsn = toku_logger_last_lsn(logger);
return 0;
}
@ -580,7 +580,7 @@ static int pma_next_key(PMA pma, DBT *k, DBT *v, int here, int n, int *found) {
/* Make some space for a key to go at idx (the thing currently at idx should end up at to the right.) */
/* (Making space may involve moving things around, including the hole at index.) */
int toku_pmainternal_make_space_at (TOKUTXN txn, FILENUM filenum, DISKOFF offset, PMA pma, int idx, unsigned int *new_index, LSN *node_lsn) {
int toku_pmainternal_make_space_at (TOKULOGGER logger, FILENUM filenum, DISKOFF offset, PMA pma, int idx, unsigned int *new_index, LSN *node_lsn) {
/* Within a range LO to HI we have a limit of how much packing we will tolerate.
* We allow the entire array to be 50% full.
* We allow a region of size lgN to be full.
@ -616,7 +616,7 @@ int toku_pmainternal_make_space_at (TOKUTXN txn, FILENUM filenum, DISKOFF offset
size*=2;
// printf("pma_make_space_realloc %d to %d hi %d\n", pma->N, size, hi);
pma_resize_array(txn, filenum, offset, pma, size, hi, node_lsn);
pma_resize_array(logger, filenum, offset, pma, size, hi, node_lsn);
hi=size;
//printf("doubled N\n");
@ -629,7 +629,7 @@ int toku_pmainternal_make_space_at (TOKUTXN txn, FILENUM filenum, DISKOFF offset
//printf("%s:%d Smoothing from %d to %d to density %f\n", __FILE__, __LINE__, lo, hi, density);
{
int sub_new_index;
int r = toku_pmainternal_smooth_region(txn, filenum, offset, pma->pairs+lo, hi-lo, idx-lo, lo, pma, &sub_new_index, node_lsn);
int r = toku_pmainternal_smooth_region(logger, filenum, offset, pma->pairs+lo, hi-lo, idx-lo, lo, pma, &sub_new_index, node_lsn);
if (r!=0) return r;
*new_index=sub_new_index+lo;
return 0;
@ -693,7 +693,7 @@ int toku_pma_free (PMA *pmap) {
/* Copies keylen and datalen */
/* returns an error if the key is already present. */
int toku_pma_insert (PMA pma, DBT *k, DBT *v, TOKUTXN txn, FILENUM filenum, DISKOFF diskoff, u_int32_t rand4fingerprint, u_int32_t *fingerprint, LSN *node_lsn) {
int toku_pma_insert (PMA pma, DBT *k, DBT *v, TOKULOGGER logger, TXNID xid, FILENUM filenum, DISKOFF diskoff, u_int32_t rand4fingerprint, u_int32_t *fingerprint, LSN *node_lsn) {
int found;
unsigned int idx = pma_search(pma, k, pma->dup_mode & TOKU_DB_DUPSORT ? v : 0, 0, pma->N, &found);
if (found)
@ -701,7 +701,7 @@ int toku_pma_insert (PMA pma, DBT *k, DBT *v, TOKUTXN txn, FILENUM filenum, DISK
if (kv_pair_inuse(pma->pairs[idx])) {
unsigned int newidx;
int r = toku_pmainternal_make_space_at (txn, filenum, diskoff, pma, idx, &newidx, (LSN*)0); /* returns the new idx. */
int r = toku_pmainternal_make_space_at (logger, filenum, diskoff, pma, idx, &newidx, (LSN*)0); /* returns the new idx. */
if (r!=0) return r;
idx = newidx;
}
@ -715,9 +715,8 @@ int toku_pma_insert (PMA pma, DBT *k, DBT *v, TOKUTXN txn, FILENUM filenum, DISK
const struct kv_pair *pair = pma->pairs[idx];
const BYTESTRING key = { pair->keylen, (char*)kv_pair_key_const(pair) };
const BYTESTRING data = { pair->vallen, (char*)kv_pair_val_const(pair) };
if (toku_txn_get_last_lsn(txn).lsn>=3836455 && toku_txn_get_last_lsn(txn).lsn<=3836460) printf("%s:%d inserting\n", __FILE__, __LINE__);
int r = toku_log_insertinleaf (txn, toku_txn_get_txnid(txn), pma->filenum, diskoff, idx, key, data);
if (txn && node_lsn) *node_lsn = toku_txn_get_last_lsn(txn);
int r = toku_log_insertinleaf (logger, xid, pma->filenum, diskoff, idx, key, data);
if (logger && node_lsn) *node_lsn = toku_logger_last_lsn(logger);
return r;
}
}
@ -840,7 +839,7 @@ static void pma_delete_at(PMA pma, int here) {
int toku_pma_insert_or_replace (PMA pma, DBT *k, DBT *v,
int *replaced_v_size, /* If it is a replacement, set to the size of the old value, otherwise set to -1. */
TOKUTXN txn, FILENUM filenum, DISKOFF diskoff,
TOKULOGGER logger, TXNID xid, FILENUM filenum, DISKOFF diskoff,
u_int32_t rand4fingerprint, u_int32_t *fingerprint,
LSN *node_lsn) {
//printf("%s:%d v->size=%d\n", __FILE__, __LINE__, v->size);
@ -854,10 +853,10 @@ int toku_pma_insert_or_replace (PMA pma, DBT *k, DBT *v,
{
const BYTESTRING deletedkey = { kv->keylen, kv_pair_key(kv) };
const BYTESTRING deleteddata = { kv->vallen, kv_pair_val(kv) };
r=toku_log_deleteinleaf(txn, toku_txn_get_txnid(txn), pma->filenum, diskoff, idx, deletedkey, deleteddata);
r=toku_log_deleteinleaf(logger, xid, pma->filenum, diskoff, idx, deletedkey, deleteddata);
if (r!=0) return r;
}
if (txn && node_lsn) *node_lsn = toku_txn_get_last_lsn(txn);
if (logger && node_lsn) *node_lsn = toku_logger_last_lsn(logger);
if (v->size == (unsigned int) kv_pair_vallen(kv)) {
memcpy(kv_pair_val(kv), v->data, v->size);
} else {
@ -870,7 +869,7 @@ int toku_pma_insert_or_replace (PMA pma, DBT *k, DBT *v,
}
if (kv_pair_inuse(pma->pairs[idx])) {
unsigned int newidx;
r = toku_pmainternal_make_space_at (txn, filenum, diskoff, pma, idx, &newidx, node_lsn); /* returns the new idx. */
r = toku_pmainternal_make_space_at (logger, filenum, diskoff, pma, idx, &newidx, node_lsn); /* returns the new idx. */
if (r!=0) return r;
idx=newidx;
}
@ -886,8 +885,8 @@ int toku_pma_insert_or_replace (PMA pma, DBT *k, DBT *v,
const struct kv_pair *pair = pma->pairs[idx];
const BYTESTRING key = { pair->keylen, (char*)kv_pair_key_const(pair) };
const BYTESTRING data = { pair->vallen, (char*)kv_pair_val_const(pair) };
r = toku_log_insertinleaf (txn, toku_txn_get_txnid(txn), pma->filenum, diskoff, idx, key, data);
if (txn && node_lsn) *node_lsn = toku_txn_get_last_lsn(txn);
r = toku_log_insertinleaf (logger, xid, pma->filenum, diskoff, idx, key, data);
if (logger && node_lsn) *node_lsn = toku_logger_last_lsn(logger);
}
*fingerprint += rand4fingerprint*toku_calccrc32_kvpair(k->data, k->size, v->data, v->size);
return r;
@ -944,7 +943,7 @@ static void __pma_relocate_kvpairs(PMA pma) {
#endif
int toku_pma_split(TOKUTXN txn, FILENUM filenum,
int toku_pma_split(TOKULOGGER logger, FILENUM filenum,
DISKOFF diskoff, PMA pma, unsigned int *pma_size_p, u_int32_t rand4fp, u_int32_t *fingerprint_p, LSN *lsn,
DBT *splitk,
DISKOFF newdiskoff, PMA newpma, unsigned int *newpma_size_p, u_int32_t newrand4fp, u_int32_t *newfingerprint_p, LSN *newlsn) {
@ -1015,11 +1014,11 @@ int toku_pma_split(TOKUTXN txn, FILENUM filenum,
/* put the second half of pairs into the right pma */
/* Do this first, so that the logging will move the stuff out of the left pma first, and then later when we redistribute in the left PMA, we won't overwrite something. */
n = npairs - spliti;
error = pma_resize_array(txn, filenum, newdiskoff, newpma, n + n/4, 0, newlsn);
error = pma_resize_array(logger, filenum, newdiskoff, newpma, n + n/4, 0, newlsn);
assert(error == 0);
distribute_data(newpma->pairs, toku_pma_index_limit(newpma), &pairs[spliti], n, newpma);
{
int r = pma_log_distribute(txn, filenum, diskoff, newdiskoff, n, &pairs[spliti], lsn, newlsn);
int r = pma_log_distribute(logger, filenum, diskoff, newdiskoff, n, &pairs[spliti], lsn, newlsn);
if (r!=0) { toku_free(pairs); return r; }
}
#if PMA_USE_MEMPOOL
@ -1041,11 +1040,11 @@ int toku_pma_split(TOKUTXN txn, FILENUM filenum,
assert(error == 0);
distribute_data(pma->pairs, toku_pma_index_limit(pma), &pairs[0], n, pma);
{
int r = pma_log_distribute(txn, filenum, diskoff, diskoff, spliti, &pairs[0], lsn, lsn);
int r = pma_log_distribute(logger, filenum, diskoff, diskoff, spliti, &pairs[0], lsn, lsn);
if (r!=0) { toku_free(pairs); return r; }
r = toku_log_resizepma(txn, toku_txn_get_txnid(txn), filenum, diskoff, oldn_for_logging, newn_for_logging);
r = toku_log_resizepma(logger, filenum, diskoff, oldn_for_logging, newn_for_logging);
if (r!=0) { toku_free(pairs); return r; }
if (txn && lsn) *lsn = toku_txn_get_last_lsn(txn);
if (logger && lsn) *lsn = toku_logger_last_lsn(logger);
}
// Don't have to relocate kvpairs, because these ones are still there.
@ -1071,7 +1070,7 @@ static void __pma_bulk_cleanup(struct pma *pma, struct kv_pair_tag *pairs, int n
pma_mfree_kv_pair(pma, pairs[i].pair);
}
int toku_pma_bulk_insert(TOKUTXN txn, FILENUM filenum, DISKOFF diskoff, PMA pma, DBT *keys, DBT *vals, int n_newpairs, u_int32_t rand4fp, u_int32_t *sum, LSN *node_lsn) {
int toku_pma_bulk_insert(TOKULOGGER logger, FILENUM filenum, DISKOFF diskoff, PMA pma, DBT *keys, DBT *vals, int n_newpairs, u_int32_t rand4fp, u_int32_t *sum, LSN *node_lsn) {
struct kv_pair_tag *newpairs;
int i;
int error;
@ -1103,7 +1102,7 @@ int toku_pma_bulk_insert(TOKUTXN txn, FILENUM filenum, DISKOFF diskoff, PMA pma,
}
}
error = pma_resize_array(txn, filenum, diskoff, pma, n_newpairs + n_newpairs/4, 0, node_lsn);
error = pma_resize_array(logger, filenum, diskoff, pma, n_newpairs + n_newpairs/4, 0, node_lsn);
if (error) {
__pma_bulk_cleanup(pma, newpairs, n_newpairs);
toku_free(newpairs);

View file

@ -48,7 +48,7 @@ int toku_pma_n_entries (PMA);
//enum pma_errors toku_pma_insert (PMA, bytevec key, ITEMLEN keylen, bytevec data, ITEMLEN datalen);
// The DB pointer is there so that the comparison function can be called.
enum pma_errors toku_pma_insert (PMA, DBT*, DBT*, TOKUTXN, FILENUM, DISKOFF, u_int32_t /*random for fingerprint */, u_int32_t */*fingerprint*/, LSN *node_lsn);
enum pma_errors toku_pma_insert (PMA, DBT*, DBT*, TOKULOGGER, TXNID, FILENUM, DISKOFF, u_int32_t /*random for fingerprint */, u_int32_t */*fingerprint*/, LSN *node_lsn);
/* This returns an error if the key is NOT present. */
int pma_replace (PMA, bytevec key, ITEMLEN keylen, bytevec data, ITEMLEN datalen);
@ -61,7 +61,7 @@ int toku_pma_delete (PMA, DBT */*key*/, DBT */*val*/, u_int32_t /*random for fin
int toku_pma_insert_or_replace (PMA /*pma*/, DBT */*k*/, DBT */*v*/,
int */*replaced_v_size*/, /* If it is a replacement, set to the size of the old value, otherwise set to -1. */
TOKUTXN /*txn*/, FILENUM, DISKOFF,
TOKULOGGER, TXNID, FILENUM, DISKOFF,
u_int32_t /*random for fingerprint*/, u_int32_t */*fingerprint*/,
LSN */*node_lsn*/);
@ -90,7 +90,7 @@ int toku_pma_search(PMA, brt_search_t *, DBT *, DBT *);
* The original PMA gets keys <= pivot key
* The NEWPMA gets keys > pivot key
*/
int toku_pma_split(TOKUTXN, FILENUM,
int toku_pma_split(TOKULOGGER, FILENUM,
DISKOFF /*diskoff*/, PMA /*pma*/, unsigned int */*pma_size*/, u_int32_t /*rand4sum*/, u_int32_t */*fingerprint*/, LSN* /*lsn*/,
DBT */*splitk*/,
DISKOFF /*newdiskoff*/, PMA /*newpma*/, unsigned int */*newpma_size*/, u_int32_t /*newrand4sum*/, u_int32_t */*newfingerprint*/, LSN* /*newlsn*/);
@ -106,7 +106,7 @@ int toku_pma_split(TOKUTXN, FILENUM,
* vals - an array of values
* n_newpairs - the number of key value pairs
*/
int toku_pma_bulk_insert(TOKUTXN, FILENUM, DISKOFF, PMA pma, DBT *keys, DBT *vals, int n_newpairs, u_int32_t rand4sem, u_int32_t *fingerprint, LSN */*node_lsn*/);
int toku_pma_bulk_insert(TOKULOGGER, FILENUM, DISKOFF, PMA pma, DBT *keys, DBT *vals, int n_newpairs, u_int32_t rand4sem, u_int32_t *fingerprint, LSN */*node_lsn*/);
int toku_pma_random_pick(PMA, bytevec *key, ITEMLEN *keylen, bytevec *data, ITEMLEN *datalen);