/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

/* $Rev$ $Date$ */

/**
 * Test parallel functions.
 *
 * Two groups of tests, both compiled only when WANT_THREADS is defined:
 *  - testAtomicPerf times four ways of incrementing a counter (plain,
 *    atomic builtin, mutex-protected, per-thread storage);
 *  - testWorker submits tasks to a worker, exercises a work queue, and
 *    checks per-thread storage across the worker's threads.
 *
 * NOTE(review): this copy of the file appears to have lost every
 * angle-bracket span: the first #include has no header name and template
 * argument lists are missing (e.g. "lambda l", "gc_ptr tlsic()", "list >").
 * Restore them from the upstream file before building.
 */

// NOTE(review): header name missing on the next line - presumably the header
// that declares assert(), which is used throughout this file; confirm.
#include
#include "stream.hpp"
#include "string.hpp"
#include "function.hpp"
#include "list.hpp"
#include "perf.hpp"
#include "parallel.hpp"

namespace tuscany {

#ifdef WANT_THREADS

// Counter bumped by incPerf. It is a plain global that is never reset, so
// the exact-value assertion in testAtomicPerf only holds on the first run
// within a process.
int inci = 0;

/**
 * Functor that increments inci with an ordinary read-modify-write and no
 * synchronization. Always returns true.
 */
struct incPerf {
    incPerf() {
    }
    const bool operator()() const {
        inci = inci + 1;
        return true;
    }
};

// Counter bumped by addAndFetchPerf; never reset (see inci).
int addi = 0;

/**
 * Functor that increments addi by 1 through the __sync_add_and_fetch
 * builtin. Always returns true.
 */
struct addAndFetchPerf {
    addAndFetchPerf() {
    }
    const bool operator()() const {
        __sync_add_and_fetch(&addi, 1);
        return true;
    }
};

// Counter bumped by mutexPerf; never reset (see inci).
int muxi = 0;

/**
 * Functor that increments muxi while holding a pthread mutex. Always
 * returns true.
 *
 * Only the pointer is stored: the functor neither initializes nor destroys
 * the mutex, so the caller owns it and must keep it valid for as long as
 * the functor is invoked.
 */
struct mutexPerf {
    pthread_mutex_t* mutex;
    mutexPerf(pthread_mutex_t* mutex) : mutex(mutex) {
    }
    const bool operator()() const {
        pthread_mutex_lock(mutex);
        muxi = muxi + 1;
        pthread_mutex_unlock(mutex);
        return true;
    }
};

/**
 * Allocate an int through gc_new with placement new, set it to 0 and
 * return the pointer to it.
 */
const gc_ptr tlsic() {
    gc_ptr i = new (gc_new()) int();
    *i = 0;
    return i;
}

// Per-thread int constructed from tlsic.
// Presumably tlsic is called to create each thread's value on first use -
// TODO confirm against perthread_ptr in parallel.hpp.
const perthread_ptr tlsi(tlsic);

/**
 * Functor that increments the int reached through tlsi. Always returns
 * true.
 */
struct tlsPerf {
    tlsPerf() {
    }
    const bool operator()() const {
        *tlsi = *tlsi + 1;
        return true;
    }
};

/**
 * Time each of the four increment functors and print the result.
 *
 * Every section wraps a functor in a lambda, passes it to
 * time(l, 1000, count), prints the returned value followed by " ms", and
 * asserts that the matching counter equals count + 1000.
 *
 * The assertions imply that time() invokes the functor 1000 + count times
 * in total - presumably 1000 warm-up calls followed by count measured
 * calls; confirm in perf.hpp.
 *
 * Returns true; failures are reported only through assert.
 */
bool testAtomicPerf() {
    const int count = 100000;
    {
        // Unsynchronized increment.
        const lambda l = incPerf();
        cout << "Non-atomic inc test " << time(l, 1000, count) << " ms" << endl;
        assert(inci == count + 1000);
    }
    {
        // Increment through the __sync_add_and_fetch builtin.
        const lambda l = addAndFetchPerf();
        cout << "Atomic inc test " << time(l, 1000, count) << " ms" << endl;
        assert(addi == count + 1000);
    }
    {
        // Increment under a mutex that this scope initializes and destroys.
        pthread_mutex_t mutex;
        pthread_mutex_init(&mutex, NULL);
        const lambda l = mutexPerf(&mutex);
        cout << "Locked inc test " << time(l, 1000, count) << " ms" << endl;
        assert(muxi == count + 1000);
        pthread_mutex_destroy(&mutex);
    }
    {
        // Increment of the per-thread int; the assertion reads the value
        // belonging to the thread running this function.
        const lambda l = tlsPerf();
        cout << "Thread local inc test " << time(l, 1000, count) << " ms" << endl;
        assert(*tlsi == count + 1000);
    }
    return true;
}

/**
 * Return x * x after running an empty 10,000,000-iteration loop.
 *
 * The loop is presumably there to make each task take a noticeable amount
 * of time. NOTE(review): it has no side effects, so an optimizing compiler
 * may remove it - confirm that the delay is actually needed.
 */
const int mtsquare(const int x) {
    for(int i = 0; i < 10000000; i++)
        ;
    return x * x;
}

/**
 * Submit mtsquare(i), mtsquare(i + 1), ... mtsquare(max - 1) to worker w.
 *
 * Recurses once per task and returns the values returned by submit() as a
 * list, in increasing order of the argument. Returns an empty list when
 * i == max.
 */
const list > submitSquares(worker& w, const int max, const int i) {
    if (i == max)
        return list >();
    // Bind the argument now so the submitted function takes no parameters.
    const lambda func = curry(lambda (mtsquare), i);
    return cons(submit(w, func), submitSquares(w, max, i + 1));
}

/**
 * Assert that the results in r are i * i, (i + 1) * (i + 1), ... in order.
 *
 * Always returns true: the result of the recursive call is discarded, so a
 * mismatch is only reported through assert.
 */
bool checkSquareResults(const list > r, int i) {
    if (isNil(r))
        return true;
    assert(car(r) == i * i);
    checkSquareResults(cdr(r), i + 1);
    return true;
}

/**
 * Allocate an unsigned long through gc_new with placement new, set it to 0
 * and return the pointer to it.
 */
const gc_ptr tlsvc() {
    gc_ptr i = new (gc_new()) unsigned long();
    *i = 0l;
    return i;
}

// Per-thread unsigned long constructed from tlsvc (see the note on tlsi).
const perthread_ptr tlsv(tlsvc);

/**
 * Worker task: store the calling thread's id in tlsv and return the value
 * tlsv held before.
 *
 * After storing the id it enqueues true on xq and then dequeues from wq.
 * testWorker pairs these with waitForWorkers(xq, ...) and
 * unblockWorkers(wq, ...). Presumably dequeue blocks on an empty queue, so
 * each task holds its thread until the test has seen every task check in -
 * TODO confirm in parallel.hpp.
 *
 * NOTE(review): v is an unsigned long but the declared return type is
 * long int, so the value is implicitly converted on return.
 */
const long int tlsset(gc_ptr> wq, gc_ptr> xq) {
    const unsigned long v = *tlsv;
    *tlsv = threadId();
    enqueue(*xq, true);
    dequeue(*wq);
    return v;
}

/**
 * Worker task: report whether tlsv currently equals the calling thread's
 * id.
 *
 * Performs the same enqueue-on-xq / dequeue-from-wq handshake as tlsset
 * before returning the comparison result.
 */
const bool tlscheck(gc_ptr> wq, gc_ptr> xq) {
    const bool r = *tlsv == threadId();
    enqueue(*xq, true);
    dequeue(*wq);
    return r;
}

/**
 * Dequeue n items from xq, one per recursion step. Returns true.
 */
const bool waitForWorkers(wqueue& xq, const int n) {
    if (n == 0)
        return true;
    dequeue(xq);
    return waitForWorkers(xq, n - 1);
}

/**
 * Enqueue the value true on wq n times, one per recursion step. Returns
 * true.
 */
const bool unblockWorkers(wqueue& wq, const int n) {
    if (n == 0)
        return true;
    enqueue(wq, true);
    return unblockWorkers(wq, n - 1);
}

/**
 * Submit max - i tlsset tasks to worker w and return the values returned
 * by submit() as a list. Returns an empty list when i == max.
 *
 * The queues are handed to the tasks by address (&wq, &xq), so the caller
 * must keep both queues alive until the tasks have finished using them.
 */
const list > submitTLSSets(worker& w, wqueue& wq, wqueue& xq, const int max, const int i) {
    if (i == max)
        return list >();
    const lambda func = curry(lambda>, gc_ptr>)>(tlsset), (gc_ptr>)&wq, (gc_ptr>)&xq);
    return cons(submit(w, func), submitTLSSets(w, wq, xq, max, i + 1));
}

