/* -*- mode: C; c-basic-offset: 4 -*- */ #ident "Copyright (c) 2007, 2008 Tokutek Inc. All rights reserved." /* * We always write nodes to a new location on disk. * The nodes themselves contain the information about the tree structure. * Q: During recovery, how do we find the root node without looking at every block on disk? * A: The root node is either the designated root near the front of the freelist. * The freelist is updated infrequently. Before updating the stable copy of the freelist, we make sure that * the root is up-to-date. We can make the freelist-and-root update be an arbitrarily small fraction of disk bandwidth. * */ #include #include #include #include #include #include #include #include "toku_assert.h" #include "brt-internal.h" #include "key.h" #include "log_header.h" typedef struct weakstrong { char ignore; } *WS; #define WEAK ((WS)1) #define STRONG ((WS)0) extern long long n_items_malloced; static void verify_local_fingerprint_nonleaf (BRTNODE node); #ifdef FOO static int malloc_diskblock (DISKOFF *res, BRT brt, int size, TOKULOGGER); /* Frees a node, including all the stuff in the hash table. */ void toku_brtnode_free (BRTNODE *nodep) { BRTNODE node=*nodep; int i; //printf("%s:%d %p->mdict[0]=%p\n", __FILE__, __LINE__, node, node->mdicts[0]); if (node->height>0) { for (i=0; iu.n.n_children-1; i++) { toku_free((void*)node->u.n.childkeys[i]); } for (i=0; iu.n.n_children; i++) { if (BNC_BUFFER(node,i)) { toku_fifo_free(&BNC_BUFFER(node,i)); } } } else { if (node->u.l.buffer) // The buffer may have been freed already, in some cases. toku_pma_free(&node->u.l.buffer); } toku_free(node); *nodep=0; } #endif static long brtnode_size(BRTNODE node) { return toku_serialize_brtnode_size(node); } static void toku_update_brtnode_loggerlsn(BRTNODE node, TOKULOGGER logger) { if (logger) { node->log_lsn = toku_logger_last_lsn(logger); } } static void fixup_child_fingerprint(BRTNODE node, int childnum_of_node, BRTNODE child, BRT brt, TOKULOGGER logger) { u_int32_t old_fingerprint = BNC_SUBTREE_FINGERPRINT(node,childnum_of_node); u_int32_t sum = child->local_fingerprint; if (child->height>0) { int i; for (i=0; iu.n.n_children; i++) { sum += BNC_SUBTREE_FINGERPRINT(child,i); } } // Don't try to get fancy about not modifying the fingerprint if it didn't change. // We only call this function if we have reason to believe that the child's fingerprint did change. BNC_SUBTREE_FINGERPRINT(node,childnum_of_node)=sum; node->dirty=1; toku_log_changechildfingerprint(logger, toku_cachefile_filenum(brt->cf), node->thisnodename, childnum_of_node, old_fingerprint, sum); toku_update_brtnode_loggerlsn(node, logger); } // If you pass in data==0 then it only compares the key, not the data (even if is a DUPSORT database) static int brt_compare_pivot(BRT brt, DBT *key, DBT *data, bytevec ck) { int cmp; DBT mydbt; struct kv_pair *kv = (struct kv_pair *) ck; if (brt->flags & TOKU_DB_DUPSORT) { cmp = brt->compare_fun(brt->db, key, toku_fill_dbt(&mydbt, kv_pair_key(kv), kv_pair_keylen(kv))); if (cmp == 0 && data != 0) cmp = brt->dup_compare(brt->db, data, toku_fill_dbt(&mydbt, kv_pair_val(kv), kv_pair_vallen(kv))); } else { cmp = brt->compare_fun(brt->db, key, toku_fill_dbt(&mydbt, kv_pair_key(kv), kv_pair_keylen(kv))); } return cmp; } #ifdef FOO void toku_brtnode_flush_callback (CACHEFILE cachefile, DISKOFF nodename, void *brtnode_v, long size __attribute((unused)), BOOL write_me, BOOL keep_me, LSN modified_lsn __attribute__((__unused__)) , BOOL rename_p __attribute__((__unused__))) { BRTNODE brtnode = brtnode_v; if (0) { printf("%s:%d toku_brtnode_flush_callback %p keep_me=%d height=%d", __FILE__, __LINE__, brtnode, keep_me, brtnode->height); if (brtnode->height==0) printf(" pma=%p", brtnode->u.l.buffer); printf("\n"); } assert(brtnode->thisnodename==nodename); if (write_me) { toku_serialize_brtnode_to(toku_cachefile_fd(cachefile), brtnode->thisnodename, brtnode->nodesize, brtnode); } if (!keep_me) { toku_brtnode_free(&brtnode); } } int toku_brtnode_fetch_callback (CACHEFILE cachefile, DISKOFF nodename, void **brtnode_pv, long *sizep, void*extraargs, LSN *written_lsn) { BRT t =(BRT)extraargs; BRTNODE *result=(BRTNODE*)brtnode_pv; int r = toku_deserialize_brtnode_from(toku_cachefile_fd(cachefile), nodename, result, t->flags, t->nodesize, t->compare_fun, t->dup_compare, t->db, toku_cachefile_filenum(t->cf)); if (r == 0) { *sizep = brtnode_size(*result); *written_lsn = (*result)->disk_lsn; } return r; } void toku_brtheader_flush_callback (CACHEFILE cachefile, DISKOFF nodename, void *header_v, long size __attribute((unused)), BOOL write_me, BOOL keep_me, LSN lsn __attribute__((__unused__)), BOOL rename_p __attribute__((__unused__))) { struct brt_header *h = header_v; assert(nodename==0); assert(!h->dirty); // shouldn't be dirty once it is unpinned. if (write_me) { toku_serialize_brt_header_to(toku_cachefile_fd(cachefile), h); } if (!keep_me) { if (h->n_named_roots>0) { int i; for (i=0; in_named_roots; i++) { toku_free(h->names[i]); } toku_free(h->names); toku_free(h->roots); } toku_free(h); } } int toku_brtheader_fetch_callback (CACHEFILE cachefile, DISKOFF nodename, void **headerp_v, long *sizep __attribute__((unused)), void*extraargs __attribute__((__unused__)), LSN *written_lsn) { struct brt_header **h = (struct brt_header **)headerp_v; assert(nodename==0); int r = toku_deserialize_brtheader_from(toku_cachefile_fd(cachefile), nodename, h); written_lsn->lsn = 0; // !!! WRONG. This should be stored or kept redundantly or something. return r; } int toku_read_and_pin_brt_header (CACHEFILE cf, struct brt_header **header) { void *header_p; //fprintf(stderr, "%s:%d read_and_pin_brt_header(...)\n", __FILE__, __LINE__); int r = toku_cachetable_get_and_pin(cf, 0, &header_p, NULL, toku_brtheader_flush_callback, toku_brtheader_fetch_callback, 0); if (r!=0) return r; *header = header_p; return 0; } int toku_unpin_brt_header (BRT brt) { int r = toku_cachetable_unpin(brt->cf, 0, brt->h->dirty, 0); brt->h->dirty=0; brt->h=0; return r; } #endif static int unpin_brtnode (BRT brt, BRTNODE node) { return toku_cachetable_unpin(brt->cf, node->thisnodename, node->dirty, brtnode_size(node)); } #ifdef FOO typedef struct kvpair { bytevec key; unsigned int keylen; bytevec val; unsigned int vallen; } *KVPAIR; /* Forgot to handle the case where there is something in the freelist. */ static int malloc_diskblock_header_is_in_memory (DISKOFF *res, BRT brt, int size, TOKULOGGER logger) { DISKOFF result = brt->h->unused_memory; brt->h->unused_memory+=size; brt->h->dirty = 1; int r = toku_log_changeunusedmemory(logger, toku_cachefile_filenum(brt->cf), result, brt->h->unused_memory); *res = result; return r; } int malloc_diskblock (DISKOFF *res, BRT brt, int size, TOKULOGGER logger) { #if 0 int r = read_and_pin_brt_header(brt->fd, &brt->h); assert(r==0); { DISKOFF result = malloc_diskblock_header_is_in_memory(brt, size); r = write_brt_header(brt->fd, &brt->h); assert(r==0); return result; } #else return malloc_diskblock_header_is_in_memory(res, brt,size, logger); #endif } static void initialize_brtnode (BRT t, BRTNODE n, DISKOFF nodename, int height) { int i; n->tag = TYP_BRTNODE; n->nodesize = t->h->nodesize; n->flags = t->h->flags; n->thisnodename = nodename; n->disk_lsn.lsn = 0; // a new one can always be 0. n->log_lsn = n->disk_lsn; n->layout_version = 2; n->height = height; n->rand4fingerprint = random(); n->local_fingerprint = 0; n->dirty = 1; assert(height>=0); if (height>0) { n->u.n.n_children = 0; for (i=0; iu.n.childkeys[i] = 0; // n->u.n.childkeylens[i] = 0; } n->u.n.totalchildkeylens = 0; for (i=0; iu.n.children[i] = 0; // n->u.n.buffers[i] = 0; BNC_NBYTESINBUF(n,i) = 0; } n->u.n.n_bytes_in_buffers = 0; } else { int r = toku_pma_create(&n->u.l.buffer, t->compare_fun, t->db, toku_cachefile_filenum(t->cf), n->nodesize); assert(r==0); toku_pma_set_dup_mode(n->u.l.buffer, t->flags & (TOKU_DB_DUP+TOKU_DB_DUPSORT)); toku_pma_set_dup_compare(n->u.l.buffer, t->dup_compare); static int rcount=0; //printf("%s:%d n PMA= %p (rcount=%d)\n", __FILE__, __LINE__, n->u.l.buffer, rcount); rcount++; n->u.l.n_bytes_in_buffer = 0; } } int toku_create_new_brtnode (BRT t, BRTNODE *result, int height, TOKULOGGER logger) { TAGMALLOC(BRTNODE, n); int r; DISKOFF name; if ((r = malloc_diskblock(&name, t, t->h->nodesize, logger))) return r; assert(n); assert(t->h->nodesize>0); //printf("%s:%d malloced %lld (and malloc again=%lld)\n", __FILE__, __LINE__, name, malloc_diskblock(t, t->nodesize)); initialize_brtnode(t, n, name, height); *result = n; assert(n->nodesize>0); // n->brt = t; //printf("%s:%d putting %p (%lld) parent=%p\n", __FILE__, __LINE__, n, n->thisnodename, parent_brtnode); if ((r = toku_cachetable_put(t->cf, n->thisnodename, n, brtnode_size(n), toku_brtnode_flush_callback, toku_brtnode_fetch_callback, t))) return r; if ((r = toku_log_newbrtnode(logger, toku_cachefile_filenum(t->cf), n->thisnodename, height, n->nodesize, (t->flags&TOKU_DB_DUPSORT)!=0, n->rand4fingerprint))) return r; toku_update_brtnode_loggerlsn(n, logger); return 0; } #ifdef FOO static int insert_to_buffer_in_nonleaf (BRTNODE node, int childnum, BRT_CMD cmd) { unsigned int n_bytes_added = BRT_CMD_OVERHEAD + KEY_VALUE_OVERHEAD + cmd->u.id.key->size + cmd->u.id.val->size; int r = toku_fifo_enq_cmdstruct(BNC_BUFFER(node,childnum), cmd); if (r!=0) return r; node->local_fingerprint += node->rand4fingerprint*toku_calccrc32_cmdstruct(cmd); BNC_NBYTESINBUF(node,childnum) += n_bytes_added; node->u.n.n_bytes_in_buffers += n_bytes_added; node->dirty = 1; return 0; } #endif // Split a leaf node, reusing it in new_nodes (as the last element) static int split_leaf_node (BRT t, TOKULOGGER logger, BRTNODE node, int *n_new_nodes, BRTNODE **new_nodes, DBT **splitks) { assert(node->height==0); int r; int n_children=1; // Initially we have the node itself. BRTNODE *result_nodes=toku_malloc(sizeof(*result_nodes)); if (errno!=0) { r=errno; if (0) { died0: toku_free(result_nodes); } return r; } DBT *result_splitks=toku_malloc(sizeof(*result_splitks)); if (errno!=0) { r=errno; if (0) { died1: toku_free(result_splitks); } goto died0; } while (toku_serialize_brtnode_size(node)>node->nodesize) { BRTNODE B; DBT splitk; if ((r = toku_create_new_brtnode(t, &B, 0, logger))) return r; // Split so that B is at least 1/2 full // The stuff in B goes *before* node if ((r = toku_pma_split(logger, toku_cachefile_filenum(t->cf), node->thisnodename, node->u.l.buffer, &node->u.l.n_bytes_in_buffer, node->rand4fingerprint, &node->local_fingerprint, &node->log_lsn, &splitk, B->thisnodename, B->u.l.buffer, &B->u.l.n_bytes_in_buffer, B->rand4fingerprint, &B->local_fingerprint, &B->log_lsn))) goto died1; n_children++; result_nodes = toku_realloc(result_nodes, n_children*sizeof(*result_nodes)); result_nodes[n_children-2] = B; result_splitks = toku_realloc(result_nodes, (n_children-1)*sizeof(*result_splitks)); result_splitks[n_children-2] = splitk; } result_nodes[n_children-1]=node; *n_new_nodes = n_children; *new_nodes = result_nodes; *splitks = result_splitks; return 0; } #endif static void find_heaviest_child (BRTNODE node, int *childnum) { int max_child = 0; int max_weight = BNC_NBYTESINBUF(node, 0); int i; if (0) printf("%s:%d weights: %d", __FILE__, __LINE__, max_weight); assert(node->u.n.n_children>0); for (i=1; iu.n.n_children; i++) { int this_weight = BNC_NBYTESINBUF(node,i); if (0) printf(" %d", this_weight); if (max_weight < this_weight) { max_child = i; max_weight = this_weight; } } *childnum = max_child; if (0) printf("\n"); } /* find the leftmost child that may contain the key */ static unsigned int brtnode_which_child (BRTNODE node , DBT *k, DBT *d, BRT t) { int i; assert(node->height>0); for (i=0; iu.n.n_children-1; i++) { int cmp = brt_compare_pivot(t, k, d, node->u.n.childkeys[i]); if (cmp > 0) continue; if (cmp < 0) return i; return i; } return node->u.n.n_children-1; } static int brtnode_put (BRT t, BRTNODE node, BRT_CMD cmd, TOKULOGGER logger, WS weak_p); // If CHILD is too wide, split it, and create a new node with the new children. Unpin CHILD or the new children (even if something goes wrong). // If it does split, unpin the new root node also. static int maybe_split_root(BRT brt, BRTNODE child, CACHEKEY *rootp, TOKULOGGER logger); // if CHILD is too wide, split it, and fix up NODE. Either way, unpin the child or resulting children (even if it fails do the unpin) static int maybe_split_nonroot (BRT brt, BRTNODE node, int childnum, BRTNODE child, int *n_children_replacing_child, TOKULOGGER logger); // Push stuff into a child weakly. (That is don't cause any I/O or cause the child to get too big.) static int weak_push_to_child (BRT brt, BRTNODE node, int childnum, TOKULOGGER logger) { void *child_v; int r = toku_cachetable_maybe_get_and_pin(brt->cf, BNC_DISKOFF(node, childnum), &child_v); if (r!=0) return 0; BRTNODE child = child_v; DBT key,val; BRT_CMD_S cmd; while (0 == toku_fifo_peek_cmdstruct(BNC_BUFFER(node, childnum), &cmd, &key, &val)) { r = brtnode_put(brt, child, &cmd, logger, WEAK); if (r==EAGAIN) break; if (r!=0) goto died; r=toku_fifo_deq(BNC_BUFFER(node, childnum)); if (r!=0) goto died; } return unpin_brtnode(brt, child); died: unpin_brtnode(brt, child); return r; } // If the buffers are too big, push stuff down. The subchild may need to be split, in which case our fanout may get too large. // When are done, this node is has little enough stuff in its buffers (but the fanout may be too large), and all the descendant // nodes are properly sized (the buffer sizes and fanouts are all small enough). static int push_down_if_buffers_too_full(BRT brt, BRTNODE node, TOKULOGGER logger) { if (node->height==0) return 0; // can't push down for leaf nodes while (node->u.n.n_bytes_in_buffers > 0 && toku_serialize_brtnode_size(node)>node->nodesize) { int childnum; find_heaviest_child(node, &childnum); void *child_v; int r = toku_cachetable_get_and_pin(brt->cf, BNC_DISKOFF(node, childnum), &child_v, NULL, toku_brtnode_flush_callback, toku_brtnode_fetch_callback, brt); if (r!=0) return r; BRTNODE child=child_v; if (0) { died: unpin_brtnode(brt, child); return r; } BRT_CMD_S cmd; DBT key,val; while (0==toku_fifo_peek_cmdstruct(BNC_BUFFER(node, childnum), &cmd, &key, &val)) { r=toku_fifo_deq(BNC_BUFFER(node, childnum)); assert(r==0); // we just did a peek, so the buffer must be nonempty r=brtnode_put(brt, child, &cmd, logger, WEAK); if (r!=EAGAIN && r!=0) goto died; if (r==EAGAIN) { // Weak pushes ran out of steam. Now do a strong push if there is still something in the buffer. if (0==toku_fifo_peek_cmdstruct(BNC_BUFFER(node, childnum), &cmd, &key, &val)) { r=brtnode_put(brt, child, &cmd, logger, STRONG); if (r!=0) goto died; r=toku_fifo_deq(BNC_BUFFER(node, childnum)); if (r!=0) goto died; // Now it's possible that the child must be split. (Or maybe the child managed to flush stuff to our grandchildren) int n_children_replacing_child; r=maybe_split_nonroot(brt, node, childnum, child, &n_children_replacing_child, logger); if (r!=0) return r; // don't go to died since that unpins int i; for (i=0; idata pointer to a malloc'd value */ static int brt_nonleaf_split (BRT t, BRTNODE node, BRTNODE *nodeb, DBT *splitk, TOKULOGGER logger) { int old_n_children = node->u.n.n_children; int n_children_in_a = old_n_children/2; int n_children_in_b = old_n_children-n_children_in_a; BRTNODE B; FILENUM fnum = toku_cachefile_filenum(t->cf); assert(node->height>0); assert(node->u.n.n_children>=2); // Otherwise, how do we split? We need at least two children to split. */ assert(t->h->nodesize>=node->nodesize); /* otherwise we might be in trouble because the nodesize shrank. */ toku_create_new_brtnode(t, &B, node->height, logger); B->u.n.n_children =n_children_in_b; //printf("%s:%d %p (%lld) becomes %p and %p\n", __FILE__, __LINE__, node, node->thisnodename, A, B); //printf("%s:%d A is at %lld\n", __FILE__, __LINE__, A->thisnodename); { /* The first n_children_in_a go into node a. * That means that the first n_children_in_a-1 keys go into node a. * The splitter key is key number n_children_in_a */ int i; for (i=0; ithisnodename, targchild, thischilddiskoff, BNC_SUBTREE_FINGERPRINT(node, i)); if (r!=0) return r; while (1) { bytevec key, data; unsigned int keylen, datalen; int type; TXNID xid; int fr = toku_fifo_peek(from_htab, &key, &keylen, &data, &datalen, &type, &xid); if (fr!=0) break; int n_bytes_moved = keylen+datalen + KEY_VALUE_OVERHEAD + BRT_CMD_OVERHEAD; BYTESTRING keybs = { .len = keylen, .data = (char*)key }; BYTESTRING databs = { .len = datalen, .data = (char*)data }; u_int32_t old_from_fingerprint = node->local_fingerprint; u_int32_t old_to_fingerprint = B->local_fingerprint; u_int32_t delta = toku_calccrc32_cmd(type, xid, key, keylen, data, datalen); u_int32_t new_from_fingerprint = old_from_fingerprint - node->rand4fingerprint*delta; u_int32_t new_to_fingerprint = old_to_fingerprint + B->rand4fingerprint *delta; if (r!=0) return r; r = toku_log_brtdeq(logger, fnum, node->thisnodename, n_children_in_a, xid, type, keybs, databs, old_from_fingerprint, new_from_fingerprint); if (r!=0) return r; r = toku_log_brtenq(logger, fnum, B->thisnodename, targchild, xid, type, keybs, databs, old_to_fingerprint, new_to_fingerprint); r = toku_fifo_enq(to_htab, key, keylen, data, datalen, type, xid); if (r!=0) return r; toku_fifo_deq(from_htab); // key and data will no longer be valid node->local_fingerprint = new_from_fingerprint; B->local_fingerprint = new_to_fingerprint; B->u.n.n_bytes_in_buffers += n_bytes_moved; BNC_NBYTESINBUF(B, targchild) += n_bytes_moved; node->u.n.n_bytes_in_buffers -= n_bytes_moved; BNC_NBYTESINBUF(node, i) -= n_bytes_moved; // verify_local_fingerprint_nonleaf(B); // verify_local_fingerprint_nonleaf(node); } // Delete a child, removing it's fingerprint, and also the preceeding pivot key. The child number must be > 0 { BYTESTRING bs = { .len = kv_pair_keylen(node->u.n.childkeys[i-1]), .data = kv_pair_key(node->u.n.childkeys[i-1]) }; assert(i>0); r = toku_log_delchild(logger, fnum, node->thisnodename, n_children_in_a, thischilddiskoff, BNC_SUBTREE_FINGERPRINT(node, i), bs); if (r!=0) return r; if (i>n_children_in_a) { r = toku_log_setpivot(logger, fnum, B->thisnodename, targchild-1, bs); if (r!=0) return r; B->u.n.childkeys[targchild-1] = node->u.n.childkeys[i-1]; B->u.n.totalchildkeylens += toku_brt_pivot_key_len(t, node->u.n.childkeys[i-1]); node->u.n.totalchildkeylens -= toku_brt_pivot_key_len(t, node->u.n.childkeys[i-1]); node->u.n.childkeys[i-1] = 0; } } BNC_DISKOFF(node, i) = 0; BNC_SUBTREE_FINGERPRINT(B, targchild) = BNC_SUBTREE_FINGERPRINT(node, i); BNC_SUBTREE_FINGERPRINT(node, i) = 0; assert(BNC_NBYTESINBUF(node, i) == 0); } // Drop the n_children now (not earlier) so that we can do the fingerprint verification at any time. node->u.n.n_children=n_children_in_a; for (i=n_children_in_a; idata = (void*)(node->u.n.childkeys[n_children_in_a-1]); splitk->size = toku_brt_pivot_key_len(t, node->u.n.childkeys[n_children_in_a-1]); node->u.n.totalchildkeylens -= toku_brt_pivot_key_len(t, node->u.n.childkeys[n_children_in_a-1]); node->u.n.childkeys[n_children_in_a-1]=0; verify_local_fingerprint_nonleaf(node); verify_local_fingerprint_nonleaf(B); } *nodeb = B; assert(toku_serialize_brtnode_size(node)nodesize); assert(toku_serialize_brtnode_size(B)nodesize); return 0; } static int nonleaf_node_is_too_wide (BRTNODE node) { assert(node->height>0); return node->u.n.n_children > TREE_FANOUT; } static int maybe_fixup_fat_child(BRT brt, BRTNODE node, int childnum, BRTNODE child, TOKULOGGER logger) // If the node is too big then deal with it. Unpin the child (or children if it splits) NODE may be too big at the end { int r = push_down_if_buffers_too_full(brt, child, logger); if (r!=0) return r; // now the child may have too much fanout. if (child->height>0) { if (nonleaf_node_is_too_wide(child)) { BRTNODE newchild; DBT splitk; if ((r=brt_nonleaf_split(brt, child, &newchild, &splitk, logger))) return r; int old_n_children = node->u.n.n_children; FIFO old_fifo = BNC_BUFFER(node, childnum); // slide the children over REALLOC_N(old_n_children+1, node->u.n.childinfos); memmove(&node->u.n.childinfos[childnum+1], &node->u.n.childinfos[childnum+2], (old_n_children-childnum-1)*sizeof(node->u.n.childinfos[0])); // fill in the new children { struct brtnode_nonleaf_childinfo *ci = &node->u.n.childinfos[childnum+1]; ci->subtree_fingerprint = 0; ci->diskoff = newchild->thisnodename; ci->n_bytes_in_buffer = 0; r=toku_fifo_create(&ci->buffer); if (r!=0) return r; } // replace the fifo in the old child r=toku_fifo_create(&BNC_BUFFER(node, childnum)); if (r!=0) return r; // slide the keys over REALLOC_N(old_n_children, node->u.n.childkeys); memmove(&node->u.n.childkeys[childnum], &node->u.n.childkeys[childnum+1], (old_n_children-childnum-1)*sizeof(node->u.n.childkeys[0])); { struct kv_pair *pivot = splitk.data; BYTESTRING bs = { .len = splitk.size, .data = kv_pair_key(pivot) }; r = toku_log_setpivot(logger, toku_cachefile_filenum(brt->cf), node->thisnodename, childnum, bs); if (r!=0) return r; node->u.n.childkeys[childnum] = pivot; node->u.n.totalchildkeylens += toku_brt_pivot_key_len(brt, pivot); } node->u.n.n_children++; // fix up fingerprints fixup_child_fingerprint(node, childnum, child, brt, logger); fixup_child_fingerprint(node, childnum+1, newchild, brt, logger); // now everything in the fifo must be put again into one of the two fifos BRT_CMD_S cmd; DBT key,val; while (0==toku_fifo_peek_deq_cmdstruct(old_fifo, &cmd, &key, &val)) { int cmp = brt_compare_pivot(brt, cmd.u.id.key, 0, node->u.n.childkeys[childnum]); if (cmp<=0) { r=toku_fifo_enq_cmdstruct(BNC_BUFFER(node, childnum), &cmd); if (r!=0) return r; } if (cmp==0 && cmd.type==BRT_DELETE && brt->flags&TOKU_DB_DUPSORT) { r=toku_fifo_enq_cmdstruct(BNC_BUFFER(node, childnum+1), &cmd); if (r!=0) return r; } } toku_fifo_free(&old_fifo); if (r!=0) return r; } } else { abort(); // if a leaf is too fat need to split it. } return 0; } // There are two kinds of puts: // A "weak" put that is guaranteed to trigger no I/O, and will not leaf the node overfull. // A weak put may not actually perform the put, however (in which case it returns EAGAIN instead of 0) // A "strong" put that is guaranteed to do the put. However, it may trigger I/O and the resulting node may be too big. static int brt_leaf_put (BRT t, BRTNODE node, BRT_CMD cmd, TOKULOGGER logger, WS weak_p) { FILENUM filenum = toku_cachefile_filenum(t->cf); switch (cmd->type) { case BRT_INSERT: { int r = toku_pma_insert_or_replace_ws(node->u.l.buffer, cmd->u.id.key, cmd->u.id.val, logger, cmd->xid, filenum, node->thisnodename, node->rand4fingerprint, &node->local_fingerprint, &node->log_lsn, &node->u.l.n_bytes_in_buffer, weak_p==WEAK); if (r==EAGAIN) return EAGAIN; assert(r==0); node->dirty=1; return r; } case BRT_DELETE: { int r = toku_pma_delete_fixupsize(node->u.l.buffer, cmd->u.id.key, (DBT*)0, logger, cmd->xid, node->thisnodename, node->rand4fingerprint, &node->local_fingerprint, &node->log_lsn, &node->u.l.n_bytes_in_buffer); if (r==0) node->dirty=1; return r; } case BRT_DELETE_BOTH: { int r = toku_pma_delete_fixupsize(node->u.l.buffer, cmd->u.id.key, cmd->u.id.val, logger, cmd->xid, node->thisnodename, node->rand4fingerprint, &node->local_fingerprint, &node->log_lsn, &node->u.l.n_bytes_in_buffer); if (r == 0) node->dirty = 1; return r; } case BRT_NONE: return 0; } return EINVAL; // if none of the cases match, then the command is messed up. } // Put an command in a particular child's fifo. // If weak_p then do it without doing I/O or overfilling the child. // If the child is in main memory and we can do a weak put on the child, then push into the child. // Otherwise we return EAGAIN. // If not weak_p then we are willing to overfill the child. static int brt_nonleaf_put_cmd_to_child (BRT t, BRTNODE node, int childnum, BRT_CMD cmd, TOKULOGGER logger, WS weak_p) { DBT *k = cmd->u.id.key; DBT *v = cmd->u.id.val; int r; if (toku_fifo_n_entries(BNC_BUFFER(node,childnum))==0) { void *child_v; r = toku_cachetable_maybe_get_and_pin(t->cf, BNC_DISKOFF(node, childnum), &child_v); if (r==0) { BRTNODE child=child_v; r = brtnode_put(t, child, cmd, logger, weak_p); if (r==EAGAIN) { r = unpin_brtnode(t, child); if (r!=0) return r; // node is still OK } else if (r==0) { return maybe_fixup_fat_child(t, node, childnum, child, logger); // If the node is too big then deal with it. Unpin the child. NODE may be too big. I think the only way a node can get fat is if weak_p==STRONG. } else { unpin_brtnode(t, child); return r; // node is still OK } } } // For some reason we didn't put it into the child, so we must put it in the fifo. int diff = k->size + v->size + KEY_VALUE_OVERHEAD + BRT_CMD_OVERHEAD; if (diff+toku_serialize_brtnode_size(node)>node->nodesize) return EAGAIN; // And it doesn't fit here. r=toku_fifo_enq_cmdstruct(BNC_BUFFER(node,childnum), cmd); if (r!=0) return r; node->local_fingerprint += node->rand4fingerprint * toku_calccrc32_cmdstruct(cmd); node->u.n.n_bytes_in_buffers += diff; BNC_NBYTESINBUF(node, childnum) += diff; node->dirty = 1; return 0; // node may be too big } static void determine_which_children_to_push_delete (BRT t, BRTNODE node, BRT_CMD cmd, int *n_children_to_push, int *children_to_push) { int i; *n_children_to_push=0; for (i=0; iu.n.n_children-1; i++) { int cmp = brt_compare_pivot(t, cmd->u.id.key, 0, node->u.n.childkeys[i]); if (cmp>0) continue; // the cmd is bigger than the pivot, so it doesn't go here. else if (cmp<0) { // the cmd is smaller than the pivot, so it goes here, and goes nowhere else to the right children_to_push[(*n_children_to_push)++] = i; return; } else if (t->flags & TOKU_DB_DUPSORT) { // the cmd is equal and we are in a dupsort, so push and and go around to push additional ones. children_to_push[(*n_children_to_push)++] = i; continue; } else { // the cmd is equal but we are not in a dupsort, so we save i, but there is no saving the next one. children_to_push[(*n_children_to_push)++] = i; return; } } // if we fell off the bottom, which means we must include the last one. children_to_push[(*n_children_to_push)++] = i; } // Put the cmd into all the subtrees that it belong in. (Deletes can end up in several subtrees.) // If weak_p then // Don't do any I/O and the node will not be overfull. // To guarantee that no I/O will occur, we must make sure we can insert everything before inserting anything. // else put it regardless, possibly overflowing the node. static int brt_nonleaf_put_delete (BRT t, BRTNODE node, BRT_CMD cmd, TOKULOGGER logger, WS weak_p) { int singlediff = cmd->u.id.key->size + cmd->u.id.val->size + KEY_VALUE_OVERHEAD + BRT_CMD_OVERHEAD; int n_children_to_push = 0; int children_to_push[node->u.n.n_children]; determine_which_children_to_push_delete(t, node, cmd, &n_children_to_push, children_to_push); int totaldiff = singlediff * n_children_to_push; if (weak_p && (totaldiff + toku_serialize_brtnode_size(node) > node->nodesize)) return EAGAIN; // Now we know it will fit, so do all the weak pushes. We are being a little bit conservative, // since a soft push might succeed, in getting data to a child without using up the local storage. int i; for (i=0; iu.id.key, cmd->u.id.val, t), cmd, logger, weak_p); } // Put the cmd into the node. Possibly results in the node being overfull. (But not if weak_p is set, in which case EAGAIN is returned instead) // The command could get pushed into the appropriate child if the child is in main memory and has space to hold the command. static int brt_nonleaf_put (BRT t, BRTNODE node, BRT_CMD cmd, TOKULOGGER logger, WS weak_p) { if (cmd->type == BRT_INSERT || cmd->type == BRT_DELETE_BOTH) { return brt_nonleaf_put_nonreplicating_cmd(t, node, cmd, logger, weak_p); } else if (cmd->type == BRT_DELETE) { return brt_nonleaf_put_delete(t, node, cmd, logger, weak_p); } else return EINVAL; } // Put the command into the node. // If weak_p is set then neither the node nor any descendants will get too big, and no I/O will occur. // if !weak_p then I/O could occur and the node could end up with too much fanout. (But the children will all be properly sized) static int brtnode_put (BRT t, BRTNODE node, BRT_CMD cmd, TOKULOGGER logger, WS weak_p) { if (node->height==0) { return brt_leaf_put(t, node, cmd, logger, weak_p); } else { return brt_nonleaf_put(t, node, cmd, logger, weak_p); } } static void verify_local_fingerprint_nonleaf (BRTNODE node) { u_int32_t fp=0; int i; if (node->height==0) return; for (i=0; iu.n.n_children; i++) FIFO_ITERATE(BNC_BUFFER(node,i), key, keylen, data, datalen, type, xid, ({ fp += node->rand4fingerprint * toku_calccrc32_cmd(type, xid, key, keylen, data, datalen); })); assert(fp==node->local_fingerprint); } #ifdef FOO static int setup_initial_brt_root_node (BRT t, DISKOFF offset, TOKULOGGER logger) { int r; TAGMALLOC(BRTNODE, node); assert(node); initialize_brtnode(t, node, offset, /* the location is one nodesize offset from 0. */ 0); // node->brt = t; if (0) { printf("%s:%d for tree %p node %p mdict_create--> %p\n", __FILE__, __LINE__, t, node, node->u.l.buffer); printf("%s:%d put root at %lld\n", __FILE__, __LINE__, offset); } r=toku_cachetable_put(t->cf, offset, node, brtnode_size(node), toku_brtnode_flush_callback, toku_brtnode_fetch_callback, t); if (r!=0) { toku_free(node); return r; } toku_verify_counts(node); toku_log_newbrtnode(logger, toku_cachefile_filenum(t->cf), offset, 0, t->h->nodesize, (t->flags&TOKU_DB_DUPSORT)!=0, node->rand4fingerprint); toku_update_brtnode_loggerlsn(node, logger); r=unpin_brtnode(t, node); if (r!=0) { toku_free(node); return r; } return 0; } int toku_brt_create(BRT *brt_ptr) { BRT brt = toku_malloc(sizeof *brt); if (brt == 0) return ENOMEM; memset(brt, 0, sizeof *brt); list_init(&brt->cursors); brt->flags = 0; brt->nodesize = BRT_DEFAULT_NODE_SIZE; brt->compare_fun = toku_default_compare_fun; brt->dup_compare = toku_default_compare_fun; *brt_ptr = brt; return 0; } int toku_brt_set_flags(BRT brt, unsigned int flags) { brt->flags = flags; return 0; } int toku_brt_get_flags(BRT brt, unsigned int *flags) { *flags = brt->flags; return 0; } int toku_brt_set_nodesize(BRT brt, unsigned int nodesize) { brt->nodesize = nodesize; return 0; } int toku_brt_get_nodesize(BRT brt, unsigned int *nodesize) { *nodesize = brt->nodesize; return 0; } int toku_brt_set_bt_compare(BRT brt, int (*bt_compare)(DB *, const DBT*, const DBT*)) { brt->compare_fun = bt_compare; return 0; } int toku_brt_set_dup_compare(BRT brt, int (*dup_compare)(DB *, const DBT*, const DBT*)) { brt->dup_compare = dup_compare; return 0; } int toku_brt_get_fd(BRT brt, int *fdp) { *fdp = toku_cachefile_fd(brt->cf); return 0; } enum { UNDO_COUNTER_LIMIT=10 }; typedef void(*undo_fun)(void*); struct undo_rec { undo_fun f; void *v; }; struct undo { int undo_counter; struct undo_rec undos[UNDO_COUNTER_LIMIT]; }; #define INITUNDO(u) struct undo u = (struct undo){.undo_counter=0} void push_undo(struct undo *undos, undo_fun f, void *v) { assert(undos->undo_counterundos[undos->undo_counter++]=(struct undo_rec){f,v}; } void do_undos(struct undo *undos) { while (undos->undo_counter>0) { struct undo_rec *r = &undos->undos[--undos->undo_counter]; r->f(r->v); } } void undo_free (void *v) { void **ptr=v; toku_free(*ptr); *ptr=0; } // tbou means "toku_brt_open undo" void tbou_close_cachefile (void *v) { BRT t = v; toku_cachefile_close(&t->cf); } struct maybe_unpin_info { int is_pinned; CACHEFILE cf; CACHEKEY ckey; }; void tbou_maybe_unpin (void *v) { struct maybe_unpin_info *mui = v; if (mui->is_pinned) toku_cachetable_unpin(mui->cf, mui->ckey, 0, 0); mui->is_pinned=0; } int toku_brt_open(BRT t, const char *fname, const char *fname_in_env, const char *dbname, int is_create, int only_create, int load_flags, CACHETABLE cachetable, TOKUTXN txn, DB *db) { /* If dbname is NULL then we setup to hold a single tree. Otherwise we setup an array. */ int r; struct maybe_unpin_info mui = {.is_pinned=0}; INITUNDO(undos); push_undo(&undos, tbou_maybe_unpin, &mui); // if we pin a cf, then we put it into the maybe_undo_info so it will get undone on error. assert(is_create || !only_create); assert(!load_flags || !only_create); if (0) { died: do_undos(&undos); return r; } { if (dbname) { char *malloced_name = toku_strdup(dbname); if (malloced_name==0) { r = errno; goto died; } push_undo(&undos, undo_free, &t->database_name); t->database_name = malloced_name; } else { t->database_name = 0; } } t->db = db; { int fd = open(fname, O_RDWR, 0777); r = errno; if (fd==-1) { if (r==ENOENT) { if (!is_create) { goto died; } fd = open(fname, O_RDWR | O_CREAT, 0777); if (fd==-1) { r=errno; goto died; } r = toku_logger_log_fcreate(txn, fname_in_env, 0777); if (r!=0) goto died; } else goto died; } if ((r = toku_cachetable_openfd(&t->cf, cachetable, fd, t))) goto died; push_undo(&undos, tbou_close_cachefile, t); } if ((r = toku_logger_log_fopen(txn, fname_in_env, toku_cachefile_filenum(t->cf)))) goto died; // no undo action for log_fopen assert(t->nodesize>0); if (is_create) { r = toku_read_and_pin_brt_header(t->cf, &t->h); if (r!=0 && r!=-1) goto died; if (r==0) { mui=(struct maybe_unpin_info){.is_pinned=1, .cf=t->cf, .ckey=0}; // remember to unpin it int i; assert(r==0); assert(dbname); if (t->h->unnamed_root!=-1) { r=EINVAL; goto died; } // Cannot create a subdb in a file that is not enabled for subdbs assert(t->h->n_named_roots>=0); for (i=0; ih->n_named_roots; i++) { if (strcmp(t->h->names[i], dbname)==0) { if (only_create) { r = EEXIST; goto died; } else goto found_it; } } if ((t->h->names = toku_realloc(t->h->names, (1+t->h->n_named_roots)*sizeof(*t->h->names))) == 0) { r=errno; goto died; } if ((t->h->roots = toku_realloc(t->h->roots, (1+t->h->n_named_roots)*sizeof(*t->h->roots))) == 0) { r=errno; goto died; } t->h->n_named_roots++; if ((t->h->names[t->h->n_named_roots-1] = toku_strdup(dbname)) == 0) { r=errno; goto died; } push_undo(&undos, undo_free, &t->h->names[t->h->n_named_roots-1]); r = malloc_diskblock_header_is_in_memory(&t->h->roots[t->h->n_named_roots-1], t, t->h->nodesize, toku_txn_logger(txn)); if (r!=0) goto died; t->h->dirty = 1; if ((r=setup_initial_brt_root_node(t, t->h->roots[t->h->n_named_roots-1], toku_txn_logger(txn)))!=0) goto died; } else { assert(r==-1); // the pin failed because no data was present /* construct a new header. */ if ((MALLOC(t->h))==0) { r = errno; goto died; } t->h->dirty=1; t->h->flags = t->flags; t->h->nodesize=t->nodesize; t->h->freelist=-1; t->h->unused_memory=2*t->nodesize; if (dbname) { t->h->unnamed_root = -1; t->h->n_named_roots = 1; if ((MALLOC_N(1, t->h->names))==0) { r=errno; goto died; } push_undo(&undos, undo_free, &t->h->names); if ((MALLOC_N(1, t->h->roots))==0) { r=errno; goto died; } push_undo(&undos, undo_free, &t->h->roots); if ((t->h->names[0] = toku_strdup(dbname))==0) { r=errno; goto died; } push_undo(&undos, undo_free, &t->h->names[0]); t->h->roots[0] = t->nodesize; } else { t->h->unnamed_root = t->nodesize; t->h->n_named_roots = -1; t->h->names=0; t->h->roots=0; } if ((r=toku_logger_log_header(txn, toku_cachefile_filenum(t->cf), t->h))) goto died; if ((r=setup_initial_brt_root_node(t, t->nodesize, toku_txn_logger(txn)))!=0) goto died; if ((r=toku_cachetable_put(t->cf, 0, t->h, 0, toku_brtheader_flush_callback, toku_brtheader_fetch_callback, 0))) goto died; mui=(struct maybe_unpin_info){.is_pinned=1, .cf=t->cf, .ckey=0}; // remember to unpin it } } else { if ((r = toku_read_and_pin_brt_header(t->cf, &t->h))!=0) goto died; mui=(struct maybe_unpin_info){.is_pinned=1, .cf=t->cf, .ckey=0}; // remember to unpin it if (!dbname) { if (t->h->n_named_roots!=-1) { r = EINVAL; goto died; } // requires a subdb } else { int i; if (t->h->n_named_roots==-1) { r=EINVAL; goto died; } // no suddbs in the db // printf("%s:%d n_roots=%d\n", __FILE__, __LINE__, t->h->n_named_roots); for (i=0; ih->n_named_roots; i++) { if (strcmp(t->h->names[i], dbname)==0) { goto found_it; } } r=ENOENT; /* the database doesn't exist */ goto died; } found_it: t->nodesize = t->h->nodesize; /* inherit the pagesize from the file */ if (t->flags != t->h->flags) { /* flags must match */ if (load_flags) t->flags = t->h->flags; else { r = EINVAL; goto died; } } } assert(t->h); if ((r = toku_unpin_brt_header(t)) !=0) goto died; // it's unpinned mui.is_pinned=0; assert(t->h==0); return 0; } int toku_brt_remove_subdb(BRT brt, const char *dbname, u_int32_t flags) { int r; int i; int found = -1; assert(flags == 0); r = toku_read_and_pin_brt_header(brt->cf, &brt->h); if (r!=0) return r; assert(brt->h->unnamed_root==-1); assert(brt->h->n_named_roots>=0); for (i = 0; i < brt->h->n_named_roots; i++) { if (strcmp(brt->h->names[i], dbname) == 0) { found = i; break; } } if (found == -1) { //Should not be possible. r = ENOENT; goto error; } //Free old db name toku_free(brt->h->names[found]); //TODO: Free Diskblocks including root for (i = found + 1; i < brt->h->n_named_roots; i++) { brt->h->names[i - 1] = brt->h->names[i]; brt->h->roots[i - 1] = brt->h->roots[i]; } brt->h->n_named_roots--; brt->h->dirty = 1; //TODO: What if n_named_roots becomes 0? Should we handle it specially? Should we delete the file? if ((brt->h->names = toku_realloc(brt->h->names, (brt->h->n_named_roots)*sizeof(*brt->h->names))) == 0) { r=errno; goto error; } if ((brt->h->roots = toku_realloc(brt->h->roots, (brt->h->n_named_roots)*sizeof(*brt->h->roots))) == 0) { r=errno; goto error; } r = toku_unpin_brt_header(brt); return r; error: toku_unpin_brt_header(brt); return r; } // This one has no env int toku_open_brt (const char *fname, const char *dbname, int is_create, BRT *newbrt, int nodesize, CACHETABLE cachetable, TOKUTXN txn, int (*compare_fun)(DB*,const DBT*,const DBT*), DB *db) { BRT brt; int r; const int only_create = 0; const int load_flags = 0; r = toku_brt_create(&brt); if (r != 0) return r; toku_brt_set_nodesize(brt, nodesize); toku_brt_set_bt_compare(brt, compare_fun); r = toku_brt_open(brt, fname, fname, dbname, is_create, only_create, load_flags, cachetable, txn, db); if (r != 0) { toku_free(brt); return r; } *newbrt = brt; return 0; } int toku_close_brt (BRT brt) { int r; while (!list_empty(&brt->cursors)) { BRT_CURSOR c = list_struct(list_pop(&brt->cursors), struct brt_cursor, cursors_link); r=toku_brt_cursor_close(c); if (r!=0) return r; } if (brt->cf) { assert(0==toku_cachefile_count_pinned(brt->cf, 1)); // For the brt, the pinned count should be zero. //printf("%s:%d closing cachetable\n", __FILE__, __LINE__); if ((r = toku_cachefile_close(&brt->cf))!=0) return r; } if (brt->database_name) toku_free(brt->database_name); if (brt->skey) { toku_free(brt->skey); } if (brt->sval) { toku_free(brt->sval); } toku_free(brt); return 0; } CACHEKEY* toku_calculate_root_offset_pointer (BRT brt) { if (brt->database_name==0) { return &brt->h->unnamed_root; } else { int i; for (i=0; ih->n_named_roots; i++) { if (strcmp(brt->database_name, brt->h->names[i])==0) { return &brt->h->roots[i]; } } } abort(); } static int brt_init_new_root(BRT brt, int n_new_nodes, BRTNODE *new_nodes, DBT *splitks, CACHEKEY *rootp, TOKULOGGER logger, BRTNODE *newrootp) { assert(n_new_nodes>0); TAGMALLOC(BRTNODE, newroot); int r; int new_height = new_nodes[0]->height+1; int new_nodesize = brt->h->nodesize; DISKOFF newroot_diskoff; if ((r=malloc_diskblock(&newroot_diskoff, brt, new_nodesize, logger))) return r; assert(newroot); if (brt->database_name==0) { toku_log_changeunnamedroot(logger, toku_cachefile_filenum(brt->cf), *rootp, newroot_diskoff); } else { BYTESTRING bs; bs.len = 1+strlen(brt->database_name); bs.data = brt->database_name; toku_log_changenamedroot(logger, toku_cachefile_filenum(brt->cf), bs, *rootp, newroot_diskoff); } *rootp=newroot_diskoff; brt->h->dirty=1; initialize_brtnode (brt, newroot, newroot_diskoff, new_height); newroot->u.n.n_children=n_new_nodes; r=toku_log_newbrtnode(logger, toku_cachefile_filenum(brt->cf), newroot_diskoff, new_height, new_nodesize, (brt->flags&TOKU_DB_DUPSORT)!=0, newroot->rand4fingerprint); if (r!=0) return r; int i; for (i=0; ithisnodename; r=toku_fifo_create(&BNC_BUFFER(newroot,i)); if (r!=0) return r; r=toku_log_addchild(logger, toku_cachefile_filenum(brt->cf), newroot_diskoff, 0, new_nodes[i]->thisnodename, 0); if (r!=0) return r; fixup_child_fingerprint(newroot, i, new_nodes[i], brt, logger); } toku_verify_counts(newroot); int sum_splitk_sizes=0; for (i=0; i+1u.n.childkeys[i] = splitks[i].data; BYTESTRING bs = { .len = kv_pair_keylen(newroot->u.n.childkeys[0]), .data = kv_pair_key(newroot->u.n.childkeys[0]) }; r=toku_log_setpivot(logger, toku_cachefile_filenum(brt->cf), newroot_diskoff, 0, bs); if (r!=0) return r; toku_update_brtnode_loggerlsn(newroot, logger); } newroot->u.n.totalchildkeylens=sum_splitk_sizes; for (i=0; icf, newroot_diskoff, newroot, brtnode_size(newroot), toku_brtnode_flush_callback, toku_brtnode_fetch_callback, brt); *newrootp = newroot; return 0; } static int split_nonleaf_node(BRT, int *n_new_nodes, BRTNODE **new_nodes, DBT **splitks); static int leaf_node_is_too_full (BRT, BRTNODE); // push things down into node's children (and into their children and so forth) but don't make any descendant too big. static int push_down_without_overfilling (BRT brt, BRTNODE node, TOKULOGGER logger); // Push data toward a child. If the child gets too big then the child will push down or split. // If a split happens, then return immediately so that we can check to see if NODE needs to be split static int flush_toward_child (BRT brt, BRTNODE node, int childnum, TOKULOGGER logger); static int maybe_fixup_root (BRT brt, BRTNODE node, CACHEKEY *rootp, TOKULOGGER logger) { int r; if (node->height>0) { // internal nodes can be too wide, but if too full, they did a push down maybe_reshape_internal_node: while (nonleaf_node_is_too_wide(brt, node)) { int n_new_nodes; BRTNODE *new_nodes; DBT *splitks; if ((r=split_nonleaf_node(brt, node, &n_new_nodes, &new_nodes, &splitks))) return r; if ((r=brt_init_new_root(brt, n_new_nodes, new_nodes, splitks, rootp, logger, &node))) return r; // unpins all the new nodes, which are all small enough // now node is still possibly too wide, hence the loop } } else { // leaf nodes can be too full if (leaf_node_is_too_full(brt, node)) { int n_new_nodes; BRTNODE *new_nodes; DBT *splitks; if ((r==split_leaf_node(brt, logger, node, &n_new_nodes, &new_nodes, &splitks))) return r; if ((r==brt_init_new_root(brt, n_new_nodes, new_nodes, splitks, rootp, logger, &node))) return r; // unpins all the new nodes, which are all small enough assert(node->height>0); goto maybe_reshape_internal_node; } } return 0; } #endif static int brt_root_put_cmd(BRT brt, BRT_CMD cmd, TOKULOGGER logger) { void *node_v; BRTNODE node; CACHEKEY *rootp; int r; //assert(0==toku_cachetable_assert_all_unpinned(brt->cachetable)); if ((r = toku_read_and_pin_brt_header(brt->cf, &brt->h))) { if (0) { died0: toku_unpin_brt_header(brt); } return r; } rootp = toku_calculate_root_offset_pointer(brt); if ((r=toku_cachetable_get_and_pin(brt->cf, *rootp, &node_v, NULL, toku_brtnode_flush_callback, toku_brtnode_fetch_callback, brt))) { if (0) { died1: unpin_brtnode(brt, node); goto died0; } goto died0; } //printf("%s:%d pin %p\n", __FILE__, __LINE__, node_v); node=node_v; if ((r = brtnode_put(brt, node, cmd, logger, STRONG))) goto died1; // put stuff in, possibly causing the buffers to get too big if ((r = push_down_if_buffers_too_full(brt, node, logger))) goto died1; // if the buffers are too big, push stuff down if ((r = maybe_split_root(brt, node, rootp, logger))) goto died1; // now the node might have to split (leaf nodes can't push down, and internal nodes have too much fanout) This will change node. // Now the node is OK, brt->h->dirty=1; return toku_unpin_brt_header(brt); } int toku_brt_insert (BRT brt, DBT *key, DBT *val, TOKUTXN txn) { int r; BRT_CMD_S brtcmd = { BRT_INSERT, toku_txn_get_txnid(txn), .u.id={key,val}}; r = brt_root_put_cmd(brt, &brtcmd, toku_txn_logger(txn)); return r; } int toku_brt_lookup (BRT brt, DBT *k, DBT *v) { int r, rr; BRT_CURSOR cursor; rr = toku_brt_cursor(brt, &cursor); if (rr != 0) return rr; int op = brt->flags & TOKU_DB_DUPSORT ? DB_GET_BOTH : DB_SET; r = toku_brt_cursor_get(cursor, k, v, op, 0); rr = toku_brt_cursor_close(cursor); assert(rr == 0); return r; } int toku_brt_delete(BRT brt, DBT *key, TOKUTXN txn) { int r; DBT val; BRT_CMD_S brtcmd = { BRT_DELETE, toku_txn_get_txnid(txn), .u.id={key, toku_init_dbt(&val)}}; r = brt_root_put_cmd(brt, &brtcmd, toku_txn_logger(txn)); return r; } int toku_brt_delete_both(BRT brt, DBT *key, DBT *val, TOKUTXN txn) { int r; BRT_CMD_S brtcmd = { BRT_DELETE_BOTH, toku_txn_get_txnid(txn), .u.id={key,val}}; r = brt_root_put_cmd(brt, &brtcmd, toku_txn_logger(txn)); return r; } int toku_verify_brtnode (BRT brt, DISKOFF off, bytevec lorange, ITEMLEN lolen, bytevec hirange, ITEMLEN hilen, int recurse, BRTNODE parent_brtnode); int toku_dump_brtnode (BRT brt, DISKOFF off, int depth, bytevec lorange, ITEMLEN lolen, bytevec hirange, ITEMLEN hilen, BRTNODE parent_brtnode) { int result=0; BRTNODE node; void *node_v; int r = toku_cachetable_get_and_pin(brt->cf, off, &node_v, NULL, toku_brtnode_flush_callback, toku_brtnode_fetch_callback, brt); assert(r==0); printf("%s:%d pin %p\n", __FILE__, __LINE__, node_v); node=node_v; result=toku_verify_brtnode(brt, off, lorange, lolen, hirange, hilen, 0, parent_brtnode); printf("%*sNode=%p\n", depth, "", node); if (node->height>0) { printf("%*sNode %lld nodesize=%d height=%d n_children=%d n_bytes_in_buffers=%d keyrange=%s %s\n", depth, "", off, node->nodesize, node->height, node->u.n.n_children, node->u.n.n_bytes_in_buffers, (char*)lorange, (char*)hirange); //printf("%s %s\n", lorange ? lorange : "NULL", hirange ? hirange : "NULL"); { int i; for (i=0; i< node->u.n.n_children; i++) { printf("%*schild %d buffered (%d entries):\n", depth+1, "", i, toku_fifo_n_entries(BNC_BUFFER(node,i))); FIFO_ITERATE(BNC_BUFFER(node,i), key, keylen, data, datalen, type, xid, ({ data=data; datalen=datalen; keylen=keylen; printf("%*s xid=%"PRId64" %d (type=%d)\n", depth+2, "", xid, ntohl(*(int*)key), type); //assert(strlen((char*)key)+1==keylen); //assert(strlen((char*)data)+1==datalen); })); } for (i=0; iu.n.n_children; i++) { printf("%*schild %d\n", depth, "", i); if (i>0) { printf("%*spivot %d len=%d %d\n", depth+1, "", i-1, node->u.n.childkeys[i-1]->keylen, ntohl(*(int*)&node->u.n.childkeys[i-1]->key)); } toku_dump_brtnode(brt, BNC_DISKOFF(node, i), depth+4, (i==0) ? lorange : node->u.n.childkeys[i-1], (i==0) ? lolen : toku_brt_pivot_key_len(brt, node->u.n.childkeys[i-1]), (i==node->u.n.n_children-1) ? hirange : node->u.n.childkeys[i], (i==node->u.n.n_children-1) ? hilen : toku_brt_pivot_key_len(brt, node->u.n.childkeys[i]), node ); } } } else { printf("%*sNode %lld nodesize=%d height=%d n_bytes_in_buffer=%d keyrange=%d %d\n", depth, "", off, node->nodesize, node->height, node->u.l.n_bytes_in_buffer, lorange ? ntohl(*(int*)lorange) : 0, hirange ? ntohl(*(int*)hirange) : 0); PMA_ITERATE(node->u.l.buffer, key, keylen, val __attribute__((__unused__)), vallen, ( keylen=keylen, vallen=vallen, printf(" (%d)%d ", keylen, ntohl(*(int*)key)))); printf("\n"); } r = toku_cachetable_unpin(brt->cf, off, 0, 0); assert(r==0); return result; } int toku_dump_brt (BRT brt) { int r; CACHEKEY *rootp; struct brt_header *prev_header = brt->h; if ((r = toku_read_and_pin_brt_header(brt->cf, &brt->h))) { if (0) { died0: toku_unpin_brt_header(brt); } return r; } rootp = toku_calculate_root_offset_pointer(brt); if ((r = toku_dump_brtnode(brt, *rootp, 0, 0, 0, 0, 0, null_brtnode))) goto died0; if ((r = toku_unpin_brt_header(brt))!=0) return r; brt->h = prev_header; return 0; } static int show_brtnode_blocknumbers (BRT brt, DISKOFF off) { BRTNODE node; void *node_v; int i,r; assert(off%brt->h->nodesize==0); if ((r = toku_cachetable_get_and_pin(brt->cf, off, &node_v, NULL, toku_brtnode_flush_callback, toku_brtnode_fetch_callback, brt))) { if (0) { died0: toku_cachetable_unpin(brt->cf, off, 0, 0); } return r; } printf("%s:%d pin %p\n", __FILE__, __LINE__, node_v); node=node_v; printf(" %lld", off/brt->h->nodesize); if (node->height>0) { for (i=0; iu.n.n_children; i++) { if ((r=show_brtnode_blocknumbers(brt, BNC_DISKOFF(node, i)))) goto died0; } } r = toku_cachetable_unpin(brt->cf, off, 0, 0); return r; } #if 0 int show_brt_blocknumbers (BRT brt) { int r; CACHEKEY *rootp; if ((r = toku_read_and_pin_brt_header(brt->cf, &brt->h))) { if (0) { died0: toku_unpin_brt_header(brt); } return r; } rootp = toku_calculate_root_offset_pointer(brt); printf("BRT %p has blocks:", brt); if ((r=show_brtnode_blocknumbers (brt, *rootp, 0))) goto died0; printf("\n"); if ((r = toku_unpin_brt_header(brt))!=0) return r; return 0; } #endif int toku_brt_dbt_set_key(BRT brt, DBT *ybt, bytevec val, ITEMLEN vallen) { int r = toku_dbt_set_value(ybt, val, vallen, &brt->skey); return r; } int toku_brt_dbt_set_value(BRT brt, DBT *ybt, bytevec val, ITEMLEN vallen) { int r = toku_dbt_set_value(ybt, val, vallen, &brt->sval); return r; } #ifdef FOO /* search in a node's child */ static int brt_search_child(BRT brt, BRTNODE node, int childnum, brt_search_t *search, DBT *newkey, DBT *newval, TOKULOGGER logger) { int r, rr; /* if the child's buffer is not empty then try to empty it */ if (BNC_NBYTESINBUF(node, childnum) > 0) { rr = maybe_push_some_brt_cmds_down(brt, node, childnum, logger); if (rr!=0) return rr; /* push down may cause a child split, so childnum may not be appropriate, and the node itself may split, so retry */ return EAGAIN; } void *node_v; rr = toku_cachetable_get_and_pin(brt->cf, BNC_DISKOFF(node,childnum), &node_v, NULL, toku_brtnode_flush_callback, toku_brtnode_fetch_callback, brt); assert(rr == 0); for (;;) { BRTNODE childnode = node_v; BRT_SPLIT childsplit; brt_split_init(&childsplit); r = brt_search_node(brt, childnode, search, newkey, newval, &childsplit, logger); if (childsplit.did_split) { rr = handle_split_of_child(brt, node, childnum, childsplit.nodea, childsplit.nodeb, &childsplit.splitk, &split->did_split, &split->nodea, &split->nodeb, &split->splitk, logger); assert(rr == 0); break; } else { if (r == EAGAIN) continue; rr = toku_cachetable_unpin(brt->cf, childnode->thisnodename, childnode->dirty, brtnode_size(childnode)); assert(rr == 0); break; } } return r; } static int brt_search_nonleaf_node(BRT brt, BRTNODE node, brt_search_t *search, DBT *newkey, DBT *newval, TOKULOGGER logger) { int c; restart: { /* binary search is overkill for a small array */ int child[node->u.n.n_children]; /* scan left to right or right to left depending on the search direction */ for (c = 0; c < node->u.n.n_children; c++) child[c] = search->direction & BRT_SEARCH_LEFT ? c : node->u.n.n_children - 1 - c; for (c = 0; c < node->u.n.n_children-1; c++) { int p = search->direction & BRT_SEARCH_LEFT ? child[c] : child[c] - 1; struct kv_pair *pivot = node->u.n.childkeys[p]; DBT pivotkey, pivotval; if (search->compare(search, toku_fill_dbt(&pivotkey, kv_pair_key(pivot), kv_pair_keylen(pivot)), brt->flags & TOKU_DB_DUPSORT ? toku_fill_dbt(&pivotval, kv_pair_val(pivot), kv_pair_vallen(pivot)): 0)) { // We know which child we want to search. First make sure the buffer is empty. r = flush_toward_child(brt, node, child[c], logger, &did_split); if (did_split) goto restart; // If we didn't split, then the buffer is empty, so search that child r=search_that_child(); // Now that child may be bent out of shape ??? int r = brt_search_child(brt, node, child[c], search, newkey, newval, logger); // searching the child can cause it to get bent out of shape int rr = maybe_fixup_nonroot(brt, node, child[c], logger); if (rr!=0) return rr; if (r == 0) return r; } } /* check the first (left) or last (right) node if nothing has been found */ if (r == DB_NOTFOUND && c == node->u.n.n_children-1) r = brt_search_child(brt, node, child[c], search, newkey, newval, split, logger); return r; } static int brt_search_leaf_node(BRTNODE node, brt_search_t *search, DBT *newkey, DBT *newval) { PMA pma = node->u.l.buffer; int r = toku_pma_search(pma, search, newkey, newval); return r; } static int brt_search_node(BRT brt, BRTNODE node, brt_search_t *search, DBT *newkey, DBT *newval, TOKULOGGER logger) { if (node->height > 0) return brt_search_nonleaf_node(brt, node, search, newkey, newval, logger); else return brt_search_leaf_node(node, search, newkey, newval); } int toku_brt_search(BRT brt, brt_search_t *search, DBT *newkey, DBT *newval, TOKULOGGER logger) { int r, rr; rr = toku_read_and_pin_brt_header(brt->cf, &brt->h); if (rr!=0) { if (0) { died0: toku_unpin_brt_header(brt); } return rr; } CACHEKEY *rootp = toku_calculate_root_offset_pointer(brt); void *node_v; BRTNODE node; rr = toku_cachetable_get_and_pin(brt->cf, *rootp, &node_v, NULL, toku_brtnode_flush_callback, toku_brtnode_fetch_callback, brt); if (rr!=0) { if (0) { died1: unpin_brtnode(brt, node); } goto died0; } node = node_v; r = brt_search_node(brt, node, search, newkey, newval, logger); rr = maybe_fixup_root(brt, node, rootp, logger); if (rr!=0) { goto died1; } rr = unpin_brtnode(brt, node); if (rr!=0) { goto died0; } rr = toku_unpin_brt_header(brt); if (rr!=0) return rr; return r; } static inline void dbt_cleanup(DBT *dbt) { if (dbt->data && (dbt->flags & DB_DBT_MALLOC)) { toku_free_n(dbt->data, dbt->size); dbt->data = 0; } } static inline void brt_cursor_cleanup(BRT_CURSOR cursor) { dbt_cleanup(&cursor->key); dbt_cleanup(&cursor->val); } static inline int brt_cursor_not_set(BRT_CURSOR cursor) { return cursor->key.data == 0 || cursor->val.data == 0; } BOOL toku_brt_cursor_uninitialized(BRT_CURSOR c) { return brt_cursor_not_set(c); } static inline void brt_cursor_set_key_val(BRT_CURSOR cursor, DBT *newkey, DBT *newval) { brt_cursor_cleanup(cursor); cursor->key = *newkey; memset(newkey, 0, sizeof *newkey); cursor->val = *newval; memset(newval, 0, sizeof *newval); } int toku_brt_cursor(BRT brt, BRT_CURSOR *cursorptr) { BRT_CURSOR cursor = toku_malloc(sizeof *cursor); if (cursor == 0) return ENOMEM; cursor->brt = brt; toku_init_dbt(&cursor->key); toku_init_dbt(&cursor->val); list_push(&brt->cursors, &cursor->cursors_link); *cursorptr = cursor; return 0; } int toku_brt_cursor_close(BRT_CURSOR cursor) { brt_cursor_cleanup(cursor); list_remove(&cursor->cursors_link); toku_free_n(cursor, sizeof *cursor); return 0; } static inline int compare_k_x(BRT brt, DBT *k, DBT *x) { return brt->compare_fun(brt->db, k, x); } static inline int compare_v_y(BRT brt, DBT *v, DBT *y) { return brt->dup_compare(brt->db, v, y); } static inline int compare_kv_xy(BRT brt, DBT *k, DBT *v, DBT *x, DBT *y) { int cmp = brt->compare_fun(brt->db, k, x); if (cmp == 0 && v && y) cmp = brt->dup_compare(brt->db, v, y); return cmp; } static inline int brt_cursor_copyout(BRT_CURSOR cursor, DBT *key, DBT *val) { int r = 0; if (key) r = toku_dbt_set_value(key, cursor->key.data, cursor->key.size, &cursor->brt->skey); if (r == 0 && val) r = toku_dbt_set_value(val, cursor->val.data, cursor->val.size, &cursor->brt->sval); return r; } static int brt_cursor_compare_set(brt_search_t *search, DBT *x, DBT *y) { BRT brt = search->context; return compare_kv_xy(brt, search->k, search->v, x, y) <= 0; /* return min xy: kv <= xy */ } static int brt_cursor_current(BRT_CURSOR cursor, int op, DBT *outkey, DBT *outval, TOKULOGGER logger) { if (brt_cursor_not_set(cursor)) return EINVAL; if (op == DB_CURRENT) { DBT newkey; toku_init_dbt(&newkey); DBT newval; toku_init_dbt(&newval); brt_search_t search; brt_search_init(&search, brt_cursor_compare_set, BRT_SEARCH_LEFT, &cursor->key, &cursor->val, cursor->brt); int r = toku_brt_search(cursor->brt, &search, &newkey, &newval, logger); if (r != 0 || compare_kv_xy(cursor->brt, &cursor->key, &cursor->val, &newkey, &newval) != 0) return DB_KEYEMPTY; } return brt_cursor_copyout(cursor, outkey, outval); } /* search for the first kv pair that matches the search object */ static int brt_cursor_search(BRT_CURSOR cursor, brt_search_t *search, DBT *outkey, DBT *outval, TOKULOGGER logger) { DBT newkey; toku_init_dbt(&newkey); newkey.flags = DB_DBT_MALLOC; DBT newval; toku_init_dbt(&newval); newval.flags = DB_DBT_MALLOC; int r = toku_brt_search(cursor->brt, search, &newkey, &newval, logger); if (r == 0) { brt_cursor_set_key_val(cursor, &newkey, &newval); r = brt_cursor_copyout(cursor, outkey, outval); } dbt_cleanup(&newkey); dbt_cleanup(&newval); return r; } /* search for the kv pair that matches the search object and is equal to kv */ static int brt_cursor_search_eq_kv_xy(BRT_CURSOR cursor, brt_search_t *search, DBT *outkey, DBT *outval, TOKULOGGER logger) { DBT newkey; toku_init_dbt(&newkey); newkey.flags = DB_DBT_MALLOC; DBT newval; toku_init_dbt(&newval); newval.flags = DB_DBT_MALLOC; int r = toku_brt_search(cursor->brt, search, &newkey, &newval, logger); if (r == 0) { if (compare_kv_xy(cursor->brt, search->k, search->v, &newkey, &newval) == 0) { brt_cursor_set_key_val(cursor, &newkey, &newval); r = brt_cursor_copyout(cursor, outkey, outval); } else r = DB_NOTFOUND; } dbt_cleanup(&newkey); dbt_cleanup(&newval); return r; } /* search for the kv pair that matches the search object and is equal to k */ static int brt_cursor_search_eq_k_x(BRT_CURSOR cursor, brt_search_t *search, DBT *outkey, DBT *outval, TOKULOGGER logger) { DBT newkey; toku_init_dbt(&newkey); newkey.flags = DB_DBT_MALLOC; DBT newval; toku_init_dbt(&newval); newval.flags = DB_DBT_MALLOC; int r = toku_brt_search(cursor->brt, search, &newkey, &newval, logger); if (r == 0) { if (compare_k_x(cursor->brt, search->k, &newkey) == 0) { brt_cursor_set_key_val(cursor, &newkey, &newval); r = brt_cursor_copyout(cursor, outkey, outval); } else r = DB_NOTFOUND; } dbt_cleanup(&newkey); dbt_cleanup(&newval); return r; } static int brt_cursor_compare_one(brt_search_t *search, DBT *x, DBT *y) { search = search; x = x; y = y; return 1; } static int brt_cursor_first(BRT_CURSOR cursor, DBT *outkey, DBT *outval, TOKULOGGER logger) { brt_search_t search; brt_search_init(&search, brt_cursor_compare_one, BRT_SEARCH_LEFT, 0, 0, cursor->brt); return brt_cursor_search(cursor, &search, outkey, outval, logger); } static int brt_cursor_last(BRT_CURSOR cursor, DBT *outkey, DBT *outval, TOKULOGGER logger) { brt_search_t search; brt_search_init(&search, brt_cursor_compare_one, BRT_SEARCH_RIGHT, 0, 0, cursor->brt); return brt_cursor_search(cursor, &search, outkey, outval, logger); } static int brt_cursor_compare_next(brt_search_t *search, DBT *x, DBT *y) { BRT brt = search->context; return compare_kv_xy(brt, search->k, search->v, x, y) < 0; /* return min xy: kv < xy */ } static int brt_cursor_next(BRT_CURSOR cursor, DBT *outkey, DBT *outval, TOKULOGGER logger) { brt_search_t search; brt_search_init(&search, brt_cursor_compare_next, BRT_SEARCH_LEFT, &cursor->key, &cursor->val, cursor->brt); return brt_cursor_search(cursor, &search, outkey, outval, logger); } static int brt_cursor_compare_next_nodup(brt_search_t *search, DBT *x, DBT *y) { BRT brt = search->context; y = y; return compare_k_x(brt, search->k, x) < 0; /* return min x: k < x */ } static int brt_cursor_next_nodup(BRT_CURSOR cursor, DBT *outkey, DBT *outval, TOKULOGGER logger) { brt_search_t search; brt_search_init(&search, brt_cursor_compare_next_nodup, BRT_SEARCH_LEFT, &cursor->key, &cursor->val, cursor->brt); return brt_cursor_search(cursor, &search, outkey, outval, logger); } static int brt_cursor_compare_next_dup(brt_search_t *search, DBT *x, DBT *y) { BRT brt = search->context; int keycmp = compare_k_x(brt, search->k, x); if (keycmp < 0) return 1; else return keycmp == 0 && y && compare_v_y(brt, search->v, y) < 0; /* return min xy: k <= x && v < y */ } static int brt_cursor_next_dup(BRT_CURSOR cursor, DBT *outkey, DBT *outval, TOKULOGGER logger) { brt_search_t search; brt_search_init(&search, brt_cursor_compare_next_dup, BRT_SEARCH_LEFT, &cursor->key, &cursor->val, cursor->brt); return brt_cursor_search_eq_k_x(cursor, &search, outkey, outval, logger); } static int brt_cursor_compare_get_both_range(brt_search_t *search, DBT *x, DBT *y) { BRT brt = search->context; int keycmp = compare_k_x(brt, search->k, x); if (keycmp < 0) return 1; else return keycmp == 0 && (y == 0 || compare_v_y(brt, search->v, y) <= 0); /* return min xy: k <= x && v <= y */ } static int brt_cursor_get_both_range(BRT_CURSOR cursor, DBT *key, DBT *val, DBT *outkey, DBT *outval, TOKULOGGER logger) { brt_search_t search; brt_search_init(&search, brt_cursor_compare_get_both_range, BRT_SEARCH_LEFT, key, val, cursor->brt); return brt_cursor_search_eq_k_x(cursor, &search, outkey, outval, logger); } static int brt_cursor_compare_prev(brt_search_t *search, DBT *x, DBT *y) { BRT brt = search->context; return compare_kv_xy(brt, search->k, search->v, x, y) > 0; /* return max xy: kv > xy */ } static int brt_cursor_prev(BRT_CURSOR cursor, DBT *outkey, DBT *outval, TOKULOGGER logger) { brt_search_t search; brt_search_init(&search, brt_cursor_compare_prev, BRT_SEARCH_RIGHT, &cursor->key, &cursor->val, cursor->brt); return brt_cursor_search(cursor, &search, outkey, outval, logger); } static int brt_cursor_compare_prev_nodup(brt_search_t *search, DBT *x, DBT *y) { BRT brt = search->context; y = y; return compare_k_x(brt, search->k, x) > 0; /* return max x: k > x */ } static int brt_cursor_prev_nodup(BRT_CURSOR cursor, DBT *outkey, DBT *outval, TOKULOGGER logger) { brt_search_t search; brt_search_init(&search, brt_cursor_compare_prev_nodup, BRT_SEARCH_RIGHT, &cursor->key, &cursor->val, cursor->brt); return brt_cursor_search(cursor, &search, outkey, outval, logger); } #ifdef DB_PREV_DUP static int brt_cursor_compare_prev_dup(brt_search_t *search, DBT *x, DBT *y) { BRT brt = search->context; int keycmp = compare_k_x(brt, search->k, x); if (keycmp > 0) return 1; else return keycmp == 0 && y && compare_v_y(brt, search->v, y) > 0; /* return max xy: k >= x && v > y */ } static int brt_cursor_prev_dup(BRT_CURSOR cursor, DBT *outkey, DBT *outval, TOKULOGGER logger) { brt_search_t search; brt_search_init(&search, brt_cursor_compare_prev_dup, BRT_SEARCH_RIGHT, &cursor->key, &cursor->val, cursor->brt); return brt_cursor_search_eq_k_x(cursor, &search, outkey, outval, logger); } #endif static int brt_cursor_compare_set_range(brt_search_t *search, DBT *x, DBT *y) { BRT brt = search->context; return compare_kv_xy(brt, search->k, search->v, x, y) <= 0; /* return kv <= xy */ } static int brt_cursor_set(BRT_CURSOR cursor, DBT *key, DBT *val, DBT *outkey, DBT *outval, TOKULOGGER logger) { brt_search_t search; brt_search_init(&search, brt_cursor_compare_set_range, BRT_SEARCH_LEFT, key, val, cursor->brt); return brt_cursor_search_eq_kv_xy(cursor, &search, outkey, outval, logger); } static int brt_cursor_set_range(BRT_CURSOR cursor, DBT *key, DBT *outkey, DBT *outval, TOKULOGGER logger) { brt_search_t search; brt_search_init(&search, brt_cursor_compare_set_range, BRT_SEARCH_LEFT, key, 0, cursor->brt); return brt_cursor_search(cursor, &search, outkey, outval, logger); } int toku_brt_cursor_get (BRT_CURSOR cursor, DBT *key, DBT *val, int get_flags, TOKUTXN txn) { int r; int op = get_flags & DB_OPFLAGS_MASK; TOKULOGGER logger = toku_txn_logger(txn); if (get_flags & ~DB_OPFLAGS_MASK) return EINVAL; switch (op) { case DB_CURRENT: case DB_CURRENT_BINDING: r = brt_cursor_current(cursor, op, key, val, logger); break; case DB_FIRST: r = brt_cursor_first(cursor, key, val, logger); break; case DB_LAST: r = brt_cursor_last(cursor, key, val, logger); break; case DB_NEXT: if (brt_cursor_not_set(cursor)) r = brt_cursor_first(cursor, key, val, logger); else r = brt_cursor_next(cursor, key, val, logger); break; case DB_NEXT_DUP: if (brt_cursor_not_set(cursor)) r = EINVAL; else r = brt_cursor_next_dup(cursor, key, val, logger); break; case DB_NEXT_NODUP: if (brt_cursor_not_set(cursor)) r = brt_cursor_first(cursor, key, val, logger); else r = brt_cursor_next_nodup(cursor, key, val, logger); break; case DB_PREV: if (brt_cursor_not_set(cursor)) r = brt_cursor_last(cursor, key, val, logger); else r = brt_cursor_prev(cursor, key, val, logger); break; #ifdef DB_PREV_DUP case DB_PREV_DUP: if (brt_cursor_not_set(cursor)) r = EINVAL; else r = brt_cursor_prev_dup(cursor, key, val, logger); break; #endif case DB_PREV_NODUP: if (brt_cursor_not_set(cursor)) r = brt_cursor_last(cursor, key, val, logger); else r = brt_cursor_prev_nodup(cursor, key, val, logger); break; case DB_SET: r = brt_cursor_set(cursor, key, 0, 0, val, logger); break; case DB_SET_RANGE: r = brt_cursor_set_range(cursor, key, key, val, logger); break; case DB_GET_BOTH: r = brt_cursor_set(cursor, key, val, 0, 0, logger); break; case DB_GET_BOTH_RANGE: r = brt_cursor_get_both_range(cursor, key, val, 0, val, logger); break; default: r = EINVAL; break; } return r; } int toku_brt_cursor_delete(BRT_CURSOR cursor, int flags, TOKUTXN txn) { if ((flags & ~DB_DELETE_ANY) != 0) return EINVAL; if (brt_cursor_not_set(cursor)) return EINVAL; int r = 0; if (!(flags & DB_DELETE_ANY)) r = brt_cursor_current(cursor, DB_CURRENT, 0, 0, toku_txn_logger(txn)); if (r == 0) r = toku_brt_delete_both(cursor->brt, &cursor->key, &cursor->val, txn); return r; } int toku_brt_height_of_root(BRT brt, int *height) { // for an open brt, return the current height. int r; if ((r = toku_read_and_pin_brt_header(brt->cf, &brt->h))) { if (0) { died0: toku_unpin_brt_header(brt); } return r; } CACHEKEY *rootp = toku_calculate_root_offset_pointer(brt); void *node_v; if ((r=toku_cachetable_get_and_pin(brt->cf, *rootp, &node_v, NULL, toku_brtnode_flush_callback, toku_brtnode_fetch_callback, brt))) { goto died0; } BRTNODE node = node_v; *height = node->height; r = unpin_brtnode(brt, node); assert(r==0); r = toku_unpin_brt_header(brt); assert(r==0); return 0; } #endif