From 00438314438f3dde00b532ac5d8d28ccc35c7096 Mon Sep 17 00:00:00 2001 From: jsdelfino Date: Wed, 17 Feb 2010 04:14:31 +0000 Subject: Working queue and chat components. Added a few useful start/stop scripts. Fixed lifecycle code to call start/stop/restart functions before APR pools are cleaned up in both parent and child processes. Minor build script improvements. git-svn-id: http://svn.us.apache.org/repos/asf/tuscany@910819 13f79535-47bb-0310-9956-ffa450edef68 --- sca-cpp/trunk/kernel/parallel.hpp | 88 +++++++++++++++++++++++++++++---------- 1 file changed, 65 insertions(+), 23 deletions(-) (limited to 'sca-cpp/trunk/kernel/parallel.hpp') diff --git a/sca-cpp/trunk/kernel/parallel.hpp b/sca-cpp/trunk/kernel/parallel.hpp index 9fb6c59ba7..09cf0df9a3 100644 --- a/sca-cpp/trunk/kernel/parallel.hpp +++ b/sca-cpp/trunk/kernel/parallel.hpp @@ -59,9 +59,12 @@ private: pthread_cond_init(&valueCond, NULL); } + futureValue(const futureValue& fv) : valueMutex(fv.valueMutex), valueCond(fv.valueCond), hasValue(fv.hasValue), value(fv.value) { + } + ~futureValue() { - pthread_mutex_destroy(&valueMutex); - pthread_cond_destroy(&valueCond); + //pthread_mutex_destroy(&valueMutex); + //pthread_cond_destroy(&valueCond); } bool set(const T& v) { @@ -129,18 +132,21 @@ public: /** * A bounded thread safe queue. */ -template class queue { +template class wqueue { public: - queue(int max) : max(max), size(0), tail(0), head(0), values(new (gc_anew(max)) T[max]) { + wqueue(int max) : max(max), size(0), tail(0), head(0), values(new (gc_anew(max)) T[max]) { pthread_mutex_init(&mutex, NULL); pthread_cond_init(&full, NULL); pthread_cond_init(&empty, NULL); } - ~queue() { - pthread_mutex_destroy(&mutex); - pthread_cond_destroy(&full); - pthread_cond_destroy(&empty); + wqueue(const wqueue& wq) : max(wq.max), size(wq.size), tail(wq.tail), head(wq.head), mutex(wq.mutex), full(wq.full), empty(wq.empty), values(wq.values) { + } + + ~wqueue() { + //pthread_mutex_destroy(&mutex); + //pthread_cond_destroy(&full); + //pthread_cond_destroy(&empty); } private: @@ -153,14 +159,14 @@ private: pthread_cond_t empty; gc_ptr values; - template friend const int enqueue(queue& q, const X& v); - template friend const X dequeue(queue& q); + template friend const int enqueue(wqueue& q, const X& v); + template friend const X dequeue(wqueue& q); }; /** * Adds an element to the tail of the queue. */ -template const int enqueue(queue&q, const T& v) { +template const int enqueue(wqueue&q, const T& v) { pthread_mutex_lock(&q.mutex); while(q.size == q.max) pthread_cond_wait(&q.full, &q.mutex); @@ -175,7 +181,7 @@ template const int enqueue(queue&q, const T& v) { /** * Returns the element at the head of the queue. */ -template const T dequeue(queue& q) { +template const T dequeue(wqueue& q) { pthread_mutex_lock(&q.mutex); while(q.size == 0) pthread_cond_wait(&q.empty, &q.mutex); @@ -191,7 +197,12 @@ template const T dequeue(queue& q) { * The worker thread function. */ void *workerThreadFunc(void *arg) { - queue >* work = reinterpret_cast >*>(arg); + int ost; + pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &ost); + int ot; + pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, &ot); + + wqueue >* work = reinterpret_cast >*>(arg); while(dequeue(*work)()) ; return NULL; @@ -200,28 +211,44 @@ void *workerThreadFunc(void *arg) { /** * Returns a list of worker threads. */ -const list workerThreads(queue >& queue, const int count) { +const list workerThreads(wqueue >& wqueue, const int count) { if (count == 0) return list(); pthread_t thread; - pthread_create(&thread, NULL, workerThreadFunc, &queue); - return cons(thread, workerThreads(queue, count - 1)); + pthread_create(&thread, NULL, workerThreadFunc, &wqueue); + return cons(thread, workerThreads(wqueue, count - 1)); } /** * A worker, implemented with a work queue and a pool of threads. */ class worker { +private: + + // The worker holds a reference to a sharedWorker, to avoid non-thread-safe + // copies of the queue and thread pool when a worker is copied + class sharedWorker { + public: + sharedWorker(int max) : work(wqueue >(max)), threads(workerThreads(work, max)) { + } + + wqueue > work; + const list threads; + }; + public: - worker(int max) : work(queue >(max)), threads(workerThreads(work, max)) { + worker(int max) : w(*(new (gc_new()) sharedWorker(max))) { + } + + worker(const worker& wk) : w(wk.w) { } private: - queue > work; - const list threads; + sharedWorker& w; template friend const future submit(worker& w, const lambda& func); friend const bool shutdown(worker& w); + friend const bool cancel(worker& w); }; /** @@ -238,14 +265,14 @@ template bool submitFunc(const lambda& func, const future& f template const future submit(worker& w, const lambda& func) { const future fut; const lambda f = curry(lambda, future)>(submitFunc), func, fut); - enqueue(w.work, f); + enqueue(w.w.work, f); return fut; } /** * Enqueues shutdown requests. */ -const bool shutdownEnqueue(const list& threads, queue >& work) { +const bool shutdownEnqueue(const list& threads, wqueue >& work) { if (isNil(threads)) return true; enqueue(work, result(false)); @@ -266,8 +293,23 @@ const bool shutdownJoin(const list& threads) { * Shutdown a worker. */ const bool shutdown(worker& w) { - shutdownEnqueue(w.threads, w.work); - shutdownJoin(w.threads); + shutdownEnqueue(w.w.threads, w.w.work); + shutdownJoin(w.w.threads); + return true; +} + +/** + * Cancel a worker. + */ +const bool cancel(const list& threads) { + if (isNil(threads)) + return true; + pthread_cancel(car(threads)); + return cancel(cdr(threads)); +} + +const bool cancel(worker& w) { + cancel(w.w.threads); return true; } -- cgit v1.2.3