Fix some data race problems detected with Helgrind. Addresses #1277

git-svn-id: file:///svn/toku/tokudb.1032b+1343@8628 c7de825b-a66e-492c-adef-691d508d4ae1
Rich Prohaska 2013-04-16 23:57:35 -04:00 committed by Yoni Fogel
parent a55b70cc05
commit 84d17e804a
5 changed files with 42 additions and 39 deletions
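
Background: Helgrind (run as valgrind --tool=helgrind ./some-test) reports pairs of memory accesses made by different threads without a common lock. That is the pattern this commit removes: the cachetable writer threads and the test thread shared state (the threadpool's busy_threads counter, the test globals test_size_flush_key and n_present) with no lock ordering Helgrind could see. The fragment below is a minimal, hypothetical reproducer of the reported pattern, not code from this tree; the names race_counter, guarded_counter, and worker are made up, and the mutex-protected variant mirrors the test_mutex approach added in the test file below.

/* race.c -- hypothetical Helgrind demo; build: gcc -g -pthread race.c
 * run:   valgrind --tool=helgrind ./a.out */
#include <pthread.h>
#include <stdio.h>

static int race_counter = 0;                              /* shared, unguarded */
static int guarded_counter = 0;                           /* shared, mutex-protected */
static pthread_mutex_t guard = PTHREAD_MUTEX_INITIALIZER;

static void *worker(void *arg) {
    (void) arg;
    race_counter++;                                       /* races with the read in main() */
    pthread_mutex_lock(&guard);
    guarded_counter++;                                    /* serialized, like the test_mutex usage below */
    pthread_mutex_unlock(&guard);
    return NULL;
}

int main(void) {
    pthread_t t;
    if (pthread_create(&t, NULL, worker, NULL) != 0) return 1;
    printf("unguarded=%d\n", race_counter);               /* Helgrind: "Possible data race" */
    pthread_mutex_lock(&guard);
    printf("guarded=%d\n", guarded_counter);              /* read under the same lock: no report */
    pthread_mutex_unlock(&guard);
    pthread_join(t, NULL);
    return 0;
}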

View file

@@ -163,7 +163,8 @@ int toku_create_cachetable(CACHETABLE *result, long size_limit, LSN initial_lsn,
r = threadpool_create(&t->threadpool, nprocs); assert(r == 0);
#if DO_WRITER_THREAD
threadpool_maybe_add(t->threadpool, cachetable_writer, t);
for (i=0; i < (u_int32_t)nprocs; i++)
threadpool_maybe_add(t->threadpool, cachetable_writer, t);
#endif
*result = t;
return 0;
@@ -566,7 +567,6 @@ static void flush_and_remove (CACHETABLE ct, PAIR p, int write_me) {
// evictions without a write can be run in the current thread
cachetable_write_pair(ct, p);
} else {
threadpool_maybe_add(ct->threadpool, cachetable_writer, ct);
writequeue_enq(&ct->wq, p);
}
#else
@@ -1117,12 +1117,10 @@ static void *cachetable_writer(void *arg) {
int r;
cachetable_lock(ct);
while (1) {
threadpool_set_thread_idle(ct->threadpool);
PAIR p = 0;
r = writequeue_deq(&ct->wq, &ct->mutex, &p);
if (r != 0)
break;
threadpool_set_thread_busy(ct->threadpool);
cachetable_write_pair(ct, p);
}
cachetable_unlock(ct);

View file

@@ -4,6 +4,28 @@
#include "includes.h"
#include "test.h"
// this mutex is used by some of the tests to serialize access to some
// global data, especially between the test thread and the cachetable
// writeback threads
toku_pthread_mutex_t test_mutex;
static inline void test_mutex_init() {
int r = toku_pthread_mutex_init(&test_mutex, 0); assert(r == 0);
}
static inline void test_mutex_destroy() {
int r = toku_pthread_mutex_destroy(&test_mutex); assert(r == 0);
}
static inline void test_mutex_lock() {
int r = toku_pthread_mutex_lock(&test_mutex); assert(r == 0);
}
static inline void test_mutex_unlock() {
int r = toku_pthread_mutex_unlock(&test_mutex); assert(r == 0);
}
// hook my_malloc_always_fails into malloc to control malloc and verify
// the correct recovery from malloc failures
#if defined(__linux__)
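(The hunk stops at the platform guard, so this excerpt does not show how my_malloc_always_fails is actually wired into malloc. As a rough illustration of the general failure-injection technique on Linux/glibc of that era, and only an assumption about the mechanism rather than this test's actual code, one way is the now-deprecated __malloc_hook; the install/restore helpers below are invented names.)

/* Sketch only: glibc-specific __malloc_hook interposition (deprecated in modern glibc). */
#include <malloc.h>
#include <stddef.h>

static void *(*saved_malloc_hook)(size_t, const void *);

/* Every malloc() call is redirected here while the hook is installed; returning
 * NULL simulates an out-of-memory condition so the error paths can be exercised. */
static void *my_malloc_always_fails(size_t size, const void *caller) {
    (void) size; (void) caller;
    return NULL;
}

static void install_failing_malloc(void) {     /* hypothetical helper */
    saved_malloc_hook = __malloc_hook;
    __malloc_hook = my_malloc_always_fails;
}

static void restore_malloc(void) {             /* hypothetical helper */
    __malloc_hook = saved_malloc_hook;
}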
@@ -517,7 +539,9 @@ static void test_size_flush_callback(CACHEFILE f,
if (test_size_debug && verbose) printf("test_size_flush %p %" PRId64 " %p %ld %u %u\n", f, key.b, value, size, (unsigned)do_write, (unsigned)keep);
if (keep) {
assert(do_write != 0);
test_mutex_lock();
test_size_flush_key = key;
test_mutex_unlock();
}
}
@@ -595,7 +619,9 @@ static void test_size_flush() {
assert(r == 0);
/* put 2*n keys into the table, ensure flushes occur in key order */
test_mutex_lock();
test_size_flush_key = make_blocknum(-1);
test_mutex_unlock();
int i;
CACHEKEY expect_flush_key = make_blocknum(0);
@@ -623,11 +649,13 @@ static void test_size_flush() {
assert(entry_value == value);
assert(entry_size == size);
test_mutex_lock();
if (test_size_flush_key.b != -1) {
assert(test_size_flush_key.b == expect_flush_key.b);
assert(expect_flush_key.b == i-n);
expect_flush_key.b += 1;
}
test_mutex_unlock();
r = toku_cachetable_unpin(f, key, hkey, CACHETABLE_CLEAN, size);
assert(r == 0);
@@ -663,6 +691,8 @@ test_main (int argc, const char *argv[]) {
#endif
}
test_mutex_init();
// run tests
#if !defined(_WIN32)
test_multi_filehandles();
@@ -682,6 +712,8 @@ test_main (int argc, const char *argv[]) {
test_size_resize();
test_size_flush();
}
test_mutex_destroy();
toku_malloc_cleanup();
if (verbose) printf("ok\n");
return 0;

View file

@@ -48,8 +48,10 @@ static void print_ints(void) {
}
static void item_becomes_present(CACHEFILE cf, CACHEKEY key) {
while (n_present >= N_PRESENT_LIMIT) toku_pthread_yield();
test_mutex_lock();
while (n_present >= N_PRESENT_LIMIT) {
test_mutex_unlock(); toku_pthread_yield(); test_mutex_lock();
}
assert(n_present<N_PRESENT_LIMIT);
present_items[n_present].cf = cf;
present_items[n_present].key = key;
@@ -163,7 +165,10 @@ static void test_chaining (void) {
//print_ints();
}
for (trial=0; trial<TRIALS; trial++) {
if (n_present>0) {
test_mutex_lock();
int my_n_present = n_present;
test_mutex_unlock();
if (my_n_present>0) {
// First touch some random ones
test_mutex_lock();
int whichone = random()%n_present;

View file

@@ -1,12 +1,8 @@
#include "includes.h"
// use gcc builtin fetch_and_add 0->no 1->yes
#define DO_ATOMIC_FETCH_AND_ADD 1
struct threadpool {
int max_threads;
int current_threads;
int busy_threads;
toku_pthread_t pids[];
};
@@ -17,7 +13,6 @@ int threadpool_create(THREADPOOL *threadpoolptr, int max_threads) {
return ENOMEM;
threadpool->max_threads = max_threads;
threadpool->current_threads = 0;
threadpool->busy_threads = 0;
int i;
for (i=0; i<max_threads; i++)
threadpool->pids[i] = 0;
@@ -38,31 +33,14 @@ void threadpool_destroy(THREADPOOL *threadpoolptr) {
}
void threadpool_maybe_add(THREADPOOL threadpool, void *(*f)(void *), void *arg) {
if ((threadpool->current_threads == 0 || threadpool->busy_threads < threadpool->current_threads) && threadpool->current_threads < threadpool->max_threads) {
if (threadpool->current_threads < threadpool->max_threads) {
int r = toku_pthread_create(&threadpool->pids[threadpool->current_threads], 0, f, arg);
if (r == 0) {
threadpool->current_threads++;
threadpool_set_thread_busy(threadpool);
}
}
}
void threadpool_set_thread_busy(THREADPOOL threadpool) {
#if DO_ATOMIC_FETCH_AND_ADD
(void) __sync_fetch_and_add(&threadpool->busy_threads, 1);
#else
threadpool->busy_threads++;
#endif
}
void threadpool_set_thread_idle(THREADPOOL threadpool) {
#if DO_ATOMIC_FETCH_AND_ADD
(void) __sync_fetch_and_add(&threadpool->busy_threads, -1);
#else
threadpool->busy_threads--;
#endif
}
int threadpool_get_current_threads(THREADPOOL threadpool) {
return threadpool->current_threads;
}

View file

@@ -34,16 +34,6 @@ void threadpool_destroy(THREADPOOL *threadpoolptr);
void threadpool_maybe_add(THREADPOOL theadpool, void *(*f)(void *), void *arg);
// Set the current thread busy
// Effects: the threadpool keeps a count of the number of idle threads. It
// uses this count to control the creation of additional threads.
void threadpool_set_thread_busy(THREADPOOL);
// Set the current thread idle
void threadpool_set_thread_idle(THREADPOOL);
// get the current number of threads
int threadpool_get_current_threads(THREADPOOL);
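
Putting the header change together with the first cachetable hunk: the pool no longer tracks busy and idle threads, so threadpool_maybe_add() now starts a new thread whenever current_threads < max_threads, and toku_create_cachetable() simply calls it once per processor up front. Below is a minimal usage sketch against the declarations kept in this header; my_worker, nprocs, and the include name are placeholders, not code from this tree.

/* Hypothetical caller, mirroring how toku_create_cachetable() now starts its writer threads. */
#include <assert.h>
#include <stddef.h>
#include "threadpool.h"                  /* assumed header name for the declarations above */

static void *my_worker(void *arg) {      /* placeholder; the real workers block on a write queue */
    (void) arg;
    return NULL;
}

static void start_workers(int nprocs) {
    THREADPOOL pool;
    int r = threadpool_create(&pool, nprocs); assert(r == 0);
    /* One maybe_add per desired worker: after this commit a thread is created
       whenever current_threads < max_threads, with no busy/idle bookkeeping. */
    for (int i = 0; i < nprocs; i++)
        threadpool_maybe_add(pool, my_worker, NULL);
    assert(threadpool_get_current_threads(pool) <= nprocs);
    threadpool_destroy(&pool);           /* real callers shut the workers down first */
}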