From 215c005bde88752999978adce56b09d2a90a13b7 Mon Sep 17 00:00:00 2001 From: jsdelfino Date: Sun, 27 Sep 2009 20:04:28 +0000 Subject: Moved some sources up in the directory tree to attempt to simplify the directory structure a bit, and some minor refactoring. git-svn-id: http://svn.us.apache.org/repos/asf/tuscany@819394 13f79535-47bb-0310-9956-ffa450edef68 --- cpp/sca/runtime/core/src/tuscany/parallel.hpp | 283 -------------------------- 1 file changed, 283 deletions(-) delete mode 100644 cpp/sca/runtime/core/src/tuscany/parallel.hpp (limited to 'cpp/sca/runtime/core/src/tuscany/parallel.hpp') diff --git a/cpp/sca/runtime/core/src/tuscany/parallel.hpp b/cpp/sca/runtime/core/src/tuscany/parallel.hpp deleted file mode 100644 index c0d578e281..0000000000 --- a/cpp/sca/runtime/core/src/tuscany/parallel.hpp +++ /dev/null @@ -1,283 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -/* $Rev$ $Date$ */ - -#ifndef tuscany_parallel_hpp -#define tuscany_parallel_hpp - -/** - * Simple parallel work execution functions. - */ - -#include -#include -#include "function.hpp" - -namespace tuscany { - -/** - * Returns the current thread id. - */ -unsigned int threadId() { - return syscall(__NR_gettid); -} - -/** - * Represents a value which will be know in the future. - */ -template class future { - -private: - template class futureValue { - public: - futureValue() : - refCount(0), hasValue(false) { - pthread_mutex_init(&valueMutex, NULL); - pthread_cond_init(&valueCond, NULL); - } - - ~futureValue() { - pthread_mutex_destroy(&valueMutex); - pthread_cond_destroy(&valueCond); - } - - unsigned int acquire() { - return __sync_add_and_fetch(&refCount, 1); - } - - unsigned int release() { - return __sync_sub_and_fetch(&refCount, 1); - } - - bool set(const T& v) { - pthread_mutex_lock(&valueMutex); - if(hasValue) { - pthread_mutex_unlock(&valueMutex); - return false; - } - hasValue = true; - value = v; - pthread_mutex_unlock(&valueMutex); - pthread_cond_broadcast(&valueCond); - return true; - } - - const T get() { - pthread_mutex_lock(&valueMutex); - while(!hasValue) { - pthread_cond_wait(&valueCond, &valueMutex); - } - const T& v = value; - pthread_mutex_unlock(&valueMutex); - return v; - } - - private: - unsigned refCount; - pthread_mutex_t valueMutex; - pthread_cond_t valueCond; - bool hasValue; - X value; - }; - - gc_counting_ptr > fvalue; - - template friend const X get(const future& f); - template friend bool set(const future& f, const X& v); - -public: - future() : fvalue(new futureValue()) { - //std::cout << "future() threadId " << threadId() << "\n"; - } - - ~future() { - //std::cout << "~future() threadId " << threadId() << "\n"; - } - - future(const future& f) : fvalue(f.fvalue) { - //std::cout << "future(const future& f) threadId " << threadId() << "\n"; - } - - const future& operator=(const future& f) { - //std::cout << "future::operator=(const future& f) threadId " << threadId() << "\n"; - if (&f == this) - return *this; - fvalue = f.fvalue; - return *this; - } - - const future& operator=(const T& v) const { - fvalue->set(v); - return *this; - } - - operator const T() const { - return fvalue->get(); - } - -}; - -/** - * A bounded thread safe queue. - */ -template class queue { -public: - explicit queue(int max) : max(max), size(0), tail(0), head(0), values(new T[max]) { - pthread_mutex_init(&mutex, NULL); - pthread_cond_init(&full, NULL); - pthread_cond_init(&empty, NULL); - } - - ~queue() { - pthread_mutex_destroy(&mutex); - pthread_cond_destroy(&full); - pthread_cond_destroy(&empty); - } - -private: - const int max; - int size; - int tail; - int head; - pthread_mutex_t mutex; - pthread_cond_t full; - pthread_cond_t empty; - gc_aptr values; - - template friend const int enqueue(queue& q, const X& v); - template friend const X dequeue(queue& q); -}; - -/** - * Adds an element to the tail of the queue. - */ -template const int enqueue(queue&q, const T& v) { - pthread_mutex_lock(&q.mutex); - while(q.size == q.max) - pthread_cond_wait(&q.full, &q.mutex); - q.values[q.tail] = v; - q.tail = (q.tail + 1) % q.max; - q.size++; - pthread_mutex_unlock(&q.mutex); - pthread_cond_broadcast(&q.empty); - return q.size; -} - -/** - * Returns the element at the head of the queue. - */ -template const T dequeue(queue& q) { - pthread_mutex_lock(&q.mutex); - while(q.size == 0) - pthread_cond_wait(&q.empty, &q.mutex); - const T v = q.values[q.head]; - q.head = (q.head + 1) % q.max; - q.size--; - pthread_mutex_unlock(&q.mutex); - pthread_cond_broadcast(&q.full); - return v; -} - -/** - * The worker thread function. - */ -void *workerThreadFunc(void *arg) { - queue >* work = reinterpret_cast >*>(arg); - while(dequeue(*work)()) - ; - return NULL; -} - -/** - * Returns a list of worker threads. - */ -const list makeWorkerThreads(queue >& queue, const int count) { - if (count == 0) - return list(); - pthread_t thread; - pthread_create(&thread, NULL, workerThreadFunc, &queue); - return cons(thread, makeWorkerThreads(queue, count - 1)); -} - -/** - * A worker, implemented with a work queue and a pool of threads. - */ -class worker { -public: - explicit worker(int max) : work(queue >(max)), threads(makeWorkerThreads(work, max)) { - } - -private: - queue > work; - const list threads; - - template friend const future submit(worker& w, const lambda& func); - friend const bool shutdown(worker& w); -}; - -/** - * Function used to wrap work submitted to a worker. - */ -template bool submitFunc(const lambda& func, const future& fut) { - fut = func(); - return true; -} - -/** - * Submits work to a worker. - */ -template const future submit(worker& w, const lambda& func) { - const future fut; - const lambda f = curry(lambda, future)>(submitFunc), func, fut); - enqueue(w.work, f); - return fut; -} - -/** - * Enqueues shutdown requests. - */ -const bool shutdownEnqueue(const list& threads, queue >& work) { - if (threads == list()) - return true; - enqueue(work, unit(false)); - return shutdownEnqueue(cdr(threads), work); -} - -/** - * Waits for shut down threads to terminate. - */ -const bool shutdownJoin(const list& threads) { - if (threads == list()) - return true; - pthread_join(car(threads), NULL); - return shutdownJoin(cdr(threads)); -} - -/** - * Shutdown a worker. - */ -const bool shutdown(worker& w) { - shutdownEnqueue(w.threads, w.work); - shutdownJoin(w.threads); - return true; -} - -} -#endif /* tuscany_parallel_hpp */ -- cgit v1.2.3