summaryrefslogtreecommitdiffstats
path: root/sca-cpp/trunk/kernel/parallel.hpp
diff options
context:
space:
mode:
authorjsdelfino <jsdelfino@13f79535-47bb-0310-9956-ffa450edef68>2010-02-17 04:14:31 +0000
committerjsdelfino <jsdelfino@13f79535-47bb-0310-9956-ffa450edef68>2010-02-17 04:14:31 +0000
commit00438314438f3dde00b532ac5d8d28ccc35c7096 (patch)
tree80dbbb010c5125455a164c77670b8694231f123f /sca-cpp/trunk/kernel/parallel.hpp
parent50063bc212e8e93d014519ef0e4d4cabef0b6be2 (diff)
Working queue and chat components. Added a few useful start/stop scripts. Fixed lifecycle code to call start/stop/restart functions before APR pools are cleaned up in both parent and child processes. Minor build script improvements.
git-svn-id: http://svn.us.apache.org/repos/asf/tuscany@910819 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to '')
-rw-r--r--sca-cpp/trunk/kernel/parallel.hpp88
1 files changed, 65 insertions, 23 deletions
diff --git a/sca-cpp/trunk/kernel/parallel.hpp b/sca-cpp/trunk/kernel/parallel.hpp
index 9fb6c59ba7..09cf0df9a3 100644
--- a/sca-cpp/trunk/kernel/parallel.hpp
+++ b/sca-cpp/trunk/kernel/parallel.hpp
@@ -59,9 +59,12 @@ private:
pthread_cond_init(&valueCond, NULL);
}
+ futureValue(const futureValue& fv) : valueMutex(fv.valueMutex), valueCond(fv.valueCond), hasValue(fv.hasValue), value(fv.value) {
+ }
+
~futureValue() {
- pthread_mutex_destroy(&valueMutex);
- pthread_cond_destroy(&valueCond);
+ //pthread_mutex_destroy(&valueMutex);
+ //pthread_cond_destroy(&valueCond);
}
bool set(const T& v) {
@@ -129,18 +132,21 @@ public:
/**
* A bounded thread safe queue.
*/
-template<typename T> class queue {
+template<typename T> class wqueue {
public:
- queue(int max) : max(max), size(0), tail(0), head(0), values(new (gc_anew<T>(max)) T[max]) {
+ wqueue(int max) : max(max), size(0), tail(0), head(0), values(new (gc_anew<T>(max)) T[max]) {
pthread_mutex_init(&mutex, NULL);
pthread_cond_init(&full, NULL);
pthread_cond_init(&empty, NULL);
}
- ~queue() {
- pthread_mutex_destroy(&mutex);
- pthread_cond_destroy(&full);
- pthread_cond_destroy(&empty);
+ wqueue(const wqueue& wq) : max(wq.max), size(wq.size), tail(wq.tail), head(wq.head), mutex(wq.mutex), full(wq.full), empty(wq.empty), values(wq.values) {
+ }
+
+ ~wqueue() {
+ //pthread_mutex_destroy(&mutex);
+ //pthread_cond_destroy(&full);
+ //pthread_cond_destroy(&empty);
}
private:
@@ -153,14 +159,14 @@ private:
pthread_cond_t empty;
gc_ptr<T> values;
- template<typename X> friend const int enqueue(queue<X>& q, const X& v);
- template<typename X> friend const X dequeue(queue<X>& q);
+ template<typename X> friend const int enqueue(wqueue<X>& q, const X& v);
+ template<typename X> friend const X dequeue(wqueue<X>& q);
};
/**
* Adds an element to the tail of the queue.
*/
-template<typename T> const int enqueue(queue<T>&q, const T& v) {
+template<typename T> const int enqueue(wqueue<T>&q, const T& v) {
pthread_mutex_lock(&q.mutex);
while(q.size == q.max)
pthread_cond_wait(&q.full, &q.mutex);
@@ -175,7 +181,7 @@ template<typename T> const int enqueue(queue<T>&q, const T& v) {
/**
* Returns the element at the head of the queue.
*/
-template<typename T> const T dequeue(queue<T>& q) {
+template<typename T> const T dequeue(wqueue<T>& q) {
pthread_mutex_lock(&q.mutex);
while(q.size == 0)
pthread_cond_wait(&q.empty, &q.mutex);
@@ -191,7 +197,12 @@ template<typename T> const T dequeue(queue<T>& q) {
* The worker thread function.
*/
void *workerThreadFunc(void *arg) {
- queue<lambda<bool()> >* work = reinterpret_cast<queue<lambda<bool()> >*>(arg);
+ int ost;
+ pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &ost);
+ int ot;
+ pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, &ot);
+
+ wqueue<lambda<bool()> >* work = reinterpret_cast<wqueue<lambda<bool()> >*>(arg);
while(dequeue(*work)())
;
return NULL;
@@ -200,28 +211,44 @@ void *workerThreadFunc(void *arg) {
/**
* Returns a list of worker threads.
*/
-const list<pthread_t> workerThreads(queue<lambda<bool()> >& queue, const int count) {
+const list<pthread_t> workerThreads(wqueue<lambda<bool()> >& wqueue, const int count) {
if (count == 0)
return list<pthread_t>();
pthread_t thread;
- pthread_create(&thread, NULL, workerThreadFunc, &queue);
- return cons(thread, workerThreads(queue, count - 1));
+ pthread_create(&thread, NULL, workerThreadFunc, &wqueue);
+ return cons(thread, workerThreads(wqueue, count - 1));
}
/**
* A worker, implemented with a work queue and a pool of threads.
*/
class worker {
+private:
+
+ // The worker holds a reference to a sharedWorker, to avoid non-thread-safe
+ // copies of the queue and thread pool when a worker is copied
+ class sharedWorker {
+ public:
+ sharedWorker(int max) : work(wqueue<lambda<bool()> >(max)), threads(workerThreads(work, max)) {
+ }
+
+ wqueue<lambda<bool()> > work;
+ const list<pthread_t> threads;
+ };
+
public:
- worker(int max) : work(queue<lambda<bool()> >(max)), threads(workerThreads(work, max)) {
+ worker(int max) : w(*(new (gc_new<sharedWorker>()) sharedWorker(max))) {
+ }
+
+ worker(const worker& wk) : w(wk.w) {
}
private:
- queue<lambda<bool()> > work;
- const list<pthread_t> threads;
+ sharedWorker& w;
template<typename X> friend const future<X> submit(worker& w, const lambda<X()>& func);
friend const bool shutdown(worker& w);
+ friend const bool cancel(worker& w);
};
/**
@@ -238,14 +265,14 @@ template<typename R> bool submitFunc(const lambda<R()>& func, const future<R>& f
template<typename R> const future<R> submit(worker& w, const lambda<R()>& func) {
const future<R> fut;
const lambda<bool()> f = curry(lambda<bool(const lambda<R()>, future<R>)>(submitFunc<R>), func, fut);
- enqueue(w.work, f);
+ enqueue(w.w.work, f);
return fut;
}
/**
* Enqueues shutdown requests.
*/
-const bool shutdownEnqueue(const list<pthread_t>& threads, queue<lambda<bool()> >& work) {
+const bool shutdownEnqueue(const list<pthread_t>& threads, wqueue<lambda<bool()> >& work) {
if (isNil(threads))
return true;
enqueue(work, result(false));
@@ -266,8 +293,23 @@ const bool shutdownJoin(const list<pthread_t>& threads) {
* Shutdown a worker.
*/
const bool shutdown(worker& w) {
- shutdownEnqueue(w.threads, w.work);
- shutdownJoin(w.threads);
+ shutdownEnqueue(w.w.threads, w.w.work);
+ shutdownJoin(w.w.threads);
+ return true;
+}
+
+/**
+ * Cancel a worker.
+ */
+const bool cancel(const list<pthread_t>& threads) {
+ if (isNil(threads))
+ return true;
+ pthread_cancel(car(threads));
+ return cancel(cdr(threads));
+}
+
+const bool cancel(worker& w) {
+ cancel(w.w.threads);
return true;
}