mariadb/newbrt/worker-thread-benchmarks/workqueue.h

118 lines
3.4 KiB
C
Raw Normal View History

// A WORKQUEUE is a handle (pointer) to a struct workqueue.
typedef struct workqueue *WORKQUEUE;

// FIFO of work items kept as a singly linked list (chained through each
// item's next_wq field), plus the condition variables used to block on it.
// The structure holds no mutex of its own: the functions that can block take
// the caller's mutex as an argument.
struct workqueue {
    WORKITEM head;              // oldest queued item (next to be dequeued); 0 when the queue is empty
    WORKITEM tail;              // newest queued item; 0 when the queue is empty
    pthread_cond_t wait_read;   // readers block here until an item arrives or the queue is closed
    int want_read;              // count of threads currently blocked on wait_read
    pthread_cond_t wait_write;  // writers block here
    int want_write;             // count of threads currently blocked on wait_write
    int ninq;                   // number of work items currently linked into the queue
    char closed;                // non-zero once the queue has been closed; blocked threads are woken
};
// Set up a workqueue for first use.
// expects: wq points at storage that has not been initialized yet
// effects: the list is empty, all counters and the closed flag are zero, and
//          both condition variables are created with default attributes
// note:    a failing pthread_cond_init trips an assert
static void workqueue_init(WORKQUEUE wq) {
    // Plain fields first: empty list, no waiters, nothing queued, still open.
    wq->head = 0;
    wq->tail = 0;
    wq->want_read = 0;
    wq->want_write = 0;
    wq->ninq = 0;
    wq->closed = 0;

    // The calls stay outside assert() so they still run when NDEBUG is defined.
    int r = pthread_cond_init(&wq->wait_read, 0);
    assert(r == 0);
    r = pthread_cond_init(&wq->wait_write, 0);
    assert(r == 0);
}
// Tear down a workqueue.
// expects: the workqueue was initialized and holds no work items
// effects: both condition variables are destroyed; the struct's storage is
//          not freed here
static void workqueue_destroy(WORKQUEUE wq) {
    // Destroying a queue that still links work items is a caller error.
    assert(wq->head == 0 && wq->tail == 0);

    int r = pthread_cond_destroy(&wq->wait_read);
    assert(r == 0);
    r = pthread_cond_destroy(&wq->wait_write);
    assert(r == 0);
}
// Mark the workqueue closed.
// effects: sets the closed flag, then wakes every thread blocked on either
//          condition variable so it can observe the flag
// NOTE(review): no lock is taken here; presumably the caller holds the same
// mutex that is passed to workqueue_deq -- confirm against callers.
static void workqueue_set_closed(WORKQUEUE wq) {
    int r;

    wq->closed = 1;

    r = pthread_cond_broadcast(&wq->wait_read);
    assert(r == 0);
    r = pthread_cond_broadcast(&wq->wait_write);
    assert(r == 0);
}
// Report whether the workqueue currently holds no work items.
// return: 1 when the list head is unset (queue empty), otherwise 0
static int workqueue_empty(WORKQUEUE wq) {
    return wq->head ? 0 : 1;
}
// Append a work item to the tail of the workqueue.
// expects: the mutex is locked
// effects: the item becomes the new tail (and also the head if the queue was
//          empty), ninq is incremented, and one waiting reader is signalled
//          when any reader is recorded as waiting
static void workqueue_enq(WORKQUEUE wq, WORKITEM workitem) {
    WORKITEM last = wq->tail;

    workitem->next_wq = 0;
    if (!last) {
        // Empty queue: the new item is both head and tail.
        wq->head = workitem;
    } else {
        last->next_wq = workitem;
    }
    wq->tail = workitem;
    wq->ninq += 1;

    // Skip the signal entirely when no reader is blocked.
    if (wq->want_read == 0) {
        return;
    }
    int r = pthread_cond_signal(&wq->wait_read);
    assert(r == 0);
}
// Remove and return the work item at the head of the workqueue, blocking
// while the queue is empty.
// expects: the mutex is locked; it is released while waiting and held again
//          on return
// effects: on success the first item is unlinked, its next_wq is cleared,
//          ninq is decremented, and the item is stored in *workitemptr
// returns: 0 on success; EINVAL if the queue is empty and has been closed
//          (*workitemptr is left untouched in that case)
static int workqueue_deq(WORKQUEUE wq, pthread_mutex_t *mutex, WORKITEM *workitemptr) {
    // Re-test emptiness after every wakeup: another reader may have taken the
    // item, or the wakeup may be due to the queue being closed.
    for (;;) {
        if (!workqueue_empty(wq)) {
            break;
        }
        if (wq->closed) {
            return EINVAL;
        }
        wq->want_read += 1;
        int r = pthread_cond_wait(&wq->wait_read, mutex);
        assert(r == 0);
        wq->want_read -= 1;
    }

    WORKITEM first = wq->head;
    WORKITEM rest = first->next_wq;

    wq->head = rest;
    if (!rest) {
        // That was the last item, so the tail must be cleared too.
        wq->tail = 0;
    }
    wq->ninq -= 1;
    first->next_wq = 0;

    *workitemptr = first;
    return 0;
}
#if 0
// Block the calling writer thread on the write condition variable.
// (Currently compiled out by the enclosing #if 0.)
// expects: the mutex is locked
// effects: want_write is raised for the duration of a single wait
// note:    there is no predicate loop here, so this returns after any wakeup;
//          the caller has to re-check its own condition
static void workqueue_wait_write(WORKQUEUE wq, pthread_mutex_t *mutex) {
    wq->want_write += 1;
    int r = pthread_cond_wait(&wq->wait_write, mutex);
    assert(r == 0);
    wq->want_write -= 1;
}
// Wake every writer thread blocked on the write condition variable.
// (Currently compiled out by the enclosing #if 0.)
// expects: the mutex is locked
// effects: broadcasts on wait_write, but only when at least one writer is
//          recorded as waiting
static void workqueue_wakeup_write(WORKQUEUE wq) {
    if (wq->want_write == 0) {
        return;
    }
    int r = pthread_cond_broadcast(&wq->wait_write);
    assert(r == 0);
}
#endif