mirror of
https://github.com/MariaDB/server.git
synced 2025-02-02 03:51:50 +01:00
closes #5283, delete workqueue code, it is now unused
git-svn-id: file:///svn/toku/tokudb@46801 c7de825b-a66e-492c-adef-691d508d4ae1
This commit is contained in:
parent
7ea6dd4171
commit
6389aeb1b7
6 changed files with 1 additions and 471 deletions
|
@ -71,7 +71,6 @@ set(FT_SOURCES
|
|||
txn
|
||||
txn_manager
|
||||
ule
|
||||
workqueue
|
||||
x1764
|
||||
xids
|
||||
ybt
|
||||
|
|
|
@ -10,7 +10,6 @@
|
|||
#include <fcntl.h>
|
||||
#include "fttypes.h"
|
||||
#include "minicron.h"
|
||||
#include "workqueue.h"
|
||||
|
||||
|
||||
// Maintain a cache mapping from cachekeys to values (void*)
|
||||
|
|
|
@ -12,6 +12,7 @@
|
|||
|
||||
#include "compress.h"
|
||||
#include "sub_block.h"
|
||||
#include "threadpool.h"
|
||||
|
||||
static uint8_t
|
||||
get_uint8_at_offset(void *vp, size_t offset) {
|
||||
|
|
|
@ -1,206 +0,0 @@
|
|||
/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
|
||||
// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
|
||||
#ident "$Id$"
|
||||
#ident "Copyright (c) 2007-2012 Tokutek Inc. All rights reserved."
|
||||
#include <toku_portability.h>
|
||||
#include "test.h"
|
||||
#include <stdio.h>
|
||||
#include <errno.h>
|
||||
#include <string.h>
|
||||
|
||||
|
||||
#include <toku_pthread.h>
|
||||
#include "memory.h"
|
||||
#include "workqueue.h"
|
||||
#include "threadpool.h"
|
||||
|
||||
static WORKITEM
|
||||
new_workitem (void) {
|
||||
WORKITEM wi = (WORKITEM) toku_malloc(sizeof *wi); assert(wi);
|
||||
return wi;
|
||||
}
|
||||
|
||||
static void
|
||||
destroy_workitem(WORKITEM wi) {
|
||||
toku_free(wi);
|
||||
}
|
||||
|
||||
// test simple create and destroy
|
||||
|
||||
static void
|
||||
test_create_destroy (void) {
|
||||
if (verbose) printf("%s:%d\n", __FUNCTION__, __LINE__);
|
||||
struct workqueue workqueue, *wq = &workqueue;
|
||||
workqueue_init(wq);
|
||||
assert(workqueue_empty(wq));
|
||||
workqueue_destroy(wq);
|
||||
}
|
||||
|
||||
// verify that the wq implements FIFO ordering
|
||||
|
||||
static void
|
||||
test_simple_enq_deq (int n) {
|
||||
if (verbose) printf("%s:%d\n", __FUNCTION__, __LINE__);
|
||||
struct workqueue workqueue, *wq = &workqueue;
|
||||
int r;
|
||||
|
||||
workqueue_init(wq);
|
||||
assert(workqueue_empty(wq));
|
||||
WORKITEM work[n];
|
||||
int i;
|
||||
for (i=0; i<n; i++) {
|
||||
work[i] = new_workitem();
|
||||
workqueue_enq(wq, work[i], 1);
|
||||
assert(!workqueue_empty(wq));
|
||||
}
|
||||
for (i=0; i<n; i++) {
|
||||
WORKITEM wi = 0;
|
||||
r = workqueue_deq(wq, &wi, 1);
|
||||
assert(r == 0 && wi == work[i]);
|
||||
destroy_workitem(wi);
|
||||
}
|
||||
assert(workqueue_empty(wq));
|
||||
workqueue_destroy(wq);
|
||||
}
|
||||
|
||||
// setting the wq closed should cause deq to return EINVAL
|
||||
|
||||
static void
|
||||
test_set_closed (void) {
|
||||
if (verbose) printf("%s:%d\n", __FUNCTION__, __LINE__);
|
||||
struct workqueue workqueue, *wq = &workqueue;
|
||||
workqueue_init(wq);
|
||||
WORKITEM wi = 0;
|
||||
workqueue_set_closed(wq, 1);
|
||||
int r = workqueue_deq(wq, &wi, 1);
|
||||
assert(r == EINVAL && wi == 0);
|
||||
workqueue_destroy(wq);
|
||||
}
|
||||
|
||||
// closing a wq with a blocked reader thread should cause the reader to get EINVAL
|
||||
|
||||
static void *
|
||||
test_set_closed_waiter(void *arg) {
|
||||
struct workqueue *CAST_FROM_VOIDP(wq, arg);
|
||||
int r;
|
||||
|
||||
WORKITEM wi = 0;
|
||||
r = workqueue_deq(wq, &wi, 1);
|
||||
assert(r == EINVAL && wi == 0);
|
||||
return arg;
|
||||
}
|
||||
|
||||
static void
|
||||
test_set_closed_thread (void) {
|
||||
if (verbose) printf("%s:%d\n", __FUNCTION__, __LINE__);
|
||||
struct workqueue workqueue, *wq = &workqueue;
|
||||
int r;
|
||||
|
||||
workqueue_init(wq);
|
||||
toku_pthread_t tid;
|
||||
r = toku_pthread_create(&tid, 0, test_set_closed_waiter, wq); assert(r == 0);
|
||||
sleep(1);
|
||||
workqueue_set_closed(wq, 1);
|
||||
void *ret;
|
||||
r = toku_pthread_join(tid, &ret);
|
||||
assert(r == 0 && ret == wq);
|
||||
workqueue_destroy(wq);
|
||||
}
|
||||
|
||||
// verify writer reader flow control
|
||||
// the write (main) thread writes as fast as possible until the wq is full. then it
|
||||
// waits.
|
||||
// the read thread reads from the wq slowly using a random delay. it wakes up any
|
||||
// writers when the wq size <= 1/2 of the wq limit
|
||||
|
||||
struct rwfc {
|
||||
struct workqueue workqueue;
|
||||
int current, limit;
|
||||
};
|
||||
|
||||
static void rwfc_init (struct rwfc *rwfc, int limit) {
|
||||
workqueue_init(&rwfc->workqueue);
|
||||
rwfc->current = 0; rwfc->limit = limit;
|
||||
}
|
||||
|
||||
static void
|
||||
rwfc_destroy (struct rwfc *rwfc) {
|
||||
workqueue_destroy(&rwfc->workqueue);
|
||||
}
|
||||
|
||||
static void
|
||||
rwfc_do_read (WORKITEM wi) {
|
||||
struct rwfc *rwfc = (struct rwfc *) workitem_arg(wi);
|
||||
workqueue_lock(&rwfc->workqueue);
|
||||
if (2*rwfc->current-- > rwfc->limit && 2*rwfc->current <= rwfc->limit) {
|
||||
workqueue_wakeup_write(&rwfc->workqueue, 0);
|
||||
}
|
||||
workqueue_unlock(&rwfc->workqueue);
|
||||
destroy_workitem(wi);
|
||||
}
|
||||
|
||||
static void *
|
||||
rwfc_worker (void *arg) {
|
||||
struct workqueue *CAST_FROM_VOIDP(wq, arg);
|
||||
while (1) {
|
||||
WORKITEM wi = 0;
|
||||
int r = workqueue_deq(wq, &wi, 1);
|
||||
if (r == EINVAL) {
|
||||
assert(wi == 0);
|
||||
break;
|
||||
}
|
||||
usleep(random() % 100);
|
||||
wi->f(wi);
|
||||
}
|
||||
return arg;
|
||||
}
|
||||
|
||||
static void
|
||||
test_flow_control (int limit, int n, int maxthreads) {
|
||||
if (verbose) printf("%s:%d\n", __FUNCTION__, __LINE__);
|
||||
|
||||
struct rwfc my_rwfc, *rwfc = &my_rwfc;
|
||||
THREADPOOL tp;
|
||||
int i;
|
||||
rwfc_init(rwfc, limit);
|
||||
toku_thread_pool_create(&tp, maxthreads);
|
||||
int T = maxthreads;
|
||||
toku_thread_pool_run(tp, 0, &T, rwfc_worker, &rwfc->workqueue);
|
||||
assert(T == maxthreads);
|
||||
sleep(1); // this is here to block the reader on the first deq
|
||||
for (i=0; i<n; i++) {
|
||||
WORKITEM wi = new_workitem();
|
||||
workitem_init(wi, rwfc_do_read, rwfc);
|
||||
workqueue_lock(&rwfc->workqueue);
|
||||
workqueue_enq(&rwfc->workqueue, wi, 0);
|
||||
rwfc->current++;
|
||||
while (rwfc->current >= rwfc->limit) {
|
||||
// printf("%d - %d %d\n", i, rwfc->current, rwfc->limit);
|
||||
workqueue_wait_write(&rwfc->workqueue, 0);
|
||||
}
|
||||
workqueue_unlock(&rwfc->workqueue);
|
||||
// toku_os_usleep(random() % 1);
|
||||
}
|
||||
workqueue_set_closed(&rwfc->workqueue, 1);
|
||||
toku_thread_pool_destroy(&tp);
|
||||
rwfc_destroy(rwfc);
|
||||
}
|
||||
|
||||
int
|
||||
test_main (int argc, const char *argv[]) {
|
||||
int i;
|
||||
for (i=1; i<argc; i++) {
|
||||
const char *arg = argv[i];
|
||||
if (strcmp(arg, "-v") == 0)
|
||||
verbose++;
|
||||
}
|
||||
test_create_destroy();
|
||||
test_simple_enq_deq(0);
|
||||
test_simple_enq_deq(42);
|
||||
test_set_closed();
|
||||
test_set_closed_thread();
|
||||
test_flow_control(8, 10000, 1);
|
||||
test_flow_control(8, 10000, 2);
|
||||
test_flow_control(8, 10000, 17);
|
||||
return 0;
|
||||
}
|
|
@ -1,47 +0,0 @@
|
|||
/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
|
||||
// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
|
||||
#ident "$Id$"
|
||||
#ident "Copyright (c) 2007-2012 Tokutek Inc. All rights reserved."
|
||||
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
|
||||
|
||||
#include <stdio.h>
|
||||
#include <errno.h>
|
||||
|
||||
#include <toku_portability.h>
|
||||
#include "toku_assert.h"
|
||||
#include "toku_os.h"
|
||||
#include <toku_pthread.h>
|
||||
#include "workqueue.h"
|
||||
#include "threadpool.h"
|
||||
|
||||
// Create fixed number of worker threads, all waiting on a single queue
|
||||
// of work items (WORKQUEUE).
|
||||
|
||||
void toku_init_workers(WORKQUEUE wq, THREADPOOL *tpptr, int fraction) {
|
||||
workqueue_init(wq);
|
||||
assert(fraction > 0);
|
||||
int nprocs = toku_os_get_number_active_processors();
|
||||
int nthreads = (nprocs*2)/fraction;
|
||||
if (nthreads == 0) nthreads = 1;
|
||||
toku_thread_pool_create(tpptr, nthreads);
|
||||
toku_thread_pool_run(*tpptr, 0, &nthreads, toku_worker, wq);
|
||||
}
|
||||
|
||||
void toku_destroy_workers(WORKQUEUE wq, THREADPOOL *tpptr) {
|
||||
workqueue_set_closed(wq, 1); // close the work queue and [see "A" in toku_worker()]
|
||||
toku_thread_pool_destroy(tpptr); // wait for all of the worker threads to exit
|
||||
workqueue_destroy(wq);
|
||||
}
|
||||
|
||||
void *toku_worker(void *arg) {
|
||||
WORKQUEUE CAST_FROM_VOIDP(wq, arg);
|
||||
int r;
|
||||
while (1) {
|
||||
WORKITEM wi = 0;
|
||||
r = workqueue_deq(wq, &wi, 1); // get work from the queue, block if empty
|
||||
if (r != 0) // shut down worker threads when work queue is closed
|
||||
break; // [see "A" in toku_destroy_workers() ]
|
||||
wi->f(wi); // call the work handler function
|
||||
}
|
||||
return arg;
|
||||
}
|
216
ft/workqueue.h
216
ft/workqueue.h
|
@ -1,216 +0,0 @@
|
|||
/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
|
||||
// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
|
||||
#ifndef _TOKU_WORKQUEUE_H
|
||||
#define _TOKU_WORKQUEUE_H
|
||||
#ident "$Id$"
|
||||
#ident "Copyright (c) 2007-2012 Tokutek Inc. All rights reserved."
|
||||
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
|
||||
|
||||
|
||||
#include <errno.h>
|
||||
#include "toku_assert.h"
|
||||
#include <toku_pthread.h>
|
||||
|
||||
struct workitem;
|
||||
|
||||
// A work function is called by a worker thread when the workitem (see below) is being handled
|
||||
// by a worker thread.
|
||||
typedef void (*WORKFUNC)(struct workitem *wi);
|
||||
|
||||
// A workitem contains the function that is called by a worker thread in a threadpool.
|
||||
// A workitem is queued in a workqueue.
|
||||
typedef struct workitem *WORKITEM;
|
||||
struct workitem {
|
||||
WORKFUNC f;
|
||||
void *arg;
|
||||
struct workitem *next;
|
||||
};
|
||||
|
||||
// Initialize a workitem with a function and argument
|
||||
static inline void workitem_init(WORKITEM wi, WORKFUNC f, void *arg) {
|
||||
wi->f = f;
|
||||
wi->arg = arg;
|
||||
wi->next = 0;
|
||||
}
|
||||
|
||||
// Access the workitem function
|
||||
static inline WORKFUNC workitem_func(WORKITEM wi) {
|
||||
return wi->f;
|
||||
}
|
||||
|
||||
// Access the workitem argument
|
||||
static inline void *workitem_arg(WORKITEM wi) {
|
||||
return wi->arg;
|
||||
}
|
||||
|
||||
// A workqueue is currently a fifo of workitems that feeds a thread pool. We may
|
||||
// divide the workqueue into per worker thread queues.
|
||||
typedef struct workqueue *WORKQUEUE;
|
||||
struct workqueue {
|
||||
WORKITEM head, tail; // list of workitems
|
||||
toku_mutex_t lock;
|
||||
toku_cond_t wait_read; // wait for read
|
||||
int want_read; // number of threads waiting to read
|
||||
toku_cond_t wait_write; // wait for write
|
||||
int want_write; // number of threads waiting to write
|
||||
char closed; // kicks waiting threads off of the write queue
|
||||
int n_in_queue; // count of how many workitems are in the queue.
|
||||
};
|
||||
|
||||
// Get a pointer to the workqueue lock. This is used by workqueue client software
|
||||
// that wants to control the workqueue locking.
|
||||
static inline toku_mutex_t *workqueue_lock_ref(WORKQUEUE wq) {
|
||||
return &wq->lock;
|
||||
}
|
||||
|
||||
// Lock the workqueue
|
||||
static inline void workqueue_lock(WORKQUEUE wq) {
|
||||
toku_mutex_lock(&wq->lock);
|
||||
}
|
||||
|
||||
// Unlock the workqueue
|
||||
static inline void workqueue_unlock(WORKQUEUE wq) {
|
||||
toku_mutex_unlock(&wq->lock);
|
||||
}
|
||||
|
||||
// Initialize a workqueue
|
||||
// Expects: the workqueue is not initialized
|
||||
// Effects: the workqueue is set to empty and the condition variable is initialized
|
||||
__attribute__((unused))
|
||||
static void workqueue_init(WORKQUEUE wq) {
|
||||
toku_mutex_init(&wq->lock, 0);
|
||||
wq->head = wq->tail = 0;
|
||||
toku_cond_init(&wq->wait_read, 0);
|
||||
wq->want_read = 0;
|
||||
toku_cond_init(&wq->wait_write, 0);
|
||||
wq->want_write = 0;
|
||||
wq->closed = 0;
|
||||
wq->n_in_queue = 0;
|
||||
}
|
||||
|
||||
// Destroy a work queue
|
||||
// Expects: the work queue must be initialized and empty
|
||||
__attribute__((unused))
|
||||
static void workqueue_destroy(WORKQUEUE wq) {
|
||||
workqueue_lock(wq); // shutup helgrind
|
||||
assert(wq->head == 0 && wq->tail == 0);
|
||||
workqueue_unlock(wq);
|
||||
toku_cond_destroy(&wq->wait_read);
|
||||
toku_cond_destroy(&wq->wait_write);
|
||||
toku_mutex_destroy(&wq->lock);
|
||||
}
|
||||
|
||||
// Close the work queue
|
||||
// Effects: signal any threads blocked in the work queue
|
||||
__attribute__((unused))
|
||||
static void workqueue_set_closed(WORKQUEUE wq, int dolock) {
|
||||
if (dolock) workqueue_lock(wq);
|
||||
wq->closed = 1;
|
||||
toku_cond_broadcast(&wq->wait_read);
|
||||
toku_cond_broadcast(&wq->wait_write);
|
||||
if (dolock) workqueue_unlock(wq);
|
||||
}
|
||||
|
||||
// Determine whether or not the work queue is empty
|
||||
// Returns: 1 if the work queue is empty, otherwise 0
|
||||
static inline int workqueue_empty(WORKQUEUE wq) {
|
||||
return wq->head == 0;
|
||||
}
|
||||
|
||||
// Put a work item at the tail of the work queue
|
||||
// Effects: append the work item to the end of the work queue and signal
|
||||
// any work queue readers.
|
||||
// Dolock controls whether or not the work queue lock should be taken.
|
||||
__attribute__((unused))
|
||||
static void workqueue_enq(WORKQUEUE wq, WORKITEM wi, int dolock) {
|
||||
if (dolock) workqueue_lock(wq);
|
||||
wq->n_in_queue++;
|
||||
wi->next = 0;
|
||||
if (wq->tail)
|
||||
wq->tail->next = wi;
|
||||
else
|
||||
wq->head = wi;
|
||||
wq->tail = wi;
|
||||
if (wq->want_read) {
|
||||
toku_cond_signal(&wq->wait_read);
|
||||
}
|
||||
if (dolock) workqueue_unlock(wq);
|
||||
}
|
||||
|
||||
// Get a work item from the head of the work queue
|
||||
// Effects: wait until the workqueue is not empty, remove the first workitem from the
|
||||
// queue and return it.
|
||||
// Dolock controls whether or not the work queue lock should be taken.
|
||||
// Success: returns 0 and set the wiptr
|
||||
// Failure: returns non-zero
|
||||
__attribute__((unused))
|
||||
static int workqueue_deq(WORKQUEUE wq, WORKITEM *wiptr, int dolock) {
|
||||
if (dolock) workqueue_lock(wq);
|
||||
assert(wq->n_in_queue >= 0);
|
||||
while (workqueue_empty(wq)) {
|
||||
if (wq->closed) {
|
||||
if (dolock) workqueue_unlock(wq);
|
||||
return EINVAL;
|
||||
}
|
||||
wq->want_read++;
|
||||
toku_cond_wait(&wq->wait_read, &wq->lock);
|
||||
wq->want_read--;
|
||||
}
|
||||
wq->n_in_queue--;
|
||||
WORKITEM wi = wq->head;
|
||||
wq->head = wi->next;
|
||||
if (wq->head == 0)
|
||||
wq->tail = 0;
|
||||
wi->next = 0;
|
||||
if (dolock) workqueue_unlock(wq);
|
||||
*wiptr = wi;
|
||||
return 0;
|
||||
}
|
||||
|
||||
// Suspend the caller (thread that is currently attempting to put more work items into the work queue)
|
||||
__attribute__((unused))
|
||||
static void workqueue_wait_write(WORKQUEUE wq, int dolock) {
|
||||
if (dolock) workqueue_lock(wq);
|
||||
wq->want_write++;
|
||||
toku_cond_wait(&wq->wait_write, &wq->lock);
|
||||
wq->want_write--;
|
||||
if (dolock) workqueue_unlock(wq);
|
||||
}
|
||||
|
||||
// Wakeup all threads that are currently attempting to put more work items into the work queue
|
||||
__attribute__((unused))
|
||||
static void workqueue_wakeup_write(WORKQUEUE wq, int dolock) {
|
||||
if (wq->want_write) {
|
||||
if (dolock) workqueue_lock(wq);
|
||||
if (wq->want_write) {
|
||||
toku_cond_broadcast(&wq->wait_write);
|
||||
}
|
||||
if (dolock) workqueue_unlock(wq);
|
||||
}
|
||||
}
|
||||
|
||||
__attribute__((unused))
|
||||
static int workqueue_n_in_queue (WORKQUEUE wq, int dolock) {
|
||||
if (dolock) workqueue_lock(wq);
|
||||
int r = wq->n_in_queue;
|
||||
if (dolock) workqueue_unlock(wq);
|
||||
return r;
|
||||
}
|
||||
|
||||
#include "threadpool.h"
|
||||
|
||||
// initialize the work queue and worker
|
||||
void toku_init_workers(WORKQUEUE wq, THREADPOOL *tpptr, int fraction);
|
||||
|
||||
void toku_init_workers_with_num_threads(WORKQUEUE wq, THREADPOOL *tpptr, int num_threads);
|
||||
|
||||
// destroy the work queue and worker
|
||||
void toku_destroy_workers(WORKQUEUE wq, THREADPOOL *tpptr);
|
||||
|
||||
// this is the thread function for the worker threads in the worker thread
|
||||
// pool. the arg is a pointer to the work queue that feeds work to the
|
||||
// workers.
|
||||
void *toku_worker(void *arg);
|
||||
|
||||
#endif
|
||||
|
Loading…
Add table
Reference in a new issue