#include #include #include #include "toku_portability.h" #include "toku_assert.h" #include "toku_pthread.h" #include "memory.h" #include "workqueue.h" #include "threadpool.h" int verbose; static WORKITEM new_workitem (void) { WORKITEM wi = (WORKITEM) toku_malloc(sizeof *wi); assert(wi); return wi; } static void destroy_workitem(WORKITEM wi) { toku_free(wi); } // test simple create and destroy static void test_create_destroy (void) { if (verbose) printf("%s:%d\n", __FUNCTION__, __LINE__); struct workqueue workqueue, *wq = &workqueue; workqueue_init(wq); assert(workqueue_empty(wq)); workqueue_destroy(wq); } // verify that the wq implements FIFO ordering static void test_simple_enq_deq (int n) { if (verbose) printf("%s:%d\n", __FUNCTION__, __LINE__); struct workqueue workqueue, *wq = &workqueue; int r; workqueue_init(wq); assert(workqueue_empty(wq)); WORKITEM work[n]; int i; for (i=0; iworkqueue); rwfc->current = 0; rwfc->limit = limit; } static void rwfc_destroy (struct rwfc *rwfc) { workqueue_destroy(&rwfc->workqueue); } static void rwfc_do_read (WORKITEM wi) { struct rwfc *rwfc = (struct rwfc *) workitem_arg(wi); workqueue_lock(&rwfc->workqueue); if (2*rwfc->current-- > rwfc->limit && 2*rwfc->current <= rwfc->limit) { workqueue_wakeup_write(&rwfc->workqueue, 0); } workqueue_unlock(&rwfc->workqueue); destroy_workitem(wi); } static void * rwfc_worker (void *arg) { struct workqueue *wq = arg; while (1) { WORKITEM wi = 0; int r = workqueue_deq(wq, &wi, 1); if (r == EINVAL) { assert(wi == 0); break; } usleep(random() % 100); wi->f(wi); } return arg; } static void test_flow_control (int limit, int n, int maxthreads) { if (verbose) printf("%s:%d\n", __FUNCTION__, __LINE__); struct rwfc my_rwfc, *rwfc = &my_rwfc; THREADPOOL tp; int i; rwfc_init(rwfc, limit); threadpool_create(&tp, maxthreads); for (i=0; iworkqueue); sleep(1); // this is here to block the reader on the first deq for (i=0; iworkqueue); workqueue_enq(&rwfc->workqueue, wi, 0); rwfc->current++; while (rwfc->current >= rwfc->limit) { // printf("%d - %d %d\n", i, rwfc->current, rwfc->limit); workqueue_wait_write(&rwfc->workqueue, 0); } workqueue_unlock(&rwfc->workqueue); // toku_os_usleep(random() % 1); } workqueue_set_closed(&rwfc->workqueue, 1); threadpool_destroy(&tp); rwfc_destroy(rwfc); } int main(int argc, const char *argv[]) { int i; for (i=1; i