diff --git a/ft/block_table.c b/ft/block_table.c index 22d23e1aea0..3598413fbb2 100644 --- a/ft/block_table.c +++ b/ft/block_table.c @@ -84,15 +84,15 @@ static inline void unlock_for_blocktable (BLOCK_TABLE bt); static void -ft_set_dirty(FT h, BOOL for_checkpoint){ - assert(toku_mutex_is_locked(&h->blocktable->mutex)); - assert(h->type == FT_CURRENT); +ft_set_dirty(FT ft, BOOL for_checkpoint){ + assert(toku_mutex_is_locked(&ft->blocktable->mutex)); + assert(ft->h->type == FT_CURRENT); if (for_checkpoint) { - assert(h->checkpoint_header->type == FT_CHECKPOINT_INPROGRESS); - h->checkpoint_header->dirty = 1; + assert(ft->checkpoint_header->type == FT_CHECKPOINT_INPROGRESS); + ft->checkpoint_header->dirty = 1; } else { - h->dirty = 1; + ft->h->dirty = 1; } } @@ -449,9 +449,9 @@ PRNTF("blokAllokator", 1L, size, offset, bt); //Fills wbuf with bt //A clean shutdown runs checkpoint start so that current and inprogress are copies. void -toku_serialize_translation_to_wbuf_unlocked(BLOCK_TABLE bt, struct wbuf *w, +toku_serialize_translation_to_wbuf(BLOCK_TABLE bt, struct wbuf *w, int64_t *address, int64_t *size) { - assert(toku_mutex_is_locked(&bt->mutex)); + lock_for_blocktable(bt); struct translation *t = &bt->inprogress; BLOCKNUM b = make_blocknum(RESERVED_BLOCKNUM_TRANSLATION); @@ -478,6 +478,7 @@ toku_serialize_translation_to_wbuf_unlocked(BLOCK_TABLE bt, struct wbuf *w, wbuf_int(w, checksum); *address = t->block_translation[b.b].u.diskoff; *size = t->block_translation[b.b].size; + unlock_for_blocktable(bt); } diff --git a/ft/block_table.h b/ft/block_table.h index 2de5fb3f9aa..d8ab87fd720 100644 --- a/ft/block_table.h +++ b/ft/block_table.h @@ -52,7 +52,7 @@ void toku_blocknum_realloc_on_disk(BLOCK_TABLE bt, BLOCKNUM b, DISKOFF size, DIS void toku_translate_blocknum_to_offset_size(BLOCK_TABLE bt, BLOCKNUM b, DISKOFF *offset, DISKOFF *size); //Serialization -void toku_serialize_translation_to_wbuf_unlocked(BLOCK_TABLE bt, struct wbuf *w, int64_t *address, int64_t *size); 
+void toku_serialize_translation_to_wbuf(BLOCK_TABLE bt, struct wbuf *w, int64_t *address, int64_t *size); void toku_block_table_swap_for_redirect(BLOCK_TABLE old_bt, BLOCK_TABLE new_bt); diff --git a/ft/checkpoint.c b/ft/checkpoint.c index 2963706edc5..d14b313ecb3 100644 --- a/ft/checkpoint.c +++ b/ft/checkpoint.c @@ -67,9 +67,9 @@ static CHECKPOINT_STATUS_S cp_status; #define STATUS_INIT(k,t,l) { \ - cp_status.status[k].keyname = #k; \ - cp_status.status[k].type = t; \ - cp_status.status[k].legend = "checkpoint: " l; \ + cp_status.status[k].keyname = #k; \ + cp_status.status[k].type = t; \ + cp_status.status[k].legend = "checkpoint: " l; \ } static void @@ -106,7 +106,7 @@ status_init(void) { void toku_checkpoint_get_status(CACHETABLE ct, CHECKPOINT_STATUS statp) { if (!cp_status.initialized) - status_init(); + status_init(); STATUS_VALUE(CP_PERIOD) = toku_get_checkpoint_period_unlocked(ct); *statp = cp_status; } @@ -193,7 +193,7 @@ checkpoint_safe_checkpoint_unlock(void) { void toku_multi_operation_client_lock(void) { if (locked_mo) - (void) __sync_fetch_and_add(&STATUS_VALUE(CP_CLIENT_WAIT_ON_MO), 1); + (void) __sync_fetch_and_add(&STATUS_VALUE(CP_CLIENT_WAIT_ON_MO), 1); toku_pthread_rwlock_rdlock(&multi_operation_lock); } @@ -205,7 +205,7 @@ toku_multi_operation_client_unlock(void) { void toku_checkpoint_safe_client_lock(void) { if (locked_cs) - (void) __sync_fetch_and_add(&STATUS_VALUE(CP_CLIENT_WAIT_ON_CS), 1); + (void) __sync_fetch_and_add(&STATUS_VALUE(CP_CLIENT_WAIT_ON_CS), 1); toku_pthread_rwlock_rdlock(&checkpoint_safe_lock); toku_multi_operation_client_lock(); } @@ -241,23 +241,23 @@ toku_checkpoint_destroy(void) { // Take a checkpoint of all currently open dictionaries int toku_checkpoint(CACHETABLE ct, TOKULOGGER logger, - void (*callback_f)(void*), void * extra, - void (*callback2_f)(void*), void * extra2, - checkpoint_caller_t caller_id) { + void (*callback_f)(void*), void * extra, + void (*callback2_f)(void*), void * extra2, + checkpoint_caller_t 
caller_id) { int r; int footprint_offset = (int) caller_id * 1000; assert(initialized); if (locked_cs) { - if (caller_id == SCHEDULED_CHECKPOINT) - (void) __sync_fetch_and_add(&STATUS_VALUE(CP_WAIT_SCHED_CS), 1); - else if (caller_id == CLIENT_CHECKPOINT) - (void) __sync_fetch_and_add(&STATUS_VALUE(CP_WAIT_CLIENT_CS), 1); - else if (caller_id == TXN_COMMIT_CHECKPOINT) - (void) __sync_fetch_and_add(&STATUS_VALUE(CP_WAIT_TXN_CS), 1); - else - (void) __sync_fetch_and_add(&STATUS_VALUE(CP_WAIT_OTHER_CS), 1); + if (caller_id == SCHEDULED_CHECKPOINT) + (void) __sync_fetch_and_add(&STATUS_VALUE(CP_WAIT_SCHED_CS), 1); + else if (caller_id == CLIENT_CHECKPOINT) + (void) __sync_fetch_and_add(&STATUS_VALUE(CP_WAIT_CLIENT_CS), 1); + else if (caller_id == TXN_COMMIT_CHECKPOINT) + (void) __sync_fetch_and_add(&STATUS_VALUE(CP_WAIT_TXN_CS), 1); + else + (void) __sync_fetch_and_add(&STATUS_VALUE(CP_WAIT_OTHER_CS), 1); } (void) __sync_fetch_and_add(&STATUS_VALUE(CP_WAITERS_NOW), 1); @@ -265,27 +265,29 @@ toku_checkpoint(CACHETABLE ct, TOKULOGGER logger, (void) __sync_fetch_and_sub(&STATUS_VALUE(CP_WAITERS_NOW), 1); if (STATUS_VALUE(CP_WAITERS_NOW) > STATUS_VALUE(CP_WAITERS_MAX)) - STATUS_VALUE(CP_WAITERS_MAX) = STATUS_VALUE(CP_WAITERS_NOW); // threadsafe, within checkpoint_safe lock + STATUS_VALUE(CP_WAITERS_MAX) = STATUS_VALUE(CP_WAITERS_NOW); // threadsafe, within checkpoint_safe lock SET_CHECKPOINT_FOOTPRINT(10); if (locked_mo) { - if (caller_id == SCHEDULED_CHECKPOINT) - STATUS_VALUE(CP_WAIT_SCHED_MO)++; // threadsafe, within checkpoint_safe lock - else if (caller_id == CLIENT_CHECKPOINT) - STATUS_VALUE(CP_WAIT_CLIENT_MO)++; - else if (caller_id == TXN_COMMIT_CHECKPOINT) - STATUS_VALUE(CP_WAIT_TXN_MO)++; - else - STATUS_VALUE(CP_WAIT_OTHER_MO)++; + if (caller_id == SCHEDULED_CHECKPOINT) + STATUS_VALUE(CP_WAIT_SCHED_MO)++; // threadsafe, within checkpoint_safe lock + else if (caller_id == CLIENT_CHECKPOINT) + STATUS_VALUE(CP_WAIT_CLIENT_MO)++; + else if (caller_id == 
TXN_COMMIT_CHECKPOINT) + STATUS_VALUE(CP_WAIT_TXN_MO)++; + else + STATUS_VALUE(CP_WAIT_OTHER_MO)++; } multi_operation_checkpoint_lock(); SET_CHECKPOINT_FOOTPRINT(20); ydb_lock(); + toku_ft_open_close_lock(); SET_CHECKPOINT_FOOTPRINT(30); STATUS_VALUE(CP_TIME_LAST_CHECKPOINT_BEGIN) = time(NULL); r = toku_cachetable_begin_checkpoint(ct, logger); + toku_ft_open_close_unlock(); multi_operation_checkpoint_unlock(); ydb_unlock(); @@ -299,7 +301,7 @@ toku_checkpoint(CACHETABLE ct, TOKULOGGER logger, if (r==0 && logger) { last_completed_checkpoint_lsn = logger->last_completed_checkpoint_lsn; r = toku_logger_maybe_trim_log(logger, last_completed_checkpoint_lsn); - STATUS_VALUE(CP_LAST_LSN) = last_completed_checkpoint_lsn.lsn; + STATUS_VALUE(CP_LAST_LSN) = last_completed_checkpoint_lsn.lsn; } SET_CHECKPOINT_FOOTPRINT(60); @@ -307,9 +309,9 @@ toku_checkpoint(CACHETABLE ct, TOKULOGGER logger, STATUS_VALUE(CP_TIME_LAST_CHECKPOINT_BEGIN_COMPLETE) = STATUS_VALUE(CP_TIME_LAST_CHECKPOINT_BEGIN); if (r == 0) - STATUS_VALUE(CP_CHECKPOINT_COUNT)++; + STATUS_VALUE(CP_CHECKPOINT_COUNT)++; else - STATUS_VALUE(CP_CHECKPOINT_COUNT_FAIL)++; + STATUS_VALUE(CP_CHECKPOINT_COUNT_FAIL)++; STATUS_VALUE(CP_FOOTPRINT) = 0; checkpoint_safe_checkpoint_unlock(); diff --git a/ft/ft-cachetable-wrappers.c b/ft/ft-cachetable-wrappers.c index f90646c4318..28a24613443 100644 --- a/ft/ft-cachetable-wrappers.c +++ b/ft/ft-cachetable-wrappers.c @@ -65,7 +65,7 @@ cachetable_put_empty_node_with_dep_nodes( void create_new_ftnode_with_dep_nodes( - FT h, + FT ft, FTNODE *result, int height, int n_children, @@ -76,15 +76,15 @@ create_new_ftnode_with_dep_nodes( BLOCKNUM name; cachetable_put_empty_node_with_dep_nodes( - h, + ft, num_dependent_nodes, dependent_nodes, &name, &fullhash, result); - assert(h->nodesize > 0); - assert(h->basementnodesize > 0); + assert(ft->h->nodesize > 0); + assert(ft->h->basementnodesize > 0); if (height == 0) { assert(n_children > 0); } @@ -94,9 +94,9 @@ create_new_ftnode_with_dep_nodes( 
name, height, n_children, - h->layout_version, - h->nodesize, - h->flags); + ft->h->layout_version, + ft->h->nodesize, + ft->h->flags); assert((*result)->nodesize > 0); (*result)->fullhash = fullhash; @@ -208,10 +208,10 @@ toku_pin_ftnode_off_client_thread( } void -toku_unpin_ftnode_off_client_thread(FT h, FTNODE node) +toku_unpin_ftnode_off_client_thread(FT ft, FTNODE node) { int r = toku_cachetable_unpin( - h->cf, + ft->cf, node->thisnodename, node->fullhash, (enum cachetable_dirty) node->dirty, @@ -221,11 +221,11 @@ toku_unpin_ftnode_off_client_thread(FT h, FTNODE node) } void -toku_unpin_ftnode(FT h, FTNODE node) +toku_unpin_ftnode(FT ft, FTNODE node) { // printf("%*sUnpin %ld\n", 8-node->height, "", node->thisnodename.b); //VERIFY_NODE(brt,node); - toku_unpin_ftnode_off_client_thread(h, node); + toku_unpin_ftnode_off_client_thread(ft, node); } void diff --git a/ft/ft-flusher.c b/ft/ft-flusher.c index 352ebee7290..d30c75dc8c3 100644 --- a/ft/ft-flusher.c +++ b/ft/ft-flusher.c @@ -718,15 +718,15 @@ ftleaf_split( invariant(node->height == 0); STATUS_VALUE(FT_FLUSHER_SPLIT_LEAF)++; if (node->n_children) { - // First move all the accumulated stat64info deltas into the first basement. - // After the split, either both nodes or neither node will be included in the next checkpoint. - // The accumulated stats in the dictionary will be correct in either case. - // By moving all the deltas into one (arbitrary) basement, we avoid the need to maintain - // correct information for a basement that is divided between two leafnodes (i.e. when split is - // not on a basement boundary). - STAT64INFO_S delta_for_leafnode = toku_get_and_clear_basement_stats(node); - BASEMENTNODE bn = BLB(node,0); - bn->stat64_delta = delta_for_leafnode; + // First move all the accumulated stat64info deltas into the first basement. + // After the split, either both nodes or neither node will be included in the next checkpoint. 
+ // The accumulated stats in the dictionary will be correct in either case. + // By moving all the deltas into one (arbitrary) basement, we avoid the need to maintain + // correct information for a basement that is divided between two leafnodes (i.e. when split is + // not on a basement boundary). + STAT64INFO_S delta_for_leafnode = toku_get_and_clear_basement_stats(node); + BASEMENTNODE bn = BLB(node,0); + bn->stat64_delta = delta_for_leafnode; } @@ -807,9 +807,9 @@ ftleaf_split( name, 0, num_children_in_b, - h->layout_version, - h->nodesize, - h->flags); + h->h->layout_version, + h->h->nodesize, + h->h->flags); assert(B->nodesize > 0); B->fullhash = fullhash; } @@ -1002,7 +1002,7 @@ ft_split_child( FTNODE nodea, nodeb; DBT splitk; // printf("%s:%d node %" PRIu64 "->u.n.n_children=%d height=%d\n", __FILE__, __LINE__, node->thisnodename.b, node->u.n.n_children, node->height); - assert(h->nodesize>=node->nodesize); /* otherwise we might be in trouble because the nodesize shrank. */ + assert(h->h->nodesize>=node->nodesize); /* otherwise we might be in trouble because the nodesize shrank. */ // for test call_flusher_thread_callback(flt_flush_before_split); diff --git a/ft/ft-internal.h b/ft/ft-internal.h index 650154b5450..0634a16dfe1 100644 --- a/ft/ft-internal.h +++ b/ft/ft-internal.h @@ -335,67 +335,129 @@ u_int32_t compute_child_fullhash (CACHEFILE cf, FTNODE node, int childnum); enum ft_type {FT_CURRENT=1, FT_CHECKPOINT_INPROGRESS}; +struct ft_header { + enum ft_type type; + + int dirty; + + // Free-running counter incremented once per checkpoint (toggling LSB). + // LSB indicates which header location is used on disk so this + // counter is effectively a boolean which alternates with each checkpoint. + uint64_t checkpoint_count; + // LSN of creation of "checkpoint-begin" record in log. + LSN checkpoint_lsn; + + // see brt_layout_version.h. 
maybe don't need this if we assume + // it's always the current version after deserializing + const int layout_version; + // different (<) from layout_version if upgraded from a previous + // version (useful for debugging) + const int layout_version_original; + // build_id (svn rev number) of software that wrote this node to + // disk. (read from disk, overwritten when written to disk, I + // think). + const uint32_t build_id; + // build_id of software that created this tree + const uint32_t build_id_original; + + // time this tree was created + const uint64_t time_of_creation; + // and the root transaction id that created it + TXNID root_xid_that_created; + // last time this header was serialized to disk (read from disk, + // overwritten when written to disk) + uint64_t time_of_last_modification; + // last time that this tree was verified + uint64_t time_of_last_verification; + + // this field is protected by tree_lock, see comment for tree_lock + BLOCKNUM root_blocknum; + + const unsigned int flags; + const unsigned int nodesize; + const unsigned int basementnodesize; + const enum toku_compression_method compression_method; + + // Current Minimum MSN to be used when upgrading pre-MSN BRT's. + // This is decremented from our current MIN_MSN so as not to clash + // with any existing 'normal' MSN's. 
+ MSN highest_unused_msn_for_upgrade; + + // last time that a hot optimize operation was begun + uint64_t time_of_last_optimize_begin; + // last time that a hot optimize operation was successfully completed + uint64_t time_of_last_optimize_end; + // the number of hot optimize operations currently in progress on this tree + uint32_t count_of_optimize_in_progress; + // the number of hot optimize operations in progress on this tree at the time of the last crash (this field is in-memory only) + uint32_t count_of_optimize_in_progress_read_from_disk; + // all messages before this msn have been applied to leaf nodes + MSN msn_at_start_of_last_completed_optimize; + + STAT64INFO_S on_disk_stats; + +}; + // brt_header is always the current version. struct ft { - enum ft_type type; - FT checkpoint_header; + FT_HEADER h; + FT_HEADER checkpoint_header; + + // These are (mostly) read-only. + CACHEFILE cf; + // unique id for dictionary + DICTIONARY_ID dict_id; + ft_compare_func compare_fun; + ft_update_func update_fun; + + // protected by locktree + DESCRIPTOR_S descriptor; + // protected by locktree and user. User + // makes sure this is only changed + // when no activity on tree + DESCRIPTOR_S cmp_descriptor; + + // These are not read-only: + // lock used by a thread to pin the root node to start a descent into // the tree. This lock protects the blocknum of the root node (root_blocknum). Any // thread that wants to descend down the tree starting at the root // must grab this lock before pinning the root. - toku_mutex_t tree_lock; - u_int64_t checkpoint_count; // Free-running counter incremented once per checkpoint (toggling LSB). - // LSB indicates which header location is used on disk so this - // counter is effectively a boolean which alternates with each checkpoint. - LSN checkpoint_lsn; // LSN of creation of "checkpoint-begin" record in log. - int dirty; - DICTIONARY_ID dict_id; // unique id for dictionary - int panic; // If nonzero there was a write error. 
Don't write any more, because it probably only gets worse. This is the error code. - char *panic_string; // A malloced string that can indicate what went wrong. - int layout_version; - int layout_version_original; // different (<) from layout_version if upgraded from a previous version (useful for debugging) - int layout_version_read_from_disk; // transient, not serialized to disk - uint32_t build_id; // build_id (svn rev number) of software that wrote this node to disk - uint32_t build_id_original; // build_id of software that created this tree (read from disk, overwritten when written to disk) - uint64_t time_of_creation; // time this tree was created - uint64_t time_of_last_modification; // last time this header was serialized to disk (read from disk, overwritten when written to disk) - uint64_t time_of_last_verification; // last time that this tree was verified - unsigned int nodesize; - unsigned int basementnodesize; - // this field is protected by tree_lock, see comment for tree_lock - BLOCKNUM root_blocknum; // roots of the dictionary - unsigned int flags; - DESCRIPTOR_S descriptor; - DESCRIPTOR_S cmp_descriptor; + toku_mutex_t tree_lock; + // protected by blocktable lock BLOCK_TABLE blocktable; + + // protected by atomic builtins + STAT64INFO_S in_memory_stats; + + // transient, not serialized to disk. updated when we do write to + // disk. tells us whether we can do partial eviction (we can't if + // the on-disk layout version is from before basement nodes) + int layout_version_read_from_disk; + // If a transaction created this BRT, which one? // If a transaction locked the BRT when it was empty, which transaction? 
(Only the latest one matters) // 0 if no such transaction + // only one thread can write to these at once, this is enforced by + // the lock tree TXNID txnid_that_created_or_locked_when_empty; - TXNID root_that_created_or_locked_when_empty; TXNID txnid_that_suppressed_recovery_logs; - TXNID root_xid_that_created; - struct toku_list live_ft_handles; - OMT txns; // transactions that are using this header - bool pinned_by_checkpoint; //Keep this header around for checkpoint, like a transaction - ft_compare_func compare_fun; - ft_update_func update_fun; - STAT64INFO_S in_memory_stats; - STAT64INFO_S on_disk_stats; - STAT64INFO_S checkpoint_staging_stats; - uint64_t time_of_last_optimize_begin; // last time that a hot optimize operation was begun - uint64_t time_of_last_optimize_end; // last time that a hot optimize operation was successfully completed - uint32_t count_of_optimize_in_progress; // the number of hot optimize operations currently in progress on this tree - uint32_t count_of_optimize_in_progress_read_from_disk; // the number of hot optimize operations in progress on this tree at the time of the last crash (this field is in-memory only) - MSN msn_at_start_of_last_completed_optimize; // all messages before this msn have been applied to leaf nodes - enum toku_compression_method compression_method; - // Current Minimum MSN to be used when upgrading pre-MSN BRT's. - // This is decremented from our currnt MIN_MSN so as not to clash - // with any existing 'normal' MSN's. - MSN highest_unused_msn_for_upgrade; + // protects modifying live_ft_handles, txns, and pinned_by_checkpoint + toku_mutex_t ft_ref_lock; + struct toku_list live_ft_handles; + // transactions that are using this header. you should only be able + // to modify this if you have a valid handle in the list of live brts + OMT txns; + // Keep this header around for checkpoint, like a transaction + bool pinned_by_checkpoint; + + // If nonzero there was a write error. 
Don't write any more, because it probably only gets worse. This is the error code. + int panic; + // A malloced string that can indicate what went wrong. + char *panic_string; }; // Copy the descriptor into a temporary variable, and tell DRD that subsequent code happens after reading that pointer. @@ -464,9 +526,14 @@ int toku_keycompare (bytevec key1, ITEMLEN key1len, bytevec key2, ITEMLEN key2le void toku_verify_or_set_counts(FTNODE); -int toku_serialize_ft_size (FT h); -int toku_serialize_ft_to (int fd, FT h); -int toku_serialize_ft_to_wbuf (struct wbuf *, FT h, int64_t address_translation, int64_t size_translation); +int toku_serialize_ft_size (FT_HEADER h); +int toku_serialize_ft_to (int fd, FT_HEADER h, BLOCK_TABLE blocktable, CACHEFILE cf); +int toku_serialize_ft_to_wbuf ( + struct wbuf *wbuf, + FT_HEADER h, + DISKOFF translation_location_on_disk, + DISKOFF translation_size_on_disk + ); enum deserialize_error_code toku_deserialize_ft_from (int fd, LSN max_acceptable_lsn, FT *ft); int toku_serialize_descriptor_contents_to_fd(int fd, const DESCRIPTOR desc, DISKOFF offset); void toku_serialize_descriptor_contents_to_wbuf(struct wbuf *wb, const DESCRIPTOR desc); @@ -579,7 +646,6 @@ struct ft_cursor { // is required, such as for flushes. // static inline void fill_bfe_for_full_read(struct ftnode_fetch_extra *bfe, FT h) { - invariant(h->type == FT_CURRENT); bfe->type = ftnode_fetch_all; bfe->h = h; bfe->search = NULL; @@ -608,7 +674,7 @@ static inline void fill_bfe_for_subset_read( BOOL disable_prefetching ) { - invariant(h->type == FT_CURRENT); + invariant(h->h->type == FT_CURRENT); bfe->type = ftnode_fetch_subset; bfe->h = h; bfe->search = search; @@ -627,7 +693,7 @@ static inline void fill_bfe_for_subset_read( // Currently used for stat64. 
// static inline void fill_bfe_for_min_read(struct ftnode_fetch_extra *bfe, FT h) { - invariant(h->type == FT_CURRENT); + invariant(h->h->type == FT_CURRENT); bfe->type = ftnode_fetch_none; bfe->h = h; bfe->search = NULL; @@ -659,7 +725,7 @@ static inline void destroy_bfe_for_prefetch(struct ftnode_fetch_extra *bfe) { static inline void fill_bfe_for_prefetch(struct ftnode_fetch_extra *bfe, FT h, FT_CURSOR c) { - invariant(h->type == FT_CURRENT); + invariant(h->h->type == FT_CURRENT); bfe->type = ftnode_fetch_prefetch; bfe->h = h; bfe->search = NULL; diff --git a/ft/ft-ops.c b/ft/ft-ops.c index 38ada882901..03776310226 100644 --- a/ft/ft-ops.c +++ b/ft/ft-ops.c @@ -150,6 +150,8 @@ static volatile FT_STATUS_S ft_status; ft_status.status[k].legend = "brt: " l; \ } +static toku_mutex_t ft_open_close_lock; + static void status_init(void) { @@ -307,8 +309,8 @@ toku_ft_nonleaf_is_gorged (FTNODE node) { (!buffers_are_empty)); } -static void ft_verify_flags(FT h, FTNODE node) { - assert(h->flags == node->flags); +static void ft_verify_flags(FT ft, FTNODE node) { + assert(ft->h->flags == node->flags); } int toku_ft_debug_mode = 0; @@ -599,16 +601,25 @@ static void ft_status_update_flush_reason(FTNODE node, BOOL for_checkpoint) { static void ftnode_update_disk_stats( FTNODE ftnode, - FT h, + FT ft, BOOL for_checkpoint ) { STAT64INFO_S deltas = ZEROSTATS; // capture deltas before rebalancing basements for serialization deltas = toku_get_and_clear_basement_stats(ftnode); - toku_ft_update_stats(&h->on_disk_stats, deltas); + // locking not necessary here with respect to checkpointing + // in Clayface (because of the pending lock and cachetable lock + // in toku_cachetable_begin_checkpoint) + // essentially, if we are dealing with a for_checkpoint + // parameter in a function that is called by the flush_callback, + // then the cachetable needs to ensure that this is called in a safe + // manner that does not interfere with the beginning + // of a checkpoint, which it does with the 
cachetable lock + // and pending lock + toku_ft_update_stats(&ft->h->on_disk_stats, deltas); if (for_checkpoint) { - toku_ft_update_stats(&h->checkpoint_staging_stats, deltas); + toku_ft_update_stats(&ft->checkpoint_header->on_disk_stats, deltas); } } @@ -637,15 +648,15 @@ void toku_ftnode_clone_callback( { FTNODE node = value_data; toku_assert_entire_node_in_memory(node); - FT h = write_extraargs; + FT ft = write_extraargs; FTNODE XMALLOC(cloned_node); //FTNODE cloned_node = (FTNODE)toku_xmalloc(sizeof(*FTNODE)); memset(cloned_node, 0, sizeof(*cloned_node)); if (node->height == 0) { // set header stats, must be done before rebalancing - ftnode_update_disk_stats(node, h, for_checkpoint); + ftnode_update_disk_stats(node, ft, for_checkpoint); // rebalance the leaf node - rebalance_ftnode_leaf(node, h->basementnodesize); + rebalance_ftnode_leaf(node, ft->h->basementnodesize); } cloned_node->max_msn_applied_to_node_on_disk = node->max_msn_applied_to_node_on_disk; @@ -870,7 +881,7 @@ void toku_evict_bn_from_memory(FTNODE node, int childnum, FT h) { // callback for partially evicting a node int toku_ftnode_pe_callback (void *ftnode_pv, PAIR_ATTR UU(old_attr), PAIR_ATTR* new_attr, void* extraargs) { FTNODE node = (FTNODE)ftnode_pv; - FT h = extraargs; + FT ft = extraargs; // Don't partially evict dirty nodes if (node->dirty) { goto exit; @@ -888,7 +899,7 @@ int toku_ftnode_pe_callback (void *ftnode_pv, PAIR_ATTR UU(old_attr), PAIR_ATTR* if (BP_STATE(node,i) == PT_AVAIL) { if (BP_SHOULD_EVICT(node,i)) { STATUS_VALUE(FT_PARTIAL_EVICTIONS_NONLEAF)++; - cilk_spawn compress_internal_node_partition(node, i, h->compression_method); + cilk_spawn compress_internal_node_partition(node, i, ft->h->compression_method); } else { BP_SWEEP_CLOCK(node,i); @@ -919,7 +930,7 @@ int toku_ftnode_pe_callback (void *ftnode_pv, PAIR_ATTR UU(old_attr), PAIR_ATTR* else if (BP_STATE(node,i) == PT_AVAIL) { if (BP_SHOULD_EVICT(node,i)) { STATUS_VALUE(FT_PARTIAL_EVICTIONS_LEAF)++; - 
toku_evict_bn_from_memory(node, i, h); + toku_evict_bn_from_memory(node, i, ft); } else { BP_SWEEP_CLOCK(node,i); @@ -1272,7 +1283,7 @@ toku_initialize_empty_ftnode (FTNODE n, BLOCKNUM nodename, int height, int num_c } static void -ft_init_new_root(FT h, FTNODE nodea, FTNODE nodeb, DBT splitk, CACHEKEY *rootp, FTNODE *newrootp) +ft_init_new_root(FT ft, FTNODE nodea, FTNODE nodeb, DBT splitk, CACHEKEY *rootp, FTNODE *newrootp) // Effect: Create a new root node whose two children are NODEA and NODEB, and the pivotkey is SPLITK. // Store the new root's identity in *ROOTP, and the node in *NEWROOTP. // Unpin nodea and nodeb. @@ -1281,11 +1292,11 @@ ft_init_new_root(FT h, FTNODE nodea, FTNODE nodeb, DBT splitk, CACHEKEY *rootp, FTNODE XMALLOC(newroot); int new_height = nodea->height+1; BLOCKNUM newroot_diskoff; - toku_allocate_blocknum(h->blocktable, &newroot_diskoff, h); + toku_allocate_blocknum(ft->blocktable, &newroot_diskoff, ft); assert(newroot); *rootp=newroot_diskoff; assert(new_height > 0); - toku_initialize_empty_ftnode (newroot, newroot_diskoff, new_height, 2, h->layout_version, h->nodesize, h->flags); + toku_initialize_empty_ftnode (newroot, newroot_diskoff, new_height, 2, ft->h->layout_version, ft->h->nodesize, ft->h->flags); //printf("new_root %lld %d %lld %lld\n", newroot_diskoff, newroot->height, nodea->thisnodename, nodeb->thisnodename); //printf("%s:%d Splitkey=%p %s\n", __FILE__, __LINE__, splitkey, splitkey); toku_copyref_dbt(&newroot->childkeys[0], splitk); @@ -1301,12 +1312,19 @@ ft_init_new_root(FT h, FTNODE nodea, FTNODE nodeb, DBT splitk, CACHEKEY *rootp, BP_STATE(newroot,0) = PT_AVAIL; BP_STATE(newroot,1) = PT_AVAIL; newroot->dirty = 1; - toku_unpin_ftnode(h, nodea); - toku_unpin_ftnode(h, nodeb); //printf("%s:%d put %lld\n", __FILE__, __LINE__, newroot_diskoff); - u_int32_t fullhash = toku_cachetable_hash(h->cf, newroot_diskoff); + u_int32_t fullhash = toku_cachetable_hash(ft->cf, newroot_diskoff); newroot->fullhash = fullhash; - 
toku_cachetable_put(h->cf, newroot_diskoff, fullhash, newroot, make_ftnode_pair_attr(newroot), get_write_callbacks_for_node(h)); + toku_cachetable_put(ft->cf, newroot_diskoff, fullhash, newroot, make_ftnode_pair_attr(newroot), get_write_callbacks_for_node(ft)); + + //at this point, newroot is associated with newroot_diskoff, nodea is associated with root_blocknum + // make newroot_diskoff point to nodea + // make root_blocknum point to newroot + // also modify the blocknum and fullhash of nodea and newroot + // before doing this, assert(nodea->blocknum == ft->root_blocknum) + + toku_unpin_ftnode(ft, nodea); + toku_unpin_ftnode(ft, nodeb); *newrootp = newroot; } @@ -2048,7 +2066,7 @@ ft_nonleaf_put_cmd (ft_compare_func compare_fun, DESCRIPTOR desc, FTNODE node, F // return TRUE if root changed, FALSE otherwise static BOOL -ft_process_maybe_reactive_root (FT h, CACHEKEY *rootp, FTNODE *nodep) { +ft_process_maybe_reactive_root (FT ft, CACHEKEY *rootp, FTNODE *nodep) { FTNODE node = *nodep; toku_assert_entire_node_in_memory(node); enum reactivity re = get_node_reactivity(node); @@ -2060,18 +2078,18 @@ ft_process_maybe_reactive_root (FT h, CACHEKEY *rootp, FTNODE *nodep) { // The root node should split, so make a new root. FTNODE nodea,nodeb; DBT splitk; - assert(h->nodesize>=node->nodesize); /* otherwise we might be in trouble because the nodesize shrank. */ + assert(ft->h->nodesize>=node->nodesize); /* otherwise we might be in trouble because the nodesize shrank. */ // // This happens on the client thread with the ydb lock, so it is safe to // not pass in dependent nodes. Although if we wanted to, we could pass // in just node. That would be correct. 
// if (node->height==0) { - ftleaf_split(h, node, &nodea, &nodeb, &splitk, TRUE, 0, NULL); + ftleaf_split(ft, node, &nodea, &nodeb, &splitk, TRUE, 0, NULL); } else { - ft_nonleaf_split(h, node, &nodea, &nodeb, &splitk, 0, NULL); + ft_nonleaf_split(ft, node, &nodea, &nodeb, &splitk, 0, NULL); } - ft_init_new_root(h, nodea, nodeb, splitk, rootp, nodep); + ft_init_new_root(ft, nodea, nodeb, splitk, rootp, nodep); return TRUE; } case RE_FUSIBLE: @@ -2695,7 +2713,7 @@ toku_ft_maybe_insert (FT_HANDLE brt, DBT *key, DBT *val, TOKUTXN txn, BOOL oplsn //We have transactions, and this is not 2440. We must send the full root-to-leaf-path message_xids = toku_txn_get_xids(txn); } - else if (txn->ancestor_txnid64 != brt->ft->root_xid_that_created) { + else if (txn->ancestor_txnid64 != brt->ft->h->root_xid_that_created) { //We have transactions, and this is 2440, however the txn doing 2440 did not create the dictionary. We must send the full root-to-leaf-path message_xids = toku_txn_get_xids(txn); } @@ -2800,6 +2818,7 @@ toku_ft_maybe_update_broadcast(FT_HANDLE brt, const DBT *update_function_extra, if (r != 0) { goto cleanup; } } + //TODO(yoni): remove treelsn here and similar calls (no longer being used) LSN treelsn; if (oplsn_valid && oplsn.lsn <= (treelsn = toku_ft_checkpoint_lsn(brt->ft)).lsn) { @@ -2890,7 +2909,7 @@ toku_ft_maybe_delete(FT_HANDLE brt, DBT *key, TOKUTXN txn, BOOL oplsn_valid, LSN //We have transactions, and this is not 2440. We must send the full root-to-leaf-path message_xids = toku_txn_get_xids(txn); } - else if (txn->ancestor_txnid64 != brt->ft->root_xid_that_created) { + else if (txn->ancestor_txnid64 != brt->ft->h->root_xid_that_created) { //We have transactions, and this is 2440, however the txn doing 2440 did not create the dictionary. 
We must send the full root-to-leaf-path message_xids = toku_txn_get_xids(txn); } @@ -3126,10 +3145,10 @@ cleanup: static void toku_ft_handle_inherit_options(FT_HANDLE t, FT ft) { struct ft_options options = { - .nodesize = ft->nodesize, - .basementnodesize = ft->basementnodesize, - .compression_method = ft->compression_method, - .flags = ft->flags, + .nodesize = ft->h->nodesize, + .basementnodesize = ft->h->basementnodesize, + .compression_method = ft->h->compression_method, + .flags = ft->h->flags, .compare_fun = ft->compare_fun, .update_fun = ft->update_fun }; @@ -3148,6 +3167,7 @@ ft_handle_open(FT_HANDLE t, const char *fname_in_env, int is_create, int only_cr CACHEFILE cf = NULL; FT ft = NULL; BOOL did_create = FALSE; + toku_ft_open_close_lock(); if (t->did_set_flags) { r = verify_builtin_comparisons_consistent(t, t->options.flags); @@ -3211,7 +3231,7 @@ ft_handle_open(FT_HANDLE t, const char *fname_in_env, int is_create, int only_cr if (!t->did_set_flags) { r = verify_builtin_comparisons_consistent(t, t->options.flags); if (r) { goto exit; } - } else if (t->options.flags != ft->flags) { /* if flags have been set then flags must match */ + } else if (t->options.flags != ft->h->flags) { /* if flags have been set then flags must match */ r = EINVAL; goto exit; } @@ -3277,7 +3297,10 @@ exit: // but we have not linked it to this brt. So, // we can simply try to remove the header. // We don't need to unlink this brt from the header - if (!toku_ft_needed(ft)) { + toku_ft_grab_reflock(ft); + BOOL needed = toku_ft_needed_unlocked(ft); + toku_ft_release_reflock(ft); + if (!needed) { //Close immediately. 
char *error_string = NULL; r = toku_remove_ft(ft, &error_string, false, ZERO_LSN); @@ -3288,6 +3311,7 @@ exit: toku_cachefile_close(&cf, 0, FALSE, ZERO_LSN); } } + toku_ft_open_close_unlock(); return r; } @@ -3406,36 +3430,24 @@ ft_compare_func toku_ft_get_bt_compare (FT_HANDLE brt) { return brt->options.compare_fun; } +static void +ft_remove_handle_ref_callback(FT UU(ft), void *extra) { + FT_HANDLE handle = extra; + toku_list_remove(&handle->live_ft_handle_link); +} + int toku_ft_handle_close (FT_HANDLE brt, bool oplsn_valid, LSN oplsn) // Effect: See ft-ops.h for the specification of this function. { - int r = 0; - FT h = brt->ft; - - // it is possible that a header was never opened - // for the brt - if (brt->ft) { - // TODO: figure out the proper locking here - // what really protects live_ft_handle_link? - toku_ft_lock(h); - toku_list_remove(&brt->live_ft_handle_link); - toku_ft_unlock(h); - - if (!toku_ft_needed(brt->ft)) { - // close header - char *error_string = NULL; - r = toku_remove_ft(h, &error_string, oplsn_valid, oplsn); - assert_zero(r); - assert(error_string == NULL); - } + FT ft = brt->ft; + if (ft) { + toku_ft_remove_reference(brt->ft, oplsn_valid, oplsn, ft_remove_handle_ref_callback, brt); } toku_free(brt); - - return r; + return 0; } -// test function int toku_close_ft_handle_nolsn (FT_HANDLE brt, char** UU(error_string)) { return toku_ft_handle_close(brt, FALSE, ZERO_LSN); } @@ -3530,7 +3542,7 @@ int toku_ft_cursor ( { if (is_snapshot_read) { invariant(ttxn != NULL); - int accepted = does_txn_read_entry(brt->ft->root_xid_that_created, ttxn); + int accepted = does_txn_read_entry(brt->ft->h->root_xid_that_created, ttxn); if (accepted!=TOKUDB_ACCEPT) { invariant(accepted==0); return TOKUDB_MVCC_DICTIONARY_TOO_NEW; @@ -5434,17 +5446,23 @@ int toku_ft_layer_init(void (*ydb_lock_callback)(void), void (*ydb_unlock_callback)(void)) { int r = 0; //Portability must be initialized first - if (r==0) - r = toku_portability_init(); - if (r==0) - 
toku_checkpoint_init(ydb_lock_callback, ydb_unlock_callback); - if (r == 0) - r = toku_ft_serialize_layer_init(); + r = toku_portability_init(); + if (r) { goto exit; } + + toku_checkpoint_init(ydb_lock_callback, ydb_unlock_callback); + + r = toku_ft_serialize_layer_init(); + if (r) { goto exit; } + + toku_mutex_init(&ft_open_close_lock, NULL); +exit: return r; } int toku_ft_layer_destroy(void) { - int r = 0; + int r = 0; + toku_mutex_destroy(&ft_open_close_lock); + if (r == 0) r = toku_ft_serialize_layer_destroy(); if (r==0) @@ -5455,6 +5473,14 @@ int toku_ft_layer_destroy(void) { return r; } +void toku_ft_open_close_lock(void) { + toku_mutex_lock(&ft_open_close_lock); +} + +void toku_ft_open_close_unlock(void) { + toku_mutex_unlock(&ft_open_close_lock); +} + //Suppress both rollback and recovery logs. void toku_ft_suppress_recovery_logs (FT_HANDLE brt, TOKUTXN txn) { diff --git a/ft/ft-ops.h b/ft/ft-ops.h index 33a2c42e077..3a264836aaf 100644 --- a/ft/ft-ops.h +++ b/ft/ft-ops.h @@ -241,6 +241,8 @@ toku_ft_handle_stat64 (FT_HANDLE, TOKUTXN, struct ftstat64_s *stat) __attribute_ int toku_ft_layer_init(void (*ydb_lock_callback)(void), void (*ydb_unlock_callback)(void)) __attribute__ ((warn_unused_result)); +void toku_ft_open_close_lock(void); +void toku_ft_open_close_unlock(void); int toku_ft_layer_destroy(void) __attribute__ ((warn_unused_result)); int toku_ft_serialize_layer_init(void) __attribute__ ((warn_unused_result)); int toku_ft_serialize_layer_destroy(void) __attribute__ ((warn_unused_result)); @@ -259,10 +261,6 @@ void toku_ft_suppress_recovery_logs (FT_HANDLE brt, TOKUTXN txn); int toku_ft_get_fragmentation(FT_HANDLE brt, TOKU_DB_FRAGMENTATION report) __attribute__ ((warn_unused_result)); -BOOL toku_ft_is_empty_fast (FT_HANDLE brt); -// Effect: Return TRUE if there are no messages or leaf entries in the tree. If so, it's empty. 
If there are messages or leaf entries, we say it's not empty -// even though if we were to optimize the tree it might turn out that they are empty. - BOOL toku_ft_is_empty_fast (FT_HANDLE brt) __attribute__ ((warn_unused_result)); // Effect: Return TRUE if there are no messages or leaf entries in the tree. If so, it's empty. If there are messages or leaf entries, we say it's not empty // even though if we were to optimize the tree it might turn out that they are empty. diff --git a/ft/ft-serialize.c b/ft/ft-serialize.c index 69cfa700b33..de853c7b758 100644 --- a/ft/ft-serialize.c +++ b/ft/ft-serialize.c @@ -139,10 +139,10 @@ exit: // We only deserialize brt header once and then share everything with all the brts. static enum deserialize_error_code -deserialize_ft_versioned(int fd, struct rbuf *rb, FT *ft, uint32_t version) +deserialize_ft_versioned(int fd, struct rbuf *rb, FT *ftp, uint32_t version) { enum deserialize_error_code e = DS_OK; - FT h = NULL; + FT ft = NULL; invariant(version >= FT_LAYOUT_MIN_SUPPORTED_VERSION); invariant(version <= FT_LAYOUT_VERSION); // We already know: @@ -155,28 +155,25 @@ deserialize_ft_versioned(int fd, struct rbuf *rb, FT *ft, uint32_t version) rbuf_literal_bytes(rb, &magic, 8); lazy_assert(memcmp(magic,"tokudata",8)==0); - CALLOC(h); - if (!h) { + XCALLOC(ft); + if (!ft) { e = DS_ERRNO; goto exit; } - h->type = FT_CURRENT; - h->checkpoint_header = NULL; - h->dirty = 0; - h->panic = 0; - h->panic_string = 0; - toku_list_init(&h->live_ft_handles); - int r = toku_omt_create(&h->txns); + ft->checkpoint_header = NULL; + ft->panic = 0; + ft->panic_string = 0; + toku_list_init(&ft->live_ft_handles); + int r = toku_omt_create(&ft->txns); assert_zero(r); //version MUST be in network order on disk regardless of disk order - h->layout_version_read_from_disk = rbuf_network_int(rb); - invariant(h->layout_version_read_from_disk >= FT_LAYOUT_MIN_SUPPORTED_VERSION); - invariant(h->layout_version_read_from_disk <= FT_LAYOUT_VERSION); - 
h->layout_version = FT_LAYOUT_VERSION; + ft->layout_version_read_from_disk = rbuf_network_int(rb); + invariant(ft->layout_version_read_from_disk >= FT_LAYOUT_MIN_SUPPORTED_VERSION); + invariant(ft->layout_version_read_from_disk <= FT_LAYOUT_VERSION); //build_id MUST be in network order on disk regardless of disk order - h->build_id = rbuf_network_int(rb); + uint32_t build_id = rbuf_network_int(rb); //Size MUST be in network order regardless of disk order. u_int32_t size = rbuf_network_int(rb); @@ -188,16 +185,17 @@ deserialize_ft_versioned(int fd, struct rbuf *rb, FT *ft, uint32_t version) int64_t byte_order_stored = *(int64_t*)tmp_byte_order_check; lazy_assert(byte_order_stored == toku_byte_order_host); - h->checkpoint_count = rbuf_ulonglong(rb); - h->checkpoint_lsn = rbuf_lsn(rb); - h->nodesize = rbuf_int(rb); + uint64_t checkpoint_count = rbuf_ulonglong(rb); + LSN checkpoint_lsn = rbuf_lsn(rb); + unsigned nodesize = rbuf_int(rb); DISKOFF translation_address_on_disk = rbuf_diskoff(rb); DISKOFF translation_size_on_disk = rbuf_diskoff(rb); lazy_assert(translation_address_on_disk > 0); lazy_assert(translation_size_on_disk > 0); // initialize the tree lock - toku_ft_init_treelock(h); + toku_ft_init_treelock(ft); + toku_ft_init_reflock(ft); //Load translation table { @@ -213,7 +211,7 @@ deserialize_ft_versioned(int fd, struct rbuf *rb, FT *ft, uint32_t version) } toku_unlock_for_pwrite(); // Create table and read in data. 
- e = toku_blocktable_create_from_buffer(&h->blocktable, + e = toku_blocktable_create_from_buffer(&ft->blocktable, translation_address_on_disk, translation_size_on_disk, tbuf); @@ -223,73 +221,69 @@ deserialize_ft_versioned(int fd, struct rbuf *rb, FT *ft, uint32_t version) } } - h->root_blocknum = rbuf_blocknum(rb); - h->flags = rbuf_int(rb); - if (h->layout_version_read_from_disk <= FT_LAYOUT_VERSION_13) { + BLOCKNUM root_blocknum = rbuf_blocknum(rb); + unsigned flags = rbuf_int(rb); + if (ft->layout_version_read_from_disk <= FT_LAYOUT_VERSION_13) { // deprecate 'TOKU_DB_VALCMP_BUILTIN'. just remove the flag - h->flags &= ~TOKU_DB_VALCMP_BUILTIN_13; + flags &= ~TOKU_DB_VALCMP_BUILTIN_13; } - h->layout_version_original = rbuf_int(rb); - h->build_id_original = rbuf_int(rb); - h->time_of_creation = rbuf_ulonglong(rb); - h->time_of_last_modification = rbuf_ulonglong(rb); - h->time_of_last_verification = 0; - if (h->layout_version_read_from_disk <= FT_LAYOUT_VERSION_18) { + int layout_version_original = rbuf_int(rb); + uint32_t build_id_original = rbuf_int(rb); + uint64_t time_of_creation = rbuf_ulonglong(rb); + uint64_t time_of_last_modification = rbuf_ulonglong(rb); + + if (ft->layout_version_read_from_disk <= FT_LAYOUT_VERSION_18) { // 17 was the last version with these fields, we no longer store // them, so read and discard them (void) rbuf_ulonglong(rb); // num_blocks_to_upgrade_13 - if (h->layout_version_read_from_disk >= FT_LAYOUT_VERSION_15) { + if (ft->layout_version_read_from_disk >= FT_LAYOUT_VERSION_15) { (void) rbuf_ulonglong(rb); // num_blocks_to_upgrade_14 } } - if (h->layout_version_read_from_disk >= FT_LAYOUT_VERSION_14) { - rbuf_TXNID(rb, &h->root_xid_that_created); - } else { - // fake creation during the last checkpoint - h->root_xid_that_created = h->checkpoint_lsn.lsn; + // fake creation during the last checkpoint + TXNID root_xid_that_created = checkpoint_lsn.lsn; + if (ft->layout_version_read_from_disk >= FT_LAYOUT_VERSION_14) { + 
rbuf_TXNID(rb, &root_xid_that_created); } - if (h->layout_version_read_from_disk >= FT_LAYOUT_VERSION_15) { - h->basementnodesize = rbuf_int(rb); - h->time_of_last_verification = rbuf_ulonglong(rb); - } else { - h->basementnodesize = FT_DEFAULT_BASEMENT_NODE_SIZE; - h->time_of_last_verification = 0; + // TODO(leif): get this to default to what's specified, not the + // hard-coded default + unsigned basementnodesize = FT_DEFAULT_BASEMENT_NODE_SIZE; + uint64_t time_of_last_verification = 0; + if (ft->layout_version_read_from_disk >= FT_LAYOUT_VERSION_15) { + basementnodesize = rbuf_int(rb); + time_of_last_verification = rbuf_ulonglong(rb); } - if (h->layout_version_read_from_disk >= FT_LAYOUT_VERSION_18) { - h->on_disk_stats.numrows = rbuf_ulonglong(rb); - h->on_disk_stats.numbytes = rbuf_ulonglong(rb); - h->in_memory_stats = h->on_disk_stats; - h->time_of_last_optimize_begin = rbuf_ulonglong(rb); - h->time_of_last_optimize_end = rbuf_ulonglong(rb); - h->count_of_optimize_in_progress = rbuf_int(rb); - h->count_of_optimize_in_progress_read_from_disk = h->count_of_optimize_in_progress; - h->msn_at_start_of_last_completed_optimize = rbuf_msn(rb); - } else { - e = toku_upgrade_subtree_estimates_to_stat64info(fd, h); - if (e != DS_OK) { - goto exit; - } - h->time_of_last_optimize_begin = 0; - h->time_of_last_optimize_end = 0; - h->count_of_optimize_in_progress = 0; - h->count_of_optimize_in_progress_read_from_disk = 0; - h->msn_at_start_of_last_completed_optimize = ZERO_MSN; + STAT64INFO_S on_disk_stats = ZEROSTATS; + uint64_t time_of_last_optimize_begin = 0; + uint64_t time_of_last_optimize_end = 0; + uint32_t count_of_optimize_in_progress = 0; + MSN msn_at_start_of_last_completed_optimize = ZERO_MSN; + if (ft->layout_version_read_from_disk >= FT_LAYOUT_VERSION_18) { + on_disk_stats.numrows = rbuf_ulonglong(rb); + on_disk_stats.numbytes = rbuf_ulonglong(rb); + ft->in_memory_stats = on_disk_stats; + time_of_last_optimize_begin = rbuf_ulonglong(rb); + 
time_of_last_optimize_end = rbuf_ulonglong(rb); + count_of_optimize_in_progress = rbuf_int(rb); + msn_at_start_of_last_completed_optimize = rbuf_msn(rb); } - if (h->layout_version_read_from_disk >= FT_LAYOUT_VERSION_19) { + + enum toku_compression_method compression_method; + MSN highest_unused_msn_for_upgrade = (MSN) { .msn = (MIN_MSN.msn - 1) }; + if (ft->layout_version_read_from_disk >= FT_LAYOUT_VERSION_19) { unsigned char method = rbuf_char(rb); - h->compression_method = (enum toku_compression_method) method; - h->highest_unused_msn_for_upgrade = rbuf_msn(rb); + compression_method = (enum toku_compression_method) method; + highest_unused_msn_for_upgrade = rbuf_msn(rb); } else { // we hard coded zlib until 5.2, then quicklz in 5.2 - if (h->layout_version_read_from_disk < FT_LAYOUT_VERSION_18) { - h->compression_method = TOKU_ZLIB_METHOD; + if (ft->layout_version_read_from_disk < FT_LAYOUT_VERSION_18) { + compression_method = TOKU_ZLIB_METHOD; } else { - h->compression_method = TOKU_QUICKLZ_METHOD; + compression_method = TOKU_QUICKLZ_METHOD; } - h->highest_unused_msn_for_upgrade.msn = MIN_MSN.msn - 1; } (void) rbuf_int(rb); //Read in checksum and ignore (already verified). 
@@ -300,21 +294,57 @@ deserialize_ft_versioned(int fd, struct rbuf *rb, FT *ft, uint32_t version) goto exit; } - invariant(h); - invariant((uint32_t) h->layout_version_read_from_disk == version); - e = deserialize_descriptor_from(fd, h->blocktable, &h->descriptor, version); + struct ft_header h = { + .type = FT_CURRENT, + .dirty = 0, + .checkpoint_count = checkpoint_count, + .checkpoint_lsn = checkpoint_lsn, + .layout_version = FT_LAYOUT_VERSION, + .layout_version_original = layout_version_original, + .build_id = build_id, + .build_id_original = build_id_original, + .time_of_creation = time_of_creation, + .root_xid_that_created = root_xid_that_created, + .time_of_last_modification = time_of_last_modification, + .time_of_last_verification = time_of_last_verification, + .root_blocknum = root_blocknum, + .flags = flags, + .nodesize = nodesize, + .basementnodesize = basementnodesize, + .compression_method = compression_method, + .highest_unused_msn_for_upgrade = highest_unused_msn_for_upgrade, + .time_of_last_optimize_begin = time_of_last_optimize_begin, + .time_of_last_optimize_end = time_of_last_optimize_end, + .count_of_optimize_in_progress = count_of_optimize_in_progress, + .count_of_optimize_in_progress_read_from_disk = count_of_optimize_in_progress, + .msn_at_start_of_last_completed_optimize = msn_at_start_of_last_completed_optimize, + .on_disk_stats = on_disk_stats + }; + ft->h = toku_xmemdup(&h, sizeof h); + + if (ft->layout_version_read_from_disk < FT_LAYOUT_VERSION_18) { + // This needs ft->h to be non-null, so we have to do it after we + // read everything else. 
+ e = toku_upgrade_subtree_estimates_to_stat64info(fd, ft); + if (e != DS_OK) { + goto exit; + } + } + + invariant((uint32_t) ft->layout_version_read_from_disk == version); + e = deserialize_descriptor_from(fd, ft->blocktable, &ft->descriptor, version); if (e != DS_OK) { goto exit; } // copy descriptor to cmp_descriptor for #4541 - h->cmp_descriptor.dbt.size = h->descriptor.dbt.size; - h->cmp_descriptor.dbt.data = toku_xmemdup(h->descriptor.dbt.data, h->descriptor.dbt.size); + ft->cmp_descriptor.dbt.size = ft->descriptor.dbt.size; + ft->cmp_descriptor.dbt.data = toku_xmemdup(ft->descriptor.dbt.data, ft->descriptor.dbt.size); // Version 13 descriptors had an extra 4 bytes that we don't read // anymore. Since the header is going to think it's the current // version if it gets written out, we need to write the descriptor in // the new format (without those bytes) before that happens. if (version <= FT_LAYOUT_VERSION_13) { - r = toku_update_descriptor(h, &h->cmp_descriptor, fd); + r = toku_update_descriptor(ft, &ft->cmp_descriptor, fd); if (r != 0) { errno = r; e = DS_ERRNO; @@ -322,11 +352,11 @@ deserialize_ft_versioned(int fd, struct rbuf *rb, FT *ft, uint32_t version) } } exit: - if (e != DS_OK && h != NULL) { - toku_free(h); - h = NULL; + if (e != DS_OK && ft != NULL) { + toku_free(ft); + ft = NULL; } - *ft = h; + *ftp = ft; return e; } @@ -625,7 +655,7 @@ exit: } -int toku_serialize_ft_size (FT h) { +int toku_serialize_ft_size (FT_HEADER h) { u_int32_t size = serialize_ft_min_size(h->layout_version); //There is no dynamic data. 
lazy_assert(size <= BLOCK_ALLOCATOR_HEADER_RESERVE); @@ -633,7 +663,13 @@ int toku_serialize_ft_size (FT h) { } -int toku_serialize_ft_to_wbuf (struct wbuf *wbuf, FT h, DISKOFF translation_location_on_disk, DISKOFF translation_size_on_disk) { +int toku_serialize_ft_to_wbuf ( + struct wbuf *wbuf, + FT_HEADER h, + DISKOFF translation_location_on_disk, + DISKOFF translation_size_on_disk + ) +{ wbuf_literal_bytes(wbuf, "tokudata", 8); wbuf_network_int (wbuf, h->layout_version); //MUST be in network order regardless of disk order wbuf_network_int (wbuf, BUILD_ID); //MUST be in network order regardless of disk order @@ -643,7 +679,6 @@ int toku_serialize_ft_to_wbuf (struct wbuf *wbuf, FT h, DISKOFF translation_loca wbuf_LSN (wbuf, h->checkpoint_lsn); wbuf_int (wbuf, h->nodesize); - //printf("%s:%d bta=%lu size=%lu\n", __FILE__, __LINE__, h->block_translation_address_on_disk, 4 + 16*h->translated_blocknum_limit); wbuf_DISKOFF(wbuf, translation_location_on_disk); wbuf_DISKOFF(wbuf, translation_size_on_disk); wbuf_BLOCKNUM(wbuf, h->root_blocknum); @@ -655,8 +690,8 @@ int toku_serialize_ft_to_wbuf (struct wbuf *wbuf, FT h, DISKOFF translation_loca wbuf_TXNID(wbuf, h->root_xid_that_created); wbuf_int(wbuf, h->basementnodesize); wbuf_ulonglong(wbuf, h->time_of_last_verification); - wbuf_ulonglong(wbuf, h->checkpoint_staging_stats.numrows); - wbuf_ulonglong(wbuf, h->checkpoint_staging_stats.numbytes); + wbuf_ulonglong(wbuf, h->on_disk_stats.numrows); + wbuf_ulonglong(wbuf, h->on_disk_stats.numbytes); wbuf_ulonglong(wbuf, h->time_of_last_optimize_begin); wbuf_ulonglong(wbuf, h->time_of_last_optimize_end); wbuf_int(wbuf, h->count_of_optimize_in_progress); @@ -669,23 +704,21 @@ int toku_serialize_ft_to_wbuf (struct wbuf *wbuf, FT h, DISKOFF translation_loca return 0; } -int toku_serialize_ft_to (int fd, FT h) { +int toku_serialize_ft_to (int fd, FT_HEADER h, BLOCK_TABLE blocktable, CACHEFILE cf) { int rr = 0; - if (h->panic) return h->panic; 
lazy_assert(h->type==FT_CHECKPOINT_INPROGRESS); - toku_ft_lock(h); struct wbuf w_translation; int64_t size_translation; int64_t address_translation; { //Must serialize translation first, to get address,size for header. - toku_serialize_translation_to_wbuf_unlocked(h->blocktable, &w_translation, + toku_serialize_translation_to_wbuf(blocktable, &w_translation, &address_translation, &size_translation); lazy_assert(size_translation==w_translation.size); } struct wbuf w_main; - unsigned int size_main = toku_serialize_ft_size (h); + unsigned int size_main = toku_serialize_ft_size(h); { wbuf_init(&w_main, toku_xmalloc(size_main), size_main); { @@ -694,7 +727,6 @@ int toku_serialize_ft_to (int fd, FT h) { } lazy_assert(w_main.ndone==size_main); } - toku_ft_unlock(h); toku_lock_for_pwrite(); { //Actual Write translation table @@ -708,8 +740,8 @@ int toku_serialize_ft_to (int fd, FT h) { //If the header has a cachefile we need to do cachefile fsync (to //prevent crash if we redirected to dev null) //If there is no cachefile we still need to do an fsync. 
- if (h->cf) { - rr = toku_cachefile_fsync(h->cf); + if (cf) { + rr = toku_cachefile_fsync(cf); } else { rr = toku_file_fsync(fd); diff --git a/ft/ft-test-helpers.c b/ft/ft-test-helpers.c index 594a6c31b52..5bbb48b3735 100644 --- a/ft/ft-test-helpers.c +++ b/ft/ft-test-helpers.c @@ -74,7 +74,7 @@ int toku_testsetup_nonleaf (FT_HANDLE brt, int height, BLOCKNUM *blocknum, int n int toku_testsetup_root(FT_HANDLE brt, BLOCKNUM blocknum) { assert(testsetup_initialized); - brt->ft->root_blocknum = blocknum; + brt->ft->h->root_blocknum = blocknum; return 0; } diff --git a/ft/ft-verify.c b/ft/ft-verify.c index 6977dd592e0..88ea4b5b41c 100644 --- a/ft/ft-verify.c +++ b/ft/ft-verify.c @@ -410,8 +410,8 @@ toku_verify_ft_with_progress (FT_HANDLE brt, int (*progress_callback)(void *extr int r = toku_verify_ftnode(brt, ZERO_MSN, ZERO_MSN, root_node, -1, NULL, NULL, progress_callback, progress_extra, 1, verbose, keep_on_going); if (r == 0) { toku_ft_lock(brt->ft); - brt->ft->time_of_last_verification = time(NULL); - brt->ft->dirty = 1; + brt->ft->h->time_of_last_verification = time(NULL); + brt->ft->h->dirty = 1; toku_ft_unlock(brt->ft); } return r; diff --git a/ft/ft.c b/ft/ft.c index 535cc6d8f0a..4bc336653e0 100644 --- a/ft/ft.c +++ b/ft/ft.c @@ -14,89 +14,100 @@ toku_ft_suppress_rollbacks(FT h, TOKUTXN txn) { assert(h->txnid_that_created_or_locked_when_empty == TXNID_NONE || h->txnid_that_created_or_locked_when_empty == txnid); h->txnid_that_created_or_locked_when_empty = txnid; - TXNID rootid = toku_txn_get_root_txnid(txn); - assert(h->root_that_created_or_locked_when_empty == TXNID_NONE || - h->root_that_created_or_locked_when_empty == rootid); - h->root_that_created_or_locked_when_empty = rootid; } void -toku_reset_root_xid_that_created(FT h, TXNID new_root_xid_that_created) { +toku_reset_root_xid_that_created(FT ft, TXNID new_root_xid_that_created) { // Reset the root_xid_that_created field to the given value. // This redefines which xid created the dictionary. 
// hold lock around setting and clearing of dirty bit // (see cooperative use of dirty bit in ft_begin_checkpoint()) - toku_ft_lock (h); - h->root_xid_that_created = new_root_xid_that_created; - h->dirty = 1; - toku_ft_unlock (h); + toku_ft_lock (ft); + ft->h->root_xid_that_created = new_root_xid_that_created; + ft->h->dirty = 1; + toku_ft_unlock (ft); } static void -ft_destroy(FT h) { - if (!h->panic) assert(!h->checkpoint_header); +ft_destroy(FT ft) { + if (!ft->panic) assert(!ft->checkpoint_header); //header and checkpoint_header have same Blocktable pointer //cannot destroy since it is still in use by CURRENT - if (h->type == FT_CHECKPOINT_INPROGRESS) h->blocktable = NULL; - else { - assert(h->type == FT_CURRENT); - toku_blocktable_destroy(&h->blocktable); - if (h->descriptor.dbt.data) toku_free(h->descriptor.dbt.data); - if (h->cmp_descriptor.dbt.data) toku_free(h->cmp_descriptor.dbt.data); - toku_ft_destroy_treelock(h); - toku_omt_destroy(&h->txns); - } + assert(ft->h->type == FT_CURRENT); + toku_blocktable_destroy(&ft->blocktable); + if (ft->descriptor.dbt.data) toku_free(ft->descriptor.dbt.data); + if (ft->cmp_descriptor.dbt.data) toku_free(ft->cmp_descriptor.dbt.data); + toku_ft_destroy_treelock(ft); + toku_ft_destroy_reflock(ft); + toku_omt_destroy(&ft->txns); + toku_free(ft->h); } // Make a copy of the header for the purpose of a checkpoint +// Not reentrant for a single FT. +// See ft_checkpoint for explanation of why +// FT lock must be held. 
static void -ft_copy_for_checkpoint(FT h, LSN checkpoint_lsn) { - assert(h->type == FT_CURRENT); - assert(h->checkpoint_header == NULL); - assert(h->panic==0); +ft_copy_for_checkpoint_unlocked(FT ft, LSN checkpoint_lsn) { + assert(ft->h->type == FT_CURRENT); + assert(ft->checkpoint_header == NULL); + assert(ft->panic==0); - FT XMALLOC(ch); - *ch = *h; //Do a shallow copy + FT_HEADER ch = toku_xmemdup(ft->h, sizeof *ft->h); ch->type = FT_CHECKPOINT_INPROGRESS; //Different type //printf("checkpoint_lsn=%" PRIu64 "\n", checkpoint_lsn.lsn); ch->checkpoint_lsn = checkpoint_lsn; - ch->panic_string = NULL; //ch->blocktable is SHARED between the two headers - h->checkpoint_header = ch; -} - -static void -ft_free(FT h) { - ft_destroy(h); - toku_free(h); + ft->checkpoint_header = ch; } void -toku_ft_free (FT h) { - ft_free(h); +toku_ft_free (FT ft) { + ft_destroy(ft); + toku_free(ft); } void -toku_ft_init_treelock(FT h) { - toku_mutex_init(&h->tree_lock, NULL); +toku_ft_init_treelock(FT ft) { + toku_mutex_init(&ft->tree_lock, NULL); } void -toku_ft_destroy_treelock(FT h) { - toku_mutex_destroy(&h->tree_lock); +toku_ft_destroy_treelock(FT ft) { + toku_mutex_destroy(&ft->tree_lock); } void -toku_ft_grab_treelock(FT h) { - toku_mutex_lock(&h->tree_lock); +toku_ft_grab_treelock(FT ft) { + toku_mutex_lock(&ft->tree_lock); } void -toku_ft_release_treelock(FT h) { - toku_mutex_unlock(&h->tree_lock); +toku_ft_release_treelock(FT ft) { + toku_mutex_unlock(&ft->tree_lock); +} + +void +toku_ft_init_reflock(FT ft) { + toku_mutex_init(&ft->ft_ref_lock, NULL); +} + +void +toku_ft_destroy_reflock(FT ft) { + toku_mutex_destroy(&ft->ft_ref_lock); +} + +void +toku_ft_grab_reflock(FT ft) { + toku_mutex_lock(&ft->ft_ref_lock); +} + +void +toku_ft_release_reflock(FT ft) { + toku_mutex_unlock(&ft->ft_ref_lock); } ///////////////////////////////////////////////////////////////////////// @@ -106,13 +117,13 @@ toku_ft_release_treelock(FT h) { // maps to cf->log_fassociate_during_checkpoint static 
int ft_log_fassociate_during_checkpoint (CACHEFILE cf, void *header_v) { - FT h = header_v; + FT ft = header_v; char* fname_in_env = toku_cachefile_fname_in_env(cf); BYTESTRING bs = { strlen(fname_in_env), // don't include the NUL fname_in_env }; TOKULOGGER logger = toku_cachefile_logger(cf); FILENUM filenum = toku_cachefile_filenum (cf); - int r = toku_log_fassociate(logger, NULL, 0, filenum, h->flags, bs); + int r = toku_log_fassociate(logger, NULL, 0, filenum, ft->h->flags, bs); return r; } @@ -133,43 +144,64 @@ ft_log_suppress_rollback_during_checkpoint (CACHEFILE cf, void *header_v) { // Maps to cf->begin_checkpoint_userdata // Create checkpoint-in-progress versions of header and translation (btt) (and fifo for now...). -//Has access to fd (it is protected). +// Has access to fd (it is protected). +// +// Not reentrant for a single FT (see ft_checkpoint) static int ft_begin_checkpoint (LSN checkpoint_lsn, void *header_v) { - FT h = header_v; - int r = h->panic; + FT ft = header_v; + int r = ft->panic; if (r==0) { // hold lock around copying and clearing of dirty bit - toku_ft_lock (h); - assert(h->type == FT_CURRENT); - assert(h->checkpoint_header == NULL); - ft_copy_for_checkpoint(h, checkpoint_lsn); - h->dirty = 0; // this is only place this bit is cleared (in currentheader) - // on_disk_stats includes on disk changes since last checkpoint, - // so checkpoint_staging_stats now includes changes for checkpoint in progress. - h->checkpoint_staging_stats = h->on_disk_stats; - toku_block_translation_note_start_checkpoint_unlocked(h->blocktable); - toku_ft_unlock (h); + toku_ft_lock (ft); + assert(ft->h->type == FT_CURRENT); + assert(ft->checkpoint_header == NULL); + ft_copy_for_checkpoint_unlocked(ft, checkpoint_lsn); + ft->h->dirty = 0; // this is only place this bit is cleared (in currentheader) + toku_block_translation_note_start_checkpoint_unlocked(ft->blocktable); + toku_ft_unlock (ft); } return r; } +// #4922: Hack to remove data corruption race condition. 
+// Reading (and upgrading) a node up to version 19 causes this. +// We COULD skip this if we know that no nodes remained (as of last checkpoint) +// that are below version 19. +// If there are no nodes < version 19 this is harmless (field is unused). +// If there are, this will make certain the value is at least as low as necessary, +// and not much lower. (Too low is good, too high can cause data corruption). +// TODO(yoni): If we ever stop supporting upgrades of nodes < version 19 we can delete this. +// TODO(yoni): If we know no nodes are left to upgrade, we can skip this. (Probably not worth doing). +static void +ft_hack_highest_unused_msn_for_upgrade_for_checkpoint(FT ft) { + if (ft->h->layout_version_original < FT_LAYOUT_VERSION_19) { + ft->checkpoint_header->highest_unused_msn_for_upgrade = ft->h->highest_unused_msn_for_upgrade; + } +} + // maps to cf->checkpoint_userdata // Write checkpoint-in-progress versions of header and translation to disk (really to OS internal buffer). // Copy current header's version of checkpoint_staging stat64info to checkpoint header. // Must have access to fd (protected). // Requires: all pending bits are clear. This implies that no thread will modify the checkpoint_staging // version of the stat64info. +// +// No locks are taken for checkpoint_count/lsn because this is single threaded. Can be called by: +// - ft_close +// - end_checkpoint +// checkpoints hold references to FTs and so they cannot be closed during a checkpoint. 
+// ft_close is not reentrant for a single FT +// end_checkpoint is not reentrant period static int ft_checkpoint (CACHEFILE cf, int fd, void *header_v) { - FT h = header_v; - FT ch = h->checkpoint_header; + FT ft = header_v; + FT_HEADER ch = ft->checkpoint_header; int r = 0; - if (h->panic!=0) goto handle_error; + if (ft->panic!=0) goto handle_error; //printf("%s:%d allocated_limit=%lu writing queue to %lu\n", __FILE__, __LINE__, // block_allocator_allocated_limit(h->block_allocator), h->unused_blocks.b*h->nodesize); assert(ch); - if (ch->panic!=0) goto handle_error; assert(ch->type == FT_CHECKPOINT_INPROGRESS); if (ch->dirty) { // this is only place this bit is tested (in checkpoint_header) TOKULOGGER logger = toku_cachefile_logger(cf); @@ -178,22 +210,13 @@ ft_checkpoint (CACHEFILE cf, int fd, void *header_v) { if (r!=0) goto handle_error; } uint64_t now = (uint64_t) time(NULL); // 4018; - h->time_of_last_modification = now; + ft->h->time_of_last_modification = now; ch->time_of_last_modification = now; ch->checkpoint_count++; - // Threadsafety of checkpoint_staging_stats here depends on there being no pending bits set, - // so that all callers to flush callback should have the for_checkpoint argument false, - // and therefore will not modify the checkpoint_staging_stats. - // TODO 4184: If the flush callback is called with the for_checkpoint argument true even when all the pending bits - // are clear, then this is a problem. - ch->checkpoint_staging_stats = h->checkpoint_staging_stats; - // The in_memory_stats and on_disk_stats in the checkpoint header should be ignored, but we set them - // here just in case the serializer looks in the wrong place. 
- ch->in_memory_stats = ch->checkpoint_staging_stats; - ch->on_disk_stats = ch->checkpoint_staging_stats; + ft_hack_highest_unused_msn_for_upgrade_for_checkpoint(ft); // write translation and header to disk (or at least to OS internal buffer) - r = toku_serialize_ft_to(fd, ch); + r = toku_serialize_ft_to(fd, ch, ft->blocktable, ft->cf); if (r!=0) goto handle_error; ch->dirty = 0; // this is only place this bit is cleared (in checkpoint_header) @@ -202,22 +225,16 @@ ft_checkpoint (CACHEFILE cf, int fd, void *header_v) { if (r!=0) { goto handle_error; } - h->checkpoint_count++; // checkpoint succeeded, next checkpoint will save to alternate header location - h->checkpoint_lsn = ch->checkpoint_lsn; //Header updated. + ft->h->checkpoint_count++; // checkpoint succeeded, next checkpoint will save to alternate header location + ft->h->checkpoint_lsn = ch->checkpoint_lsn; //Header updated. } else { - toku_block_translation_note_skipped_checkpoint(ch->blocktable); + toku_block_translation_note_skipped_checkpoint(ft->blocktable); } if (0) { handle_error: - if (h->panic) r = h->panic; - else if (ch->panic) { - r = ch->panic; - //Steal panic string. Cannot afford to malloc. 
- h->panic = ch->panic; - h->panic_string = ch->panic_string; - } - else toku_block_translation_note_failed_checkpoint(ch->blocktable); + if (ft->panic) r = ft->panic; + else toku_block_translation_note_failed_checkpoint(ft->blocktable); } return r; @@ -229,15 +246,15 @@ handle_error: // Must have access to fd (protected) static int ft_end_checkpoint (CACHEFILE UU(cachefile), int fd, void *header_v) { - FT h = header_v; - int r = h->panic; + FT ft = header_v; + int r = ft->panic; if (r==0) { - assert(h->type == FT_CURRENT); - toku_block_translation_note_end_checkpoint(h->blocktable, fd, h); + assert(ft->h->type == FT_CURRENT); + toku_block_translation_note_end_checkpoint(ft->blocktable, fd, ft); } - if (h->checkpoint_header) { // could be NULL only if panic was true at begin_checkpoint - ft_free(h->checkpoint_header); - h->checkpoint_header = NULL; + if (ft->checkpoint_header) { // could be NULL only if panic was true at begin_checkpoint + toku_free(ft->checkpoint_header); + ft->checkpoint_header = NULL; } return r; } @@ -246,16 +263,16 @@ ft_end_checkpoint (CACHEFILE UU(cachefile), int fd, void *header_v) { // Has access to fd (it is protected). static int ft_close (CACHEFILE cachefile, int fd, void *header_v, char **malloced_error_string, BOOL oplsn_valid, LSN oplsn) { - FT h = header_v; - assert(h->type == FT_CURRENT); - toku_ft_lock(h); - assert(!toku_ft_needed(h)); - toku_ft_unlock(h); + FT ft = header_v; + assert(ft->h->type == FT_CURRENT); + // We already have exclusive access to this field already, so skip the locking. + // This should already never fail. 
+ invariant(!toku_ft_needed_unlocked(ft)); int r = 0; - if (h->panic) { - r = h->panic; + if (ft->panic) { + r = ft->panic; } else { - assert(h->cf == cachefile); + assert(ft->cf == cachefile); TOKULOGGER logger = toku_cachefile_logger(cachefile); LSN lsn = ZERO_LSN; //Get LSN @@ -263,8 +280,8 @@ ft_close (CACHEFILE cachefile, int fd, void *header_v, char **malloced_error_str //Use recovery-specified lsn lsn = oplsn; //Recovery cannot reduce lsn of a header. - if (lsn.lsn < h->checkpoint_lsn.lsn) - lsn = h->checkpoint_lsn; + if (lsn.lsn < ft->h->checkpoint_lsn.lsn) + lsn = ft->h->checkpoint_lsn; } else { //Get LSN from logger @@ -273,11 +290,11 @@ ft_close (CACHEFILE cachefile, int fd, void *header_v, char **malloced_error_str char* fname_in_env = toku_cachefile_fname_in_env(cachefile); assert(fname_in_env); BYTESTRING bs = {.len=strlen(fname_in_env), .data=fname_in_env}; - r = toku_log_fclose(logger, &lsn, h->dirty, bs, toku_cachefile_filenum(cachefile)); // flush the log on close (if new header is being written), otherwise it might not make it out. + r = toku_log_fclose(logger, &lsn, ft->h->dirty, bs, toku_cachefile_filenum(cachefile)); // flush the log on close (if new header is being written), otherwise it might not make it out. 
if (r!=0) return r; } } - if (h->dirty) { // this is the only place this bit is tested (in currentheader) + if (ft->h->dirty) { // this is the only place this bit is tested (in currentheader) if (logger) { //Rollback cachefile MUST NOT BE CLOSED DIRTY //It can be checkpointed only via 'checkpoint' assert(logger->rollback_cachefile != cachefile); @@ -286,18 +303,18 @@ ft_close (CACHEFILE cachefile, int fd, void *header_v, char **malloced_error_str //assert(lsn.lsn!=0); r2 = ft_begin_checkpoint(lsn, header_v); if (r==0) r = r2; - r2 = ft_checkpoint(cachefile, fd, h); + r2 = ft_checkpoint(cachefile, fd, ft); if (r==0) r = r2; r2 = ft_end_checkpoint(cachefile, fd, header_v); if (r==0) r = r2; - if (!h->panic) assert(!h->dirty); // dirty bit should be cleared by begin_checkpoint and never set again (because we're closing the dictionary) + if (!ft->panic) assert(!ft->h->dirty); // dirty bit should be cleared by begin_checkpoint and never set again (because we're closing the dictionary) } } - if (malloced_error_string) *malloced_error_string = h->panic_string; + if (malloced_error_string) *malloced_error_string = ft->panic_string; if (r == 0) { - r = h->panic; + r = ft->panic; } - toku_ft_free(h); + toku_ft_free(ft); return r; } @@ -309,84 +326,77 @@ ft_note_pin_by_checkpoint (CACHEFILE UU(cachefile), void *header_v) { //Set arbitrary brt (for given header) as pinned by checkpoint. //Only one can be pinned (only one checkpoint at a time), but not worth verifying. 
- FT h = header_v; - assert(!h->pinned_by_checkpoint); - h->pinned_by_checkpoint = true; + FT ft = header_v; + + // Note: open_close lock is held by checkpoint begin + toku_ft_grab_reflock(ft); + assert(!ft->pinned_by_checkpoint); + assert(toku_ft_needed_unlocked(ft)); + ft->pinned_by_checkpoint = true; + toku_ft_release_reflock(ft); return 0; } +static void +unpin_by_checkpoint_callback(FT ft, void *extra) { + invariant(extra == NULL); + invariant(ft->pinned_by_checkpoint); + ft->pinned_by_checkpoint = false; //Unpin +} + // maps to cf->note_unpin_by_checkpoint //Must be protected by ydb lock. //Called by end_checkpoint, which grabs ydb lock around note_unpin static int ft_note_unpin_by_checkpoint (CACHEFILE UU(cachefile), void *header_v) { - FT h = header_v; - assert(h->pinned_by_checkpoint); - h->pinned_by_checkpoint = false; //Unpin - int r = 0; - //Close if necessary - if (!toku_ft_needed(h)) { - //Close immediately. - char *error_string = NULL; - r = toku_remove_ft(h, &error_string, false, ZERO_LSN); - lazy_assert_zero(r); - } - return r; - + FT ft = header_v; + toku_ft_remove_reference(ft, false, ZERO_LSN, unpin_by_checkpoint_callback, NULL); + return 0; } // // End of Functions that are callbacks to the cachefile ///////////////////////////////////////////////////////////////////////// -static int setup_initial_ft_root_node (FT h, BLOCKNUM blocknum) { +static int setup_initial_ft_root_node (FT ft, BLOCKNUM blocknum) { FTNODE XMALLOC(node); - toku_initialize_empty_ftnode(node, blocknum, 0, 1, h->layout_version, h->nodesize, h->flags); + toku_initialize_empty_ftnode(node, blocknum, 0, 1, ft->h->layout_version, ft->h->nodesize, ft->h->flags); BP_STATE(node,0) = PT_AVAIL; - u_int32_t fullhash = toku_cachetable_hash(h->cf, blocknum); + u_int32_t fullhash = toku_cachetable_hash(ft->cf, blocknum); node->fullhash = fullhash; - int r = toku_cachetable_put(h->cf, blocknum, fullhash, + int r = toku_cachetable_put(ft->cf, blocknum, fullhash, node, 
make_ftnode_pair_attr(node), - get_write_callbacks_for_node(h)); + get_write_callbacks_for_node(ft)); if (r != 0) toku_free(node); else - toku_unpin_ftnode(h, node); + toku_unpin_ftnode(ft, node); return r; } static int -ft_init (FT ft, FT_OPTIONS options, CACHEFILE cf, TOKUTXN txn) { - ft->type = FT_CURRENT; +ft_init(FT ft, FT_OPTIONS options, CACHEFILE cf) { ft->checkpoint_header = NULL; - toku_ft_init_treelock(ft); - toku_blocktable_create_new(&ft->blocktable); - //Assign blocknum for root block, also dirty the header - toku_allocate_blocknum(ft->blocktable, &ft->root_blocknum, ft); + ft->layout_version_read_from_disk = FT_LAYOUT_VERSION; // fake, prevent unnecessary upgrade logic toku_list_init(&ft->live_ft_handles); int r = toku_omt_create(&ft->txns); assert_zero(r); - ft->flags = options->flags; - ft->nodesize = options->nodesize; - ft->basementnodesize = options->basementnodesize; - ft->compression_method = options->compression_method; + ft->compare_fun = options->compare_fun; ft->update_fun = options->update_fun; - if (ft->cf!=NULL) assert(ft->cf == cf); + if (ft->cf != NULL) { + assert(ft->cf == cf); + } ft->cf = cf; - ft->root_xid_that_created = txn ? txn->ancestor_txnid64 : TXNID_NONE; - ft->in_memory_stats = ZEROSTATS; - ft->on_disk_stats = ZEROSTATS; - ft->checkpoint_staging_stats = ZEROSTATS; - ft->highest_unused_msn_for_upgrade.msn = MIN_MSN.msn - 1; + ft->in_memory_stats = ZEROSTATS; - r = setup_initial_ft_root_node(ft, ft->root_blocknum); - if (r != 0) { - goto exit; + r = setup_initial_ft_root_node(ft, ft->h->root_blocknum); + if (r != 0) { + goto exit; } //printf("%s:%d putting %p (%d)\n", __FILE__, __LINE__, ft, 0); toku_cachefile_set_userdata(ft->cf, @@ -407,32 +417,60 @@ exit: } -// allocate and initialize a brt header. -// t->ft->cf is not set to anything. 
-int +static FT_HEADER +ft_header_new(FT_OPTIONS options, BLOCKNUM root_blocknum, TXNID root_xid_that_created) +{ + uint64_t now = (uint64_t) time(NULL); + struct ft_header h = { + .type = FT_CURRENT, + .dirty = 0, + .checkpoint_count = 0, + .checkpoint_lsn = ZERO_LSN, + .layout_version = FT_LAYOUT_VERSION, + .layout_version_original = FT_LAYOUT_VERSION, + .build_id = BUILD_ID, + .build_id_original = BUILD_ID, + .time_of_creation = now, + .root_xid_that_created = root_xid_that_created, + .time_of_last_modification = now, + .time_of_last_verification = 0, + .root_blocknum = root_blocknum, + .flags = options->flags, + .nodesize = options->nodesize, + .basementnodesize = options->basementnodesize, + .compression_method = options->compression_method, + .highest_unused_msn_for_upgrade = { .msn = (MIN_MSN.msn - 1) }, + .time_of_last_optimize_begin = 0, + .time_of_last_optimize_end = 0, + .count_of_optimize_in_progress = 0, + .count_of_optimize_in_progress_read_from_disk = 0, + .msn_at_start_of_last_completed_optimize = ZERO_MSN, + .on_disk_stats = ZEROSTATS + }; + return toku_xmemdup(&h, sizeof h); +} + +// allocate and initialize a fractal tree. +// t->ft->cf is not set to anything. TODO(leif): I don't think that's true +int toku_create_new_ft(FT *ftp, FT_OPTIONS options, CACHEFILE cf, TOKUTXN txn) { int r; invariant(ftp); FT XCALLOC(ft); - - - ft->layout_version = FT_LAYOUT_VERSION; - ft->layout_version_original = FT_LAYOUT_VERSION; - ft->layout_version_read_from_disk = FT_LAYOUT_VERSION; // fake, prevent unnecessary upgrade logic - - ft->build_id = BUILD_ID; - ft->build_id_original = BUILD_ID; - - uint64_t now = (uint64_t) time(NULL); - ft->time_of_creation = now; - ft->time_of_last_modification = now; - ft->time_of_last_verification = 0; memset(&ft->descriptor, 0, sizeof(ft->descriptor)); memset(&ft->cmp_descriptor, 0, sizeof(ft->cmp_descriptor)); - r = ft_init(ft, options, cf, txn); + ft->h = ft_header_new(options, make_blocknum(0), (txn ? 
txn->ancestor_txnid64 : TXNID_NONE)); + + toku_ft_init_treelock(ft); + toku_ft_init_reflock(ft); + toku_blocktable_create_new(&ft->blocktable); + //Assign blocknum for root block, also dirty the header + toku_allocate_blocknum(ft->blocktable, &ft->h->root_blocknum, ft); + + r = ft_init(ft, options, cf); if (r != 0) { goto exit; } @@ -504,22 +542,30 @@ int toku_read_ft_and_store_in_cachefile (FT_HANDLE brt, CACHEFILE cf, LSN max_ac void toku_ft_note_ft_handle_open(FT ft, FT_HANDLE live) { - toku_ft_lock(ft); + toku_ft_grab_reflock(ft); live->ft = ft; toku_list_push(&ft->live_ft_handles, &live->live_ft_handle_link); - toku_ft_unlock(ft); + toku_ft_release_reflock(ft); } int -toku_ft_needed(FT h) { +toku_ft_needed_unlocked(FT h) { return !toku_list_empty(&h->live_ft_handles) || toku_omt_size(h->txns) != 0 || h->pinned_by_checkpoint; } +BOOL +toku_ft_has_one_reference_unlocked(FT ft) { + u_int32_t pinned_by_checkpoint = ft->pinned_by_checkpoint ? 1 : 0; + u_int32_t num_txns = toku_omt_size(ft->txns); + int num_handles = toku_list_num_elements_est(&ft->live_ft_handles); + return ((pinned_by_checkpoint + num_txns + num_handles) == 1); +} + + // Close brt. If opsln_valid, use given oplsn as lsn in brt header instead of logging // the close and using the lsn provided by logging the close. (Subject to constraint // that if a newer lsn is already in the dictionary, don't overwrite the dictionary.) 
int toku_remove_ft (FT h, char **error_string, BOOL oplsn_valid, LSN oplsn) { - assert(!h->pinned_by_checkpoint); int r = 0; // Must do this work before closing the cf if (h->cf) { @@ -534,11 +580,11 @@ int toku_remove_ft (FT h, char **error_string, BOOL oplsn_valid, LSN oplsn) { // for this header, returns NULL FT_HANDLE toku_ft_get_some_existing_ft_handle(FT h) { FT_HANDLE ft_handle_ret = NULL; - toku_ft_lock(h); + toku_ft_grab_reflock(h); if (!toku_list_empty(&h->live_ft_handles)) { ft_handle_ret = toku_list_struct(toku_list_head(&h->live_ft_handles), struct ft_handle, live_ft_handle_link); } - toku_ft_unlock(h); + toku_ft_release_reflock(h); return ft_handle_ret; } @@ -548,16 +594,16 @@ FT_HANDLE toku_ft_get_some_existing_ft_handle(FT h) { // convenient here for keeping the HOT variables threadsafe.) void toku_ft_note_hot_begin(FT_HANDLE brt) { - FT h = brt->ft; + FT ft = brt->ft; time_t now = time(NULL); // hold lock around setting and clearing of dirty bit // (see cooperative use of dirty bit in ft_begin_checkpoint()) - toku_ft_lock(h); - h->time_of_last_optimize_begin = now; - h->count_of_optimize_in_progress++; - h->dirty = 1; - toku_ft_unlock(h); + toku_ft_lock(ft); + ft->h->time_of_last_optimize_begin = now; + ft->h->count_of_optimize_in_progress++; + ft->h->dirty = 1; + toku_ft_unlock(ft); } @@ -565,47 +611,45 @@ toku_ft_note_hot_begin(FT_HANDLE brt) { // Note: See note for toku_ft_note_hot_begin(). 
void toku_ft_note_hot_complete(FT_HANDLE brt, BOOL success, MSN msn_at_start_of_hot) { - FT h = brt->ft; + FT ft = brt->ft; time_t now = time(NULL); - toku_ft_lock(h); - h->count_of_optimize_in_progress--; + toku_ft_lock(ft); + ft->h->count_of_optimize_in_progress--; if (success) { - h->time_of_last_optimize_end = now; - h->msn_at_start_of_last_completed_optimize = msn_at_start_of_hot; + ft->h->time_of_last_optimize_end = now; + ft->h->msn_at_start_of_last_completed_optimize = msn_at_start_of_hot; // If we just successfully completed an optimization and no other thread is performing // an optimization, then the number of optimizations in progress is zero. // If there was a crash during a HOT optimization, this is how count_of_optimize_in_progress // would be reset to zero on the disk after recovery from that crash. - if (h->count_of_optimize_in_progress == h->count_of_optimize_in_progress_read_from_disk) - h->count_of_optimize_in_progress = 0; + if (ft->h->count_of_optimize_in_progress == ft->h->count_of_optimize_in_progress_read_from_disk) + ft->h->count_of_optimize_in_progress = 0; } - h->dirty = 1; - toku_ft_unlock(h); + ft->h->dirty = 1; + toku_ft_unlock(ft); } void -toku_ft_init(FT h, - BLOCKNUM root_blocknum_on_disk, LSN checkpoint_lsn, TXNID root_xid_that_created, uint32_t target_nodesize, uint32_t target_basementnodesize, enum toku_compression_method compression_method) { - memset(h, 0, sizeof *h); - h->layout_version = FT_LAYOUT_VERSION; - h->layout_version_original = FT_LAYOUT_VERSION; - h->build_id = BUILD_ID; - h->build_id_original = BUILD_ID; - uint64_t now = (uint64_t) time(NULL); - h->time_of_creation = now; - h->time_of_last_modification = now; - h->time_of_last_verification = 0; - h->checkpoint_count = 1; - h->checkpoint_lsn = checkpoint_lsn; - h->nodesize = target_nodesize; - h->basementnodesize = target_basementnodesize; - h->root_blocknum = root_blocknum_on_disk; - h->flags = 0; - h->root_xid_that_created = root_xid_that_created; - 
h->compression_method = compression_method; - h->highest_unused_msn_for_upgrade.msn = MIN_MSN.msn - 1; +toku_ft_init(FT ft, + BLOCKNUM root_blocknum_on_disk, + LSN checkpoint_lsn, + TXNID root_xid_that_created, + uint32_t target_nodesize, + uint32_t target_basementnodesize, + enum toku_compression_method compression_method) +{ + memset(ft, 0, sizeof *ft); + struct ft_options options = { + .nodesize = target_nodesize, + .basementnodesize = target_basementnodesize, + .compression_method = compression_method, + .flags = 0 + }; + ft->h = ft_header_new(&options, root_blocknum_on_disk, root_xid_that_created); + ft->h->checkpoint_count = 1; + ft->h->checkpoint_lsn = checkpoint_lsn; } // Open a brt for use by redirect. The new brt must have the same dict_id as the old_ft passed in. (FILENUM is assigned by the ft_handle_open() function.) @@ -620,11 +664,11 @@ ft_handle_open_for_redirect(FT_HANDLE *new_ftp, const char *fname_in_env, TOKUTX assert_zero(r); r = toku_ft_set_update(t, old_h->update_fun); assert_zero(r); - r = toku_ft_set_nodesize(t, old_h->nodesize); + r = toku_ft_set_nodesize(t, old_h->h->nodesize); assert_zero(r); - r = toku_ft_set_basementnodesize(t, old_h->basementnodesize); + r = toku_ft_set_basementnodesize(t, old_h->h->basementnodesize); assert_zero(r); - r = toku_ft_set_compression_method(t, old_h->compression_method); + r = toku_ft_set_compression_method(t, old_h->h->compression_method); assert_zero(r); CACHETABLE ct = toku_cachefile_get_cachetable(old_h->cf); r = toku_ft_handle_open_with_dict_id(t, fname_in_env, 0, 0, ct, txn, old_h->dict_id); @@ -662,14 +706,13 @@ dictionary_redirect_internal(const char *dst_fname_in_env, FT src_h, TOKUTXN txn // for each live brt, brt->ft is currently src_h // we want to change it to dummy_dst + toku_ft_grab_reflock(src_h); while (!toku_list_empty(&src_h->live_ft_handles)) { list = src_h->live_ft_handles.next; FT_HANDLE src_handle = NULL; src_handle = toku_list_struct(list, struct ft_handle, live_ft_handle_link); - 
toku_ft_lock(src_h); toku_list_remove(&src_handle->live_ft_handle_link); - toku_ft_unlock(src_h); toku_ft_note_ft_handle_open(dst_h, src_handle); if (src_handle->redirect_callback) { @@ -677,6 +720,9 @@ dictionary_redirect_internal(const char *dst_fname_in_env, FT src_h, TOKUTXN txn } } assert(dst_h); + // making sure that we are not leaking src_h + assert(toku_ft_needed_unlocked(src_h)); + toku_ft_release_reflock(src_h); r = toku_ft_handle_close(tmp_dst_ft, FALSE, ZERO_LSN); assert_zero(r); @@ -699,23 +745,16 @@ toku_dictionary_redirect_abort(FT old_h, FT new_h, TOKUTXN txn) { assert(old_filenum.fileid!=new_filenum.fileid); //Cannot be same file. //No living brts in old header. + toku_ft_grab_reflock(old_h); assert(toku_list_empty(&old_h->live_ft_handles)); + toku_ft_release_reflock(old_h); } - // If application did not close all DBs using the new file, then there should - // be no zombies and we need to redirect the DBs back to the original file. - if (!toku_list_empty(&new_h->live_ft_handles)) { - FT dst_h; - // redirect back from new_h to old_h - r = dictionary_redirect_internal(old_fname_in_env, new_h, txn, &dst_h); - assert_zero(r); - assert(dst_h == old_h); - } - else { - //No live brts. - //No need to redirect back. - r = 0; - } + FT dst_h; + // redirect back from new_h to old_h + r = dictionary_redirect_internal(old_fname_in_env, new_h, txn, &dst_h); + assert_zero(r); + assert(dst_h == old_h); return r; } @@ -784,7 +823,6 @@ toku_dictionary_redirect (const char *dst_fname_in_env, FT_HANDLE old_ft, TOKUTX // make rollback log entry if (txn) { - assert(!toku_list_empty(&new_h->live_ft_handles)); r = toku_txn_note_ft(txn, new_h); // mark new brt as touched by this txn FILENUM old_filenum = toku_cachefile_filenum(old_h->cf); @@ -817,7 +855,7 @@ toku_ft_maybe_add_txn_ref(FT h, TOKUTXN txn) { BOOL ref_added = FALSE; OMTVALUE txnv; u_int32_t index; - toku_ft_lock(h); + toku_ft_grab_reflock(h); // Does brt already know about transaction txn? 
int r = toku_omt_find_zero(h->txns, find_xid, txn, &txnv, &index); if (r==0) { @@ -832,85 +870,80 @@ toku_ft_maybe_add_txn_ref(FT h, TOKUTXN txn) { assert(r==0); ref_added = TRUE; exit: - toku_ft_unlock(h); + toku_ft_release_reflock(h); return ref_added; } -void -toku_ft_remove_txn_ref(FT h, TOKUTXN txn) { +static void +remove_txn_ref_callback(FT ft, void *context) { + TOKUTXN txn = context; OMTVALUE txnv_again=NULL; u_int32_t index; - toku_ft_lock(h); - int r = toku_omt_find_zero(h->txns, find_xid, txn, &txnv_again, &index); + int r = toku_omt_find_zero(ft->txns, find_xid, txn, &txnv_again, &index); assert(r==0); assert(txnv_again == txn); - r = toku_omt_delete_at(h->txns, index); + r = toku_omt_delete_at(ft->txns, index); assert(r==0); - // TODO: (Zardosht) figure out how to properly do this - // below this unlock, are depending on ydb lock - toku_ft_unlock(h); - if (!toku_ft_needed(h)) { - //Close immediately. - // I have no idea how this error string business works - char *error_string = NULL; - r = toku_remove_ft(h, &error_string, false, ZERO_LSN); - lazy_assert_zero(r); - } +} + +void +toku_ft_remove_txn_ref(FT ft, TOKUTXN txn) { + toku_ft_remove_reference(ft, false, ZERO_LSN, remove_txn_ref_callback, txn); } void toku_calculate_root_offset_pointer ( - FT h, + FT ft, CACHEKEY* root_key, u_int32_t *roothash ) { - *roothash = toku_cachetable_hash(h->cf, h->root_blocknum); - *root_key = h->root_blocknum; + *roothash = toku_cachetable_hash(ft->cf, ft->h->root_blocknum); + *root_key = ft->h->root_blocknum; } void toku_ft_set_new_root_blocknum( - FT h, + FT ft, CACHEKEY new_root_key ) { - h->root_blocknum = new_root_key; + ft->h->root_blocknum = new_root_key; } -LSN toku_ft_checkpoint_lsn(FT h) { - return h->checkpoint_lsn; +LSN toku_ft_checkpoint_lsn(FT ft) { + return ft->h->checkpoint_lsn; } -int toku_ft_set_panic(FT h, int panic, char *panic_string) { - if (h->panic == 0) { - h->panic = panic; - if (h->panic_string) { - toku_free(h->panic_string); +int 
toku_ft_set_panic(FT ft, int panic, char *panic_string) { + if (ft->panic == 0) { + ft->panic = panic; + if (ft->panic_string) { + toku_free(ft->panic_string); } - h->panic_string = toku_strdup(panic_string); + ft->panic_string = toku_strdup(panic_string); } return 0; } void -toku_ft_stat64 (FT h, struct ftstat64_s *s) { - s->fsize = toku_cachefile_size(h->cf); +toku_ft_stat64 (FT ft, struct ftstat64_s *s) { + s->fsize = toku_cachefile_size(ft->cf); // just use the in memory stats from the header // prevent appearance of negative numbers for numrows, numbytes - int64_t n = h->in_memory_stats.numrows; + int64_t n = ft->in_memory_stats.numrows; if (n < 0) { n = 0; } s->nkeys = s->ndata = n; - n = h->in_memory_stats.numbytes; + n = ft->in_memory_stats.numbytes; if (n < 0) { n = 0; } s->dsize = n; // 4018 - s->create_time_sec = h->time_of_creation; - s->modify_time_sec = h->time_of_last_modification; - s->verify_time_sec = h->time_of_last_verification; + s->create_time_sec = ft->h->time_of_creation; + s->modify_time_sec = ft->h->time_of_last_modification; + s->verify_time_sec = ft->h->time_of_last_verification; } // TODO: (Zardosht), once the fdlock has been removed from cachetable, remove @@ -963,3 +996,33 @@ toku_ft_decrease_stats(STAT64INFO headerstats, STAT64INFO_S delta) { (void) __sync_fetch_and_sub(&(headerstats->numrows), delta.numrows); (void) __sync_fetch_and_sub(&(headerstats->numbytes), delta.numbytes); } + +void +toku_ft_remove_reference(FT ft, bool oplsn_valid, LSN oplsn, remove_ft_ref_callback remove_ref, void *extra) { + toku_ft_grab_reflock(ft); + if (toku_ft_has_one_reference_unlocked(ft)) { + toku_ft_release_reflock(ft); + + toku_ft_open_close_lock(); + toku_ft_grab_reflock(ft); + + remove_ref(ft, extra); + BOOL needed = toku_ft_needed_unlocked(ft); + toku_ft_release_reflock(ft); + if (!needed) { + // close header + char *error_string = NULL; + int r; + r = toku_remove_ft(ft, &error_string, oplsn_valid, oplsn); + assert_zero(r); + assert(error_string 
== NULL); + } + + toku_ft_open_close_unlock(); + } + else { + remove_ref(ft, extra); + toku_ft_release_reflock(ft); + } +} + diff --git a/ft/ft.h b/ft/ft.h index 7bd3cd9477b..1260453c7a9 100644 --- a/ft/ft.h +++ b/ft/ft.h @@ -22,13 +22,19 @@ void toku_ft_destroy_treelock(FT h); void toku_ft_grab_treelock(FT h); void toku_ft_release_treelock(FT h); +void toku_ft_init_reflock(FT ft); +void toku_ft_destroy_reflock(FT ft); +void toku_ft_grab_reflock(FT ft); +void toku_ft_release_reflock(FT ft); + int toku_create_new_ft(FT *ftp, FT_OPTIONS options, CACHEFILE cf, TOKUTXN txn); void toku_ft_free (FT h); int toku_read_ft_and_store_in_cachefile (FT_HANDLE brt, CACHEFILE cf, LSN max_acceptable_lsn, FT *header, BOOL* was_open); void toku_ft_note_ft_handle_open(FT ft, FT_HANDLE live); -int toku_ft_needed(FT h); +int toku_ft_needed_unlocked(FT h); +BOOL toku_ft_has_one_reference_unlocked(FT ft); int toku_remove_ft (FT h, char **error_string, BOOL oplsn_valid, LSN oplsn) __attribute__ ((warn_unused_result)); FT_HANDLE toku_ft_get_some_existing_ft_handle(FT h); @@ -36,14 +42,14 @@ FT_HANDLE toku_ft_get_some_existing_ft_handle(FT h); void toku_ft_note_hot_begin(FT_HANDLE brt); void toku_ft_note_hot_complete(FT_HANDLE brt, BOOL success, MSN msn_at_start_of_hot); -void +void toku_ft_init( FT h, - BLOCKNUM root_blocknum_on_disk, - LSN checkpoint_lsn, - TXNID root_xid_that_created, - uint32_t target_nodesize, - uint32_t target_basementnodesize, + BLOCKNUM root_blocknum_on_disk, + LSN checkpoint_lsn, + TXNID root_xid_that_created, + uint32_t target_nodesize, + uint32_t target_basementnodesize, enum toku_compression_method compression_method ); @@ -71,5 +77,8 @@ void toku_ft_update_cmp_descriptor(FT h); void toku_ft_update_stats(STAT64INFO headerstats, STAT64INFO_S delta); void toku_ft_decrease_stats(STAT64INFO headerstats, STAT64INFO_S delta); +void toku_ft_remove_reference(FT ft, + bool oplsn_valid, LSN oplsn, + remove_ft_ref_callback remove_ref, void *extra); #endif diff --git 
a/ft/ft_node-serialize.c b/ft/ft_node-serialize.c index aac9f2bad23..411d58ba36b 100644 --- a/ft/ft_node-serialize.c +++ b/ft/ft_node-serialize.c @@ -881,8 +881,8 @@ toku_serialize_ftnode_to (int fd, BLOCKNUM blocknum, FTNODE node, FTNODE_DISK_DA int r = toku_serialize_ftnode_to_memory( node, ndd, - h->basementnodesize, - h->compression_method, + h->h->basementnodesize, + h->h->compression_method, do_rebalancing, FALSE, // in_parallel &n_to_write, @@ -1786,7 +1786,7 @@ deserialize_and_upgrade_internal_node(FTNODE node, // of messages in the buffer. MSN lowest; u_int64_t amount = n_in_this_buffer; - lowest.msn = __sync_sub_and_fetch(&bfe->h->highest_unused_msn_for_upgrade.msn, amount); + lowest.msn = __sync_sub_and_fetch(&bfe->h->h->highest_unused_msn_for_upgrade.msn, amount); if (highest_msn.msn == 0) { highest_msn.msn = lowest.msn + n_in_this_buffer; } @@ -2035,7 +2035,7 @@ deserialize_and_upgrade_leaf_node(FTNODE node, // Whatever this is must be less than the MSNs of every message above // it, so it's ok to take it here. 
- bn->max_msn_applied = bfe->h->highest_unused_msn_for_upgrade; + bn->max_msn_applied = bfe->h->h->highest_unused_msn_for_upgrade; bn->stale_ancestor_messages_applied = false; node->max_msn_applied_to_node_on_disk = bn->max_msn_applied; @@ -2625,7 +2625,7 @@ toku_serialize_rollback_log_to (int fd, BLOCKNUM blocknum, ROLLBACK_LOG_NODE log size_t n_to_write; char *compressed_buf; { - int r = toku_serialize_rollback_log_to_memory(log, n_workitems, n_threads, h->compression_method, &n_to_write, &compressed_buf); + int r = toku_serialize_rollback_log_to_memory(log, n_workitems, n_threads, h->h->compression_method, &n_to_write, &compressed_buf); if (r!=0) return r; } @@ -2949,9 +2949,9 @@ toku_upgrade_subtree_estimates_to_stat64info(int fd, FT h) FTNODE_DISK_DATA unused_ndd = NULL; struct ftnode_fetch_extra bfe; fill_bfe_for_min_read(&bfe, h); - e = deserialize_ftnode_from_fd(fd, h->root_blocknum, 0, &unused_node, &unused_ndd, - &bfe, &h->on_disk_stats); - h->in_memory_stats = h->on_disk_stats; + e = deserialize_ftnode_from_fd(fd, h->h->root_blocknum, 0, &unused_node, &unused_ndd, + &bfe, &h->h->on_disk_stats); + h->in_memory_stats = h->h->on_disk_stats; if (unused_node) { toku_ftnode_free(&unused_node); diff --git a/ft/ftdump.c b/ft/ftdump.c index f8e2a103fc0..32a0e7f291f 100644 --- a/ft/ftdump.c +++ b/ft/ftdump.c @@ -85,34 +85,34 @@ dump_descriptor(DESCRIPTOR d) { static void dump_header (int f, FT *header, CACHEFILE cf) { - FT h; + FT ft; int r; char timestr[26]; - r = toku_deserialize_ft_from (f, MAX_LSN, &h); + r = toku_deserialize_ft_from (f, MAX_LSN, &ft); assert(r==0); - h->cf = cf; + ft->cf = cf; printf("ft:\n"); - printf(" layout_version=%d\n", h->layout_version); - printf(" layout_version_original=%d\n", h->layout_version_original); - printf(" layout_version_read_from_disk=%d\n", h->layout_version_read_from_disk); - printf(" build_id=%d\n", h->build_id); - printf(" build_id_original=%d\n", h->build_id_original); - format_time(h->time_of_creation, timestr); - 
printf(" time_of_creation= %"PRIu64" %s\n", h->time_of_creation, timestr); - format_time(h->time_of_last_modification, timestr); - printf(" time_of_last_modification=%"PRIu64" %s\n", h->time_of_last_modification, timestr); - printf(" dirty=%d\n", h->dirty); - printf(" checkpoint_count=%" PRId64 "\n", h->checkpoint_count); - printf(" checkpoint_lsn=%" PRId64 "\n", h->checkpoint_lsn.lsn); - printf(" nodesize=%u\n", h->nodesize); - printf(" basementnodesize=%u\n", h->basementnodesize); - printf(" compression_method=%u\n", (unsigned) h->compression_method); - printf(" unnamed_root=%" PRId64 "\n", h->root_blocknum.b); - printf(" flags=%u\n", h->flags); - dump_descriptor(&h->descriptor); - printf(" estimated numrows=%" PRId64 "\n", h->in_memory_stats.numrows); - printf(" estimated numbytes=%" PRId64 "\n", h->in_memory_stats.numbytes); - *header = h; + printf(" layout_version=%d\n", ft->h->layout_version); + printf(" layout_version_original=%d\n", ft->h->layout_version_original); + printf(" layout_version_read_from_disk=%d\n", ft->layout_version_read_from_disk); + printf(" build_id=%d\n", ft->h->build_id); + printf(" build_id_original=%d\n", ft->h->build_id_original); + format_time(ft->h->time_of_creation, timestr); + printf(" time_of_creation= %"PRIu64" %s\n", ft->h->time_of_creation, timestr); + format_time(ft->h->time_of_last_modification, timestr); + printf(" time_of_last_modification=%"PRIu64" %s\n", ft->h->time_of_last_modification, timestr); + printf(" dirty=%d\n", ft->h->dirty); + printf(" checkpoint_count=%" PRId64 "\n", ft->h->checkpoint_count); + printf(" checkpoint_lsn=%" PRId64 "\n", ft->h->checkpoint_lsn.lsn); + printf(" nodesize=%u\n", ft->h->nodesize); + printf(" basementnodesize=%u\n", ft->h->basementnodesize); + printf(" compression_method=%u\n", (unsigned) ft->h->compression_method); + printf(" unnamed_root=%" PRId64 "\n", ft->h->root_blocknum.b); + printf(" flags=%u\n", ft->h->flags); + dump_descriptor(&ft->descriptor); + printf(" estimated numrows=%" 
PRId64 "\n", ft->in_memory_stats.numrows); + printf(" estimated numbytes=%" PRId64 "\n", ft->in_memory_stats.numbytes); + *header = ft; } static int @@ -506,14 +506,14 @@ main (int argc, const char *const argv[]) { const char *n = argv[0]; int f = open(n, O_RDWR + O_BINARY); assert(f>=0); - FT h; + FT ft; // create a cachefile for the header int r = toku_create_cachetable(&ct, 1<<25, (LSN){0}, 0); assert(r == 0); CACHEFILE cf; r = toku_cachetable_openfd (&cf, ct, f, n); assert(r==0); - dump_header(f, &h, cf); + dump_header(f, &ft, cf); if (interactive) { while (1) { printf("ftdump>"); fflush(stdout); @@ -530,25 +530,25 @@ main (int argc, const char *const argv[]) { if (strcmp(fields[0], "help") == 0) { interactive_help(); } else if (strcmp(fields[0], "header") == 0) { - toku_ft_free(h); - dump_header(f, &h, cf); + toku_ft_free(ft); + dump_header(f, &ft, cf); } else if (strcmp(fields[0], "block") == 0 && nfields == 2) { BLOCKNUM blocknum = make_blocknum(getuint64(fields[1])); - dump_block(f, blocknum, h); + dump_block(f, blocknum, ft); } else if (strcmp(fields[0], "node") == 0 && nfields == 2) { BLOCKNUM off = make_blocknum(getuint64(fields[1])); - dump_node(f, off, h); + dump_node(f, off, ft); } else if (strcmp(fields[0], "dumpdata") == 0 && nfields == 2) { dump_data = strtol(fields[1], NULL, 10); } else if (strcmp(fields[0], "block_translation") == 0 || strcmp(fields[0], "bx") == 0) { u_int64_t offset = 0; if (nfields == 2) offset = getuint64(fields[1]); - dump_block_translation(h, offset); + dump_block_translation(ft, offset); } else if (strcmp(fields[0], "fragmentation") == 0) { - dump_fragmentation(f, h); + dump_fragmentation(f, ft); } else if (strcmp(fields[0], "garbage") == 0) { - dump_garbage_stats(f, h); + dump_garbage_stats(f, ft); } else if (strcmp(fields[0], "file") == 0 && nfields >= 3) { u_int64_t offset = getuint64(fields[1]); u_int64_t size = getuint64(fields[2]); @@ -565,18 +565,18 @@ main (int argc, const char *const argv[]) { } } } else if 
(rootnode) { - dump_node(f, h->root_blocknum, h); + dump_node(f, ft->h->root_blocknum, ft); } else { printf("Block translation:"); - toku_dump_translation_table(stdout, h->blocktable); + toku_dump_translation_table(stdout, ft->blocktable); struct __dump_node_extra info; info.f = f; - info.h = h; - toku_blocktable_iterate(h->blocktable, TRANSLATION_CHECKPOINTED, + info.h = ft; + toku_blocktable_iterate(ft->blocktable, TRANSLATION_CHECKPOINTED, dump_node_wrapper, &info, TRUE, TRUE); } - toku_ft_free(h); + toku_ft_free(ft); return 0; } diff --git a/ft/ftloader.c b/ft/ftloader.c index ac1e5158017..25dcbbfe8c5 100644 --- a/ft/ftloader.c +++ b/ft/ftloader.c @@ -507,7 +507,7 @@ int toku_ft_loader_internal_init (/* out */ FTLOADER *blp, #define SET_TO_MY_STRDUP(lval, s) do { char *v = toku_strdup(s); if (!v) { int r = errno; toku_ft_loader_internal_destroy(bl, TRUE); return r; } lval = v; } while (0) MY_CALLOC_N(N, bl->root_xids_that_created); - for (int i=0; iroot_xids_that_created[i]=brts[i]->ft->root_xid_that_created; + for (int i=0; iroot_xids_that_created[i]=brts[i]->ft->h->root_xid_that_created; MY_CALLOC_N(N, bl->dbs); for (int i=0; idbs[i]=dbs[i]; MY_CALLOC_N(N, bl->descriptors); @@ -2206,11 +2206,12 @@ static int toku_loader_write_ft_from_q (FTLOADER bl, if (bl->root_xids_that_created) root_xid_that_created = bl->root_xids_that_created[which_db]; - struct ft h; - toku_ft_init(&h, (BLOCKNUM){0}, bl->load_lsn, root_xid_that_created, target_nodesize, target_basementnodesize, target_compression_method); + // TODO: (Zardosht/Yoni/Leif), do this code properly + struct ft ft; + toku_ft_init(&ft, (BLOCKNUM){0}, bl->load_lsn, root_xid_that_created, target_nodesize, target_basementnodesize, target_compression_method); struct dbout out; - dbout_init(&out, &h); + dbout_init(&out, &ft); out.fd = fd; out.current_off = 8192; // leave 8K reserved at beginning out.n_translations = 3; // 3 translations reserved at the beginning @@ -2333,7 +2334,7 @@ static int 
toku_loader_write_ft_from_q (FTLOADER bl, } if (deltas.numrows || deltas.numbytes) { - toku_ft_update_stats(&h.in_memory_stats, deltas); + toku_ft_update_stats(&ft.in_memory_stats, deltas); } cleanup_maxkey(&maxkey); @@ -2375,7 +2376,7 @@ static int toku_loader_write_ft_from_q (FTLOADER bl, { invariant(sts.n_subtrees==1); - out.h->root_blocknum = make_blocknum(sts.subtrees[0].block); + out.h->h->root_blocknum = make_blocknum(sts.subtrees[0].block); toku_free(sts.subtrees); sts.subtrees = NULL; // write the descriptor @@ -2766,16 +2767,15 @@ static int write_translation_table (struct dbout *out, long long *off_of_transla static int write_header (struct dbout *out, long long translation_location_on_disk, long long translation_size_on_disk) { int result = 0; - - out->h->checkpoint_staging_stats = out->h->in_memory_stats; // #4184 - unsigned int size = toku_serialize_ft_size (out->h); + unsigned int size = toku_serialize_ft_size (out->h->h); struct wbuf wbuf; char *MALLOC_N(size, buf); if (buf == NULL) { result = errno; } else { wbuf_init(&wbuf, buf, size); - toku_serialize_ft_to_wbuf(&wbuf, out->h, translation_location_on_disk, translation_size_on_disk); + out->h->h->on_disk_stats = out->h->in_memory_stats; + toku_serialize_ft_to_wbuf(&wbuf, out->h->h, translation_location_on_disk, translation_size_on_disk); if (wbuf.ndone != size) result = EINVAL; else diff --git a/ft/fttypes.h b/ft/fttypes.h index 7ff78dfd1f5..890551be616 100644 --- a/ft/fttypes.h +++ b/ft/fttypes.h @@ -38,6 +38,7 @@ typedef struct ftnode_leaf_basement_node *BASEMENTNODE; typedef struct ftnode_nonleaf_childinfo *NONLEAF_CHILDINFO; typedef struct sub_block *SUB_BLOCK; typedef struct ft *FT; +typedef struct ft_header *FT_HEADER; typedef struct ft_options *FT_OPTIONS; struct wbuf; struct dbuf; @@ -252,6 +253,7 @@ typedef int (*ft_compare_func)(DB *, const DBT *, const DBT *); typedef void (*setval_func)(const DBT *, void *); typedef int (*ft_update_func)(DB *, const DBT *, const DBT *, const DBT *, 
setval_func, void *); typedef void (*on_redirect_callback)(FT_HANDLE, void*); +typedef void (*remove_ft_ref_callback)(FT, void*); #define UU(x) x __attribute__((__unused__)) diff --git a/ft/logger.c b/ft/logger.c index 91457efcbb8..8ea73950615 100644 --- a/ft/logger.c +++ b/ft/logger.c @@ -197,7 +197,7 @@ toku_logger_open_rollback(TOKULOGGER logger, CACHETABLE cachetable, BOOL create) //Verify it is empty assert(!t->ft->panic); //Must have no data blocks (rollback logs or otherwise). - toku_block_verify_no_data_blocks_except_root_unlocked(t->ft->blocktable, t->ft->root_blocknum); + toku_block_verify_no_data_blocks_except_root_unlocked(t->ft->blocktable, t->ft->h->root_blocknum); BOOL is_empty; is_empty = toku_ft_is_empty_fast(t); assert(is_empty); @@ -216,26 +216,26 @@ toku_logger_close_rollback(TOKULOGGER logger, BOOL recovery_failed) { if (!logger->is_panicked && cf) { FT_HANDLE ft_to_close; { //Find "brt" - FT h = toku_cachefile_get_userdata(cf); - if (!h->panic && recovery_failed) { - r = toku_ft_set_panic(h, EINVAL, "Recovery failed"); + FT ft = toku_cachefile_get_userdata(cf); + if (!ft->panic && recovery_failed) { + r = toku_ft_set_panic(ft, EINVAL, "Recovery failed"); assert_zero(r); } //Verify it is safe to close it. - if (!h->panic) { //If paniced, it is safe to close. - assert(!h->dirty); //Must not be dirty. + if (!ft->panic) { //If paniced, it is safe to close. + assert(!ft->h->dirty); //Must not be dirty. //Must have no data blocks (rollback logs or otherwise). 
- toku_block_verify_no_data_blocks_except_root_unlocked(h->blocktable, h->root_blocknum); + toku_block_verify_no_data_blocks_except_root_unlocked(ft->blocktable, ft->h->root_blocknum); } - assert(!h->dirty); - ft_to_close = toku_ft_get_some_existing_ft_handle(h); + assert(!ft->h->dirty); + ft_to_close = toku_ft_get_some_existing_ft_handle(ft); assert(ft_to_close); { BOOL is_empty; is_empty = toku_ft_is_empty_fast(ft_to_close); assert(is_empty); } - assert(!h->dirty); // it should not have been dirtied by the toku_ft_is_empty test. + assert(!ft->h->dirty); // it should not have been dirtied by the toku_ft_is_empty test. } r = toku_ft_handle_close(ft_to_close, FALSE, ZERO_LSN); diff --git a/ft/tests/ft-bfe-query.c b/ft/tests/ft-bfe-query.c index 22f77cb710d..488cc866fdf 100644 --- a/ft/tests/ft-bfe-query.c +++ b/ft/tests/ft-bfe-query.c @@ -330,11 +330,15 @@ test_prefetching(void) { FT_HANDLE XMALLOC(brt); FT XCALLOC(brt_h); + toku_ft_init(brt_h, + make_blocknum(0), + ZERO_LSN, + TXNID_NONE, + 4*1024*1024, + 128*1024, + TOKU_DEFAULT_COMPRESSION_METHOD); brt->ft = brt_h; - brt_h->type = FT_CURRENT; brt_h->panic = 0; brt_h->panic_string = 0; - brt_h->basementnodesize = 128*1024; - brt_h->compression_method = TOKU_DEFAULT_COMPRESSION_METHOD; toku_ft_init_treelock(brt_h); toku_blocktable_create_new(&brt_h->blocktable); //Want to use block #20 diff --git a/ft/tests/ft-clock-test.c b/ft/tests/ft-clock-test.c index 7fba1dfcfa6..c9d13083d18 100644 --- a/ft/tests/ft-clock-test.c +++ b/ft/tests/ft-clock-test.c @@ -273,11 +273,15 @@ test_serialize_nonleaf(void) { FT_HANDLE XMALLOC(brt); FT XCALLOC(brt_h); + toku_ft_init(brt_h, + make_blocknum(0), + ZERO_LSN, + TXNID_NONE, + 4*1024*1024, + 128*1024, + TOKU_DEFAULT_COMPRESSION_METHOD); brt->ft = brt_h; - brt_h->type = FT_CURRENT; brt_h->panic = 0; brt_h->panic_string = 0; - brt_h->basementnodesize = 128*1024; - brt_h->compression_method = TOKU_DEFAULT_COMPRESSION_METHOD; toku_ft_init_treelock(brt_h); 
toku_blocktable_create_new(&brt_h->blocktable); //Want to use block #20 @@ -359,11 +363,15 @@ test_serialize_leaf(void) { FT_HANDLE XMALLOC(brt); FT XCALLOC(brt_h); + toku_ft_init(brt_h, + make_blocknum(0), + ZERO_LSN, + TXNID_NONE, + 4*1024*1024, + 128*1024, + TOKU_DEFAULT_COMPRESSION_METHOD); brt->ft = brt_h; - brt_h->type = FT_CURRENT; brt_h->panic = 0; brt_h->panic_string = 0; - brt_h->basementnodesize = 128*1024; - brt_h->compression_method = TOKU_DEFAULT_COMPRESSION_METHOD; toku_ft_init_treelock(brt_h); toku_blocktable_create_new(&brt_h->blocktable); //Want to use block #20 diff --git a/ft/tests/ft-serialize-benchmark.c b/ft/tests/ft-serialize-benchmark.c index 147ddc9654e..5fe7b76469a 100644 --- a/ft/tests/ft-serialize-benchmark.c +++ b/ft/tests/ft-serialize-benchmark.c @@ -104,11 +104,15 @@ test_serialize_leaf(int valsize, int nelts, double entropy) { FT_HANDLE XMALLOC(brt); FT XCALLOC(brt_h); + toku_ft_init(brt_h, + make_blocknum(0), + ZERO_LSN, + TXNID_NONE, + 4*1024*1024, + 128*1024, + TOKU_DEFAULT_COMPRESSION_METHOD); brt->ft = brt_h; - brt_h->type = FT_CURRENT; brt_h->panic = 0; brt_h->panic_string = 0; - brt_h->basementnodesize = 128*1024; - brt_h->compression_method = TOKU_DEFAULT_COMPRESSION_METHOD; brt_h->compare_fun = long_key_cmp; toku_ft_init_treelock(brt_h); toku_blocktable_create_new(&brt_h->blocktable); @@ -237,11 +241,15 @@ test_serialize_nonleaf(int valsize, int nelts, double entropy) { FT_HANDLE XMALLOC(brt); FT XCALLOC(brt_h); + toku_ft_init(brt_h, + make_blocknum(0), + ZERO_LSN, + TXNID_NONE, + 4*1024*1024, + 128*1024, + TOKU_DEFAULT_COMPRESSION_METHOD); brt->ft = brt_h; - brt_h->type = FT_CURRENT; brt_h->panic = 0; brt_h->panic_string = 0; - brt_h->basementnodesize = 128*1024; - brt_h->compression_method = TOKU_DEFAULT_COMPRESSION_METHOD; brt_h->compare_fun = long_key_cmp; toku_ft_init_treelock(brt_h); toku_blocktable_create_new(&brt_h->blocktable); diff --git a/ft/tests/ft-serialize-test.c b/ft/tests/ft-serialize-test.c index 
3c6ece3d74d..d5e5a45a686 100644 --- a/ft/tests/ft-serialize-test.c +++ b/ft/tests/ft-serialize-test.c @@ -250,11 +250,15 @@ test_serialize_leaf_check_msn(enum ftnode_verify_type bft, BOOL do_clone) { FT_HANDLE XMALLOC(brt); FT XCALLOC(brt_h); + toku_ft_init(brt_h, + make_blocknum(0), + ZERO_LSN, + TXNID_NONE, + 4*1024*1024, + 128*1024, + TOKU_DEFAULT_COMPRESSION_METHOD); brt->ft = brt_h; - brt_h->type = FT_CURRENT; brt_h->panic = 0; brt_h->panic_string = 0; - brt_h->basementnodesize = 128*1024; - brt_h->compression_method = TOKU_DEFAULT_COMPRESSION_METHOD; toku_ft_init_treelock(brt_h); toku_blocktable_create_new(&brt_h->blocktable); //Want to use block #20 @@ -392,11 +396,15 @@ test_serialize_leaf_with_large_pivots(enum ftnode_verify_type bft, BOOL do_clone FT_HANDLE XMALLOC(brt); FT XCALLOC(brt_h); + toku_ft_init(brt_h, + make_blocknum(0), + ZERO_LSN, + TXNID_NONE, + 4*1024*1024, + 128*1024, + TOKU_DEFAULT_COMPRESSION_METHOD); brt->ft = brt_h; - brt_h->type = FT_CURRENT; brt_h->panic = 0; brt_h->panic_string = 0; - brt_h->basementnodesize = 128*1024; - brt_h->compression_method = TOKU_DEFAULT_COMPRESSION_METHOD; toku_ft_init_treelock(brt_h); toku_blocktable_create_new(&brt_h->blocktable); //Want to use block #20 @@ -531,11 +539,15 @@ test_serialize_leaf_with_many_rows(enum ftnode_verify_type bft, BOOL do_clone) { FT_HANDLE XMALLOC(brt); FT XCALLOC(brt_h); + toku_ft_init(brt_h, + make_blocknum(0), + ZERO_LSN, + TXNID_NONE, + 4*1024*1024, + 128*1024, + TOKU_DEFAULT_COMPRESSION_METHOD); brt->ft = brt_h; - brt_h->type = FT_CURRENT; brt_h->panic = 0; brt_h->panic_string = 0; - brt_h->basementnodesize = 128*1024; - brt_h->compression_method = TOKU_DEFAULT_COMPRESSION_METHOD; toku_ft_init_treelock(brt_h); toku_blocktable_create_new(&brt_h->blocktable); //Want to use block #20 @@ -675,11 +687,15 @@ test_serialize_leaf_with_large_rows(enum ftnode_verify_type bft, BOOL do_clone) FT_HANDLE XMALLOC(brt); FT XCALLOC(brt_h); + toku_ft_init(brt_h, + make_blocknum(0), + ZERO_LSN, 
+ TXNID_NONE, + 4*1024*1024, + 128*1024, + TOKU_DEFAULT_COMPRESSION_METHOD); brt->ft = brt_h; - brt_h->type = FT_CURRENT; brt_h->panic = 0; brt_h->panic_string = 0; - brt_h->basementnodesize = 128*1024; - brt_h->compression_method = TOKU_DEFAULT_COMPRESSION_METHOD; toku_ft_init_treelock(brt_h); toku_blocktable_create_new(&brt_h->blocktable); //Want to use block #20 @@ -835,11 +851,15 @@ test_serialize_leaf_with_empty_basement_nodes(enum ftnode_verify_type bft, BOOL FT_HANDLE XMALLOC(brt); FT XCALLOC(brt_h); + toku_ft_init(brt_h, + make_blocknum(0), + ZERO_LSN, + TXNID_NONE, + 4*1024*1024, + 128*1024, + TOKU_DEFAULT_COMPRESSION_METHOD); brt->ft = brt_h; - brt_h->type = FT_CURRENT; brt_h->panic = 0; brt_h->panic_string = 0; - brt_h->basementnodesize = 128*1024; - brt_h->compression_method = TOKU_DEFAULT_COMPRESSION_METHOD; toku_ft_init_treelock(brt_h); toku_blocktable_create_new(&brt_h->blocktable); //Want to use block #20 @@ -959,11 +979,15 @@ test_serialize_leaf_with_multiple_empty_basement_nodes(enum ftnode_verify_type b FT_HANDLE XMALLOC(brt); FT XCALLOC(brt_h); + toku_ft_init(brt_h, + make_blocknum(0), + ZERO_LSN, + TXNID_NONE, + 4*1024*1024, + 128*1024, + TOKU_DEFAULT_COMPRESSION_METHOD); brt->ft = brt_h; - brt_h->type = FT_CURRENT; brt_h->panic = 0; brt_h->panic_string = 0; - brt_h->basementnodesize = 128*1024; - brt_h->compression_method = TOKU_DEFAULT_COMPRESSION_METHOD; toku_ft_init_treelock(brt_h); toku_blocktable_create_new(&brt_h->blocktable); //Want to use block #20 @@ -1088,11 +1112,15 @@ test_serialize_leaf(enum ftnode_verify_type bft, BOOL do_clone) { FT_HANDLE XMALLOC(brt); FT XCALLOC(brt_h); + toku_ft_init(brt_h, + make_blocknum(0), + ZERO_LSN, + TXNID_NONE, + 4*1024*1024, + 128*1024, + TOKU_DEFAULT_COMPRESSION_METHOD); brt->ft = brt_h; - brt_h->type = FT_CURRENT; brt_h->panic = 0; brt_h->panic_string = 0; - brt_h->basementnodesize = 128*1024; - brt_h->compression_method = TOKU_DEFAULT_COMPRESSION_METHOD; toku_ft_init_treelock(brt_h); 
toku_blocktable_create_new(&brt_h->blocktable); //Want to use block #20 @@ -1230,11 +1258,15 @@ test_serialize_nonleaf(enum ftnode_verify_type bft, BOOL do_clone) { FT_HANDLE XMALLOC(brt); FT XCALLOC(brt_h); + toku_ft_init(brt_h, + make_blocknum(0), + ZERO_LSN, + TXNID_NONE, + 4*1024*1024, + 128*1024, + TOKU_DEFAULT_COMPRESSION_METHOD); brt->ft = brt_h; - brt_h->type = FT_CURRENT; brt_h->panic = 0; brt_h->panic_string = 0; - brt_h->basementnodesize = 128*1024; - brt_h->compression_method = TOKU_DEFAULT_COMPRESSION_METHOD; toku_ft_init_treelock(brt_h); toku_blocktable_create_new(&brt_h->blocktable); //Want to use block #20 diff --git a/ft/tests/ft-test-header.c b/ft/tests/ft-test-header.c index 8cdd7b6d916..7fb2c1b5c85 100644 --- a/ft/tests/ft-test-header.c +++ b/ft/tests/ft-test-header.c @@ -25,14 +25,15 @@ static void test_header (void) { r = toku_open_ft_handle(fname, 1, &t, 1024, 256, TOKU_DEFAULT_COMPRESSION_METHOD, ct, null_txn, toku_builtin_compare_fun); assert(r==0); // now insert some info into the header - FT h = t->ft; - h->dirty = 1; - h->layout_version_original = 13; - h->layout_version_read_from_disk = 14; - h->build_id_original = 1234; - h->in_memory_stats = (STAT64INFO_S) {10, 11}; - h->on_disk_stats = (STAT64INFO_S) {20, 21}; - h->checkpoint_staging_stats = (STAT64INFO_S) {30, 31}; + FT ft = t->ft; + ft->h->dirty = 1; + // cast away const because we actually want to fiddle with the header + // in this test + *((int *) &ft->h->layout_version_original) = 13; + ft->layout_version_read_from_disk = 14; + *((uint32_t *) &ft->h->build_id_original) = 1234; + ft->in_memory_stats = (STAT64INFO_S) {10, 11}; + ft->h->on_disk_stats = (STAT64INFO_S) {20, 21}; r = toku_close_ft_handle_nolsn(t, 0); assert(r==0); r = toku_cachetable_close(&ct); assert(r==0); @@ -43,20 +44,17 @@ static void test_header (void) { r = toku_open_ft_handle(fname, 0, &t, 1024, 256, TOKU_DEFAULT_COMPRESSION_METHOD, ct, null_txn, toku_builtin_compare_fun); assert(r==0); - h = t->ft; - 
STAT64INFO_S expected_stats = {20, 21}; // on checkpoint, on_disk_stats copied to checkpoint_staging_stats - assert(h->layout_version == FT_LAYOUT_VERSION); - assert(h->layout_version_original == 13); - assert(h->layout_version_read_from_disk == FT_LAYOUT_VERSION); - assert(h->build_id_original == 1234); - assert(h->in_memory_stats.numrows == expected_stats.numrows); - assert(h->on_disk_stats.numbytes == expected_stats.numbytes); + ft = t->ft; + STAT64INFO_S expected_stats = {20, 21}; // on checkpoint, on_disk_stats copied to ft->checkpoint_header->on_disk_stats + assert(ft->h->layout_version == FT_LAYOUT_VERSION); + assert(ft->h->layout_version_original == 13); + assert(ft->layout_version_read_from_disk == FT_LAYOUT_VERSION); + assert(ft->h->build_id_original == 1234); + assert(ft->in_memory_stats.numrows == expected_stats.numrows); + assert(ft->h->on_disk_stats.numbytes == expected_stats.numbytes); r = toku_close_ft_handle_nolsn(t, 0); assert(r==0); r = toku_cachetable_close(&ct); assert(r==0); - - - } int diff --git a/ft/txn.c b/ft/txn.c index 263d18dcda4..28844abbe06 100644 --- a/ft/txn.c +++ b/ft/txn.c @@ -658,7 +658,6 @@ static int remove_txn (OMTVALUE hv, u_int32_t UU(idx), void *txnv) if (txn->txnid64==h->txnid_that_created_or_locked_when_empty) { h->txnid_that_created_or_locked_when_empty = TXNID_NONE; - h->root_that_created_or_locked_when_empty = TXNID_NONE; } if (txn->txnid64==h->txnid_that_suppressed_recovery_logs) { h->txnid_that_suppressed_recovery_logs = TXNID_NONE; diff --git a/src/ydb_db.c b/src/ydb_db.c index 4c3d9da6104..e77e7582f66 100644 --- a/src/ydb_db.c +++ b/src/ydb_db.c @@ -500,7 +500,8 @@ toku_db_change_descriptor(DB *db, DB_TXN* txn, const DBT* descriptor, u_int32_t goto cleanup; } if (!is_db_hot_index) { - r = toku_db_pre_acquire_fileops_lock(db, txn); + //TODO(zardosht): why doesn't hot_index need to do locking? 
+ r = toku_db_pre_acquire_table_lock(db, txn); if (r != 0) { goto cleanup; } } @@ -677,9 +678,9 @@ locked_db_open(DB *db, DB_TXN *txn, const char *fname, const char *dbname, DBTYP static int locked_db_change_descriptor(DB *db, DB_TXN* txn, const DBT* descriptor, u_int32_t flags) { - toku_ydb_lock(); + toku_multi_operation_client_lock(); //Cannot begin checkpoint int r = toku_db_change_descriptor(db, txn, descriptor, flags); - toku_ydb_unlock(); + toku_multi_operation_client_unlock(); //Can now begin checkpoint return r; } diff --git a/toku_include/toku_list.h b/toku_include/toku_list.h index dbdbe24d868..ebf4f1735ae 100644 --- a/toku_include/toku_list.h +++ b/toku_include/toku_list.h @@ -19,6 +19,13 @@ struct toku_list { struct toku_list *next, *prev; }; +static inline int toku_list_num_elements_est(struct toku_list *head) { + if (head->next == head) return 0; + if (head->next == head->prev) return 1; + return 2; +} + + static inline void toku_list_init(struct toku_list *head) { head->next = head->prev = head; }