diff options
Diffstat (limited to '')
-rw-r--r-- | sca-cpp/trunk/kernel/element.hpp | 2 | ||||
-rw-r--r-- | sca-cpp/trunk/kernel/parallel.hpp | 88 | ||||
-rw-r--r-- | sca-cpp/trunk/kernel/value.hpp | 24 |
3 files changed, 90 insertions, 24 deletions
diff --git a/sca-cpp/trunk/kernel/element.hpp b/sca-cpp/trunk/kernel/element.hpp index 0d14acc4a3..c6aa2c44eb 100644 --- a/sca-cpp/trunk/kernel/element.hpp +++ b/sca-cpp/trunk/kernel/element.hpp @@ -193,7 +193,7 @@ const value valueToElement(const value& t) { const list<value> valuesToElements(const list<value>& l); // Convert a name value pair - if (isList(t) && isSymbol(car<value>(t))) { + if (isList(t) && !isNil((list<value>)t) && isSymbol(car<value>(t))) { const value n = car<value>(t); const value v = cadr<value>(t); diff --git a/sca-cpp/trunk/kernel/parallel.hpp b/sca-cpp/trunk/kernel/parallel.hpp index 9fb6c59ba7..09cf0df9a3 100644 --- a/sca-cpp/trunk/kernel/parallel.hpp +++ b/sca-cpp/trunk/kernel/parallel.hpp @@ -59,9 +59,12 @@ private: pthread_cond_init(&valueCond, NULL); } + futureValue(const futureValue& fv) : valueMutex(fv.valueMutex), valueCond(fv.valueCond), hasValue(fv.hasValue), value(fv.value) { + } + ~futureValue() { - pthread_mutex_destroy(&valueMutex); - pthread_cond_destroy(&valueCond); + //pthread_mutex_destroy(&valueMutex); + //pthread_cond_destroy(&valueCond); } bool set(const T& v) { @@ -129,18 +132,21 @@ public: /** * A bounded thread safe queue. */ -template<typename T> class queue { +template<typename T> class wqueue { public: - queue(int max) : max(max), size(0), tail(0), head(0), values(new (gc_anew<T>(max)) T[max]) { + wqueue(int max) : max(max), size(0), tail(0), head(0), values(new (gc_anew<T>(max)) T[max]) { pthread_mutex_init(&mutex, NULL); pthread_cond_init(&full, NULL); pthread_cond_init(&empty, NULL); } - ~queue() { - pthread_mutex_destroy(&mutex); - pthread_cond_destroy(&full); - pthread_cond_destroy(&empty); + wqueue(const wqueue& wq) : max(wq.max), size(wq.size), tail(wq.tail), head(wq.head), mutex(wq.mutex), full(wq.full), empty(wq.empty), values(wq.values) { + } + + ~wqueue() { + //pthread_mutex_destroy(&mutex); + //pthread_cond_destroy(&full); + //pthread_cond_destroy(&empty); } private: @@ -153,14 +159,14 @@ private: pthread_cond_t empty; gc_ptr<T> values; - template<typename X> friend const int enqueue(queue<X>& q, const X& v); - template<typename X> friend const X dequeue(queue<X>& q); + template<typename X> friend const int enqueue(wqueue<X>& q, const X& v); + template<typename X> friend const X dequeue(wqueue<X>& q); }; /** * Adds an element to the tail of the queue. */ -template<typename T> const int enqueue(queue<T>&q, const T& v) { +template<typename T> const int enqueue(wqueue<T>&q, const T& v) { pthread_mutex_lock(&q.mutex); while(q.size == q.max) pthread_cond_wait(&q.full, &q.mutex); @@ -175,7 +181,7 @@ template<typename T> const int enqueue(queue<T>&q, const T& v) { /** * Returns the element at the head of the queue. */ -template<typename T> const T dequeue(queue<T>& q) { +template<typename T> const T dequeue(wqueue<T>& q) { pthread_mutex_lock(&q.mutex); while(q.size == 0) pthread_cond_wait(&q.empty, &q.mutex); @@ -191,7 +197,12 @@ template<typename T> const T dequeue(queue<T>& q) { * The worker thread function. */ void *workerThreadFunc(void *arg) { - queue<lambda<bool()> >* work = reinterpret_cast<queue<lambda<bool()> >*>(arg); + int ost; + pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &ost); + int ot; + pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, &ot); + + wqueue<lambda<bool()> >* work = reinterpret_cast<wqueue<lambda<bool()> >*>(arg); while(dequeue(*work)()) ; return NULL; @@ -200,28 +211,44 @@ void *workerThreadFunc(void *arg) { /** * Returns a list of worker threads. */ -const list<pthread_t> workerThreads(queue<lambda<bool()> >& queue, const int count) { +const list<pthread_t> workerThreads(wqueue<lambda<bool()> >& wqueue, const int count) { if (count == 0) return list<pthread_t>(); pthread_t thread; - pthread_create(&thread, NULL, workerThreadFunc, &queue); - return cons(thread, workerThreads(queue, count - 1)); + pthread_create(&thread, NULL, workerThreadFunc, &wqueue); + return cons(thread, workerThreads(wqueue, count - 1)); } /** * A worker, implemented with a work queue and a pool of threads. */ class worker { +private: + + // The worker holds a reference to a sharedWorker, to avoid non-thread-safe + // copies of the queue and thread pool when a worker is copied + class sharedWorker { + public: + sharedWorker(int max) : work(wqueue<lambda<bool()> >(max)), threads(workerThreads(work, max)) { + } + + wqueue<lambda<bool()> > work; + const list<pthread_t> threads; + }; + public: - worker(int max) : work(queue<lambda<bool()> >(max)), threads(workerThreads(work, max)) { + worker(int max) : w(*(new (gc_new<sharedWorker>()) sharedWorker(max))) { + } + + worker(const worker& wk) : w(wk.w) { } private: - queue<lambda<bool()> > work; - const list<pthread_t> threads; + sharedWorker& w; template<typename X> friend const future<X> submit(worker& w, const lambda<X()>& func); friend const bool shutdown(worker& w); + friend const bool cancel(worker& w); }; /** @@ -238,14 +265,14 @@ template<typename R> bool submitFunc(const lambda<R()>& func, const future<R>& f template<typename R> const future<R> submit(worker& w, const lambda<R()>& func) { const future<R> fut; const lambda<bool()> f = curry(lambda<bool(const lambda<R()>, future<R>)>(submitFunc<R>), func, fut); - enqueue(w.work, f); + enqueue(w.w.work, f); return fut; } /** * Enqueues shutdown requests. */ -const bool shutdownEnqueue(const list<pthread_t>& threads, queue<lambda<bool()> >& work) { +const bool shutdownEnqueue(const list<pthread_t>& threads, wqueue<lambda<bool()> >& work) { if (isNil(threads)) return true; enqueue(work, result(false)); @@ -266,8 +293,23 @@ const bool shutdownJoin(const list<pthread_t>& threads) { * Shutdown a worker. */ const bool shutdown(worker& w) { - shutdownEnqueue(w.threads, w.work); - shutdownJoin(w.threads); + shutdownEnqueue(w.w.threads, w.w.work); + shutdownJoin(w.w.threads); + return true; +} + +/** + * Cancel a worker. + */ +const bool cancel(const list<pthread_t>& threads) { + if (isNil(threads)) + return true; + pthread_cancel(car(threads)); + return cancel(cdr(threads)); +} + +const bool cancel(worker& w) { + cancel(w.w.threads); return true; } diff --git a/sca-cpp/trunk/kernel/value.hpp b/sca-cpp/trunk/kernel/value.hpp index 4f19ee3915..1aafbe6d38 100644 --- a/sca-cpp/trunk/kernel/value.hpp +++ b/sca-cpp/trunk/kernel/value.hpp @@ -556,5 +556,29 @@ template<typename T> const list<value> mkvalues(const list<T>& l) { return cons<value>(car(l), mkvalues(cdr(l))); } +/** + * Convert a path string value to a list of values. + */ +const list<string> pathTokens(const char* p) { + if (p == NULL || p[0] == '\0') + return list<string>(); + if (p[0] == '/') + return tokenize("/", p + 1); + return tokenize("/", p); +} + +const list<value> pathValues(const value& p) { + return mkvalues(pathTokens(c_str(p))); +} + +/** + * Convert a path represented as a list of values to a string value. + */ +const value path(const list<value>& p) { + if (isNil(p)) + return ""; + return string("/") + car(p) + path(cdr(p)); +} + } #endif /* tuscany_value_hpp */ |