/**
 * Assert that every result in s is 0, i.e. that each tlsset call returned
 * the value tlsvc initializes tlsv to.
 *
 * Returns true once the end of the list is reached.
 */
bool checkTLSSets(const list > s) {
    if (isNil(s))
        return true;
    assert(car(s) == 0);
    return checkTLSSets(cdr(s));
}

/**
 * Submit max - i tlscheck tasks to worker w and return the values returned
 * by submit() as a list. Returns an empty list when i == max.
 *
 * As in submitTLSSets, the queues are passed by address and must outlive
 * the submitted tasks.
 */
const list > submitTLSChecks(worker& w, wqueue& wq, wqueue& xq, const int max, const int i) {
    if (i == max)
        return list >();
    const lambda func = curry(lambda>, gc_ptr>)>(tlscheck), (gc_ptr>)&wq, (gc_ptr>)&xq);
    return cons(submit(w, func), submitTLSChecks(w, wq, xq, max, i + 1));
}

/**
 * Assert that every result in r is true, i.e. that each tlscheck call
 * found its own thread id in tlsv.
 *
 * Returns true once the end of the list is reached.
 */
bool checkTLSResults(const list > r) {
    if (isNil(r))
        return true;
    assert(car(r) == true);
    return checkTLSResults(cdr(r));
}

