/*
 * Mirror of https://github.com/MariaDB/server.git
 * Synced 2025-01-22 23:04:20 +01:00, commit e3baafc9db.
 * Commit message: Add Makefile rules for the threaded_stress_tests. Fix the
 * race conditions found by drd (most of which are benign). drd for
 * test_stress1 now runs clean (for valgrind 3.6.1).
 * git-svn-id: file:///svn/toku/tokudb@35690 c7de825b-a66e-492c-adef-691d508d4ae1
 * File: 221 lines, 7.1 KiB, C.
 */
/* -*- mode: C; c-basic-offset: 4 -*- */
#ifndef _TOKU_WORKQUEUE_H
#define _TOKU_WORKQUEUE_H
#ident "$Id$"
#ident "Copyright (c) 2007-2010 Tokutek Inc. All rights reserved."
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."

// Work items and a lock-protected fifo work queue that feeds a thread pool.

#include <errno.h>
#include "toku_assert.h"
#include "toku_pthread.h"
#include "c_dialects.h"

C_BEGIN

struct workitem;

// A work function is called by a worker thread when the workitem (see below) is being handled
// by a worker thread.  The workitem being handled is passed as the only argument.
typedef void (*WORKFUNC)(struct workitem *wi);

// A workitem contains the function that is called by a worker thread in a threadpool.
// A workitem is queued in a workqueue.
typedef struct workitem *WORKITEM;
struct workitem {
    WORKFUNC f;             // function stored for this item (read back with workitem_func)
    void *arg;              // opaque argument stored with f (read back with workitem_arg)
    struct workitem *next;  // link to the following item in a queue; 0 when there is none
};

