diff --git a/src/tests/recover-test_crash_in_flusher_thread.h b/src/tests/recover-test_crash_in_flusher_thread.h index 9c5698c11fa..283ed843212 100644 --- a/src/tests/recover-test_crash_in_flusher_thread.h +++ b/src/tests/recover-test_crash_in_flusher_thread.h @@ -63,21 +63,23 @@ stress_table(DB_ENV *env, DB **dbp, struct cli_args *cli_args) { } // make the guy that updates the db + struct update_op_args uoe = get_update_op_args(cli_args, NULL); + myargs[0].operation_extra = &uoe; myargs[0].operation = update_op; //myargs[0].update_pad_frequency = 0; db_env_set_flusher_thread_callback(ft_callback, env); - run_workers(myargs, num_threads, cli_args->time_of_test, true); + run_workers(myargs, num_threads, cli_args->time_of_test, true, cli_args); } static int run_recover_ft_test(int argc, char *const argv[]) { - struct cli_args args = DEFAULT_ARGS; + struct cli_args args = get_default_args(); // make test time arbitrarily high because we expect a crash args.time_of_test = 1000000000; args.num_elements = 2000; // we want to induce a checkpoint - args.checkpointing_period = 0; + args.env_args.checkpointing_period = 0; parse_stress_test_args(argc, argv, &args); if (args.do_test_and_crash) { stress_test_main(&args); diff --git a/src/tests/recover-test_stress1.c b/src/tests/recover-test_stress1.c index fd41d75953e..b9c2ab387d0 100644 --- a/src/tests/recover-test_stress1.c +++ b/src/tests/recover-test_stress1.c @@ -55,29 +55,40 @@ stress_table(DB_ENV *env, DB **dbp, struct cli_args *cli_args) { for (int i = 0; i < num_threads; i++) { arg_init(&myargs[i], n, dbp, env, cli_args); } + struct scan_op_extra soe[4]; // make the forward fast scanner - myargs[0].fast = TRUE; - myargs[0].fwd = TRUE; + soe[0].fast = TRUE; + soe[0].fwd = TRUE; + soe[0].prefetch = FALSE; + myargs[0].operation_extra = &soe[0]; myargs[0].operation = scan_op; // make the forward slow scanner - myargs[1].fast = FALSE; - myargs[1].fwd = TRUE; + soe[1].fast = FALSE; + soe[1].fwd = TRUE; + soe[1].prefetch = FALSE; + myargs[1].operation_extra = &soe[1]; myargs[1].operation = scan_op; // make the backward fast scanner - myargs[2].fast = TRUE; - myargs[2].fwd = FALSE; + soe[2].fast = TRUE; + soe[2].fwd = FALSE; + soe[2].prefetch = FALSE; + myargs[2].operation_extra = &soe[2]; myargs[2].operation = scan_op; // make the backward slow scanner - myargs[3].fast = FALSE; - myargs[3].fwd = FALSE; + soe[3].fast = FALSE; + soe[3].fwd = FALSE; + soe[3].prefetch = FALSE; + myargs[3].operation_extra = &soe[3]; myargs[3].operation = scan_op; + struct update_op_args uoe = get_update_op_args(cli_args, NULL); // make the guy that updates the db for (int i = 4; i < 4 + cli_args->num_update_threads; ++i) { + myargs[i].operation_extra = &uoe; myargs[i].operation = update_op; } @@ -87,13 +98,13 @@ stress_table(DB_ENV *env, DB **dbp, struct cli_args *cli_args) { } int num_seconds = random() % cli_args->time_of_test; - run_workers(myargs, num_threads, num_seconds, true); + run_workers(myargs, num_threads, num_seconds, true, cli_args); } int test_main(int argc, char *const argv[]) { - struct cli_args args = DEFAULT_ARGS; - args.checkpointing_period = 1; + struct cli_args args = get_default_args(); + args.env_args.checkpointing_period = 1; parse_stress_test_args(argc, argv, &args); if (args.do_test_and_crash) { stress_test_main(&args); diff --git a/src/tests/recover-test_stress2.c b/src/tests/recover-test_stress2.c index 6c7cb955445..2175de36330 100644 --- a/src/tests/recover-test_stress2.c +++ b/src/tests/recover-test_stress2.c @@ -22,21 +22,23 @@ stress_table(DB_ENV *env, DB **dbp, struct cli_args *cli_args) { if (verbose) printf("starting creation of pthreads\n"); const int num_threads = cli_args->num_update_threads; struct arg myargs[num_threads]; - for (int i = 0; i < num_threads; i++) { + struct update_op_args uoe = get_update_op_args(cli_args, NULL); + // make the guy that updates the db + for (int i = 0; i < 0 + cli_args->num_update_threads; ++i) { arg_init(&myargs[i], n, dbp, env, cli_args); - // make the guy that updates the db + myargs[i].operation_extra = &uoe; myargs[i].operation = update_op; } int num_seconds = random() % cli_args->time_of_test; - run_workers(myargs, num_threads, num_seconds, true); + run_workers(myargs, num_threads, num_seconds, true, cli_args); } int test_main(int argc, char *const argv[]) { - struct cli_args args = DEFAULT_ARGS; - args.checkpointing_period = 1; + struct cli_args args = get_default_args(); + args.env_args.checkpointing_period = 1; args.num_elements = 2000; parse_stress_test_args(argc, argv, &args); if (args.do_test_and_crash) { diff --git a/src/tests/test_stress1.c b/src/tests/test_stress1.c index 421ca833e0c..a447173de4d 100644 --- a/src/tests/test_stress1.c +++ b/src/tests/test_stress1.c @@ -55,29 +55,40 @@ stress_table(DB_ENV *env, DB **dbp, struct cli_args *cli_args) { for (int i = 0; i < num_threads; i++) { arg_init(&myargs[i], n, dbp, env, cli_args); } + struct scan_op_extra soe[4]; // make the forward fast scanner - myargs[0].fast = TRUE; - myargs[0].fwd = TRUE; + soe[0].fast = TRUE; + soe[0].fwd = TRUE; + soe[0].prefetch = FALSE; + myargs[0].operation_extra = &soe[0]; myargs[0].operation = scan_op; // make the forward slow scanner - myargs[1].fast = FALSE; - myargs[1].fwd = TRUE; + soe[1].fast = FALSE; + soe[1].fwd = TRUE; + soe[1].prefetch = FALSE; + myargs[1].operation_extra = &soe[1]; myargs[1].operation = scan_op; // make the backward fast scanner - myargs[2].fast = TRUE; - myargs[2].fwd = FALSE; + soe[2].fast = TRUE; + soe[2].fwd = FALSE; + soe[2].prefetch = FALSE; + myargs[2].operation_extra = &soe[2]; myargs[2].operation = scan_op; // make the backward slow scanner - myargs[3].fast = FALSE; - myargs[3].fwd = FALSE; + soe[3].fast = FALSE; + soe[3].fwd = FALSE; + soe[3].prefetch = FALSE; + myargs[3].operation_extra = &soe[3]; myargs[3].operation = scan_op; + struct update_op_args uoe = get_update_op_args(cli_args, NULL); // make the guy that updates the db for (int i = 4; i < 4 + cli_args->num_update_threads; ++i) { + myargs[i].operation_extra = &uoe; myargs[i].operation = update_op; } @@ -86,12 +97,12 @@ stress_table(DB_ENV *env, DB **dbp, struct cli_args *cli_args) { myargs[i].operation = ptquery_op; } - run_workers(myargs, num_threads, cli_args->time_of_test, false); + run_workers(myargs, num_threads, cli_args->time_of_test, false, cli_args); } int test_main(int argc, char *const argv[]) { - struct cli_args args = DEFAULT_ARGS; + struct cli_args args = get_default_args(); parse_stress_test_args(argc, argv, &args); stress_test_main(&args); return 0; diff --git a/src/tests/test_stress2.c b/src/tests/test_stress2.c index 341481fd239..70e202f4cef 100644 --- a/src/tests/test_stress2.c +++ b/src/tests/test_stress2.c @@ -49,45 +49,56 @@ stress_table(DB_ENV *env, DB **dbp, struct cli_args *cli_args) { for (int i = 0; i < num_threads; i++) { arg_init(&myargs[i], n, dbp, env, cli_args); } + struct scan_op_extra soe[4]; // make the forward fast scanner - myargs[0].fast = TRUE; - myargs[0].fwd = TRUE; + soe[0].fast = TRUE; + soe[0].fwd = TRUE; + soe[0].prefetch = FALSE; + myargs[0].operation_extra = &soe[0]; myargs[0].operation = scan_op; // make the forward slow scanner - myargs[1].fast = FALSE; - myargs[1].fwd = TRUE; + soe[1].fast = FALSE; + soe[1].fwd = TRUE; + soe[1].prefetch = FALSE; + myargs[1].operation_extra = &soe[1]; myargs[1].operation = scan_op; // make the backward fast scanner - myargs[2].fast = TRUE; - myargs[2].fwd = FALSE; + soe[2].fast = TRUE; + soe[2].fwd = FALSE; + soe[2].prefetch = FALSE; + myargs[2].operation_extra = &soe[2]; myargs[2].operation = scan_op; // make the backward slow scanner - myargs[3].fast = FALSE; - myargs[3].fwd = FALSE; + soe[3].fast = FALSE; + soe[3].fwd = FALSE; + soe[3].prefetch = FALSE; + myargs[3].operation_extra = &soe[3]; myargs[3].operation = scan_op; + struct update_op_args uoe = get_update_op_args(cli_args, NULL); // make the guy that updates the db for (int i = 4; i < 4 + cli_args->num_update_threads; ++i) { - myargs[i].bounded_update_range = false; + myargs[i].operation_extra = &uoe; + myargs[i].bounded_element_range = false; myargs[i].operation = update_op; } // make the guy that does point queries for (int i = 4 + cli_args->num_update_threads; i < num_threads; i++) { - myargs[i].bounded_update_range = false; + myargs[i].bounded_element_range = false; myargs[i].operation = ptquery_op_no_check; } - run_workers(myargs, num_threads, cli_args->time_of_test, false); + run_workers(myargs, num_threads, cli_args->time_of_test, false, cli_args); } int test_main(int argc, char *const argv[]) { - struct cli_args args = DEFAULT_ARGS; + struct cli_args args = get_default_args(); parse_stress_test_args(argc, argv, &args); stress_test_main(&args); return 0; diff --git a/src/tests/test_stress3.c b/src/tests/test_stress3.c index 250fbbe2b3b..c713059ad33 100644 --- a/src/tests/test_stress3.c +++ b/src/tests/test_stress3.c @@ -48,28 +48,40 @@ stress_table(DB_ENV *env, DB **dbp, struct cli_args *cli_args) { arg_init(&myargs[i], n, dbp, env, cli_args); } + struct scan_op_extra soe[4]; + // make the forward fast scanner - myargs[0].fast = TRUE; - myargs[0].fwd = TRUE; + soe[0].fast = TRUE; + soe[0].fwd = TRUE; + soe[0].prefetch = FALSE; + myargs[0].operation_extra = &soe[0]; myargs[0].operation = scan_op; // make the forward slow scanner - myargs[1].fast = FALSE; - myargs[1].fwd = TRUE; + soe[1].fast = FALSE; + soe[1].fwd = TRUE; + soe[1].prefetch = FALSE; + myargs[1].operation_extra = &soe[1]; myargs[1].operation = scan_op; // make the backward fast scanner - myargs[2].fast = TRUE; - myargs[2].fwd = FALSE; + soe[2].fast = TRUE; + soe[2].fwd = FALSE; + soe[2].prefetch = FALSE; + myargs[2].operation_extra = &soe[2]; myargs[2].operation = scan_op; // make the backward slow scanner - myargs[3].fast = FALSE; - myargs[3].fwd = FALSE; + soe[3].fast = FALSE; + soe[3].fwd = FALSE; + soe[3].prefetch = FALSE; + myargs[3].operation_extra = &soe[3]; myargs[3].operation = scan_op; + struct update_op_args uoe = get_update_op_args(cli_args, NULL); // make the guy that updates the db for (int i = 4; i < 4 + cli_args->num_update_threads; ++i) { + myargs[i].operation_extra = &uoe; myargs[i].lock_type = STRESS_LOCK_SHARED; myargs[i].operation = update_op; } @@ -84,12 +96,12 @@ stress_table(DB_ENV *env, DB **dbp, struct cli_args *cli_args) { myargs[i].operation = ptquery_op; } - run_workers(myargs, num_threads, cli_args->time_of_test, false); + run_workers(myargs, num_threads, cli_args->time_of_test, false, cli_args); } int test_main(int argc, char *const argv[]) { - struct cli_args args = DEFAULT_ARGS; + struct cli_args args = get_default_args(); parse_stress_test_args(argc, argv, &args); stress_test_main(&args); return 0; diff --git a/src/tests/test_stress4.c b/src/tests/test_stress4.c index 5e08e9e516d..d28b736abb2 100644 --- a/src/tests/test_stress4.c +++ b/src/tests/test_stress4.c @@ -48,30 +48,42 @@ stress_table(DB_ENV *env, DB **dbp, struct cli_args *cli_args) { arg_init(&myargs[i], n, dbp, env, cli_args); } + struct scan_op_extra soe[4]; + // make the forward fast scanner - myargs[0].fast = TRUE; - myargs[0].fwd = TRUE; + soe[0].fast = TRUE; + soe[0].fwd = TRUE; + soe[0].prefetch = FALSE; + myargs[0].operation_extra = &soe[0]; myargs[0].operation = scan_op_no_check; // make the forward slow scanner - myargs[1].fast = FALSE; - myargs[1].fwd = TRUE; + soe[1].fast = FALSE; + soe[1].fwd = TRUE; + soe[1].prefetch = FALSE; + myargs[1].operation_extra = &soe[1]; myargs[1].operation = scan_op_no_check; // make the backward fast scanner - myargs[2].fast = TRUE; - myargs[2].fwd = FALSE; + soe[2].fast = TRUE; + soe[2].fwd = FALSE; + soe[2].prefetch = FALSE; + myargs[2].operation_extra = &soe[2]; myargs[2].operation = scan_op_no_check; // make the backward slow scanner - myargs[3].fast = FALSE; - myargs[3].fwd = FALSE; + soe[3].fast = FALSE; + soe[3].fwd = FALSE; + soe[3].prefetch = FALSE; + myargs[3].operation_extra = &soe[3]; myargs[3].operation = scan_op_no_check; + struct update_op_args uoe[cli_args->num_update_threads]; // make the guy that updates the db for (int i = 4; i < 4 + cli_args->num_update_threads; ++i) { - myargs[i].update_history_buffer = toku_xmalloc(n * (sizeof myargs[i].update_history_buffer[0])); - memset(myargs[i].update_history_buffer, 0, n * (sizeof myargs[i].update_history_buffer[0])); + int* update_history_buffer = toku_xmalloc(n * (sizeof uoe[i-4].update_history_buffer[0])); + uoe[i-4] = get_update_op_args(cli_args,update_history_buffer); + memset(uoe[i-4].update_history_buffer, 0, n * (sizeof uoe[i-4].update_history_buffer[0])); myargs[i].operation = update_with_history_op; } @@ -80,16 +92,16 @@ stress_table(DB_ENV *env, DB **dbp, struct cli_args *cli_args) { myargs[i].operation = ptquery_op; } - run_workers(myargs, num_threads, cli_args->time_of_test, false); + run_workers(myargs, num_threads, cli_args->time_of_test, false, cli_args); - for (int i = 4; i < 4 + cli_args->num_update_threads; ++i) { - toku_free(myargs[i].update_history_buffer); + for (int i = 0; i < cli_args->num_update_threads; ++i) { + toku_free(uoe[i].update_history_buffer); } } int test_main(int argc, char *const argv[]) { - struct cli_args args = DEFAULT_ARGS; + struct cli_args args = get_default_args(); parse_stress_test_args(argc, argv, &args); stress_test_main(&args); return 0; diff --git a/src/tests/test_stress5.c b/src/tests/test_stress5.c index 43d889976b4..493e0587fa4 100644 --- a/src/tests/test_stress5.c +++ b/src/tests/test_stress5.c @@ -29,21 +29,30 @@ stress_table(DB_ENV *env, DB **dbp, struct cli_args *cli_args) { for (int i = 0; i < num_threads; i++) { arg_init(&myargs[i], n, dbp, env, cli_args); } + struct scan_op_extra soe[2]; + // make the forward fast scanner - myargs[0].fast = TRUE; - myargs[0].fwd = TRUE; + soe[0].fast = TRUE; + soe[0].fwd = TRUE; + soe[0].prefetch = FALSE; + myargs[0].operation_extra = &soe[0]; myargs[0].operation = scan_op; - // make the backward slow scanner - myargs[1].fast = FALSE; - myargs[1].fwd = FALSE; + // make the forward slow scanner + soe[1].fast = FALSE; + soe[1].fwd = TRUE; + soe[1].prefetch = FALSE; + myargs[1].operation_extra = &soe[1]; myargs[1].operation = scan_op; // make the guy that updates the db myargs[2].operation = loader_op; myargs[3].operation = keyrange_op; + struct update_op_args uoe = get_update_op_args(cli_args, NULL); + // make the guy that updates the db for (int i = 4; i < 4 + cli_args->num_update_threads; ++i) { + myargs[i].operation_extra = &uoe; myargs[i].operation = update_op; } @@ -51,14 +60,14 @@ stress_table(DB_ENV *env, DB **dbp, struct cli_args *cli_args) { for (int i = 4 + cli_args->num_update_threads; i < num_threads; i++) { myargs[i].operation = ptquery_op; } - run_workers(myargs, num_threads, cli_args->time_of_test, false); + run_workers(myargs, num_threads, cli_args->time_of_test, false, cli_args); } int test_main(int argc, char *const argv[]) { - struct cli_args args = DEFAULT_ARGS; + struct cli_args args = get_default_args(); // let's make default checkpointing period really slow - args.checkpointing_period = 1; + args.env_args.checkpointing_period = 1; parse_stress_test_args(argc, argv, &args); stress_test_main(&args); return 0; diff --git a/src/tests/test_stress6.c b/src/tests/test_stress6.c index 3ba47de4bd2..e31e1548501 100644 --- a/src/tests/test_stress6.c +++ b/src/tests/test_stress6.c @@ -50,28 +50,38 @@ stress_table(DB_ENV *env, DB **dbp, struct cli_args *cli_args) { arg_init(&myargs[i], n, dbp, env, cli_args); } + struct scan_op_extra soe[4]; + // make the forward fast scanner - myargs[0].fast = TRUE; - myargs[0].fwd = TRUE; + soe[0].fast = TRUE; + soe[0].fwd = TRUE; + soe[0].prefetch = FALSE; myargs[0].lock_type = STRESS_LOCK_SHARED; + myargs[0].operation_extra = &soe[0]; myargs[0].operation = scan_op; // make the forward slow scanner - myargs[1].fast = FALSE; - myargs[1].fwd = TRUE; + soe[1].fast = FALSE; + soe[1].fwd = TRUE; + soe[1].prefetch = FALSE; myargs[1].lock_type = STRESS_LOCK_SHARED; + myargs[1].operation_extra = &soe[1]; myargs[1].operation = scan_op; // make the backward fast scanner - myargs[2].fast = TRUE; - myargs[2].fwd = FALSE; + soe[2].fast = TRUE; + soe[2].fwd = FALSE; + soe[2].prefetch = FALSE; myargs[2].lock_type = STRESS_LOCK_SHARED; + myargs[2].operation_extra = &soe[2]; myargs[2].operation = scan_op; // make the backward slow scanner - myargs[3].fast = FALSE; - myargs[3].fwd = FALSE; + soe[3].fast = FALSE; + soe[3].fwd = FALSE; + soe[3].prefetch = FALSE; myargs[3].lock_type = STRESS_LOCK_SHARED; + myargs[3].operation_extra = &soe[3]; myargs[3].operation = scan_op; // make the guy that removes and recreates the db @@ -84,25 +94,27 @@ stress_table(DB_ENV *env, DB **dbp, struct cli_args *cli_args) { myargs[5].operation = truncate_me; // make the guy that updates the db + struct update_op_args uoe = get_update_op_args(cli_args, NULL); for (int i = 6; i < 6 + cli_args->num_update_threads; ++i) { - myargs[i].bounded_update_range = false; + myargs[i].bounded_element_range = false; myargs[i].lock_type = STRESS_LOCK_SHARED; + myargs[i].operation_extra = &uoe; myargs[i].operation = update_op; } // make the guy that does point queries for (int i = 6 + cli_args->num_update_threads; i < num_threads; i++) { myargs[i].lock_type = STRESS_LOCK_SHARED; - myargs[i].bounded_update_range = false; + myargs[i].bounded_element_range = false; myargs[i].operation = ptquery_op_no_check; } - run_workers(myargs, num_threads, cli_args->time_of_test, false); + run_workers(myargs, num_threads, cli_args->time_of_test, false, cli_args); } int test_main(int argc, char *const argv[]) { - struct cli_args args = DEFAULT_ARGS; + struct cli_args args = get_default_args(); parse_stress_test_args(argc, argv, &args); stress_test_main(&args); return 0; diff --git a/src/tests/test_stress7.c b/src/tests/test_stress7.c index 973b09c2349..2ccb456825b 100644 --- a/src/tests/test_stress7.c +++ b/src/tests/test_stress7.c @@ -29,21 +29,30 @@ stress_table(DB_ENV *env, DB **dbp, struct cli_args *cli_args) { for (int i = 0; i < num_threads; i++) { arg_init(&myargs[i], n, dbp, env, cli_args); } + struct scan_op_extra soe[2]; + // make the forward fast scanner - myargs[0].fast = TRUE; - myargs[0].fwd = TRUE; + soe[0].fast = TRUE; + soe[0].fwd = TRUE; + soe[0].prefetch = FALSE; + myargs[0].operation_extra = &soe[0]; myargs[0].operation = scan_op; - // make the backward slow scanner - myargs[1].fast = FALSE; - myargs[1].fwd = FALSE; + // make the forward slow scanner + soe[1].fast = FALSE; + soe[1].fwd = TRUE; + soe[1].prefetch = FALSE; + myargs[1].operation_extra = &soe[1]; myargs[1].operation = scan_op; // make the guy that runs HOT in the background myargs[2].operation = hot_op; myargs[3].operation = keyrange_op; + struct update_op_args uoe = get_update_op_args(cli_args, NULL); + // make the guy that updates the db for (int i = 4; i < 4 + cli_args->num_update_threads; ++i) { + myargs[i].operation_extra = &uoe; myargs[i].operation = update_op; } @@ -51,14 +60,14 @@ stress_table(DB_ENV *env, DB **dbp, struct cli_args *cli_args) { for (int i = 4 + cli_args->num_update_threads; i < num_threads; i++) { myargs[i].operation = ptquery_op; } - run_workers(myargs, num_threads, cli_args->time_of_test, false); + run_workers(myargs, num_threads, cli_args->time_of_test, false, cli_args); } int test_main(int argc, char *const argv[]) { - struct cli_args args = DEFAULT_ARGS; + struct cli_args args = get_default_args(); // let's make default checkpointing period really slow - args.checkpointing_period = 1; + args.env_args.checkpointing_period = 1; parse_stress_test_args(argc, argv, &args); stress_test_main(&args); return 0; diff --git a/src/tests/test_stress_with_verify.c b/src/tests/test_stress_with_verify.c index aca345dc661..94b3025f9a0 100644 --- a/src/tests/test_stress_with_verify.c +++ b/src/tests/test_stress_with_verify.c @@ -36,8 +36,11 @@ stress_table(DB_ENV *env, DB **dbp, struct cli_args *cli_args) { arg_init(&myargs[i], n, dbp, env, cli_args); } // make the forward fast scanner - myargs[0].fast = TRUE; - myargs[0].fwd = TRUE; + struct scan_op_extra soe; + soe.fast = TRUE; + soe.fwd = TRUE; + soe.prefetch = FALSE; + myargs[0].operation_extra = &soe; myargs[0].lock_type = STRESS_LOCK_SHARED; myargs[0].operation = scan_op; @@ -52,18 +55,20 @@ stress_table(DB_ENV *env, DB **dbp, struct cli_args *cli_args) { } // make the guy that does point queries + struct update_op_args uoe = get_update_op_args(cli_args, NULL); for (int i = 2 + cli_args->num_update_threads; i < num_threads; i++) { myargs[i].lock_type = STRESS_LOCK_SHARED; + myargs[i].operation_extra = &uoe; myargs[i].operation = ptquery_op; } - run_workers(myargs, num_threads, cli_args->time_of_test, false); + run_workers(myargs, num_threads, cli_args->time_of_test, false, cli_args); } int test_main(int argc, char *const argv[]) { - struct cli_args args = DEFAULT_ARGS; + struct cli_args args = get_default_args(); // let's make default checkpointing period really slow - args.checkpointing_period = 1; + args.env_args.checkpointing_period = 1; args.num_elements= 2000; // make default of small num elements to args.num_ptquery_threads = 0; parse_stress_test_args(argc, argv, &args); diff --git a/src/tests/threaded_stress_test_helpers.h b/src/tests/threaded_stress_test_helpers.h index c8e2799af1b..39bd55f36e6 100644 --- a/src/tests/threaded_stress_test_helpers.h +++ b/src/tests/threaded_stress_test_helpers.h @@ -23,7 +23,7 @@ volatile bool run_test; // should be volatile since we are communicating through this variable. typedef struct arg *ARG; -typedef int (*operation_t)(DB_ENV *env, DB** dbp, DB_TXN *txn, ARG arg); +typedef int (*operation_t)(DB_TXN *txn, ARG arg, void* operation_extra); typedef int (*test_update_callback_f)(DB *, const DBT *key, const DBT *old_val, const DBT *extra, void (*set_val)(const DBT *new_val, void *set_extra), void *set_extra); @@ -34,108 +34,127 @@ enum stress_lock_type { }; struct arg { - int n; - DB **dbp; - DB_ENV* env; - bool fast; - bool fwd; - bool prefetch; - bool bounded_update_range; - int sleep_ms; - enum stress_lock_type lock_type; - u_int32_t txn_type; - int *update_history_buffer; - operation_t operation; - toku_pthread_mutex_t *broadcast_lock_mutex; - struct rwlock *broadcast_lock; - int update_pad_frequency; - bool crash_on_update_failure; - u_int32_t update_txn_size; + int num_elements; // number of elements per DB + DB **dbp; // array of DBs + int num_DBs; // number of DBs + DB_ENV* env; // environment used + bool bounded_element_range; // true if elements in dictionary are bounded + // by num_elements, that is, all keys in each + // DB are in [0, num_elements) + // false otherwise + int sleep_ms; // number of milliseconds to sleep between operations + u_int32_t txn_type; // isolation level for txn running operation + operation_t operation; // function that is the operation to be run + void* operation_extra; // extra parameter passed to operation + enum stress_lock_type lock_type; // states if operation must be exclusive, shared, or does not require locking + bool crash_on_operation_failure; // true if we should crash if operation returns non-zero, false otherwise }; -struct cli_args { - int num_elements; - int time_of_test; +struct env_args { int node_size; int basement_node_size; - u_int64_t cachetable_size; - bool only_create; - bool only_stress; int checkpointing_period; int cleaner_period; int cleaner_iterations; - int update_broadcast_period_ms; - int num_ptquery_threads; - test_update_callback_f update_function; - bool do_test_and_crash; - bool do_recover; + u_int64_t cachetable_size; char *envdir; - int num_update_threads; - bool crash_on_update_failure; - u_int32_t update_txn_size; + test_update_callback_f update_function; // update callback function +}; + +struct cli_args { + int num_elements; // number of elements per DB + int num_DBs; // number of DBs + int time_of_test; // how long test should run + bool only_create; // true if want to only create DBs but not run stress + bool only_stress; // true if DBs are already created and want to only run stress + int update_broadcast_period_ms; // specific to test_stress3 + int num_ptquery_threads; // number of threads to run point queries + bool do_test_and_crash; // true if we should crash after running stress test. For recovery tests. + bool do_recover; // true if we should run recover + int num_update_threads; // number of threads running updates + bool crash_on_update_failure; + bool print_performance; + bool print_thread_performance; + int performance_period; + u_int32_t update_txn_size; // for clients that do updates, specifies number of updates per txn + u_int32_t key_size; // number of bytes in vals. Must be at least 4 + u_int32_t val_size; // number of bytes in vals. Must be at least 4 + struct env_args env_args; // specifies environment variables }; DB_TXN * const null_txn = 0; -static void arg_init(struct arg *arg, int n, DB **dbp, DB_ENV *env, struct cli_args *cli_args) { - arg->n = n; +static void arg_init(struct arg *arg, int num_elements, DB **dbp, DB_ENV *env, struct cli_args *cli_args) { + arg->num_elements = num_elements; arg->dbp = dbp; + arg->num_DBs = cli_args->num_DBs; arg->env = env; - arg->fast = true; - arg->fwd = true; - arg->prefetch = false; // setting this to TRUE causes thrashing, even with a cachetable size that is 400000. Must investigate - arg->bounded_update_range = true; + arg->bounded_element_range = true; arg->sleep_ms = 0; arg->lock_type = STRESS_LOCK_NONE; arg->txn_type = DB_TXN_SNAPSHOT; - arg->update_history_buffer = NULL; - arg->update_pad_frequency = n/100; // bit arbitrary. Just want dictionary to grow and shrink so splits and merges occur - arg->crash_on_update_failure = cli_args->crash_on_update_failure; - arg->update_txn_size = cli_args->update_txn_size; + arg->crash_on_operation_failure = cli_args->crash_on_update_failure; + arg->operation_extra = NULL; +} + +struct worker_extra { + struct arg* thread_arg; + toku_pthread_mutex_t *operation_lock_mutex; + struct rwlock *operation_lock; + int num_operations_completed; +}; + +static void lock_worker_op(struct worker_extra* we) { + ARG arg = we->thread_arg; + if (arg->lock_type != STRESS_LOCK_NONE) { + toku_pthread_mutex_lock(we->operation_lock_mutex); + if (arg->lock_type == STRESS_LOCK_SHARED) { + rwlock_read_lock(we->operation_lock, we->operation_lock_mutex); + } else if (arg->lock_type == STRESS_LOCK_EXCL) { + rwlock_write_lock(we->operation_lock, we->operation_lock_mutex); + } else { + assert(false); + } + toku_pthread_mutex_unlock(we->operation_lock_mutex); + } +} + +static void unlock_worker_op(struct worker_extra* we) { + ARG arg = we->thread_arg; + toku_pthread_mutex_lock(we->operation_lock_mutex); + if (arg->lock_type == STRESS_LOCK_SHARED) { + rwlock_read_unlock(we->operation_lock); + } else if (arg->lock_type == STRESS_LOCK_EXCL) { + rwlock_write_unlock(we->operation_lock); + } else { + assert(arg->lock_type == STRESS_LOCK_NONE); + } + toku_pthread_mutex_unlock(we->operation_lock_mutex); } static void *worker(void *arg_v) { - ARG arg = arg_v; + struct worker_extra* we = arg_v; + ARG arg = we->thread_arg; DB_ENV *env = arg->env; - DB** dbp = arg->dbp; DB_TXN *txn = NULL; - if (verbose) + if (verbose) { printf("%lu starting %p\n", toku_pthread_self(), arg->operation); + } while (run_test) { - if (arg->lock_type != STRESS_LOCK_NONE) { - toku_pthread_mutex_lock(arg->broadcast_lock_mutex); - if (arg->lock_type == STRESS_LOCK_SHARED) { - rwlock_read_lock(arg->broadcast_lock, arg->broadcast_lock_mutex); - } else if (arg->lock_type == STRESS_LOCK_EXCL) { - rwlock_write_lock(arg->broadcast_lock, arg->broadcast_lock_mutex); - } else { - assert(false); - } - toku_pthread_mutex_unlock(arg->broadcast_lock_mutex); - } - + lock_worker_op(we); int r = env->txn_begin(env, 0, &txn, arg->txn_type); CKERR(r); - r = arg->operation(env, dbp, txn, arg); + r = arg->operation(txn, arg, arg->operation_extra); if (r == 0) { CHK(txn->commit(txn,0)); } else { - if (arg->crash_on_update_failure) { + if (arg->crash_on_operation_failure) { CKERR(r); } else { CHK(txn->abort(txn)); } } - - toku_pthread_mutex_lock(arg->broadcast_lock_mutex); - if (arg->lock_type == STRESS_LOCK_SHARED) { - rwlock_read_unlock(arg->broadcast_lock); - } else if (arg->lock_type == STRESS_LOCK_EXCL) { - rwlock_write_unlock(arg->broadcast_lock); - } else { - assert(arg->lock_type == STRESS_LOCK_NONE); - } - toku_pthread_mutex_unlock(arg->broadcast_lock_mutex); - + unlock_worker_op(we); + we->num_operations_completed++; if (arg->sleep_ms) { usleep(arg->sleep_ms * 1000); } @@ -152,6 +171,12 @@ struct scan_cb_extra { int64_t num_elements; }; +struct scan_op_extra { + bool fast; + bool fwd; + bool prefetch; +}; + static int scan_cb(const DBT *a, const DBT *b, void *arg_v) { SCAN_CB_EXTRA cb_extra = arg_v; @@ -164,23 +189,28 @@ scan_cb(const DBT *a, const DBT *b, void *arg_v) { return cb_extra->fast ? TOKUDB_CURSOR_CONTINUE : 0; } -static int scan_op_and_maybe_check_sum(DB_ENV *UU(env), DB **dbp, DB_TXN *txn, ARG arg, bool check_sum) { +static int scan_op_and_maybe_check_sum( + DB* db, + DB_TXN *txn, + struct scan_op_extra* sce, + bool check_sum + ) +{ int r = 0; - DB* db = *dbp; DBC* cursor = NULL; struct scan_cb_extra e; - e.fast = arg->fast; + e.fast = sce->fast; e.curr_sum = 0; e.num_elements = 0; CHK(db->cursor(db, txn, &cursor, 0)); - if (arg->prefetch) { + if (sce->prefetch) { r = cursor->c_pre_acquire_range_lock(cursor, db->dbt_neg_infty(), db->dbt_pos_infty()); assert(r == 0); } while (r != DB_NOTFOUND) { - if (arg->fwd) { + if (sce->fwd) { r = cursor->c_getf_next(cursor, 0, scan_cb, &e); } else { @@ -217,7 +247,8 @@ static int generate_row_for_put( return 0; } -static int UU() loader_op(DB_ENV *env, DB** UU(dbp), DB_TXN* txn, ARG UU(arg)) { +static int UU() loader_op(DB_TXN* txn, ARG UU(arg), void* UU(operation_extra)) { + DB_ENV* env = arg->env; int r; for (int num = 0; num < 2; num++) { DB *db_load; @@ -248,12 +279,16 @@ static int UU() loader_op(DB_ENV *env, DB** UU(dbp), DB_TXN* txn, ARG UU(arg)) { return 0; } -static int UU() keyrange_op(DB_ENV *UU(env), DB **dbp, DB_TXN *txn, ARG arg) { +static int UU() keyrange_op(DB_TXN *txn, ARG arg, void* UU(operation_extra)) { int r; - DB* db = *dbp; + // callback is designed to run on tests with one DB + // no particular reason why, just the way it was + // originally done + int db_index = random()%arg->num_DBs; + DB* db = arg->dbp[db_index]; int rand_key = random(); - if (arg->bounded_update_range) { - rand_key = rand_key % arg->n; + if (arg->bounded_element_range) { + rand_key = rand_key % arg->num_elements; } DBT key; dbt_init(&key, &rand_key, sizeof rand_key); @@ -264,28 +299,39 @@ static int UU() keyrange_op(DB_ENV *UU(env), DB **dbp, DB_TXN *txn, ARG arg) { return r; } -static int UU() verify_op(DB_ENV *UU(env), DB **dbp, DB_TXN* UU(txn), ARG UU(arg)) { +static int UU() verify_op(DB_TXN* UU(txn), ARG UU(arg), void* UU(operation_extra)) { int r; - DB* db = *dbp; - r = db->verify_with_progress(db, NULL, NULL, 0, 0); + for (int i = 0; i < arg->num_DBs; i++) { + DB* db = arg->dbp[i]; + r = db->verify_with_progress(db, NULL, NULL, 0, 0); + } CKERR(r); return r; } -static int UU() scan_op(DB_ENV *env, DB **dbp, DB_TXN *txn, ARG arg) { - return scan_op_and_maybe_check_sum(env, dbp, txn, arg, true); +static int UU() scan_op(DB_TXN *txn, ARG UU(arg), void* operation_extra) { + struct scan_op_extra* extra = operation_extra; + for (int i = 0; i < arg->num_DBs; i++) { + int r = scan_op_and_maybe_check_sum(arg->dbp[i], txn, extra, true); + assert_zero(r); + } + return 0; } -static int UU() scan_op_no_check(DB_ENV *env, DB **dbp, DB_TXN *txn, ARG arg) { - return scan_op_and_maybe_check_sum(env, dbp, txn, arg, false); +static int UU() scan_op_no_check(DB_TXN *txn, ARG arg, void* operation_extra) { + struct scan_op_extra* extra = operation_extra; + for (int i = 0; i < arg->num_DBs; i++) { + int r = scan_op_and_maybe_check_sum(arg->dbp[i], txn, extra, false); + assert_zero(r); + } + return 0; } -static int UU() ptquery_and_maybe_check_op(DB_ENV *UU(env), DB **dbp, DB_TXN *txn, ARG arg, BOOL check) { +static int UU() ptquery_and_maybe_check_op(DB* db, DB_TXN *txn, ARG arg, BOOL check) { int r; - DB* db = *dbp; int rand_key = random(); - if (arg->bounded_update_range) { - rand_key = rand_key % arg->n; + if (arg->bounded_element_range) { + rand_key = rand_key % arg->num_elements; } DBT key, val; memset(&val, 0, sizeof val); @@ -296,12 +342,16 @@ static int UU() ptquery_and_maybe_check_op(DB_ENV *UU(env), DB **dbp, DB_TXN *tx return r; } -static int UU() ptquery_op(DB_ENV *UU(env), DB **dbp, DB_TXN *txn, ARG arg) { - return ptquery_and_maybe_check_op(env, dbp, txn, arg, TRUE); +static int UU() ptquery_op(DB_TXN *txn, ARG arg, void* UU(operation_extra)) { + int db_index = random()%arg->num_DBs; + DB* db = arg->dbp[db_index]; + return ptquery_and_maybe_check_op(db, txn, arg, TRUE); } -static int UU() ptquery_op_no_check(DB_ENV *UU(env), DB **dbp, DB_TXN *txn, ARG arg) { - return ptquery_and_maybe_check_op(env, dbp, txn, arg, FALSE); +static int UU() ptquery_op_no_check(DB_TXN *txn, ARG arg, void* UU(operation_extra)) { + int db_index = random()%arg->num_DBs; + DB* db = arg->dbp[db_index]; + return ptquery_and_maybe_check_op(db, txn, arg, FALSE); } #define MAX_RANDOM_VAL 10000 @@ -326,6 +376,20 @@ struct update_op_extra { } u; }; +struct update_op_args { + int *update_history_buffer; + u_int32_t update_txn_size; + int update_pad_frequency; +}; + +static struct update_op_args get_update_op_args(struct cli_args* cli_args, int* update_history_buffer) { + struct update_op_args uoe; + uoe.update_history_buffer = update_history_buffer; + uoe.update_pad_frequency = cli_args->num_elements/100; // arbitrary + uoe.update_txn_size = cli_args->update_txn_size; + return uoe; +} + static u_int64_t update_count = 0; static int update_op_callback(DB *UU(db), const DBT *UU(key), @@ -367,24 +431,26 @@ static int update_op_callback(DB *UU(db), const DBT *UU(key), return 0; } -static int UU()update_op2(DB_ENV *UU(env), DB **dbp, DB_TXN *txn, ARG arg) { +static int UU()update_op2(DB_TXN* txn, ARG arg, void* operation_extra) { int r; - DB* db = *dbp; + int db_index = random()%arg->num_DBs; + DB* db = arg->dbp[db_index]; int curr_val_sum = 0; DBT key, val; int rand_key; int rand_key2; + struct update_op_args* op_args = operation_extra; update_count++; struct update_op_extra extra; memset(&extra, 0, sizeof(extra)); extra.type = UPDATE_ADD_DIFF; extra.pad_bytes = 0; - for (u_int32_t i = 0; i < arg->update_txn_size; i++) { + for (u_int32_t i = 0; i < op_args->update_txn_size; i++) { rand_key = random(); - if (arg->bounded_update_range) { - rand_key = rand_key % (arg->n/2); + if (arg->bounded_element_range) { + rand_key = rand_key % (arg->num_elements/2); } - rand_key2 = arg->n - rand_key; + rand_key2 = arg->num_elements - rand_key; assert(rand_key != rand_key2); extra.u.d.diff = 1; curr_val_sum += extra.u.d.diff; @@ -395,10 +461,9 @@ static int UU()update_op2(DB_ENV *UU(env), DB **dbp, DB_TXN *txn, ARG arg) { dbt_init(&val, &extra, sizeof extra), 0 ); - if (r != 0 && !arg->crash_on_update_failure) { + if (r != 0) { return r; } - CKERR(r); extra.u.d.diff = -1; r = db->update( db, @@ -407,34 +472,35 @@ static int UU()update_op2(DB_ENV *UU(env), DB **dbp, DB_TXN *txn, ARG arg) { dbt_init(&val, &extra, sizeof extra), 0 ); - if (r != 0 && !arg->crash_on_update_failure) { + if (r != 0) { return r; } - CKERR(r); } return r; } -static int UU()update_op(DB_ENV *UU(env), DB **dbp, DB_TXN *txn, ARG arg) { +static int UU()update_op(DB_TXN *txn, ARG arg, void* operation_extra) { int r; - DB* db = *dbp; + int db_index = random()%arg->num_DBs; + DB* db = arg->dbp[db_index]; int curr_val_sum = 0; DBT key, val; int rand_key; update_count++; + struct update_op_args* op_args = operation_extra; struct update_op_extra extra; memset(&extra, 0, sizeof(extra)); extra.type = UPDATE_ADD_DIFF; extra.pad_bytes = 0; - if (arg->update_pad_frequency) { - if (update_count % (2*arg->update_pad_frequency) == update_count%arg->update_pad_frequency) { + if (op_args->update_pad_frequency) { + if (update_count % (2*op_args->update_pad_frequency) == update_count%op_args->update_pad_frequency) { extra.pad_bytes = 100; } } - for (u_int32_t i = 0; i < arg->update_txn_size; i++) { + for (u_int32_t i = 0; i < op_args->update_txn_size; i++) { rand_key = random(); - if (arg->bounded_update_range) { - rand_key = rand_key % arg->n; + if (arg->bounded_element_range) { + rand_key = rand_key % arg->num_elements; } extra.u.d.diff = random() % MAX_RANDOM_VAL; // just make every other value random @@ -449,18 +515,17 @@ static int UU()update_op(DB_ENV *UU(env), DB **dbp, DB_TXN *txn, ARG arg) { dbt_init(&val, &extra, sizeof extra), 0 ); - if (r != 0 && !arg->crash_on_update_failure) { + if (r != 0) { return r; } - CKERR(r); } // // now put in one more to ensure that the sum stays 0 // extra.u.d.diff = -curr_val_sum; rand_key = random(); - if (arg->bounded_update_range) { - rand_key = rand_key % arg->n; + if (arg->bounded_element_range) { + rand_key = rand_key % arg->num_elements; } r = db->update( db, @@ -469,19 +534,20 @@ static int UU()update_op(DB_ENV *UU(env), DB **dbp, DB_TXN *txn, ARG arg) { dbt_init(&val, &extra, sizeof extra), 0 ); - if (r != 0 && !arg->crash_on_update_failure) { + if (r != 0) { return r; } - CKERR(r); return r; } -static int UU() update_with_history_op(DB_ENV *UU(env), DB **dbp, DB_TXN *txn, ARG arg) { - assert(arg->bounded_update_range); - assert(arg->update_history_buffer); +static int UU() update_with_history_op(DB_TXN *txn, ARG arg, void* operation_extra) { + struct update_op_args* op_args = operation_extra; + assert(arg->bounded_element_range); + assert(op_args->update_history_buffer); int r; - DB* db = *dbp; + int db_index = random()%arg->num_DBs; + DB* db = arg->dbp[db_index]; int curr_val_sum = 0; DBT key, val; int rand_key; @@ -490,22 +556,22 @@ static int UU() update_with_history_op(DB_ENV *UU(env), DB **dbp, DB_TXN *txn, A extra.type = UPDATE_WITH_HISTORY; update_count++; extra.pad_bytes = 0; - if (arg->update_pad_frequency) { - if (update_count % (2*arg->update_pad_frequency) != update_count%arg->update_pad_frequency) { + if (op_args->update_pad_frequency) { + if (update_count % (2*op_args->update_pad_frequency) != update_count%op_args->update_pad_frequency) { extra.pad_bytes = 500; } } - for (u_int32_t i = 0; i < arg->update_txn_size; i++) { - rand_key = random() % arg->n; + for (u_int32_t i = 0; i < op_args->update_txn_size; i++) { + rand_key = random() % arg->num_elements; extra.u.h.new = random() % MAX_RANDOM_VAL; // just make every other value random if (i%2 == 0) { extra.u.h.new = -extra.u.h.new; } curr_val_sum += extra.u.h.new; - extra.u.h.expected = arg->update_history_buffer[rand_key]; - arg->update_history_buffer[rand_key] = extra.u.h.new; + extra.u.h.expected = op_args->update_history_buffer[rand_key]; + op_args->update_history_buffer[rand_key] = extra.u.h.new; r = db->update( db, txn, @@ -513,21 +579,20 @@ static int UU() update_with_history_op(DB_ENV *UU(env), DB **dbp, DB_TXN *txn, A dbt_init(&val, &extra, sizeof extra), 0 ); - if (r != 0 && !arg->crash_on_update_failure) { + if (r != 0) { return r; } - CKERR(r); } // // now put in one more to ensure that the sum stays 0 // extra.u.h.new = -curr_val_sum; rand_key = random(); - if (arg->bounded_update_range) { - rand_key = rand_key % arg->n; + if (arg->bounded_element_range) { + rand_key = rand_key % arg->num_elements; } - extra.u.h.expected = arg->update_history_buffer[rand_key]; - arg->update_history_buffer[rand_key] = extra.u.h.new; + extra.u.h.expected = op_args->update_history_buffer[rand_key]; + op_args->update_history_buffer[rand_key] = extra.u.h.new; r = db->update( db, txn, @@ -535,18 +600,18 @@ static int UU() update_with_history_op(DB_ENV *UU(env), DB **dbp, DB_TXN *txn, A dbt_init(&val, &extra, sizeof extra), 0 ); - if (r != 0 && !arg->crash_on_update_failure) { + if (r != 0) { return r; } - CKERR(r); return r; } -static int UU() update_broadcast_op(DB_ENV *UU(env), DB **dbp, DB_TXN *txn, ARG UU(arg)) { +static int UU() update_broadcast_op(DB_TXN *txn, ARG UU(arg), void* UU(operation_extra)) { struct update_op_extra extra; memset(&extra, 0, sizeof(extra)); - DB* db = *dbp; + int db_index = random()%arg->num_DBs; + DB* db = arg->dbp[db_index]; extra.type = UPDATE_NEGATE; extra.pad_bytes = 0; DBT val; @@ -555,33 +620,43 @@ static int UU() update_broadcast_op(DB_ENV *UU(env), DB **dbp, DB_TXN *txn, ARG return r; } -static int UU() hot_op(DB_ENV *UU(env), DB **dbp, DB_TXN *UU(txn), ARG UU(arg)) { +static int UU() hot_op(DB_TXN *UU(txn), ARG UU(arg), void* UU(operation_extra)) { int r; - DB* db = *dbp; - r = db->hot_optimize(db, NULL, NULL); - CKERR(r); - return r; + for (int i = 0; i < arg->num_DBs; i++) { + DB* db = arg->dbp[i]; + r = db->hot_optimize(db, NULL, NULL); + CKERR(r); + } + return 0; } -static int UU() remove_and_recreate_me(DB_ENV *env, DB **dbp, DB_TXN *UU(txn), ARG UU(arg)) { +static int UU() remove_and_recreate_me(DB_TXN *UU(txn), ARG UU(arg), void* UU(operation_extra)) { int r; - r = (*dbp)->close(*dbp, 0); CKERR(r); + int db_index = random()%arg->num_DBs; + DB* db = arg->dbp[db_index]; + r = (db)->close(db, 0); CKERR(r); - r = env->dbremove(env, null_txn, "main", NULL, 0); + char name[30]; + memset(name, 0, sizeof(name)); + snprintf(name, sizeof(name), "main%d", db_index); + + r = arg->env->dbremove(arg->env, null_txn, name, NULL, 0); CKERR(r); - r = db_create(dbp, env, 0); + r = db_create(&(arg->dbp[db_index]), arg->env, 0); assert(r == 0); - r = (*dbp)->open(*dbp, null_txn, "main", NULL, DB_BTREE, DB_CREATE, 0666); + r = arg->dbp[db_index]->open(arg->dbp[db_index], null_txn, name, NULL, DB_BTREE, DB_CREATE, 0666); assert(r == 0); return 0; } -static int UU() truncate_me(DB_ENV *UU(env), DB **dbp, DB_TXN *txn, ARG UU(arg)) { +static int UU() truncate_me(DB_TXN *txn, ARG UU(arg), void* UU(operation_extra)) { int r; - u_int32_t row_count = 0; - r = (*dbp)->truncate(*dbp, txn, &row_count, 0); - assert(r == 0); + for ( int i = 0; i < arg->num_DBs; i++) { + u_int32_t row_count = 0; + r = (*arg->dbp)->truncate(*arg->dbp, txn, &row_count, 0); + assert(r == 0); + } return 0; } @@ -590,32 +665,77 @@ static int UU() truncate_me(DB_ENV *UU(env), DB **dbp, DB_TXN *txn, ARG UU(arg)) struct test_time_extra { int num_seconds; bool crash_at_end; + struct worker_extra *wes; + int num_wes; + bool print_performance; + bool print_thread_performance; + int performance_period; }; static void *test_time(void *arg) { struct test_time_extra* tte = arg; int num_seconds = tte->num_seconds; - + // // if num_Seconds is set to 0, run indefinitely // - if (num_seconds != 0) { - if (verbose) - printf("Sleeping for %d seconds\n", num_seconds); - usleep(num_seconds*1000*1000); - if (verbose) - printf("should now end test\n"); - __sync_bool_compare_and_swap(&run_test, true, false); // make this atomic to make valgrind --tool=drd happy. - if (verbose) - printf("run_test %d\n", run_test); - if (tte->crash_at_end) { - toku_hard_crash_on_purpose(); + if (num_seconds == 0) { + num_seconds = INT32_MAX; + } + if (verbose) { + printf("Sleeping for %d seconds\n", num_seconds); + } + int num_operations_completed_total[tte->num_wes]; + memset(num_operations_completed_total, 0, sizeof num_operations_completed_total); + for (int i = 0; i < num_seconds; i += tte->performance_period) { + usleep(tte->performance_period*1000*1000); + int total_operations_in_period = 0; + for (int we = 0; we < tte->num_wes; ++we) { + int last = num_operations_completed_total[we]; + int current = __sync_fetch_and_add(&tte->wes[we].num_operations_completed, 0); + if (tte->print_thread_performance) { + printf("Thread %d Iteration %d Operations %d\n", we, i, current - last); + } + total_operations_in_period += (current - last); + num_operations_completed_total[we] = current; } + if (tte->print_performance) { + printf("Iteration %d Total_Operations %d\n", i, total_operations_in_period); + } + } + int total_operations_in_test = 0; + for (int we = 0; we < tte->num_wes; ++we) { + int current = __sync_fetch_and_add(&tte->wes[we].num_operations_completed, 0); + if (tte->print_thread_performance) { + printf("TOTAL Thread %d Operations %d\n", we, current); + } + total_operations_in_test += current; + } + if (tte->print_performance) { + printf("Total_Operations %d\n", total_operations_in_test); + } + + if (verbose) { + printf("should now end test\n"); + } + __sync_bool_compare_and_swap(&run_test, true, false); // make this atomic to make valgrind --tool=drd happy. + if (verbose) { + printf("run_test %d\n", run_test); + } + if (tte->crash_at_end) { + toku_hard_crash_on_purpose(); } return arg; } -static int run_workers(struct arg *thread_args, int num_threads, u_int32_t num_seconds, bool crash_at_end) { +static int run_workers( + struct arg *thread_args, + int num_threads, + u_int32_t num_seconds, + bool crash_at_end, + struct cli_args* cli_args + ) +{ int r; toku_pthread_mutex_t mutex; toku_pthread_mutex_init(&mutex, NULL); @@ -623,14 +743,22 @@ static int run_workers(struct arg *thread_args, int num_threads, u_int32_t num_s rwlock_init(&rwlock); toku_pthread_t tids[num_threads]; toku_pthread_t time_tid; + struct worker_extra worker_extra[num_threads]; struct test_time_extra tte; tte.num_seconds = num_seconds; tte.crash_at_end = crash_at_end; + tte.wes = worker_extra; + tte.num_wes = num_threads; + tte.print_performance = cli_args->print_performance; + tte.print_thread_performance = cli_args->print_thread_performance; + tte.performance_period = cli_args->performance_period; run_test = true; for (int i = 0; i < num_threads; ++i) { - thread_args[i].broadcast_lock = &rwlock; - thread_args[i].broadcast_lock_mutex = &mutex; - CHK(toku_pthread_create(&tids[i], NULL, worker, &thread_args[i])); + worker_extra[i].thread_arg = &thread_args[i]; + worker_extra[i].operation_lock = &rwlock; + worker_extra[i].operation_lock_mutex = &mutex; + worker_extra[i].num_operations_completed = 0; + CHK(toku_pthread_create(&tids[i], NULL, worker, &worker_extra[i])); if (verbose) printf("%lu created\n", tids[i]); } @@ -646,88 +774,90 @@ static int run_workers(struct arg *thread_args, int num_threads, u_int32_t num_s if (verbose) printf("%lu joined\n", tids[i]); } - rwlock_destroy(&rwlock); if (verbose) printf("ending test, pthreads have joined\n"); + rwlock_destroy(&rwlock); toku_pthread_mutex_destroy(&mutex); return r; } -static int create_table(DB_ENV **env_res, DB **db_res, +static int create_tables(DB_ENV **env_res, DB **db_res, int num_DBs, int (*bt_compare)(DB *, const DBT *, const DBT *), - u_int64_t cachesize, - u_int32_t checkpointing_period, - u_int32_t cleaner_period, - u_int32_t cleaner_iterations, - u_int32_t pagesize, - u_int32_t readpagesize, - char *envdir) { + struct env_args env_args +) { int r; - char rmcmd[32 + strlen(envdir)]; sprintf(rmcmd, "rm -rf %s", envdir); + char rmcmd[32 + strlen(env_args.envdir)]; sprintf(rmcmd, "rm -rf %s", env_args.envdir); r = system(rmcmd); CKERR(r); - r = toku_os_mkdir(envdir, S_IRWXU+S_IRWXG+S_IRWXO); assert(r==0); + r = toku_os_mkdir(env_args.envdir, S_IRWXU+S_IRWXG+S_IRWXO); assert(r==0); DB_ENV *env; r = db_env_create(&env, 0); assert(r == 0); r = env->set_redzone(env, 0); CKERR(r); r = env->set_default_bt_compare(env, bt_compare); CKERR(r); - r = env->set_cachesize(env, cachesize / (1 << 30), cachesize % (1 << 30), 1); CKERR(r); + r = env->set_cachesize(env, env_args.cachetable_size / (1 << 30), env_args.cachetable_size % (1 << 30), 1); CKERR(r); r = env->set_generate_row_callback_for_put(env, generate_row_for_put); CKERR(r); - r = env->open(env, envdir, DB_INIT_LOCK|DB_INIT_LOG|DB_INIT_MPOOL|DB_INIT_TXN|DB_CREATE|DB_PRIVATE, S_IRWXU+S_IRWXG+S_IRWXO); CKERR(r); - r = env->checkpointing_set_period(env, checkpointing_period); CKERR(r); - r = env->cleaner_set_period(env, cleaner_period); CKERR(r); - r = env->cleaner_set_iterations(env, cleaner_iterations); CKERR(r); - - DB *db; - r = db_create(&db, env, 0); - assert(r == 0); - r = db->set_flags(db, 0); - assert(r == 0); - r = db->set_pagesize(db, pagesize); - assert(r == 0); - r = db->set_readpagesize(db, readpagesize); - assert(r == 0); - r = db->open(db, null_txn, "main", NULL, DB_BTREE, DB_CREATE, 0666); - assert(r == 0); - + r = env->open(env, env_args.envdir, DB_INIT_LOCK|DB_INIT_LOG|DB_INIT_MPOOL|DB_INIT_TXN|DB_CREATE|DB_PRIVATE, S_IRWXU+S_IRWXG+S_IRWXO); CKERR(r); + r = env->checkpointing_set_period(env, env_args.checkpointing_period); CKERR(r); + r = env->cleaner_set_period(env, env_args.cleaner_period); CKERR(r); + r = env->cleaner_set_iterations(env, env_args.cleaner_iterations); CKERR(r); *env_res = env; - *db_res = db; + + + for (int i = 0; i < num_DBs; i++) { + DB *db; + char name[30]; + memset(name, 0, sizeof(name)); + snprintf(name, sizeof(name), "main%d", i); + r = db_create(&db, env, 0); + CKERR(r); + r = db->set_flags(db, 0); + CKERR(r); + r = db->set_pagesize(db, env_args.node_size); + CKERR(r); + r = db->set_readpagesize(db, env_args.basement_node_size); + CKERR(r); + r = db->open(db, null_txn, name, NULL, DB_BTREE, DB_CREATE, 0666); + CKERR(r); + db_res[i] = db; + } return r; } -static int fill_table_from_fun(DB *db, int num_elements, int max_bufsz, +static int fill_table_from_fun(DB *db, int num_elements, int key_bufsz, int val_bufsz, void (*callback)(int idx, void *extra, void *key, int *keysz, void *val, int *valsz), void *extra) { int r = 0; for (long i = 0; i < num_elements; ++i) { - char keybuf[max_bufsz], valbuf[max_bufsz]; + char keybuf[key_bufsz], valbuf[val_bufsz]; + memset(keybuf, 0, sizeof(keybuf)); + memset(valbuf, 0, sizeof(valbuf)); int keysz, valsz; callback(i, extra, keybuf, &keysz, valbuf, &valsz); + // let's make sure the data stored fits in the buffers we passed in + assert(keysz <= key_bufsz); + assert(valsz <= val_bufsz); DBT key, val; - r = db->put(db, null_txn, dbt_init(&key, keybuf, keysz), dbt_init(&val, valbuf, valsz), 0); + // make size of data what is specified w/input parameters + // note that key and val have sizes of + // key_bufsz and val_bufsz, which were passed into this + // function, not what was stored by the callback + r = db->put( + db, + null_txn, + dbt_init(&key, keybuf, key_bufsz), + dbt_init(&val, valbuf, val_bufsz), + 0 + ); assert(r == 0); } return r; } -static void int_element_callback(int idx, void *UU(extra), void *keyv, int *keysz, void *valv, int *valsz) { - int *key = keyv, *val = valv; - *key = idx; - *val = idx; - *keysz = sizeof(int); - *valsz = sizeof(int); -} - -static int fill_table_with_ints(DB *db, int num_elements) __attribute__((unused)); -static int fill_table_with_ints(DB *db, int num_elements) { - return fill_table_from_fun(db, num_elements, sizeof(int), int_element_callback, NULL); -} - static void zero_element_callback(int idx, void *UU(extra), void *keyv, int *keysz, void *valv, int *valsz) { int *key = keyv, *val = valv; *key = idx; @@ -736,30 +866,26 @@ static void zero_element_callback(int idx, void *UU(extra), void *keyv, int *key *valsz = sizeof(int); } -static int fill_table_with_zeroes(DB *db, int num_elements) { - return fill_table_from_fun(db, num_elements, sizeof(int), zero_element_callback, NULL); -} - -static int fill_table_from_array(DB *db, int num_elements, void *array, size_t element_size) __attribute__((unused)); -static int fill_table_from_array(DB *db, int num_elements, void *array, size_t element_size) { - int r = 0; - char *a = array; - for (char *p = a; p < a + num_elements * element_size; p += element_size) { - DBT key, val; - r = db->put(db, null_txn, dbt_init(&key, p, element_size), dbt_init(&val, p, element_size), 0); - assert(r == 0); +static int fill_tables_with_zeroes(DB **dbs, int num_DBs, int num_elements, u_int32_t key_size, u_int32_t val_size) { + for (int i = 0; i < num_DBs; i++) { + assert(key_size >= sizeof(int)); + assert(val_size >= sizeof(int)); + int r = fill_table_from_fun( + dbs[i], + num_elements, + key_size, + val_size, + zero_element_callback, + NULL + ); + CKERR(r); } - return r; + return 0; } -static int open_table(DB_ENV **env_res, DB **db_res, +static int open_tables(DB_ENV **env_res, DB **db_res, int num_DBs, int (*bt_compare)(DB *, const DBT *, const DBT *), - u_int64_t cachesize, - u_int32_t checkpointing_period, - u_int32_t cleaner_period, - u_int32_t cleaner_iterations, - test_update_callback_f f, - char *envdir) { + struct env_args env_args) { int r; /* create the dup database file */ @@ -767,55 +893,77 @@ static int open_table(DB_ENV **env_res, DB **db_res, r = db_env_create(&env, 0); assert(r == 0); r = env->set_redzone(env, 0); CKERR(r); r = env->set_default_bt_compare(env, bt_compare); CKERR(r); - env->set_update(env, f); + env->set_update(env, env_args.update_function); // set the cache size to 10MB - r = env->set_cachesize(env, cachesize / (1 << 30), cachesize % (1 << 30), 1); CKERR(r); + r = env->set_cachesize(env, env_args.cachetable_size / (1 << 30), env_args.cachetable_size % (1 << 30), 1); CKERR(r); r = env->set_generate_row_callback_for_put(env, generate_row_for_put); CKERR(r); - r = env->open(env, envdir, DB_RECOVER|DB_INIT_LOCK|DB_INIT_LOG|DB_INIT_MPOOL|DB_INIT_TXN|DB_CREATE|DB_PRIVATE, S_IRWXU+S_IRWXG+S_IRWXO); CKERR(r); - r = env->checkpointing_set_period(env, checkpointing_period); CKERR(r); - r = env->cleaner_set_period(env, cleaner_period); CKERR(r); - r = env->cleaner_set_iterations(env, cleaner_iterations); CKERR(r); - - DB *db; - r = db_create(&db, env, 0); - assert(r == 0); - r = db->open(db, null_txn, "main", NULL, DB_BTREE, 0, 0666); - assert(r == 0); - + r = env->open(env, env_args.envdir, DB_RECOVER|DB_INIT_LOCK|DB_INIT_LOG|DB_INIT_MPOOL|DB_INIT_TXN|DB_CREATE|DB_PRIVATE, S_IRWXU+S_IRWXG+S_IRWXO); CKERR(r); + r = env->checkpointing_set_period(env, env_args.checkpointing_period); CKERR(r); + r = env->cleaner_set_period(env, env_args.cleaner_period); CKERR(r); + r = env->cleaner_set_iterations(env, env_args.cleaner_iterations); CKERR(r); *env_res = env; - *db_res = db; + + + for (int i = 0; i < num_DBs; i++) { + DB *db; + char name[30]; + memset(name, 0, sizeof(name)); + snprintf(name, sizeof(name), "main%d", i); + r = db_create(&db, env, 0); + CKERR(r); + r = db->open(db, null_txn, name, NULL, DB_BTREE, 0, 0666); + CKERR(r); + db_res[i] = db; + } return r; } -static int close_table(DB_ENV *env, DB *db) { +static int close_tables(DB_ENV *env, DB** dbs, int num_DBs) { int r; - r = db->close(db, 0); CKERR(r); + for (int i = 0; i < num_DBs; i++) { + r = dbs[i]->close(dbs[i], 0); CKERR(r); + } r = env->close(env, 0); CKERR(r); return r; } -static const struct cli_args DEFAULT_ARGS = { - .num_elements = 150000, - .time_of_test = 180, +static const struct env_args DEFAULT_ENV_ARGS = { .node_size = 4096, .basement_node_size = 1024, - .cachetable_size = 300000, - .only_create = false, - .only_stress = false, .checkpointing_period = 10, .cleaner_period = 1, .cleaner_iterations = 1, - .update_broadcast_period_ms = 2000, - .num_ptquery_threads = 1, - .update_function = update_op_callback, - .do_test_and_crash = false, - .do_recover = false, + .cachetable_size = 300000, .envdir = ENVDIR, - .num_update_threads = 1, - .crash_on_update_failure = true, - .update_txn_size = 1000, + .update_function = update_op_callback, }; +#define MIN_VAL_SIZE sizeof(int) +#define MIN_KEY_SIZE sizeof(int) +static struct cli_args get_default_args(void) { + struct cli_args DEFAULT_ARGS = { + .num_elements = 150000, + .num_DBs = 1, + .time_of_test = 180, + .only_create = false, + .only_stress = false, + .update_broadcast_period_ms = 2000, + .num_ptquery_threads = 1, + .do_test_and_crash = false, + .do_recover = false, + .num_update_threads = 1, + .crash_on_update_failure = true, + .print_performance = false, + .print_thread_performance = false, + .performance_period = 1, + .update_txn_size = 1000, + .key_size = MIN_KEY_SIZE, + .val_size = MIN_VAL_SIZE, + .env_args = DEFAULT_ENV_ARGS, + }; + return DEFAULT_ARGS; +} + static inline void parse_stress_test_args (int argc, char *const argv[], struct cli_args *args) { const char *argv0=argv[0]; while (argc>1) { @@ -830,52 +978,62 @@ static inline void parse_stress_test_args (int argc, char *const argv[], struct do_usage: fprintf(stderr, "Usage:\n%s [-h|-v|-q] [OPTIONS] [--only_create|--only_stress]\n", argv0); fprintf(stderr, "OPTIONS are among:\n"); - fprintf(stderr, "\t--num_elements INT (default %d)\n", DEFAULT_ARGS.num_elements); - fprintf(stderr, "\t--num_seconds INT (default %ds)\n", DEFAULT_ARGS.time_of_test); - fprintf(stderr, "\t--node_size INT (default %d bytes)\n", DEFAULT_ARGS.node_size); - fprintf(stderr, "\t--basement_node_size INT (default %d bytes)\n", DEFAULT_ARGS.basement_node_size); - fprintf(stderr, "\t--cachetable_size INT (default %ld bytes)\n", DEFAULT_ARGS.cachetable_size); - fprintf(stderr, "\t--checkpointing_period INT (default %ds)\n", DEFAULT_ARGS.checkpointing_period); - fprintf(stderr, "\t--cleaner_period INT (default %ds)\n", DEFAULT_ARGS.cleaner_period); - fprintf(stderr, "\t--cleaner_iterations INT (default %ds)\n", DEFAULT_ARGS.cleaner_iterations); - fprintf(stderr, "\t--update_broadcast_period INT (default %dms)\n", DEFAULT_ARGS.update_broadcast_period_ms); - fprintf(stderr, "\t--num_ptquery_threads INT (default %d threads)\n", DEFAULT_ARGS.num_ptquery_threads); - fprintf(stderr, "\t--num_update_threads INT (default %d threads)\n", DEFAULT_ARGS.num_update_threads); - fprintf(stderr, "\t--update_txn_size INT (default %d rows)\n", DEFAULT_ARGS.update_txn_size); - fprintf(stderr, "\t--[no-]crash_on_update_failure BOOL (default %s)\n", DEFAULT_ARGS.crash_on_update_failure ? "yes" : "no"); + fprintf(stderr, "\t--num_elements INT (default %d)\n", get_default_args().num_elements); + fprintf(stderr, "\t--num_DBs INT (default %d)\n", get_default_args().num_DBs); + fprintf(stderr, "\t--num_seconds INT (default %ds)\n", get_default_args().time_of_test); + fprintf(stderr, "\t--node_size INT (default %d bytes)\n", get_default_args().env_args.node_size); + fprintf(stderr, "\t--basement_node_size INT (default %d bytes)\n", get_default_args().env_args.basement_node_size); + fprintf(stderr, "\t--cachetable_size INT (default %ld bytes)\n", get_default_args().env_args.cachetable_size); + fprintf(stderr, "\t--checkpointing_period INT (default %ds)\n", get_default_args().env_args.checkpointing_period); + fprintf(stderr, "\t--cleaner_period INT (default %ds)\n", get_default_args().env_args.cleaner_period); + fprintf(stderr, "\t--cleaner_iterations INT (default %ds)\n", get_default_args().env_args.cleaner_iterations); + fprintf(stderr, "\t--update_broadcast_period INT (default %dms)\n", get_default_args().update_broadcast_period_ms); + fprintf(stderr, "\t--num_ptquery_threads INT (default %d threads)\n", get_default_args().num_ptquery_threads); + fprintf(stderr, "\t--num_update_threads INT (default %d threads)\n", get_default_args().num_update_threads); + fprintf(stderr, "\t--update_txn_size INT (default %d rows)\n", get_default_args().update_txn_size); + fprintf(stderr, "\t--key_size INT (default %d, minimum %ld)\n", get_default_args().key_size, MIN_KEY_SIZE); + fprintf(stderr, "\t--val_size INT (default %d, minimum %ld)\n", get_default_args().val_size, MIN_VAL_SIZE); + fprintf(stderr, "\t--[no-]crash_on_update_failure BOOL (default %s)\n", get_default_args().crash_on_update_failure ? "yes" : "no"); + fprintf(stderr, "\t--print_performance \n"); + fprintf(stderr, "\t--print_thread_performance \n"); + fprintf(stderr, "\t--performance_period INT (default %d)\n", get_default_args().performance_period); exit(resultcode); } else if (strcmp(argv[1], "--num_elements") == 0) { argc--; argv++; args->num_elements = atoi(argv[1]); } + else if (strcmp(argv[1], "--num_DBs") == 0) { + argc--; argv++; + args->num_DBs = atoi(argv[1]); + } else if (strcmp(argv[1], "--num_seconds") == 0) { argc--; argv++; args->time_of_test = atoi(argv[1]); } else if (strcmp(argv[1], "--node_size") == 0) { argc--; argv++; - args->node_size = atoi(argv[1]); + args->env_args.node_size = atoi(argv[1]); } else if (strcmp(argv[1], "--basement_node_size") == 0) { argc--; argv++; - args->basement_node_size = atoi(argv[1]); + args->env_args.basement_node_size = atoi(argv[1]); } else if (strcmp(argv[1], "--cachetable_size") == 0) { argc--; argv++; - args->cachetable_size = strtoll(argv[1], NULL, 0); + args->env_args.cachetable_size = strtoll(argv[1], NULL, 0); } else if (strcmp(argv[1], "--checkpointing_period") == 0) { argc--; argv++; - args->checkpointing_period = atoi(argv[1]); + args->env_args.checkpointing_period = atoi(argv[1]); } else if (strcmp(argv[1], "--cleaner_period") == 0) { argc--; argv++; - args->cleaner_period = atoi(argv[1]); + args->env_args.cleaner_period = atoi(argv[1]); } else if (strcmp(argv[1], "--cleaner_iterations") == 0) { argc--; argv++; - args->cleaner_iterations = atoi(argv[1]); + args->env_args.cleaner_iterations = atoi(argv[1]); } else if (strcmp(argv[1], "--update_broadcast_period") == 0) { argc--; argv++; @@ -895,10 +1053,30 @@ static inline void parse_stress_test_args (int argc, char *const argv[], struct else if (strcmp(argv[1], "--no-crash_on_update_failure") == 0) { args->crash_on_update_failure = false; } + else if (strcmp(argv[1], "--print_performance") == 0) { + args->print_performance = true; + } + else if (strcmp(argv[1], "--print_thread_performance") == 0) { + args->print_thread_performance = true; + } + else if (strcmp(argv[1], "--performance_period") == 0) { + argc--; argv++; + args->performance_period = atoi(argv[1]); + } else if (strcmp(argv[1], "--update_txn_size") == 0) { argc--; argv++; args->update_txn_size = atoi(argv[1]); } + else if (strcmp(argv[1], "--key_size") == 0) { + argc--; argv++; + args->key_size = atoi(argv[1]); + assert(args->key_size >= MIN_KEY_SIZE); + } + else if (strcmp(argv[1], "--val_size") == 0) { + argc--; argv++; + args->val_size = atoi(argv[1]); + assert(args->val_size >= MIN_VAL_SIZE); + } else if (strcmp(argv[1], "--only_create") == 0) { args->only_create = true; } @@ -913,7 +1091,7 @@ static inline void parse_stress_test_args (int argc, char *const argv[], struct } else if (strcmp(argv[1], "--envdir") == 0 && argc > 1) { argc--; argv++; - args->envdir = argv[1]; + args->env_args.envdir = argv[1]; } else { resultcode=1; @@ -930,64 +1108,73 @@ static inline void parse_stress_test_args (int argc, char *const argv[], struct static void stress_table(DB_ENV *, DB **, struct cli_args *); +static int +stress_int_dbt_cmp (DB *db, const DBT *a, const DBT *b) { + assert(db && a && b); + assert(a->size >= sizeof(int)); + assert(b->size >= sizeof(int)); + + int x = *(int *) a->data; + int y = *(int *) b->data; + + if (xy) return 1; + return 0; +} + + static void stress_test_main(struct cli_args *args) { DB_ENV* env = NULL; - DB* db = NULL; + DB* dbs[args->num_DBs]; + memset(dbs, 0, sizeof(dbs)); if (!args->only_stress) { - create_table( + create_tables( &env, - &db, - int_dbt_cmp, - args->cachetable_size, - args->checkpointing_period, - args->cleaner_period, - args->cleaner_iterations, - args->node_size, - args->basement_node_size, - args->envdir); - CHK(fill_table_with_zeroes(db, args->num_elements)); - CHK(close_table(env, db)); + dbs, + args->num_DBs, + stress_int_dbt_cmp, + args->env_args + ); + CHK(fill_tables_with_zeroes(dbs, args->num_DBs, args->num_elements, args->key_size, args->val_size)); + CHK(close_tables(env, dbs, args->num_DBs)); } if (!args->only_create) { - CHK(open_table(&env, - &db, - int_dbt_cmp, - args->cachetable_size, //cachetable size - args->checkpointing_period, // checkpoint period - args->cleaner_period, - args->cleaner_iterations, - args->update_function, - args->envdir)); - stress_table(env, &db, args); - CHK(close_table(env, db)); + CHK(open_tables(&env, + dbs, + args->num_DBs, + stress_int_dbt_cmp, + args->env_args)); + stress_table(env, dbs, args); + CHK(close_tables(env, dbs, args->num_DBs)); } } static void UU() stress_recover(struct cli_args *args) { DB_ENV* env = NULL; - DB* db = NULL; - CHK(open_table(&env, - &db, - int_dbt_cmp, - args->cachetable_size, //cachetable size - args->checkpointing_period, // checkpoint period - args->cleaner_period, - args->cleaner_iterations, - args->update_function, - args->envdir)); + DB* dbs[args->num_DBs]; + memset(dbs, 0, sizeof(dbs)); + CHK(open_tables(&env, + dbs, + args->num_DBs, + stress_int_dbt_cmp, + args->env_args)); DB_TXN* txn = NULL; struct arg recover_args; - arg_init(&recover_args, args->num_elements, &db, env, args); + arg_init(&recover_args, args->num_elements, dbs, env, args); int r = env->txn_begin(env, 0, &txn, recover_args.txn_type); CKERR(r); - r = scan_op_and_maybe_check_sum(env, &db, txn, &recover_args, true); + struct scan_op_extra soe; + soe.fast = TRUE; + soe.fwd = TRUE; + soe.prefetch = FALSE; + r = scan_op(txn, &recover_args, &soe); CKERR(r); CHK(txn->commit(txn,0)); - CHK(close_table(env, db)); + CHK(close_tables(env, dbs, args->num_DBs)); } #endif