diff options
author | jsdelfino <jsdelfino@13f79535-47bb-0310-9956-ffa450edef68> | 2010-02-17 04:14:31 +0000 |
---|---|---|
committer | jsdelfino <jsdelfino@13f79535-47bb-0310-9956-ffa450edef68> | 2010-02-17 04:14:31 +0000 |
commit | 00438314438f3dde00b532ac5d8d28ccc35c7096 (patch) | |
tree | 80dbbb010c5125455a164c77670b8694231f123f /sca-cpp/trunk/kernel/parallel.hpp | |
parent | 50063bc212e8e93d014519ef0e4d4cabef0b6be2 (diff) |
Working queue and chat components. Added a few useful start/stop scripts. Fixed lifecycle code to call start/stop/restart functions before APR pools are cleaned up in both parent and child processes. Minor build script improvements.
git-svn-id: http://svn.us.apache.org/repos/asf/tuscany@910819 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to '')
-rw-r--r-- | sca-cpp/trunk/kernel/parallel.hpp | 88 |
1 files changed, 65 insertions, 23 deletions
diff --git a/sca-cpp/trunk/kernel/parallel.hpp b/sca-cpp/trunk/kernel/parallel.hpp index 9fb6c59ba7..09cf0df9a3 100644 --- a/sca-cpp/trunk/kernel/parallel.hpp +++ b/sca-cpp/trunk/kernel/parallel.hpp @@ -59,9 +59,12 @@ private: pthread_cond_init(&valueCond, NULL); } + futureValue(const futureValue& fv) : valueMutex(fv.valueMutex), valueCond(fv.valueCond), hasValue(fv.hasValue), value(fv.value) { + } + ~futureValue() { - pthread_mutex_destroy(&valueMutex); - pthread_cond_destroy(&valueCond); + //pthread_mutex_destroy(&valueMutex); + //pthread_cond_destroy(&valueCond); } bool set(const T& v) { @@ -129,18 +132,21 @@ public: /** * A bounded thread safe queue. */ -template<typename T> class queue { +template<typename T> class wqueue { public: - queue(int max) : max(max), size(0), tail(0), head(0), values(new (gc_anew<T>(max)) T[max]) { + wqueue(int max) : max(max), size(0), tail(0), head(0), values(new (gc_anew<T>(max)) T[max]) { pthread_mutex_init(&mutex, NULL); pthread_cond_init(&full, NULL); pthread_cond_init(&empty, NULL); } - ~queue() { - pthread_mutex_destroy(&mutex); - pthread_cond_destroy(&full); - pthread_cond_destroy(&empty); + wqueue(const wqueue& wq) : max(wq.max), size(wq.size), tail(wq.tail), head(wq.head), mutex(wq.mutex), full(wq.full), empty(wq.empty), values(wq.values) { + } + + ~wqueue() { + //pthread_mutex_destroy(&mutex); + //pthread_cond_destroy(&full); + //pthread_cond_destroy(&empty); } private: @@ -153,14 +159,14 @@ private: pthread_cond_t empty; gc_ptr<T> values; - template<typename X> friend const int enqueue(queue<X>& q, const X& v); - template<typename X> friend const X dequeue(queue<X>& q); + template<typename X> friend const int enqueue(wqueue<X>& q, const X& v); + template<typename X> friend const X dequeue(wqueue<X>& q); }; /** * Adds an element to the tail of the queue. */ -template<typename T> const int enqueue(queue<T>&q, const T& v) { +template<typename T> const int enqueue(wqueue<T>&q, const T& v) { pthread_mutex_lock(&q.mutex); while(q.size == q.max) pthread_cond_wait(&q.full, &q.mutex); @@ -175,7 +181,7 @@ template<typename T> const int enqueue(queue<T>&q, const T& v) { /** * Returns the element at the head of the queue. */ -template<typename T> const T dequeue(queue<T>& q) { +template<typename T> const T dequeue(wqueue<T>& q) { pthread_mutex_lock(&q.mutex); while(q.size == 0) pthread_cond_wait(&q.empty, &q.mutex); @@ -191,7 +197,12 @@ template<typename T> const T dequeue(queue<T>& q) { * The worker thread function. */ void *workerThreadFunc(void *arg) { - queue<lambda<bool()> >* work = reinterpret_cast<queue<lambda<bool()> >*>(arg); + int ost; + pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &ost); + int ot; + pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, &ot); + + wqueue<lambda<bool()> >* work = reinterpret_cast<wqueue<lambda<bool()> >*>(arg); while(dequeue(*work)()) ; return NULL; @@ -200,28 +211,44 @@ void *workerThreadFunc(void *arg) { /** * Returns a list of worker threads. */ -const list<pthread_t> workerThreads(queue<lambda<bool()> >& queue, const int count) { +const list<pthread_t> workerThreads(wqueue<lambda<bool()> >& wqueue, const int count) { if (count == 0) return list<pthread_t>(); pthread_t thread; - pthread_create(&thread, NULL, workerThreadFunc, &queue); - return cons(thread, workerThreads(queue, count - 1)); + pthread_create(&thread, NULL, workerThreadFunc, &wqueue); + return cons(thread, workerThreads(wqueue, count - 1)); } /** * A worker, implemented with a work queue and a pool of threads. */ class worker { +private: + + // The worker holds a reference to a sharedWorker, to avoid non-thread-safe + // copies of the queue and thread pool when a worker is copied + class sharedWorker { + public: + sharedWorker(int max) : work(wqueue<lambda<bool()> >(max)), threads(workerThreads(work, max)) { + } + + wqueue<lambda<bool()> > work; + const list<pthread_t> threads; + }; + public: - worker(int max) : work(queue<lambda<bool()> >(max)), threads(workerThreads(work, max)) { + worker(int max) : w(*(new (gc_new<sharedWorker>()) sharedWorker(max))) { + } + + worker(const worker& wk) : w(wk.w) { } private: - queue<lambda<bool()> > work; - const list<pthread_t> threads; + sharedWorker& w; template<typename X> friend const future<X> submit(worker& w, const lambda<X()>& func); friend const bool shutdown(worker& w); + friend const bool cancel(worker& w); }; /** @@ -238,14 +265,14 @@ template<typename R> bool submitFunc(const lambda<R()>& func, const future<R>& f template<typename R> const future<R> submit(worker& w, const lambda<R()>& func) { const future<R> fut; const lambda<bool()> f = curry(lambda<bool(const lambda<R()>, future<R>)>(submitFunc<R>), func, fut); - enqueue(w.work, f); + enqueue(w.w.work, f); return fut; } /** * Enqueues shutdown requests. */ -const bool shutdownEnqueue(const list<pthread_t>& threads, queue<lambda<bool()> >& work) { +const bool shutdownEnqueue(const list<pthread_t>& threads, wqueue<lambda<bool()> >& work) { if (isNil(threads)) return true; enqueue(work, result(false)); @@ -266,8 +293,23 @@ const bool shutdownJoin(const list<pthread_t>& threads) { * Shutdown a worker. */ const bool shutdown(worker& w) { - shutdownEnqueue(w.threads, w.work); - shutdownJoin(w.threads); + shutdownEnqueue(w.w.threads, w.w.work); + shutdownJoin(w.w.threads); + return true; +} + +/** + * Cancel a worker. + */ +const bool cancel(const list<pthread_t>& threads) { + if (isNil(threads)) + return true; + pthread_cancel(car(threads)); + return cancel(cdr(threads)); +} + +const bool cancel(worker& w) { + cancel(w.w.threads); return true; } |