From c9bfccc35345ce58fb5774d4b0b6a9868b262c0a Mon Sep 17 00:00:00 2001 From: giorgio Date: Wed, 5 Sep 2012 08:31:30 +0000 Subject: git-svn-id: http://svn.us.apache.org/repos/asf/tuscany@1381061 13f79535-47bb-0310-9956-ffa450edef68 --- .../branches/lightweight-sca/kernel/parallel.hpp | 466 +++++++++++++++++++++ 1 file changed, 466 insertions(+) create mode 100644 sca-cpp/branches/lightweight-sca/kernel/parallel.hpp (limited to 'sca-cpp/branches/lightweight-sca/kernel/parallel.hpp') diff --git a/sca-cpp/branches/lightweight-sca/kernel/parallel.hpp b/sca-cpp/branches/lightweight-sca/kernel/parallel.hpp new file mode 100644 index 0000000000..3be4d3bc8e --- /dev/null +++ b/sca-cpp/branches/lightweight-sca/kernel/parallel.hpp @@ -0,0 +1,466 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/* $Rev$ $Date$ */ + +#ifndef tuscany_parallel_hpp +#define tuscany_parallel_hpp + +/** + * Simple parallel work execution functions. + */ + +#include +#ifdef WANT_THREADS +#include +#endif + +#include "function.hpp" +#include "list.hpp" + +namespace tuscany { + +/** + * Returns the current process id. + */ +unsigned long processId() { + return (unsigned long)getpid(); +} + +#ifdef WANT_THREADS + +/** + * Returns the current thread id. + */ +unsigned long threadId() { + return (unsigned long)pthread_self(); +} + +/** + * Represents a value which will be know in the future. + */ +template class future { + +private: + template class futureValue { + public: + futureValue() : hasValue(false) { + pthread_mutex_init(&valueMutex, NULL); + pthread_cond_init(&valueCond, NULL); + } + + futureValue(const futureValue& fv) : valueMutex(fv.valueMutex), valueCond(fv.valueCond), hasValue(fv.hasValue), value(fv.value) { + } + + ~futureValue() { + //pthread_mutex_destroy(&valueMutex); + //pthread_cond_destroy(&valueCond); + } + + bool set(const T& v) { + pthread_mutex_lock(&valueMutex); + if(hasValue) { + pthread_mutex_unlock(&valueMutex); + return false; + } + hasValue = true; + value = v; + pthread_mutex_unlock(&valueMutex); + pthread_cond_broadcast(&valueCond); + return true; + } + + const T get() { + pthread_mutex_lock(&valueMutex); + while(!hasValue) { + pthread_cond_wait(&valueCond, &valueMutex); + } + const T& v = value; + pthread_mutex_unlock(&valueMutex); + return v; + } + + private: + pthread_mutex_t valueMutex; + pthread_cond_t valueCond; + bool hasValue; + X value; + }; + + gc_ptr > fvalue; + + template friend const X get(const future& f); + template friend bool set(const future& f, const X& v); + +public: + future() : fvalue(new (gc_new >()) futureValue()) { + } + + ~future() { + } + + future(const future& f) : fvalue(f.fvalue) { + } + + const future& operator=(const future& f) { + if (&f == this) + return *this; + fvalue = f.fvalue; + return *this; + } + + const future& operator=(const T& v) const { + fvalue->set(v); + return *this; + } + + operator const T() const { + return fvalue->get(); + } +}; + +/** + * A bounded thread safe queue. + */ +template class wqueue { +public: + wqueue(size_t max) : max(max), size(0), tail(0), head(0), values(new (gc_anew(max)) T[max]) { + pthread_mutex_init(&mutex, NULL); + pthread_cond_init(&full, NULL); + pthread_cond_init(&empty, NULL); + } + + wqueue(const wqueue& wq) : max(wq.max), size(wq.size), tail(wq.tail), head(wq.head), mutex(wq.mutex), full(wq.full), empty(wq.empty), values(wq.values) { + } + + ~wqueue() { + //pthread_mutex_destroy(&mutex); + //pthread_cond_destroy(&full); + //pthread_cond_destroy(&empty); + } + +private: + const size_t max; + size_t size; + size_t tail; + size_t head; + pthread_mutex_t mutex; + pthread_cond_t full; + pthread_cond_t empty; + gc_ptr values; + + template friend const size_t enqueue(wqueue& q, const X& v); + template friend const X dequeue(wqueue& q); +}; + +/** + * Adds an element to the tail of the queue. + */ +template const size_t enqueue(wqueue&q, const T& v) { + pthread_mutex_lock(&q.mutex); + while(q.size == q.max) + pthread_cond_wait(&q.full, &q.mutex); + q.values[q.tail] = v; + q.tail = (q.tail + 1) % q.max; + q.size++; + pthread_mutex_unlock(&q.mutex); + pthread_cond_broadcast(&q.empty); + return q.size; +} + +/** + * Returns the element at the head of the queue. + */ +template const T dequeue(wqueue& q) { + pthread_mutex_lock(&q.mutex); + while(q.size == 0) + pthread_cond_wait(&q.empty, &q.mutex); + const T v = q.values[q.head]; + q.head = (q.head + 1) % q.max; + q.size--; + pthread_mutex_unlock(&q.mutex); + pthread_cond_broadcast(&q.full); + return v; +} + +/** + * The worker thread function. + */ +void *workerThreadFunc(void *arg) { + int ost; + pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &ost); + int ot; + pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, &ot); + + wqueue >* work = reinterpret_cast >*>(arg); + while(dequeue(*work)()) + ; + return NULL; +} + +/** + * Returns a list of worker threads. + */ +const list workerThreads(wqueue >& wqueue, const size_t count) { + if (count == 0) + return list(); + pthread_t thread; + pthread_create(&thread, NULL, workerThreadFunc, &wqueue); + return cons(thread, workerThreads(wqueue, count - 1)); +} + +/** + * A worker, implemented with a work queue and a pool of threads. + */ +class worker { +private: + + // The worker holds a reference to a sharedWorker, to avoid non-thread-safe + // copies of the queue and thread pool when a worker is copied + class sharedWorker { + public: + sharedWorker(size_t max) : work(wqueue >(max)), threads(workerThreads(work, max)) { + } + + wqueue > work; + const list threads; + }; + +public: + worker(size_t max) : w(*(new (gc_new()) sharedWorker(max))) { + } + + worker(const worker& wk) : w(wk.w) { + } + +private: + sharedWorker& w; + + template friend const future submit(worker& w, const lambda& func); + friend const bool shutdown(worker& w); + friend const bool cancel(worker& w); +}; + +/** + * Function used to wrap work submitted to a worker. + */ +template bool submitFunc(const lambda& func, const future& fut) { + fut = func(); + return true; +} + +/** + * Submits work to a worker. + */ +template const future submit(worker& w, const lambda& func) { + const future fut; + const lambda f = curry(lambda, future)>(submitFunc), func, fut); + enqueue(w.w.work, f); + return fut; +} + +/** + * Enqueues shutdown requests. + */ +const bool shutdownEnqueue(const list& threads, wqueue >& work) { + if (isNil(threads)) + return true; + enqueue(work, result(false)); + return shutdownEnqueue(cdr(threads), work); +} + +/** + * Waits for shut down threads to terminate. + */ +const bool shutdownJoin(const list& threads) { + if (isNil(threads)) + return true; + pthread_join(car(threads), NULL); + return shutdownJoin(cdr(threads)); +} + +/** + * Shutdown a worker. + */ +const bool shutdown(worker& w) { + shutdownEnqueue(w.w.threads, w.w.work); + shutdownJoin(w.w.threads); + return true; +} + +/** + * Cancel a worker. + */ +const bool cancel(const list& threads) { + if (isNil(threads)) + return true; + pthread_cancel(car(threads)); + return cancel(cdr(threads)); +} + +const bool cancel(worker& w) { + cancel(w.w.threads); + return true; +} + +#else + +/** + * Returns the current thread id. + */ +unsigned long threadId() { + return 0; +} + +#endif + +/** + * Represents a per-thread value. + */ +template class perthread_ptr { +public: + perthread_ptr() : key(createkey()), owner(true), cl(lambda()>()), managed(false) { + } + + perthread_ptr(const lambda()>& cl) : key(createkey()), owner(true), cl(cl), managed(true) { + } + + ~perthread_ptr() { + if (owner) + deletekey(key); + } + + perthread_ptr(const perthread_ptr& c) : key(c.key), owner(false), cl(c.cl), managed(c.managed) { + } + + perthread_ptr& operator=(const perthread_ptr& r) throw() { + if(this == &r) + return *this; + key = r.key; + owner = false; + cl = r.cl; + managed = r.managed; + return *this; + } + + const perthread_ptr& operator=(const gc_ptr& v) { + set(v); + return *this; + } + + const perthread_ptr& operator=(T* v) { + set(v); + return *this; + } + + const bool operator==(const gc_ptr& r) const throw() { + return get() == r; + } + + const bool operator==(T* p) const throw() { + return get() == p; + } + + const bool operator!=(const gc_ptr& r) const throw() { + return !this->operator==(r); + } + + const bool operator!=(T* p) const throw() { + return !this->operator==(p); + } + + T& operator*() const throw() { + return *get(); + } + + T* operator->() const throw() { + return get(); + } + + operator gc_ptr() const { + return get(); + } + + operator T*() const { + return get(); + } + +private: +#ifdef WANT_THREADS + pthread_key_t createkey() { + pthread_key_t k; + pthread_key_create(&k, NULL); + return k; + } + + bool deletekey(pthread_key_t k) { + pthread_key_delete(k); + return true; + } + + bool set(const gc_ptr& v) { + pthread_setspecific(key, (T*)v); + return true; + } + + gc_ptr get() const { + const gc_ptr v = static_cast(pthread_getspecific(key)); + if (v != NULL || !managed) + return v; + const gc_ptr nv = cl(); + pthread_setspecific(key, nv); + return nv; + } + +#else + gc_ptr > createkey() { + return new (gc_new >()) gc_ptr(); + } + + bool deletekey(unused gc_ptr > k) { + return true; + } + + bool set(const gc_ptr& v) { + *key = v; + return true; + } + + gc_ptr get() const { + if (*key != NULL || !managed) + return *key; + *key = cl(); + return *key; + } + +#endif + +#ifdef WANT_THREADS + pthread_key_t key; +#else + gc_ptr >key; +#endif + + bool owner; + lambda()> cl; + bool managed; +}; + +} +#endif /* tuscany_parallel_hpp */ -- cgit v1.2.3