mariadb/newbrt/cachetable-writequeue.h
Rich Prohaska 0d44815bf5 merge some tokudb.1032 into 1032b. addresses #1032
git-svn-id: file:///svn/toku/tokudb.1032b@7778 c7de825b-a66e-492c-adef-691d508d4ae1
2013-04-16 23:57:27 -04:00

118 lines
3.7 KiB
C

// When objects are evicted from the cachetable, they are written to storage by a
// thread in a thread pool. The objects are placed onto a write queue that feeds
// the thread pool. The write queue expects that an external mutex is used to
// protect it.
typedef struct writequeue *WRITEQUEUE;

// State of one write queue: a singly linked list of pairs (chained through
// each pair's next_wq field) plus the condition variables and waiter counts
// used to block and wake the threads that use the queue.
struct writequeue {
    PAIR head;                       // first pair in the list; 0 when the queue is empty
    PAIR tail;                       // last pair in the list; 0 when the queue is empty
    toku_pthread_cond_t wait_read;   // readers (dequeuers) block here
    int want_read;                   // count of threads currently blocked on wait_read
    toku_pthread_cond_t wait_write;  // writers block here
    int want_write;                  // count of threads currently blocked on wait_write
    int ninq;                        // count of pairs currently linked into the queue
    char closed;                     // nonzero once closed; waiting threads get kicked off the queue
};
// initialize a writequeue
// expects: the writequeue is not initialized
// effects: the list is set to empty, the waiter counts and the closed flag
// are cleared, and both condition variables (wait_read and wait_write) are
// initialized
static void writequeue_init(WRITEQUEUE wq) {
    // plain field resets first; none of them depend on the condition variables
    wq->head = 0;
    wq->tail = 0;
    wq->ninq = 0;
    wq->closed = 0;
    wq->want_read = 0;
    wq->want_write = 0;

    int rc = toku_pthread_cond_init(&wq->wait_read, 0);
    assert(rc == 0);
    rc = toku_pthread_cond_init(&wq->wait_write, 0);
    assert(rc == 0);
}
// destroy a writequeue
// expects: the writequeue must be initialized and empty (asserted on both
// head and tail)
// effects: both condition variables are destroyed
static void writequeue_destroy(WRITEQUEUE wq) {
    assert(wq->head == 0);
    assert(wq->tail == 0);

    int rc = toku_pthread_cond_destroy(&wq->wait_read);
    assert(rc == 0);
    rc = toku_pthread_cond_destroy(&wq->wait_write);
    assert(rc == 0);
}
// close the writequeue
// effects: marks the queue closed and broadcasts on both condition variables
// so that every thread blocked in the writequeue wakes up
static void writequeue_set_closed(WRITEQUEUE wq) {
    int rc;

    // set the flag before waking anyone so woken threads observe it
    wq->closed = 1;

    rc = toku_pthread_cond_broadcast(&wq->wait_read);
    assert(rc == 0);
    rc = toku_pthread_cond_broadcast(&wq->wait_write);
    assert(rc == 0);
}
// determine whether or not the write queue is empty
// return: 1 if the write queue has no head pair, otherwise 0
static int writequeue_empty(WRITEQUEUE wq) {
    return wq->head ? 0 : 1;
}
// put a pair at the tail of the write queue
// expects: the mutex is locked
// effects: links the pair in as the new tail (and as the head too when the
// queue was empty), bumps the pair count, and signals one reader if any
// thread is recorded as waiting to read
static void writequeue_enq(WRITEQUEUE wq, PAIR pair) {
    PAIR old_tail = wq->tail;

    pair->next_wq = 0;
    if (old_tail == 0) {
        // queue was empty: the new pair is also the head
        wq->head = pair;
    } else {
        old_tail->next_wq = pair;
    }
    wq->tail = pair;
    wq->ninq++;

    // only signal when a reader is registered as waiting
    if (wq->want_read != 0) {
        int rc = toku_pthread_cond_signal(&wq->wait_read);
        assert(rc == 0);
    }
}
// get a pair from the head of the write queue
// expects: the mutex is locked
// effects: blocks on wait_read (using the caller's mutex) until the queue is
// not empty, then unlinks the first pair and stores it in *pairptr
// returns: 0 if success; EINVAL if the queue is empty and has been closed,
// in which case *pairptr is not written
static int writequeue_deq(WRITEQUEUE wq, toku_pthread_mutex_t *mutex, PAIR *pairptr) {
    for (;;) {
        if (!writequeue_empty(wq)) {
            break;
        }
        // nothing to hand out: give up if closed, otherwise wait for an enqueue
        if (wq->closed) {
            return EINVAL;
        }
        wq->want_read++;
        int rc = toku_pthread_cond_wait(&wq->wait_read, mutex);
        assert(rc == 0);
        wq->want_read--;
    }

    PAIR first = wq->head;
    PAIR rest = first->next_wq;

    wq->head = rest;
    if (rest == 0) {
        // removed the last pair, so the queue has no tail either
        wq->tail = 0;
    }
    wq->ninq--;

    first->next_wq = 0;
    *pairptr = first;
    return 0;
}
// suspend the writer thread
// expects: the mutex is locked
// effects: counts the caller in want_write for the duration of a single wait
// on wait_write, then returns
// NOTE(review): there is no predicate loop here, so callers presumably
// re-check their own condition after this returns -- confirm at call sites
static void writequeue_wait_write(WRITEQUEUE wq, toku_pthread_mutex_t *mutex) {
    int rc;

    wq->want_write++;
    rc = toku_pthread_cond_wait(&wq->wait_write, mutex);
    assert(rc == 0);
    wq->want_write--;
}
// wakeup the writer threads
// expects: the mutex is locked
// effects: broadcasts on wait_write when at least one thread is recorded as
// waiting to write; otherwise does nothing
static void writequeue_wakeup_write(WRITEQUEUE wq) {
    if (wq->want_write == 0) {
        return;
    }

    int rc = toku_pthread_cond_broadcast(&wq->wait_write);
    assert(rc == 0);
}