diff --git a/buildheader/make_tdb.cc b/buildheader/make_tdb.cc index c7dfe5fc699..930f6af6917 100644 --- a/buildheader/make_tdb.cc +++ b/buildheader/make_tdb.cc @@ -131,6 +131,9 @@ static void print_defines (void) { dodefine(DB_ARCH_ABS); dodefine(DB_ARCH_LOG); +#define DB_BLACKHOLE 0x0001000 /* unused common bit according to BDB */ + dodefine(DB_BLACKHOLE); + dodefine(DB_CREATE); dodefine(DB_CXX_NO_EXCEPTIONS); dodefine(DB_EXCL); diff --git a/ft/ft-internal.h b/ft/ft-internal.h index ab8c0f67681..5d10cae7b26 100644 --- a/ft/ft-internal.h +++ b/ft/ft-internal.h @@ -467,6 +467,9 @@ struct ft { int panic; // A malloced string that can indicate what went wrong. char *panic_string; + + // is this ft a blackhole? if so, all messages are dropped. + bool blackhole; }; // Copy the descriptor into a temporary variable, and tell DRD that subsequent code happens after reading that pointer. diff --git a/ft/ft-ops.cc b/ft/ft-ops.cc index e87a44392ce..d1ad0db7713 100644 --- a/ft/ft-ops.cc +++ b/ft/ft-ops.cc @@ -2453,16 +2453,21 @@ static void push_something_at_root (FT h, FTNODE *nodep, FT_MSG cmd) } int -toku_ft_root_put_cmd (FT h, FT_MSG_S * cmd) +toku_ft_root_put_cmd (FT ft, FT_MSG_S * cmd) // Effect: // - assign msn to cmd // - push the cmd into the brt // - cmd will set new msn in tree { + // blackhole fractal trees drop all messages, so do nothing. + if (ft->blackhole) { + return 0; + } + FTNODE node; CACHEKEY root_key; //assert(0==toku_cachetable_assert_all_unpinned(brt->cachetable)); - assert(h); + assert(ft); // // As of Dr. Noga, the following code is currently protected by two locks: // - the ydb lock @@ -2492,16 +2497,16 @@ toku_ft_root_put_cmd (FT h, FT_MSG_S * cmd) // others // { - toku_ft_grab_treelock(h); + toku_ft_grab_treelock(ft); uint32_t fullhash; - toku_calculate_root_offset_pointer(h, &root_key, &fullhash); + toku_calculate_root_offset_pointer(ft, &root_key, &fullhash); // get the root node struct ftnode_fetch_extra bfe; - fill_bfe_for_full_read(&bfe, h); + fill_bfe_for_full_read(&bfe, ft); toku_pin_ftnode_off_client_thread( - h, + ft, root_key, fullhash, &bfe, @@ -2520,28 +2525,28 @@ toku_ft_root_put_cmd (FT h, FT_MSG_S * cmd) //VERIFY_NODE(brt, node); assert(node->fullhash==fullhash); - ft_verify_flags(h, node); + ft_verify_flags(ft, node); // first handle a reactive root, then put in the message CACHEKEY new_root_key; - bool root_changed = ft_process_maybe_reactive_root(h, &new_root_key, &node); + bool root_changed = ft_process_maybe_reactive_root(ft, &new_root_key, &node); if (root_changed) { - toku_ft_set_new_root_blocknum(h, new_root_key); + toku_ft_set_new_root_blocknum(ft, new_root_key); } - toku_ft_release_treelock(h); + toku_ft_release_treelock(ft); } - push_something_at_root(h, &node, cmd); + push_something_at_root(ft, &node, cmd); // verify that msn of latest message was captured in root node (push_something_at_root() did not release ydb lock) invariant(cmd->msn.msn == node->max_msn_applied_to_node_on_disk.msn); // if we call flush_some_child, then that function unpins the root // otherwise, we unpin ourselves if (node->height > 0 && toku_ft_nonleaf_is_gorged(node)) { - flush_node_on_background_thread(h, node); + flush_node_on_background_thread(ft, node); } else { - toku_unpin_ftnode(h, node); // unpin root + toku_unpin_ftnode(ft, node); // unpin root } return 0; @@ -3389,17 +3394,13 @@ toku_ft_get_dictionary_id(FT_HANDLE brt) { return dict_id; } -int toku_ft_set_flags(FT_HANDLE brt, unsigned int flags) { - assert(flags==(flags&TOKU_DB_KEYCMP_BUILTIN)); // make sure 
there are no extraneous flags - brt->did_set_flags = true; - brt->options.flags = flags; - return 0; +void toku_ft_set_flags(FT_HANDLE ft_handle, unsigned int flags) { + ft_handle->did_set_flags = true; + ft_handle->options.flags = flags; } -int toku_ft_get_flags(FT_HANDLE brt, unsigned int *flags) { - *flags = brt->options.flags; - assert(brt->options.flags==(brt->options.flags&TOKU_DB_KEYCMP_BUILTIN)); // make sure there are no extraneous flags - return 0; +void toku_ft_get_flags(FT_HANDLE ft_handle, unsigned int *flags) { + *flags = ft_handle->options.flags; } void toku_ft_get_maximum_advised_key_value_lengths (unsigned int *max_key_len, unsigned int *max_val_len) @@ -3456,7 +3457,6 @@ void toku_ft_set_redirect_callback(FT_HANDLE brt, on_redirect_callback redir_cb, brt->redirect_callback_extra = extra; } - int toku_ft_set_update(FT_HANDLE brt, ft_update_func update_fun) { brt->options.update_fun = update_fun; return 0; diff --git a/ft/ft-ops.h b/ft/ft-ops.h index f3952f2511c..90d20f489e5 100644 --- a/ft/ft-ops.h +++ b/ft/ft-ops.h @@ -41,8 +41,8 @@ int toku_ft_change_descriptor(FT_HANDLE t, const DBT* old_descriptor, const DBT* uint32_t toku_serialize_descriptor_size(const DESCRIPTOR desc); int toku_ft_handle_create(FT_HANDLE *) __attribute__ ((warn_unused_result)); -int toku_ft_set_flags(FT_HANDLE, unsigned int flags) __attribute__ ((warn_unused_result)); -int toku_ft_get_flags(FT_HANDLE, unsigned int *flags) __attribute__ ((warn_unused_result)); +void toku_ft_set_flags(FT_HANDLE, unsigned int flags); +void toku_ft_get_flags(FT_HANDLE, unsigned int *flags); void toku_ft_handle_set_nodesize(FT_HANDLE, unsigned int nodesize); void toku_ft_handle_get_nodesize(FT_HANDLE, unsigned int *nodesize); void toku_ft_get_maximum_advised_key_value_lengths(unsigned int *klimit, unsigned int *vlimit); diff --git a/ft/ft.cc b/ft/ft.cc index 0a743473886..bb503fb1f83 100644 --- a/ft/ft.cc +++ b/ft/ft.cc @@ -1055,4 +1055,7 @@ void toku_ft_get_compression_method(FT ft, enum toku_compression_method *methodp toku_ft_unlock(ft); } - +// mark the ft as a blackhole. any message injections will be a no op. +void toku_ft_set_blackhole(FT_HANDLE ft_handle) { + ft_handle->ft->blackhole = true; +} diff --git a/ft/ft.h b/ft/ft.h index e123c4c90a4..805c536de83 100644 --- a/ft/ft.h +++ b/ft/ft.h @@ -109,5 +109,7 @@ void toku_ft_get_basementnodesize(FT ft, unsigned int *basementnodesize); void toku_ft_set_compression_method(FT ft, enum toku_compression_method method); void toku_ft_get_compression_method(FT ft, enum toku_compression_method *methodp); +// mark the ft as a blackhole. any message injections will be a no op. 
+void toku_ft_set_blackhole(FT_HANDLE ft_handle); #endif diff --git a/ft/recover.cc b/ft/recover.cc index afb11ed4c60..7c388eaf618 100644 --- a/ft/recover.cc +++ b/ft/recover.cc @@ -273,8 +273,7 @@ static int internal_recover_fopen_or_fcreate (RECOVER_ENV renv, bool must_create r = toku_ft_handle_create(&brt); assert(r == 0); - r = toku_ft_set_flags(brt, treeflags); - assert(r == 0); + toku_ft_set_flags(brt, treeflags); if (nodesize != 0) { toku_ft_handle_set_nodesize(brt, nodesize); diff --git a/src/tests/threaded_stress_test_helpers.h b/src/tests/threaded_stress_test_helpers.h index ca422a557d6..2c23905a885 100644 --- a/src/tests/threaded_stress_test_helpers.h +++ b/src/tests/threaded_stress_test_helpers.h @@ -66,6 +66,7 @@ struct env_args { int checkpointing_period; int cleaner_period; int cleaner_iterations; + int64_t lk_max_memory; uint64_t cachetable_size; const char *envdir; test_update_callback_f update_function; // update callback function @@ -112,6 +113,8 @@ struct cli_args { struct env_args env_args; // specifies environment variables bool single_txn; bool warm_cache; // warm caches before running stress_table + bool blackhole; // all message injects are no-ops. helps measure txn/logging/locktree overhead. + bool prelocked_write; // use prelocked_write flag for insertions, avoiding the locktree }; struct arg { @@ -701,7 +704,8 @@ static int random_put_in_db(DB *db, DB_TXN *txn, ARG arg, void *stats_extra) { DBT key, val; dbt_init(&key, &rand_key_b, sizeof rand_key_b); dbt_init(&val, valbuf, sizeof valbuf); - r = db->put(db, txn, &key, &val, 0); + int flags = arg->cli->prelocked_write ? DB_PRELOCKED_WRITE : 0; + r = db->put(db, txn, &key, &val, flags); if (r != 0) { goto cleanup; } @@ -814,7 +818,8 @@ static int UU() serial_put_op(DB_TXN *txn, ARG arg, void *operation_extra, void DBT key, val; dbt_init(&key, &rand_key_b, sizeof rand_key_b); dbt_init(&val, valbuf, sizeof valbuf); - r = db->put(db, txn, &key, &val, 0); + int flags = arg->cli->prelocked_write ? 
DB_PRELOCKED_WRITE : 0; + r = db->put(db, txn, &key, &val, flags); if (r != 0) { goto cleanup; } @@ -1398,9 +1403,10 @@ static int run_workers( static int create_tables(DB_ENV **env_res, DB **db_res, int num_DBs, int (*bt_compare)(DB *, const DBT *, const DBT *), - struct env_args env_args + struct cli_args *cli_args ) { int r; + struct env_args env_args = cli_args->env_args; char rmcmd[32 + strlen(env_args.envdir)]; sprintf(rmcmd, "rm -rf %s", env_args.envdir); r = system(rmcmd); @@ -1411,6 +1417,7 @@ static int create_tables(DB_ENV **env_res, DB **db_res, int num_DBs, r = db_env_create(&env, 0); assert(r == 0); r = env->set_redzone(env, 0); CKERR(r); r = env->set_default_bt_compare(env, bt_compare); CKERR(r); + r = env->set_lk_max_memory(env, env_args.lk_max_memory); CKERR(r); r = env->set_cachesize(env, env_args.cachetable_size / (1 << 30), env_args.cachetable_size % (1 << 30), 1); CKERR(r); if (env_args.generate_put_callback) { r = env->set_generate_row_callback_for_put(env, env_args.generate_put_callback); @@ -1430,7 +1437,6 @@ static int create_tables(DB_ENV **env_res, DB **db_res, int num_DBs, r = env->cleaner_set_iterations(env, env_args.cleaner_iterations); CKERR(r); *env_res = env; - for (int i = 0; i < num_DBs; i++) { DB *db; char name[30]; @@ -1444,7 +1450,8 @@ static int create_tables(DB_ENV **env_res, DB **db_res, int num_DBs, CKERR(r); r = db->set_readpagesize(db, env_args.basement_node_size); CKERR(r); - r = db->open(db, null_txn, name, NULL, DB_BTREE, DB_CREATE, 0666); + const int flags = DB_CREATE | (cli_args->blackhole ? DB_BLACKHOLE : 0); + r = db->open(db, null_txn, name, NULL, DB_BTREE, flags, 0666); CKERR(r); db_res[i] = db; } @@ -1533,14 +1540,16 @@ static void do_xa_recovery(DB_ENV* env) { static int open_tables(DB_ENV **env_res, DB **db_res, int num_DBs, int (*bt_compare)(DB *, const DBT *, const DBT *), - struct env_args env_args) { + struct cli_args *cli_args) { int r; + struct env_args env_args = cli_args->env_args; /* create the dup database file */ DB_ENV *env; r = db_env_create(&env, 0); assert(r == 0); r = env->set_redzone(env, 0); CKERR(r); r = env->set_default_bt_compare(env, bt_compare); CKERR(r); + r = env->set_lk_max_memory(env, env_args.lk_max_memory); CKERR(r); env->set_update(env, env_args.update_function); // set the cache size to 10MB r = env->set_cachesize(env, env_args.cachetable_size / (1 << 30), env_args.cachetable_size % (1 << 30), 1); CKERR(r); @@ -1571,7 +1580,8 @@ static int open_tables(DB_ENV **env_res, DB **db_res, int num_DBs, get_ith_table_name(name, sizeof(name), i); r = db_create(&db, env, 0); CKERR(r); - r = db->open(db, null_txn, name, NULL, DB_BTREE, 0, 0666); + const int flags = cli_args->blackhole ? 
DB_BLACKHOLE : 0; + r = db->open(db, null_txn, name, NULL, DB_BTREE, flags, 0666); CKERR(r); db_res[i] = db; } @@ -1593,6 +1603,7 @@ static const struct env_args DEFAULT_ENV_ARGS = { .checkpointing_period = 10, .cleaner_period = 1, .cleaner_iterations = 1, + .lk_max_memory = 1 * 1024 * 1024, .cachetable_size = 300000, .envdir = ENVDIR, .update_function = update_op_callback, @@ -1606,6 +1617,7 @@ static const struct env_args DEFAULT_PERF_ENV_ARGS = { .checkpointing_period = 60, .cleaner_period = 1, .cleaner_iterations = 5, + .lk_max_memory = 1L * 1024 * 1024 * 1024, .cachetable_size = 1<<30, .envdir = ENVDIR, .update_function = NULL, @@ -1645,6 +1657,8 @@ static struct cli_args UU() get_default_args(void) { .env_args = DEFAULT_ENV_ARGS, .single_txn = false, .warm_cache = false, + .blackhole = false, + .prelocked_write = false }; return DEFAULT_ARGS; } @@ -2016,6 +2030,8 @@ static inline void parse_stress_test_args (int argc, char *const argv[], struct BOOL_ARG("only_stress", only_stress), BOOL_ARG("test", do_test_and_crash), BOOL_ARG("recover", do_recover), + BOOL_ARG("blackhole", blackhole), + BOOL_ARG("prelocked_write", prelocked_write), STRING_ARG("--envdir", env_args.envdir), LOCAL_STRING_ARG("--perf_format", perf_format_s, "human"), @@ -2167,7 +2183,7 @@ UU() stress_test_main_with_cmp(struct cli_args *args, int (*bt_compare)(DB *, co dbs, args->num_DBs, bt_compare, - args->env_args + args ); { int chk_r = fill_tables_with_zeroes(dbs, args->num_DBs, args->num_elements, args->key_size, args->val_size); CKERR(chk_r); } { int chk_r = close_tables(env, dbs, args->num_DBs); CKERR(chk_r); } @@ -2177,7 +2193,7 @@ UU() stress_test_main_with_cmp(struct cli_args *args, int (*bt_compare)(DB *, co dbs, args->num_DBs, bt_compare, - args->env_args); CKERR(chk_r); } + args); CKERR(chk_r); } if (args->warm_cache) { do_warm_cache(env, dbs, args); } @@ -2201,7 +2217,7 @@ UU() stress_recover(struct cli_args *args) { dbs, args->num_DBs, stress_int_dbt_cmp, - args->env_args); CKERR(chk_r); } + args); CKERR(chk_r); } DB_TXN* txn = NULL; struct arg recover_args; diff --git a/src/ydb.cc b/src/ydb.cc index f5e01afe0d8..e6dcad20aab 100644 --- a/src/ydb.cc +++ b/src/ydb.cc @@ -406,20 +406,18 @@ static int toku_env_txn_checkpoint(DB_ENV * env, uint32_t kbyte, uint32_t min, u static int db_use_builtin_key_cmp(DB *db) { HANDLE_PANICKED_DB(db); - int r; + int r = 0; if (db_opened(db)) r = toku_ydb_do_error(db->dbenv, EINVAL, "Comparison functions cannot be set after DB open.\n"); else if (db->i->key_compare_was_set) r = toku_ydb_do_error(db->dbenv, EINVAL, "Key comparison function already set.\n"); else { uint32_t tflags; - r = toku_ft_get_flags(db->i->ft_handle, &tflags); - if (r!=0) return r; + toku_ft_get_flags(db->i->ft_handle, &tflags); tflags |= TOKU_DB_KEYCMP_BUILTIN; - r = toku_ft_set_flags(db->i->ft_handle, tflags); - if (!r) - db->i->key_compare_was_set = true; + toku_ft_set_flags(db->i->ft_handle, tflags); + db->i->key_compare_was_set = true; } return r; } diff --git a/src/ydb_db.cc b/src/ydb_db.cc index e227c41935b..c19dc39823a 100644 --- a/src/ydb_db.cc +++ b/src/ydb_db.cc @@ -234,19 +234,19 @@ toku_db_open(DB * db, DB_TXN * txn, const char *fname, const char *dbname, DBTYP int is_db_hot_index = flags & DB_IS_HOT_INDEX; unused_flags&=~DB_IS_HOT_INDEX; //We support READ_UNCOMMITTED and READ_COMMITTED whether or not the flag is provided. 
- unused_flags&=~DB_READ_UNCOMMITTED; - unused_flags&=~DB_READ_COMMITTED; - unused_flags&=~DB_SERIALIZABLE; - if (unused_flags & ~DB_THREAD) return EINVAL; // unknown flags + unused_flags&=~DB_READ_UNCOMMITTED; + unused_flags&=~DB_READ_COMMITTED; + unused_flags&=~DB_SERIALIZABLE; + + // DB_THREAD is implicitly supported and DB_BLACKHOLE is supported at the ft-layer + unused_flags &= ~DB_THREAD; + unused_flags &= ~DB_BLACKHOLE; + + // check for unknown or conflicting flags + if (unused_flags) return EINVAL; // unknown flags if (is_db_excl && !is_db_create) return EINVAL; if (dbtype==DB_UNKNOWN && is_db_excl) return EINVAL; - /* tokudb supports no duplicates and sorted duplicates only */ - unsigned int tflags; - r = toku_ft_get_flags(db->i->ft_handle, &tflags); - if (r != 0) - return r; - if (db_opened(db)) { // it was already open return EINVAL; @@ -384,17 +384,12 @@ db_open_iname(DB * db, DB_TXN * txn, const char *iname_in_env, uint32_t flags, i flags&=~DB_SERIALIZABLE; flags&=~DB_IS_HOT_INDEX; // unknown or conflicting flags are bad - if ((flags & ~DB_THREAD) || (is_db_excl && !is_db_create)) { + int unknown_flags = flags & ~DB_THREAD; + unknown_flags &= ~DB_BLACKHOLE; + if (unknown_flags || (is_db_excl && !is_db_create)) { return EINVAL; } - /* tokudb supports no duplicates and sorted duplicates only */ - unsigned int tflags; - r = toku_ft_get_flags(db->i->ft_handle, &tflags); - if (r != 0) { - return r; - } - if (db_opened(db)) { return EINVAL; /* It was already open. */ } @@ -411,6 +406,12 @@ db_open_iname(DB * db, DB_TXN * txn, const char *iname_in_env, uint32_t flags, i goto error_cleanup; } + // if the dictionary was opened as a blackhole, mark the + // fractal tree as blackhole too. + if (flags & DB_BLACKHOLE) { + toku_ft_set_blackhole(ft_handle); + } + db->i->opened = 1; // now that the handle has successfully opened, a valid descriptor
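
A minimal usage sketch, not part of the patch itself: it shows how a caller might exercise the new DB_BLACKHOLE open flag introduced above. It assumes the generated db.h header and an already-created transactional environment (as set up in src/tests/threaded_stress_test_helpers.h); the function name blackhole_smoke_test and the dictionary name "blackhole.db" are illustrative only, and error handling is reduced to asserts.

    /* Sketch only: exercises DB_BLACKHOLE against an env the caller owns. */
    #include <assert.h>
    #include <string.h>
    #include <db.h>

    static void blackhole_smoke_test(DB_ENV *env) {
        DB *db;
        int r = db_create(&db, env, 0);
        assert(r == 0);

        // Open the dictionary as a blackhole: the handle behaves normally at
        // the ydb layer, but toku_ft_root_put_cmd() drops every message.
        r = db->open(db, NULL, "blackhole.db", NULL, DB_BTREE,
                     DB_CREATE | DB_BLACKHOLE, 0666);
        assert(r == 0);

        // The put still goes through transactions, logging and the locktree,
        // which is what makes a blackhole useful for measuring that overhead.
        DBT key, val;
        int k = 1, v = 2;
        memset(&key, 0, sizeof key); key.data = &k; key.size = sizeof k;
        memset(&val, 0, sizeof val); val.data = &v; val.size = sizeof v;
        r = db->put(db, NULL, &key, &val, 0);
        assert(r == 0);   // succeeds, but nothing reaches the fractal tree

        r = db->close(db, 0);
        assert(r == 0);
    }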