Addresses #1603 Root fifo is gone.

git-svn-id: file:///svn/toku/tokudb@11165 c7de825b-a66e-492c-adef-691d508d4ae1
This commit is contained in:
Yoni Fogel 2013-04-16 23:57:48 -04:00
parent ca0865e525
commit 5c108195c3
12 changed files with 48 additions and 295 deletions

View file

@ -36,10 +36,9 @@ struct translation { //This is the BTT (block translation table)
//Unmovable reserved first, then reallocable.
// We reserve one blocknum for the translation table itself.
enum {RESERVED_BLOCKNUM_NULL=0,
enum {RESERVED_BLOCKNUM_NULL =0,
RESERVED_BLOCKNUM_TRANSLATION=1,
RESERVED_BLOCKNUM_FIFO=2,
RESERVED_BLOCKNUM_DESCRIPTOR=3,
RESERVED_BLOCKNUM_DESCRIPTOR =2,
RESERVED_BLOCKNUMS};
static const BLOCKNUM freelist_null = {-1}; // in a freelist, this indicates end of list
@ -135,9 +134,6 @@ copy_translation(struct translation * dst, struct translation * src, enum transl
//New version of btt is not yet stored on disk.
dst->block_translation[RESERVED_BLOCKNUM_TRANSLATION].size = 0;
dst->block_translation[RESERVED_BLOCKNUM_TRANSLATION].u.diskoff = diskoff_unused;
//New version of fifo is not yet stored on disk
dst->block_translation[RESERVED_BLOCKNUM_FIFO].size = 0;
dst->block_translation[RESERVED_BLOCKNUM_FIFO].u.diskoff = diskoff_unused;
}
// block table must be locked by caller of this function
@ -377,36 +373,6 @@ toku_blocknum_realloc_on_disk (BLOCK_TABLE bt, BLOCKNUM b, DISKOFF size, DISKOFF
unlock_for_blocktable(bt);
}
void
toku_get_fifo_offset_on_disk(BLOCK_TABLE bt, DISKOFF *offset) {
lock_for_blocktable(bt);
struct translation *t = &bt->checkpointed;
BLOCKNUM b = make_blocknum(RESERVED_BLOCKNUM_FIFO);
verify_valid_blocknum(t, b);
*offset = t->block_translation[b.b].u.diskoff;
unlock_for_blocktable(bt);
}
void
toku_realloc_fifo_on_disk_unlocked (BLOCK_TABLE bt, DISKOFF size, DISKOFF *offsetp) {
assert(bt->is_locked);
struct translation *t = &bt->inprogress;
assert(t->block_translation);
BLOCKNUM b = make_blocknum(RESERVED_BLOCKNUM_FIFO);
struct block_translation_pair old_pair = t->block_translation[b.b];
//Each inprogress is allocated only once, no freeing required.
assert(old_pair.size == 0 && old_pair.u.diskoff == diskoff_unused);
//Allocate a new block
u_int64_t offset;
block_allocator_alloc_block(bt->block_allocator, size, &offset);
PRNTF("blokAllokator", 1L, size, offset, bt);
t->block_translation[b.b].u.diskoff = offset;
t->block_translation[b.b].size = size;
*offsetp = offset;
}
// Purpose of this function is to figure out where to put the inprogress btt on disk, allocate space for it there.
static void
blocknum_alloc_translation_on_disk_unlocked (BLOCK_TABLE bt) {
@ -428,11 +394,8 @@ PRNTF("blokAllokator", 1L, size, offset, bt);
t->block_translation[b.b].size = size;
}
// TODO: Revisit this after deciding on how to write header/btt/root-fifo to disk
//Fills wbuf with bt
//A clean shutdown runs checkpoint start so that current and inprogress are copies.
//
//
void
toku_serialize_translation_to_wbuf_unlocked(BLOCK_TABLE bt, struct wbuf *w,
int64_t *address, int64_t *size) {

View file

@ -34,8 +34,6 @@ void toku_allocate_blocknum_unlocked(BLOCK_TABLE bt, BLOCKNUM *res, struct brt_h
void toku_free_blocknum(BLOCK_TABLE bt, BLOCKNUM *b, struct brt_header * h);
void toku_verify_blocknum_allocated(BLOCK_TABLE bt, BLOCKNUM b);
void toku_block_verify_no_free_blocknums(BLOCK_TABLE bt);
void toku_realloc_fifo_on_disk_unlocked (BLOCK_TABLE, DISKOFF size, DISKOFF *offset);
void toku_get_fifo_offset_on_disk(BLOCK_TABLE bt, DISKOFF *offset);
void toku_realloc_descriptor_on_disk(BLOCK_TABLE bt, DISKOFF size, DISKOFF *offset, struct brt_header * h);
void toku_get_descriptor_offset_size(BLOCK_TABLE bt, DISKOFF *offset, DISKOFF *size);

View file

@ -170,8 +170,6 @@ struct brt_header {
unsigned int flags;
DBT descriptor;
FIFO fifo; // all the abort and commit commands. If the header gets flushed to disk, we write the fifo contents beyond the unused_memory.
u_int64_t root_put_counter; // the generation number of the brt
BLOCK_TABLE blocktable;
@ -192,7 +190,7 @@ struct brt {
DBT temp_descriptor;
int (*compare_fun)(DB*,const DBT*,const DBT*);
int (*dup_compare)(DB*,const DBT*,const DBT*);
DB *db; // To pass to the compare fun
DB *db; // To pass to the compare fun, and close once transactions are done.
OMT txns; // transactions that are using this OMT (note that the transaction checks the cf also)
@ -200,6 +198,9 @@ struct brt {
// If a transaction locked the BRT when it was empty, which transaction? (Only the latest one matters)
// 0 if no such transaction
TXNID txnid_that_created_or_locked_when_empty;
int was_closed; //True when this brt was closed, but is being kept around for transactions.
int (*close_db)(DB*, u_int32_t);
u_int32_t close_flags;
};
/* serialization code */
@ -216,9 +217,6 @@ int toku_serialize_brt_header_to_wbuf (struct wbuf *, struct brt_header *h, int6
int toku_deserialize_brtheader_from (int fd, struct brt_header **brth);
int toku_serialize_descriptor_contents_to_fd(int fd, DBT *desc, DISKOFF offset);
int toku_serialize_fifo_at (int fd, toku_off_t freeoff, FIFO fifo, u_int64_t fifo_size); // Write a fifo into a disk, without worrying about fitting it into a block. This write is done at the end of the file.
u_int64_t toku_fifo_get_serialized_size (FIFO fifo);
void toku_brtnode_free (BRTNODE *node);
#if 1
@ -299,7 +297,6 @@ struct cmd_leafval_heaviside_extra {
int toku_cmd_leafval_heaviside (OMTVALUE leafentry, void *extra);
int toku_brt_root_put_cmd(BRT brt, BRT_CMD cmd, TOKULOGGER logger);
int toku_cachefile_root_put_cmd (CACHEFILE cf, BRT_CMD cmd, TOKULOGGER logger);
void *mempool_malloc_from_omt(OMT omt, struct mempool *mp, size_t size, void **maybe_free);
// Effect: Allocate a new object of size SIZE in MP. If MP runs out of space, allocate new a new mempool space, and copy all the items

View file

@ -157,8 +157,6 @@ static const int brtnode_header_overhead = (8+ // magic "tokunode" or "tokulea
4+ // localfingerprint
4); // crc32 at the end
static int deserialize_fifo_at (int fd, toku_off_t at, FIFO *fifo);
static int
addupsize (OMTVALUE lev, u_int32_t UU(idx), void *vp) {
LEAFENTRY le=lev;
@ -1183,12 +1181,6 @@ deserialize_brtheader (int fd, struct rbuf *rb, struct brt_header **brth) {
if (rc.ndone!=rc.size) {ret = EINVAL; goto died1;}
toku_free(rc.buf);
rc.buf = NULL;
{
int r;
DISKOFF offset;
toku_get_fifo_offset_on_disk(h->blocktable, &offset);
if ((r = deserialize_fifo_at(fd, offset, &h->fifo))) return r;
}
*brth = h;
return 0;
}
@ -1301,71 +1293,6 @@ unsigned int toku_brtnode_pivot_key_len (BRTNODE node, struct kv_pair *pk) {
}
}
u_int64_t
toku_fifo_get_serialized_size (FIFO fifo) {
//printf("%s:%d Serializing fifo at %" PRId64 " (count=%d)\n", __FILE__, __LINE__, freeoff, toku_fifo_n_entries(fifo));
u_int64_t size = 0;
size += 4; //wbuf_int(&w, toku_fifo_n_entries(fifo));
FIFO_ITERATE(fifo, UU(key), keylen, UU(val), vallen, UU(type), UU(xid),
{
size_t size_entry=keylen+vallen+1+8+4+4;
size += size_entry;
});
return size;
}
// To serialize the fifo, we just write it all at the end of the file.
// For now, just do all the writes as separate system calls. This function is hardly ever called, and
// we might not be able to allocate a large enough buffer to hold everything,
// and it would be more complex to batch up several writes.
int toku_serialize_fifo_at (int fd, toku_off_t freeoff_start, FIFO fifo, u_int64_t fifo_size) {
//printf("%s:%d Serializing fifo at %" PRId64 " (count=%d)\n", __FILE__, __LINE__, freeoff, toku_fifo_n_entries(fifo));
toku_off_t freeoff = freeoff_start;
lock_for_pwrite();
{
enum { size=4 };
char buf[size];
struct wbuf w;
wbuf_init(&w, buf, size);
wbuf_int(&w, toku_fifo_n_entries(fifo));
ssize_t nwrote;
int r = toku_pwrite_extend(fd, w.buf, size, freeoff, &nwrote);
if (r) {
unlock_for_pwrite();
return r;
}
assert(nwrote==size);
freeoff+=size;
}
FIFO_ITERATE(fifo, key, keylen, val, vallen, type, xid,
{
size_t size=keylen+vallen+1+8+4+4;
char *MALLOC_N(size, buf);
assert(buf!=0);
struct wbuf w;
wbuf_init(&w, buf, size);
assert(type>=0 && type<256);
wbuf_char(&w, (unsigned char)type);
wbuf_TXNID(&w, xid);
wbuf_bytes(&w, key, keylen);
//printf("%s:%d Writing %d bytes: %s\n", __FILE__, __LINE__, vallen, (char*)val);
wbuf_bytes(&w, val, vallen);
assert(w.ndone==size);
ssize_t nwrote;
int r = toku_pwrite_extend(fd, w.buf, (size_t)size, freeoff, &nwrote);
if (r) {
unlock_for_pwrite();
return r;
}
assert((ssize_t)size==nwrote);
freeoff+=size;
toku_free(buf);
});
assert(freeoff-freeoff_start == (toku_off_t)fifo_size);
unlock_for_pwrite();
return 0;
}
static int
read_int (int fd, toku_off_t *at, u_int32_t *result) {
int v;
@ -1377,15 +1304,7 @@ read_int (int fd, toku_off_t *at, u_int32_t *result) {
return 0;
}
static int
read_char (int fd, toku_off_t *at, char *result) {
ssize_t r = pread(fd, result, 1, *at);
if (r<0) return errno;
assert(r==1);
(*at)++;
return 0;
}
static int read_u_int64_t UU((int fd, toku_off_t *at, u_int64_t *result));
static int
read_u_int64_t (int fd, toku_off_t *at, u_int64_t *result) {
u_int32_t v1=0,v2=0;
@ -1396,47 +1315,6 @@ read_u_int64_t (int fd, toku_off_t *at, u_int64_t *result) {
return 0;
}
static int
read_nbytes (int fd, toku_off_t *at, char **data, u_int32_t len) {
char *result = toku_malloc(len);
if (result==0) return errno;
ssize_t r = pread(fd, result, len, *at);
//printf("%s:%d read %d bytes at %" PRId64 ", which are %s\n", __FILE__, __LINE__, len, *at, result);
if (r<0) return errno;
assert(r==(ssize_t)len);
(*at)+=len;
*data=result;
return 0;
}
static int deserialize_fifo_at (int fd, toku_off_t at, FIFO *fifo) {
FIFO result;
int r = toku_fifo_create(&result);
if (r) return r;
u_int32_t count=0;
if ((r=read_int(fd, &at, &count))) return r;
u_int32_t i;
for (i=0; i<count; i++) {
char type;
TXNID xid;
u_int32_t keylen=0, vallen=0;
char *key=0, *val=0;
if ((r=read_char(fd, &at, &type))) return r;
if ((r=read_u_int64_t(fd, &at, &xid))) return r;
if ((r=read_int(fd, &at, &keylen))) return r;
if ((r=read_nbytes(fd, &at, &key, keylen))) return r;
if ((r=read_int(fd, &at, &vallen))) return r;
if ((r=read_nbytes(fd, &at, &val, vallen))) return r;
//printf("%s:%d read %d byte key, key=%s\n dlen=%d data=%s\n", __FILE__, __LINE__, keylen, key, vallen, val);
if ((r=toku_fifo_enq(result, key, keylen, val, vallen, type, xid))) return r;
toku_free(key);
toku_free(val);
}
*fifo = result;
//printf("%s:%d *fifo=%p\n", __FILE__, __LINE__, result);
return 0;
}
int toku_db_badformat(void) {
return DB_BADFORMAT;
}

View file

@ -593,24 +593,10 @@ void toku_brtnode_free (BRTNODE *nodep) {
*nodep=0;
}
static void
brtheader_partial_destroy(struct brt_header *h) {
if (h->type == BRTHEADER_CHECKPOINT_INPROGRESS) {
//Share fifo till #1603
h->fifo = NULL;
}
else {
assert(h->type == BRTHEADER_CURRENT);
toku_fifo_free(&h->fifo); //TODO: #1603 delete
}
}
static void
brtheader_destroy(struct brt_header *h) {
if (!h->panic) assert(!h->checkpoint_header);
brtheader_partial_destroy(h);
//header and checkpoint_header have same Blocktable pointer
//cannot destroy since it is still in use by CURRENT
if (h->type == BRTHEADER_CHECKPOINT_INPROGRESS) h->blocktable = NULL;
@ -2647,16 +2633,6 @@ int toku_brt_root_put_cmd(BRT brt, BRT_CMD cmd, TOKULOGGER logger)
VERIFY_NODE(brt, node);
assert(node->fullhash==fullhash);
// push the fifo stuff
{
DBT okey,odata;
BRT_CMD_S ocmd;
while (0==toku_fifo_peek_cmdstruct(brt->h->fifo, &ocmd, &okey, &odata)) {
if ((r = push_something_at_root(brt, &node, rootp, &ocmd, logger))) return r;
r = toku_fifo_deq(brt->h->fifo);
assert(r==0);
}
}
VERIFY_NODE(brt, node);
verify_local_fingerprint_nonleaf(node);
@ -2670,21 +2646,6 @@ int toku_brt_root_put_cmd(BRT brt, BRT_CMD cmd, TOKULOGGER logger)
return 0;
}
int toku_cachefile_root_put_cmd (CACHEFILE cf, BRT_CMD cmd, TOKULOGGER logger) {
int r;
struct brt_header *h = toku_cachefile_get_userdata(cf);
assert(h);
r = toku_fifo_enq_cmdstruct(h->fifo, cmd);
if (r!=0) return r;
{
BYTESTRING keybs = {.len=cmd->u.id.key->size, .data=cmd->u.id.key->data};
BYTESTRING valbs = {.len=cmd->u.id.val->size, .data=cmd->u.id.val->data};
r = toku_log_enqrootentry(logger, (LSN*)0, 0, toku_cachefile_filenum(cf), cmd->xid, cmd->type, keybs, valbs);
if (r!=0) return r;
}
return 0;
}
int toku_brt_insert (BRT brt, DBT *key, DBT *val, TOKUTXN txn)
// Effect: Insert the key-val pair into brt.
{
@ -2867,7 +2828,6 @@ brt_init_header_partial (BRT t) {
compute_and_fill_remembered_hash(t);
toku_fifo_create(&t->h->fifo);
t->h->root_put_counter = global_root_put_counter++;
#if 0 //TODO: logged header logic //TODO: #1605
@ -3110,19 +3070,7 @@ toku_brtheader_begin_checkpoint (CACHEFILE UU(cachefile), LSN checkpoint_lsn, vo
brtheader_copy_for_checkpoint(h, checkpoint_lsn);
h->dirty = 0; // this is only place this bit is cleared (in currentheader)
toku_block_translation_note_start_checkpoint_unlocked(h->blocktable);
//FIFO handling
toku_off_t write_to;
struct brt_header *ch = h->checkpoint_header;
//TODO: #1616 Delete code handling root fifo.
//We would want retrieving 'write_to' and writing to that point to be
//atomic. This is only done during checkpoint of a BRT, so we allow it (not good?).
//fifo
u_int64_t fifo_size = toku_fifo_get_serialized_size (ch->fifo);
toku_realloc_fifo_on_disk_unlocked (ch->blocktable, fifo_size, &write_to);
//printf("%s:%d fifo written to %lu\n", __FILE__, __LINE__, write_to);
toku_block_unlock_for_multiple_operations (h->blocktable);
r = toku_serialize_fifo_at(toku_cachefile_fd(cachefile), write_to, ch->fifo, fifo_size);
}
return r;
}
@ -3216,7 +3164,29 @@ toku_brtheader_close (CACHEFILE cachefile, void *header_v, char **malloced_error
return r;
}
int
toku_brt_db_delay_closed (BRT brt, DB* db, int (*close_db)(DB*, u_int32_t), u_int32_t close_flags) {
//Requires: close_db needs to call toku_close_brt to delete the final reference.
int r;
if (brt->was_closed) r = EINVAL;
else if (brt->db && brt->db!=db) r = EINVAL;
else {
assert(brt->close_db==NULL);
brt->close_db = close_db;
brt->close_flags = close_flags;
brt->was_closed = 1;
if (!brt->db) brt->db = db;
if (toku_omt_size(brt->txns) == 0) {
//Close immediately.
r = brt->close_db(brt->db, brt->close_flags);
}
else r = 0;
}
return r;
}
int toku_close_brt (BRT brt, TOKULOGGER logger, char **error_string) {
assert(toku_omt_size(brt->txns)==0);
int r;
while (!list_empty(&brt->cursors)) {
BRT_CURSOR c = list_struct(list_pop(&brt->cursors), struct brt_cursor, cursors_link);
@ -3753,17 +3723,6 @@ toku_brt_search (BRT brt, brt_search_t *search, BRT_GET_STRADDLE_CALLBACK_FUNCTI
BRTNODE node = node_v;
// push the fifo stuff
{
DBT okey,odata;
BRT_CMD_S ocmd;
while (0==toku_fifo_peek_cmdstruct(brt->h->fifo, &ocmd, &okey, &odata)) {
if ((r = push_something_at_root(brt, &node, rootp, &ocmd, logger))) return r;
r = toku_fifo_deq(brt->h->fifo);
assert(r==0);
}
}
{
enum reactivity re = RE_STABLE;
BOOL doprefetch = FALSE;
@ -4711,9 +4670,7 @@ int toku_brt_truncate (BRT brt) {
toku_block_translation_truncate_unlocked(brt->h->blocktable, brt->h);
//Assign blocknum for root block, also dirty the header
toku_allocate_blocknum_unlocked(brt->h->blocktable, &brt->h->root, brt->h);
// reinit the header
brtheader_partial_destroy(brt->h);
r = brt_init_header_partial(brt);
}

View file

@ -45,6 +45,7 @@ int toku_brt_insert (BRT, DBT *, DBT *, TOKUTXN);
int toku_brt_lookup (BRT brt, DBT *k, DBT *v, BRT_GET_CALLBACK_FUNCTION getf, void *getf_v);
int toku_brt_delete (BRT brt, DBT *k, TOKUTXN);
int toku_brt_delete_both (BRT brt, DBT *k, DBT *v, TOKUTXN); // Delete a pair only if both k and v are equal according to the comparison function.
int toku_brt_db_delay_closed (BRT brt, DB* db, int (*close_db)(DB*, u_int32_t), u_int32_t close_flags);
int toku_close_brt (BRT, TOKULOGGER, char **error_string);
int toku_dump_brt (FILE *,BRT brt);

View file

@ -33,31 +33,6 @@ dump_header (int f, struct brt_header **header) {
printf(" unnamed_root=%" PRId64 "\n", h->root.b);
printf(" flags=%u\n", h->flags);
*header = h;
printf("Fifo:\n");
printf(" fifo has %d entries\n", toku_fifo_n_entries(h->fifo));
if (dump_data) {
FIFO_ITERATE(h->fifo, key, keylen, data, datalen, type, xid,
{
printf(" ");
switch (type) {
case BRT_NONE: printf("NONE"); goto ok;
case BRT_INSERT: printf("INSERT"); goto ok;
case BRT_DELETE_ANY: printf("DELETE_ANY"); goto ok;
case BRT_DELETE_BOTH: printf("DELETE_BOTH"); goto ok;
case BRT_ABORT_ANY: printf("ABORT_ANY"); goto ok;
case BRT_ABORT_BOTH: printf("ABORT_BOTH"); goto ok;
case BRT_COMMIT_ANY: printf("COMMIT_ANY"); goto ok;
case BRT_COMMIT_BOTH: printf("COMMIT_BOTH"); goto ok;
}
printf("huh?");
ok:
printf(" %lld ", (long long)xid);
print_item(key, keylen);
printf(" ");
print_item(data, datalen);
printf("\n");
});
}
}
static int

View file

@ -1148,6 +1148,7 @@ static int remove_brt (OMTVALUE txnv, u_int32_t UU(idx), void *brtv) {
}
int toku_txn_note_close_brt (BRT brt) {
assert(toku_omt_size(brt->txns)==0);
int r = toku_omt_iterate(brt->txns, remove_brt, brt);
assert(r==0);
return 0;
@ -1163,7 +1164,12 @@ static int remove_txn (OMTVALUE brtv, u_int32_t UU(idx), void *txnv) {
assert((void*)txnv_again==txnv);
r = toku_omt_delete_at(brt->txns, index);
assert(r==0);
return 0;
if (toku_omt_size(brt->txns)==0 && brt->was_closed) {
//Close immediately.
assert(brt->close_db);
r = brt->close_db(brt->db, brt->close_flags);
}
return r;
}
// for every BRT in txn, remove it.

View file

@ -140,8 +140,6 @@ const struct logtype logtypes[] = {
{"BYTESTRING", "key", 0},
{"BYTESTRING", "data", 0},
NULLFIELD}},
{"deqrootentry", 'A', FA{{"FILENUM", "filenum", 0},
NULLFIELD}},
{0,0,FA{NULLFIELD}}
};

View file

@ -167,8 +167,6 @@ static void toku_recover_fheader (LSN UU(lsn), TXNID UU(txnid),FILENUM filenum,
//toku_blocktable_create_from_loggedheader(&h->blocktable, header);
assert(0); //create from loggedheader disabled for now. //TODO: #1605
assert(h->blocktable);
r=toku_fifo_create(&h->fifo);
assert(r==0);
h->root = header.root;
h->root_hash.valid= FALSE;
//toku_cachetable_put(pair->cf, header_blocknum, fullhash, h, 0, toku_brtheader_flush_callback, toku_brtheader_fetch_callback, 0);
@ -189,26 +187,6 @@ static void toku_recover_fheader (LSN UU(lsn), TXNID UU(txnid),FILENUM filenum,
toku_cachefile_set_userdata(pair->cf, pair->brt->h, toku_brtheader_close, toku_brtheader_checkpoint, toku_brtheader_begin_checkpoint, toku_brtheader_end_checkpoint);
}
static void toku_recover_deqrootentry (LSN lsn __attribute__((__unused__)), FILENUM filenum) {
struct cf_pair *pair = NULL;
int r = find_cachefile(filenum, &pair);
assert(r==0);
//void *h_v;
//r = toku_cachetable_get_and_pin(pair->cf, header_blocknum, fullhash,
// &h_v, NULL, toku_brtheader_flush_callback, toku_brtheader_fetch_callback, 0);
struct brt_header *h=0;
bytevec storedkey,storeddata;
ITEMLEN storedkeylen, storeddatalen;
TXNID storedxid;
u_int32_t storedtype;
r = toku_fifo_peek(h->fifo, &storedkey, &storedkeylen, &storeddata, &storeddatalen, &storedtype, &storedxid);
assert(r==0);
r = toku_fifo_deq(h->fifo);
assert(r==0);
//r = toku_cachetable_unpin(pair->cf, header_blocknum, fullhash, CACHETABLE_DIRTY, 0);
//assert(r==0);
}
static void
toku_recover_enqrootentry (LSN lsn __attribute__((__unused__)), FILENUM filenum, TXNID xid, u_int32_t typ, BYTESTRING key, BYTESTRING val) {
struct cf_pair *pair = NULL;

View file

@ -76,14 +76,9 @@ static int do_insertion (enum brt_cmd_type type, TXNID xid, FILENUM filenum, BYT
OMTVALUE brtv=NULL;
r = toku_omt_find_zero(txn->open_brts, find_brt_from_filenum, &filenum, &brtv, NULL, NULL);
if (r==DB_NOTFOUND) {
r = toku_cachefile_root_put_cmd(cf, &brtcmd, toku_txn_logger(txn));
if (r!=0) return r;
} else {
assert(r==0);
BRT brt = brtv;
r = toku_brt_root_put_cmd(brt, &brtcmd, txn->logger);
}
assert(r==0);
BRT brt = brtv;
r = toku_brt_root_put_cmd(brt, &brtcmd, txn->logger);
return toku_cachefile_close(&cf, toku_txn_logger(txn), 0);
}

View file

@ -1192,7 +1192,8 @@ int log_compare(const DB_LSN * a, const DB_LSN * b) {
return 0;
}
static int toku_db_close(DB * db, u_int32_t UU(flags)) {
static int
db_close_before_brt(DB *db, u_int32_t UU(flags)) {
char *error_string = 0;
int r1 = toku_close_brt(db->i->brt, db->dbenv->i->logger, &error_string);
if (r1) {
@ -1232,6 +1233,12 @@ static int toku_db_close(DB * db, u_int32_t UU(flags)) {
return 0;
}
static int toku_db_close(DB * db, u_int32_t flags) {
int r = toku_brt_db_delay_closed(db->i->brt, db, db_close_before_brt, flags);
return r;
}
//Get the main portion of a cursor flag (excluding the bitwise or'd components).
static int get_main_cursor_flag(u_int32_t flags) {
return flags & DB_OPFLAGS_MASK;