From c266ccbbcabf33bf37fadcb2543a63f148214cad Mon Sep 17 00:00:00 2001 From: Rich Prohaska Date: Wed, 17 Apr 2013 00:00:09 -0400 Subject: [PATCH] #4470 add a nop performance test and don't share per thread worker_extra in processor cache lines closes[t:4470] git-svn-id: file:///svn/toku/tokudb@39614 c7de825b-a66e-492c-adef-691d508d4ae1 --- src/tests/perf_nop.c | 69 ++++++++++++++++++++++++ src/tests/threaded_stress_test_helpers.h | 53 ++++++++++++++---- 2 files changed, 113 insertions(+), 9 deletions(-) create mode 100644 src/tests/perf_nop.c diff --git a/src/tests/perf_nop.c b/src/tests/perf_nop.c new file mode 100644 index 00000000000..93e29fe172a --- /dev/null +++ b/src/tests/perf_nop.c @@ -0,0 +1,69 @@ +/* -*- mode: C; c-basic-offset: 4 -*- */ +#ident "Copyright (c) 2007 Tokutek Inc. All rights reserved." +#ident "$Id: test_stress1.c 39258 2012-01-27 13:51:58Z zardosht $" +#include "test.h" + +#include +#include + +#include +#include +#include +#include +#include + +#include "threaded_stress_test_helpers.h" + +// +// This test is a form of stress that does operations on a single dictionary: +// We create a dictionary bigger than the cachetable (around 4x greater). +// Then, we spawn a bunch of pthreads that do the following: +// - scan dictionary forward with bulk fetch +// - scan dictionary forward slowly +// - scan dictionary backward with bulk fetch +// - scan dictionary backward slowly +// - Grow the dictionary with insertions +// - do random point queries into the dictionary +// With the small cachetable, this should produce quite a bit of churn in reading in and evicting nodes. +// If the test runs to completion without crashing, we consider it a success. It also tests that snapshots +// work correctly by verifying that table scans sum their vals to 0. +// +// This does NOT test: +// - splits and merges +// - multiple DBs +// +// Variables that are interesting to tweak and run: +// - small cachetable +// - number of elements +// + +static void +stress_table(DB_ENV* env, DB** dbp, struct cli_args *cli_args) { + int n = cli_args->num_elements; + // + // the threads that we want: + // - some threads constantly updating random values + // - one thread doing table scan with bulk fetch + // - one thread doing table scan without bulk fetch + // - some threads doing random point queries + // + + if (verbose) printf("starting creation of pthreads\n"); + const int num_threads = cli_args->num_ptquery_threads; + struct arg myargs[num_threads]; + for (int i = 0; i < num_threads; i++) { + arg_init(&myargs[i], n, dbp, env, cli_args); + } + for (int i = 0; i < num_threads; i++) { + myargs[i].operation = nop; + } + run_workers(myargs, num_threads, cli_args->time_of_test, false, cli_args); +} + +int +test_main(int argc, char *const argv[]) { + struct cli_args args = get_default_args_for_perf(); + parse_stress_test_args(argc, argv, &args); + stress_test_main(&args); + return 0; +} diff --git a/src/tests/threaded_stress_test_helpers.h b/src/tests/threaded_stress_test_helpers.h index b90e817da5c..926572ebc2f 100644 --- a/src/tests/threaded_stress_test_helpers.h +++ b/src/tests/threaded_stress_test_helpers.h @@ -18,7 +18,7 @@ #include #include #include - +#include static inline int32_t myrandom_r(struct random_data *buf) @@ -58,6 +58,7 @@ struct arg { enum stress_lock_type lock_type; // states if operation must be exclusive, shared, or does not require locking bool crash_on_operation_failure; // true if we should crash if operation returns non-zero, false otherwise struct random_data *random_data; // state for random_r + bool single_txn; }; struct env_args { @@ -90,6 +91,7 @@ struct cli_args { u_int32_t key_size; // number of bytes in vals. Must be at least 4 u_int32_t val_size; // number of bytes in vals. Must be at least 4 struct env_args env_args; // specifies environment variables + bool single_txn; }; DB_TXN * const null_txn = 0; @@ -104,6 +106,7 @@ static void arg_init(struct arg *arg, int num_elements, DB **dbp, DB_ENV *env, s arg->lock_type = STRESS_LOCK_NONE; arg->txn_type = DB_TXN_SNAPSHOT; arg->crash_on_operation_failure = cli_args->crash_on_update_failure; + arg->single_txn = cli_args->single_txn; arg->operation_extra = NULL; } @@ -111,7 +114,8 @@ struct worker_extra { struct arg* thread_arg; toku_pthread_mutex_t *operation_lock_mutex; struct rwlock *operation_lock; - int num_operations_completed; + int64_t num_operations_completed; + int64_t pad[4]; // pad to 64 bytes }; static void lock_worker_op(struct worker_extra* we) { @@ -158,17 +162,26 @@ static void *worker(void *arg_v) { if (verbose) { printf("%lu starting %p\n", toku_pthread_self(), arg->operation); } + if (arg->single_txn) { + r = env->txn_begin(env, 0, &txn, arg->txn_type); CKERR(r); + } while (run_test) { lock_worker_op(we); - r = env->txn_begin(env, 0, &txn, arg->txn_type); CKERR(r); + if (!arg->single_txn) { + r = env->txn_begin(env, 0, &txn, arg->txn_type); CKERR(r); + } r = arg->operation(txn, arg, arg->operation_extra); if (r == 0) { - CHK(txn->commit(txn,0)); + if (!arg->single_txn) { + CHK(txn->commit(txn,0)); + } } else { if (arg->crash_on_operation_failure) { CKERR(r); } else { - CHK(txn->abort(txn)); + if (!arg->single_txn) { + CHK(txn->abort(txn)); + } } } unlock_worker_op(we); @@ -177,6 +190,9 @@ static void *worker(void *arg_v) { usleep(arg->sleep_ms * 1000); } } + if (arg->single_txn) { + CHK(txn->commit(txn, 0)); + } if (verbose) printf("%lu returning\n", toku_pthread_self()); toku_free(random_buf); @@ -266,7 +282,11 @@ static int generate_row_for_put( return 0; } -static int UU() loader_op(DB_TXN* txn, ARG arg, void* UU(operation_extra)) { +static int UU() nop(DB_TXN* UU(txn), ARG UU(arg), void* UU(operation_extra)) { + return 0; +} + +static int UU() loader_op(DB_TXN* txn, ARG UU(arg), void* UU(operation_extra)) { DB_ENV* env = arg->env; int r; for (int num = 0; num < 2; num++) { @@ -373,6 +393,15 @@ static int UU() ptquery_op_no_check(DB_TXN *txn, ARG arg, void* UU(operation_ext return ptquery_and_maybe_check_op(db, txn, arg, FALSE); } +static int UU() cursor_create_close_op(DB_TXN *txn, ARG arg, void* UU(operation_extra)) { + int db_index = arg->num_DBs > 1 ? random()%arg->num_DBs : 0; + DB* db = arg->dbp[db_index]; + DBC* cursor = NULL; + int r = db->cursor(db, txn, &cursor, 0); assert(r == 0); + r = cursor->c_close(cursor); assert(r == 0); + return 0; +} + #define MAX_RANDOM_VAL 10000 enum update_type { @@ -704,7 +733,7 @@ static void *test_time(void *arg) { if (verbose) { printf("Sleeping for %d seconds\n", num_seconds); } - int num_operations_completed_total[tte->num_wes]; + int64_t num_operations_completed_total[tte->num_wes]; memset(num_operations_completed_total, 0, sizeof num_operations_completed_total); for (int i = 0; i < num_seconds; i += tte->performance_period) { usleep(tte->performance_period*1000*1000); @@ -762,7 +791,8 @@ static int run_workers( rwlock_init(&rwlock); toku_pthread_t tids[num_threads]; toku_pthread_t time_tid; - struct worker_extra worker_extra[num_threads]; + struct worker_extra *worker_extra = (struct worker_extra *) + memalign(64, num_threads * sizeof (struct worker_extra)); // allocate worker_extra's on cache line boundaries struct test_time_extra tte; tte.num_seconds = num_seconds; tte.crash_at_end = crash_at_end; @@ -797,6 +827,7 @@ static int run_workers( printf("ending test, pthreads have joined\n"); rwlock_destroy(&rwlock); toku_pthread_mutex_destroy(&mutex); + toku_free(worker_extra); return r; } @@ -990,6 +1021,7 @@ static struct cli_args UU() get_default_args(void) { .key_size = MIN_KEY_SIZE, .val_size = MIN_VAL_SIZE, .env_args = DEFAULT_ENV_ARGS, + .single_txn = false, }; return DEFAULT_ARGS; } @@ -1078,7 +1110,7 @@ static inline void parse_stress_test_args (int argc, char *const argv[], struct argc--; argv++; args->update_broadcast_period_ms = atoi(argv[1]); } - else if (strcmp(argv[1], "--num_ptquery_threads") == 0) { + else if (strcmp(argv[1], "--num_ptquery_threads") == 0 || strcmp(argv[1], "--num_threads") == 0) { argc--; argv++; args->num_ptquery_threads = atoi(argv[1]); } @@ -1132,6 +1164,9 @@ static inline void parse_stress_test_args (int argc, char *const argv[], struct argc--; argv++; args->env_args.envdir = argv[1]; } + else if (strcmp(argv[1], "--single_txn") == 0) { + args->single_txn = true; + } else { resultcode=1; goto do_usage;