/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. */ /* $Rev$ $Date$ */ /** * Test parallel functions. */ #include #include "stream.hpp" #include "string.hpp" #include "function.hpp" #include "list.hpp" #include "perf.hpp" #include "parallel.hpp" namespace tuscany { int inci = 0; const gc_ptr tlsic() { const gc_ptr i = new (gc_new()) int(); *i = 0; return i; } const perthread_ptr tlsi(tlsic); #ifdef WANT_THREADS int addi = 0; int muxi = 0; #endif const bool testAtomicPerf() { const int count = 100000; { const blambda l = []() -> const bool { inci = inci + 1; return true; }; cout << "Non-atomic inc test " << time(l, 1000, count) << " ms" << endl; assert(inci == count + 1000); } #ifdef WANT_THREADS { const blambda l = []() -> const bool { __sync_add_and_fetch(&addi, 1); return true; }; cout << "Atomic inc test " << time(l, 1000, count) << " ms" << endl; assert(addi == count + 1000); } { pthread_mutex_t mutex; pthread_mutex_init(&mutex, NULL); const blambda l = [&mutex]() -> const bool { pthread_mutex_lock(&mutex); muxi = muxi + 1; pthread_mutex_unlock(&mutex); return true; }; cout << "Locked inc test " << time(l, 1000, count) << " ms" << endl; assert(muxi == count + 1000); pthread_mutex_destroy(&mutex); } #endif { const blambda l = []() -> const bool { *tlsi = *tlsi + 1; return true; }; cout << "Thread local inc test " << time(l, 1000, count) << " ms" << endl; assert(*tlsi == count + 1000); } return true; } #ifdef WANT_THREADS const int mtsquare(const int x) { for(int i = 0; i < 10000000; i++) ; return x * x; } const list > submitSquares(worker& w, const int max, const int i) { if (i == max) return list >(); const lambda func = curry(lambda (mtsquare), i); return cons(submit(w, func), submitSquares(w, max, i + 1)); } bool checkSquareResults(const list > r, int i) { if (isNil(r)) return true; assert(car(r) == i * i); checkSquareResults(cdr(r), i + 1); return true; } const gc_ptr tlsvc() { const gc_ptr i = new (gc_new()) unsigned long(); *i = 0l; return i; } const perthread_ptr tlsv(tlsvc); const long int tlsset(gc_ptr> wq, gc_ptr> xq) { const unsigned long v = *tlsv; *tlsv = threadId(); enqueue(*xq, true); dequeue(*wq); return v; } const bool tlscheck(gc_ptr> wq, gc_ptr> xq) { const bool r = *tlsv == threadId(); enqueue(*xq, true); dequeue(*wq); return r; } const bool waitForWorkers(wqueue& xq, const int n) { if (n == 0) return true; dequeue(xq); return waitForWorkers(xq, n - 1); } const bool unblockWorkers(wqueue& wq, const int n) { if (n == 0) return true; enqueue(wq, true); return unblockWorkers(wq, n - 1); } const list > submitTLSSets(worker& w, wqueue& wq, wqueue& xq, const int max, const int i) { if (i == max) return list >(); const lambda func = curry(lambda>, gc_ptr>)>(tlsset), (gc_ptr>)&wq, (gc_ptr>)&xq); return cons(submit(w, func), submitTLSSets(w, wq, xq, max, i + 1)); } const bool checkTLSSets(const list > s) { if (isNil(s)) return true; assert(car(s) == 0); return checkTLSSets(cdr(s)); } const list > submitTLSChecks(worker& w, wqueue& wq, wqueue& xq, const int max, const int i) { if (i == max) return list >(); const blambda func = curry(lambda>, gc_ptr>)>(tlscheck), (gc_ptr>)&wq, (gc_ptr>)&xq); return cons(submit(w, func), submitTLSChecks(w, wq, xq, max, i + 1)); } const bool checkTLSResults(const list > r) { if (isNil(r)) return true; assert(car(r) == true); return checkTLSResults(cdr(r)); } const bool testWorker() { const int max = 100; worker w(max); { const lambda func = curry(lambda (mtsquare), 2); assert(submit(w, func) == 4); } { const list > r(submitSquares(w, max, 0)); checkSquareResults(r, 0); } { wqueue wq(max); unblockWorkers(wq, max); waitForWorkers(wq, max); unblockWorkers(wq, max); waitForWorkers(wq, max); } { wqueue wq(max); wqueue xq(max); const list > s(submitTLSSets(w, wq, xq, max, 0)); waitForWorkers(xq, max); unblockWorkers(wq, max); checkTLSSets(s); const list > r(submitTLSChecks(w, wq, xq, max, 0)); waitForWorkers(xq, max); unblockWorkers(wq, max); checkTLSResults(r); } shutdown(w); return true; } #endif } int main() { const tuscany::gc_scoped_pool p; tuscany::cout << "Testing..." << tuscany::endl; tuscany::testAtomicPerf(); #ifdef WANT_THREADS tuscany::testWorker(); #else tuscany::cout << "Skipped multi-thread tests" << tuscany::endl; #endif tuscany::cout << "OK" << tuscany::endl; return 0; }