diff options
Diffstat (limited to 'sca-cpp/trunk/kernel/parallel.hpp')
-rw-r--r-- | sca-cpp/trunk/kernel/parallel.hpp | 161 |
1 files changed, 74 insertions, 87 deletions
diff --git a/sca-cpp/trunk/kernel/parallel.hpp b/sca-cpp/trunk/kernel/parallel.hpp index 3be4d3bc8e..8b01bb819f 100644 --- a/sca-cpp/trunk/kernel/parallel.hpp +++ b/sca-cpp/trunk/kernel/parallel.hpp @@ -39,7 +39,7 @@ namespace tuscany { /** * Returns the current process id. */ -unsigned long processId() { +inline const unsigned long processId() noexcept { return (unsigned long)getpid(); } @@ -48,7 +48,7 @@ unsigned long processId() { /** * Returns the current thread id. */ -unsigned long threadId() { +inline const unsigned long threadId() noexcept{ return (unsigned long)pthread_self(); } @@ -60,20 +60,20 @@ template<typename T> class future { private: template<typename X> class futureValue { public: - futureValue() : hasValue(false) { + inline futureValue() noexcept : hasValue(false) { pthread_mutex_init(&valueMutex, NULL); pthread_cond_init(&valueCond, NULL); } - futureValue(const futureValue& fv) : valueMutex(fv.valueMutex), valueCond(fv.valueCond), hasValue(fv.hasValue), value(fv.value) { + inline futureValue(const futureValue& fv) noexcept : valueMutex(fv.valueMutex), valueCond(fv.valueCond), hasValue(fv.hasValue), value(fv.value) { } - ~futureValue() { + inline ~futureValue() noexcept { //pthread_mutex_destroy(&valueMutex); //pthread_cond_destroy(&valueCond); } - bool set(const T& v) { + inline const bool set(const T& v) noexcept { pthread_mutex_lock(&valueMutex); if(hasValue) { pthread_mutex_unlock(&valueMutex); @@ -86,7 +86,7 @@ private: return true; } - const T get() { + inline const T get() noexcept { pthread_mutex_lock(&valueMutex); while(!hasValue) { pthread_cond_wait(&valueCond, &valueMutex); @@ -100,37 +100,32 @@ private: pthread_mutex_t valueMutex; pthread_cond_t valueCond; bool hasValue; - X value; + gc_mutable_ref<X> value; }; - gc_ptr<futureValue<T> > fvalue; + const gc_ptr<futureValue<T> > fvalue; - template<typename X> friend const X get(const future<X>& f); - template<typename X> friend bool set(const future<X>& f, const X& v); + template<typename X> friend const X get(const future<X>& f) noexcept; + template<typename X> friend bool set(const future<X>& f, const X& v) noexcept; public: - future() : fvalue(new (gc_new<futureValue<T> >()) futureValue<T>()) { + inline future() noexcept : fvalue(new (gc_new<futureValue<T> >()) futureValue<T>()) { } - ~future() { + inline ~future() noexcept { } - future(const future& f) : fvalue(f.fvalue) { + inline future(const future& f) noexcept : fvalue(f.fvalue) { } - const future& operator=(const future& f) { - if (&f == this) - return *this; - fvalue = f.fvalue; - return *this; - } + const future& operator=(const future& f) = delete; - const future& operator=(const T& v) const { + inline const future& operator=(const T& v) const noexcept { fvalue->set(v); return *this; } - operator const T() const { + inline operator const T() const noexcept { return fvalue->get(); } }; @@ -140,16 +135,16 @@ public: */ template<typename T> class wqueue { public: - wqueue(size_t max) : max(max), size(0), tail(0), head(0), values(new (gc_anew<T>(max)) T[max]) { + inline wqueue(size_t max) noexcept : max(max), size(0), tail(0), head(0), values(new (gc_anew<gc_mutable_ref<T> >(max)) gc_mutable_ref<T>[max]) { pthread_mutex_init(&mutex, NULL); pthread_cond_init(&full, NULL); pthread_cond_init(&empty, NULL); } - wqueue(const wqueue& wq) : max(wq.max), size(wq.size), tail(wq.tail), head(wq.head), mutex(wq.mutex), full(wq.full), empty(wq.empty), values(wq.values) { + inline wqueue(const wqueue& wq) noexcept : max(wq.max), size(wq.size), tail(wq.tail), head(wq.head), mutex(wq.mutex), full(wq.full), empty(wq.empty), values(wq.values) { } - ~wqueue() { + inline ~wqueue() { //pthread_mutex_destroy(&mutex); //pthread_cond_destroy(&full); //pthread_cond_destroy(&empty); @@ -163,16 +158,16 @@ private: pthread_mutex_t mutex; pthread_cond_t full; pthread_cond_t empty; - gc_ptr<T> values; + gc_ptr<gc_mutable_ref<T> > values; - template<typename X> friend const size_t enqueue(wqueue<X>& q, const X& v); - template<typename X> friend const X dequeue(wqueue<X>& q); + template<typename X> friend const size_t enqueue(wqueue<X>& q, const X& v) noexcept; + template<typename X> friend const X dequeue(wqueue<X>& q) noexcept; }; /** * Adds an element to the tail of the queue. */ -template<typename T> const size_t enqueue(wqueue<T>&q, const T& v) { +template<typename T> inline const size_t enqueue(wqueue<T>&q, const T& v) noexcept { pthread_mutex_lock(&q.mutex); while(q.size == q.max) pthread_cond_wait(&q.full, &q.mutex); @@ -187,7 +182,7 @@ template<typename T> const size_t enqueue(wqueue<T>&q, const T& v) { /** * Returns the element at the head of the queue. */ -template<typename T> const T dequeue(wqueue<T>& q) { +template<typename T> inline const T dequeue(wqueue<T>& q) noexcept { pthread_mutex_lock(&q.mutex); while(q.size == 0) pthread_cond_wait(&q.empty, &q.mutex); @@ -202,13 +197,13 @@ template<typename T> const T dequeue(wqueue<T>& q) { /** * The worker thread function. */ -void *workerThreadFunc(void *arg) { +inline void* workerThreadFunc(void* arg) noexcept { int ost; pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &ost); int ot; pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, &ot); - wqueue<lambda<bool()> >* work = reinterpret_cast<wqueue<lambda<bool()> >*>(arg); + wqueue<blambda >* work = reinterpret_cast<wqueue<blambda >*>(arg); while(dequeue(*work)()) ; return NULL; @@ -217,7 +212,7 @@ void *workerThreadFunc(void *arg) { /** * Returns a list of worker threads. */ -const list<pthread_t> workerThreads(wqueue<lambda<bool()> >& wqueue, const size_t count) { +inline const list<pthread_t> workerThreads(wqueue<blambda >& wqueue, const size_t count) noexcept { if (count == 0) return list<pthread_t>(); pthread_t thread; @@ -235,32 +230,32 @@ private: // copies of the queue and thread pool when a worker is copied class sharedWorker { public: - sharedWorker(size_t max) : work(wqueue<lambda<bool()> >(max)), threads(workerThreads(work, max)) { + inline sharedWorker(size_t max) noexcept : work(wqueue<blambda >(max)), threads(workerThreads(work, max)) { } - wqueue<lambda<bool()> > work; + wqueue<blambda > work; const list<pthread_t> threads; }; public: - worker(size_t max) : w(*(new (gc_new<sharedWorker>()) sharedWorker(max))) { + inline worker(size_t max) noexcept : w(*(new (gc_new<sharedWorker>()) sharedWorker(max))) { } - worker(const worker& wk) : w(wk.w) { + inline worker(const worker& wk) noexcept : w(wk.w) { } private: sharedWorker& w; - template<typename X> friend const future<X> submit(worker& w, const lambda<X()>& func); - friend const bool shutdown(worker& w); - friend const bool cancel(worker& w); + template<typename X> friend const future<X> submit(const worker& w, const lambda<const X()>& func) noexcept; + friend const bool shutdown(const worker& w) noexcept; + friend const bool cancel(const worker& w) noexcept; }; /** * Function used to wrap work submitted to a worker. */ -template<typename R> bool submitFunc(const lambda<R()>& func, const future<R>& fut) { +template<typename R> inline const bool submitFunc(const lambda<const R()>& func, const future<R>& fut) noexcept { fut = func(); return true; } @@ -268,9 +263,9 @@ template<typename R> bool submitFunc(const lambda<R()>& func, const future<R>& f /** * Submits work to a worker. */ -template<typename R> const future<R> submit(worker& w, const lambda<R()>& func) { +template<typename R> inline const future<R> submit(const worker& w, const lambda<const R()>& func) noexcept { const future<R> fut; - const lambda<bool()> f = curry(lambda<bool(const lambda<R()>, future<R>)>(submitFunc<R>), func, fut); + const blambda f = curry(lambda<const bool(const lambda<const R()>, future<R>)>(submitFunc<R>), func, fut); enqueue(w.w.work, f); return fut; } @@ -278,7 +273,7 @@ template<typename R> const future<R> submit(worker& w, const lambda<R()>& func) /** * Enqueues shutdown requests. */ -const bool shutdownEnqueue(const list<pthread_t>& threads, wqueue<lambda<bool()> >& work) { +inline const bool shutdownEnqueue(const list<pthread_t>& threads, wqueue<blambda>& work) noexcept { if (isNil(threads)) return true; enqueue(work, result(false)); @@ -288,7 +283,7 @@ const bool shutdownEnqueue(const list<pthread_t>& threads, wqueue<lambda<bool()> /** * Waits for shut down threads to terminate. */ -const bool shutdownJoin(const list<pthread_t>& threads) { +inline const bool shutdownJoin(const list<pthread_t>& threads) noexcept { if (isNil(threads)) return true; pthread_join(car(threads), NULL); @@ -298,7 +293,7 @@ const bool shutdownJoin(const list<pthread_t>& threads) { /** * Shutdown a worker. */ -const bool shutdown(worker& w) { +inline const bool shutdown(const worker& w) noexcept { shutdownEnqueue(w.w.threads, w.w.work); shutdownJoin(w.w.threads); return true; @@ -307,14 +302,14 @@ const bool shutdown(worker& w) { /** * Cancel a worker. */ -const bool cancel(const list<pthread_t>& threads) { +inline const bool cancel(const list<pthread_t>& threads) noexcept { if (isNil(threads)) return true; pthread_cancel(car(threads)); return cancel(cdr(threads)); } -const bool cancel(worker& w) { +inline const bool cancel(const worker& w) noexcept { cancel(w.w.threads); return true; } @@ -324,7 +319,7 @@ const bool cancel(worker& w) { /** * Returns the current thread id. */ -unsigned long threadId() { +inline const unsigned long threadId() noexcept { return 0; } @@ -335,114 +330,106 @@ unsigned long threadId() { */ template<typename T> class perthread_ptr { public: - perthread_ptr() : key(createkey()), owner(true), cl(lambda<gc_ptr<T>()>()), managed(false) { + inline perthread_ptr() noexcept : key(createkey()), owner(true), cl(lambda<const gc_ptr<T>()>()), managed(false) { } - perthread_ptr(const lambda<gc_ptr<T>()>& cl) : key(createkey()), owner(true), cl(cl), managed(true) { + inline perthread_ptr(const lambda<const gc_ptr<T>()>& cl) noexcept : key(createkey()), owner(true), cl(cl), managed(true) { } - ~perthread_ptr() { + inline ~perthread_ptr() noexcept { if (owner) deletekey(key); } - perthread_ptr(const perthread_ptr& c) : key(c.key), owner(false), cl(c.cl), managed(c.managed) { + inline perthread_ptr(const perthread_ptr& c) noexcept : key(c.key), owner(false), cl(c.cl), managed(c.managed) { } - perthread_ptr& operator=(const perthread_ptr& r) throw() { - if(this == &r) - return *this; - key = r.key; - owner = false; - cl = r.cl; - managed = r.managed; - return *this; - } + perthread_ptr& operator=(const perthread_ptr& r) = delete; - const perthread_ptr& operator=(const gc_ptr<T>& v) { + inline const perthread_ptr& operator=(const gc_ptr<T>& v) const noexcept { set(v); return *this; } - const perthread_ptr& operator=(T* v) { + inline const perthread_ptr& operator=(T* const v) const noexcept { set(v); return *this; } - const bool operator==(const gc_ptr<T>& r) const throw() { + inline const bool operator==(const gc_ptr<T>& r) const noexcept { return get() == r; } - const bool operator==(T* p) const throw() { + inline const bool operator==(const T* const p) const noexcept { return get() == p; } - const bool operator!=(const gc_ptr<T>& r) const throw() { + inline const bool operator!=(const gc_ptr<T>& r) const noexcept { return !this->operator==(r); } - const bool operator!=(T* p) const throw() { + inline const bool operator!=(const T* const p) const noexcept { return !this->operator==(p); } - T& operator*() const throw() { + inline T& operator*() const noexcept { return *get(); } - T* operator->() const throw() { + inline T* const operator->() const noexcept { return get(); } - operator gc_ptr<T>() const { + inline operator gc_ptr<T>() const { return get(); } - operator T*() const { + inline operator T* const () const { return get(); } private: #ifdef WANT_THREADS - pthread_key_t createkey() { + inline const pthread_key_t createkey() noexcept { pthread_key_t k; pthread_key_create(&k, NULL); return k; } - bool deletekey(pthread_key_t k) { + inline const bool deletekey(pthread_key_t k) noexcept { pthread_key_delete(k); return true; } - bool set(const gc_ptr<T>& v) { + inline const bool set(const gc_ptr<T>& v) const noexcept { pthread_setspecific(key, (T*)v); return true; } - gc_ptr<T> get() const { - const gc_ptr<T> v = static_cast<T*>(pthread_getspecific(key)); + inline const gc_ptr<T> get() const noexcept { + const gc_ptr<T> v = (T*)(pthread_getspecific(key)); if (v != NULL || !managed) return v; const gc_ptr<T> nv = cl(); - pthread_setspecific(key, nv); + pthread_setspecific(key, (T*)nv); return nv; } #else - gc_ptr<gc_ptr<T> > createkey() { + inline const gc_ptr<gc_ptr<T> > createkey() noexcept { return new (gc_new<gc_ptr<T> >()) gc_ptr<T>(); } - bool deletekey(unused gc_ptr<gc_ptr<T> > k) { + inline const bool deletekey(unused gc_ptr<gc_ptr<T> > k) noexcept { return true; } - bool set(const gc_ptr<T>& v) { + inline const bool set(const gc_ptr<T>& v) const noexcept { *key = v; return true; } - gc_ptr<T> get() const { + inline const gc_ptr<T> get() const noexcept { if (*key != NULL || !managed) return *key; *key = cl(); @@ -452,14 +439,14 @@ private: #endif #ifdef WANT_THREADS - pthread_key_t key; + const pthread_key_t key; #else - gc_ptr<gc_ptr<T> >key; + const gc_ptr<gc_ptr<T> >key; #endif - bool owner; - lambda<const gc_ptr<T>()> cl; - bool managed; + const bool owner; + const lambda<const gc_ptr<T>()> cl; + const bool managed; }; } |