// Initialize a workitem with a function and argument
|
|
static inline void workitem_init(WORKITEM wi, WORKFUNC f, void *arg) {
|
|
wi->f = f;
|
|
wi->arg = arg;
|
|
wi->next = 0;
|
|
}
|
|
|
|
// Access the workitem function
|
|
static inline WORKFUNC workitem_func(WORKITEM wi) {
|
|
return wi->f;
|
|
}
|
|
|
|
// Access the workitem argument
|
|
static inline void *workitem_arg(WORKITEM wi) {
|
|
return wi->arg;
|
|
}
|
|
|
|
// A workqueue is currently a fifo of workitems that feeds a thread pool. We may
|
|
// divide the workqueue into per worker thread queues.
|
|
typedef struct workqueue *WORKQUEUE;
|
|
struct workqueue {
|
|
WORKITEM head, tail; // list of workitems
|
|
toku_pthread_mutex_t lock;
|
|
toku_pthread_cond_t wait_read; // wait for read
|
|
int want_read; // number of threads waiting to read
|
|
toku_pthread_cond_t wait_write; // wait for write
|
|
int want_write; // number of threads waiting to write
|
|
char closed; // kicks waiting threads off of the write queue
|
|
int n_in_queue; // count of how many workitems are in the queue.
|
|
};
|
|
|
|
// Get a pointer to the workqueue lock. This is used by workqueue client software
|
|
// that wants to control the workqueue locking.
|
|
static inline toku_pthread_mutex_t *workqueue_lock_ref(WORKQUEUE wq) {
|
|
return &wq->lock;
|
|
}
|
|
|
|
// Lock the workqueue
|
|
static inline void workqueue_lock(WORKQUEUE wq) {
|
|
int r = toku_pthread_mutex_lock(&wq->lock); assert(r == 0);
|
|
}
|
|
|
|
// Unlock the workqueue
|
|
static inline void workqueue_unlock(WORKQUEUE wq) {
|
|
int r = toku_pthread_mutex_unlock(&wq->lock); assert(r == 0);
|
|
}
|
|
|
|
// Initialize a workqueue
|
|
// Expects: the workqueue is not initialized
|
|
// Effects: the workqueue is set to empty and the condition variable is initialized
|
|
__attribute__((unused))
|
|
static void workqueue_init(WORKQUEUE wq) {
|
|
int r;
|
|
r = toku_pthread_mutex_init(&wq->lock, 0); assert(r == 0);
|
|
wq->head = wq->tail = 0;
|
|
r = toku_pthread_cond_init(&wq->wait_read, 0); assert(r == 0);
|
|
wq->want_read = 0;
|
|
r = toku_pthread_cond_init(&wq->wait_write, 0); assert(r == 0);
|
|
wq->want_write = 0;
|
|
wq->closed = 0;
|
|
wq->n_in_queue = 0;
|
|
}
|
|
|
|
// Destroy a work queue
|
|
// Expects: the work queue must be initialized and empty
|
|
__attribute__((unused))
|
|
static void workqueue_destroy(WORKQUEUE wq) {
|
|
int r;
|
|
workqueue_lock(wq); // shutup helgrind
|
|
assert(wq->head == 0 && wq->tail == 0);
|
|
workqueue_unlock(wq);
|
|
r = toku_pthread_cond_destroy(&wq->wait_read); assert(r == 0);
|
|
r = toku_pthread_cond_destroy(&wq->wait_write); assert(r == 0);
|
|
r = toku_pthread_mutex_destroy(&wq->lock); assert(r == 0);
|
|
}
|
|
|
|
// Close the work queue
|
|
// Effects: signal any threads blocked in the work queue
|
|
__attribute__((unused))
|
|
static void workqueue_set_closed(WORKQUEUE wq, int dolock) {
|
|
int r;
|
|
if (dolock) workqueue_lock(wq);
|
|
wq->closed = 1;
|
|
r = toku_pthread_cond_broadcast(&wq->wait_read); assert(r == 0);
|
|
r = toku_pthread_cond_broadcast(&wq->wait_write); assert(r == 0);
|
|
if (dolock) workqueue_unlock(wq);
|
|
}
|
|
|
|
// Determine whether or not the work queue is empty
|
|
// Returns: 1 if the work queue is empty, otherwise 0
|
|
static inline int workqueue_empty(WORKQUEUE wq) {
|
|
return wq->head == 0;
|
|
}
|
|
|
|
// Put a work item at the tail of the work queue
|
|
// Effects: append the work item to the end of the work queue and signal
|
|
// any work queue readers.
|
|
// Dolock controls whether or not the work queue lock should be taken.
|
|
__attribute__((unused))
|
|
static void workqueue_enq(WORKQUEUE wq, WORKITEM wi, int dolock) {
|
|
if (dolock) workqueue_lock(wq);
|
|
wq->n_in_queue++;
|
|
wi->next = 0;
|
|
if (wq->tail)
|
|
wq->tail->next = wi;
|
|
else
|
|
wq->head = wi;
|
|
wq->tail = wi;
|
|
if (wq->want_read) {
|
|
int r = toku_pthread_cond_signal(&wq->wait_read); assert(r == 0);
|
|
}
|
|
if (dolock) workqueue_unlock(wq);
|
|
}
|
|
|
|
// Get a work item from the head of the work queue
|
|
// Effects: wait until the workqueue is not empty, remove the first workitem from the
|
|
// queue and return it.
|
|
// Dolock controls whether or not the work queue lock should be taken.
|
|
// Success: returns 0 and set the wiptr
|
|
// Failure: returns non-zero
|
|
__attribute__((unused))
|
|
static int workqueue_deq(WORKQUEUE wq, WORKITEM *wiptr, int dolock) {
|
|
if (dolock) workqueue_lock(wq);
|
|
assert(wq->n_in_queue >= 0);
|
|
while (workqueue_empty(wq)) {
|
|
if (wq->closed) {
|
|
if (dolock) workqueue_unlock(wq);
|
|
return EINVAL;
|
|
}
|
|
wq->want_read++;
|
|
int r = toku_pthread_cond_wait(&wq->wait_read, &wq->lock); assert(r == 0);
|
|
wq->want_read--;
|
|
}
|
|
wq->n_in_queue--;
|
|
WORKITEM wi = wq->head;
|
|
wq->head = wi->next;
|
|
if (wq->head == 0)
|
|
wq->tail = 0;
|
|
wi->next = 0;
|
|
if (dolock) workqueue_unlock(wq);
|
|
*wiptr = wi;
|
|
return 0;
|
|
}
|
|
|
|
// Suspend the caller (thread that is currently attempting to put more work items into the work queue)
|
|
__attribute__((unused))
|
|
static void workqueue_wait_write(WORKQUEUE wq, int dolock) {
|
|
if (dolock) workqueue_lock(wq);
|
|
wq->want_write++;
|
|
int r = toku_pthread_cond_wait(&wq->wait_write, &wq->lock); assert(r == 0);
|
|
wq->want_write--;
|
|
if (dolock) workqueue_unlock(wq);
|
|
}
|
|
|
|
// Wakeup all threads that are currently attempting to put more work items into the work queue
|
|
__attribute__((unused))
|
|
static void workqueue_wakeup_write(WORKQUEUE wq, int dolock) {
|
|
if (wq->want_write) {
|
|
if (dolock) workqueue_lock(wq);
|
|
if (wq->want_write) {
|
|
int r = toku_pthread_cond_broadcast(&wq->wait_write); assert(r == 0);
|
|
}
|
|
if (dolock) workqueue_unlock(wq);
|
|
}
|
|
}
|
|
|
|
__attribute__((unused))
|
|
static int workqueue_n_in_queue (WORKQUEUE wq, int dolock) {
|
|
if (dolock) workqueue_lock(wq);
|
|
int r = wq->n_in_queue;
|
|
if (dolock) workqueue_unlock(wq);
|
|
return r;
|
|
}
|
|
|
|
#include "threadpool.h"

// initialize the work queue and worker
void toku_init_workers(WORKQUEUE wq, THREADPOOL *tpptr);

// destroy the work queue and worker
void toku_destroy_workers(WORKQUEUE wq, THREADPOOL *tpptr);

// this is the thread function for the worker threads in the worker thread
// pool.  the arg is a pointer to the work queue that feeds work to the
// workers.
void *toku_worker(void *arg);

C_END

#endif