mariadb/newbrt/brt2.c
Bradley C. Kuszmaul 8511ea7372 Log db close so that recovery will work right if the same db is opened and closed repeatedly.
Also the file numbers can thus be reused.
Don't pass the BRT into the flush commands, since the BRT may no longer be present.
Put a counter in to see how many rollback records are present.  (Addresses #698.)
Increment the file version to 4.
Fixes #545, #703.

Note: All the tests pass except
 * Many cxx tests are getting valgrind errors.  (Addresses #716.  Possibly causes #716.)
 * {{{test_log9.recover}}} fails with "Binary files ... differ".  These will presumably be fixed by #711 or #714.  (Addresses #711, #714.)
 * {{{test_log10.recover}}} fails.   There are two failures:
  1. A valgrind problem (see #718.)  (Addresses #718.  Possibly causes #718.)
  1. The "Binary files ... differ" issue.


git-svn-id: file:///svn/tokudb@3486 c7de825b-a66e-492c-adef-691d508d4ae1
2008-04-17 03:11:55 +00:00

1911 lines
72 KiB
C

/* -*- mode: C; c-basic-offset: 4 -*- */
#ident "Copyright (c) 2007, 2008 Tokutek Inc. All rights reserved."
/*
* We always write nodes to a new location on disk.
* The nodes themselves contain the information about the tree structure.
* Q: During recovery, how do we find the root node without looking at every block on disk?
* A: The root node is either the designated root near the front of the freelist.
* The freelist is updated infrequently. Before updating the stable copy of the freelist, we make sure that
* the root is up-to-date. We can make the freelist-and-root update be an arbitrarily small fraction of disk bandwidth.
*
*/
#include <arpa/inet.h>
#include <errno.h>
#include <inttypes.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include "toku_assert.h"
#include "brt-internal.h"
#include "key.h"
#include "log_header.h"
typedef struct weakstrong { char ignore; } *WS;
#define WEAK ((WS)1)
#define STRONG ((WS)0)
extern long long n_items_malloced;
static void verify_local_fingerprint_nonleaf (BRTNODE node);
#ifdef FOO
static int malloc_diskblock (DISKOFF *res, BRT brt, int size, TOKULOGGER);
/* Frees a node, including all the stuff in the hash table. */
void toku_brtnode_free (BRTNODE *nodep) {
BRTNODE node=*nodep;
int i;
//printf("%s:%d %p->mdict[0]=%p\n", __FILE__, __LINE__, node, node->mdicts[0]);
if (node->height>0) {
for (i=0; i<node->u.n.n_children-1; i++) {
toku_free((void*)node->u.n.childkeys[i]);
}
for (i=0; i<node->u.n.n_children; i++) {
if (BNC_BUFFER(node,i)) {
toku_fifo_free(&BNC_BUFFER(node,i));
}
}
} else {
if (node->u.l.buffer) // The buffer may have been freed already, in some cases.
toku_pma_free(&node->u.l.buffer);
}
toku_free(node);
*nodep=0;
}
#endif
static long brtnode_size(BRTNODE node) {
return toku_serialize_brtnode_size(node);
}
static void toku_update_brtnode_loggerlsn(BRTNODE node, TOKULOGGER logger) {
if (logger) {
node->log_lsn = toku_logger_last_lsn(logger);
}
}
static void fixup_child_fingerprint(BRTNODE node, int childnum_of_node, BRTNODE child, BRT brt, TOKULOGGER logger) {
u_int32_t old_fingerprint = BNC_SUBTREE_FINGERPRINT(node,childnum_of_node);
u_int32_t sum = child->local_fingerprint;
if (child->height>0) {
int i;
for (i=0; i<child->u.n.n_children; i++) {
sum += BNC_SUBTREE_FINGERPRINT(child,i);
}
}
// Don't try to get fancy about not modifying the fingerprint if it didn't change.
// We only call this function if we have reason to believe that the child's fingerprint did change.
BNC_SUBTREE_FINGERPRINT(node,childnum_of_node)=sum;
node->dirty=1;
toku_log_changechildfingerprint(logger, toku_cachefile_filenum(brt->cf), node->thisnodename, childnum_of_node, old_fingerprint, sum);
toku_update_brtnode_loggerlsn(node, logger);
}
// If you pass in data==0 then it only compares the key, not the data (even if is a DUPSORT database)
static int brt_compare_pivot(BRT brt, DBT *key, DBT *data, bytevec ck) {
int cmp;
DBT mydbt;
struct kv_pair *kv = (struct kv_pair *) ck;
if (brt->flags & TOKU_DB_DUPSORT) {
cmp = brt->compare_fun(brt->db, key, toku_fill_dbt(&mydbt, kv_pair_key(kv), kv_pair_keylen(kv)));
if (cmp == 0 && data != 0)
cmp = brt->dup_compare(brt->db, data, toku_fill_dbt(&mydbt, kv_pair_val(kv), kv_pair_vallen(kv)));
} else {
cmp = brt->compare_fun(brt->db, key, toku_fill_dbt(&mydbt, kv_pair_key(kv), kv_pair_keylen(kv)));
}
return cmp;
}
#ifdef FOO
void toku_brtnode_flush_callback (CACHEFILE cachefile, DISKOFF nodename, void *brtnode_v, long size __attribute((unused)), BOOL write_me, BOOL keep_me, LSN modified_lsn __attribute__((__unused__)) , BOOL rename_p __attribute__((__unused__))) {
BRTNODE brtnode = brtnode_v;
if (0) {
printf("%s:%d toku_brtnode_flush_callback %p keep_me=%d height=%d", __FILE__, __LINE__, brtnode, keep_me, brtnode->height);
if (brtnode->height==0) printf(" pma=%p", brtnode->u.l.buffer);
printf("\n");
}
assert(brtnode->thisnodename==nodename);
if (write_me) {
toku_serialize_brtnode_to(toku_cachefile_fd(cachefile), brtnode->thisnodename, brtnode->nodesize, brtnode);
}
if (!keep_me) {
toku_brtnode_free(&brtnode);
}
}
int toku_brtnode_fetch_callback (CACHEFILE cachefile, DISKOFF nodename, void **brtnode_pv, long *sizep, void*extraargs, LSN *written_lsn) {
BRT t =(BRT)extraargs;
BRTNODE *result=(BRTNODE*)brtnode_pv;
int r = toku_deserialize_brtnode_from(toku_cachefile_fd(cachefile), nodename, result, t->flags, t->nodesize,
t->compare_fun, t->dup_compare, t->db, toku_cachefile_filenum(t->cf));
if (r == 0) {
*sizep = brtnode_size(*result);
*written_lsn = (*result)->disk_lsn;
}
return r;
}
void toku_brtheader_flush_callback (CACHEFILE cachefile, DISKOFF nodename, void *header_v, long size __attribute((unused)), BOOL write_me, BOOL keep_me, LSN lsn __attribute__((__unused__)), BOOL rename_p __attribute__((__unused__))) {
struct brt_header *h = header_v;
assert(nodename==0);
assert(!h->dirty); // shouldn't be dirty once it is unpinned.
if (write_me) {
toku_serialize_brt_header_to(toku_cachefile_fd(cachefile), h);
}
if (!keep_me) {
if (h->n_named_roots>0) {
int i;
for (i=0; i<h->n_named_roots; i++) {
toku_free(h->names[i]);
}
toku_free(h->names);
toku_free(h->roots);
}
toku_free(h);
}
}
int toku_brtheader_fetch_callback (CACHEFILE cachefile, DISKOFF nodename, void **headerp_v, long *sizep __attribute__((unused)), void*extraargs __attribute__((__unused__)), LSN *written_lsn) {
struct brt_header **h = (struct brt_header **)headerp_v;
assert(nodename==0);
int r = toku_deserialize_brtheader_from(toku_cachefile_fd(cachefile), nodename, h);
written_lsn->lsn = 0; // !!! WRONG. This should be stored or kept redundantly or something.
return r;
}
int toku_read_and_pin_brt_header (CACHEFILE cf, struct brt_header **header) {
void *header_p;
//fprintf(stderr, "%s:%d read_and_pin_brt_header(...)\n", __FILE__, __LINE__);
int r = toku_cachetable_get_and_pin(cf, 0, &header_p, NULL,
toku_brtheader_flush_callback, toku_brtheader_fetch_callback, 0);
if (r!=0) return r;
*header = header_p;
return 0;
}
int toku_unpin_brt_header (BRT brt) {
int r = toku_cachetable_unpin(brt->cf, 0, brt->h->dirty, 0);
brt->h->dirty=0;
brt->h=0;
return r;
}
#endif
static int unpin_brtnode (BRT brt, BRTNODE node) {
return toku_cachetable_unpin(brt->cf, node->thisnodename, node->dirty, brtnode_size(node));
}
#ifdef FOO
typedef struct kvpair {
bytevec key;
unsigned int keylen;
bytevec val;
unsigned int vallen;
} *KVPAIR;
/* Forgot to handle the case where there is something in the freelist. */
static int malloc_diskblock_header_is_in_memory (DISKOFF *res, BRT brt, int size, TOKULOGGER logger) {
DISKOFF result = brt->h->unused_memory;
brt->h->unused_memory+=size;
brt->h->dirty = 1;
int r = toku_log_changeunusedmemory(logger, toku_cachefile_filenum(brt->cf), result, brt->h->unused_memory);
*res = result;
return r;
}
int malloc_diskblock (DISKOFF *res, BRT brt, int size, TOKULOGGER logger) {
#if 0
int r = read_and_pin_brt_header(brt->fd, &brt->h);
assert(r==0);
{
DISKOFF result = malloc_diskblock_header_is_in_memory(brt, size);
r = write_brt_header(brt->fd, &brt->h);
assert(r==0);
return result;
}
#else
return malloc_diskblock_header_is_in_memory(res, brt,size, logger);
#endif
}
static void initialize_brtnode (BRT t, BRTNODE n, DISKOFF nodename, int height) {
int i;
n->tag = TYP_BRTNODE;
n->nodesize = t->h->nodesize;
n->flags = t->h->flags;
n->thisnodename = nodename;
n->disk_lsn.lsn = 0; // a new one can always be 0.
n->log_lsn = n->disk_lsn;
n->layout_version = 4;
n->height = height;
n->rand4fingerprint = random();
n->local_fingerprint = 0;
n->dirty = 1;
assert(height>=0);
if (height>0) {
n->u.n.n_children = 0;
for (i=0; i<TREE_FANOUT; i++) {
// n->u.n.childkeys[i] = 0;
// n->u.n.childkeylens[i] = 0;
}
n->u.n.totalchildkeylens = 0;
for (i=0; i<TREE_FANOUT+1; i++) {
BNC_SUBTREE_FINGERPRINT(n, i) = 0;
// n->u.n.children[i] = 0;
// n->u.n.buffers[i] = 0;
BNC_NBYTESINBUF(n,i) = 0;
}
n->u.n.n_bytes_in_buffers = 0;
} else {
int r = toku_pma_create(&n->u.l.buffer, t->compare_fun, t->db, toku_cachefile_filenum(t->cf), n->nodesize);
assert(r==0);
toku_pma_set_dup_mode(n->u.l.buffer, t->flags & (TOKU_DB_DUP+TOKU_DB_DUPSORT));
toku_pma_set_dup_compare(n->u.l.buffer, t->dup_compare);
static int rcount=0;
//printf("%s:%d n PMA= %p (rcount=%d)\n", __FILE__, __LINE__, n->u.l.buffer, rcount);
rcount++;
n->u.l.n_bytes_in_buffer = 0;
}
}
int toku_create_new_brtnode (BRT t, BRTNODE *result, int height, TOKULOGGER logger) {
TAGMALLOC(BRTNODE, n);
int r;
DISKOFF name;
if ((r = malloc_diskblock(&name, t, t->h->nodesize, logger))) return r;
assert(n);
assert(t->h->nodesize>0);
//printf("%s:%d malloced %lld (and malloc again=%lld)\n", __FILE__, __LINE__, name, malloc_diskblock(t, t->nodesize));
initialize_brtnode(t, n, name, height);
*result = n;
assert(n->nodesize>0);
// n->brt = t;
//printf("%s:%d putting %p (%lld) parent=%p\n", __FILE__, __LINE__, n, n->thisnodename, parent_brtnode);
if ((r = toku_cachetable_put(t->cf, n->thisnodename, n, brtnode_size(n), toku_brtnode_flush_callback, toku_brtnode_fetch_callback, t)))
return r;
if ((r = toku_log_newbrtnode(logger, toku_cachefile_filenum(t->cf), n->thisnodename, height, n->nodesize, (t->flags&TOKU_DB_DUPSORT)!=0, n->rand4fingerprint)))
return r;
toku_update_brtnode_loggerlsn(n, logger);
return 0;
}
#ifdef FOO
static int insert_to_buffer_in_nonleaf (BRTNODE node, int childnum, BRT_CMD cmd) {
unsigned int n_bytes_added = BRT_CMD_OVERHEAD + KEY_VALUE_OVERHEAD + cmd->u.id.key->size + cmd->u.id.val->size;
int r = toku_fifo_enq_cmdstruct(BNC_BUFFER(node,childnum), cmd);
if (r!=0) return r;
node->local_fingerprint += node->rand4fingerprint*toku_calccrc32_cmdstruct(cmd);
BNC_NBYTESINBUF(node,childnum) += n_bytes_added;
node->u.n.n_bytes_in_buffers += n_bytes_added;
node->dirty = 1;
return 0;
}
#endif
// Split a leaf node, reusing it in new_nodes (as the last element)
static int split_leaf_node (BRT t, TOKULOGGER logger, BRTNODE node, int *n_new_nodes, BRTNODE **new_nodes, DBT **splitks) {
assert(node->height==0);
int r;
int n_children=1; // Initially we have the node itself.
BRTNODE *result_nodes=toku_malloc(sizeof(*result_nodes));
if (errno!=0) { r=errno; if (0) { died0: toku_free(result_nodes); } return r; }
DBT *result_splitks=toku_malloc(sizeof(*result_splitks));
if (errno!=0) { r=errno; if (0) { died1: toku_free(result_splitks); } goto died0; }
while (toku_serialize_brtnode_size(node)>node->nodesize) {
BRTNODE B;
DBT splitk;
if ((r = toku_create_new_brtnode(t, &B, 0, logger))) return r;
// Split so that B is at least 1/2 full
// The stuff in B goes *before* node
if ((r = toku_pma_split(logger, toku_cachefile_filenum(t->cf),
node->thisnodename, node->u.l.buffer, &node->u.l.n_bytes_in_buffer, node->rand4fingerprint, &node->local_fingerprint, &node->log_lsn,
&splitk,
B->thisnodename, B->u.l.buffer, &B->u.l.n_bytes_in_buffer, B->rand4fingerprint, &B->local_fingerprint, &B->log_lsn)))
goto died1;
n_children++;
result_nodes = toku_realloc(result_nodes, n_children*sizeof(*result_nodes));
result_nodes[n_children-2] = B;
result_splitks = toku_realloc(result_nodes, (n_children-1)*sizeof(*result_splitks));
result_splitks[n_children-2] = splitk;
}
result_nodes[n_children-1]=node;
*n_new_nodes = n_children;
*new_nodes = result_nodes;
*splitks = result_splitks;
return 0;
}
#endif
static void find_heaviest_child (BRTNODE node, int *childnum) {
int max_child = 0;
int max_weight = BNC_NBYTESINBUF(node, 0);
int i;
if (0) printf("%s:%d weights: %d", __FILE__, __LINE__, max_weight);
assert(node->u.n.n_children>0);
for (i=1; i<node->u.n.n_children; i++) {
int this_weight = BNC_NBYTESINBUF(node,i);
if (0) printf(" %d", this_weight);
if (max_weight < this_weight) {
max_child = i;
max_weight = this_weight;
}
}
*childnum = max_child;
if (0) printf("\n");
}
/* find the leftmost child that may contain the key */
static unsigned int brtnode_which_child (BRTNODE node , DBT *k, DBT *d, BRT t) {
int i;
assert(node->height>0);
for (i=0; i<node->u.n.n_children-1; i++) {
int cmp = brt_compare_pivot(t, k, d, node->u.n.childkeys[i]);
if (cmp > 0) continue;
if (cmp < 0) return i;
return i;
}
return node->u.n.n_children-1;
}
static int brtnode_put (BRT t, BRTNODE node, BRT_CMD cmd, TOKULOGGER logger, WS weak_p);
// If CHILD is too wide, split it, and create a new node with the new children. Unpin CHILD or the new children (even if something goes wrong).
// If it does split, unpin the new root node also.
static int maybe_split_root(BRT brt, BRTNODE child, CACHEKEY *rootp, TOKULOGGER logger);
// if CHILD is too wide, split it, and fix up NODE. Either way, unpin the child or resulting children (even if it fails do the unpin)
static int maybe_split_nonroot (BRT brt, BRTNODE node, int childnum, BRTNODE child, int *n_children_replacing_child, TOKULOGGER logger);
// Push stuff into a child weakly. (That is don't cause any I/O or cause the child to get too big.)
static int weak_push_to_child (BRT brt, BRTNODE node, int childnum, TOKULOGGER logger) {
void *child_v;
int r = toku_cachetable_maybe_get_and_pin(brt->cf, BNC_DISKOFF(node, childnum), &child_v);
if (r!=0) return 0;
BRTNODE child = child_v;
DBT key,val;
BRT_CMD_S cmd;
while (0 == toku_fifo_peek_cmdstruct(BNC_BUFFER(node, childnum), &cmd, &key, &val)) {
r = brtnode_put(brt, child, &cmd, logger, WEAK);
if (r==EAGAIN) break;
if (r!=0) goto died;
r=toku_fifo_deq(BNC_BUFFER(node, childnum));
if (r!=0) goto died;
}
return unpin_brtnode(brt, child);
died:
unpin_brtnode(brt, child);
return r;
}
// If the buffers are too big, push stuff down. The subchild may need to be split, in which case our fanout may get too large.
// When are done, this node is has little enough stuff in its buffers (but the fanout may be too large), and all the descendant
// nodes are properly sized (the buffer sizes and fanouts are all small enough).
static int push_down_if_buffers_too_full(BRT brt, BRTNODE node, TOKULOGGER logger) {
if (node->height==0) return 0; // can't push down for leaf nodes
while (node->u.n.n_bytes_in_buffers > 0 && toku_serialize_brtnode_size(node)>node->nodesize) {
int childnum;
find_heaviest_child(node, &childnum);
void *child_v;
int r = toku_cachetable_get_and_pin(brt->cf, BNC_DISKOFF(node, childnum), &child_v, NULL,
toku_brtnode_flush_callback, toku_brtnode_fetch_callback, brt);
if (r!=0) return r;
BRTNODE child=child_v;
if (0) { died: unpin_brtnode(brt, child); return r; }
BRT_CMD_S cmd;
DBT key,val;
while (0==toku_fifo_peek_cmdstruct(BNC_BUFFER(node, childnum), &cmd, &key, &val)) {
r=toku_fifo_deq(BNC_BUFFER(node, childnum));
assert(r==0); // we just did a peek, so the buffer must be nonempty
r=brtnode_put(brt, child, &cmd, logger, WEAK);
if (r!=EAGAIN && r!=0) goto died;
if (r==EAGAIN) {
// Weak pushes ran out of steam. Now do a strong push if there is still something in the buffer.
if (0==toku_fifo_peek_cmdstruct(BNC_BUFFER(node, childnum), &cmd, &key, &val)) {
r=brtnode_put(brt, child, &cmd, logger, STRONG);
if (r!=0) goto died;
r=toku_fifo_deq(BNC_BUFFER(node, childnum));
if (r!=0) goto died;
// Now it's possible that the child must be split. (Or maybe the child managed to flush stuff to our grandchildren)
int n_children_replacing_child;
r=maybe_split_nonroot(brt, node, childnum, child, &n_children_replacing_child, logger);
if (r!=0) return r; // don't go to died since that unpins
int i;
for (i=0; i<n_children_replacing_child; i++) {
r=weak_push_to_child(brt, node, childnum+i, logger);
if (r!=0) return r;
}
// we basically pushed as much as we could to that child
}
}
}
}
return 0;
}
/* Side effect: sets splitk->data pointer to a malloc'd value */
static int brt_nonleaf_split (BRT t, BRTNODE node, BRTNODE *nodeb, DBT *splitk, TOKULOGGER logger) {
int old_n_children = node->u.n.n_children;
int n_children_in_a = old_n_children/2;
int n_children_in_b = old_n_children-n_children_in_a;
BRTNODE B;
FILENUM fnum = toku_cachefile_filenum(t->cf);
assert(node->height>0);
assert(node->u.n.n_children>=2); // Otherwise, how do we split? We need at least two children to split. */
assert(t->h->nodesize>=node->nodesize); /* otherwise we might be in trouble because the nodesize shrank. */
toku_create_new_brtnode(t, &B, node->height, logger);
B->u.n.n_children =n_children_in_b;
//printf("%s:%d %p (%lld) becomes %p and %p\n", __FILE__, __LINE__, node, node->thisnodename, A, B);
//printf("%s:%d A is at %lld\n", __FILE__, __LINE__, A->thisnodename);
{
/* The first n_children_in_a go into node a.
* That means that the first n_children_in_a-1 keys go into node a.
* The splitter key is key number n_children_in_a */
int i;
for (i=0; i<n_children_in_b; i++) {
int r = toku_fifo_create(&BNC_BUFFER(B,i));
if (r!=0) return r;
}
for (i=n_children_in_a; i<old_n_children; i++) {
int targchild = i-n_children_in_a;
FIFO from_htab = BNC_BUFFER(node,i);
FIFO to_htab = BNC_BUFFER(B, targchild);
DISKOFF thischilddiskoff = BNC_DISKOFF(node, i);
BNC_DISKOFF(B, targchild) = thischilddiskoff;
int r = toku_log_addchild(logger, fnum, B->thisnodename, targchild, thischilddiskoff, BNC_SUBTREE_FINGERPRINT(node, i));
if (r!=0) return r;
while (1) {
bytevec key, data;
unsigned int keylen, datalen;
int type;
TXNID xid;
int fr = toku_fifo_peek(from_htab, &key, &keylen, &data, &datalen, &type, &xid);
if (fr!=0) break;
int n_bytes_moved = keylen+datalen + KEY_VALUE_OVERHEAD + BRT_CMD_OVERHEAD;
BYTESTRING keybs = { .len = keylen, .data = (char*)key };
BYTESTRING databs = { .len = datalen, .data = (char*)data };
u_int32_t old_from_fingerprint = node->local_fingerprint;
u_int32_t old_to_fingerprint = B->local_fingerprint;
u_int32_t delta = toku_calccrc32_cmd(type, xid, key, keylen, data, datalen);
u_int32_t new_from_fingerprint = old_from_fingerprint - node->rand4fingerprint*delta;
u_int32_t new_to_fingerprint = old_to_fingerprint + B->rand4fingerprint *delta;
if (r!=0) return r;
r = toku_log_brtdeq(logger, fnum, node->thisnodename, n_children_in_a, xid, type, keybs, databs, old_from_fingerprint, new_from_fingerprint);
if (r!=0) return r;
r = toku_log_brtenq(logger, fnum, B->thisnodename, targchild, xid, type, keybs, databs, old_to_fingerprint, new_to_fingerprint);
r = toku_fifo_enq(to_htab, key, keylen, data, datalen, type, xid);
if (r!=0) return r;
toku_fifo_deq(from_htab);
// key and data will no longer be valid
node->local_fingerprint = new_from_fingerprint;
B->local_fingerprint = new_to_fingerprint;
B->u.n.n_bytes_in_buffers += n_bytes_moved;
BNC_NBYTESINBUF(B, targchild) += n_bytes_moved;
node->u.n.n_bytes_in_buffers -= n_bytes_moved;
BNC_NBYTESINBUF(node, i) -= n_bytes_moved;
// verify_local_fingerprint_nonleaf(B);
// verify_local_fingerprint_nonleaf(node);
}
// Delete a child, removing it's fingerprint, and also the preceeding pivot key. The child number must be > 0
{
BYTESTRING bs = { .len = kv_pair_keylen(node->u.n.childkeys[i-1]),
.data = kv_pair_key(node->u.n.childkeys[i-1]) };
assert(i>0);
r = toku_log_delchild(logger, fnum, node->thisnodename, n_children_in_a, thischilddiskoff, BNC_SUBTREE_FINGERPRINT(node, i), bs);
if (r!=0) return r;
if (i>n_children_in_a) {
r = toku_log_setpivot(logger, fnum, B->thisnodename, targchild-1, bs);
if (r!=0) return r;
B->u.n.childkeys[targchild-1] = node->u.n.childkeys[i-1];
B->u.n.totalchildkeylens += toku_brt_pivot_key_len(t, node->u.n.childkeys[i-1]);
node->u.n.totalchildkeylens -= toku_brt_pivot_key_len(t, node->u.n.childkeys[i-1]);
node->u.n.childkeys[i-1] = 0;
}
}
BNC_DISKOFF(node, i) = 0;
BNC_SUBTREE_FINGERPRINT(B, targchild) = BNC_SUBTREE_FINGERPRINT(node, i);
BNC_SUBTREE_FINGERPRINT(node, i) = 0;
assert(BNC_NBYTESINBUF(node, i) == 0);
}
// Drop the n_children now (not earlier) so that we can do the fingerprint verification at any time.
node->u.n.n_children=n_children_in_a;
for (i=n_children_in_a; i<old_n_children; i++) {
toku_fifo_free(&BNC_BUFFER(node,i));
}
splitk->data = (void*)(node->u.n.childkeys[n_children_in_a-1]);
splitk->size = toku_brt_pivot_key_len(t, node->u.n.childkeys[n_children_in_a-1]);
node->u.n.totalchildkeylens -= toku_brt_pivot_key_len(t, node->u.n.childkeys[n_children_in_a-1]);
node->u.n.childkeys[n_children_in_a-1]=0;
verify_local_fingerprint_nonleaf(node);
verify_local_fingerprint_nonleaf(B);
}
*nodeb = B;
assert(toku_serialize_brtnode_size(node)<node->nodesize);
assert(toku_serialize_brtnode_size(B)<B->nodesize);
return 0;
}
static int nonleaf_node_is_too_wide (BRTNODE node) {
assert(node->height>0);
return node->u.n.n_children > TREE_FANOUT;
}
static int maybe_fixup_fat_child(BRT brt, BRTNODE node, int childnum, BRTNODE child, TOKULOGGER logger) // If the node is too big then deal with it. Unpin the child (or children if it splits) NODE may be too big at the end
{
int r = push_down_if_buffers_too_full(brt, child, logger);
if (r!=0) return r;
// now the child may have too much fanout.
if (child->height>0) {
if (nonleaf_node_is_too_wide(child)) {
BRTNODE newchild;
DBT splitk;
if ((r=brt_nonleaf_split(brt, child, &newchild, &splitk, logger))) return r;
int old_n_children = node->u.n.n_children;
FIFO old_fifo = BNC_BUFFER(node, childnum);
// slide the children over
REALLOC_N(old_n_children+1, node->u.n.childinfos);
memmove(&node->u.n.childinfos[childnum+1], &node->u.n.childinfos[childnum+2], (old_n_children-childnum-1)*sizeof(node->u.n.childinfos[0]));
// fill in the new children
{
struct brtnode_nonleaf_childinfo *ci = &node->u.n.childinfos[childnum+1];
ci->subtree_fingerprint = 0;
ci->diskoff = newchild->thisnodename;
ci->n_bytes_in_buffer = 0;
r=toku_fifo_create(&ci->buffer);
if (r!=0) return r;
}
// replace the fifo in the old child
r=toku_fifo_create(&BNC_BUFFER(node, childnum));
if (r!=0) return r;
// slide the keys over
REALLOC_N(old_n_children, node->u.n.childkeys);
memmove(&node->u.n.childkeys[childnum], &node->u.n.childkeys[childnum+1], (old_n_children-childnum-1)*sizeof(node->u.n.childkeys[0]));
{
struct kv_pair *pivot = splitk.data;
BYTESTRING bs = { .len = splitk.size,
.data = kv_pair_key(pivot) };
r = toku_log_setpivot(logger, toku_cachefile_filenum(brt->cf), node->thisnodename, childnum, bs);
if (r!=0) return r;
node->u.n.childkeys[childnum] = pivot;
node->u.n.totalchildkeylens += toku_brt_pivot_key_len(brt, pivot);
}
node->u.n.n_children++;
// fix up fingerprints
fixup_child_fingerprint(node, childnum, child, brt, logger);
fixup_child_fingerprint(node, childnum+1, newchild, brt, logger);
// now everything in the fifo must be put again into one of the two fifos
BRT_CMD_S cmd;
DBT key,val;
while (0==toku_fifo_peek_deq_cmdstruct(old_fifo, &cmd, &key, &val)) {
int cmp = brt_compare_pivot(brt, cmd.u.id.key, 0, node->u.n.childkeys[childnum]);
if (cmp<=0) {
r=toku_fifo_enq_cmdstruct(BNC_BUFFER(node, childnum), &cmd);
if (r!=0) return r;
}
if (cmp==0 && cmd.type==BRT_DELETE && brt->flags&TOKU_DB_DUPSORT) {
r=toku_fifo_enq_cmdstruct(BNC_BUFFER(node, childnum+1), &cmd);
if (r!=0) return r;
}
}
toku_fifo_free(&old_fifo);
if (r!=0) return r;
}
} else {
abort(); // if a leaf is too fat need to split it.
}
return 0;
}
// There are two kinds of puts:
// A "weak" put that is guaranteed to trigger no I/O, and will not leaf the node overfull.
// A weak put may not actually perform the put, however (in which case it returns EAGAIN instead of 0)
// A "strong" put that is guaranteed to do the put. However, it may trigger I/O and the resulting node may be too big.
static int brt_leaf_put (BRT t, BRTNODE node, BRT_CMD cmd, TOKULOGGER logger, WS weak_p) {
FILENUM filenum = toku_cachefile_filenum(t->cf);
switch (cmd->type) {
case BRT_INSERT: {
int r = toku_pma_insert_or_replace_ws(node->u.l.buffer,
cmd->u.id.key, cmd->u.id.val,
logger, cmd->xid,
filenum, node->thisnodename, node->rand4fingerprint, &node->local_fingerprint,
&node->log_lsn, &node->u.l.n_bytes_in_buffer,
weak_p==WEAK);
if (r==EAGAIN) return EAGAIN;
assert(r==0);
node->dirty=1;
return r;
}
case BRT_DELETE: {
int r = toku_pma_delete_fixupsize(node->u.l.buffer, cmd->u.id.key, (DBT*)0,
logger, cmd->xid, node->thisnodename,
node->rand4fingerprint, &node->local_fingerprint, &node->log_lsn, &node->u.l.n_bytes_in_buffer);
if (r==0) node->dirty=1;
return r;
}
case BRT_DELETE_BOTH: {
int r = toku_pma_delete_fixupsize(node->u.l.buffer, cmd->u.id.key, cmd->u.id.val,
logger, cmd->xid, node->thisnodename,
node->rand4fingerprint, &node->local_fingerprint, &node->log_lsn, &node->u.l.n_bytes_in_buffer);
if (r == 0) node->dirty = 1;
return r;
}
case BRT_NONE: return 0;
}
return EINVAL; // if none of the cases match, then the command is messed up.
}
// Put an command in a particular child's fifo.
// If weak_p then do it without doing I/O or overfilling the child.
// If the child is in main memory and we can do a weak put on the child, then push into the child.
// Otherwise we return EAGAIN.
// If not weak_p then we are willing to overfill the child.
static int brt_nonleaf_put_cmd_to_child (BRT t, BRTNODE node, int childnum, BRT_CMD cmd, TOKULOGGER logger, WS weak_p) {
DBT *k = cmd->u.id.key;
DBT *v = cmd->u.id.val;
int r;
if (toku_fifo_n_entries(BNC_BUFFER(node,childnum))==0) {
void *child_v;
r = toku_cachetable_maybe_get_and_pin(t->cf, BNC_DISKOFF(node, childnum), &child_v);
if (r==0) {
BRTNODE child=child_v;
r = brtnode_put(t, child, cmd, logger, weak_p);
if (r==EAGAIN) {
r = unpin_brtnode(t, child);
if (r!=0) return r; // node is still OK
} else if (r==0) {
return maybe_fixup_fat_child(t, node, childnum, child, logger); // If the node is too big then deal with it. Unpin the child. NODE may be too big. I think the only way a node can get fat is if weak_p==STRONG.
} else {
unpin_brtnode(t, child);
return r; // node is still OK
}
}
}
// For some reason we didn't put it into the child, so we must put it in the fifo.
int diff = k->size + v->size + KEY_VALUE_OVERHEAD + BRT_CMD_OVERHEAD;
if (diff+toku_serialize_brtnode_size(node)>node->nodesize) return EAGAIN; // And it doesn't fit here.
r=toku_fifo_enq_cmdstruct(BNC_BUFFER(node,childnum), cmd);
if (r!=0) return r;
node->local_fingerprint += node->rand4fingerprint * toku_calccrc32_cmdstruct(cmd);
node->u.n.n_bytes_in_buffers += diff;
BNC_NBYTESINBUF(node, childnum) += diff;
node->dirty = 1;
return 0; // node may be too big
}
static void determine_which_children_to_push_delete (BRT t, BRTNODE node, BRT_CMD cmd, int *n_children_to_push, int *children_to_push) {
int i;
*n_children_to_push=0;
for (i=0; i<node->u.n.n_children-1; i++) {
int cmp = brt_compare_pivot(t, cmd->u.id.key, 0, node->u.n.childkeys[i]);
if (cmp>0) continue; // the cmd is bigger than the pivot, so it doesn't go here.
else if (cmp<0) {
// the cmd is smaller than the pivot, so it goes here, and goes nowhere else to the right
children_to_push[(*n_children_to_push)++] = i;
return;
} else if (t->flags & TOKU_DB_DUPSORT) {
// the cmd is equal and we are in a dupsort, so push and and go around to push additional ones.
children_to_push[(*n_children_to_push)++] = i;
continue;
} else {
// the cmd is equal but we are not in a dupsort, so we save i, but there is no saving the next one.
children_to_push[(*n_children_to_push)++] = i;
return;
}
}
// if we fell off the bottom, which means we must include the last one.
children_to_push[(*n_children_to_push)++] = i;
}
// Put the cmd into all the subtrees that it belong in. (Deletes can end up in several subtrees.)
// If weak_p then
// Don't do any I/O and the node will not be overfull.
// To guarantee that no I/O will occur, we must make sure we can insert everything before inserting anything.
// else put it regardless, possibly overflowing the node.
static int brt_nonleaf_put_delete (BRT t, BRTNODE node, BRT_CMD cmd, TOKULOGGER logger, WS weak_p) {
int singlediff = cmd->u.id.key->size + cmd->u.id.val->size + KEY_VALUE_OVERHEAD + BRT_CMD_OVERHEAD;
int n_children_to_push = 0;
int children_to_push[node->u.n.n_children];
determine_which_children_to_push_delete(t, node, cmd, &n_children_to_push, children_to_push);
int totaldiff = singlediff * n_children_to_push;
if (weak_p && (totaldiff + toku_serialize_brtnode_size(node) > node->nodesize)) return EAGAIN;
// Now we know it will fit, so do all the weak pushes. We are being a little bit conservative,
// since a soft push might succeed, in getting data to a child without using up the local storage.
int i;
for (i=0; i<n_children_to_push; i++) {
int r=brt_nonleaf_put_cmd_to_child(t, node, children_to_push[i], cmd, logger, WEAK);
if (r==EAGAIN) {
r = toku_fifo_enq_cmdstruct(BNC_BUFFER(node, children_to_push[i]), cmd);
if (r!=0) return r;
} else if (r!=0) return r;
}
// We did we weak pushes to the children, but if that didn't work we put it in the buffer. The node could be overfull now.
return 0;
}
// a DELETE could be replicating in a dupsort database. Everything else is non replicating.
static int brt_nonleaf_put_nonreplicating_cmd (BRT t, BRTNODE node, BRT_CMD cmd, TOKULOGGER logger, WS weak_p) {
return brt_nonleaf_put_cmd_to_child(t, node,
brtnode_which_child(node, cmd->u.id.key, cmd->u.id.val, t),
cmd, logger,
weak_p);
}
// Put the cmd into the node. Possibly results in the node being overfull. (But not if weak_p is set, in which case EAGAIN is returned instead)
// The command could get pushed into the appropriate child if the child is in main memory and has space to hold the command.
static int brt_nonleaf_put (BRT t, BRTNODE node, BRT_CMD cmd, TOKULOGGER logger, WS weak_p) {
if (cmd->type == BRT_INSERT || cmd->type == BRT_DELETE_BOTH) {
return brt_nonleaf_put_nonreplicating_cmd(t, node, cmd, logger, weak_p);
} else if (cmd->type == BRT_DELETE) {
return brt_nonleaf_put_delete(t, node, cmd, logger, weak_p);
} else
return EINVAL;
}
// Put the command into the node.
// If weak_p is set then neither the node nor any descendants will get too big, and no I/O will occur.
// if !weak_p then I/O could occur and the node could end up with too much fanout. (But the children will all be properly sized)
static int brtnode_put (BRT t, BRTNODE node, BRT_CMD cmd, TOKULOGGER logger, WS weak_p) {
if (node->height==0) {
return brt_leaf_put(t, node, cmd, logger, weak_p);
} else {
return brt_nonleaf_put(t, node, cmd, logger, weak_p);
}
}
static void verify_local_fingerprint_nonleaf (BRTNODE node) {
u_int32_t fp=0;
int i;
if (node->height==0) return;
for (i=0; i<node->u.n.n_children; i++)
FIFO_ITERATE(BNC_BUFFER(node,i), key, keylen, data, datalen, type, xid,
({
fp += node->rand4fingerprint * toku_calccrc32_cmd(type, xid, key, keylen, data, datalen);
}));
assert(fp==node->local_fingerprint);
}
#ifdef FOO
static int setup_initial_brt_root_node (BRT t, DISKOFF offset, TOKULOGGER logger) {
int r;
TAGMALLOC(BRTNODE, node);
assert(node);
initialize_brtnode(t, node,
offset, /* the location is one nodesize offset from 0. */
0);
// node->brt = t;
if (0) {
printf("%s:%d for tree %p node %p mdict_create--> %p\n", __FILE__, __LINE__, t, node, node->u.l.buffer);
printf("%s:%d put root at %lld\n", __FILE__, __LINE__, offset);
}
r=toku_cachetable_put(t->cf, offset, node, brtnode_size(node),
toku_brtnode_flush_callback, toku_brtnode_fetch_callback, t);
if (r!=0) {
toku_free(node);
return r;
}
toku_verify_counts(node);
toku_log_newbrtnode(logger, toku_cachefile_filenum(t->cf), offset, 0, t->h->nodesize, (t->flags&TOKU_DB_DUPSORT)!=0, node->rand4fingerprint);
toku_update_brtnode_loggerlsn(node, logger);
r=unpin_brtnode(t, node);
if (r!=0) {
toku_free(node);
return r;
}
return 0;
}
int toku_brt_create(BRT *brt_ptr) {
BRT brt = toku_malloc(sizeof *brt);
if (brt == 0)
return ENOMEM;
memset(brt, 0, sizeof *brt);
list_init(&brt->cursors);
brt->flags = 0;
brt->nodesize = BRT_DEFAULT_NODE_SIZE;
brt->compare_fun = toku_default_compare_fun;
brt->dup_compare = toku_default_compare_fun;
*brt_ptr = brt;
return 0;
}
int toku_brt_set_flags(BRT brt, unsigned int flags) {
brt->flags = flags;
return 0;
}
int toku_brt_get_flags(BRT brt, unsigned int *flags) {
*flags = brt->flags;
return 0;
}
int toku_brt_set_nodesize(BRT brt, unsigned int nodesize) {
brt->nodesize = nodesize;
return 0;
}
int toku_brt_get_nodesize(BRT brt, unsigned int *nodesize) {
*nodesize = brt->nodesize;
return 0;
}
int toku_brt_set_bt_compare(BRT brt, int (*bt_compare)(DB *, const DBT*, const DBT*)) {
brt->compare_fun = bt_compare;
return 0;
}
int toku_brt_set_dup_compare(BRT brt, int (*dup_compare)(DB *, const DBT*, const DBT*)) {
brt->dup_compare = dup_compare;
return 0;
}
int toku_brt_get_fd(BRT brt, int *fdp) {
*fdp = toku_cachefile_fd(brt->cf);
return 0;
}
enum { UNDO_COUNTER_LIMIT=10 };
typedef void(*undo_fun)(void*);
struct undo_rec { undo_fun f; void *v; };
struct undo {
int undo_counter;
struct undo_rec undos[UNDO_COUNTER_LIMIT];
};
#define INITUNDO(u) struct undo u = (struct undo){.undo_counter=0}
void push_undo(struct undo *undos, undo_fun f, void *v) {
assert(undos->undo_counter<UNDO_COUNTER_LIMIT);
undos->undos[undos->undo_counter++]=(struct undo_rec){f,v};
}
void do_undos(struct undo *undos) {
while (undos->undo_counter>0) {
struct undo_rec *r = &undos->undos[--undos->undo_counter];
r->f(r->v);
}
}
void undo_free (void *v) {
void **ptr=v;
toku_free(*ptr);
*ptr=0;
}
// tbou means "toku_brt_open undo"
void tbou_close_cachefile (void *v) {
BRT t = v;
toku_cachefile_close(&t->cf);
}
struct maybe_unpin_info {
int is_pinned;
CACHEFILE cf;
CACHEKEY ckey;
};
void tbou_maybe_unpin (void *v) {
struct maybe_unpin_info *mui = v;
if (mui->is_pinned)
toku_cachetable_unpin(mui->cf, mui->ckey, 0, 0);
mui->is_pinned=0;
}
int toku_brt_open(BRT t, const char *fname, const char *fname_in_env, const char *dbname, int is_create, int only_create, int load_flags, CACHETABLE cachetable, TOKUTXN txn, DB *db) {
/* If dbname is NULL then we setup to hold a single tree. Otherwise we setup an array. */
int r;
struct maybe_unpin_info mui = {.is_pinned=0};
INITUNDO(undos);
push_undo(&undos, tbou_maybe_unpin, &mui); // if we pin a cf, then we put it into the maybe_undo_info so it will get undone on error.
assert(is_create || !only_create);
assert(!load_flags || !only_create);
if (0) {
died:
do_undos(&undos);
return r;
}
{
if (dbname) {
char *malloced_name = toku_strdup(dbname);
if (malloced_name==0) { r = errno; goto died; }
push_undo(&undos, undo_free, &t->database_name);
t->database_name = malloced_name;
} else {
t->database_name = 0;
}
}
t->db = db;
{
int fd = open(fname, O_RDWR, 0777);
r = errno;
if (fd==-1) {
if (r==ENOENT) {
if (!is_create) { goto died; }
fd = open(fname, O_RDWR | O_CREAT, 0777);
if (fd==-1) { r=errno; goto died; }
r = toku_logger_log_fcreate(txn, fname_in_env, 0777);
if (r!=0) goto died;
} else
goto died;
}
if ((r = toku_cachetable_openfd(&t->cf, cachetable, fd, t))) goto died;
push_undo(&undos, tbou_close_cachefile, t);
}
if ((r = toku_logger_log_fopen(txn, fname_in_env, toku_cachefile_filenum(t->cf)))) goto died;
// no undo action for log_fopen
assert(t->nodesize>0);
if (is_create) {
r = toku_read_and_pin_brt_header(t->cf, &t->h);
if (r!=0 && r!=-1) goto died;
if (r==0) {
mui=(struct maybe_unpin_info){.is_pinned=1, .cf=t->cf, .ckey=0}; // remember to unpin it
int i;
assert(r==0);
assert(dbname);
if (t->h->unnamed_root!=-1) { r=EINVAL; goto died; } // Cannot create a subdb in a file that is not enabled for subdbs
assert(t->h->n_named_roots>=0);
for (i=0; i<t->h->n_named_roots; i++) {
if (strcmp(t->h->names[i], dbname)==0) {
if (only_create) {
r = EEXIST;
goto died;
}
else goto found_it;
}
}
if ((t->h->names = toku_realloc(t->h->names, (1+t->h->n_named_roots)*sizeof(*t->h->names))) == 0) { r=errno; goto died; }
if ((t->h->roots = toku_realloc(t->h->roots, (1+t->h->n_named_roots)*sizeof(*t->h->roots))) == 0) { r=errno; goto died; }
t->h->n_named_roots++;
if ((t->h->names[t->h->n_named_roots-1] = toku_strdup(dbname)) == 0) { r=errno; goto died; }
push_undo(&undos, undo_free, &t->h->names[t->h->n_named_roots-1]);
r = malloc_diskblock_header_is_in_memory(&t->h->roots[t->h->n_named_roots-1], t, t->h->nodesize, toku_txn_logger(txn));
if (r!=0) goto died;
t->h->dirty = 1;
if ((r=setup_initial_brt_root_node(t, t->h->roots[t->h->n_named_roots-1], toku_txn_logger(txn)))!=0) goto died;
} else {
assert(r==-1); // the pin failed because no data was present
/* construct a new header. */
if ((MALLOC(t->h))==0) { r = errno; goto died; }
t->h->dirty=1;
t->h->flags = t->flags;
t->h->nodesize=t->nodesize;
t->h->freelist=-1;
t->h->unused_memory=2*t->nodesize;
if (dbname) {
t->h->unnamed_root = -1;
t->h->n_named_roots = 1;
if ((MALLOC_N(1, t->h->names))==0) { r=errno; goto died; } push_undo(&undos, undo_free, &t->h->names);
if ((MALLOC_N(1, t->h->roots))==0) { r=errno; goto died; } push_undo(&undos, undo_free, &t->h->roots);
if ((t->h->names[0] = toku_strdup(dbname))==0) { r=errno; goto died; } push_undo(&undos, undo_free, &t->h->names[0]);
t->h->roots[0] = t->nodesize;
} else {
t->h->unnamed_root = t->nodesize;
t->h->n_named_roots = -1;
t->h->names=0;
t->h->roots=0;
}
if ((r=toku_logger_log_header(txn, toku_cachefile_filenum(t->cf), t->h))) goto died;
if ((r=setup_initial_brt_root_node(t, t->nodesize, toku_txn_logger(txn)))!=0) goto died;
if ((r=toku_cachetable_put(t->cf, 0, t->h, 0, toku_brtheader_flush_callback, toku_brtheader_fetch_callback, 0))) goto died;
mui=(struct maybe_unpin_info){.is_pinned=1, .cf=t->cf, .ckey=0}; // remember to unpin it
}
} else {
if ((r = toku_read_and_pin_brt_header(t->cf, &t->h))!=0) goto died;
mui=(struct maybe_unpin_info){.is_pinned=1, .cf=t->cf, .ckey=0}; // remember to unpin it
if (!dbname) {
if (t->h->n_named_roots!=-1) { r = EINVAL; goto died; } // requires a subdb
} else {
int i;
if (t->h->n_named_roots==-1) { r=EINVAL; goto died; } // no suddbs in the db
// printf("%s:%d n_roots=%d\n", __FILE__, __LINE__, t->h->n_named_roots);
for (i=0; i<t->h->n_named_roots; i++) {
if (strcmp(t->h->names[i], dbname)==0) {
goto found_it;
}
}
r=ENOENT; /* the database doesn't exist */
goto died;
}
found_it:
t->nodesize = t->h->nodesize; /* inherit the pagesize from the file */
if (t->flags != t->h->flags) { /* flags must match */
if (load_flags) t->flags = t->h->flags;
else { r = EINVAL; goto died; }
}
}
assert(t->h);
if ((r = toku_unpin_brt_header(t)) !=0) goto died; // it's unpinned
mui.is_pinned=0;
assert(t->h==0);
return 0;
}
int toku_brt_remove_subdb(BRT brt, const char *dbname, u_int32_t flags) {
int r;
int i;
int found = -1;
assert(flags == 0);
r = toku_read_and_pin_brt_header(brt->cf, &brt->h);
if (r!=0) return r;
assert(brt->h->unnamed_root==-1);
assert(brt->h->n_named_roots>=0);
for (i = 0; i < brt->h->n_named_roots; i++) {
if (strcmp(brt->h->names[i], dbname) == 0) {
found = i;
break;
}
}
if (found == -1) {
//Should not be possible.
r = ENOENT;
goto error;
}
//Free old db name
toku_free(brt->h->names[found]);
//TODO: Free Diskblocks including root
for (i = found + 1; i < brt->h->n_named_roots; i++) {
brt->h->names[i - 1] = brt->h->names[i];
brt->h->roots[i - 1] = brt->h->roots[i];
}
brt->h->n_named_roots--;
brt->h->dirty = 1;
//TODO: What if n_named_roots becomes 0? Should we handle it specially? Should we delete the file?
if ((brt->h->names = toku_realloc(brt->h->names, (brt->h->n_named_roots)*sizeof(*brt->h->names))) == 0) { r=errno; goto error; }
if ((brt->h->roots = toku_realloc(brt->h->roots, (brt->h->n_named_roots)*sizeof(*brt->h->roots))) == 0) { r=errno; goto error; }
r = toku_unpin_brt_header(brt);
return r;
error:
toku_unpin_brt_header(brt);
return r;
}
// This one has no env
int toku_open_brt (const char *fname, const char *dbname, int is_create, BRT *newbrt, int nodesize, CACHETABLE cachetable, TOKUTXN txn,
int (*compare_fun)(DB*,const DBT*,const DBT*), DB *db) {
BRT brt;
int r;
const int only_create = 0;
const int load_flags = 0;
r = toku_brt_create(&brt);
if (r != 0) return r;
toku_brt_set_nodesize(brt, nodesize);
toku_brt_set_bt_compare(brt, compare_fun);
r = toku_brt_open(brt, fname, fname, dbname, is_create, only_create, load_flags, cachetable, txn, db);
if (r != 0) {
toku_free(brt);
return r;
}
*newbrt = brt;
return 0;
}
int toku_close_brt (BRT brt) {
int r;
while (!list_empty(&brt->cursors)) {
BRT_CURSOR c = list_struct(list_pop(&brt->cursors), struct brt_cursor, cursors_link);
r=toku_brt_cursor_close(c);
if (r!=0) return r;
}
if (brt->cf) {
assert(0==toku_cachefile_count_pinned(brt->cf, 1)); // For the brt, the pinned count should be zero.
//printf("%s:%d closing cachetable\n", __FILE__, __LINE__);
if ((r = toku_cachefile_close(&brt->cf))!=0) return r;
}
if (brt->database_name) toku_free(brt->database_name);
if (brt->skey) { toku_free(brt->skey); }
if (brt->sval) { toku_free(brt->sval); }
toku_free(brt);
return 0;
}
CACHEKEY* toku_calculate_root_offset_pointer (BRT brt) {
if (brt->database_name==0) {
return &brt->h->unnamed_root;
} else {
int i;
for (i=0; i<brt->h->n_named_roots; i++) {
if (strcmp(brt->database_name, brt->h->names[i])==0) {
return &brt->h->roots[i];
}
}
}
abort();
}
static int brt_init_new_root(BRT brt, int n_new_nodes, BRTNODE *new_nodes, DBT *splitks, CACHEKEY *rootp, TOKULOGGER logger, BRTNODE *newrootp) {
assert(n_new_nodes>0);
TAGMALLOC(BRTNODE, newroot);
int r;
int new_height = new_nodes[0]->height+1;
int new_nodesize = brt->h->nodesize;
DISKOFF newroot_diskoff;
if ((r=malloc_diskblock(&newroot_diskoff, brt, new_nodesize, logger))) return r;
assert(newroot);
if (brt->database_name==0) {
toku_log_changeunnamedroot(logger, toku_cachefile_filenum(brt->cf), *rootp, newroot_diskoff);
} else {
BYTESTRING bs;
bs.len = 1+strlen(brt->database_name);
bs.data = brt->database_name;
toku_log_changenamedroot(logger, toku_cachefile_filenum(brt->cf), bs, *rootp, newroot_diskoff);
}
*rootp=newroot_diskoff;
brt->h->dirty=1;
initialize_brtnode (brt, newroot, newroot_diskoff, new_height);
newroot->u.n.n_children=n_new_nodes;
r=toku_log_newbrtnode(logger, toku_cachefile_filenum(brt->cf), newroot_diskoff, new_height, new_nodesize, (brt->flags&TOKU_DB_DUPSORT)!=0, newroot->rand4fingerprint);
if (r!=0) return r;
int i;
for (i=0; i<n_new_nodes; i++) {
BNC_DISKOFF(newroot, i)=new_nodes[i]->thisnodename;
r=toku_fifo_create(&BNC_BUFFER(newroot,i)); if (r!=0) return r;
r=toku_log_addchild(logger, toku_cachefile_filenum(brt->cf), newroot_diskoff, 0, new_nodes[i]->thisnodename, 0);
if (r!=0) return r;
fixup_child_fingerprint(newroot, i, new_nodes[i], brt, logger);
}
toku_verify_counts(newroot);
int sum_splitk_sizes=0;
for (i=0; i+1<n_new_nodes; i++) {
sum_splitk_sizes += splitks[i].size;
newroot->u.n.childkeys[i] = splitks[i].data;
BYTESTRING bs = { .len = kv_pair_keylen(newroot->u.n.childkeys[0]),
.data = kv_pair_key(newroot->u.n.childkeys[0]) };
r=toku_log_setpivot(logger, toku_cachefile_filenum(brt->cf), newroot_diskoff, 0, bs);
if (r!=0) return r;
toku_update_brtnode_loggerlsn(newroot, logger);
}
newroot->u.n.totalchildkeylens=sum_splitk_sizes;
for (i=0; i<n_new_nodes; i++) {
r=unpin_brtnode(brt, new_nodes[i]);
if (r!=0) return r;
}
toku_cachetable_put(brt->cf, newroot_diskoff, newroot, brtnode_size(newroot),
toku_brtnode_flush_callback, toku_brtnode_fetch_callback, brt);
*newrootp = newroot;
return 0;
}
static int split_nonleaf_node(BRT, int *n_new_nodes, BRTNODE **new_nodes, DBT **splitks);
static int leaf_node_is_too_full (BRT, BRTNODE);
// push things down into node's children (and into their children and so forth) but don't make any descendant too big.
static int push_down_without_overfilling (BRT brt, BRTNODE node, TOKULOGGER logger);
// Push data toward a child. If the child gets too big then the child will push down or split.
// If a split happens, then return immediately so that we can check to see if NODE needs to be split
static int flush_toward_child (BRT brt, BRTNODE node, int childnum, TOKULOGGER logger);
static int maybe_fixup_root (BRT brt, BRTNODE node, CACHEKEY *rootp, TOKULOGGER logger) {
int r;
if (node->height>0) {
// internal nodes can be too wide, but if too full, they did a push down
maybe_reshape_internal_node:
while (nonleaf_node_is_too_wide(brt, node)) {
int n_new_nodes; BRTNODE *new_nodes; DBT *splitks;
if ((r=split_nonleaf_node(brt, node, &n_new_nodes, &new_nodes, &splitks))) return r;
if ((r=brt_init_new_root(brt, n_new_nodes, new_nodes, splitks, rootp, logger, &node))) return r; // unpins all the new nodes, which are all small enough
// now node is still possibly too wide, hence the loop
}
} else {
// leaf nodes can be too full
if (leaf_node_is_too_full(brt, node)) {
int n_new_nodes; BRTNODE *new_nodes; DBT *splitks;
if ((r==split_leaf_node(brt, logger, node, &n_new_nodes, &new_nodes, &splitks))) return r;
if ((r==brt_init_new_root(brt, n_new_nodes, new_nodes, splitks, rootp, logger, &node))) return r; // unpins all the new nodes, which are all small enough
assert(node->height>0);
goto maybe_reshape_internal_node;
}
}
return 0;
}
#endif
static int brt_root_put_cmd(BRT brt, BRT_CMD cmd, TOKULOGGER logger) {
void *node_v;
BRTNODE node;
CACHEKEY *rootp;
int r;
//assert(0==toku_cachetable_assert_all_unpinned(brt->cachetable));
if ((r = toku_read_and_pin_brt_header(brt->cf, &brt->h))) {
if (0) { died0: toku_unpin_brt_header(brt); }
return r;
}
rootp = toku_calculate_root_offset_pointer(brt);
if ((r=toku_cachetable_get_and_pin(brt->cf, *rootp, &node_v, NULL,
toku_brtnode_flush_callback, toku_brtnode_fetch_callback, brt))) {
if (0) { died1: unpin_brtnode(brt, node); goto died0; }
goto died0;
}
//printf("%s:%d pin %p\n", __FILE__, __LINE__, node_v);
node=node_v;
if ((r = brtnode_put(brt, node, cmd, logger, STRONG))) goto died1; // put stuff in, possibly causing the buffers to get too big
if ((r = push_down_if_buffers_too_full(brt, node, logger))) goto died1; // if the buffers are too big, push stuff down
if ((r = maybe_split_root(brt, node, rootp, logger))) goto died1; // now the node might have to split (leaf nodes can't push down, and internal nodes have too much fanout) This will change node.
// Now the node is OK,
brt->h->dirty=1;
return toku_unpin_brt_header(brt);
}
int toku_brt_insert (BRT brt, DBT *key, DBT *val, TOKUTXN txn) {
int r;
BRT_CMD_S brtcmd = { BRT_INSERT, toku_txn_get_txnid(txn), .u.id={key,val}};
r = brt_root_put_cmd(brt, &brtcmd, toku_txn_logger(txn));
return r;
}
int toku_brt_lookup (BRT brt, DBT *k, DBT *v) {
int r, rr;
BRT_CURSOR cursor;
rr = toku_brt_cursor(brt, &cursor);
if (rr != 0) return rr;
int op = brt->flags & TOKU_DB_DUPSORT ? DB_GET_BOTH : DB_SET;
r = toku_brt_cursor_get(cursor, k, v, op, 0);
rr = toku_brt_cursor_close(cursor); assert(rr == 0);
return r;
}
int toku_brt_delete(BRT brt, DBT *key, TOKUTXN txn) {
int r;
DBT val;
BRT_CMD_S brtcmd = { BRT_DELETE, toku_txn_get_txnid(txn), .u.id={key, toku_init_dbt(&val)}};
r = brt_root_put_cmd(brt, &brtcmd, toku_txn_logger(txn));
return r;
}
int toku_brt_delete_both(BRT brt, DBT *key, DBT *val, TOKUTXN txn) {
int r;
BRT_CMD_S brtcmd = { BRT_DELETE_BOTH, toku_txn_get_txnid(txn), .u.id={key,val}};
r = brt_root_put_cmd(brt, &brtcmd, toku_txn_logger(txn));
return r;
}
int toku_verify_brtnode (BRT brt, DISKOFF off, bytevec lorange, ITEMLEN lolen, bytevec hirange, ITEMLEN hilen, int recurse, BRTNODE parent_brtnode);
int toku_dump_brtnode (BRT brt, DISKOFF off, int depth, bytevec lorange, ITEMLEN lolen, bytevec hirange, ITEMLEN hilen, BRTNODE parent_brtnode) {
int result=0;
BRTNODE node;
void *node_v;
int r = toku_cachetable_get_and_pin(brt->cf, off, &node_v, NULL,
toku_brtnode_flush_callback, toku_brtnode_fetch_callback, brt);
assert(r==0);
printf("%s:%d pin %p\n", __FILE__, __LINE__, node_v);
node=node_v;
result=toku_verify_brtnode(brt, off, lorange, lolen, hirange, hilen, 0, parent_brtnode);
printf("%*sNode=%p\n", depth, "", node);
if (node->height>0) {
printf("%*sNode %lld nodesize=%d height=%d n_children=%d n_bytes_in_buffers=%d keyrange=%s %s\n",
depth, "", off, node->nodesize, node->height, node->u.n.n_children, node->u.n.n_bytes_in_buffers, (char*)lorange, (char*)hirange);
//printf("%s %s\n", lorange ? lorange : "NULL", hirange ? hirange : "NULL");
{
int i;
for (i=0; i< node->u.n.n_children; i++) {
printf("%*schild %d buffered (%d entries):\n", depth+1, "", i, toku_fifo_n_entries(BNC_BUFFER(node,i)));
FIFO_ITERATE(BNC_BUFFER(node,i), key, keylen, data, datalen, type, xid,
({
data=data; datalen=datalen; keylen=keylen;
printf("%*s xid=%"PRId64" %d (type=%d)\n", depth+2, "", xid, ntohl(*(int*)key), type);
//assert(strlen((char*)key)+1==keylen);
//assert(strlen((char*)data)+1==datalen);
}));
}
for (i=0; i<node->u.n.n_children; i++) {
printf("%*schild %d\n", depth, "", i);
if (i>0) {
printf("%*spivot %d len=%d %d\n", depth+1, "", i-1, node->u.n.childkeys[i-1]->keylen, ntohl(*(int*)&node->u.n.childkeys[i-1]->key));
}
toku_dump_brtnode(brt, BNC_DISKOFF(node, i), depth+4,
(i==0) ? lorange : node->u.n.childkeys[i-1],
(i==0) ? lolen : toku_brt_pivot_key_len(brt, node->u.n.childkeys[i-1]),
(i==node->u.n.n_children-1) ? hirange : node->u.n.childkeys[i],
(i==node->u.n.n_children-1) ? hilen : toku_brt_pivot_key_len(brt, node->u.n.childkeys[i]),
node
);
}
}
} else {
printf("%*sNode %lld nodesize=%d height=%d n_bytes_in_buffer=%d keyrange=%d %d\n",
depth, "", off, node->nodesize, node->height, node->u.l.n_bytes_in_buffer, lorange ? ntohl(*(int*)lorange) : 0, hirange ? ntohl(*(int*)hirange) : 0);
PMA_ITERATE(node->u.l.buffer, key, keylen, val __attribute__((__unused__)), vallen,
( keylen=keylen, vallen=vallen, printf(" (%d)%d ", keylen, ntohl(*(int*)key))));
printf("\n");
}
r = toku_cachetable_unpin(brt->cf, off, 0, 0);
assert(r==0);
return result;
}
int toku_dump_brt (BRT brt) {
int r;
CACHEKEY *rootp;
struct brt_header *prev_header = brt->h;
if ((r = toku_read_and_pin_brt_header(brt->cf, &brt->h))) {
if (0) { died0: toku_unpin_brt_header(brt); }
return r;
}
rootp = toku_calculate_root_offset_pointer(brt);
if ((r = toku_dump_brtnode(brt, *rootp, 0, 0, 0, 0, 0, null_brtnode))) goto died0;
if ((r = toku_unpin_brt_header(brt))!=0) return r;
brt->h = prev_header;
return 0;
}
static int show_brtnode_blocknumbers (BRT brt, DISKOFF off) {
BRTNODE node;
void *node_v;
int i,r;
assert(off%brt->h->nodesize==0);
if ((r = toku_cachetable_get_and_pin(brt->cf, off, &node_v, NULL,
toku_brtnode_flush_callback, toku_brtnode_fetch_callback, brt))) {
if (0) { died0: toku_cachetable_unpin(brt->cf, off, 0, 0); }
return r;
}
printf("%s:%d pin %p\n", __FILE__, __LINE__, node_v);
node=node_v;
printf(" %lld", off/brt->h->nodesize);
if (node->height>0) {
for (i=0; i<node->u.n.n_children; i++) {
if ((r=show_brtnode_blocknumbers(brt, BNC_DISKOFF(node, i)))) goto died0;
}
}
r = toku_cachetable_unpin(brt->cf, off, 0, 0);
return r;
}
#if 0
int show_brt_blocknumbers (BRT brt) {
int r;
CACHEKEY *rootp;
if ((r = toku_read_and_pin_brt_header(brt->cf, &brt->h))) {
if (0) { died0: toku_unpin_brt_header(brt); }
return r;
}
rootp = toku_calculate_root_offset_pointer(brt);
printf("BRT %p has blocks:", brt);
if ((r=show_brtnode_blocknumbers (brt, *rootp, 0))) goto died0;
printf("\n");
if ((r = toku_unpin_brt_header(brt))!=0) return r;
return 0;
}
#endif
int toku_brt_dbt_set_key(BRT brt, DBT *ybt, bytevec val, ITEMLEN vallen) {
int r = toku_dbt_set_value(ybt, val, vallen, &brt->skey);
return r;
}
int toku_brt_dbt_set_value(BRT brt, DBT *ybt, bytevec val, ITEMLEN vallen) {
int r = toku_dbt_set_value(ybt, val, vallen, &brt->sval);
return r;
}
#ifdef FOO
/* search in a node's child */
static int brt_search_child(BRT brt, BRTNODE node, int childnum, brt_search_t *search, DBT *newkey, DBT *newval, TOKULOGGER logger) {
int r, rr;
/* if the child's buffer is not empty then try to empty it */
if (BNC_NBYTESINBUF(node, childnum) > 0) {
rr = maybe_push_some_brt_cmds_down(brt, node, childnum, logger);
if (rr!=0) return rr;
/* push down may cause a child split, so childnum may not be appropriate, and the node itself may split, so retry */
return EAGAIN;
}
void *node_v;
rr = toku_cachetable_get_and_pin(brt->cf, BNC_DISKOFF(node,childnum), &node_v, NULL, toku_brtnode_flush_callback, toku_brtnode_fetch_callback, brt);
assert(rr == 0);
for (;;) {
BRTNODE childnode = node_v;
BRT_SPLIT childsplit; brt_split_init(&childsplit);
r = brt_search_node(brt, childnode, search, newkey, newval, &childsplit, logger);
if (childsplit.did_split) {
rr = handle_split_of_child(brt, node, childnum, childsplit.nodea, childsplit.nodeb, &childsplit.splitk,
&split->did_split, &split->nodea, &split->nodeb, &split->splitk, logger);
assert(rr == 0);
break;
} else {
if (r == EAGAIN)
continue;
rr = toku_cachetable_unpin(brt->cf, childnode->thisnodename, childnode->dirty, brtnode_size(childnode));
assert(rr == 0);
break;
}
}
return r;
}
static int brt_search_nonleaf_node(BRT brt, BRTNODE node, brt_search_t *search, DBT *newkey, DBT *newval, TOKULOGGER logger) {
int c;
restart:
{
/* binary search is overkill for a small array */
int child[node->u.n.n_children];
/* scan left to right or right to left depending on the search direction */
for (c = 0; c < node->u.n.n_children; c++)
child[c] = search->direction & BRT_SEARCH_LEFT ? c : node->u.n.n_children - 1 - c;
for (c = 0; c < node->u.n.n_children-1; c++) {
int p = search->direction & BRT_SEARCH_LEFT ? child[c] : child[c] - 1;
struct kv_pair *pivot = node->u.n.childkeys[p];
DBT pivotkey, pivotval;
if (search->compare(search,
toku_fill_dbt(&pivotkey, kv_pair_key(pivot), kv_pair_keylen(pivot)),
brt->flags & TOKU_DB_DUPSORT ? toku_fill_dbt(&pivotval, kv_pair_val(pivot), kv_pair_vallen(pivot)): 0)) {
// We know which child we want to search. First make sure the buffer is empty.
r = flush_toward_child(brt, node, child[c], logger, &did_split);
if (did_split) goto restart;
// If we didn't split, then the buffer is empty, so search that child
r=search_that_child();
// Now that child may be bent out of shape
???
int r = brt_search_child(brt, node, child[c], search, newkey, newval, logger);
// searching the child can cause it to get bent out of shape
int rr = maybe_fixup_nonroot(brt, node, child[c], logger);
if (rr!=0) return rr;
if (r == 0) return r;
}
}
/* check the first (left) or last (right) node if nothing has been found */
if (r == DB_NOTFOUND && c == node->u.n.n_children-1)
r = brt_search_child(brt, node, child[c], search, newkey, newval, split, logger);
return r;
}
static int brt_search_leaf_node(BRTNODE node, brt_search_t *search, DBT *newkey, DBT *newval) {
PMA pma = node->u.l.buffer;
int r = toku_pma_search(pma, search, newkey, newval);
return r;
}
static int brt_search_node(BRT brt, BRTNODE node, brt_search_t *search, DBT *newkey, DBT *newval, TOKULOGGER logger) {
if (node->height > 0)
return brt_search_nonleaf_node(brt, node, search, newkey, newval, logger);
else
return brt_search_leaf_node(node, search, newkey, newval);
}
int toku_brt_search(BRT brt, brt_search_t *search, DBT *newkey, DBT *newval, TOKULOGGER logger) {
int r, rr;
rr = toku_read_and_pin_brt_header(brt->cf, &brt->h);
if (rr!=0) {
if (0) { died0: toku_unpin_brt_header(brt); }
return rr;
}
CACHEKEY *rootp = toku_calculate_root_offset_pointer(brt);
void *node_v;
BRTNODE node;
rr = toku_cachetable_get_and_pin(brt->cf, *rootp, &node_v, NULL, toku_brtnode_flush_callback, toku_brtnode_fetch_callback, brt);
if (rr!=0) {
if (0) { died1: unpin_brtnode(brt, node); }
goto died0;
}
node = node_v;
r = brt_search_node(brt, node, search, newkey, newval, logger);
rr = maybe_fixup_root(brt, node, rootp, logger);
if (rr!=0) { goto died1; }
rr = unpin_brtnode(brt, node);
if (rr!=0) { goto died0; }
rr = toku_unpin_brt_header(brt);
if (rr!=0) return rr;
return r;
}
static inline void dbt_cleanup(DBT *dbt) {
if (dbt->data && (dbt->flags & DB_DBT_MALLOC)) {
toku_free_n(dbt->data, dbt->size); dbt->data = 0;
}
}
static inline void brt_cursor_cleanup(BRT_CURSOR cursor) {
dbt_cleanup(&cursor->key);
dbt_cleanup(&cursor->val);
}
static inline int brt_cursor_not_set(BRT_CURSOR cursor) {
return cursor->key.data == 0 || cursor->val.data == 0;
}
BOOL toku_brt_cursor_uninitialized(BRT_CURSOR c) {
return brt_cursor_not_set(c);
}
static inline void brt_cursor_set_key_val(BRT_CURSOR cursor, DBT *newkey, DBT *newval) {
brt_cursor_cleanup(cursor);
cursor->key = *newkey; memset(newkey, 0, sizeof *newkey);
cursor->val = *newval; memset(newval, 0, sizeof *newval);
}
int toku_brt_cursor(BRT brt, BRT_CURSOR *cursorptr) {
BRT_CURSOR cursor = toku_malloc(sizeof *cursor);
if (cursor == 0)
return ENOMEM;
cursor->brt = brt;
toku_init_dbt(&cursor->key);
toku_init_dbt(&cursor->val);
list_push(&brt->cursors, &cursor->cursors_link);
*cursorptr = cursor;
return 0;
}
int toku_brt_cursor_close(BRT_CURSOR cursor) {
brt_cursor_cleanup(cursor);
list_remove(&cursor->cursors_link);
toku_free_n(cursor, sizeof *cursor);
return 0;
}
static inline int compare_k_x(BRT brt, DBT *k, DBT *x) {
return brt->compare_fun(brt->db, k, x);
}
static inline int compare_v_y(BRT brt, DBT *v, DBT *y) {
return brt->dup_compare(brt->db, v, y);
}
static inline int compare_kv_xy(BRT brt, DBT *k, DBT *v, DBT *x, DBT *y) {
int cmp = brt->compare_fun(brt->db, k, x);
if (cmp == 0 && v && y)
cmp = brt->dup_compare(brt->db, v, y);
return cmp;
}
static inline int brt_cursor_copyout(BRT_CURSOR cursor, DBT *key, DBT *val) {
int r = 0;
if (key)
r = toku_dbt_set_value(key, cursor->key.data, cursor->key.size, &cursor->brt->skey);
if (r == 0 && val)
r = toku_dbt_set_value(val, cursor->val.data, cursor->val.size, &cursor->brt->sval);
return r;
}
static int brt_cursor_compare_set(brt_search_t *search, DBT *x, DBT *y) {
BRT brt = search->context;
return compare_kv_xy(brt, search->k, search->v, x, y) <= 0; /* return min xy: kv <= xy */
}
static int brt_cursor_current(BRT_CURSOR cursor, int op, DBT *outkey, DBT *outval, TOKULOGGER logger) {
if (brt_cursor_not_set(cursor))
return EINVAL;
if (op == DB_CURRENT) {
DBT newkey; toku_init_dbt(&newkey);
DBT newval; toku_init_dbt(&newval);
brt_search_t search; brt_search_init(&search, brt_cursor_compare_set, BRT_SEARCH_LEFT, &cursor->key, &cursor->val, cursor->brt);
int r = toku_brt_search(cursor->brt, &search, &newkey, &newval, logger);
if (r != 0 || compare_kv_xy(cursor->brt, &cursor->key, &cursor->val, &newkey, &newval) != 0)
return DB_KEYEMPTY;
}
return brt_cursor_copyout(cursor, outkey, outval);
}
/* search for the first kv pair that matches the search object */
static int brt_cursor_search(BRT_CURSOR cursor, brt_search_t *search, DBT *outkey, DBT *outval, TOKULOGGER logger) {
DBT newkey; toku_init_dbt(&newkey); newkey.flags = DB_DBT_MALLOC;
DBT newval; toku_init_dbt(&newval); newval.flags = DB_DBT_MALLOC;
int r = toku_brt_search(cursor->brt, search, &newkey, &newval, logger);
if (r == 0) {
brt_cursor_set_key_val(cursor, &newkey, &newval);
r = brt_cursor_copyout(cursor, outkey, outval);
}
dbt_cleanup(&newkey);
dbt_cleanup(&newval);
return r;
}
/* search for the kv pair that matches the search object and is equal to kv */
static int brt_cursor_search_eq_kv_xy(BRT_CURSOR cursor, brt_search_t *search, DBT *outkey, DBT *outval, TOKULOGGER logger) {
DBT newkey; toku_init_dbt(&newkey); newkey.flags = DB_DBT_MALLOC;
DBT newval; toku_init_dbt(&newval); newval.flags = DB_DBT_MALLOC;
int r = toku_brt_search(cursor->brt, search, &newkey, &newval, logger);
if (r == 0) {
if (compare_kv_xy(cursor->brt, search->k, search->v, &newkey, &newval) == 0) {
brt_cursor_set_key_val(cursor, &newkey, &newval);
r = brt_cursor_copyout(cursor, outkey, outval);
} else
r = DB_NOTFOUND;
}
dbt_cleanup(&newkey);
dbt_cleanup(&newval);
return r;
}
/* search for the kv pair that matches the search object and is equal to k */
static int brt_cursor_search_eq_k_x(BRT_CURSOR cursor, brt_search_t *search, DBT *outkey, DBT *outval, TOKULOGGER logger) {
DBT newkey; toku_init_dbt(&newkey); newkey.flags = DB_DBT_MALLOC;
DBT newval; toku_init_dbt(&newval); newval.flags = DB_DBT_MALLOC;
int r = toku_brt_search(cursor->brt, search, &newkey, &newval, logger);
if (r == 0) {
if (compare_k_x(cursor->brt, search->k, &newkey) == 0) {
brt_cursor_set_key_val(cursor, &newkey, &newval);
r = brt_cursor_copyout(cursor, outkey, outval);
} else
r = DB_NOTFOUND;
}
dbt_cleanup(&newkey);
dbt_cleanup(&newval);
return r;
}
static int brt_cursor_compare_one(brt_search_t *search, DBT *x, DBT *y) {
search = search; x = x; y = y;
return 1;
}
static int brt_cursor_first(BRT_CURSOR cursor, DBT *outkey, DBT *outval, TOKULOGGER logger) {
brt_search_t search; brt_search_init(&search, brt_cursor_compare_one, BRT_SEARCH_LEFT, 0, 0, cursor->brt);
return brt_cursor_search(cursor, &search, outkey, outval, logger);
}
static int brt_cursor_last(BRT_CURSOR cursor, DBT *outkey, DBT *outval, TOKULOGGER logger) {
brt_search_t search; brt_search_init(&search, brt_cursor_compare_one, BRT_SEARCH_RIGHT, 0, 0, cursor->brt);
return brt_cursor_search(cursor, &search, outkey, outval, logger);
}
static int brt_cursor_compare_next(brt_search_t *search, DBT *x, DBT *y) {
BRT brt = search->context;
return compare_kv_xy(brt, search->k, search->v, x, y) < 0; /* return min xy: kv < xy */
}
static int brt_cursor_next(BRT_CURSOR cursor, DBT *outkey, DBT *outval, TOKULOGGER logger) {
brt_search_t search; brt_search_init(&search, brt_cursor_compare_next, BRT_SEARCH_LEFT, &cursor->key, &cursor->val, cursor->brt);
return brt_cursor_search(cursor, &search, outkey, outval, logger);
}
static int brt_cursor_compare_next_nodup(brt_search_t *search, DBT *x, DBT *y) {
BRT brt = search->context; y = y;
return compare_k_x(brt, search->k, x) < 0; /* return min x: k < x */
}
static int brt_cursor_next_nodup(BRT_CURSOR cursor, DBT *outkey, DBT *outval, TOKULOGGER logger) {
brt_search_t search; brt_search_init(&search, brt_cursor_compare_next_nodup, BRT_SEARCH_LEFT, &cursor->key, &cursor->val, cursor->brt);
return brt_cursor_search(cursor, &search, outkey, outval, logger);
}
static int brt_cursor_compare_next_dup(brt_search_t *search, DBT *x, DBT *y) {
BRT brt = search->context;
int keycmp = compare_k_x(brt, search->k, x);
if (keycmp < 0)
return 1;
else
return keycmp == 0 && y && compare_v_y(brt, search->v, y) < 0; /* return min xy: k <= x && v < y */
}
static int brt_cursor_next_dup(BRT_CURSOR cursor, DBT *outkey, DBT *outval, TOKULOGGER logger) {
brt_search_t search; brt_search_init(&search, brt_cursor_compare_next_dup, BRT_SEARCH_LEFT, &cursor->key, &cursor->val, cursor->brt);
return brt_cursor_search_eq_k_x(cursor, &search, outkey, outval, logger);
}
static int brt_cursor_compare_get_both_range(brt_search_t *search, DBT *x, DBT *y) {
BRT brt = search->context;
int keycmp = compare_k_x(brt, search->k, x);
if (keycmp < 0)
return 1;
else
return keycmp == 0 && (y == 0 || compare_v_y(brt, search->v, y) <= 0); /* return min xy: k <= x && v <= y */
}
static int brt_cursor_get_both_range(BRT_CURSOR cursor, DBT *key, DBT *val, DBT *outkey, DBT *outval, TOKULOGGER logger) {
brt_search_t search; brt_search_init(&search, brt_cursor_compare_get_both_range, BRT_SEARCH_LEFT, key, val, cursor->brt);
return brt_cursor_search_eq_k_x(cursor, &search, outkey, outval, logger);
}
static int brt_cursor_compare_prev(brt_search_t *search, DBT *x, DBT *y) {
BRT brt = search->context;
return compare_kv_xy(brt, search->k, search->v, x, y) > 0; /* return max xy: kv > xy */
}
static int brt_cursor_prev(BRT_CURSOR cursor, DBT *outkey, DBT *outval, TOKULOGGER logger) {
brt_search_t search; brt_search_init(&search, brt_cursor_compare_prev, BRT_SEARCH_RIGHT, &cursor->key, &cursor->val, cursor->brt);
return brt_cursor_search(cursor, &search, outkey, outval, logger);
}
static int brt_cursor_compare_prev_nodup(brt_search_t *search, DBT *x, DBT *y) {
BRT brt = search->context; y = y;
return compare_k_x(brt, search->k, x) > 0; /* return max x: k > x */
}
static int brt_cursor_prev_nodup(BRT_CURSOR cursor, DBT *outkey, DBT *outval, TOKULOGGER logger) {
brt_search_t search; brt_search_init(&search, brt_cursor_compare_prev_nodup, BRT_SEARCH_RIGHT, &cursor->key, &cursor->val, cursor->brt);
return brt_cursor_search(cursor, &search, outkey, outval, logger);
}
#ifdef DB_PREV_DUP
static int brt_cursor_compare_prev_dup(brt_search_t *search, DBT *x, DBT *y) {
BRT brt = search->context;
int keycmp = compare_k_x(brt, search->k, x);
if (keycmp > 0)
return 1;
else
return keycmp == 0 && y && compare_v_y(brt, search->v, y) > 0; /* return max xy: k >= x && v > y */
}
static int brt_cursor_prev_dup(BRT_CURSOR cursor, DBT *outkey, DBT *outval, TOKULOGGER logger) {
brt_search_t search; brt_search_init(&search, brt_cursor_compare_prev_dup, BRT_SEARCH_RIGHT, &cursor->key, &cursor->val, cursor->brt);
return brt_cursor_search_eq_k_x(cursor, &search, outkey, outval, logger);
}
#endif
static int brt_cursor_compare_set_range(brt_search_t *search, DBT *x, DBT *y) {
BRT brt = search->context;
return compare_kv_xy(brt, search->k, search->v, x, y) <= 0; /* return kv <= xy */
}
static int brt_cursor_set(BRT_CURSOR cursor, DBT *key, DBT *val, DBT *outkey, DBT *outval, TOKULOGGER logger) {
brt_search_t search; brt_search_init(&search, brt_cursor_compare_set_range, BRT_SEARCH_LEFT, key, val, cursor->brt);
return brt_cursor_search_eq_kv_xy(cursor, &search, outkey, outval, logger);
}
static int brt_cursor_set_range(BRT_CURSOR cursor, DBT *key, DBT *outkey, DBT *outval, TOKULOGGER logger) {
brt_search_t search; brt_search_init(&search, brt_cursor_compare_set_range, BRT_SEARCH_LEFT, key, 0, cursor->brt);
return brt_cursor_search(cursor, &search, outkey, outval, logger);
}
int toku_brt_cursor_get (BRT_CURSOR cursor, DBT *key, DBT *val, int get_flags, TOKUTXN txn) {
int r;
int op = get_flags & DB_OPFLAGS_MASK;
TOKULOGGER logger = toku_txn_logger(txn);
if (get_flags & ~DB_OPFLAGS_MASK)
return EINVAL;
switch (op) {
case DB_CURRENT:
case DB_CURRENT_BINDING:
r = brt_cursor_current(cursor, op, key, val, logger);
break;
case DB_FIRST:
r = brt_cursor_first(cursor, key, val, logger);
break;
case DB_LAST:
r = brt_cursor_last(cursor, key, val, logger);
break;
case DB_NEXT:
if (brt_cursor_not_set(cursor))
r = brt_cursor_first(cursor, key, val, logger);
else
r = brt_cursor_next(cursor, key, val, logger);
break;
case DB_NEXT_DUP:
if (brt_cursor_not_set(cursor))
r = EINVAL;
else
r = brt_cursor_next_dup(cursor, key, val, logger);
break;
case DB_NEXT_NODUP:
if (brt_cursor_not_set(cursor))
r = brt_cursor_first(cursor, key, val, logger);
else
r = brt_cursor_next_nodup(cursor, key, val, logger);
break;
case DB_PREV:
if (brt_cursor_not_set(cursor))
r = brt_cursor_last(cursor, key, val, logger);
else
r = brt_cursor_prev(cursor, key, val, logger);
break;
#ifdef DB_PREV_DUP
case DB_PREV_DUP:
if (brt_cursor_not_set(cursor))
r = EINVAL;
else
r = brt_cursor_prev_dup(cursor, key, val, logger);
break;
#endif
case DB_PREV_NODUP:
if (brt_cursor_not_set(cursor))
r = brt_cursor_last(cursor, key, val, logger);
else
r = brt_cursor_prev_nodup(cursor, key, val, logger);
break;
case DB_SET:
r = brt_cursor_set(cursor, key, 0, 0, val, logger);
break;
case DB_SET_RANGE:
r = brt_cursor_set_range(cursor, key, key, val, logger);
break;
case DB_GET_BOTH:
r = brt_cursor_set(cursor, key, val, 0, 0, logger);
break;
case DB_GET_BOTH_RANGE:
r = brt_cursor_get_both_range(cursor, key, val, 0, val, logger);
break;
default:
r = EINVAL;
break;
}
return r;
}
int toku_brt_cursor_delete(BRT_CURSOR cursor, int flags, TOKUTXN txn) {
if ((flags & ~DB_DELETE_ANY) != 0)
return EINVAL;
if (brt_cursor_not_set(cursor))
return EINVAL;
int r = 0;
if (!(flags & DB_DELETE_ANY))
r = brt_cursor_current(cursor, DB_CURRENT, 0, 0, toku_txn_logger(txn));
if (r == 0)
r = toku_brt_delete_both(cursor->brt, &cursor->key, &cursor->val, txn);
return r;
}
int toku_brt_height_of_root(BRT brt, int *height) {
// for an open brt, return the current height.
int r;
if ((r = toku_read_and_pin_brt_header(brt->cf, &brt->h))) {
if (0) { died0: toku_unpin_brt_header(brt); }
return r;
}
CACHEKEY *rootp = toku_calculate_root_offset_pointer(brt);
void *node_v;
if ((r=toku_cachetable_get_and_pin(brt->cf, *rootp, &node_v, NULL,
toku_brtnode_flush_callback, toku_brtnode_fetch_callback, brt))) {
goto died0;
}
BRTNODE node = node_v;
*height = node->height;
r = unpin_brtnode(brt, node); assert(r==0);
r = toku_unpin_brt_header(brt); assert(r==0);
return 0;
}
#endif