mariadb/newbrt/workqueue.h
Bradley C. Kuszmaul d51bc82a86 Turn on higher compression (#1192), increase block size (#1168), and provide a way to measure workqueue fullness (#1399).
Fixes #1168, #1192, #1399.

{{{
svn merge -r8927:9039 https://svn.tokutek.com/tokudb/toku/tokudb.1399
}}}
and resolve conflicts.


git-svn-id: file:///svn/toku/tokudb@9040 c7de825b-a66e-492c-adef-691d508d4ae1
2013-04-16 23:57:38 -04:00

198 lines
6.2 KiB
C

/* -*- mode: C; c-basic-offset: 4 -*- */
#ifndef _TOKU_WORKQUEUE_H
#define _TOKU_WORKQUEUE_H
#include <errno.h>
#include "toku_assert.h"
#include "toku_pthread.h"
struct workitem;
// A work function is called by a worker thread when the workitem (see below) is being handled
// by a worker thread.
typedef void (*WORKFUNC)(struct workitem *wi);
// A workitem contains the function that is called by a worker thread in a threadpool.
// A workitem is queued in a workqueue.
typedef struct workitem *WORKITEM;
struct workitem {
WORKFUNC f;
void *arg;
struct workitem *next;
};
// Initialize a workitem with a function and argument
static inline void workitem_init(WORKITEM wi, WORKFUNC f, void *arg) {
wi->f = f;
wi->arg = arg;
wi->next = 0;
}
// Access the workitem function
static inline WORKFUNC workitem_func(WORKITEM wi) {
return wi->f;
}
// Access the workitem argument
static inline void *workitem_arg(WORKITEM wi) {
return wi->arg;
}
// A workqueue is currently a fifo of workitems that feeds a thread pool. We may
// divide the workqueue into per worker thread queues.
typedef struct workqueue *WORKQUEUE;
struct workqueue {
WORKITEM head, tail; // list of workitems
toku_pthread_mutex_t lock;
toku_pthread_cond_t wait_read; // wait for read
int want_read; // number of threads waiting to read
toku_pthread_cond_t wait_write; // wait for write
int want_write; // number of threads waiting to write
char closed; // kicks waiting threads off of the write queue
int n_in_queue; // count of how many workitems are in the queue.
};
// Get a pointer to the workqueue lock. This is used by workqueue client software
// that wants to control the workqueue locking.
static inline toku_pthread_mutex_t *workqueue_lock_ref(WORKQUEUE wq) {
return &wq->lock;
}
// Lock the workqueue
static inline void workqueue_lock(WORKQUEUE wq) {
int r = toku_pthread_mutex_lock(&wq->lock); assert(r == 0);
}
// Unlock the workqueue
static inline void workqueue_unlock(WORKQUEUE wq) {
int r = toku_pthread_mutex_unlock(&wq->lock); assert(r == 0);
}
// Initialize a workqueue
// Expects: the workqueue is not initialized
// Effects: the workqueue is set to empty and the condition variable is initialized
__attribute__((unused))
static void workqueue_init(WORKQUEUE wq) {
int r;
r = toku_pthread_mutex_init(&wq->lock, 0); assert(r == 0);
wq->head = wq->tail = 0;
r = toku_pthread_cond_init(&wq->wait_read, 0); assert(r == 0);
wq->want_read = 0;
r = toku_pthread_cond_init(&wq->wait_write, 0); assert(r == 0);
wq->want_write = 0;
wq->closed = 0;
wq->n_in_queue = 0;
}
// Destroy a work queue
// Expects: the work queue must be initialized and empty
__attribute__((unused))
static void workqueue_destroy(WORKQUEUE wq) {
int r;
workqueue_lock(wq); // shutup helgrind
assert(wq->head == 0 && wq->tail == 0);
workqueue_unlock(wq);
r = toku_pthread_cond_destroy(&wq->wait_read); assert(r == 0);
r = toku_pthread_cond_destroy(&wq->wait_write); assert(r == 0);
r = toku_pthread_mutex_destroy(&wq->lock); assert(r == 0);
}
// Close the work queue
// Effects: signal any threads blocked in the work queue
__attribute__((unused))
static void workqueue_set_closed(WORKQUEUE wq, int dolock) {
int r;
if (dolock) workqueue_lock(wq);
wq->closed = 1;
if (dolock) workqueue_unlock(wq);
r = toku_pthread_cond_broadcast(&wq->wait_read); assert(r == 0);
r = toku_pthread_cond_broadcast(&wq->wait_write); assert(r == 0);
}
// Determine whether or not the work queue is empty
// Returns: 1 if the work queue is empty, otherwise 0
static inline int workqueue_empty(WORKQUEUE wq) {
return wq->head == 0;
}
// Put a work item at the tail of the work queue
// Effects: append the work item to the end of the work queue and signal
// any work queue readers.
// Dolock controls whether or not the work queue lock should be taken.
__attribute__((unused))
static void workqueue_enq(WORKQUEUE wq, WORKITEM wi, int dolock) {
if (dolock) workqueue_lock(wq);
wq->n_in_queue++;
wi->next = 0;
if (wq->tail)
wq->tail->next = wi;
else
wq->head = wi;
wq->tail = wi;
if (wq->want_read) {
int r = toku_pthread_cond_signal(&wq->wait_read); assert(r == 0);
}
if (dolock) workqueue_unlock(wq);
}
// Get a work item from the head of the work queue
// Effects: wait until the workqueue is not empty, remove the first workitem from the
// queue and return it.
// Dolock controls whether or not the work queue lock should be taken.
// Success: returns 0 and set the wiptr
// Failure: returns non-zero
__attribute__((unused))
static int workqueue_deq(WORKQUEUE wq, WORKITEM *wiptr, int dolock) {
if (dolock) workqueue_lock(wq);
assert(wq->n_in_queue >= 0);
while (workqueue_empty(wq)) {
if (wq->closed) {
if (dolock) workqueue_unlock(wq);
return EINVAL;
}
wq->want_read++;
int r = toku_pthread_cond_wait(&wq->wait_read, &wq->lock); assert(r == 0);
wq->want_read--;
}
wq->n_in_queue--;
WORKITEM wi = wq->head;
wq->head = wi->next;
if (wq->head == 0)
wq->tail = 0;
wi->next = 0;
if (dolock) workqueue_unlock(wq);
*wiptr = wi;
return 0;
}
// Suspend a work queue writer thread
__attribute__((unused))
static void workqueue_wait_write(WORKQUEUE wq, int dolock) {
if (dolock) workqueue_lock(wq);
wq->want_write++;
int r = toku_pthread_cond_wait(&wq->wait_write, &wq->lock); assert(r == 0);
wq->want_write--;
if (dolock) workqueue_unlock(wq);
}
// Wakeup the waiting work queue writer threads
__attribute__((unused))
static void workqueue_wakeup_write(WORKQUEUE wq, int dolock) {
if (wq->want_write) {
if (dolock) workqueue_lock(wq);
if (wq->want_write) {
int r = toku_pthread_cond_broadcast(&wq->wait_write); assert(r == 0);
}
if (dolock) workqueue_unlock(wq);
}
}
__attribute__((unused))
static int workqueue_n_in_queue (WORKQUEUE wq, int dolock) {
if (dolock) workqueue_lock(wq);
int r = wq->n_in_queue;
if (dolock) workqueue_unlock(wq);
return r;
}
#endif