From 3af91c0f511bb7787a7df92baefd0ac7bed89cd9 Mon Sep 17 00:00:00 2001 From: Yoni Fogel Date: Wed, 17 Apr 2013 00:01:17 -0400 Subject: [PATCH] refs #5467 merge "kill put loader, fix hot indexer freeze issue" onto main git-svn-id: file:///svn/toku/tokudb@50137 c7de825b-a66e-492c-adef-691d508d4ae1 --- buildheader/make_tdb.cc | 8 +- ft/cachetable-internal.h | 1 - ft/cachetable.cc | 19 +--- ft/cachetable.h | 1 - ft/checkpoint.h | 4 +- ft/ft-internal.h | 10 +- ft/ft-ops.cc | 104 ++++++------------ ft/ft-ops.h | 10 +- ft/ft.cc | 38 ++----- ft/ft.h | 11 +- ft/ft_layout_version.h | 3 +- ft/log-internal.h | 1 - ft/logformat.cc | 14 +-- ft/recover.cc | 20 ---- ft/rollback-apply.cc | 13 --- ft/rollback.cc | 5 +- ft/rollback.h | 2 +- ft/tests/cachetable-pin-checkpoint.cc | 7 +- ft/tests/cachetable-put-checkpoint.cc | 13 +-- ft/tests/cachetable-test.h | 2 - ft/txn.cc | 19 +--- ft/txn.h | 4 - src/indexer.cc | 16 ++- src/loader.cc | 116 +++++--------------- src/loader.h | 2 +- src/tests/CMakeLists.txt | 7 +- src/tests/bug1381.cc | 10 +- src/tests/loader-blobs-create-leaf.c.notyet | 14 ++- src/tests/loader-cleanup-test.cc | 50 +++++---- src/tests/loader-create-abort.cc | 4 +- src/tests/loader-create-close.cc | 8 +- src/tests/loader-dup-test.cc | 43 ++++---- src/tests/loader-no-puts.cc | 29 +++-- src/tests/loader-reference-test.cc | 35 ++++-- src/tests/loader-stress-del.cc | 28 +++-- src/tests/loader-stress-test.cc | 37 +++++-- src/tests/loader-tpch-load.cc | 17 ++- src/tests/recover-loader-test.cc | 12 +- src/tests/recover-test-logsuppress-put.cc | 3 +- src/tests/test_5469.cc | 4 +- src/tests/threaded_stress_test_helpers.h | 2 +- src/ydb.cc | 2 +- src/ydb_txn.cc | 13 +-- 43 files changed, 318 insertions(+), 443 deletions(-) diff --git a/buildheader/make_tdb.cc b/buildheader/make_tdb.cc index 56fd4aa1cfb..f90c05a5ee6 100644 --- a/buildheader/make_tdb.cc +++ b/buildheader/make_tdb.cc @@ -272,7 +272,11 @@ static void print_defines (void) { /* LOADER flags */ printf("/* LOADER flags */\n"); - printf("#define LOADER_USE_PUTS 1\n"); // minimize space usage + { + uint32_t loader_flags = 0; + dodefine_from_track(loader_flags, LOADER_DISALLOW_PUTS); // Loader is only used for side effects. + dodefine_from_track(loader_flags, LOADER_COMPRESS_INTERMEDIATES); + } } static void print_db_env_struct (void) { @@ -672,7 +676,7 @@ int main (int argc, char *const argv[] __attribute__((__unused__))) { printf(" uint8_t stalled_on_checkpoint;\n"); printf("} *TOKU_TXN_PROGRESS, TOKU_TXN_PROGRESS_S;\n"); printf("typedef void(*TXN_PROGRESS_POLL_FUNCTION)(TOKU_TXN_PROGRESS, void*);\n"); - printf("struct txn_stat {\n uint64_t rollback_raw_count;\n};\n"); + printf("struct txn_stat {\n uint64_t rollback_raw_count;\n uint64_t rollback_num_entries;\n};\n"); print_db_txn_struct(); print_db_txn_stat_struct(); diff --git a/ft/cachetable-internal.h b/ft/cachetable-internal.h index 52b21bac2b7..ae0d8b6d55f 100644 --- a/ft/cachetable-internal.h +++ b/ft/cachetable-internal.h @@ -109,7 +109,6 @@ struct cachefile { void *userdata; void (*log_fassociate_during_checkpoint)(CACHEFILE cf, void *userdata); // When starting a checkpoint we must log all open files. - void (*log_suppress_rollback_during_checkpoint)(CACHEFILE cf, void *userdata); // When starting a checkpoint we must log which files need rollbacks suppressed void (*close_userdata)(CACHEFILE cf, int fd, void *userdata, bool lsnvalid, LSN); // when closing the last reference to a cachefile, first call this function. void (*begin_checkpoint_userdata)(LSN lsn_of_checkpoint, void *userdata); // before checkpointing cachefiles call this function. void (*checkpoint_userdata)(CACHEFILE cf, int fd, void *userdata); // when checkpointing a cachefile, call this function. diff --git a/ft/cachetable.cc b/ft/cachetable.cc index d9d35f8d0f6..4f689a36cd9 100644 --- a/ft/cachetable.cc +++ b/ft/cachetable.cc @@ -2815,7 +2815,6 @@ void toku_cachefile_set_userdata (CACHEFILE cf, void *userdata, void (*log_fassociate_during_checkpoint)(CACHEFILE, void*), - void (*log_suppress_rollback_during_checkpoint)(CACHEFILE, void*), void (*close_userdata)(CACHEFILE, int, void*, bool, LSN), void (*checkpoint_userdata)(CACHEFILE, int, void*), void (*begin_checkpoint_userdata)(LSN, void*), @@ -2824,7 +2823,6 @@ toku_cachefile_set_userdata (CACHEFILE cf, void (*note_unpin_by_checkpoint)(CACHEFILE, void*)) { cf->userdata = userdata; cf->log_fassociate_during_checkpoint = log_fassociate_during_checkpoint; - cf->log_suppress_rollback_during_checkpoint = log_suppress_rollback_during_checkpoint; cf->close_userdata = close_userdata; cf->checkpoint_userdata = checkpoint_userdata; cf->begin_checkpoint_userdata = begin_checkpoint_userdata; @@ -4181,34 +4179,27 @@ void checkpointer::log_begin_checkpoint() { TXNID last_xid = toku_txn_manager_get_last_xid(mgr); toku_log_begin_checkpoint(m_logger, &begin_lsn, 0, 0, last_xid); m_lsn_of_checkpoint_in_progress = begin_lsn; - + // Log the list of open dictionaries. for (CACHEFILE cf = m_cf_list->m_head; cf; cf = cf->next) { assert(cf->log_fassociate_during_checkpoint); cf->log_fassociate_during_checkpoint(cf, cf->userdata); } - + // Write open transactions to the log. r = toku_txn_manager_iter_over_live_txns ( - m_logger->txn_manager, + m_logger->txn_manager, this); assert(r == 0); - - // Writes list of dictionaries that have had - // rollback logs suppressed. - for (CACHEFILE cf = m_cf_list->m_head; cf; cf = cf->next) { - assert(cf->log_suppress_rollback_during_checkpoint); - cf->log_suppress_rollback_during_checkpoint(cf, cf->userdata); - } } // -// Sets the pending bits of EVERY PAIR in the cachetable, regardless of +// Sets the pending bits of EVERY PAIR in the cachetable, regardless of // whether the PAIR is clean or not. It will be the responsibility of // end_checkpoint or client threads to simply clear the pending bit // if the PAIR is clean. // -// On entry and exit , the pair list's read list lock is grabbed, and +// On entry and exit , the pair list's read list lock is grabbed, and // both pending locks are grabbed // void checkpointer::turn_on_pending_bits() { diff --git a/ft/cachetable.h b/ft/cachetable.h index 95e7f1e091a..8a1405252c4 100644 --- a/ft/cachetable.h +++ b/ft/cachetable.h @@ -191,7 +191,6 @@ typedef void (*CACHETABLE_REMOVE_KEY)(CACHEKEY* cachekey, bool for_checkpoint, v void toku_cachefile_set_userdata(CACHEFILE cf, void *userdata, void (*log_fassociate_during_checkpoint)(CACHEFILE, void*), - void (*log_suppress_rollback_during_checkpoint)(CACHEFILE, void*), void (*close_userdata)(CACHEFILE, int, void*, bool, LSN), void (*checkpoint_userdata)(CACHEFILE, int, void*), void (*begin_checkpoint_userdata)(LSN, void*), diff --git a/ft/checkpoint.h b/ft/checkpoint.h index 7a0f12265f2..5d19208b30a 100644 --- a/ft/checkpoint.h +++ b/ft/checkpoint.h @@ -64,8 +64,8 @@ void toku_checkpoint_destroy(void); typedef enum {SCHEDULED_CHECKPOINT = 0, // "normal" checkpoint taken on checkpoint thread CLIENT_CHECKPOINT = 1, // induced by client, such as FLUSH LOGS or SAVEPOINT - TXN_COMMIT_CHECKPOINT = 2, - STARTUP_CHECKPOINT = 3, + INDEXER_CHECKPOINT = 2, + STARTUP_CHECKPOINT = 3, UPGRADE_CHECKPOINT = 4, RECOVERY_CHECKPOINT = 5, SHUTDOWN_CHECKPOINT = 6} checkpoint_caller_t; diff --git a/ft/ft-internal.h b/ft/ft-internal.h index 187cfc5395b..332112f0363 100644 --- a/ft/ft-internal.h +++ b/ft/ft-internal.h @@ -462,18 +462,10 @@ struct ft { // the on-disk layout version is from before basement nodes) int layout_version_read_from_disk; - // If a transaction created this BRT, which one? - // If a transaction locked the BRT when it was empty, which transaction? (Only the latest one matters) - // 0 if no such transaction - // only one thread can write to these at once, this is enforced by - // the lock tree - TXNID txnid_that_created_or_locked_when_empty; - TXNID txnid_that_suppressed_recovery_logs; - // Logically the reference count is zero if live_ft_handles is empty, txns is 0, and pinned_by_checkpoint is false. // ft_ref_lock protects modifying live_ft_handles, txns, and pinned_by_checkpoint. - toku_mutex_t ft_ref_lock; + toku_mutex_t ft_ref_lock; struct toku_list live_ft_handles; // Number of transactions that are using this FT. you should only be able // to modify this if you have a valid handle in live_ft_handles diff --git a/ft/ft-ops.cc b/ft/ft-ops.cc index 3942c63d773..921793928e9 100644 --- a/ft/ft-ops.cc +++ b/ft/ft-ops.cc @@ -2991,7 +2991,7 @@ void toku_ft_hot_index(FT_HANDLE brt __attribute__ ((unused)), TOKUTXN txn, FILE void toku_ft_log_put (TOKUTXN txn, FT_HANDLE brt, const DBT *key, const DBT *val) { TOKULOGGER logger = toku_txn_logger(txn); - if (logger && brt->ft->txnid_that_suppressed_recovery_logs == TXNID_NONE) { + if (logger) { BYTESTRING keybs = {.len=key->size, .data=(char *) key->data}; BYTESTRING valbs = {.len=val->size, .data=(char *) val->data}; TXNID xid = toku_txn_get_txnid(txn); @@ -3000,28 +3000,22 @@ toku_ft_log_put (TOKUTXN txn, FT_HANDLE brt, const DBT *key, const DBT *val) { } void -toku_ft_log_put_multiple (TOKUTXN txn, FT_HANDLE src_ft, FT_HANDLE *brts, int num_fts, const DBT *key, const DBT *val) { +toku_ft_log_put_multiple (TOKUTXN txn, FT_HANDLE src_ft, FT_HANDLE *brts, uint32_t num_fts, const DBT *key, const DBT *val) { assert(txn); assert(num_fts > 0); TOKULOGGER logger = toku_txn_logger(txn); if (logger) { FILENUM fnums[num_fts]; - int i; - uint32_t num_unsuppressed_fts = 0; + uint32_t i; for (i = 0; i < num_fts; i++) { - if (brts[i]->ft->txnid_that_suppressed_recovery_logs == TXNID_NONE) { - //Logging not suppressed for this brt. - fnums[num_unsuppressed_fts++] = toku_cachefile_filenum(brts[i]->ft->cf); - } - } - if (num_unsuppressed_fts) { - FILENUMS filenums = {.num = num_unsuppressed_fts, .filenums = fnums}; - BYTESTRING keybs = {.len=key->size, .data=(char *) key->data}; - BYTESTRING valbs = {.len=val->size, .data=(char *) val->data}; - TXNID xid = toku_txn_get_txnid(txn); - FILENUM src_filenum = src_ft ? toku_cachefile_filenum(src_ft->ft->cf) : FILENUM_NONE; - toku_log_enq_insert_multiple(logger, (LSN*)0, 0, txn, src_filenum, filenums, xid, keybs, valbs); + fnums[i] = toku_cachefile_filenum(brts[i]->ft->cf); } + FILENUMS filenums = {.num = num_fts, .filenums = fnums}; + BYTESTRING keybs = {.len=key->size, .data=(char *) key->data}; + BYTESTRING valbs = {.len=val->size, .data=(char *) val->data}; + TXNID xid = toku_txn_get_txnid(txn); + FILENUM src_filenum = src_ft ? toku_cachefile_filenum(src_ft->ft->cf) : FILENUM_NONE; + toku_log_enq_insert_multiple(logger, (LSN*)0, 0, txn, src_filenum, filenums, xid, keybs, valbs); } } @@ -3030,21 +3024,13 @@ void toku_ft_maybe_insert (FT_HANDLE ft_h, DBT *key, DBT *val, TOKUTXN txn, bool XIDS message_xids = xids_get_root_xids(); //By default use committed messages TXNID xid = toku_txn_get_txnid(txn); if (txn) { - if (ft_h->ft->txnid_that_created_or_locked_when_empty != xid) { - BYTESTRING keybs = {key->size, (char *) key->data}; - toku_logger_save_rollback_cmdinsert(txn, toku_cachefile_filenum(ft_h->ft->cf), &keybs); - toku_txn_maybe_note_ft(txn, ft_h->ft); - //We have transactions, and this is not 2440. We must send the full root-to-leaf-path - message_xids = toku_txn_get_xids(txn); - } - else if (txn->ancestor_txnid64 != ft_h->ft->h->root_xid_that_created) { - //We have transactions, and this is 2440, however the txn doing 2440 did not create the dictionary. We must send the full root-to-leaf-path - message_xids = toku_txn_get_xids(txn); - } + BYTESTRING keybs = {key->size, (char *) key->data}; + toku_logger_save_rollback_cmdinsert(txn, toku_cachefile_filenum(ft_h->ft->cf), &keybs); + toku_txn_maybe_note_ft(txn, ft_h->ft); + message_xids = toku_txn_get_xids(txn); } TOKULOGGER logger = toku_txn_logger(txn); - if (do_logging && logger && - ft_h->ft->txnid_that_suppressed_recovery_logs == TXNID_NONE) { + if (do_logging && logger) { BYTESTRING keybs = {.len=key->size, .data=(char *) key->data}; BYTESTRING valbs = {.len=val->size, .data=(char *) val->data}; if (type == FT_INSERT) { @@ -3084,8 +3070,7 @@ void toku_ft_maybe_update(FT_HANDLE ft_h, const DBT *key, const DBT *update_func TOKULOGGER logger; logger = toku_txn_logger(txn); - if (do_logging && logger && - ft_h->ft->txnid_that_suppressed_recovery_logs == TXNID_NONE) { + if (do_logging && logger) { BYTESTRING keybs = {.len=key->size, .data=(char *) key->data}; BYTESTRING extrabs = {.len=update_function_extra->size, .data = (char *) update_function_extra->data}; @@ -3116,8 +3101,7 @@ void toku_ft_maybe_update_broadcast(FT_HANDLE ft_h, const DBT *update_function_e TOKULOGGER logger; logger = toku_txn_logger(txn); - if (do_logging && logger && - ft_h->ft->txnid_that_suppressed_recovery_logs == TXNID_NONE) { + if (do_logging && logger) { BYTESTRING extrabs = {.len=update_function_extra->size, .data = (char *) update_function_extra->data}; toku_log_enq_updatebroadcast(logger, NULL, 0, txn, @@ -3157,7 +3141,7 @@ void toku_ft_delete(FT_HANDLE brt, DBT *key, TOKUTXN txn) { void toku_ft_log_del(TOKUTXN txn, FT_HANDLE brt, const DBT *key) { TOKULOGGER logger = toku_txn_logger(txn); - if (logger && brt->ft->txnid_that_suppressed_recovery_logs == TXNID_NONE) { + if (logger) { BYTESTRING keybs = {.len=key->size, .data=(char *) key->data}; TXNID xid = toku_txn_get_txnid(txn); toku_log_enq_delete_any(logger, (LSN*)0, 0, txn, toku_cachefile_filenum(brt->ft->cf), xid, keybs); @@ -3165,28 +3149,22 @@ toku_ft_log_del(TOKUTXN txn, FT_HANDLE brt, const DBT *key) { } void -toku_ft_log_del_multiple (TOKUTXN txn, FT_HANDLE src_ft, FT_HANDLE *brts, int num_fts, const DBT *key, const DBT *val) { +toku_ft_log_del_multiple (TOKUTXN txn, FT_HANDLE src_ft, FT_HANDLE *brts, uint32_t num_fts, const DBT *key, const DBT *val) { assert(txn); assert(num_fts > 0); TOKULOGGER logger = toku_txn_logger(txn); if (logger) { FILENUM fnums[num_fts]; - int i; - uint32_t num_unsuppressed_fts = 0; + uint32_t i; for (i = 0; i < num_fts; i++) { - if (brts[i]->ft->txnid_that_suppressed_recovery_logs == TXNID_NONE) { - //Logging not suppressed for this brt. - fnums[num_unsuppressed_fts++] = toku_cachefile_filenum(brts[i]->ft->cf); - } - } - if (num_unsuppressed_fts) { - FILENUMS filenums = {.num = num_unsuppressed_fts, .filenums = fnums}; - BYTESTRING keybs = {.len=key->size, .data=(char *) key->data}; - BYTESTRING valbs = {.len=val->size, .data=(char *) val->data}; - TXNID xid = toku_txn_get_txnid(txn); - FILENUM src_filenum = src_ft ? toku_cachefile_filenum(src_ft->ft->cf) : FILENUM_NONE; - toku_log_enq_delete_multiple(logger, (LSN*)0, 0, txn, src_filenum, filenums, xid, keybs, valbs); + fnums[i] = toku_cachefile_filenum(brts[i]->ft->cf); } + FILENUMS filenums = {.num = num_fts, .filenums = fnums}; + BYTESTRING keybs = {.len=key->size, .data=(char *) key->data}; + BYTESTRING valbs = {.len=val->size, .data=(char *) val->data}; + TXNID xid = toku_txn_get_txnid(txn); + FILENUM src_filenum = src_ft ? toku_cachefile_filenum(src_ft->ft->cf) : FILENUM_NONE; + toku_log_enq_delete_multiple(logger, (LSN*)0, 0, txn, src_filenum, filenums, xid, keybs, valbs); } } @@ -3194,21 +3172,13 @@ void toku_ft_maybe_delete(FT_HANDLE ft_h, DBT *key, TOKUTXN txn, bool oplsn_vali XIDS message_xids = xids_get_root_xids(); //By default use committed messages TXNID xid = toku_txn_get_txnid(txn); if (txn) { - if (ft_h->ft->txnid_that_created_or_locked_when_empty != xid) { - BYTESTRING keybs = {key->size, (char *) key->data}; - toku_logger_save_rollback_cmddelete(txn, toku_cachefile_filenum(ft_h->ft->cf), &keybs); - toku_txn_maybe_note_ft(txn, ft_h->ft); - //We have transactions, and this is not 2440. We must send the full root-to-leaf-path - message_xids = toku_txn_get_xids(txn); - } - else if (txn->ancestor_txnid64 != ft_h->ft->h->root_xid_that_created) { - //We have transactions, and this is 2440, however the txn doing 2440 did not create the dictionary. We must send the full root-to-leaf-path - message_xids = toku_txn_get_xids(txn); - } + BYTESTRING keybs = {key->size, (char *) key->data}; + toku_logger_save_rollback_cmddelete(txn, toku_cachefile_filenum(ft_h->ft->cf), &keybs); + toku_txn_maybe_note_ft(txn, ft_h->ft); + message_xids = toku_txn_get_xids(txn); } TOKULOGGER logger = toku_txn_logger(txn); - if (do_logging && logger && - ft_h->ft->txnid_that_suppressed_recovery_logs == TXNID_NONE) { + if (do_logging && logger) { BYTESTRING keybs = {.len=key->size, .data=(char *) key->data}; toku_log_enq_delete_any(logger, (LSN*)0, 0, txn, toku_cachefile_filenum(ft_h->ft->cf), xid, keybs); } @@ -3550,7 +3520,6 @@ ft_handle_open(FT_HANDLE ft_h, const char *fname_in_env, int is_create, int only toku_ft_note_ft_handle_open(ft, ft_h); if (txn_created) { assert(txn); - toku_ft_suppress_rollbacks(ft, txn); toku_txn_maybe_note_ft(txn, ft); } @@ -5797,15 +5766,6 @@ void toku_ft_open_close_unlock(void) { toku_mutex_unlock(&ft_open_close_lock); } -//Suppress both rollback and recovery logs. -void -toku_ft_suppress_recovery_logs (FT_HANDLE brt, TOKUTXN txn) { - assert(brt->ft->txnid_that_created_or_locked_when_empty == toku_txn_get_txnid(txn)); - assert(brt->ft->txnid_that_suppressed_recovery_logs == TXNID_NONE); - brt->ft->txnid_that_suppressed_recovery_logs = toku_txn_get_txnid(txn); - txn->checkpoint_needed_before_commit = true; -} - // Prepare to remove a dictionary from the database when this transaction is committed: // - mark transaction as NEED fsync on commit // - make entry in rollback log diff --git a/ft/ft-ops.h b/ft/ft-ops.h index cbf7c1d794c..edaa8124660 100644 --- a/ft/ft-ops.h +++ b/ft/ft-ops.h @@ -143,9 +143,9 @@ void toku_ft_load(FT_HANDLE brt, TOKUTXN txn, char const * new_iname, int do_fsy void toku_ft_hot_index_recovery(TOKUTXN txn, FILENUMS filenums, int do_fsync, int do_log, LSN *hot_index_lsn); void toku_ft_hot_index(FT_HANDLE brt, TOKUTXN txn, FILENUMS filenums, int do_fsync, LSN *lsn); -void toku_ft_log_put_multiple (TOKUTXN txn, FT_HANDLE src_ft, FT_HANDLE *brts, int num_fts, const DBT *key, const DBT *val); +void toku_ft_log_put_multiple (TOKUTXN txn, FT_HANDLE src_ft, FT_HANDLE *brts, uint32_t num_fts, const DBT *key, const DBT *val); void toku_ft_log_put (TOKUTXN txn, FT_HANDLE brt, const DBT *key, const DBT *val); -void toku_ft_log_del_multiple (TOKUTXN txn, FT_HANDLE src_ft, FT_HANDLE *brts, int num_fts, const DBT *key, const DBT *val); +void toku_ft_log_del_multiple (TOKUTXN txn, FT_HANDLE src_ft, FT_HANDLE *brts, uint32_t num_fts, const DBT *key, const DBT *val); void toku_ft_log_del (TOKUTXN txn, FT_HANDLE brt, const DBT *key); // Effect: Delete a key from a brt @@ -233,12 +233,6 @@ void toku_maybe_preallocate_in_file (int fd, int64_t size, int64_t expected_size // Effect: make the file bigger by either doubling it or growing by 16MiB whichever is less, until it is at least size // Return 0 on success, otherwise an error number. -void toku_ft_suppress_recovery_logs (FT_HANDLE brt, TOKUTXN txn); -// Effect: suppresses recovery logs -// Requires: this is a (target) redirected brt -// implies: txnid_that_created_or_locked_when_empty matches txn -// implies: toku_txn_note_ft(brt, txn) has been called - int toku_ft_get_fragmentation(FT_HANDLE brt, TOKU_DB_FRAGMENTATION report) __attribute__ ((warn_unused_result)); bool toku_ft_is_empty_fast (FT_HANDLE brt) __attribute__ ((warn_unused_result)); diff --git a/ft/ft.cc b/ft/ft.cc index b635d9b06c4..f83a86fdf70 100644 --- a/ft/ft.cc +++ b/ft/ft.cc @@ -15,14 +15,6 @@ #include #include -void -toku_ft_suppress_rollbacks(FT h, TOKUTXN txn) { - TXNID txnid = toku_txn_get_txnid(txn); - assert(h->txnid_that_created_or_locked_when_empty == TXNID_NONE || - h->txnid_that_created_or_locked_when_empty == txnid); - h->txnid_that_created_or_locked_when_empty = txnid; -} - void toku_reset_root_xid_that_created(FT ft, TXNID new_root_xid_that_created) { // Reset the root_xid_that_created field to the given value. @@ -109,22 +101,6 @@ ft_log_fassociate_during_checkpoint (CACHEFILE cf, void *header_v) { toku_log_fassociate(logger, NULL, 0, filenum, ft->h->flags, bs, unlink_on_close); } -// maps to cf->log_suppress_rollback_during_checkpoint -static void -ft_log_suppress_rollback_during_checkpoint (CACHEFILE cf, void *header_v) { - FT h = (FT) header_v; - TXNID xid = h->txnid_that_created_or_locked_when_empty; - if (xid != TXNID_NONE) { - //Only log if useful. - TOKULOGGER logger = toku_cachefile_logger(cf); - FILENUM filenum = toku_cachefile_filenum (cf); - // We don't have access to the txn here, but the txn is - // necessarily already marked as non-readonly. Use NULL. - TOKUTXN txn = NULL; - toku_log_suppress_rollback(logger, NULL, 0, txn, filenum, xid); - } -} - // Maps to cf->begin_checkpoint_userdata // Create checkpoint-in-progress versions of header and translation (btt) (and fifo for now...). // Has access to fd (it is protected). @@ -331,7 +307,6 @@ static void ft_init(FT ft, FT_OPTIONS options, CACHEFILE cf) { toku_cachefile_set_userdata(ft->cf, ft, ft_log_fassociate_during_checkpoint, - ft_log_suppress_rollback_during_checkpoint, ft_close, ft_checkpoint, ft_begin_checkpoint, @@ -432,7 +407,6 @@ int toku_read_ft_and_store_in_cachefile (FT_HANDLE brt, CACHEFILE cf, LSN max_ac toku_cachefile_set_userdata(cf, (void*)h, ft_log_fassociate_during_checkpoint, - ft_log_suppress_rollback_during_checkpoint, ft_close, ft_checkpoint, ft_begin_checkpoint, @@ -720,15 +694,17 @@ toku_dictionary_redirect (const char *dst_fname_in_env, FT_HANDLE old_ft_h, TOKU if (txn) { toku_txn_maybe_note_ft(txn, new_ft); // mark new ft as touched by this txn + // There is no recovery log entry for redirect, + // and rollback log entries are not allowed for read-only transactions. + // Normally the recovery log entry would ensure the begin was logged. + if (!txn->begin_was_logged) { + toku_maybe_log_begin_txn_for_write_operation(txn); + } FILENUM old_filenum = toku_cachefile_filenum(old_ft->cf); FILENUM new_filenum = toku_cachefile_filenum(new_ft->cf); toku_logger_save_rollback_dictionary_redirect(txn, old_filenum, new_filenum); - - TXNID xid = toku_txn_get_txnid(txn); - toku_ft_suppress_rollbacks(new_ft, txn); - toku_log_suppress_rollback(txn->logger, NULL, 0, txn, new_filenum, xid); } - + cleanup: return r; } diff --git a/ft/ft.h b/ft/ft.h index 51e5e29423e..ec58408a4db 100644 --- a/ft/ft.h +++ b/ft/ft.h @@ -20,9 +20,6 @@ void toku_ft_unlink(FT_HANDLE handle); void toku_ft_unlink_on_commit(FT_HANDLE handle, TOKUTXN txn); -//Effect: suppresses rollback logs -void toku_ft_suppress_rollbacks(FT h, TOKUTXN txn); - void toku_ft_init_reflock(FT ft); void toku_ft_destroy_reflock(FT ft); void toku_ft_grab_reflock(FT ft); @@ -60,19 +57,19 @@ toku_ft_init( int toku_dictionary_redirect_abort(FT old_h, FT new_h, TOKUTXN txn) __attribute__ ((warn_unused_result)); int toku_dictionary_redirect (const char *dst_fname_in_env, FT_HANDLE old_ft, TOKUTXN txn); void toku_reset_root_xid_that_created(FT h, TXNID new_root_xid_that_created); -// Reset the root_xid_that_created field to the given value. +// Reset the root_xid_that_created field to the given value. // This redefines which xid created the dictionary. void toku_ft_add_txn_ref(FT h); void toku_ft_remove_txn_ref(FT h); void toku_calculate_root_offset_pointer ( FT h, CACHEKEY* root_key, uint32_t *roothash); -void toku_ft_set_new_root_blocknum(FT h, CACHEKEY new_root_key); +void toku_ft_set_new_root_blocknum(FT h, CACHEKEY new_root_key); LSN toku_ft_checkpoint_lsn(FT h) __attribute__ ((warn_unused_result)); void toku_ft_stat64 (FT h, struct ftstat64_s *s); -// unconditionally set the descriptor for an open FT. can't do this when -// any operation has already occurred on the ft. +// unconditionally set the descriptor for an open FT. can't do this when +// any operation has already occurred on the ft. // see toku_ft_change_descriptor(), which is the transactional version // used by the ydb layer. it better describes the client contract. void toku_ft_update_descriptor(FT ft, DESCRIPTOR d); diff --git a/ft/ft_layout_version.h b/ft/ft_layout_version.h index f1ae174db87..7fbb5f96afe 100644 --- a/ft/ft_layout_version.h +++ b/ft/ft_layout_version.h @@ -28,7 +28,8 @@ enum ft_layout_version_e { FT_LAYOUT_VERSION_20 = 20, // Deadshot: Add compression method to log_fcreate, // mgr_last_xid after begin checkpoint, // last_xid to shutdown - FT_LAYOUT_VERSION_21 = 21, // Ming: Add max_msn_in_ft to header + FT_LAYOUT_VERSION_21 = 21, // Ming: Add max_msn_in_ft to header, + // Removed log suppression logentry FT_NEXT_VERSION, // the version after the current version FT_LAYOUT_VERSION = FT_NEXT_VERSION-1, // A hack so I don't have to change this line. FT_LAYOUT_MIN_SUPPORTED_VERSION = FT_LAYOUT_VERSION_13, // Minimum version supported diff --git a/ft/log-internal.h b/ft/log-internal.h index 850b5eabbc0..677ae7311ab 100644 --- a/ft/log-internal.h +++ b/ft/log-internal.h @@ -152,7 +152,6 @@ struct tokutxn { bool begin_was_logged; // These are not read until a commit, prepare, or abort starts, and // they're "monotonic" (only go false->true) during operation: - bool checkpoint_needed_before_commit; bool do_fsync; bool force_fsync_on_commit; //This transaction NEEDS an fsync once (if) it commits. (commit means root txn) diff --git a/ft/logformat.cc b/ft/logformat.cc index 9788114621b..dd9010d0375 100644 --- a/ft/logformat.cc +++ b/ft/logformat.cc @@ -131,15 +131,12 @@ const struct logtype logtypes[] = { {"uint64_t", "rollentry_raw_count", 0}, {"FILENUMS", "open_filenums", 0}, {"uint8_t", "force_fsync_on_commit", 0}, - {"uint64_t", "num_rollback_nodes", 0}, - {"uint64_t", "num_rollentries", 0}, - {"BLOCKNUM", "spilled_rollback_head", 0}, - {"BLOCKNUM", "spilled_rollback_tail", 0}, - {"BLOCKNUM", "current_rollback", 0}, + {"uint64_t", "num_rollback_nodes", 0}, + {"uint64_t", "num_rollentries", 0}, + {"BLOCKNUM", "spilled_rollback_head", 0}, + {"BLOCKNUM", "spilled_rollback_tail", 0}, + {"BLOCKNUM", "current_rollback", 0}, NULLFIELD}, ASSERT_BEGIN_WAS_LOGGED}, // record all transactions - {"suppress_rollback", 'S', FA{{"FILENUM", "filenum", 0}, - {"TXNID", "xid", 0}, - NULLFIELD}, SHOULD_LOG_BEGIN}, // Records produced by transactions {"xbegin", 'b', FA{{"TXNID", "xid", 0},{"TXNID", "parentxid", 0},NULLFIELD}, IGNORE_LOG_BEGIN}, {"xcommit",'C', FA{{"TXNID", "xid", 0},NULLFIELD}, ASSERT_BEGIN_WAS_LOGGED}, @@ -413,7 +410,6 @@ generate_log_writer (void) { switch (lt->log_begin_action) { case SHOULD_LOG_BEGIN: { fprintf(cf, " //txn can be NULL during tests\n"); - fprintf(cf, " //txn can be also be NULL for suppress_rollback during checkpoint,\n"); fprintf(cf, " //never null when not checkpoint.\n"); fprintf(cf, " if (txn && !txn->begin_was_logged) {\n"); fprintf(cf, " toku_maybe_log_begin_txn_for_write_operation(txn);\n"); diff --git a/ft/recover.cc b/ft/recover.cc index 8c28f27ab72..bb3f85cc823 100644 --- a/ft/recover.cc +++ b/ft/recover.cc @@ -629,26 +629,6 @@ static int toku_recover_backward_xstillopenprepared (struct logtype_xstillopenpr return 0; } -static int toku_recover_suppress_rollback (struct logtype_suppress_rollback *UU(l), RECOVER_ENV UU(renv)) { - struct file_map_tuple *tuple = NULL; - int r = file_map_find(&renv->fmap, l->filenum, &tuple); - if (r==0) { - //File is open - TOKUTXN txn = NULL; - toku_txnid2txn(renv->logger, l->xid, &txn); - assert(txn!=NULL); - FT ft = tuple->ft_handle->ft; - toku_ft_suppress_rollbacks(ft, txn); - toku_txn_maybe_note_ft(txn, ft); - } - return 0; -} - -static int toku_recover_backward_suppress_rollback (struct logtype_suppress_rollback *UU(l), RECOVER_ENV UU(renv)) { - // nothing - return 0; -} - static int toku_recover_xbegin (struct logtype_xbegin *l, RECOVER_ENV renv) { int r; r = recover_transaction(NULL, l->xid, l->parentxid, renv->logger); diff --git a/ft/rollback-apply.cc b/ft/rollback-apply.cc index 1ddf8f2bf04..879d25c8d1e 100644 --- a/ft/rollback-apply.cc +++ b/ft/rollback-apply.cc @@ -46,14 +46,6 @@ int note_ft_used_in_txns_parent(const FT &ft, uint32_t UU(index), TOKUTXN const child) { TOKUTXN parent = child->parent; toku_txn_maybe_note_ft(parent, ft); - if (ft->txnid_that_created_or_locked_when_empty == toku_txn_get_txnid(child)) { - //Pass magic "no rollback needed" flag to parent. - ft->txnid_that_created_or_locked_when_empty = toku_txn_get_txnid(parent); - } - if (ft->txnid_that_suppressed_recovery_logs == toku_txn_get_txnid(child)) { - //Pass magic "no recovery needed" flag to parent. - ft->txnid_that_suppressed_recovery_logs = toku_txn_get_txnid(parent); - } return 0; } @@ -217,11 +209,6 @@ int toku_rollback_commit(TOKUTXN txn, LSN lsn) { r = txn->open_fts.iterate(txn); assert(r==0); - // Merge the list of headers that must be checkpointed before commit - if (txn->checkpoint_needed_before_commit) { - txn->parent->checkpoint_needed_before_commit = true; - } - //If this transaction needs an fsync (if it commits) //save that in the parent. Since the commit really happens in the root txn. txn->parent->force_fsync_on_commit |= txn->force_fsync_on_commit; diff --git a/ft/rollback.cc b/ft/rollback.cc index a3e6a938e67..3169dd6a09d 100644 --- a/ft/rollback.cc +++ b/ft/rollback.cc @@ -210,10 +210,11 @@ exit: } // Return the number of bytes that went into the rollback data structure (the uncompressed count if there is compression) -int toku_logger_txn_rollback_raw_count(TOKUTXN txn, uint64_t *raw_count) +int toku_logger_txn_rollback_stats(TOKUTXN txn, struct txn_stat *txn_stat) { toku_txn_lock(txn); - *raw_count = txn->roll_info.rollentry_raw_count; + txn_stat->rollback_raw_count = txn->roll_info.rollentry_raw_count; + txn_stat->rollback_num_entries = txn->roll_info.num_rollentries; toku_txn_unlock(txn); return 0; } diff --git a/ft/rollback.h b/ft/rollback.h index 815e146c7f8..cff00d60a43 100644 --- a/ft/rollback.h +++ b/ft/rollback.h @@ -48,7 +48,7 @@ void *toku_memdup_in_rollback(ROLLBACK_LOG_NODE log, const void *v, size_t len); void toku_maybe_spill_rollbacks(TOKUTXN txn, ROLLBACK_LOG_NODE log); void toku_txn_maybe_note_ft (TOKUTXN txn, FT h); -int toku_logger_txn_rollback_raw_count(TOKUTXN txn, uint64_t *raw_count); +int toku_logger_txn_rollback_stats(TOKUTXN txn, struct txn_stat *txn_stat); int toku_find_xid_by_xid (const TXNID &xid, const TXNID &xidfind); diff --git a/ft/tests/cachetable-pin-checkpoint.cc b/ft/tests/cachetable-pin-checkpoint.cc index 321db6ff660..5508e9c6b88 100644 --- a/ft/tests/cachetable-pin-checkpoint.cc +++ b/ft/tests/cachetable-pin-checkpoint.cc @@ -330,12 +330,11 @@ cachetable_test (void) { char fname1[] = __SRCFILE__ "test1.dat"; unlink(fname1); r = toku_cachetable_openf(&f1, ct, fname1, O_RDWR|O_CREAT, S_IRWXU|S_IRWXG|S_IRWXO); assert(r == 0); - + toku_cachefile_set_userdata( - f1, - NULL, + f1, + NULL, &dummy_log_fassociate, - &dummy_log_rollback, &dummy_close_usr, &dummy_chckpnt_usr, &test_begin_checkpoint, diff --git a/ft/tests/cachetable-put-checkpoint.cc b/ft/tests/cachetable-put-checkpoint.cc index 8c53be2eaf4..61cafb0674b 100644 --- a/ft/tests/cachetable-put-checkpoint.cc +++ b/ft/tests/cachetable-put-checkpoint.cc @@ -457,25 +457,24 @@ cachetable_test (void) { time_of_test = 60; int r; - + toku_cachetable_create(&ct, test_limit, ZERO_LSN, NULL_LOGGER); char fname1[] = __SRCFILE__ "test-put-checkpoint.dat"; unlink(fname1); r = toku_cachetable_openf(&f1, ct, fname1, O_RDWR|O_CREAT, S_IRWXU|S_IRWXG|S_IRWXO); assert(r == 0); - + toku_cachefile_set_userdata( - f1, - NULL, + f1, + NULL, &dummy_log_fassociate, - &dummy_log_rollback, &dummy_close_usr, &dummy_chckpnt_usr, test_begin_checkpoint, // called in begin_checkpoint - &dummy_end, + &dummy_end, &dummy_note_pin, &dummy_note_unpin ); - + toku_pthread_t time_tid; toku_pthread_t checkpoint_tid; toku_pthread_t move_tid[NUM_MOVER_THREADS]; diff --git a/ft/tests/cachetable-test.h b/ft/tests/cachetable-test.h index e06b134c303..39ea67fa18d 100644 --- a/ft/tests/cachetable-test.h +++ b/ft/tests/cachetable-test.h @@ -10,7 +10,6 @@ // Dummy callbacks for checkpointing // static void dummy_log_fassociate(CACHEFILE UU(cf), void* UU(p)) { } -static void dummy_log_rollback(CACHEFILE UU(cf), void* UU(p)) { } static void dummy_close_usr(CACHEFILE UU(cf), int UU(i), void* UU(p), bool UU(b), LSN UU(lsn)) { } static void dummy_chckpnt_usr(CACHEFILE UU(cf), int UU(i), void* UU(p)) { } static void dummy_begin(LSN UU(lsn), void* UU(p)) { } @@ -28,7 +27,6 @@ create_dummy_functions(CACHEFILE cf) toku_cachefile_set_userdata(cf, ud, &dummy_log_fassociate, - &dummy_log_rollback, &dummy_close_usr, &dummy_chckpnt_usr, &dummy_begin, diff --git a/ft/txn.cc b/ft/txn.cc index 9e097b90640..629edbca359 100644 --- a/ft/txn.cc +++ b/ft/txn.cc @@ -152,7 +152,6 @@ void toku_txn_create_txn ( .live_root_txn_list = nullptr, .xids = xids, .begin_was_logged = false, - .checkpoint_needed_before_commit = false, .do_fsync = false, .force_fsync_on_commit = false, .do_fsync_lsn = ZERO_LSN, @@ -246,21 +245,11 @@ int toku_txn_commit_txn(TOKUTXN txn, int nosync, poll, poll_extra); } - -void -toku_txn_require_checkpoint_on_commit(TOKUTXN txn) { - txn->checkpoint_needed_before_commit = true; -} - struct xcommit_info { int r; TOKUTXN txn; }; -bool toku_txn_requires_checkpoint(TOKUTXN txn) { - return (!txn->parent && txn->checkpoint_needed_before_commit); -} - int toku_txn_commit_with_lsn(TOKUTXN txn, int nosync, LSN oplsn, TXN_PROGRESS_POLL_FUNCTION poll, void *poll_extra) { @@ -377,16 +366,10 @@ void toku_txn_close_txn(TOKUTXN txn) { } int remove_txn (const FT &h, const uint32_t UU(idx), TOKUTXN const txn); -int remove_txn (const FT &h, const uint32_t UU(idx), TOKUTXN const txn) +int remove_txn (const FT &h, const uint32_t UU(idx), TOKUTXN const UU(txn)) // Effect: This function is called on every open FT that a transaction used. // This function removes the transaction from that FT. { - if (txn->txnid64==h->txnid_that_created_or_locked_when_empty) { - h->txnid_that_created_or_locked_when_empty = TXNID_NONE; - } - if (txn->txnid64==h->txnid_that_suppressed_recovery_logs) { - h->txnid_that_suppressed_recovery_logs = TXNID_NONE; - } toku_ft_remove_txn_ref(h); return 0; diff --git a/ft/txn.h b/ft/txn.h index 3c9e565d562..2147a6c86a4 100644 --- a/ft/txn.h +++ b/ft/txn.h @@ -44,7 +44,6 @@ int toku_txn_load_txninfo (TOKUTXN txn, TXNINFO info); int toku_txn_commit_txn (TOKUTXN txn, int nosync, TXN_PROGRESS_POLL_FUNCTION poll, void *poll_extra); -bool toku_txn_requires_checkpoint(TOKUTXN txn); int toku_txn_commit_with_lsn(TOKUTXN txn, int nosync, LSN oplsn, TXN_PROGRESS_POLL_FUNCTION poll, void *poll_extra); @@ -66,9 +65,6 @@ void toku_txn_get_fsync_info(TOKUTXN ttxn, bool* do_fsync, LSN* do_fsync_lsn); // Complete and destroy a txn void toku_txn_close_txn(TOKUTXN txn); -// Require a checkpoint upon commit -void toku_txn_require_checkpoint_on_commit(TOKUTXN txn); - // Remove a txn from any live txn lists void toku_txn_complete_txn(TOKUTXN txn); diff --git a/src/indexer.cc b/src/indexer.cc index 892693e0072..2afb2561c06 100644 --- a/src/indexer.cc +++ b/src/indexer.cc @@ -27,6 +27,7 @@ #include #include #include +#include "loader.h" /////////////////////////////////////////////////////////////////////////////////// // Engine status @@ -203,7 +204,7 @@ toku_indexer_create_indexer(DB_ENV *env, // { DB_LOADER* loader = NULL; - rval = env->create_loader(env, txn, &loader, NULL, N, &dest_dbs[0], NULL, NULL, DB_PRELOCKED_WRITE | LOADER_USE_PUTS); + rval = toku_loader_create_loader(env, txn, &loader, NULL, N, &dest_dbs[0], NULL, NULL, DB_PRELOCKED_WRITE | LOADER_DISALLOW_PUTS, true); if (rval) { goto create_exit; } @@ -473,6 +474,11 @@ build_index(DB_INDEXER *indexer) { // - unique checks? if ( result == 0 ) { + // Perform a checkpoint so that all of the indexing makes it to disk before continuing. + // Otherwise indexing would not be crash-safe becasue none of the undo-do messages are in the recovery log. + DB_ENV *env = indexer->i->env; + CHECKPOINTER cp = toku_cachetable_get_checkpointer(env->i->cachetable); + toku_checkpoint(cp, env->i->logger, NULL, NULL, NULL, NULL, INDEXER_CHECKPOINT); (void) toku_sync_fetch_and_add(&STATUS_VALUE(INDEXER_BUILD), 1); } else { (void) toku_sync_fetch_and_add(&STATUS_VALUE(INDEXER_BUILD_FAIL), 1); @@ -487,14 +493,6 @@ close_indexer(DB_INDEXER *indexer) { int r = 0; (void) toku_sync_fetch_and_sub(&STATUS_VALUE(INDEXER_CURRENT), 1); - // Mark txn as needing a checkpoint. - // (This will cause a checkpoint, which is necessary - // because these files are not necessarily on disk and all the operations - // to create them are not in the recovery log.) - DB_TXN *txn = indexer->i->txn; - TOKUTXN tokutxn = db_txn_struct_i(txn)->tokutxn; - toku_txn_require_checkpoint_on_commit(tokutxn); - // Disassociate the indexer from the hot db and free_indexer disassociate_indexer_from_hot_dbs(indexer); free_indexer(indexer); diff --git a/src/loader.cc b/src/loader.cc index 1240a2e9873..d66e67ecd41 100644 --- a/src/loader.cc +++ b/src/loader.cc @@ -89,9 +89,6 @@ struct __toku_loader_internal { void *poll_extra; char *temp_file_template; - DBT *ekeys; - DBT *evals; - DBT err_key; /* error key */ DBT err_val; /* error val */ int err_i; /* error i */ @@ -109,21 +106,6 @@ struct __toku_loader_internal { static void free_loader_resources(DB_LOADER *loader) { if ( loader->i ) { - for (int i=0; ii->N; i++) { - if (loader->i->ekeys && - loader->i->ekeys[i].data && - loader->i->ekeys[i].flags == DB_DBT_REALLOC) { - toku_free(loader->i->ekeys[i].data); - } - if (loader->i->evals && - loader->i->evals[i].data && - loader->i->evals[i].flags == DB_DBT_REALLOC) { - toku_free(loader->i->evals[i].data); - } - } - if (loader->i->ekeys) toku_free(loader->i->ekeys); - if (loader->i->evals) toku_free(loader->i->evals); - if (loader->i->err_key.data) toku_free(loader->i->err_key.data); if (loader->i->err_val.data) toku_free(loader->i->err_val.data); @@ -171,24 +153,27 @@ static int ft_loader_close_and_redirect(DB_LOADER *loader) { } -static int create_loader(DB_ENV *env, - DB_TXN *txn, - DB_LOADER **blp, - DB *src_db, - int N, - DB *dbs[], - uint32_t db_flags[/*N*/], - uint32_t dbt_flags[/*N*/], - uint32_t loader_flags, - bool check_empty) -{ +// loader_flags currently has the following flags: +// LOADER_DISALLOW_PUTS loader->put is not allowed. +// Loader is only being used for its side effects +// DB_PRELOCKED_WRITE Table lock is already held, no need to relock. +int +toku_loader_create_loader(DB_ENV *env, + DB_TXN *txn, + DB_LOADER **blp, + DB *src_db, + int N, + DB *dbs[], + uint32_t db_flags[/*N*/], + uint32_t dbt_flags[/*N*/], + uint32_t loader_flags, + bool check_empty) { int rval; - bool use_ft_loader = (loader_flags == 0); *blp = NULL; // set later when created DB_LOADER *loader = NULL; - bool use_puts = loader_flags&LOADER_USE_PUTS; + bool puts_allowed = !(loader_flags & LOADER_DISALLOW_PUTS); XCALLOC(loader); // init to all zeroes (thus initializing the error_callback and poll_func) XCALLOC(loader->i); // init to all zeroes (thus initializing all pointers to NULL) @@ -248,10 +233,8 @@ static int create_loader(DB_ENV *env, for (int i=0; ii->ft_handle; } - loader->i->ekeys = NULL; - loader->i->evals = NULL; LSN load_lsn; - rval = locked_load_inames(env, txn, N, dbs, new_inames_in_env, &load_lsn, use_ft_loader); + rval = locked_load_inames(env, txn, N, dbs, new_inames_in_env, &load_lsn, puts_allowed); if ( rval!=0 ) { toku_free(new_inames_in_env); toku_free(brts); @@ -269,7 +252,7 @@ static int create_loader(DB_ENV *env, loader->i->temp_file_template, load_lsn, ttxn, - !use_puts); + puts_allowed); if ( rval!=0 ) { toku_free(new_inames_in_env); toku_free(brts); @@ -278,18 +261,9 @@ static int create_loader(DB_ENV *env, loader->i->inames_in_env = new_inames_in_env; toku_free(brts); - if (use_puts) { - XCALLOC_N(loader->i->N, loader->i->ekeys); - XCALLOC_N(loader->i->N, loader->i->evals); - // the following function grabs the ydb lock, so we - // first unlock before calling it + if (!puts_allowed) { rval = ft_loader_close_and_redirect(loader); assert_zero(rval); - for (int i=0; ii->ekeys[i].flags = DB_DBT_REALLOC; - loader->i->evals[i].flags = DB_DBT_REALLOC; - toku_ft_suppress_recovery_logs(dbs[i]->i->ft_handle, db_txn_struct_i(txn)->tokutxn); - } loader->i->ft_loader = NULL; // close the ft_loader and skip to the redirection rval = 0; @@ -312,37 +286,6 @@ static int create_loader(DB_ENV *env, return rval; } -// loader_flags currently has three possible values: -// 0 use brt loader -// USE_PUTS do not use brt loader, use log suppression mechanism (2440) -// which results in recursive call here via toku_db_pre_acquire_table_lock() -// DB_PRELOCKED_WRITE do not use brt loader, this is the recursive (inner) call via -// toku_db_pre_acquire_table_lock() -int toku_loader_create_loader(DB_ENV *env, - DB_TXN *txn, - DB_LOADER **blp, - DB *src_db, - int N, - DB *dbs[], - uint32_t db_flags[/*N*/], - uint32_t dbt_flags[/*N*/], - uint32_t loader_flags) -{ - return create_loader( - env, - txn, - blp, - src_db, - N, - dbs, - db_flags, - dbt_flags, - loader_flags, - true - ); -} - - int toku_loader_set_poll_function(DB_LOADER *loader, int (*poll_func)(void *extra, float progress), void *poll_extra) @@ -377,16 +320,9 @@ int toku_loader_put(DB_LOADER *loader, DBT *key, DBT *val) goto cleanup; } - if (loader->i->loader_flags & LOADER_USE_PUTS) { - r = loader->i->env->put_multiple(loader->i->env, - loader->i->src_db, // src_db - loader->i->txn, - key, val, - loader->i->N, // num_dbs - loader->i->dbs, // (DB**)db_array - loader->i->ekeys, - loader->i->evals, - loader->i->db_flags); // flags_array + if (loader->i->loader_flags & LOADER_DISALLOW_PUTS) { + r = EINVAL; + goto cleanup; } else { // calling toku_ft_loader_put without a lock assumes that the @@ -422,7 +358,7 @@ int toku_loader_put(DB_LOADER *loader, DBT *key, DBT *val) static void redirect_loader_to_empty_dictionaries(DB_LOADER *loader) { DB_LOADER* tmp_loader = NULL; - int r = create_loader( + int r = toku_loader_create_loader( loader->i->env, loader->i->txn, &tmp_loader, @@ -446,7 +382,7 @@ int toku_loader_close(DB_LOADER *loader) if ( loader->i->error_callback != NULL ) { loader->i->error_callback(loader->i->dbs[loader->i->err_i], loader->i->err_i, loader->i->err_errno, &loader->i->err_key, &loader->i->err_val, loader->i->error_extra); } - if (!(loader->i->loader_flags & LOADER_USE_PUTS ) ) { + if (!(loader->i->loader_flags & LOADER_DISALLOW_PUTS ) ) { r = toku_ft_loader_abort(loader->i->ft_loader, true); redirect_loader_to_empty_dictionaries(loader); } @@ -455,7 +391,7 @@ int toku_loader_close(DB_LOADER *loader) } } else { // no error outstanding - if (!(loader->i->loader_flags & LOADER_USE_PUTS ) ) { + if (!(loader->i->loader_flags & LOADER_DISALLOW_PUTS ) ) { r = ft_loader_close_and_redirect(loader); if (r) { redirect_loader_to_empty_dictionaries(loader); @@ -481,7 +417,7 @@ int toku_loader_abort(DB_LOADER *loader) } } - if (!(loader->i->loader_flags & LOADER_USE_PUTS) ) { + if (!(loader->i->loader_flags & LOADER_DISALLOW_PUTS) ) { r = toku_ft_loader_abort(loader->i->ft_loader, true); lazy_assert_zero(r); } diff --git a/src/loader.h b/src/loader.h index 176f6245c99..edbb4406f52 100644 --- a/src/loader.h +++ b/src/loader.h @@ -40,7 +40,7 @@ Create and set up a loader. Modifies: :: env, txn, blp, and dbs. */ -int toku_loader_create_loader(DB_ENV *env, DB_TXN *txn, DB_LOADER **blp, DB *src_db, int N, DB *dbs[/*N*/], uint32_t db_flags[/*N*/], uint32_t dbt_flags[/*N*/], uint32_t loader_flags); +int toku_loader_create_loader(DB_ENV *env, DB_TXN *txn, DB_LOADER **blp, DB *src_db, int N, DB *dbs[/*N*/], uint32_t db_flags[/*N*/], uint32_t dbt_flags[/*N*/], uint32_t loader_flags, bool check_empty); /* diff --git a/src/tests/CMakeLists.txt b/src/tests/CMakeLists.txt index 2cf609be677..e103bdfe83a 100644 --- a/src/tests/CMakeLists.txt +++ b/src/tests/CMakeLists.txt @@ -541,12 +541,14 @@ if(BUILD_TESTING OR BUILD_SRC_TESTS) add_test(ydb/loader-stress-test2.tdb loader-stress-test.tdb -r 5000 -s -e dir.loader-stress-test2.tdb) add_test(ydb/loader-stress-test3.tdb loader-stress-test.tdb -u -c -e dir.loader-stress-test3.tdb) add_test(ydb/loader-stress-test4.tdb loader-stress-test.tdb -r 10000000 -c -e dir.loader-stress-test4.tdb) + add_test(ydb/loader-stress-test5.tdb loader-stress-test.tdb -c -z -e dir.loader-stress-test5.tdb) set_property(DIRECTORY APPEND PROPERTY ADDITIONAL_MAKE_CLEAN_FILES dir.loader-stress-test0.tdb dir.loader-stress-test1.tdb dir.loader-stress-test2.tdb dir.loader-stress-test3.tdb dir.loader-stress-test4.tdb + dir.loader-stress-test5.tdb ) list(REMOVE_ITEM loader_tests loader-dup-test.loader) @@ -643,11 +645,12 @@ if(BUILD_TESTING OR BUILD_SRC_TESTS) get_filename_component(base ${loader_test} NAME_WE) add_test(ydb/${base}.nop.loader ${base}.tdb -e "dir.${base}.nop.loader") add_test(ydb/${base}.p.loader ${base}.tdb -p -e "dir.${base}.p.loader") + add_test(ydb/${base}.comp.loader ${base}.tdb -z -e "dir.${base}.comp.loader") if("${tdb_tests_that_should_fail}" MATCHES "${base}.loader") list(REMOVE_ITEM tdb_tests_that_should_fail ${base}.loader) - list(APPEND tdb_tests_that_should_fail ${base}.nop.loader ${base}.p.loader) + list(APPEND tdb_tests_that_should_fail ${base}.nop.loader ${base}.p.loader ${base}.comp.loader) endif() - set_property(DIRECTORY APPEND PROPERTY ADDITIONAL_MAKE_CLEAN_FILES "dir.${base}.nop.loader" "dir.${base}.p.loader") + set_property(DIRECTORY APPEND PROPERTY ADDITIONAL_MAKE_CLEAN_FILES "dir.${base}.nop.loader" "dir.${base}.p.loader" "dir.${base}.comp.loader") endforeach(loader_test) set(tdb_tests_that_should_fail "ydb/${tdb_tests_that_should_fail}") diff --git a/src/tests/bug1381.cc b/src/tests/bug1381.cc index b772b9b37a5..a5ada7c8405 100644 --- a/src/tests/bug1381.cc +++ b/src/tests/bug1381.cc @@ -84,7 +84,7 @@ static void do_1381_maybe_lock (int do_loader, uint64_t *raw_count) { &db, &mult_put_flags, &mult_dbt_flags, - LOADER_USE_PUTS + LOADER_COMPRESS_INTERMEDIATES ); CKERR(r); } @@ -102,7 +102,7 @@ static void do_1381_maybe_lock (int do_loader, uint64_t *raw_count) { CKERR(r); } else { - r = db->put(db, txn, &key, &val, 0); + r = db->put(db, txn, &key, &val, 0); CKERR(r); } } @@ -116,11 +116,13 @@ static void do_1381_maybe_lock (int do_loader, uint64_t *raw_count) { *raw_count = s2->rollback_raw_count - s1->rollback_raw_count; if (do_loader) { - assert(s1->rollback_raw_count == s2->rollback_raw_count); + assert(s1->rollback_raw_count < s2->rollback_raw_count); + assert(s1->rollback_num_entries + 1 == s2->rollback_num_entries); } else { assert(s1->rollback_raw_count < s2->rollback_raw_count); + assert(s1->rollback_num_entries < s2->rollback_num_entries); } - + toku_free(s1); toku_free(s2); r = txn->commit(txn, 0); CKERR(r); diff --git a/src/tests/loader-blobs-create-leaf.c.notyet b/src/tests/loader-blobs-create-leaf.c.notyet index cdfd923fcc9..04a26a5cc25 100644 --- a/src/tests/loader-blobs-create-leaf.c.notyet +++ b/src/tests/loader-blobs-create-leaf.c.notyet @@ -36,7 +36,13 @@ static void insert(DB_LOADER *loader, int k, int val_size) { DBT key = { .data = key_buffer, .size = sizeof key_buffer }; DBT value = { .data = val_buffer, .size = val_size }; - r = loader->put(loader, &key, &value); assert_zero(r); + r = loader->put(loader, &key, &value); + if (DISALLOW_PUTS) { + assert(r == EINVAL); + } + else { + assert_zero(r); + } toku_free(val_buffer); } @@ -65,8 +71,12 @@ int test_main(int argc, char * const argv[]) { if (verbose > 0) verbose--; continue; } + if (strcmp(arg, "-z") == 0) { + loader_flags |= LOADER_COMPRESS_INTERMEDIATES; + continue; + } if (strcmp(arg, "-p") == 0) { - loader_flags = LOADER_USE_PUTS; + loader_flags |= LOADER_DISALLOW_PUTS; continue; } if (strcmp(arg, "--txn") == 0 && i+1 < argc) { diff --git a/src/tests/loader-cleanup-test.cc b/src/tests/loader-cleanup-test.cc index 6b598e88f5c..3871c830e84 100644 --- a/src/tests/loader-cleanup-test.cc +++ b/src/tests/loader-cleanup-test.cc @@ -68,7 +68,8 @@ int NUM_DBS=default_NUM_DBS; int NUM_ROWS=default_NUM_ROWS; //static int NUM_ROWS=50000000; int CHECK_RESULTS=0; -int USE_PUTS=0; +int DISALLOW_PUTS=0; +int COMPRESS=0; int event_trigger_lo=0; // what event triggers to use? int event_trigger_hi =0; // 0 and 0 mean none. enum {MAGIC=311}; @@ -528,13 +529,17 @@ static void check_results(DB **dbs) CKERR(r); for(int i=0;ic_get(cursor, &key, &val, DB_NEXT); - CKERR(r); - k = *(unsigned int*)key.data; - pkey_for_db_key = (j == 0) ? k : inv_twiddle32(k, j); - v = *(unsigned int*)val.data; - // test that we have the expected keys and values - assert((unsigned int)pkey_for_db_key == (unsigned int)pkey_for_val(v, j)); + if (DISALLOW_PUTS) { + CKERR2(r, EINVAL); + } else { + CKERR(r); + k = *(unsigned int*)key.data; + pkey_for_db_key = (j == 0) ? k : inv_twiddle32(k, j); + v = *(unsigned int*)val.data; + // test that we have the expected keys and values + assert((unsigned int)pkey_for_db_key == (unsigned int)pkey_for_val(v, j)); // printf(" DB[%d] key = %10u, val = %10u, pkey_for_db_key = %10u, pkey_for_val=%10d\n", j, v, k, pkey_for_db_key, pkey_for_val(v, j)); + } } {printf("."); fflush(stdout);} r = cursor->c_close(cursor); @@ -596,7 +601,7 @@ static void test_loader(enum test_type t, DB **dbs, int trigger) db_flags[i] = DB_NOOVERWRITE; dbt_flags[i] = 0; } - uint32_t loader_flags = USE_PUTS; // set with -p option + uint32_t loader_flags = DISALLOW_PUTS | COMPRESS; // set with -p/-z option if (verbose >= 2) printf("old inames:\n"); @@ -612,8 +617,10 @@ static void test_loader(enum test_type t, DB **dbs, int trigger) r = loader->set_poll_function(loader, poll_function, expect_poll_void); CKERR(r); - if (verbose) - printf("USE_PUTS = %d\n", USE_PUTS); + if (verbose) { + printf("DISALLOW_PUTS = %d\n", DISALLOW_PUTS); + printf("COMPRESS = %d\n", COMPRESS); + } if (verbose >= 2) printf("new inames:\n"); get_inames(new_inames, dbs); @@ -627,7 +634,9 @@ static void test_loader(enum test_type t, DB **dbs, int trigger) dbt_init(&key, &k, sizeof(unsigned int)); dbt_init(&val, &v, sizeof(unsigned int)); r = loader->put(loader, &key, &val); - if (r != 0) { + if (DISALLOW_PUTS) { + assert(r == EINVAL); + } else if (r != 0) { assert(error_injection && error_injected); failed_put = r; } @@ -649,13 +658,13 @@ static void test_loader(enum test_type t, DB **dbs, int trigger) } r = loader->close(loader); CKERR(r); - if (!USE_PUTS) { + if (!DISALLOW_PUTS) { assert(poll_count>0); // You cannot count temp files here } } else if (t == abort_via_poll) { - assert(!USE_PUTS); // test makes no sense with USE_PUTS + assert(!DISALLOW_PUTS); // test makes no sense with DISALLOW_PUTS if (verbose) printf("closing, but expecting abort via poll\n"); r = loader->close(loader); @@ -674,7 +683,7 @@ static void test_loader(enum test_type t, DB **dbs, int trigger) else printf("closing, expecting no error because number of system calls was less than predicted (%s)\n", type); } - if (!USE_PUTS && error_injected) { + if (!DISALLOW_PUTS && error_injected) { if (r == 0) { printf("loader->close() returned 0 but should have failed due to injected error from %s on call %d\n", err_type_str(t), trigger); @@ -725,7 +734,7 @@ static void test_loader(enum test_type t, DB **dbs, int trigger) r = txn->commit(txn, 0); CKERR(r); - if (!USE_PUTS) { + if (!DISALLOW_PUTS) { assert_inames_missing(old_inames); } if ( CHECK_RESULTS ) { @@ -736,7 +745,7 @@ static void test_loader(enum test_type t, DB **dbs, int trigger) else { r = txn->abort(txn); CKERR(r); - if (!USE_PUTS) { + if (!DISALLOW_PUTS) { assert_inames_missing(new_inames); } } @@ -962,7 +971,8 @@ static void usage(const char *cmd) { fprintf(stderr, "Usage: -h -c -s -p -d -r -t \n%s\n", cmd); fprintf(stderr, " where -h print this message.\n"); fprintf(stderr, " -c check the results.\n"); - fprintf(stderr, " -p LOADER_USE_PUTS.\n"); + fprintf(stderr, " -p LOADER_DISALLOW_PUTS.\n"); + fprintf(stderr, " -z LOADER_COMPRESS_INTERMEDIATES.\n"); fprintf(stderr, " -k Test only normal operation and abort_via_poll (but thoroughly).\n"); fprintf(stderr, " -s size_factor=1.\n"); fprintf(stderr, " -d Number of indexes to create (default=%d).\n", default_NUM_DBS); @@ -998,8 +1008,10 @@ static void do_args(int argc, char * const argv[]) { NUM_ROWS = atoi(argv[0]); } else if (strcmp(argv[0], "-c")==0) { CHECK_RESULTS = 1; + } else if (strcmp(argv[0], "-z")==0) { + COMPRESS = LOADER_COMPRESS_INTERMEDIATES; } else if (strcmp(argv[0], "-p")==0) { - USE_PUTS = 0; + DISALLOW_PUTS = LOADER_DISALLOW_PUTS; printf("DISABLED Using puts as part of #4503\n"); } else if (strcmp(argv[0], "-k")==0) { test_only_abort_via_poll = 1; @@ -1010,7 +1022,7 @@ static void do_args(int argc, char * const argv[]) { argc--; argv++; event_trigger_hi = atoi(argv[0]); } else if (strcmp(argv[0], "-s")==0) { - db_env_set_loader_size_factor(1); + db_env_set_loader_size_factor(1); } else if (strcmp(argv[0],"-e") == 0 && argc > 1) { argc--; argv++; envdir = argv[0]; diff --git a/src/tests/loader-create-abort.cc b/src/tests/loader-create-abort.cc index efc6ede0323..20ee54aaf54 100644 --- a/src/tests/loader-create-abort.cc +++ b/src/tests/loader-create-abort.cc @@ -61,7 +61,9 @@ static void do_args(int argc, char * const argv[]) { verbose--; if (verbose<0) verbose=0; } else if (strcmp(argv[0], "-p") == 0) { - loader_flags = LOADER_USE_PUTS; + loader_flags |= LOADER_COMPRESS_INTERMEDIATES; + } else if (strcmp(argv[0], "-z") == 0) { + loader_flags |= LOADER_DISALLOW_PUTS; } else if (strcmp(argv[0], "-e") == 0) { argc--; argv++; if (argc > 0) diff --git a/src/tests/loader-create-close.cc b/src/tests/loader-create-close.cc index c9e246b12e2..82ac0f2ef6d 100644 --- a/src/tests/loader-create-close.cc +++ b/src/tests/loader-create-close.cc @@ -29,7 +29,7 @@ static void loader_open_abort(int ndb) { r = env->set_generate_row_callback_for_put(env, put_multiple_generate); CKERR(r); int envflags = DB_INIT_LOCK | DB_INIT_LOG | DB_INIT_MPOOL | DB_INIT_TXN | DB_CREATE | DB_PRIVATE; - r = env->open(env, envdir, envflags, S_IRWXU+S_IRWXG+S_IRWXO); CKERR(r); + r = env->open(env, envdir, envflags, S_IRWXU+S_IRWXG+S_IRWXO); CKERR(r); env->set_errfile(env, stderr); DB *dbs[ndb]; @@ -49,7 +49,7 @@ static void loader_open_abort(int ndb) { DB_LOADER *loader; r = env->create_loader(env, txn, &loader, ndb > 0 ? dbs[0] : NULL, ndb, dbs, db_flags, dbt_flags, loader_flags); CKERR(r); - + r = loader->close(loader); CKERR(r); r = txn->commit(txn, 0); CKERR(r); @@ -77,7 +77,9 @@ static void do_args(int argc, char * const argv[]) { verbose--; if (verbose<0) verbose=0; } else if (strcmp(argv[0], "-p") == 0) { - loader_flags = LOADER_USE_PUTS; + loader_flags |= LOADER_DISALLOW_PUTS; + } else if (strcmp(argv[0], "-z") == 0) { + loader_flags |= LOADER_COMPRESS_INTERMEDIATES; } else if (strcmp(argv[0], "-e") == 0) { argc--; argv++; if (argc > 0) diff --git a/src/tests/loader-dup-test.cc b/src/tests/loader-dup-test.cc index 70a58288617..4ad059fd770 100644 --- a/src/tests/loader-dup-test.cc +++ b/src/tests/loader-dup-test.cc @@ -14,7 +14,8 @@ enum {MAX_DBS=256}; int NUM_DBS=5; int NUM_ROWS=100000; int CHECK_RESULTS=0; -int USE_PUTS=0; +int DISALLOW_PUTS=0; +int COMPRESS=0; enum {MAGIC=311}; bool dup_row_at_end = false; // false: duplicate at the begining. true: duplicate at the end. The duplicated row is row 0. @@ -156,14 +157,18 @@ static void check_results(DB **dbs) r = dbs[j]->cursor(dbs[j], txn, &cursor, 0); CKERR(r); for(int i=0;ic_get(cursor, &key, &val, DB_NEXT); - CKERR(r); - k = *(unsigned int*)key.data; - pkey_for_db_key = (j == 0) ? k : inv_twiddle32(k, j); - v = *(unsigned int*)val.data; - // test that we have the expected keys and values - assert((unsigned int)pkey_for_db_key == (unsigned int)pkey_for_val(v, j)); + r = cursor->c_get(cursor, &key, &val, DB_NEXT); + if (DISALLOW_PUTS) { + CKERR2(r, EINVAL); + } else { + CKERR(r); + k = *(unsigned int*)key.data; + pkey_for_db_key = (j == 0) ? k : inv_twiddle32(k, j); + v = *(unsigned int*)val.data; + // test that we have the expected keys and values + assert((unsigned int)pkey_for_db_key == (unsigned int)pkey_for_val(v, j)); // printf(" DB[%d] key = %10u, val = %10u, pkey_for_db_key = %10u, pkey_for_val=%10d\n", j, v, k, pkey_for_db_key, pkey_for_val(v, j)); + } } {printf("."); fflush(stdout);} r = cursor->c_close(cursor); @@ -200,14 +205,14 @@ static void test_loader(DB **dbs) DB_LOADER *loader; uint32_t db_flags[MAX_DBS]; uint32_t dbt_flags[MAX_DBS]; - for(int i=0;itxn_begin(env, NULL, &txn, 0); + r = env->txn_begin(env, NULL, &txn, 0); CKERR(r); r = env->create_loader(env, txn, &loader, dbs[0], NUM_DBS, dbs, db_flags, dbt_flags, loader_flags); CKERR(r); @@ -238,11 +243,9 @@ static void test_loader(DB **dbs) dbt_init(&key, &k, sizeof(unsigned int)); dbt_init(&val, &v, sizeof(unsigned int)); r = loader->put(loader, &key, &val); - if (USE_PUTS) { - //PUT loader can return -1 if it finds an error during the puts. - CKERR2s(r, 0,-1); - } - else { + if (DISALLOW_PUTS) { + CKERR2(r, EINVAL); + } else { CKERR(r); } if ( CHECK_RESULTS || verbose) { if((i%10000) == 0){printf("."); fflush(stdout);} } @@ -351,7 +354,7 @@ int test_main(int argc, char * const *argv) { else { int sizes[]={1,4000000,-1}; //Make PUT loader take about the same amount of time: - if (USE_PUTS) sizes[1] /= 25; + if (DISALLOW_PUTS) sizes[1] /= 25; for (int i=0; sizes[i]>=0; i++) { if (verbose) printf("Doing %d\n", sizes[i]); NUM_ROWS = sizes[i]; @@ -404,8 +407,10 @@ static void do_args(int argc, char * const argv[]) { num_rows_set = true; } else if (strcmp(argv[0], "-c")==0) { CHECK_RESULTS = 1; + } else if (strcmp(argv[0], "-z")==0) { + COMPRESS = LOADER_COMPRESS_INTERMEDIATES; } else if (strcmp(argv[0], "-p")==0) { - USE_PUTS = 1; + DISALLOW_PUTS = LOADER_DISALLOW_PUTS; } else if (strcmp(argv[0], "-s")==0) { db_env_set_loader_size_factor(1); } else if (strcmp(argv[0], "-E")==0) { diff --git a/src/tests/loader-no-puts.cc b/src/tests/loader-no-puts.cc index 7237c4ea213..969b36dd128 100644 --- a/src/tests/loader-no-puts.cc +++ b/src/tests/loader-no-puts.cc @@ -11,7 +11,8 @@ static const char *envdir = ENVDIR; DB_ENV *env; -int USE_PUTS=0; +int DISALLOW_PUTS=0; +int COMPRESS=0; enum {MAX_NAME=128}; enum {NUM_DBS=1}; enum {NUM_KV_PAIRS=3}; @@ -50,10 +51,10 @@ static void test_loader(DB **dbs) db_flags[i] = DB_NOOVERWRITE; dbt_flags[i] = 0; } - uint32_t loader_flags = USE_PUTS; // set with -p option + uint32_t loader_flags = DISALLOW_PUTS | COMPRESS; // set with -p option // create and initialize loader - r = env->txn_begin(env, NULL, &txn, 0); + r = env->txn_begin(env, NULL, &txn, 0); CKERR(r); r = env->create_loader(env, txn, &loader, dbs[0], NUM_DBS, dbs, db_flags, dbt_flags, loader_flags); CKERR(r); @@ -68,7 +69,11 @@ static void test_loader(DB **dbs) dbt_init(&key, &kv_pairs[i].key, sizeof(kv_pairs[i].key)); dbt_init(&val, &kv_pairs[i].val, sizeof(kv_pairs[i].val)); r = loader->put(loader, &key, &val); - CKERR(r); + if (DISALLOW_PUTS) { + CKERR2(r, EINVAL); + } else { + CKERR(r); + } } */ // close the loader @@ -88,10 +93,14 @@ static void test_loader(DB **dbs) CKERR(r); for(int i=0;ic_get(cursor, &key, &val, DB_NEXT); - if (r!=0) { fprintf(stderr, "r==%d, failure\n", r); } - CKERR(r); - assert(*(int64_t*)key.data == kv_pairs[i].key); - assert(*(int64_t*)val.data == kv_pairs[i].val); + if (DISALLOW_PUTS) { + CKERR2(r, DB_NOTFOUND); + } else { + if (r!=0) { fprintf(stderr, "r==%d, failure\n", r); } + CKERR(r); + assert(*(int64_t*)key.data == kv_pairs[i].key); + assert(*(int64_t*)val.data == kv_pairs[i].val); + } } cursor->c_close(cursor); } @@ -178,8 +187,10 @@ static void do_args(int argc, char * const argv[]) { } else if (strcmp(argv[0],"-q")==0) { verbose--; if (verbose<0) verbose=0; + } else if (strcmp(argv[0], "-z")==0) { + COMPRESS = LOADER_COMPRESS_INTERMEDIATES; } else if (strcmp(argv[0], "-p")==0) { - USE_PUTS = 1; + DISALLOW_PUTS = LOADER_DISALLOW_PUTS; } else if (strcmp(argv[0], "-e") == 0) { argc--; argv++; if (argc > 0) diff --git a/src/tests/loader-reference-test.cc b/src/tests/loader-reference-test.cc index 79526587642..801d01d018f 100644 --- a/src/tests/loader-reference-test.cc +++ b/src/tests/loader-reference-test.cc @@ -11,7 +11,8 @@ static const char *envdir = ENVDIR; DB_ENV *env; -int USE_PUTS=0; +int DISALLOW_PUTS=0; +int COMPRESS=0; enum {MAX_NAME=128}; enum {NUM_DBS=1}; enum {NUM_KV_PAIRS=3}; @@ -47,14 +48,14 @@ static void test_loader(DB **dbs) DB_LOADER *loader; uint32_t db_flags[NUM_DBS]; uint32_t dbt_flags[NUM_DBS]; - for(int i=0;itxn_begin(env, NULL, &txn, 0); + r = env->txn_begin(env, NULL, &txn, 0); CKERR(r); r = env->create_loader(env, txn, &loader, dbs[0], NUM_DBS, dbs, db_flags, dbt_flags, loader_flags); CKERR(r); @@ -62,7 +63,7 @@ static void test_loader(DB **dbs) CKERR(r); r = loader->set_poll_function(loader, NULL, NULL); CKERR(r); - + uint64_t before_puts = toku_test_get_latest_lsn(env); // using loader->put, put values into DB DBT key, val; @@ -70,7 +71,11 @@ static void test_loader(DB **dbs) dbt_init(&key, &kv_pairs[i].key, sizeof(kv_pairs[i].key)); dbt_init(&val, &kv_pairs[i].val, sizeof(kv_pairs[i].val)); r = loader->put(loader, &key, &val); - CKERR(r); + if (DISALLOW_PUTS) { + CKERR2(r, EINVAL); + } else { + CKERR(r); + } } uint64_t after_puts = toku_test_get_latest_lsn(env); assert(before_puts == after_puts); @@ -90,11 +95,15 @@ static void test_loader(DB **dbs) r = dbs[j]->cursor(dbs[j], txn, &cursor, 0); CKERR(r); for(int i=0;ic_get(cursor, &key, &val, DB_NEXT); + r = cursor->c_get(cursor, &key, &val, DB_NEXT); if (r!=0) { fprintf(stderr, "r==%d, failure\n", r); } - CKERR(r); - assert(*(int64_t*)key.data == kv_pairs[i].key); - assert(*(int64_t*)val.data == kv_pairs[i].val); + if (DISALLOW_PUTS) { + CKERR2(r, DB_NOTFOUND); + } else { + CKERR(r); + assert(*(int64_t*)key.data == kv_pairs[i].key); + assert(*(int64_t*)val.data == kv_pairs[i].val); + } } cursor->c_close(cursor); } @@ -185,7 +194,9 @@ static void do_args(int argc, char * const argv[]) { fprintf(stderr, "Usage:\n%s\n", cmd); exit(resultcode); } else if (strcmp(argv[0], "-p")==0) { - USE_PUTS = 1; + DISALLOW_PUTS = LOADER_DISALLOW_PUTS; + } else if (strcmp(argv[0], "-z")==0) { + COMPRESS = LOADER_COMPRESS_INTERMEDIATES; } else if (strcmp(argv[0], "--block_size") == 0) { argc--; argv++; block_size = atoi(argv[0]); diff --git a/src/tests/loader-stress-del.cc b/src/tests/loader-stress-del.cc index c696883e73b..ccca847d7a7 100644 --- a/src/tests/loader-stress-del.cc +++ b/src/tests/loader-stress-del.cc @@ -23,7 +23,8 @@ enum {MAX_DBS=1024}; int NUM_DBS=1; int NUM_ROWS=1000000; int CHECK_RESULTS=1; -int USE_PUTS=0; +int DISALLOW_PUTS=0; +int COMPRESS=0; enum { old_default_cachesize=1024 }; // MB int CACHESIZE=old_default_cachesize; int ALLOW_DUPS=0; @@ -247,13 +248,18 @@ static void check_results(DB **dbs) { // generate the expected keys unsigned int *expected_key = (unsigned int *) toku_malloc(NUM_ROWS * sizeof (unsigned int)); - for (int i = 0; i < NUM_ROWS; i++) + for (int i = 0; i < NUM_ROWS; i++) { expected_key[i] = j == 0 ? (unsigned int)(i+1) : twiddle32(i+1, j); + } // sort the keys qsort(expected_key, NUM_ROWS, sizeof (unsigned int), uint_cmp); for (int i = 0; i < NUM_ROWS+1; i++) { r = cursor->c_get(cursor, &key, &val, DB_NEXT); + if (DISALLOW_PUTS) { + CKERR2(r, DB_NOTFOUND); + break; + } if (r == DB_NOTFOUND) { assert(i == NUM_ROWS); // check that there are exactly NUM_ROWS in the dictionary break; @@ -393,16 +399,16 @@ static void test_loader(DB **dbs) uint32_t db_flags[MAX_DBS]; uint32_t dbt_flags[MAX_DBS]; uint32_t flags = DB_NOOVERWRITE; - if ( (USE_PUTS == 1) && (ALLOW_DUPS == 1) ) flags = 0; + if ( (DISALLOW_PUTS != 0) && (ALLOW_DUPS == 1) ) flags = 0; for(int i=0;itxn_begin(env, NULL, &txn, 0); + r = env->txn_begin(env, NULL, &txn, 0); CKERR(r); hiwater_start = hiwater; if (footprint_print) printf("%s:%d Hiwater=%ld water=%ld\n", __FILE__, __LINE__, hiwater, water); @@ -423,7 +429,11 @@ static void test_loader(DB **dbs) dbt_init(&key, &k, sizeof(unsigned int)); dbt_init(&val, &v, sizeof(unsigned int)); r = loader->put(loader, &key, &val); - CKERR(r); + if (DISALLOW_PUTS) { + CKERR2(r, EINVAL); + } else { + CKERR(r); + } if ( verbose) { if((i%10000) == 0){printf("."); fflush(stdout);} } } if ( verbose ) {printf("\n"); fflush(stdout);} @@ -445,7 +455,7 @@ static void test_loader(DB **dbs) CKERR2s(r,0,TOKUDB_CANCELED); if (r==0) { - if ( USE_PUTS == 0 ) { + if ( DISALLOW_PUTS == 0 ) { if (poll_count == 0) printf("%s:%d\n", __FILE__, __LINE__); assert(poll_count>0); } @@ -650,7 +660,9 @@ static void do_args(int argc, char * const argv[]) { } else if (strcmp(argv[0], "-c")==0) { CHECK_RESULTS = 1; } else if (strcmp(argv[0], "-p")==0) { - USE_PUTS = 1; + DISALLOW_PUTS = LOADER_DISALLOW_PUTS; + } else if (strcmp(argv[0], "-z")==0) { + COMPRESS = LOADER_COMPRESS_INTERMEDIATES; } else if (strcmp(argv[0], "-m")==0) { argc--; argv++; CACHESIZE = atoi(argv[0]); diff --git a/src/tests/loader-stress-test.cc b/src/tests/loader-stress-test.cc index 18c71f5cc7c..72bb45492c1 100644 --- a/src/tests/loader-stress-test.cc +++ b/src/tests/loader-stress-test.cc @@ -32,7 +32,8 @@ enum {MAX_DBS=1024}; int NUM_DBS=5; int NUM_ROWS=100000; int CHECK_RESULTS=0; -int USE_PUTS=0; +int DISALLOW_PUTS=0; +int COMPRESS=0; enum { old_default_cachesize=1024 }; // MB int CACHESIZE=old_default_cachesize; int ALLOW_DUPS=0; @@ -267,6 +268,10 @@ static void check_results(DB **dbs) { for (int i = 0; i < NUM_ROWS+1; i++) { r = cursor->c_get(cursor, &key, &val, DB_NEXT); + if (DISALLOW_PUTS) { + CKERR2(r, DB_NOTFOUND); + break; + } if (r == DB_NOTFOUND) { assert(i == NUM_ROWS); // check that there are exactly NUM_ROWS in the dictionary break; @@ -357,16 +362,16 @@ static void test_loader(DB **dbs) uint32_t db_flags[MAX_DBS]; uint32_t dbt_flags[MAX_DBS]; uint32_t flags = DB_NOOVERWRITE; - if ( (USE_PUTS == 1) && (ALLOW_DUPS == 1) ) flags = 0; + if ( (DISALLOW_PUTS) && (ALLOW_DUPS == 1) ) flags = 0; for(int i=0;itxn_begin(env, NULL, &txn, 0); + r = env->txn_begin(env, NULL, &txn, 0); CKERR(r); hiwater_start = hiwater; if (footprint_print) printf("%s:%d Hiwater=%ld water=%ld\n", __FILE__, __LINE__, hiwater, water); @@ -387,7 +392,11 @@ static void test_loader(DB **dbs) dbt_init(&key, &k, sizeof(unsigned int)); dbt_init(&val, &v, sizeof(unsigned int)); r = loader->put(loader, &key, &val); - CKERR(r); + if (DISALLOW_PUTS) { + CKERR2(r, EINVAL); + } else { + CKERR(r); + } if ( verbose) { if((i%10000) == 0){printf("."); fflush(stdout);} } } if ( verbose ) {printf("\n"); fflush(stdout);} @@ -409,7 +418,7 @@ static void test_loader(DB **dbs) CKERR2s(r,0,TOKUDB_CANCELED); if (r==0) { - if ( USE_PUTS == 0 ) { + if (!DISALLOW_PUTS) { if (poll_count == 0) printf("%s:%d\n", __FILE__, __LINE__); assert(poll_count>0); } @@ -432,9 +441,15 @@ static void test_loader(DB **dbs) if (verbose) printf("NUM_ROWS=%d n_keys=%" PRIu64 " n_data=%" PRIu64 " dsize=%" PRIu64 " fsize=%" PRIu64 "\n", NUM_ROWS, stats.bt_nkeys, stats.bt_ndata, stats.bt_dsize, stats.bt_fsize); - assert(stats.bt_nkeys <= (uint64_t)NUM_ROWS); // Fix as part of #4129. Was == - assert(stats.bt_ndata <= (uint64_t)NUM_ROWS); - assert(stats.bt_dsize == ((uint64_t)NUM_ROWS) * 2 * sizeof(unsigned int)); + if (DISALLOW_PUTS) { + assert(stats.bt_nkeys == 0); // Fix as part of #4129. Was == + assert(stats.bt_ndata == 0); + assert(stats.bt_dsize == 0); + } else { + assert(stats.bt_nkeys <= (uint64_t)NUM_ROWS); // Fix as part of #4129. Was == + assert(stats.bt_ndata <= (uint64_t)NUM_ROWS); + assert(stats.bt_dsize == ((uint64_t)NUM_ROWS) * 2 * sizeof(unsigned int)); + } r = txn->commit(txn, 0); CKERR(r); } @@ -633,7 +648,9 @@ static void do_args(int argc, char * const argv[]) { } else if (strcmp(argv[0], "-c")==0) { CHECK_RESULTS = 1; } else if (strcmp(argv[0], "-p")==0) { - USE_PUTS = 1; + DISALLOW_PUTS = LOADER_DISALLOW_PUTS; + } else if (strcmp(argv[0], "-z")==0) { + COMPRESS = LOADER_COMPRESS_INTERMEDIATES; } else if (strcmp(argv[0], "-m")==0) { argc--; argv++; CACHESIZE = atoi(argv[0]); diff --git a/src/tests/loader-tpch-load.cc b/src/tests/loader-tpch-load.cc index 9e6eb9f1fcb..ee765e7fe34 100644 --- a/src/tests/loader-tpch-load.cc +++ b/src/tests/loader-tpch-load.cc @@ -13,7 +13,8 @@ enum {MAX_NAME=128}; enum {MAX_DBS=16}; enum {MAX_ROW_LEN=1024}; static int NUM_DBS=10; -static int USE_PUTS=0; +static int DISALLOW_PUTS=0; +static int COMPRESS=0; static int USE_REGION=0; static const char *envdir = ENVDIR; @@ -291,7 +292,7 @@ static int test_loader(DB **dbs) db_flags[i] = DB_NOOVERWRITE; dbt_flags[i] = 0; } - uint32_t loader_flags = USE_PUTS; // set with -p option + uint32_t loader_flags = DISALLOW_PUTS | COMPRESS; // set with -p option FILE *fp; // select which table to loader @@ -335,7 +336,11 @@ static int test_loader(DB **dbs) dbt_init(&key, &k, sizeof(int)); dbt_init(&val, v, strlen(v)+1); r = loader->put(loader, &key, &val); - CKERR(r); + if (DISALLOW_PUTS) { + CKERR2(r, EINVAL); + } else { + CKERR(r); + } if (verbose) { if((i++%10000) == 0){printf("."); fflush(stdout);} } c = tpch_read_row(fp, &k, v); } @@ -350,7 +355,7 @@ static int test_loader(DB **dbs) printf(" done\n"); CKERR(r); - if ( USE_PUTS == 0 ) assert(poll_count>0); + if ( DISALLOW_PUTS == 0 ) assert(poll_count>0); r = txn->commit(txn, 0); CKERR(r); @@ -442,7 +447,9 @@ static void do_args(int argc, char * const argv[]) { fprintf(stderr, "Usage: -h -p -g\n%s\n", cmd); exit(resultcode); } else if (strcmp(argv[0], "-p")==0) { - USE_PUTS = 1; + DISALLOW_PUTS = LOADER_DISALLOW_PUTS; + } else if (strcmp(argv[0], "-z")==0) { + COMPRESS = LOADER_COMPRESS_INTERMEDIATES; } else if (strcmp(argv[0], "-g")==0) { USE_REGION = 1; } else if (strcmp(argv[0], "-e") == 0) { diff --git a/src/tests/recover-loader-test.cc b/src/tests/recover-loader-test.cc index 7f4a45d6cf3..114acc10177 100644 --- a/src/tests/recover-loader-test.cc +++ b/src/tests/recover-loader-test.cc @@ -53,7 +53,7 @@ static bool do_test=false, do_recover=false; static DB_ENV *env; static int NUM_ROWS=50000000; -static int USE_PUTS=0; +static int COMPRESS=0; enum {MAX_NAME=128}; enum {MAGIC=311}; @@ -290,7 +290,7 @@ static void test_loader(DB **dbs) db_flags[i] = DB_NOOVERWRITE; dbt_flags[i] = 0; } - uint32_t loader_flags = USE_PUTS; // set with -p option + uint32_t loader_flags = COMPRESS; // set with -p option int n = count_temp(env->i->real_data_dir); assert(n == 0); // Must be no temp files before loader is run @@ -308,7 +308,7 @@ static void test_loader(DB **dbs) r = loader->set_poll_function(loader, poll_function, expect_poll_void); CKERR(r); - printf("USE_PUTS = %d\n", USE_PUTS); + printf("COMPRESS = %d\n", COMPRESS); if (verbose) printf("new inames:\n"); get_inames(new_inames, dbs); @@ -461,9 +461,9 @@ static void do_args(int argc, char * const argv[]) { } else if (strcmp(argv[0], "-r")==0) { argc--; argv++; NUM_ROWS = atoi(argv[0]); - } else if (strcmp(argv[0], "-p")==0) { - USE_PUTS = LOADER_USE_PUTS; - printf("Using puts\n"); + } else if (strcmp(argv[0], "-z")==0) { + COMPRESS = LOADER_COMPRESS_INTERMEDIATES; + printf("Compressing\n"); } else if (strcmp(argv[0], "--test")==0) { do_test=true; } else if (strcmp(argv[0], "--recover") == 0) { diff --git a/src/tests/recover-test-logsuppress-put.cc b/src/tests/recover-test-logsuppress-put.cc index 1f74f5f94f7..756df83d36b 100644 --- a/src/tests/recover-test-logsuppress-put.cc +++ b/src/tests/recover-test-logsuppress-put.cc @@ -5,6 +5,7 @@ // Verify that log-suppress recovery is done properly. (See ticket 2781.) +// TODO: determine if this is useful at all anymore (log suppression does not exist anymore) #include @@ -97,7 +98,7 @@ load(DB **dbs) { db_flags[i] = DB_NOOVERWRITE; dbt_flags[i] = 0; } - uint32_t loader_flags = LOADER_USE_PUTS; + uint32_t loader_flags = LOADER_COMPRESS_INTERMEDIATES; // create and initialize loader r = env->txn_begin(env, NULL, &txn, 0); diff --git a/src/tests/test_5469.cc b/src/tests/test_5469.cc index 8665b631026..f0c88ee9df1 100644 --- a/src/tests/test_5469.cc +++ b/src/tests/test_5469.cc @@ -21,7 +21,7 @@ static int put_multiple_generate(DB *UU(dest_db), DB *UU(src_db), DBT *dest_key, } static void -test_loader_abort (bool use_puts, bool abort_loader, bool abort_txn) { +test_loader_abort (bool do_compress, bool abort_loader, bool abort_txn) { DB_ENV * env; DB *db; DB_TXN *txn; @@ -35,7 +35,7 @@ test_loader_abort (bool use_puts, bool abort_loader, bool abort_txn) { DB_LOADER *loader; uint32_t db_flags = 0; uint32_t dbt_flags = 0; - uint32_t loader_flags = use_puts ? LOADER_USE_PUTS : 0; + uint32_t loader_flags = do_compress ? LOADER_COMPRESS_INTERMEDIATES : 0; DBC* cursor = NULL; /* create the dup database file */ diff --git a/src/tests/threaded_stress_test_helpers.h b/src/tests/threaded_stress_test_helpers.h index 42f309b63c2..59ac13cca35 100644 --- a/src/tests/threaded_stress_test_helpers.h +++ b/src/tests/threaded_stress_test_helpers.h @@ -881,7 +881,7 @@ static int UU() loader_op(DB_TXN* txn, ARG UU(arg), void* UU(operation_extra), v r = db_load->open(db_load, txn, "loader-db", NULL, DB_BTREE, DB_CREATE, 0666); assert(r == 0); DB_LOADER *loader; - uint32_t loader_flags = (num == 0) ? 0 : LOADER_USE_PUTS; + uint32_t loader_flags = (num == 0) ? 0 : LOADER_COMPRESS_INTERMEDIATES; r = env->create_loader(env, txn, &loader, db_load, 1, &db_load, &db_flags, &dbt_flags, loader_flags); CKERR(r); diff --git a/src/ydb.cc b/src/ydb.cc index 306ae881cda..90446cfdbd0 100644 --- a/src/ydb.cc +++ b/src/ydb.cc @@ -1453,7 +1453,7 @@ env_create_loader(DB_ENV *env, uint32_t db_flags[/*N*/], uint32_t dbt_flags[/*N*/], uint32_t loader_flags) { - int r = toku_loader_create_loader(env, txn, blp, src_db, N, dbs, db_flags, dbt_flags, loader_flags); + int r = toku_loader_create_loader(env, txn, blp, src_db, N, dbs, db_flags, dbt_flags, loader_flags, true); return r; } diff --git a/src/ydb_txn.cc b/src/ydb_txn.cc index fadf3de55b3..afb595a58d4 100644 --- a/src/ydb_txn.cc +++ b/src/ydb_txn.cc @@ -219,26 +219,21 @@ toku_txn_prepare (DB_TXN *txn, uint8_t gid[DB_GID_SIZE]) { return toku_txn_xa_prepare(txn, &xid); } -static int +static int toku_txn_txn_stat (DB_TXN *txn, struct txn_stat **txn_stat) { XMALLOC(*txn_stat); - return toku_logger_txn_rollback_raw_count(db_txn_struct_i(txn)->tokutxn, &(*txn_stat)->rollback_raw_count); + return toku_logger_txn_rollback_stats(db_txn_struct_i(txn)->tokutxn, *txn_stat); } -static int +static int locked_txn_txn_stat (DB_TXN *txn, struct txn_stat **txn_stat) { - int r = toku_txn_txn_stat(txn, txn_stat); + int r = toku_txn_txn_stat(txn, txn_stat); return r; } static int locked_txn_commit_with_progress(DB_TXN *txn, uint32_t flags, TXN_PROGRESS_POLL_FUNCTION poll, void* poll_extra) { - TOKUTXN ttxn = db_txn_struct_i(txn)->tokutxn; - if (toku_txn_requires_checkpoint(ttxn)) { - CHECKPOINTER cp = toku_cachetable_get_checkpointer(txn->mgrp->i->cachetable); - toku_checkpoint(cp, txn->mgrp->i->logger, NULL, NULL, NULL, NULL, TXN_COMMIT_CHECKPOINT); - } bool holds_mo_lock = false; if (!toku_txn_is_read_only(db_txn_struct_i(txn)->tokutxn)) { // A readonly transaction does no logging, and therefore does not