From 87cf6985fdcfc7ae318ac1d0270ed5c2c5a408ad Mon Sep 17 00:00:00 2001 From: Leif Walsh <leif@tokutek.com> Date: Wed, 17 Apr 2013 00:01:13 -0400 Subject: [PATCH] refs #5351 add comments and a timed* api git-svn-id: file:///svn/toku/tokudb@48727 c7de825b-a66e-492c-adef-691d508d4ae1 --- portability/circular_buffer.cc | 43 ++++++++++++++++++ portability/circular_buffer.h | 82 ++++++++++++++++++++++++++++++---- 2 files changed, 116 insertions(+), 9 deletions(-) diff --git a/portability/circular_buffer.cc b/portability/circular_buffer.cc index a49d1b005ac..ea3fc3c9f28 100644 --- a/portability/circular_buffer.cc +++ b/portability/circular_buffer.cc @@ -107,6 +107,27 @@ namespace toku { return pushed; } + template<typename T> + bool circular_buffer<T>::timedpush(const T &elt, toku_timespec_t *abstime) { + bool pushed = false; + invariant_notnull(abstime); + lock(); + if (is_empty()) { + ++m_push_waiters; + int r = toku_cond_timedwait(&m_push_cond, &m_lock, abstime); + if (r != 0) { + invariant(r == ETIMEDOUT); + } + --m_push_waiters; + } + if (!is_full() && m_push_waiters == 0) { + push_and_maybe_signal_unlocked(elt); + pushed = true; + } + unlock(); + return pushed; + } + template<typename T> T circular_buffer<T>::pop_and_maybe_signal_unlocked(void) { toku_mutex_assert_locked(&m_lock); @@ -145,4 +166,26 @@ namespace toku { return popped; } + template<typename T> + bool circular_buffer<T>::timedpop(T * const eltp, toku_timespec_t *abstime) { + bool popped = false; + invariant_notnull(eltp); + invariant_notnull(abstime); + lock(); + if (is_empty()) { + ++m_pop_waiters; + int r = toku_cond_timedwait(&m_pop_cond, &m_lock, abstime); + if (r != 0) { + invariant(r == ETIMEDOUT); + } + --m_pop_waiters; + } + if (!is_empty()) { + *eltp = pop_and_maybe_signal_unlocked(); + popped = true; + } + unlock(); + return popped; + } + } diff --git a/portability/circular_buffer.h b/portability/circular_buffer.h index 3c9db06b67a..7dece0dd735 100644 --- a/portability/circular_buffer.h +++ b/portability/circular_buffer.h @@ -13,24 +13,88 @@ namespace toku { +// The circular buffer manages an array of elements as a thread-safe FIFO queue. +// It does not allocate its own space, or grow the space it manages. +// Access to the circular buffer is managed by a mutex. +// The blocking operations are managed by condition variables. They are as fairly scheduled as the threading library supports. +// +// Sample usage: +// int array[2] +// circular_buffer<int> intbuf; +// intbuf.init(array, 2); +// +// // thread A +// intbuf.push(1); +// intbuf.push(2); +// intbuf.push(3); // <- blocks until thread B runs +// +// // thread B +// int a = intbuf.pop(); // <- 1 +// int b = intbuf.pop(); // <- 2 +// int c = intbuf.pop(); // <- 3 +// int d = intbuf.pop(); // <- blocks until more elements are available template<typename T> class circular_buffer { public: - __attribute__((nonnull)) - void init(T * const array, size_t cap); + // Effect: + // Initialize the circular buffer with an array of elements to manage. + // Requires: + // array must remain valid until deinit() is called. + void init(T * const array, size_t cap) __attribute__((nonnull)); + + // Effect: + // Deinitialize the circular buffer. Destroys mutex and condition variables, checks for errors. + // Requires: + // Must be empty, use trypop() to drain everything before calling deinit(). + // Must be free of waiters, no outstanding calls to push() or pop(), trypush() sentinels to flush waiters if necessary. void deinit(void); - void push(const T &); + // Effect: + // Append elt to the end of the queue. + // Notes: + // Blocks until there is room in the array. + void push(const T &elt); - __attribute__((warn_unused_result)) - bool trypush(const T &); + // Effect: + // Append elt to the end of the queue if there's room and nobody is waiting to push. + // Notes: + // Doesn't block. + // Returns: + // true iff elt was appended + bool trypush(const T &elt) __attribute__((warn_unused_result)); - __attribute__((warn_unused_result)) - T pop(void); + // Effect: + // Append elt to the end of the queue if there's room before abstime. + // Notes: + // Blocks until at most abstime waiting for room in the queue. See pthread_cond_timedwait(3) for an example of how to use abstime. + // Returns: + // true iff elt was appended + bool timedpush(const T &elt, toku_timespec_t *abstime) __attribute__((nonnull, warn_unused_result)); - __attribute__((nonnull, warn_unused_result)) - bool trypop(T * const); + // Effect: + // Remove the first item from the queue and return it. + // Notes: + // Blocks until there is something to return. + T pop(void) __attribute__((warn_unused_result)); + + // Effect: + // Remove the first item from the queue and return it, if one exists. + // Notes: + // Doesn't block. + // Returns the element in *eltp. + // Returns: + // true iff *eltp was set + bool trypop(T * const eltp) __attribute__((nonnull, warn_unused_result)); + + // Effect: + // Remove the first item from the queue and return it, if one exists before abstime + // Notes: + // Blocks until at most abstime waiting for room in the queue. See pthread_cond_timedwait(3) for an example of how to use abstime. + // Returns the element in *eltp. + // Returns: + // true iff *eltp was set + bool timedpop(T * const eltp, toku_timespec_t *abstime) __attribute__((nonnull, warn_unused_result)); private: void lock(void);