More surgery. A few tests run. Many still don't link. Addresses #1195.

git-svn-id: file:///svn/toku/tokudb.1195@7295 c7de825b-a66e-492c-adef-691d508d4ae1
This commit is contained in:
Bradley C. Kuszmaul 2013-04-16 23:57:24 -04:00 committed by Yoni Fogel
parent 20894b913f
commit d3eb758aac
3 changed files with 416 additions and 409 deletions

View file

@ -253,12 +253,12 @@ int toku_testsetup_insert_to_nonleaf (BRT brt, BLOCKNUM, enum brt_cmd_type, char
int toku_set_func_fsync (int (*fsync_function)(int));
// These two go together to do lookups in a brtnode using the keys in a command.
struct cmd_leafval_bessel_extra {
struct cmd_leafval_heaviside_extra {
BRT t;
BRT_CMD cmd;
int compare_both_keys; // Set to 1 for DUPSORT databases that are not doing a DELETE_BOTH
};
int toku_cmd_leafval_bessel (OMTVALUE leafentry, void *extra);
int toku_cmd_leafval_heaviside (OMTVALUE leafentry, void *extra);
int toku_brt_root_put_cmd(BRT brt, BRT_CMD cmd, TOKULOGGER logger);
int toku_cachefile_root_put_cmd (CACHEFILE cf, BRT_CMD cmd, TOKULOGGER logger);

View file

@ -57,117 +57,6 @@ long long n_items_malloced;
static void verify_local_fingerprint_nonleaf (BRTNODE node);
static int toku_dump_brtnode (BRT brt, BLOCKNUM blocknum, int depth, bytevec lorange, ITEMLEN lolen, bytevec hirange, ITEMLEN hilen);
/* Release a brt node together with everything it owns, then zero the caller's pointer.
 *   nonleaf (height>0): every pivot key, every non-null child buffer, and the
 *                       childkeys / childinfos arrays themselves.
 *   leaf    (height==0): the OMT buffer (if still present) and the memory behind
 *                       the leaf's mempool. */
void toku_brtnode_free (BRTNODE *nodep) {
    BRTNODE node = *nodep;
    if (node->height > 0) {
        int childnum;
        /* Only the first n_children-1 childkeys entries are freed individually. */
        for (childnum = 0; childnum < node->u.n.n_children-1; childnum++) {
            toku_free(node->u.n.childkeys[childnum]);
        }
        for (childnum = 0; childnum < node->u.n.n_children; childnum++) {
            if (BNC_BUFFER(node,childnum)) {
                toku_fifo_free(&BNC_BUFFER(node,childnum));
            }
        }
        toku_free(node->u.n.childkeys);
        toku_free(node->u.n.childinfos);
    } else {
        void *pool_base;
        if (node->u.l.buffer) { // The buffer may have been freed already, in some cases.
            toku_omt_destroy(&node->u.l.buffer);
        }
        /* Fetch the base pointer before toku_mempool_fini() runs on the pool. */
        pool_base = toku_mempool_get_base(&node->u.l.buffer_mempool);
        toku_mempool_fini(&node->u.l.buffer_mempool);
        toku_free(pool_base);
    }
    toku_free(node);
    *nodep = 0;
}
/* toku_omt_iterate() callback: assert that the leafentry, for its full
 * leafentry_memsize(), lies inside the mempool passed as vmp.
 * The index argument is unused. Always returns 0. */
static int verify_in_mempool(OMTVALUE lev, u_int32_t UU(idx), void *vmp) {
    struct mempool *pool = vmp;
    LEAFENTRY entry = lev;
    assert(toku_mempool_inrange(pool, entry, leafentry_memsize(entry)));
    return 0;
}
/* For a leaf (height==0), run verify_in_mempool over every entry of the leaf's
 * OMT buffer against the leaf's own mempool. Does nothing for other heights. */
void toku_verify_all_in_mempool(BRTNODE node) {
    if (node->height != 0) {
        return;
    }
    toku_omt_iterate(node->u.l.buffer, verify_in_mempool, &node->u.l.buffer_mempool);
}
/* Flush callback registered with the cachetable for brt nodes (see toku_cachetable_put).
 *   brtnode_v  - the BRTNODE being flushed
 *   extraargs  - the struct brt_header handed to the serializer
 *   write_me   - when set, serialize the node to the cachefile's fd
 *   keep_me    - when clear, free the in-memory node afterwards
 * size, modified_lsn and rename_p are unused.
 * NOTE(review): the result of toku_serialize_brtnode_to() is not checked here. */
void toku_brtnode_flush_callback (CACHEFILE cachefile, BLOCKNUM nodename, void *brtnode_v, void *extraargs, long size __attribute__((unused)), BOOL write_me, BOOL keep_me, LSN modified_lsn __attribute__((__unused__)) , BOOL rename_p __attribute__((__unused__))) {
    BRTNODE brtnode = brtnode_v;
    struct brt_header *header = extraargs;
    if (0) { /* debug tracing, compiled out */
        printf("%s:%d toku_brtnode_flush_callback %p thisnodename=%" PRId64 " keep_me=%u height=%d", __FILE__, __LINE__, brtnode, brtnode->thisnodename.b, keep_me, brtnode->height);
        if (brtnode->height==0) printf(" buf=%p mempool-base=%p", brtnode->u.l.buffer, brtnode->u.l.buffer_mempool.base);
        printf("\n");
    }
    /* The block number recorded in the node must match the key we were called with. */
    assert(brtnode->thisnodename.b==nodename.b);
    if (write_me) {
        toku_serialize_brtnode_to(toku_cachefile_fd(cachefile), brtnode->thisnodename, brtnode, header);
    }
    if (keep_me) {
        return;
    }
    toku_brtnode_free(&brtnode);
}
/* Fetch callback registered with the cachetable for brt nodes.
 * Deserializes block `nodename` from the cachefile's fd into *brtnode_pv, using the
 * struct brt_header passed (non-null) as extraargs.
 * On success (0) also stores the node's memory size in *sizep and its disk_lsn in
 * *written_lsn; on failure returns the deserializer's error and leaves both untouched. */
int toku_brtnode_fetch_callback (CACHEFILE cachefile, BLOCKNUM nodename, u_int32_t fullhash, void **brtnode_pv, long *sizep, void*extraargs, LSN *written_lsn) {
    assert(extraargs);
    struct brt_header *header = extraargs;
    BRTNODE *nodep = (BRTNODE*)brtnode_pv;
    int r = toku_deserialize_brtnode_from(toku_cachefile_fd(cachefile), nodename, fullhash, nodep, header);
    if (r != 0) {
        return r;
    }
    *sizep = brtnode_memory_size(*nodep);
    *written_lsn = (*nodep)->disk_lsn;
    return 0;
}
/* Free a brt header and the arrays hanging off it: the root names (only when
 * n_named_roots>0), the fifo, roots, root_hashes, flags_array, block_translation,
 * the block allocator, and finally the header itself. */
void toku_brtheader_free (struct brt_header *h) {
    int n_names = h->n_named_roots;
    if (n_names > 0) {
        int k;
        for (k = 0; k < n_names; k++) {
            toku_free(h->names[k]);
        }
        toku_free(h->names);
    }
    toku_fifo_free(&h->fifo);
    toku_free(h->roots);
    toku_free(h->root_hashes);
    toku_free(h->flags_array);
    toku_free(h->block_translation);
    destroy_block_allocator(&h->block_allocator);
    toku_free(h);
}
/* Close-time handler for a brt header (header_v is a struct brt_header *).
 * If the header is dirty, writes the header and then the fifo to the cachefile's fd.
 * The header is freed in every case. Always returns 0.
 * NOTE(review): the results of the two serialize calls are not examined. */
int toku_brtheader_close (CACHEFILE cachefile, void *header_v) {
    struct brt_header *header = header_v;
    if (header->dirty) {
        u_int64_t fifo_offset;
        toku_serialize_brt_header_to(toku_cachefile_fd(cachefile), header);
        /* Must compute this after writing the header. */
        fifo_offset = block_allocator_allocated_limit(header->block_allocator);
        toku_serialize_fifo_at(toku_cachefile_fd(cachefile), fifo_offset, header->fifo);
    }
    toku_brtheader_free(header);
    return 0;
}
int toku_read_brt_header_and_store_in_cachefile (CACHEFILE cf, struct brt_header **header)
// If the cachefile already has the header, then just get it.
// If the cachefile has not been initialized, then don't modify anything.
@ -188,16 +77,6 @@ int toku_read_brt_header_and_store_in_cachefile (CACHEFILE cf, struct brt_header
return 0;
}
/* Run VERIFY_NODE on the node, then unpin it in the brt's cachefile, passing along
 * the node's block number, full hash, dirty flag and current memory size.
 * Returns whatever toku_cachetable_unpin() returns. */
int toku_unpin_brtnode (BRT brt, BRTNODE node) {
    VERIFY_NODE(brt,node);
    int r = toku_cachetable_unpin(brt->cf,
                                  node->thisnodename,
                                  node->fullhash,
                                  node->dirty,
                                  brtnode_memory_size(node));
    return r;
}
typedef struct kvpair {
bytevec key;
unsigned int keylen;
@ -222,32 +101,6 @@ static inline u_int32_t myrandom (void) {
return rstate;
}
// logs the memory allocation, but not the creation of the new node
// Allocates a block number and a node of the requested height, initializes the node,
// stores it in *result, and puts it into the brt's cachetable with the brt node
// flush/fetch callbacks and the brt header as their extra argument.
// Failures of the allocation or of the cachetable put trip asserts; returns 0.
int toku_create_new_brtnode (BRT t, BRTNODE *result, int height, TOKULOGGER logger) {
    TAGMALLOC(BRTNODE, n);
    BLOCKNUM blocknum;
    int r = allocate_diskblocknumber (&blocknum, t, logger);
    assert(r==0);
    assert(n);
    assert(t->h->nodesize>0);
    n->ever_been_written = 0;
    initialize_brtnode(t, n, blocknum, height);
    *result = n;
    assert(n->nodesize>0);
    // Remember the hash in the node so later cachetable calls can pass it along.
    u_int32_t hash = toku_cachetable_hash(t->cf, n->thisnodename);
    n->fullhash = hash;
    r = toku_cachetable_put(t->cf, n->thisnodename, hash,
                            n, brtnode_memory_size(n),
                            toku_brtnode_flush_callback, toku_brtnode_fetch_callback, t->h);
    assert(r==0);
    return 0;
}
static int
handle_split_of_child_simple (BRT t, BRTNODE node, int childnum,
BRTNODE childa, BRTNODE childb,
@ -852,56 +705,6 @@ static int brtnode_maybe_push_down(BRT t, BRTNODE node, int *did_split, BRTNODE
return 0;
}
/* Compare a stored (key,data) pair against the command held in *be.
 * The key is compared first with the brt's compare_fun. Only when the keys are
 * equal, be->compare_both_keys is set, and the command carries non-null value data
 * is the data compared as well, using dup_compare. Returns the comparator's result. */
static int
leafval_bessel_le_committed (u_int32_t klen, void *kval,
                             u_int32_t dlen, void *dval,
                             struct cmd_leafval_bessel_extra *be) {
    BRT brt = be->t;
    DBT scratch;
    int keycmp = brt->compare_fun(brt->db,
                                  toku_fill_dbt(&scratch, kval, klen),
                                  be->cmd->u.id.key);
    if (keycmp != 0) return keycmp;
    if (!be->compare_both_keys) return keycmp;
    if (!be->cmd->u.id.val->data) return keycmp;
    return brt->dup_compare(brt->db,
                            toku_fill_dbt(&scratch, dval, dlen),
                            be->cmd->u.id.val);
}
/* Variant taking xid, key, c-value and p-value: compares the key together with the
 * p-value (plen/pval); xid and the c-value are ignored. */
static int
leafval_bessel_le_both (TXNID xid __attribute__((__unused__)),
                        u_int32_t klen, void *kval,
                        u_int32_t clen __attribute__((__unused__)), void *cval __attribute__((__unused__)),
                        u_int32_t plen, void *pval,
                        struct cmd_leafval_bessel_extra *be) {
    int cmp = leafval_bessel_le_committed(klen, kval, plen, pval, be);
    return cmp;
}
/* Variant taking xid, key and c-value: compares the key together with the
 * c-value (clen/cval); xid is ignored. */
static int
leafval_bessel_le_provdel (TXNID xid __attribute__((__unused__)),
                           u_int32_t klen, void *kval,
                           u_int32_t clen, void *cval,
                           struct cmd_leafval_bessel_extra *be) {
    int cmp = leafval_bessel_le_committed(klen, kval, clen, cval, be);
    return cmp;
}
/* Variant taking xid, key and p-value: compares the key together with the
 * p-value (plen/pval); xid is ignored. */
static int
leafval_bessel_le_provpair (TXNID xid __attribute__((__unused__)),
                            u_int32_t klen, void *kval,
                            u_int32_t plen, void *pval,
                            struct cmd_leafval_bessel_extra *be) {
    int cmp = leafval_bessel_le_committed(klen, kval, plen, pval, be);
    return cmp;
}
// Comparison callback with the (OMTVALUE, void *extra) shape that is handed to
// toku_omt_find_zero()/toku_omt_find(). `extra` must point to a
// struct cmd_leafval_bessel_extra.
// LESWITCHCALL is given the leafentry, the function-name prefix leafval_bessel and
// the extra; presumably it dispatches to the matching leafval_bessel_le_* helper and
// returns its result -- confirm against the macro definition.
int toku_cmd_leafval_bessel (OMTVALUE lev, void *extra) {
LEAFENTRY le=lev;
struct cmd_leafval_bessel_extra *be = extra;
LESWITCHCALL(le, leafval_bessel, be);
// Only reached if LESWITCHCALL did not return.
abort(); return 0; // make certain compilers happy
}
// Whenever anything provisional is happening, its XID must match the cmd's.
static int
@ -1067,30 +870,6 @@ static int brt_leaf_put_cmd (BRT t, BRTNODE node, BRT_CMD cmd,
return 0;
}
/* find the leftmost child that may contain the key */
/* Requires a nonleaf node (height>0). With DO_PIVOT_SEARCH_LR set to 0 (the compiled
 * configuration) the pivots are scanned from the last one down to the first; the
 * answer is i+1 for the first pivot i where brt_compare_pivot() is positive, or 0
 * when no pivot compares positive. */
unsigned int toku_brtnode_which_child (BRTNODE node , DBT *k, DBT *d, BRT t) {
    int i;
    assert(node->height>0);
#define DO_PIVOT_SEARCH_LR 0
#if DO_PIVOT_SEARCH_LR
    /* Left-to-right scan: stop at the first pivot that does not compare positive. */
    for (i=0; i<node->u.n.n_children-1; i++) {
        if (brt_compare_pivot(t, k, d, node->u.n.childkeys[i]) <= 0) {
            return i;
        }
    }
    return node->u.n.n_children-1;
#else
    // give preference for appending to the dictionary. no change for
    // random keys
    i = node->u.n.n_children-2;
    while (i >= 0) {
        if (brt_compare_pivot(t, k, d, node->u.n.childkeys[i]) > 0) {
            return i+1;
        }
        i--;
    }
    return 0;
#endif
}
/* put a cmd into a nodes child */
static int
brt_nonleaf_put_cmd_child_node_simple (BRT t, BRTNODE node, int childnum, BOOL maybe, BRT_CMD cmd, TOKULOGGER logger,
@ -1518,23 +1297,6 @@ static int setup_initial_brt_root_node (BRT t, BLOCKNUM blocknum, TOKULOGGER log
#define WHEN_BRTTRACE(x) ((void)0)
#endif
/* Allocate a zero-filled BRT handle and give it its defaults: an empty cursor list,
 * no flags, BRT_DEFAULT_NODE_SIZE, toku_default_compare_fun for both key and
 * duplicate comparison, and a fresh OMT in txns.
 * Returns 0 and stores the handle in *brt_ptr, ENOMEM if the allocation fails, or
 * the error from toku_omt_create() (the handle is freed in that case). */
int toku_brt_create(BRT *brt_ptr) {
    BRT fresh = toku_malloc(sizeof *fresh);
    if (fresh == 0) {
        return ENOMEM;
    }
    memset(fresh, 0, sizeof *fresh);
    list_init(&fresh->cursors);
    fresh->flags = 0;
    fresh->did_set_flags = 0;
    fresh->nodesize = BRT_DEFAULT_NODE_SIZE;
    fresh->compare_fun = toku_default_compare_fun;
    fresh->dup_compare = toku_default_compare_fun;
    int r = toku_omt_create(&fresh->txns);
    if (r != 0) {
        toku_free(fresh);
        return r;
    }
    *brt_ptr = fresh;
    return 0;
}
int toku_brt_set_flags(BRT brt, unsigned int flags) {
brt->did_set_flags = 1;
brt->flags = flags;
@ -1571,32 +1333,6 @@ int toku_brt_get_fd(BRT brt, int *fdp) {
return 0;
}
/* Recompute the remembered-hash slot for root number `rootnum`: record the current
 * root block, the cachefile's filenum and the cachetable hash of that root, and
 * mark the slot valid. Requires brt->cf to be set. */
static void compute_and_fill_remembered_hash (BRT brt, int rootnum) {
    struct remembered_hash *slot = &brt->h->root_hashes[rootnum];
    assert(brt->cf); // if cf is null, we'll be hosed.
    slot->root     = brt->h->roots[rootnum];
    slot->fnum     = toku_cachefile_filenum(brt->cf);
    slot->fullhash = toku_cachetable_hash(brt->cf, slot->root);
    slot->valid    = TRUE;
}
/* Return the cachetable hash for root number `rootnum`.
 * The remembered value is reused only when the slot is valid and both its file id
 * and its root block still match the current ones; otherwise it is recomputed. */
static u_int32_t get_roothash (BRT brt, int rootnum) {
    struct remembered_hash *rh = &brt->h->root_hashes[rootnum];
    BLOCKNUM root = brt->h->roots[rootnum];
    assert(rh);
    int still_good = rh->valid
        && rh->fnum.fileid == toku_cachefile_filenum(brt->cf).fileid
        && rh->root.b == root.b;
    if (!still_good) {
        compute_and_fill_remembered_hash(brt, rootnum);
    }
    return rh->fullhash;
}
// open a file for use by the brt. if the file does not exist, create it.
static int brt_open_file(BRT brt, const char *fname, const char *fname_in_env, int is_create, TOKUTXN txn, int *fdp) {
brt = brt;
@ -1899,77 +1635,12 @@ int toku_open_brt (const char *fname, const char *dbname, int is_create, BRT *ne
return r;
}
/* Close a brt handle.
 * Order of work: close every cursor still on the brt's list, notify the transaction
 * layer, destroy the txns OMT, then (if a cachefile is attached) log the close when
 * a logger is given, check nothing is pinned, and close the cachefile. Finally the
 * owned strings/buffers and the handle itself are freed.
 * Returns 0, or the first error from closing a cursor, logging, or closing the
 * cachefile (in which case the remaining steps are not performed). */
int toku_close_brt (BRT brt, TOKULOGGER logger) {
    int r;
    while (!list_empty(&brt->cursors)) {
        BRT_CURSOR open_cursor = list_struct(list_pop(&brt->cursors), struct brt_cursor, cursors_link);
        r = toku_brt_cursor_close(open_cursor);
        if (r != 0) {
            return r;
        }
    }

    // Must do this work before closing the cf
    r = toku_txn_note_close_brt(brt);
    assert(r==0);
    toku_omt_destroy(&brt->txns);

    if (brt->cf) {
        if (logger) {
            assert(brt->fname);
            BYTESTRING bs = {.len=strlen(brt->fname), .data=brt->fname};
            LSN lsn;
            r = toku_log_brtclose(logger, &lsn, 1, bs, toku_cachefile_filenum(brt->cf)); // flush the log on close, otherwise it might not make it out.
            if (r != 0) {
                return r;
            }
        }
        assert(0==toku_cachefile_count_pinned(brt->cf, 1)); // For the brt, the pinned count should be zero.
        r = toku_cachefile_close(&brt->cf, logger);
        if (r != 0) {
            return r;
        }
    }

    if (brt->database_name) { toku_free(brt->database_name); }
    if (brt->fname)         { toku_free(brt->fname); }
    if (brt->skey)          { toku_free(brt->skey); }
    if (brt->sval)          { toku_free(brt->sval); }
    toku_free(brt);
    return 0;
}
/* Flush the brt's cachefile; returns the result of toku_cachefile_flush(). */
int toku_brt_flush (BRT brt) {
    int r = toku_cachefile_flush(brt->cf);
    return r;
}
int toku_brt_debug_mode = 0;//strcmp(key,"hello387")==0;
/* Locate the root entry for this brt inside its header.
 * An unnamed brt (database_name==0) uses entry 0 and requires n_named_roots==-1.
 * A named brt uses the first entry whose name equals database_name; if no entry
 * matches, the process aborts.
 * Stores the root's cachetable hash in *roothash and returns a pointer to the root. */
CACHEKEY* toku_calculate_root_offset_pointer (BRT brt, u_int32_t *roothash) {
    int rootnum = 0;
    if (brt->database_name==0) {
        assert(brt->h->n_named_roots==-1);
    } else {
        int n_names = brt->h->n_named_roots;
        while (rootnum < n_names && strcmp(brt->database_name, brt->h->names[rootnum]) != 0) {
            rootnum++;
        }
        if (rootnum >= n_names) {
            abort(); return 0; // make certain compilers happy
        }
    }
    *roothash = get_roothash(brt, rootnum);
    return &brt->h->roots[rootnum];
}
/* Enqueue `cmd` on the fifo of the brt header stored as the cachefile's userdata
 * (which must be non-null), then log an enqrootentry record with the command's
 * xid, type, key bytes and value bytes.
 * Returns 0 on success, or the first nonzero result from the enqueue or the log call. */
int toku_cachefile_root_put_cmd (CACHEFILE cf, BRT_CMD cmd, TOKULOGGER logger) {
    struct brt_header *h = toku_cachefile_get_userdata(cf);
    assert(h);
    int r = toku_fifo_enq_cmdstruct(h->fifo, cmd);
    if (r != 0) {
        return r;
    }
    BYTESTRING keybs = {.len=cmd->u.id.key->size, .data=cmd->u.id.key->data};
    BYTESTRING valbs = {.len=cmd->u.id.val->size, .data=cmd->u.id.val->data};
    r = toku_log_enqrootentry(logger, (LSN*)0, 0, toku_cachefile_filenum(cf), cmd->xid, cmd->type, keybs, valbs);
    return r;
}
//strcmp(key,"hello387")==0;
static int push_something_simple(BRT brt, BRTNODE *nodep, CACHEKEY *rootp, BRT_CMD cmd, TOKULOGGER logger) {
BRTNODE node = *nodep;
@ -2512,13 +2183,6 @@ toku_brt_search (BRT brt, brt_search_t *search, DBT *newkey, DBT *newval, TOKULO
return r;
}
/* Free the DBT's data and null the pointer, but only when data is non-null and the
 * DBT is flagged DB_DBT_REALLOC or DB_DBT_MALLOC. Otherwise leave it alone. */
static inline void dbt_cleanup(DBT *dbt) {
    if (!dbt->data) {
        return;
    }
    if (!(dbt->flags & DB_DBT_REALLOC) && !(dbt->flags & DB_DBT_MALLOC)) {
        return;
    }
    toku_free_n(dbt->data, dbt->size);
    dbt->data = 0;
}
static BOOL brt_cursor_not_set(BRT_CURSOR cursor) {
return (BOOL)((cursor->key.data == 0)
||
@ -2591,24 +2255,6 @@ int toku_brt_cursor (BRT brt, BRT_CURSOR *cursorptr, int is_temporary_cursor) {
return 0;
}
/* Destroy a brt cursor.
 * The current key/val and the previous key/val are cleaned up with dbt_cleanup()
 * only when the corresponding *_in_omt flag is clear. The scratch skey/sval buffers
 * are then freed, the cursor is unlinked from its list, its OMT cursor has the
 * invalidate callback cleared and is destroyed, and the cursor itself is freed.
 * Always returns 0. */
int toku_brt_cursor_close(BRT_CURSOR cursor) {
    if (!cursor->current_in_omt) {
        dbt_cleanup(&cursor->key);
        dbt_cleanup(&cursor->val);
    }
    if (!cursor->prev_in_omt) {
        dbt_cleanup(&cursor->prevkey);
        dbt_cleanup(&cursor->prevval);
    }
    if (cursor->skey) {
        toku_free(cursor->skey);
    }
    if (cursor->sval) {
        toku_free(cursor->sval);
    }
    list_remove(&cursor->cursors_link);
    toku_omt_cursor_set_invalidate_callback(cursor->omtcursor, NULL, NULL);
    toku_omt_cursor_destroy(&cursor->omtcursor);
    toku_free_n(cursor, sizeof *cursor);
    return 0;
}
DBT *brt_cursor_peek_prev_key(BRT_CURSOR cursor)
// Effect: Return a pointer to a DBT for the previous key.
// Requires: The caller may not modify that DBT or the memory at which it points.
@ -3304,48 +2950,3 @@ int toku_brt_get_cursor_count (BRT brt) {
return n;
}
// State passed to move_it() through toku_omt_iterate() while omt_compress_kvspace()
// copies every leafentry into a new mempool.
struct omt_compressor_state {
struct mempool *new_kvspace; // mempool that move_it() allocates the copies from
OMT omt; // OMT whose entries move_it() repoints at the copies
};
/* toku_omt_iterate() callback used by omt_compress_kvspace(): copy the leafentry at
 * index `idx` into the new mempool named by the omt_compressor_state in `v`, and
 * make the OMT slot point at the copy. Always returns 0. */
static int move_it (OMTVALUE lev, u_int32_t idx, void *v) {
    struct omt_compressor_state *state = v;
    LEAFENTRY old_le = lev;
    u_int32_t nbytes = leafentry_memsize(old_le);
    LEAFENTRY new_le = toku_mempool_malloc(state->new_kvspace, nbytes, 1);
    assert(new_le); // we do this on a fresh mempool, so nothing bad should happen
    memcpy(new_le, old_le, nbytes);
    toku_omt_set_at(state->omt, new_le, idx);
    return 0;
}
// Compress things, and grow the mempool if needed.
// Copies every leafentry referenced by `omt` into a freshly allocated mempool and
// swaps that pool into *memp, freeing the old backing memory.
// The new pool is at least as large as the old one; it grows to 1.25x of
// (free_offset - frag_size + added_size) when that exceeds the current size.
// Returns 0 on success. Returns ENOMEM if the new memory cannot be allocated, in
// which case *memp is left completely untouched (its size field included), so the
// pool never advertises more capacity than its buffer really has.
static int omt_compress_kvspace (OMT omt, struct mempool *memp, size_t added_size) {
    // Sizes are kept in size_t so that large pools are not truncated to 32 bits.
    size_t needed = memp->free_offset - memp->frag_size + added_size;
    size_t padded = needed + needed/4;
    size_t new_size = memp->size;
    if (padded >= new_size) {
        new_size = padded;
    }
    void *newmem = toku_malloc(new_size);
    if (newmem == 0) {
        return ENOMEM; // nothing in *memp has been modified yet
    }
    struct mempool new_kvspace;
    toku_mempool_init(&new_kvspace, newmem, new_size);
    struct omt_compressor_state oc = { &new_kvspace, omt };
    toku_omt_iterate(omt, move_it, &oc); // move_it always returns 0
    toku_free(memp->base);
    *memp = new_kvspace;
    return 0;
}
/* Allocate `size` bytes from the mempool `mp`. If the first attempt fails, compress
 * (and possibly grow) the pool via omt_compress_kvspace() and try once more; the
 * retry is asserted to succeed. Returns NULL only when the compression step itself
 * fails. */
void *mempool_malloc_from_omt(OMT omt, struct mempool *mp, size_t size) {
    void *v = toku_mempool_malloc(mp, size, 1);
    if (v != 0) {
        return v;
    }
    if (omt_compress_kvspace(omt, mp, size) != 0) {
        return v; // still null
    }
    v = toku_mempool_malloc(mp, size, 1);
    assert(v);
    return v;
}

View file

@ -199,6 +199,7 @@ get_leaf_reactivity (BRTNODE node) {
static int
brtnode_put_cmd (BRT t, BRTNODE node, BRT_CMD cmd, TOKULOGGER logger, enum reactivity *re, BOOL *did_io);
int toku_brt_debug_mode = 0;
//#define SLOW
#ifdef SLOW
@ -295,6 +296,104 @@ brtnode_memory_size (BRTNODE node)
}
}
/* Run VERIFY_NODE on the node, then unpin it in the brt's cachefile, passing along
 * the node's block number, full hash, dirty flag and current memory size.
 * Returns whatever toku_cachetable_unpin() returns. */
int toku_unpin_brtnode (BRT brt, BRTNODE node) {
    VERIFY_NODE(brt,node);
    int r = toku_cachetable_unpin(brt->cf,
                                  node->thisnodename,
                                  node->fullhash,
                                  node->dirty,
                                  brtnode_memory_size(node));
    return r;
}
/* Flush callback registered with the cachetable for brt nodes (see toku_cachetable_put).
 *   brtnode_v  - the BRTNODE being flushed
 *   extraargs  - the struct brt_header handed to the serializer
 *   write_me   - when set, serialize the node to the cachefile's fd
 *   keep_me    - when clear, free the in-memory node afterwards
 * size, modified_lsn and rename_p are unused.
 * NOTE(review): the result of toku_serialize_brtnode_to() is not checked here. */
void toku_brtnode_flush_callback (CACHEFILE cachefile, BLOCKNUM nodename, void *brtnode_v, void *extraargs, long size __attribute__((unused)), BOOL write_me, BOOL keep_me, LSN modified_lsn __attribute__((__unused__)) , BOOL rename_p __attribute__((__unused__))) {
    BRTNODE brtnode = brtnode_v;
    struct brt_header *header = extraargs;
    if (0) { /* debug tracing, compiled out */
        printf("%s:%d toku_brtnode_flush_callback %p thisnodename=%" PRId64 " keep_me=%u height=%d", __FILE__, __LINE__, brtnode, brtnode->thisnodename.b, keep_me, brtnode->height);
        if (brtnode->height==0) printf(" buf=%p mempool-base=%p", brtnode->u.l.buffer, brtnode->u.l.buffer_mempool.base);
        printf("\n");
    }
    /* The block number recorded in the node must match the key we were called with. */
    assert(brtnode->thisnodename.b==nodename.b);
    if (write_me) {
        toku_serialize_brtnode_to(toku_cachefile_fd(cachefile), brtnode->thisnodename, brtnode, header);
    }
    if (keep_me) {
        return;
    }
    toku_brtnode_free(&brtnode);
}
/* Fetch callback registered with the cachetable for brt nodes.
 * Deserializes block `nodename` from the cachefile's fd into *brtnode_pv, using the
 * struct brt_header passed (non-null) as extraargs.
 * On success (0) also stores the node's memory size in *sizep and its disk_lsn in
 * *written_lsn; on failure returns the deserializer's error and leaves both untouched. */
int toku_brtnode_fetch_callback (CACHEFILE cachefile, BLOCKNUM nodename, u_int32_t fullhash, void **brtnode_pv, long *sizep, void*extraargs, LSN *written_lsn) {
    assert(extraargs);
    struct brt_header *header = extraargs;
    BRTNODE *nodep = (BRTNODE*)brtnode_pv;
    int r = toku_deserialize_brtnode_from(toku_cachefile_fd(cachefile), nodename, fullhash, nodep, header);
    if (r != 0) {
        return r;
    }
    *sizep = brtnode_memory_size(*nodep);
    *written_lsn = (*nodep)->disk_lsn;
    return 0;
}
/* Compare a stored (key,data) pair against the command held in *be.
 * The key is compared first with the brt's compare_fun. Only when the keys are
 * equal, be->compare_both_keys is set, and the command carries non-null value data
 * is the data compared as well, using dup_compare. Returns the comparator's result. */
static int
leafval_heaviside_le_committed (u_int32_t klen, void *kval,
                                u_int32_t dlen, void *dval,
                                struct cmd_leafval_heaviside_extra *be) {
    BRT brt = be->t;
    DBT scratch;
    int keycmp = brt->compare_fun(brt->db,
                                  toku_fill_dbt(&scratch, kval, klen),
                                  be->cmd->u.id.key);
    if (keycmp != 0) return keycmp;
    if (!be->compare_both_keys) return keycmp;
    if (!be->cmd->u.id.val->data) return keycmp;
    return brt->dup_compare(brt->db,
                            toku_fill_dbt(&scratch, dval, dlen),
                            be->cmd->u.id.val);
}
/* Variant taking xid, key, c-value and p-value: compares the key together with the
 * p-value (plen/pval); xid and the c-value are ignored. */
static int
leafval_heaviside_le_both (TXNID xid __attribute__((__unused__)),
                           u_int32_t klen, void *kval,
                           u_int32_t clen __attribute__((__unused__)), void *cval __attribute__((__unused__)),
                           u_int32_t plen, void *pval,
                           struct cmd_leafval_heaviside_extra *be) {
    int cmp = leafval_heaviside_le_committed(klen, kval, plen, pval, be);
    return cmp;
}
/* Variant taking xid, key and c-value: compares the key together with the
 * c-value (clen/cval); xid is ignored. */
static int
leafval_heaviside_le_provdel (TXNID xid __attribute__((__unused__)),
                              u_int32_t klen, void *kval,
                              u_int32_t clen, void *cval,
                              struct cmd_leafval_heaviside_extra *be) {
    int cmp = leafval_heaviside_le_committed(klen, kval, clen, cval, be);
    return cmp;
}
/* Variant taking xid, key and p-value: compares the key together with the
 * p-value (plen/pval); xid is ignored. */
static int
leafval_heaviside_le_provpair (TXNID xid __attribute__((__unused__)),
                               u_int32_t klen, void *kval,
                               u_int32_t plen, void *pval,
                               struct cmd_leafval_heaviside_extra *be) {
    int cmp = leafval_heaviside_le_committed(klen, kval, plen, pval, be);
    return cmp;
}
// Comparison callback with the (OMTVALUE, void *extra) shape that is handed to
// toku_omt_find_zero()/toku_omt_find(). `extra` must point to a
// struct cmd_leafval_heaviside_extra.
// LESWITCHCALL is given the leafentry, the function-name prefix leafval_heaviside and
// the extra; presumably it dispatches to the matching leafval_heaviside_le_* helper
// and returns its result -- confirm against the macro definition.
int toku_cmd_leafval_heaviside (OMTVALUE lev, void *extra) {
LEAFENTRY le=lev;
struct cmd_leafval_heaviside_extra *be = extra;
LESWITCHCALL(le, leafval_heaviside, be);
// Only reached if LESWITCHCALL did not return.
abort(); return 0; // make certain compilers happy
}
// If you pass in data==0 then it only compares the key, not the data (even if is a DUPSORT database)
static int
brt_compare_pivot(BRT brt, DBT *key, DBT *data, bytevec ck)
@ -327,6 +426,70 @@ static int log_and_save_brtenq(TOKULOGGER logger, BRT t, BRTNODE node, int child
return 0;
}
/* toku_omt_iterate() callback: assert that the leafentry, for its full
 * leafentry_memsize(), lies inside the mempool passed as vmp.
 * The index argument is unused. Always returns 0. */
static int
verify_in_mempool (OMTVALUE lev, u_int32_t UU(idx), void *vmp)
{
    struct mempool *pool = vmp;
    LEAFENTRY entry = lev;
    assert(toku_mempool_inrange(pool, entry, leafentry_memsize(entry)));
    return 0;
}
/* For a leaf (height==0), run verify_in_mempool over every entry of the leaf's
 * OMT buffer against the leaf's own mempool. Does nothing for other heights. */
void
toku_verify_all_in_mempool (BRTNODE node)
{
    if (node->height != 0) {
        return;
    }
    toku_omt_iterate(node->u.l.buffer, verify_in_mempool, &node->u.l.buffer_mempool);
}
/* Release a brt node together with everything it owns, then zero the caller's pointer.
 *   nonleaf (height>0): every pivot key, every non-null child buffer, and the
 *                       childkeys / childinfos arrays themselves.
 *   leaf    (height==0): the OMT buffer (if still present) and the memory behind
 *                       the leaf's mempool. */
void toku_brtnode_free (BRTNODE *nodep) {
    BRTNODE node = *nodep;
    if (node->height > 0) {
        int childnum;
        /* Only the first n_children-1 childkeys entries are freed individually. */
        for (childnum = 0; childnum < node->u.n.n_children-1; childnum++) {
            toku_free(node->u.n.childkeys[childnum]);
        }
        for (childnum = 0; childnum < node->u.n.n_children; childnum++) {
            if (BNC_BUFFER(node,childnum)) {
                toku_fifo_free(&BNC_BUFFER(node,childnum));
            }
        }
        toku_free(node->u.n.childkeys);
        toku_free(node->u.n.childinfos);
    } else {
        void *pool_base;
        if (node->u.l.buffer) { // The buffer may have been freed already, in some cases.
            toku_omt_destroy(&node->u.l.buffer);
        }
        /* Fetch the base pointer before toku_mempool_fini() runs on the pool. */
        pool_base = toku_mempool_get_base(&node->u.l.buffer_mempool);
        toku_mempool_fini(&node->u.l.buffer_mempool);
        toku_free(pool_base);
    }
    toku_free(node);
    *nodep = 0;
}
/* Free a brt header and the arrays hanging off it: the root names (only when
 * n_named_roots>0), the fifo, roots, root_hashes, flags_array, block_translation,
 * the block allocator, and finally the header itself. */
void toku_brtheader_free (struct brt_header *h) {
    int n_names = h->n_named_roots;
    if (n_names > 0) {
        int k;
        for (k = 0; k < n_names; k++) {
            toku_free(h->names[k]);
        }
        toku_free(h->names);
    }
    toku_fifo_free(&h->fifo);
    toku_free(h->roots);
    toku_free(h->root_hashes);
    toku_free(h->flags_array);
    toku_free(h->block_translation);
    destroy_block_allocator(&h->block_allocator);
    toku_free(h);
}
static int
allocate_diskblocknumber (BLOCKNUM *res, BRT brt, TOKULOGGER logger __attribute__((__unused__))) {
assert(brt->h->free_blocks.b == -1); // no blocks in the free list
@ -453,6 +616,30 @@ brt_init_new_root(BRT brt, BRTNODE nodea, BRTNODE nodeb, DBT splitk, CACHEKEY *r
return 0;
}
// logs the memory allocation, but not the creation of the new node
// Allocates a block number and a node of the requested height, initializes the node
// as empty, stores it in *result, and puts it into the brt's cachetable with the brt
// node flush/fetch callbacks and the brt header as their extra argument.
// Failures of the allocation or of the cachetable put trip asserts; returns 0.
int toku_create_new_brtnode (BRT t, BRTNODE *result, int height, TOKULOGGER logger) {
    TAGMALLOC(BRTNODE, n);
    BLOCKNUM blocknum;
    int r = allocate_diskblocknumber (&blocknum, t, logger);
    assert(r==0);
    assert(n);
    assert(t->h->nodesize>0);
    n->ever_been_written = 0;
    initialize_empty_brtnode(t, n, blocknum, height);
    *result = n;
    assert(n->nodesize>0);
    // Remember the hash in the node so later cachetable calls can pass it along.
    u_int32_t hash = toku_cachetable_hash(t->cf, n->thisnodename);
    n->fullhash = hash;
    r = toku_cachetable_put(t->cf, n->thisnodename, hash,
                            n, brtnode_memory_size(n),
                            toku_brtnode_flush_callback, toku_brtnode_fetch_callback, t->h);
    assert(r==0);
    return 0;
}
static int
fill_buf (OMTVALUE lev, u_int32_t idx, void *varray)
{
@ -1301,7 +1488,7 @@ brt_leaf_put_cmd (BRT t, BRTNODE node, BRT_CMD cmd, TOKULOGGER logger,
u_int32_t idx;
int r;
int compare_both = should_compare_both_keys(node, cmd);
struct cmd_leafval_bessel_extra be = {t, cmd, compare_both};
struct cmd_leafval_heaviside_extra be = {t, cmd, compare_both};
//static int counter=0;
//counter++;
@ -1314,12 +1501,12 @@ brt_leaf_put_cmd (BRT t, BRTNODE node, BRT_CMD cmd, TOKULOGGER logger,
r = toku_omt_fetch(node->u.l.buffer, idx-1, &storeddatav, NULL);
if (r != 0) goto fz;
storeddata = storeddatav;
int cmp = toku_cmd_leafval_bessel(storeddata, &be);
int cmp = toku_cmd_leafval_heaviside(storeddata, &be);
if (cmp >= 0) goto fz;
r = DB_NOTFOUND;
} else {
fz:
r = toku_omt_find_zero(node->u.l.buffer, toku_cmd_leafval_bessel, &be,
r = toku_omt_find_zero(node->u.l.buffer, toku_cmd_leafval_heaviside, &be,
&storeddatav, &idx, NULL);
}
if (r==DB_NOTFOUND) {
@ -1354,7 +1541,7 @@ brt_leaf_put_cmd (BRT t, BRTNODE node, BRT_CMD cmd, TOKULOGGER logger,
case BRT_COMMIT_BOTH:
// Delete the one item
r = toku_omt_find_zero(node->u.l.buffer, toku_cmd_leafval_bessel, &be,
r = toku_omt_find_zero(node->u.l.buffer, toku_cmd_leafval_heaviside, &be,
&storeddatav, &idx, NULL);
if (r == DB_NOTFOUND) break;
if (r != 0) return r;
@ -1375,7 +1562,7 @@ brt_leaf_put_cmd (BRT t, BRTNODE node, BRT_CMD cmd, TOKULOGGER logger,
case BRT_COMMIT_ANY:
// Delete all the matches
r = toku_omt_find_zero(node->u.l.buffer, toku_cmd_leafval_bessel, &be,
r = toku_omt_find_zero(node->u.l.buffer, toku_cmd_leafval_heaviside, &be,
&storeddatav, &idx, NULL);
if (r == DB_NOTFOUND) break;
if (r != 0) return r;
@ -1391,8 +1578,8 @@ brt_leaf_put_cmd (BRT t, BRTNODE node, BRT_CMD cmd, TOKULOGGER logger,
// Now we must find the next one.
DBT valdbt;
BRT_CMD_S ncmd = { cmd->type, cmd->xid, .u.id={cmd->u.id.key, toku_fill_dbt(&valdbt, save_val, vallen)}};
struct cmd_leafval_bessel_extra nbe = {t, &ncmd, 1};
r = toku_omt_find(node->u.l.buffer, toku_cmd_leafval_bessel, &nbe, +1,
struct cmd_leafval_heaviside_extra nbe = {t, &ncmd, 1};
r = toku_omt_find(node->u.l.buffer, toku_cmd_leafval_heaviside, &nbe, +1,
&storeddatav, &idx, NULL);
toku_free(save_val);
@ -1464,6 +1651,30 @@ static int brt_nonleaf_cmd_once_to_child (BRT t, BRTNODE node, unsigned int chil
return 0;
}
/* find the leftmost child that may contain the key */
/* Requires a nonleaf node (height>0). With DO_PIVOT_SEARCH_LR set to 0 (the compiled
 * configuration) the pivots are scanned from the last one down to the first; the
 * answer is i+1 for the first pivot i where brt_compare_pivot() is positive, or 0
 * when no pivot compares positive. */
unsigned int toku_brtnode_which_child (BRTNODE node , DBT *k, DBT *d, BRT t) {
    int i;
    assert(node->height>0);
#define DO_PIVOT_SEARCH_LR 0
#if DO_PIVOT_SEARCH_LR
    /* Left-to-right scan: stop at the first pivot that does not compare positive. */
    for (i=0; i<node->u.n.n_children-1; i++) {
        if (brt_compare_pivot(t, k, d, node->u.n.childkeys[i]) <= 0) {
            return i;
        }
    }
    return node->u.n.n_children-1;
#else
    // give preference for appending to the dictionary. no change for
    // random keys
    i = node->u.n.n_children-2;
    while (i >= 0) {
        if (brt_compare_pivot(t, k, d, node->u.n.childkeys[i]) > 0) {
            return i+1;
        }
        i--;
    }
    return 0;
#endif
}
static int brt_nonleaf_cmd_once (BRT t, BRTNODE node, BRT_CMD cmd, TOKULOGGER logger,
enum reactivity re_array[], BOOL *did_io)
// Effect: Insert a message into a nonleaf. We may put it into a child, possibly causing the child to become reactive.
@ -1658,6 +1869,48 @@ static int push_something_at_root (BRT brt, BRTNODE *nodep, CACHEKEY *rootp, BRT
return -1;
}
/* Recompute the remembered-hash slot for root number `rootnum`: record the current
 * root block, the cachefile's filenum and the cachetable hash of that root, and
 * mark the slot valid. Requires brt->cf to be set. */
static void compute_and_fill_remembered_hash (BRT brt, int rootnum) {
    struct remembered_hash *slot = &brt->h->root_hashes[rootnum];
    assert(brt->cf); // if cf is null, we'll be hosed.
    slot->root     = brt->h->roots[rootnum];
    slot->fnum     = toku_cachefile_filenum(brt->cf);
    slot->fullhash = toku_cachetable_hash(brt->cf, slot->root);
    slot->valid    = TRUE;
}
/* Return the cachetable hash for root number `rootnum`.
 * The remembered value is reused only when the slot is valid and both its file id
 * and its root block still match the current ones; otherwise it is recomputed. */
static u_int32_t get_roothash (BRT brt, int rootnum) {
    struct remembered_hash *rh = &brt->h->root_hashes[rootnum];
    BLOCKNUM root = brt->h->roots[rootnum];
    assert(rh);
    int still_good = rh->valid
        && rh->fnum.fileid == toku_cachefile_filenum(brt->cf).fileid
        && rh->root.b == root.b;
    if (!still_good) {
        compute_and_fill_remembered_hash(brt, rootnum);
    }
    return rh->fullhash;
}
/* Locate the root entry for this brt inside its header.
 * An unnamed brt (database_name==0) uses entry 0 and requires n_named_roots==-1.
 * A named brt uses the first entry whose name equals database_name; if no entry
 * matches, the process aborts.
 * Stores the root's cachetable hash in *roothash and returns a pointer to the root. */
CACHEKEY* toku_calculate_root_offset_pointer (BRT brt, u_int32_t *roothash) {
    int rootnum = 0;
    if (brt->database_name==0) {
        assert(brt->h->n_named_roots==-1);
    } else {
        int n_names = brt->h->n_named_roots;
        while (rootnum < n_names && strcmp(brt->database_name, brt->h->names[rootnum]) != 0) {
            rootnum++;
        }
        if (rootnum >= n_names) {
            abort(); return 0; // make certain compilers happy
        }
    }
    *roothash = get_roothash(brt, rootnum);
    return &brt->h->roots[rootnum];
}
int toku_brt_root_put_cmd(BRT brt, BRT_CMD cmd, TOKULOGGER logger)
// Effect: Flush the root fifo into the brt, and then push the cmd into the brt.
@ -1698,6 +1951,21 @@ int toku_brt_root_put_cmd(BRT brt, BRT_CMD cmd, TOKULOGGER logger)
return 0;
}
/* Enqueue `cmd` on the fifo of the brt header stored as the cachefile's userdata
 * (which must be non-null), then log an enqrootentry record with the command's
 * xid, type, key bytes and value bytes.
 * Returns 0 on success, or the first nonzero result from the enqueue or the log call. */
int toku_cachefile_root_put_cmd (CACHEFILE cf, BRT_CMD cmd, TOKULOGGER logger) {
    struct brt_header *h = toku_cachefile_get_userdata(cf);
    assert(h);
    int r = toku_fifo_enq_cmdstruct(h->fifo, cmd);
    if (r != 0) {
        return r;
    }
    BYTESTRING keybs = {.len=cmd->u.id.key->size, .data=cmd->u.id.key->data};
    BYTESTRING valbs = {.len=cmd->u.id.val->size, .data=cmd->u.id.val->data};
    r = toku_log_enqrootentry(logger, (LSN*)0, 0, toku_cachefile_filenum(cf), cmd->xid, cmd->type, keybs, valbs);
    return r;
}
int toku_brt_insert (BRT brt, DBT *key, DBT *val, TOKUTXN txn)
// Effect: Insert the key-val pair into brt.
{
@ -1717,3 +1985,141 @@ int toku_brt_insert (BRT brt, DBT *key, DBT *val, TOKUTXN txn)
return r;
}
// State passed to move_it() through toku_omt_iterate() while omt_compress_kvspace()
// copies every leafentry into a new mempool.
struct omt_compressor_state {
struct mempool *new_kvspace; // mempool that move_it() allocates the copies from
OMT omt; // OMT whose entries move_it() repoints at the copies
};
/* toku_omt_iterate() callback used by omt_compress_kvspace(): copy the leafentry at
 * index `idx` into the new mempool named by the omt_compressor_state in `v`, and
 * make the OMT slot point at the copy. Always returns 0. */
static int move_it (OMTVALUE lev, u_int32_t idx, void *v) {
    struct omt_compressor_state *state = v;
    LEAFENTRY old_le = lev;
    u_int32_t nbytes = leafentry_memsize(old_le);
    LEAFENTRY new_le = toku_mempool_malloc(state->new_kvspace, nbytes, 1);
    assert(new_le); // we do this on a fresh mempool, so nothing bad should happen
    memcpy(new_le, old_le, nbytes);
    toku_omt_set_at(state->omt, new_le, idx);
    return 0;
}
// Compress things, and grow the mempool if needed.
// Copies every leafentry referenced by `omt` into a freshly allocated mempool and
// swaps that pool into *memp, freeing the old backing memory.
// The new pool is at least as large as the old one; it grows to 1.25x of
// (free_offset - frag_size + added_size) when that exceeds the current size.
// Returns 0 on success. Returns ENOMEM if the new memory cannot be allocated, in
// which case *memp is left completely untouched (its size field included), so the
// pool never advertises more capacity than its buffer really has.
static int omt_compress_kvspace (OMT omt, struct mempool *memp, size_t added_size) {
    // Sizes are kept in size_t so that large pools are not truncated to 32 bits.
    size_t needed = memp->free_offset - memp->frag_size + added_size;
    size_t padded = needed + needed/4;
    size_t new_size = memp->size;
    if (padded >= new_size) {
        new_size = padded;
    }
    void *newmem = toku_malloc(new_size);
    if (newmem == 0) {
        return ENOMEM; // nothing in *memp has been modified yet
    }
    struct mempool new_kvspace;
    toku_mempool_init(&new_kvspace, newmem, new_size);
    struct omt_compressor_state oc = { &new_kvspace, omt };
    toku_omt_iterate(omt, move_it, &oc); // move_it always returns 0
    toku_free(memp->base);
    *memp = new_kvspace;
    return 0;
}
/* Allocate `size` bytes from the mempool `mp`. If the first attempt fails, compress
 * (and possibly grow) the pool via omt_compress_kvspace() and try once more; the
 * retry is asserted to succeed. Returns NULL only when the compression step itself
 * fails. */
void *mempool_malloc_from_omt(OMT omt, struct mempool *mp, size_t size) {
    void *v = toku_mempool_malloc(mp, size, 1);
    if (v != 0) {
        return v;
    }
    if (omt_compress_kvspace(omt, mp, size) != 0) {
        return v; // still null
    }
    v = toku_mempool_malloc(mp, size, 1);
    assert(v);
    return v;
}
/* ******************** open,close and create ********************** */
/* Close-time handler for a brt header (header_v is a struct brt_header *).
 * If the header is dirty, writes the header and then the fifo to the cachefile's fd.
 * The header is freed in every case. Always returns 0.
 * NOTE(review): the results of the two serialize calls are not examined. */
int toku_brtheader_close (CACHEFILE cachefile, void *header_v) {
    struct brt_header *header = header_v;
    if (header->dirty) {
        u_int64_t fifo_offset;
        toku_serialize_brt_header_to(toku_cachefile_fd(cachefile), header);
        /* Must compute this after writing the header. */
        fifo_offset = block_allocator_allocated_limit(header->block_allocator);
        toku_serialize_fifo_at(toku_cachefile_fd(cachefile), fifo_offset, header->fifo);
    }
    toku_brtheader_free(header);
    return 0;
}
/* Close a brt handle.
 * Order of work: close every cursor still on the brt's list, notify the transaction
 * layer, destroy the txns OMT, then (if a cachefile is attached) log the close when
 * a logger is given, check nothing is pinned, and close the cachefile. Finally the
 * owned strings/buffers and the handle itself are freed.
 * Returns 0, or the first error from closing a cursor, logging, or closing the
 * cachefile (in which case the remaining steps are not performed). */
int toku_close_brt (BRT brt, TOKULOGGER logger) {
    int r;
    while (!list_empty(&brt->cursors)) {
        BRT_CURSOR open_cursor = list_struct(list_pop(&brt->cursors), struct brt_cursor, cursors_link);
        r = toku_brt_cursor_close(open_cursor);
        if (r != 0) {
            return r;
        }
    }

    // Must do this work before closing the cf
    r = toku_txn_note_close_brt(brt);
    assert(r==0);
    toku_omt_destroy(&brt->txns);

    if (brt->cf) {
        if (logger) {
            assert(brt->fname);
            BYTESTRING bs = {.len=strlen(brt->fname), .data=brt->fname};
            LSN lsn;
            r = toku_log_brtclose(logger, &lsn, 1, bs, toku_cachefile_filenum(brt->cf)); // flush the log on close, otherwise it might not make it out.
            if (r != 0) {
                return r;
            }
        }
        assert(0==toku_cachefile_count_pinned(brt->cf, 1)); // For the brt, the pinned count should be zero.
        r = toku_cachefile_close(&brt->cf, logger);
        if (r != 0) {
            return r;
        }
    }

    if (brt->database_name) { toku_free(brt->database_name); }
    if (brt->fname)         { toku_free(brt->fname); }
    if (brt->skey)          { toku_free(brt->skey); }
    if (brt->sval)          { toku_free(brt->sval); }
    toku_free(brt);
    return 0;
}
/* Allocate a zero-filled BRT handle and give it its defaults: an empty cursor list,
 * no flags, BRT_DEFAULT_NODE_SIZE, toku_default_compare_fun for both key and
 * duplicate comparison, and a fresh OMT in txns.
 * Returns 0 and stores the handle in *brt_ptr, ENOMEM if the allocation fails, or
 * the error from toku_omt_create() (the handle is freed in that case). */
int toku_brt_create(BRT *brt_ptr) {
    BRT fresh = toku_malloc(sizeof *fresh);
    if (fresh == 0) {
        return ENOMEM;
    }
    memset(fresh, 0, sizeof *fresh);
    list_init(&fresh->cursors);
    fresh->flags = 0;
    fresh->did_set_flags = 0;
    fresh->nodesize = BRT_DEFAULT_NODE_SIZE;
    fresh->compare_fun = toku_default_compare_fun;
    fresh->dup_compare = toku_default_compare_fun;
    int r = toku_omt_create(&fresh->txns);
    if (r != 0) {
        toku_free(fresh);
        return r;
    }
    *brt_ptr = fresh;
    return 0;
}
/* ************* CURSORS ********************* */
/* Free the DBT's data and null the pointer, but only when data is non-null and the
 * DBT is flagged DB_DBT_REALLOC or DB_DBT_MALLOC. Otherwise leave it alone. */
static inline void dbt_cleanup(DBT *dbt) {
    if (!dbt->data) {
        return;
    }
    if (!(dbt->flags & DB_DBT_REALLOC) && !(dbt->flags & DB_DBT_MALLOC)) {
        return;
    }
    toku_free_n(dbt->data, dbt->size);
    dbt->data = 0;
}
/* Destroy a brt cursor.
 * The current key/val and the previous key/val are cleaned up with dbt_cleanup()
 * only when the corresponding *_in_omt flag is clear. The scratch skey/sval buffers
 * are then freed, the cursor is unlinked from its list, its OMT cursor has the
 * invalidate callback cleared and is destroyed, and the cursor itself is freed.
 * Always returns 0. */
int toku_brt_cursor_close(BRT_CURSOR cursor) {
    if (!cursor->current_in_omt) {
        dbt_cleanup(&cursor->key);
        dbt_cleanup(&cursor->val);
    }
    if (!cursor->prev_in_omt) {
        dbt_cleanup(&cursor->prevkey);
        dbt_cleanup(&cursor->prevval);
    }
    if (cursor->skey) {
        toku_free(cursor->skey);
    }
    if (cursor->sval) {
        toku_free(cursor->sval);
    }
    list_remove(&cursor->cursors_link);
    toku_omt_cursor_set_invalidate_callback(cursor->omtcursor, NULL, NULL);
    toku_omt_cursor_destroy(&cursor->omtcursor);
    toku_free_n(cursor, sizeof *cursor);
    return 0;
}