/**
 * Exercise a worker constructed with max = 100 and the work queues.
 *
 * Runs four scoped steps in order, then shuts the worker down. Returns
 * true; failures are reported only through assert.
 */
bool testWorker() {
    const int max = 100;
    // Presumably max is the number of threads the worker runs - TODO
    // confirm against worker in parallel.hpp.
    worker w(max);

    {
        // A single task: mtsquare(2) must produce 4.
        const lambda func = curry(lambda (mtsquare), 2);
        assert(submit(w, func) == 4);
    }
    {
        // max tasks in flight at once: mtsquare(0) .. mtsquare(max - 1).
        const list > r(submitSquares(w, max, 0));
        checkSquareResults(r, 0);
    }
    {
        // The queue on its own, from this thread only: enqueue max items
        // then dequeue max items, twice.
        wqueue wq(max);
        unblockWorkers(wq, max);
        waitForWorkers(wq, max);
        unblockWorkers(wq, max);
        waitForWorkers(wq, max);
    }
    {
        // Per-thread storage across the worker. The order of the calls
        // below matters: collect max check-ins on xq before releasing the
        // tasks through wq.
        wqueue wq(max);
        wqueue xq(max);

        // Round 1: every task records its thread id and reports the
        // previous value, which must be the initial 0.
        const list > s(submitTLSSets(w, wq, xq, max, 0));
        waitForWorkers(xq, max);
        unblockWorkers(wq, max);
        checkTLSSets(s);

        // Round 2: every task must find its own thread id, stored during
        // round 1.
        const list > r(submitTLSChecks(w, wq, xq, max, 0));
        waitForWorkers(xq, max);
        unblockWorkers(wq, max);
        checkTLSResults(r);
    }

    shutdown(w);
    return true;
}

#endif

}

/**
 * Test entry point: prints "Testing...", runs the multi-thread tests when
 * WANT_THREADS is defined (otherwise prints a message saying they were
 * skipped), prints "OK" and returns 0.
 */
int main() {
    tuscany::cout << "Testing..." << tuscany::endl;

#ifdef WANT_THREADS
    tuscany::testAtomicPerf();
    tuscany::testWorker();
#else
    tuscany::cout << "Skipped multi-thread tests" << tuscany::endl;
#endif

    tuscany::cout << "OK" << tuscany::endl;

    return 0;
}