/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */ // vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4: #ident "$Id: omt-test.cc 46193 2012-07-26 17:12:18Z yfogel $" #ident "Copyright (c) 2007 Tokutek Inc. All rights reserved." #include "test.h" #include #include #include #include #include #include #include #include #include #include #include #include #include namespace toku { namespace test { static inline uint32_t fudge(const uint32_t x) { return x + 300; } static inline uint32_t defudge(const uint32_t fx) { return fx - 300; } int test_iterator(const uint32_t &v, const uint32_t idx, bool *const UU(unused)); int test_iterator(const uint32_t &v, const uint32_t idx, bool *const UU(unused)) { invariant(defudge(v) == idx); return 0; } int check_iterator_before(const uint32_t &v, const uint32_t idx, bool *const called); int check_iterator_before(const uint32_t &v, const uint32_t idx, bool *const called) { invariant(defudge(v) == idx); invariant(idx % 10 < 5); called[idx] = true; return 0; } int check_iterator_after(const uint32_t &v, const uint32_t UU(idx), bool *const called); int check_iterator_after(const uint32_t &v, const uint32_t UU(idx), bool *const called) { invariant(defudge(v) % 10 >= 5); called[defudge(v)] = true; return 0; } int die(const uint32_t &UU(v), const uint32_t UU(idx), void *const UU(unused)); int die(const uint32_t &UU(v), const uint32_t UU(idx), void *const UU(unused)) { abort(); return 0; // hahaha } static void run_test(uint32_t nelts) { assert(nelts % 10 == 0); // run_test depends on nelts being a multiple of 10 omt omt; omt.create(); omt.verify_marks_consistent(); for (uint32_t i = 0; i < nelts; ++i) { omt.insert_at(fudge(i), i); } omt.verify_marks_consistent(); int r; for (uint32_t i = 0; i < nelts / 10; ++i) { r = omt.iterate_and_mark_range(i * 10, i * 10 + 5, nullptr); invariant_zero(r); omt.verify_marks_consistent(); } bool called[nelts]; ZERO_ARRAY(called); r = omt.iterate_over_marked(called); invariant_zero(r); for (uint32_t i = 0; i < nelts; ++i) { if (i % 10 < 5) { invariant(called[i]); } else { invariant(!called[i]); } } omt.verify_marks_consistent(); invariant(omt.size() == nelts); omt.delete_all_marked(); omt.verify_marks_consistent(); invariant(omt.size() * 2 == nelts); r = omt.iterate_over_marked(nullptr); invariant_zero(r); ZERO_ARRAY(called); r = omt.iterate(called); invariant_zero(r); omt.verify_marks_consistent(); for (uint32_t i = 0; i < nelts; ++i) { if (i % 10 < 5) { invariant(!called[i]); } else { invariant(called[i]); } } omt.destroy(); } typedef omt stress_omt; int int_heaviside(const uint32_t &v, const uint32_t &target); int int_heaviside(const uint32_t &v, const uint32_t &target) { return (v > target) - (v < target); } struct stress_shared { stress_omt *omt; volatile bool running; struct rwlock lock; toku_mutex_t mutex; int num_marker_threads; }; struct reader_extra { int tid; stress_shared *shared; uint64_t iterations; uint64_t last_iteration; char buf_read[8]; char buf_write[8]; struct random_data rand_read; struct random_data rand_write; }; static void generate_range(struct random_data *rng, const struct stress_shared &shared, uint32_t *begin, uint32_t *limit) { const uint32_t nelts = shared.omt->size(); double range_limit_d = nelts; range_limit_d /= 1000; range_limit_d /= shared.num_marker_threads; range_limit_d += 1; uint32_t range_limit = static_cast(range_limit_d); if (range_limit < 5) { range_limit = 5; } if (range_limit > 1000) { range_limit = 1000; } *begin = rand_choices(rng, nelts - 1); if (*begin + range_limit > nelts) { range_limit = nelts - *begin; } *limit = *begin + rand_choices(rng, range_limit); } struct pair { uint32_t begin; uint32_t limit; }; int mark_read_iterator(const uint32_t &UU(v), const uint32_t idx, struct pair * const pair); int mark_read_iterator(const uint32_t &UU(v), const uint32_t idx, struct pair * const pair) { invariant(defudge(v) == idx); invariant(idx >= pair->begin); invariant(idx < pair->limit); return 0; } static void *stress_mark_worker(void *extrav) { struct reader_extra *CAST_FROM_VOIDP(extra, extrav); struct stress_shared &shared = *extra->shared; toku_mutex_t &mutex = shared.mutex; while (shared.running) { toku_mutex_lock(&mutex); rwlock_read_lock(&shared.lock, &mutex); toku_mutex_unlock(&mutex); struct pair range; generate_range(&extra->rand_read, shared, &range.begin, &range.limit); shared.omt->iterate_and_mark_range(range.begin, range.limit, &range); ++extra->iterations; toku_mutex_lock(&mutex); rwlock_read_unlock(&shared.lock); toku_mutex_unlock(&mutex); usleep(1); } return nullptr; } template class array_ftor { int m_count; T *m_array; public: array_ftor(int size) : m_count(0) { XMALLOC_N(size, m_array); } ~array_ftor() { toku_free(m_array); } void operator() (const T &x) { m_array[m_count++] = x; } template void iterate(callback_t &cb) const { for (int i = 0; i < m_count; ++i) { cb(m_array[i]); } } }; int use_array_ftor(const uint32_t &v, const uint32_t UU(idx), array_ftor *const fp); int use_array_ftor(const uint32_t &v, const uint32_t UU(idx), array_ftor *const fp) { array_ftor &f = *fp; f(v); return 0; } class inserter { stress_omt *m_omt; public: inserter(stress_omt *omt) : m_omt(omt) {} void operator() (const uint32_t &x) { m_omt->insert(x, x, nullptr); } }; /* * split the range evenly/not evenly between marker threads * context tells it the range * context also holds iteration number * * N threads * N 'contexts' holds iteration number, seed * * create rng based on seed * loop: * generate random range. Mark that range, increment iteration number * * * * * for each context * create rng based on context->last_seed * loop (iteration number times) * mark (in array) random range * context->last_seed := context->seed * check the array and the omt * */ static void simulate_reader_marks_on_array(struct reader_extra *const reader, const struct stress_shared &shared, bool *const should_be_marked) { if (verbose) { fprintf(stderr, "thread %d ran %" PRIu64 " iterations\n", reader->tid, reader->iterations - reader->last_iteration); } for (; reader->last_iteration < reader->iterations; ++reader->last_iteration) { uint32_t begin; uint32_t limit; generate_range(&reader->rand_write, shared, &begin, &limit); for (uint32_t i = begin; i < limit; i++) { should_be_marked[i] = true; } } } int copy_marks(const uint32_t &v, const uint32_t idx, bool * const is_marked); int copy_marks(const uint32_t &v, const uint32_t idx, bool * const is_marked) { invariant(defudge(v) == idx); is_marked[idx] = true; return 0; } static inline uint32_t count_true(const bool *const bools, uint32_t n) { uint32_t count = 0; for (uint32_t i = 0; i < n; ++i) { if (bools[i]) { ++count; } } return count; } static void stress_deleter(struct reader_extra *const readers, int num_marker_threads, stress_omt *omt) { // Verify (iterate_over_marked) agrees exactly with iterate_and_mark_range (multithreaded) stress_shared &shared = *readers[0].shared; bool should_be_marked[omt->size()]; ZERO_ARRAY(should_be_marked); for (int i = 0; i < num_marker_threads; i++) { simulate_reader_marks_on_array(&readers[i], shared, should_be_marked); } bool is_marked_according_to_iterate[omt->size()]; ZERO_ARRAY(is_marked_according_to_iterate); omt->verify_marks_consistent(); omt->iterate_over_marked(&is_marked_according_to_iterate[0]); omt->verify_marks_consistent(); invariant(!memcmp(should_be_marked, is_marked_according_to_iterate, sizeof(should_be_marked))); if (verbose) { double frac_marked = count_true(should_be_marked, omt->size()); frac_marked /= omt->size(); fprintf(stderr, "Marked: %0.4f\n", frac_marked); omt->verify_marks_consistent(); } array_ftor aftor(omt->size()); omt->iterate_over_marked, use_array_ftor>(&aftor); omt->delete_all_marked(); omt->verify_marks_consistent(); omt->iterate_over_marked(nullptr); inserter ins(omt); aftor.iterate(ins); omt->verify_marks_consistent(); } static void *stress_delete_worker(void *extrav) { reader_extra *CAST_FROM_VOIDP(readers, extrav); stress_shared &shared = *readers[0].shared; int num_marker_threads = shared.num_marker_threads; toku_mutex_t &mutex = shared.mutex; const double repetitions = 20; for (int i = 0; i < repetitions; ++i) { // sleep 0 - 0.15s // early iterations sleep for a short time // later iterations sleep longer int sleep_for = 1000 * 100 * (1.5 * (i+1) / repetitions); usleep(sleep_for); toku_mutex_lock(&mutex); rwlock_write_lock(&shared.lock, &mutex); toku_mutex_unlock(&mutex); stress_deleter(readers, num_marker_threads, shared.omt); toku_mutex_lock(&mutex); rwlock_write_unlock(&shared.lock); toku_mutex_unlock(&mutex); } __sync_bool_compare_and_swap(&shared.running, true, false); return nullptr; } static void stress_test(int nelts) { stress_omt omt; omt.create(); for (int i = 0; i < nelts; ++i) { omt.insert_at(fudge(i), i); } const int num_marker_threads = 5; struct stress_shared extra; ZERO_STRUCT(extra); extra.omt = &omt; toku_mutex_init(&extra.mutex, NULL); rwlock_init(&extra.lock); extra.running = true; extra.num_marker_threads = num_marker_threads; struct reader_extra readers[num_marker_threads]; ZERO_ARRAY(readers); srandom(time(NULL)); toku_pthread_t marker_threads[num_marker_threads]; for (int i = 0; i < num_marker_threads; ++i) { struct reader_extra &reader = readers[i]; reader.tid = i; reader.shared = &extra; int r; int seed = random(); r = myinitstate_r(seed, reader.buf_read, 8, &reader.rand_read); invariant_zero(r); r = myinitstate_r(seed, reader.buf_write, 8, &reader.rand_write); invariant_zero(r); toku_pthread_create(&marker_threads[i], NULL, stress_mark_worker, &reader); } toku_pthread_t deleter_thread; toku_pthread_create(&deleter_thread, NULL, stress_delete_worker, &readers[0]); toku_pthread_join(deleter_thread, NULL); for (int i = 0; i < num_marker_threads; ++i) { toku_pthread_join(marker_threads[i], NULL); } rwlock_destroy(&extra.lock); toku_mutex_destroy(&extra.mutex); omt.destroy(); } } // end namespace test } // end namespace toku int test_main(int argc, const char *argv[]) { default_parse_args(argc, argv); for (int i = 10; i <= 80; i*=2) { toku::test::run_test(i); } toku::test::run_test(9000); toku::test::stress_test(1000 * 100); return 0; }