From 00438314438f3dde00b532ac5d8d28ccc35c7096 Mon Sep 17 00:00:00 2001 From: jsdelfino Date: Wed, 17 Feb 2010 04:14:31 +0000 Subject: Working queue and chat components. Added a few useful start/stop scripts. Fixed lifecycle code to call start/stop/restart functions before APR pools are cleaned up in both parent and child processes. Minor build script improvements. git-svn-id: http://svn.us.apache.org/repos/asf/tuscany@910819 13f79535-47bb-0310-9956-ffa450edef68 --- sca-cpp/trunk/INSTALL | 16 +- sca-cpp/trunk/autogen.sh | 31 --- sca-cpp/trunk/bootstrap | 31 +++ sca-cpp/trunk/components/cache/client-test.cpp | 27 +- sca-cpp/trunk/components/cache/mcache-test.cpp | 27 +- sca-cpp/trunk/components/cache/mcache.cpp | 62 +++-- sca-cpp/trunk/components/cache/mcache.hpp | 16 +- sca-cpp/trunk/components/cache/memcached-start | 21 ++ sca-cpp/trunk/components/cache/memcached-stop | 23 ++ sca-cpp/trunk/components/cache/memcached-test | 7 +- sca-cpp/trunk/components/cache/server-test | 8 +- sca-cpp/trunk/components/chat/Makefile.am | 17 +- sca-cpp/trunk/components/chat/chat-listener.cpp | 57 ---- sca-cpp/trunk/components/chat/chat-sender.cpp | 58 ---- sca-cpp/trunk/components/chat/chat.composite | 20 +- sca-cpp/trunk/components/chat/chatter.cpp | 162 ++++++++++++ sca-cpp/trunk/components/chat/client-test.cpp | 111 ++++++++ sca-cpp/trunk/components/chat/server-test | 39 +++ sca-cpp/trunk/components/chat/server-test.scm | 2 +- sca-cpp/trunk/components/chat/xmpp-test.cpp | 103 ++++++++ sca-cpp/trunk/components/chat/xmpp.hpp | 292 +++++++++++++++++++++ sca-cpp/trunk/components/queue/Makefile.am | 4 +- sca-cpp/trunk/components/queue/client-test.cpp | 60 +++-- sca-cpp/trunk/components/queue/qpid-test.cpp | 45 +++- sca-cpp/trunk/components/queue/qpid.hpp | 153 ++++++++++- sca-cpp/trunk/components/queue/qpidd-start | 24 ++ sca-cpp/trunk/components/queue/qpidd-stop | 26 ++ sca-cpp/trunk/components/queue/queue-listener.cpp | 102 ++++++- sca-cpp/trunk/components/queue/queue-sender.cpp | 15 +- sca-cpp/trunk/components/queue/queue.composite | 15 +- sca-cpp/trunk/components/queue/send-test | 31 +++ sca-cpp/trunk/components/queue/server-test | 43 +++ sca-cpp/trunk/components/queue/server-test.scm | 2 +- sca-cpp/trunk/components/webservice/echo-test | 2 +- sca-cpp/trunk/components/webservice/server-test | 2 +- .../components/webservice/webservice-client.cpp | 2 +- sca-cpp/trunk/configure.ac | 16 +- sca-cpp/trunk/etc/git-exclude | 2 + sca-cpp/trunk/kernel/element.hpp | 2 +- sca-cpp/trunk/kernel/parallel.hpp | 88 +++++-- sca-cpp/trunk/kernel/value.hpp | 24 ++ sca-cpp/trunk/modules/http/curl.hpp | 8 +- sca-cpp/trunk/modules/http/httpd.hpp | 38 +-- sca-cpp/trunk/modules/java/eval.hpp | 4 +- sca-cpp/trunk/modules/java/mod-java.cpp | 47 ++-- sca-cpp/trunk/modules/java/mod-java.hpp | 21 +- .../modules/java/org/apache/tuscany/Service.java | 22 +- sca-cpp/trunk/modules/java/test/Client.java | 12 +- sca-cpp/trunk/modules/java/test/ClientImpl.java | 18 +- sca-cpp/trunk/modules/java/test/Server.java | 12 +- sca-cpp/trunk/modules/java/test/ServerImpl.java | 26 +- sca-cpp/trunk/modules/json/json.hpp | 2 +- sca-cpp/trunk/modules/python/client-test.py | 12 +- sca-cpp/trunk/modules/python/eval.hpp | 4 +- sca-cpp/trunk/modules/python/mod-python.cpp | 31 +-- sca-cpp/trunk/modules/python/mod-python.hpp | 10 +- sca-cpp/trunk/modules/python/server-test.py | 30 +-- sca-cpp/trunk/modules/scheme/primitive.hpp | 6 +- sca-cpp/trunk/modules/server/client-test.scm | 12 +- sca-cpp/trunk/modules/server/impl-test.cpp | 2 +- sca-cpp/trunk/modules/server/mod-cpp.hpp | 7 +- sca-cpp/trunk/modules/server/mod-eval.cpp | 27 +- sca-cpp/trunk/modules/server/mod-eval.hpp | 228 +++++++++------- sca-cpp/trunk/modules/server/mod-scheme.hpp | 10 +- sca-cpp/trunk/modules/server/mod-wiring.cpp | 104 ++++---- sca-cpp/trunk/modules/server/server-test.scm | 24 +- sca-cpp/trunk/test/store-cpp/start | 5 +- sca-cpp/trunk/test/store-cpp/stop | 4 +- sca-cpp/trunk/test/store-java/start | 5 +- sca-cpp/trunk/test/store-java/stop | 4 +- sca-cpp/trunk/test/store-python/start | 5 +- sca-cpp/trunk/test/store-python/stop | 4 +- sca-cpp/trunk/test/store-scheme/start | 5 +- sca-cpp/trunk/test/store-scheme/stop | 4 +- 74 files changed, 1810 insertions(+), 731 deletions(-) delete mode 100755 sca-cpp/trunk/autogen.sh create mode 100755 sca-cpp/trunk/bootstrap create mode 100755 sca-cpp/trunk/components/cache/memcached-start create mode 100755 sca-cpp/trunk/components/cache/memcached-stop delete mode 100644 sca-cpp/trunk/components/chat/chat-listener.cpp delete mode 100644 sca-cpp/trunk/components/chat/chat-sender.cpp create mode 100644 sca-cpp/trunk/components/chat/chatter.cpp create mode 100644 sca-cpp/trunk/components/chat/client-test.cpp create mode 100755 sca-cpp/trunk/components/chat/server-test create mode 100644 sca-cpp/trunk/components/chat/xmpp-test.cpp create mode 100755 sca-cpp/trunk/components/queue/qpidd-start create mode 100755 sca-cpp/trunk/components/queue/qpidd-stop create mode 100755 sca-cpp/trunk/components/queue/send-test create mode 100755 sca-cpp/trunk/components/queue/server-test (limited to 'sca-cpp/trunk') diff --git a/sca-cpp/trunk/INSTALL b/sca-cpp/trunk/INSTALL index d6e5229683..2fe522d7e4 100644 --- a/sca-cpp/trunk/INSTALL +++ b/sca-cpp/trunk/INSTALL @@ -55,7 +55,7 @@ build it from source at git://code.stanziq.com//libstrophe To configure the Tuscany SCA build do this: -./autogen.sh +./bootstrap ./configure --prefix= To enable debugging and strict warning compile options, add: @@ -64,7 +64,8 @@ To enable debugging and strict warning compile options, add: To enable gprof profiling, add: --enable-profiling -To enable multi-threading with the HTTPD worker MPM, add: +To enable multi-threading (required by the Queue and Chat components and +for running with the HTTPD worker or event multi-threaded MPMs): --enable-threads To enable support for Python component implementations: @@ -99,6 +100,7 @@ dependencies installed under $HOME: --with-libcurl=/usr --with-libxml2=/usr \ --with-js-include=/usr/include/xulrunner-1.9.1.7/unstable \ --with-js-lib=/usr/lib/xulrunner-1.9.1.7 \ +--enable-threads \ --enable-python --with-python=/usr \ --enable-java --with-java=/usr/lib/jvm/default-java \ --enable-webservice --with-axis2c=$HOME/axis2c-1.6.0-bin \ @@ -135,7 +137,7 @@ Building dependencies from source Here are example build and install steps for some of the dependencies. -Apache HTTPD, including APR: +Apache HTTPD, including APR, using the HTTP prefork MPM (recommended): ./configure --enable-ssl --enable-proxy --enable-rewrite --with-included-apr \ --with-mpm=prefork --prefix=$HOME/httpd-2.2.13-bin make @@ -150,6 +152,11 @@ make install export AXIS2C_HOME=$HOME/axis2c-1.6.0-bin Apache Qpid/C++: +git clone git://git.apache.org/qpid.git +cd qpid +git checkout -b 0.6-release origin/0.6-release +cd qpid/cpp +./bootstrap ./configure --prefix=$HOME/qpidc-0.6-bin make make install @@ -162,6 +169,7 @@ git submodule update aclocal automake --add-missing --foreign --copy autoconf -./configure +./configure --prefix=$HOME/libstrophe-bin make +make install diff --git a/sca-cpp/trunk/autogen.sh b/sca-cpp/trunk/autogen.sh deleted file mode 100755 index af38864985..0000000000 --- a/sca-cpp/trunk/autogen.sh +++ /dev/null @@ -1,31 +0,0 @@ -#!/bin/bash - -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - -for i in "libtoolize --force" aclocal autoconf autoheader -do - echo -n "Running $i..." - $i || exit 1 - echo 'done.' -done - -echo -n 'Running automake...' -automake --add-missing -echo 'done.' -exit 0 - diff --git a/sca-cpp/trunk/bootstrap b/sca-cpp/trunk/bootstrap new file mode 100755 index 0000000000..af38864985 --- /dev/null +++ b/sca-cpp/trunk/bootstrap @@ -0,0 +1,31 @@ +#!/bin/bash + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +for i in "libtoolize --force" aclocal autoconf autoheader +do + echo -n "Running $i..." + $i || exit 1 + echo 'done.' +done + +echo -n 'Running automake...' +automake --add-missing +echo 'done.' +exit 0 + diff --git a/sca-cpp/trunk/components/cache/client-test.cpp b/sca-cpp/trunk/components/cache/client-test.cpp index c8b23b391e..ddf093a6dc 100644 --- a/sca-cpp/trunk/components/cache/client-test.cpp +++ b/sca-cpp/trunk/components/cache/client-test.cpp @@ -36,7 +36,7 @@ namespace tuscany { namespace cache { -const string url("http://localhost:8090/mcache"); +const string uri("http://localhost:8090/mcache"); bool testCache() { http::CURLSession cs; @@ -46,10 +46,12 @@ bool testCache() { + (list() + "price" + string("$2.99")); const list a = mklist(string("item"), string("cart-53d67a61-aa5e-4e5e-8401-39edeba8b83b"), i); - const failable id = http::post(a, url, cs); + const failable id = http::post(a, uri, cs); assert(hasContent(id)); + + const string p = path(content(id)); { - const failable val = http::get(url + "/" + content(id), cs); + const failable val = http::get(uri + p, cs); assert(hasContent(val)); assert(content(val) == a); } @@ -60,22 +62,22 @@ bool testCache() { const list b = mklist(string("item"), string("cart-53d67a61-aa5e-4e5e-8401-39edeba8b83b"), j); { - const failable r = http::put(b, url + "/" + content(id), cs); + const failable r = http::put(b, uri + p, cs); assert(hasContent(r)); assert(content(r) == value(true)); } { - const failable val = http::get(url + "/" + content(id), cs); + const failable val = http::get(uri + p, cs); assert(hasContent(val)); assert(content(val) == b); } { - const failable r = http::del(url + "/" + content(id), cs); + const failable r = http::del(uri + p, cs); assert(hasContent(r)); assert(content(r) == value(true)); } { - const failable val = http::get(url + "/" + content(id), cs); + const failable val = http::get(uri + p, cs); assert(!hasContent(val)); } @@ -83,13 +85,13 @@ bool testCache() { } struct getLoop { - const value id; + const string path; const value entry; http::CURLSession cs; - getLoop(const value& id, const value& entry, http::CURLSession cs) : id(id), entry(entry), cs(cs) { + getLoop(const string& path, const value& entry, http::CURLSession cs) : path(path), entry(entry), cs(cs) { } const bool operator()() const { - const failable val = http::get(url + "/" + id, cs); + const failable val = http::get(uri + path, cs); assert(hasContent(val)); assert(content(val) == entry); return true; @@ -103,10 +105,11 @@ bool testGetPerf() { const value a = mklist(string("item"), string("cart-53d67a61-aa5e-4e5e-8401-39edeba8b83b"), i); http::CURLSession cs; - const failable id = http::post(a, url, cs); + const failable id = http::post(a, uri, cs); assert(hasContent(id)); + const string p = path(content(id)); - const lambda gl = getLoop(content(id), a, cs); + const lambda gl = getLoop(p, a, cs); cout << "Cache get test " << time(gl, 5, 200) << " ms" << endl; return true; diff --git a/sca-cpp/trunk/components/cache/mcache-test.cpp b/sca-cpp/trunk/components/cache/mcache-test.cpp index 90f17ffd48..316372c5be 100644 --- a/sca-cpp/trunk/components/cache/mcache-test.cpp +++ b/sca-cpp/trunk/components/cache/mcache-test.cpp @@ -33,33 +33,36 @@ namespace tuscany { namespace cache { bool testMemCached() { - MemCached ch; + MemCached ch("127.0.0.1", 11211); + const value k = mklist("a"); - assert(hasContent(post("a", string("AAA"), ch))); - assert((get("a", ch)) == value(string("AAA"))); - assert(hasContent(put("a", string("aaa"), ch))); - assert((get("a", ch)) == value(string("aaa"))); - assert(hasContent(del("a", ch))); - assert(!hasContent(get("a", ch))); + assert(hasContent(post(k, string("AAA"), ch))); + assert((get(k, ch)) == value(string("AAA"))); + assert(hasContent(put(k, string("aaa"), ch))); + assert((get(k, ch)) == value(string("aaa"))); + assert(hasContent(del(k, ch))); + assert(!hasContent(get(k, ch))); return true; } struct getLoop { + const value k; MemCached& ch; - getLoop(MemCached& ch) : ch(ch) { + getLoop(const value& k, MemCached& ch) : k(k), ch(ch) { } const bool operator()() const { - assert((get("c", ch)) == value(string("CCC"))); + assert((get(k, ch)) == value(string("CCC"))); return true; } }; bool testGetPerf() { - MemCached ch; - assert(hasContent(post("c", string("CCC"), ch))); + const value k = mklist("c"); + MemCached ch("127.0.0.1", 11211); + assert(hasContent(post(k, string("CCC"), ch))); - const lambda gl = getLoop(ch); + const lambda gl = getLoop(k, ch); cout << "Memcached get test " << time(gl, 5, 200) << " ms" << endl; return true; } diff --git a/sca-cpp/trunk/components/cache/mcache.cpp b/sca-cpp/trunk/components/cache/mcache.cpp index b1dc750ccb..50b642106f 100644 --- a/sca-cpp/trunk/components/cache/mcache.cpp +++ b/sca-cpp/trunk/components/cache/mcache.cpp @@ -34,14 +34,12 @@ #include "mcache.hpp" namespace tuscany { -namespace cache { - -cache::MemCached ch; +namespace mcache { /** * Get an item from the cache. */ -const failable get(const list& params) { +const failable get(const list& params, cache::MemCached& ch) { return cache::get(car(params), ch); } @@ -56,9 +54,9 @@ const value uuidValue() { return value(string(buf, APR_UUID_FORMATTED_LENGTH)); } -const failable post(const list& params) { - const value id = uuidValue(); - const failable val = cache::post(id, car(params), ch); +const failable post(const list& params, cache::MemCached& ch) { + const value id = append(car(params), mklist(uuidValue())); + const failable val = cache::post(id, cadr(params), ch); if (!hasContent(val)) return mkfailure(reason(val)); return id; @@ -67,7 +65,7 @@ const failable post(const list& params) { /** * Put an item into the cache. */ -const failable put(const list& params) { +const failable put(const list& params, cache::MemCached& ch) { const failable val = cache::put(car(params), cadr(params), ch); if (!hasContent(val)) return mkfailure(reason(val)); @@ -77,13 +75,49 @@ const failable put(const list& params) { /** * Delete an item from the cache. */ -const failable del(const list& params) { +const failable del(const list& params, cache::MemCached& ch) { const failable val = cache::del(car(params), ch); if (!hasContent(val)) return mkfailure(reason(val)); return value(content(val)); } +/** + * Component implementation lambda function. + */ +class applyCache { +public: + applyCache(cache::MemCached& ch) : ch(ch) { + } + + const value operator()(const list& params) const { + const value func(car(params)); + if (func == "get") + return get(cdr(params), ch); + if (func == "post") + return post(cdr(params), ch); + if (func == "put") + return put(cdr(params), ch); + if (func == "delete") + return del(cdr(params), ch); + return tuscany::mkfailure(); + } + +private: + cache::MemCached& ch; +}; + +/** + * Start the component. + */ +const failable start(unused const list& params) { + // Connect to memcached + cache::MemCached& ch = *(new (gc_new()) cache::MemCached("127.0.0.1", 11211)); + + // Return the component implementation lambda function + return value(lambda&)>(applyCache(ch))); +} + } } @@ -91,14 +125,8 @@ extern "C" { const tuscany::value apply(const tuscany::list& params) { const tuscany::value func(car(params)); - if (func == "get") - return tuscany::cache::get(cdr(params)); - if (func == "post") - return tuscany::cache::post(cdr(params)); - if (func == "put") - return tuscany::cache::put(cdr(params)); - if (func == "delete") - return tuscany::cache::del(cdr(params)); + if (func == "start" || func == "restart") + return tuscany::mcache::start(cdr(params)); return tuscany::mkfailure(); } diff --git a/sca-cpp/trunk/components/cache/mcache.hpp b/sca-cpp/trunk/components/cache/mcache.hpp index 9659c11788..4751975099 100644 --- a/sca-cpp/trunk/components/cache/mcache.hpp +++ b/sca-cpp/trunk/components/cache/mcache.hpp @@ -48,23 +48,28 @@ namespace cache { */ class MemCached { public: - MemCached() { - apr_pool_create(&pool, NULL); - apr_memcache_create(pool, 1, 0, &mc); - init("localhost", 11211); + MemCached() : owner(false) { } - MemCached(const string host, const int port) { + MemCached(const string host, const int port) : owner(true) { apr_pool_create(&pool, NULL); apr_memcache_create(pool, 1, 0, &mc); init(host, port); } + MemCached(const MemCached& c) : owner(false) { + pool = c.pool; + mc = c.mc; + } + ~MemCached() { + if (!owner) + return; apr_pool_destroy(pool); } private: + bool owner; apr_pool_t* pool; apr_memcache_t* mc; @@ -86,7 +91,6 @@ private: return mkfailure("Could not add server"); return true; } - }; /** diff --git a/sca-cpp/trunk/components/cache/memcached-start b/sca-cpp/trunk/components/cache/memcached-start new file mode 100755 index 0000000000..cd27faf046 --- /dev/null +++ b/sca-cpp/trunk/components/cache/memcached-start @@ -0,0 +1,21 @@ +#!/bin/sh + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# Start memcached +memcached -l 127.0.0.1 -m 4 -p 11211 & diff --git a/sca-cpp/trunk/components/cache/memcached-stop b/sca-cpp/trunk/components/cache/memcached-stop new file mode 100755 index 0000000000..b999228b46 --- /dev/null +++ b/sca-cpp/trunk/components/cache/memcached-stop @@ -0,0 +1,23 @@ +#!/bin/sh + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# Stop memcached +mc="memcached -l 127.0.0.1 -m 4 -p 11211" + +kill `ps -f | grep -v grep | grep "${mc}" | awk '{ print $2 }'` diff --git a/sca-cpp/trunk/components/cache/memcached-test b/sca-cpp/trunk/components/cache/memcached-test index df21d32a57..d4b9c04eda 100755 --- a/sca-cpp/trunk/components/cache/memcached-test +++ b/sca-cpp/trunk/components/cache/memcached-test @@ -18,14 +18,13 @@ # under the License. # Setup -mc="memcached -l 127.0.0.1 -m 4 -p 11211" -$mc & +./memcached-start sleep 1 # Test -./mcache-test +./mcache-test 2>/dev/null rc=$? # Cleanup -kill `ps -f | grep -v grep | grep "$mc" | awk '{ print $2 }'` +./memcached-stop return $rc diff --git a/sca-cpp/trunk/components/cache/server-test b/sca-cpp/trunk/components/cache/server-test index 821724295d..4942f547bc 100755 --- a/sca-cpp/trunk/components/cache/server-test +++ b/sca-cpp/trunk/components/cache/server-test @@ -26,18 +26,16 @@ SCAContribution `pwd`/ SCAComposite mcache.composite EOF +./memcached-start ../../modules/http/httpd-start tmp - -mc="memcached -l 127.0.0.1 -m 4 -p 11211" -$mc & sleep 2 # Test -./client-test +./client-test 2>/dev/null rc=$? # Cleanup -kill `ps -f | grep -v grep | grep "$mc" | awk '{ print $2 }'` ../../modules/http/httpd-stop tmp +./memcached-stop sleep 2 return $rc diff --git a/sca-cpp/trunk/components/chat/Makefile.am b/sca-cpp/trunk/components/chat/Makefile.am index ea63a475e2..11e3179e8a 100644 --- a/sca-cpp/trunk/components/chat/Makefile.am +++ b/sca-cpp/trunk/components/chat/Makefile.am @@ -17,15 +17,20 @@ if WANT_CHAT -INCLUDES = -I${LIBSTROPHE_INCLUDE} +noinst_PROGRAMS = xmpp-test client-test + +INCLUDES = -I${LIBSTROPHE_INCLUDE} -I${LIBSTROPHE_INCLUDE}/src compdir=$(prefix)/components/chat -comp_LTLIBRARIES = libchat-sender.la libchat-listener.la +comp_LTLIBRARIES = libchatter.la + +libchatter_la_SOURCES = chatter.cpp +libchatter_la_LDFLAGS = -L${LIBSTROPHE_LIB} -R${LIBSTROPHE_LIB} -lstrophe -lexpat -lssl -lresolv -libchat_sender_la_SOURCES = chat-sender.cpp -libchat_sender_la_LDFLAGS = -L${LIBSTROPHE_LIB} -R${LIBSTROPHE_LIB} -lstrophe -lexpat -lssl -lresolv +xmpp_test_SOURCES = xmpp-test.cpp +xmpp_test_LDFLAGS = -L${LIBSTROPHE_LIB} -R${LIBSTROPHE_LIB} -lstrophe -lexpat -lssl -lresolv -libchat_listener_la_SOURCES = chat-listener.cpp -libchat_listener_la_LDFLAGS = -L${LIBSTROPHE_LIB} -R${LIBSTROPHE_LIB} -lstrophe -lexpat -lssl -lresolv +client_test_SOURCES = client-test.cpp +client_test_LDFLAGS = -lxml2 -lcurl -lmozjs -L${LIBSTROPHE_LIB} -R${LIBSTROPHE_LIB} -lstrophe -lexpat -lssl -lresolv endif diff --git a/sca-cpp/trunk/components/chat/chat-listener.cpp b/sca-cpp/trunk/components/chat/chat-listener.cpp deleted file mode 100644 index 884965839a..0000000000 --- a/sca-cpp/trunk/components/chat/chat-listener.cpp +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -/* $Rev$ $Date$ */ - -/** - * XMPP chat listener component implementation. - */ - -#include "string.hpp" -#include "function.hpp" -#include "list.hpp" -#include "value.hpp" -#include "monad.hpp" -#include "xmpp.hpp" - -namespace tuscany { -namespace chat { - -/** - * Initialize the component. - */ -const failable start(unused const list& params) { - //TODO establish a session with an XMPP server for a JID and mark the current server instance busy - - return value(true); -} - -} -} - -extern "C" { - -const tuscany::value apply(const tuscany::list& params) { - const tuscany::value func(car(params)); - if (func == "start") - return tuscany::chat::start(cdr(params)); - return tuscany::mkfailure(); -} - -} diff --git a/sca-cpp/trunk/components/chat/chat-sender.cpp b/sca-cpp/trunk/components/chat/chat-sender.cpp deleted file mode 100644 index bd67bf7315..0000000000 --- a/sca-cpp/trunk/components/chat/chat-sender.cpp +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -/* $Rev$ $Date$ */ - -/** - * XMPP chat sender component implementation. - */ - -#include "string.hpp" -#include "function.hpp" -#include "list.hpp" -#include "value.hpp" -#include "monad.hpp" -#include "xmpp.hpp" - -namespace tuscany { -namespace chat { - -/** - * Post an item to an XMPP JID. - */ -const failable post(unused const list& params) { - - //TODO post the item - - return value(true); -} - -} -} - -extern "C" { - -const tuscany::value apply(const tuscany::list& params) { - const tuscany::value func(car(params)); - if (func == "post") - return tuscany::chat::post(cdr(params)); - return tuscany::mkfailure(); -} - -} diff --git a/sca-cpp/trunk/components/chat/chat.composite b/sca-cpp/trunk/components/chat/chat.composite index 2c19e2d0de..f02eed1418 100644 --- a/sca-cpp/trunk/components/chat/chat.composite +++ b/sca-cpp/trunk/components/chat/chat.composite @@ -22,17 +22,22 @@ targetNamespace="http://tuscany.apache.org/xmlns/sca/components" name="chat"> - - - sample@apache.org - + + + sca1@localhost + sca1 + - - - sample@apache.org + + + sca2@localhost + sca2 + + + @@ -41,6 +46,7 @@ + diff --git a/sca-cpp/trunk/components/chat/chatter.cpp b/sca-cpp/trunk/components/chat/chatter.cpp new file mode 100644 index 0000000000..5a6d4909df --- /dev/null +++ b/sca-cpp/trunk/components/chat/chatter.cpp @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/* $Rev$ $Date$ */ + +/** + * XMPP chatter component implementation. + */ + +#include "string.hpp" +#include "function.hpp" +#include "list.hpp" +#include "value.hpp" +#include "monad.hpp" +#include "parallel.hpp" +#include "xmpp.hpp" + +namespace tuscany { +namespace chat { + +/** + * Post an item to an XMPP JID. + */ +const failable post(const list& params, XMPPClient& xc) { + const value to = car(car(params)); + const value val = cadr(params); + debug(to, "chat::post::jid"); + debug(val, "chat::post::value"); + const failable r = post(to, val, xc); + if (!hasContent(r)) + return mkfailure(reason(r)); + return value(mklist(to)); +} + +/** + * A relay function that posts the XMPP messages it receives to a relay component reference. + */ +class relay { +public: + relay(const lambda&)>& rel) : rel(rel) { + } + + const failable operator()(const value& jid, const value& val, unused XMPPClient& xc) const { + if (isNil(rel)) + return true; + debug(jid, "chat::relay::jid"); + debug(val, "chat::relay::value"); + const value res = rel(mklist("post", mklist(jid), val)); + return true; + } + +private: + const lambda&)> rel; +}; + +/** + * Subscribe and listen to an XMPP session. + */ +class subscribe { +public: + subscribe(const lambda(const value&, const value&, XMPPClient&)>& l, XMPPClient& xc) : l(l), xc(xc) { + } + + const failable operator()() const { + gc_pool pool; + debug("chat::subscribe::listen"); + const failable r = listen(l, const_cast(xc)); + debug("chat::subscribe::stopped"); + return r; + } + +private: + const lambda(const value&, const value&, XMPPClient&)> l; + XMPPClient xc; +}; + +/** + * Chatter component lambda function + */ +class chatter { +public: + chatter(XMPPClient& xc, worker& w) : xc(xc), w(w) { + } + + const value operator()(const list& params) const { + const tuscany::value func(car(params)); + if (func == "post") + return post(cdr(params), const_cast(xc)); + + // Stop the chatter component + if (func != "stop") + return tuscany::mkfailure(); + debug("chat::chatter::stop"); + + // Disconnect and shutdown the worker thread + disconnect(const_cast(xc)); + shutdown(const_cast(w)); + debug("chat::chatter::stopped"); + + return failable(value(lambda&)>())); + } + +private: + const XMPPClient xc; + worker w; +}; + +/** + * Start the component. + */ +const failable start(const list& params) { + // Extract the relay reference and the XMPP JID and password + const bool hasRelay = !isNil(cddr(params)); + const value rel = hasRelay? car(params) : value(lambda&)>()); + const list props = hasRelay? cdr(params) : params; + const value jid = ((lambda)>)car(props))(list()); + const value pass = ((lambda)>)cadr(props))(list()); + + // Create an XMPP client session + XMPPClient xc(jid, pass, false); + const failable r = connect(xc); + if (!hasContent(r)) + return mkfailure(reason(r)); + + // Listen and relay messages in a worker thread + worker w(3); + const lambda(const value&, const value&, XMPPClient&)> rl = relay(rel); + submit >(w, lambda()>(subscribe(rl, xc))); + + // Return the chatter component lambda function + return value(lambda&)>(chatter(xc, w))); +} + +} +} + +extern "C" { + +const tuscany::value apply(const tuscany::list& params) { + const tuscany::value func(car(params)); + if (func == "start" || func == "restart") + return tuscany::chat::start(cdr(params)); + return tuscany::mkfailure(); +} + +} diff --git a/sca-cpp/trunk/components/chat/client-test.cpp b/sca-cpp/trunk/components/chat/client-test.cpp new file mode 100644 index 0000000000..f9ca2cabd6 --- /dev/null +++ b/sca-cpp/trunk/components/chat/client-test.cpp @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/* $Rev$ $Date$ */ + +/** + * Test chat component. + */ + +#include +#include "stream.hpp" +#include "string.hpp" +#include "list.hpp" +#include "element.hpp" +#include "value.hpp" +#include "monad.hpp" +#include "perf.hpp" +#include "parallel.hpp" +#include "../../modules/http/curl.hpp" +#include "xmpp.hpp" + +namespace tuscany { +namespace chat { + +const value jid1("sca1@localhost"); +const value pass1("sca1"); +const value jid2("sca2@localhost"); +const value pass2("sca2"); +const value jid3("sca3@localhost"); +const value pass3("sca3"); + +const list item = list() + + (list() + "name" + string("Apple")) + + (list() + "price" + string("$2.99")); +const list entry = mklist(string("item"), string("cart-53d67a61-aa5e-4e5e-8401-39edeba8b83b"), item); + +worker w(2); +bool received; + +const failable listener(const value& from, const value& val, unused XMPPClient& xc) { + assert(contains(from, "sca2@localhost")); + assert(val == entry); + received = true; + return false; +} + +struct subscribe { + XMPPClient& xc; + subscribe(XMPPClient& xc) : xc(xc) { + } + const failable operator()() const { + const lambda(const value&, const value&, XMPPClient&)> l(listener); + listen(l, xc); + return true; + } +}; + +bool testListen() { + received = false; + XMPPClient& xc = *(new (gc_new()) XMPPClient(jid3, pass3)); + const failable c = connect(xc); + assert(hasContent(c)); + const lambda()> subs = subscribe(xc); + submit(w, subs); + return true; +} + +bool testPost() { + gc_scoped_pool pool; + http::CURLSession ch; + const failable id = http::post(entry, "http://localhost:8090/print-sender/sca2@localhost", ch); + assert(hasContent(id)); + return true; +} + +bool testReceived() { + shutdown(w); + assert(received == true); + return true; +} + +} +} + +int main() { + tuscany::cout << "Testing..." << tuscany::endl; + + tuscany::chat::testListen(); + tuscany::chat::testPost(); + tuscany::chat::testReceived(); + + tuscany::cout << "OK" << tuscany::endl; + + return 0; +} diff --git a/sca-cpp/trunk/components/chat/server-test b/sca-cpp/trunk/components/chat/server-test new file mode 100755 index 0000000000..919a798fa5 --- /dev/null +++ b/sca-cpp/trunk/components/chat/server-test @@ -0,0 +1,39 @@ +#!/bin/sh + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# Setup +../../modules/http/httpd-conf tmp 8090 ../../modules/http/htdocs +../../modules/server/server-conf tmp +../../modules/server/scheme-conf tmp +cat >>tmp/conf/httpd.conf </dev/null +rc=$? + +# Cleanup +../../modules/http/httpd-stop tmp +sleep 1 +return $rc diff --git a/sca-cpp/trunk/components/chat/server-test.scm b/sca-cpp/trunk/components/chat/server-test.scm index 17a42ed795..a6023708e1 100644 --- a/sca-cpp/trunk/components/chat/server-test.scm +++ b/sca-cpp/trunk/components/chat/server-test.scm @@ -17,4 +17,4 @@ ; Chat test case -(define (print x) (display x)) +(define (post key val report) (report "post" '("sca3@localhost") val)) diff --git a/sca-cpp/trunk/components/chat/xmpp-test.cpp b/sca-cpp/trunk/components/chat/xmpp-test.cpp new file mode 100644 index 0000000000..6b7fa3439f --- /dev/null +++ b/sca-cpp/trunk/components/chat/xmpp-test.cpp @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/* $Rev$ $Date$ */ + +/** + * Test XMPP support functions. + */ + +#include +#include "stream.hpp" +#include "string.hpp" +#include "list.hpp" +#include "element.hpp" +#include "monad.hpp" +#include "value.hpp" +#include "perf.hpp" +#include "parallel.hpp" +#include "xmpp.hpp" + +namespace tuscany { +namespace chat { + +const value jid1("sca1@localhost"); +const value pass1("sca1"); +const value jid2("sca2@localhost"); +const value pass2("sca2"); + +worker w(2); +bool received; + +const failable listener(const value& from, const value& val, unused XMPPClient& xc) { + assert(contains(from, "sca1@localhost")); + assert(val == "hey"); + received = true; + return false; +} + +struct subscribe { + XMPPClient& xc; + subscribe(XMPPClient& xc) : xc(xc) { + } + const failable operator()() const { + const lambda(const value&, const value&, XMPPClient&)> l(listener); + listen(l, xc); + return true; + } +}; + +bool testListen() { + received = false; + XMPPClient& xc = *(new (gc_new()) XMPPClient(jid2, pass2)); + const failable c = connect(xc); + assert(hasContent(c)); + const lambda()> subs = subscribe(xc); + submit(w, subs); + return true; +} + +bool testPost() { + XMPPClient xc(jid1, pass1); + const failable c = connect(xc); + assert(hasContent(c)); + const failable p = post(jid2, "hey", xc); + assert(hasContent(p)); + return true; +} + +bool testReceived() { + shutdown(w); + assert(received == true); + return true; +} + +} +} + +int main() { + tuscany::cout << "Testing..." << tuscany::endl; + + tuscany::chat::testListen(); + tuscany::chat::testPost(); + tuscany::chat::testReceived(); + + tuscany::cout << "OK" << tuscany::endl; + return 0; +} diff --git a/sca-cpp/trunk/components/chat/xmpp.hpp b/sca-cpp/trunk/components/chat/xmpp.hpp index b9508f0a53..0f84d5ec4a 100644 --- a/sca-cpp/trunk/components/chat/xmpp.hpp +++ b/sca-cpp/trunk/components/chat/xmpp.hpp @@ -26,11 +26,303 @@ * XMPP support functions. */ +#include + #include "strophe.h" +extern "C" { +#include "common.h" +} +#include "string.hpp" +#include "list.hpp" +#include "value.hpp" +#include "monad.hpp" +#include "../../modules/scheme/eval.hpp" namespace tuscany { namespace chat { +/** + * XMPP runtime, one per process. + */ +class XMPPRuntime { +public: + XMPPRuntime() { + xmpp_initialize(); + log = xmpp_get_default_logger(XMPP_LEVEL_DEBUG); + } + + ~XMPPRuntime() { + xmpp_shutdown(); + } + +private: + friend class XMPPClient; + xmpp_log_t* log; + +} xmppRuntime; + +/** + * Represents an XMPP client. + */ +const string resourceUUID() { + apr_uuid_t uuid; + apr_uuid_get(&uuid); + char buf[APR_UUID_FORMATTED_LENGTH]; + apr_uuid_format(buf, &uuid); + return string(buf, APR_UUID_FORMATTED_LENGTH); +} + +class XMPPClient { +public: + XMPPClient(const string& jid, const string& pass, bool owner = true) : owner(owner), ctx(xmpp_ctx_new(NULL, xmppRuntime.log)), conn(xmpp_conn_new(ctx)), connecting(false), connected(false), disconnecting(false) { + xmpp_conn_set_jid(conn, c_str(jid + "/" + resourceUUID())); + xmpp_conn_set_pass(conn, c_str(pass)); + } + + XMPPClient(const XMPPClient& xc) : owner(false), ctx(xc.ctx), conn(xc.conn), listener(xc.listener), connecting(xc.connecting), connected(xc.connected), disconnecting(xc.disconnecting) { + } + + ~XMPPClient() { + extern const failable disconnect(XMPPClient& xc); + if (!owner) + return; + if (!disconnecting) + disconnect(*this); + xmpp_conn_release(conn); + xmpp_ctx_free(ctx); + } + +private: + friend int versionHandler(xmpp_conn_t* const conn, xmpp_stanza_t* const stanza, void* const udata); + friend void connHandler(xmpp_conn_t * const conn, const xmpp_conn_event_t status, const int err, xmpp_stream_error_t* const errstream, void *const udata); + friend int messageHandler(xmpp_conn_t* const conn, xmpp_stanza_t* const stanza, void* const udata); + friend const failable connect(XMPPClient& xc); + friend const failable send(const char* data, const int len, XMPPClient& xc); + friend const failable send(xmpp_stanza_t* const stanza, XMPPClient& xc); + friend const failable post(const value& to, const value& val, XMPPClient& xc); + friend const failable disconnect(XMPPClient& xc); + friend const failable listen(const lambda(const value&, const value&, XMPPClient&)>& listener, XMPPClient& xc); + + const bool owner; + xmpp_ctx_t* ctx; + xmpp_conn_t* conn; + lambda(const value&, const value&, XMPPClient&)> listener; + bool connecting; + bool connected; + bool disconnecting; +}; + +/** + * Make a text stanza. + */ +xmpp_stanza_t* textStanza(const char* text, xmpp_ctx_t* ctx) { + xmpp_stanza_t* stanza = xmpp_stanza_new(ctx); + xmpp_stanza_set_text(stanza, text); + return stanza; +} + +/** + * Make a named stanza. + */ +xmpp_stanza_t* namedStanza(const char* ns, const char* name, xmpp_ctx_t* ctx) { + xmpp_stanza_t* stanza = xmpp_stanza_new(ctx); + xmpp_stanza_set_name(stanza, name); + if (ns != NULL) + xmpp_stanza_set_ns(stanza, ns); + return stanza; +} + +/** + * Make a named stanza using a qualified name. + */ +xmpp_stanza_t* namedStanza(const char* name, xmpp_ctx_t* ctx) { + xmpp_stanza_t* stanza = xmpp_stanza_new(ctx); + xmpp_stanza_set_name(stanza, name); + return stanza; +} + +/** + * XMPP version handler. + */ +int versionHandler(xmpp_conn_t* const conn, xmpp_stanza_t* const stanza, void* const udata) { + XMPPClient& xc = *(XMPPClient*)udata; + + // Build version reply stanza + xmpp_stanza_t* reply = namedStanza("iq", xc.ctx); + xmpp_stanza_set_type(reply, "result"); + xmpp_stanza_set_id(reply, xmpp_stanza_get_id(stanza)); + xmpp_stanza_set_attribute(reply, "to", xmpp_stanza_get_attribute(stanza, "from")); + xmpp_stanza_t* query = namedStanza(xmpp_stanza_get_ns(xmpp_stanza_get_children(stanza)), "query", xc.ctx); + xmpp_stanza_add_child(reply, query); + xmpp_stanza_t* name = namedStanza("name", xc.ctx); + xmpp_stanza_add_child(query, name); + xmpp_stanza_add_child(name, textStanza("Apache Tuscany", xc.ctx)); + xmpp_stanza_t* version = namedStanza("version", xc.ctx); + xmpp_stanza_add_child(query, version); + xmpp_stanza_add_child(version, textStanza("1.0", xc.ctx)); + + // Send it + xmpp_send(conn, reply); + xmpp_stanza_release(reply); + return 1; +} + +/** + * XMPP message handler + */ +int messageHandler(unused xmpp_conn_t* const conn, xmpp_stanza_t* const stanza, void* const udata) { + // Ignore noise + if(xmpp_stanza_get_child_by_name(stanza, "body") == NULL) + return 1; + if(!strcmp(xmpp_stanza_get_attribute(stanza, "type"), "error")) + return 1; + + // Call the client listener function + XMPPClient& xc = *(XMPPClient*)udata; + const char* from = xmpp_stanza_get_attribute(stanza, "from"); + const char* text = xmpp_stanza_get_text(xmpp_stanza_get_child_by_name(stanza, "body")); + if (isNil(xc.listener)) + return 1; + const value val(scheme::readValue(text)); + debug(from, "chat::messageHandler::from"); + debug(val, "chat::messageHandler::body"); + const failable r = xc.listener(value(string(from)), val, xc); + if (!hasContent(r) || !content(r)) { + // Stop listening + xc.listener = lambda(const value&, const value&, XMPPClient&)>(); + return 0; + } + return 1; +} + +/** + * XMPP connection handler. + */ +void connHandler(xmpp_conn_t * const conn, const xmpp_conn_event_t status, unused const int err, unused xmpp_stream_error_t* const errstream, void *const udata) { + XMPPClient& xc = *(XMPPClient*)udata; + xc.connecting = false; + + if (status == XMPP_CONN_CONNECT) { + debug("chat::connHandler::connected"); + xmpp_handler_add(conn, versionHandler, "jabber:iq:version", "iq", NULL, &xc); + + // Send a stanza so that we appear online to contacts + xmpp_stanza_t* pres = xmpp_stanza_new(xc.ctx); + xmpp_stanza_set_name(pres, "presence"); + xmpp_send(conn, pres); + xmpp_stanza_release(pres); + xc.connected = true; + return; + } + + debug("chat::connHandler::disconnected"); + xc.connected = false; + if (xc.ctx->loop_status == XMPP_LOOP_RUNNING) + xc.ctx->loop_status = XMPP_LOOP_QUIT; +} + +/** + * Connect to an XMPP server. + */ +const failable connect(XMPPClient& xc) { + xc.connecting = true; + xmpp_connect_client(xc.conn, NULL, 0, connHandler, &xc); + while(xc.connecting) + xmpp_run_once(xc.ctx, 20L); + if (!xc.connected) + return mkfailure("Couldn't connect to XMPP server"); + return true; +} + +/** + * Send a buffer on an XMPP session. + */ +const failable send(const char* data, const int len, XMPPClient& xc) { + if (len == 0) + return 0; + const int written = xc.conn->tls? tls_write(xc.conn->tls, data, len) : sock_write(xc.conn->sock, data, len); + if (written < 0) { + xc.conn->error = xc.conn->tls? tls_error(xc.conn->tls) : sock_error(); + return mkfailure("Couldn't send stanza to XMPP server"); + } + return send(data + written, len - written, xc); +} + +/** + * Send a string on an XMPP session. + */ +const failable send(const string& data, XMPPClient& xc) { + return send(c_str(data), length(data), xc); +} + +/** + * Send a stanza on an XMPP session. + */ +const failable send(xmpp_stanza_t* const stanza, XMPPClient& xc) { + char *buf; + size_t len; + const int rc = xmpp_stanza_to_text(stanza, &buf, &len); + if (rc != 0) + return mkfailure("Couldn't convert stanza to text"); + const failable r = send(buf, len, xc); + if (!hasContent(r)) { + xmpp_free(xc.conn->ctx, buf); + return r; + } + xmpp_debug(xc.conn->ctx, "conn", "SENT: %s", buf); + xmpp_free(xc.conn->ctx, buf); + return content(r); +} + +/** + * Post a message to an XMPP jid. + */ +const failable post(const value& to, const value& val, XMPPClient& xc) { + debug(to, "chat::post::to"); + debug(val, "chat::post::body"); + + // Convert the value to a string + const string vs(scheme::writeValue(val)); + + // Build message stanza + xmpp_stanza_t* stanza = namedStanza("message", xc.ctx); + xmpp_stanza_set_type(stanza, "chat"); + xmpp_stanza_set_attribute(stanza, "to", c_str(string(to))); + xmpp_stanza_t* body = namedStanza("body", xc.ctx); + xmpp_stanza_add_child(stanza, body); + xmpp_stanza_add_child(body, textStanza(c_str(vs), xc.ctx)); + + // Send it + const failable r = send(stanza, xc); + xmpp_stanza_release(stanza); + if (!hasContent(r)) + return mkfailure(reason(r)); + return true; +} + +/** + * Disconnect an XMPP session. + */ +const failable disconnect(XMPPClient& xc) { + xc.disconnecting = true; + const failable r = send("", xc); + if (!hasContent(r)) + return mkfailure(reason(r)); + return true; +} + +/** + * Listen to messages received by an XMPP client. + */ +const failable listen(const lambda(const value&, const value&, XMPPClient&)>& listener, XMPPClient& xc) { + debug("chat::listen"); + xc.listener = listener; + xmpp_handler_add(xc.conn, messageHandler, NULL, "message", NULL, &xc); + while(xc.connected && !isNil(xc.listener)) + xmpp_run_once(xc.ctx, 1000L); + return true; +} + } } diff --git a/sca-cpp/trunk/components/queue/Makefile.am b/sca-cpp/trunk/components/queue/Makefile.am index 6bbdc119b4..09ff0e54a4 100644 --- a/sca-cpp/trunk/components/queue/Makefile.am +++ b/sca-cpp/trunk/components/queue/Makefile.am @@ -35,11 +35,11 @@ qpid_test_SOURCES = qpid-test.cpp qpid_test_LDFLAGS = -L${QPIDC_LIB} -R${QPIDC_LIB} -lqpidclient client_test_SOURCES = client-test.cpp -client_test_LDFLAGS = -lxml2 -lcurl -lmozjs +client_test_LDFLAGS = -lxml2 -lcurl -lmozjs -L${QPIDC_LIB} -R${QPIDC_LIB} -lqpidclient qpidc.prefix: $(top_builddir)/config.status echo ${QPIDC_PREFIX} >qpidc.prefix -#TESTS = qpid-test server-test +TESTS = send-test server-test endif diff --git a/sca-cpp/trunk/components/queue/client-test.cpp b/sca-cpp/trunk/components/queue/client-test.cpp index 121a739e0d..a448d1fccd 100644 --- a/sca-cpp/trunk/components/queue/client-test.cpp +++ b/sca-cpp/trunk/components/queue/client-test.cpp @@ -32,28 +32,54 @@ #include "monad.hpp" #include "perf.hpp" #include "../../modules/http/curl.hpp" +#include "qpid.hpp" + +// Ignore conversion issues and redundant declarations in Qpid headers +#ifdef WANT_MAINTAINER_MODE +#pragma GCC diagnostic ignored "-Wconversion" +#pragma GCC diagnostic ignored "-Wredundant-decls" +#endif namespace tuscany { namespace queue { +const value key(mklist(string("report"))); +const string qname("reportq"); + +const list item = list() + + (list() + "name" + string("Apple")) + + (list() + "price" + string("$2.99")); +const list entry = mklist(string("item"), string("cart-53d67a61-aa5e-4e5e-8401-39edeba8b83b"), item); + +bool testDeclareQueue() { + QpidConnection qc; + QpidSession qs(qc); + const failable r = declareQueue(key, qname, qs); + assert(hasContent(r)); + return true; +} + +const bool listener(const value& k, const value& v) { + cerr << "k " << k << " v " << v << endl; + assert(k == key); + assert(v == entry); + return false; +} + +bool testListen() { + QpidConnection qc; + QpidSession qs(qc); + QpidSubscription qsub(qs); + const lambda l(listener); + listen(qname, l, qsub); + return true; +} bool testPost() { - http::CURLSession cs; - - const value func = "http://ws.apache.org/axis2/c/samples/echoString"; - const list arg = mklist( - list() + "ns1:echoString" - + (list() + "@xmlns:ns1" + string("http://ws.apache.org/axis2/services/echo")) - + (list() + "text" + string("Hello World!"))); - - const failable rval = http::evalExpr(mklist(func, arg), "http://localhost:8090/echo-client", cs); - assert(hasContent(rval)); - - const list r = mklist( - list() + "ns1:echoString" - + (list() + "@xmlns:ns1" + string("http://ws.apache.org/axis2/c/samples")) - + (list() + "text" + string("Hello World!"))); - assert(content(rval) == r); + gc_scoped_pool pool; + http::CURLSession ch; + const failable id = http::post(entry, "http://localhost:8090/print-sender", ch); + assert(hasContent(id)); return true; } @@ -63,7 +89,9 @@ bool testPost() { int main() { tuscany::cout << "Testing..." << tuscany::endl; + tuscany::queue::testDeclareQueue(); tuscany::queue::testPost(); + tuscany::queue::testListen(); tuscany::cout << "OK" << tuscany::endl; diff --git a/sca-cpp/trunk/components/queue/qpid-test.cpp b/sca-cpp/trunk/components/queue/qpid-test.cpp index 97d8d2f5d5..1a650157b2 100644 --- a/sca-cpp/trunk/components/queue/qpid-test.cpp +++ b/sca-cpp/trunk/components/queue/qpid-test.cpp @@ -33,20 +33,51 @@ #include "perf.hpp" #include "qpid.hpp" +// Ignore conversion issues and redundant declarations in Qpid headers +#ifdef WANT_MAINTAINER_MODE +#pragma GCC diagnostic ignored "-Wconversion" +#pragma GCC diagnostic ignored "-Wredundant-decls" +#endif + namespace tuscany { namespace queue { -bool testPost() { +const value key(mklist("test")); +const string qname("testq"); + +bool testDeclareQueue() { QpidConnection qc; + QpidSession qs(qc); + const failable r = declareQueue(key, qname, qs); + assert(hasContent(r)); + return true; +} + +const list item = list() + + (list() + "name" + string("Apple")) + + (list() + "price" + string("$2.99")); +const list entry = mklist(string("item"), string("cart-53d67a61-aa5e-4e5e-8401-39edeba8b83b"), item); +bool testPost() { + QpidConnection qc; QpidSession qs(qc); + const failable r = post(key, entry, qs); + assert(hasContent(r)); + return true; +} - // Post the item - const list params; - const value key = ((lambda)>)cadr(params))(list()); - post(key, car(params), qs); +const bool listener(const value& k, const value& v) { + assert(k == key); + assert(v == entry); + return false; +} - return value(true); +bool testListen() { + QpidConnection qc; + QpidSession qs(qc); + QpidSubscription qsub(qs); + const lambda l(listener); + listen(qname, l, qsub); return true; } @@ -56,7 +87,9 @@ bool testPost() { int main() { tuscany::cout << "Testing..." << tuscany::endl; + tuscany::queue::testDeclareQueue(); tuscany::queue::testPost(); + tuscany::queue::testListen(); tuscany::cout << "OK" << tuscany::endl; diff --git a/sca-cpp/trunk/components/queue/qpid.hpp b/sca-cpp/trunk/components/queue/qpid.hpp index e6be319be0..8b466cedcc 100644 --- a/sca-cpp/trunk/components/queue/qpid.hpp +++ b/sca-cpp/trunk/components/queue/qpid.hpp @@ -34,6 +34,8 @@ #include #include +#include +#include #include "string.hpp" #include "list.hpp" @@ -53,6 +55,10 @@ public: c.open("localhost", 5672); } + QpidConnection(const bool owner) : owner(owner) { + c.open("localhost", 5672); + } + QpidConnection(const QpidConnection& qc) : owner(false), c(qc.c) { } @@ -63,6 +69,7 @@ public: } private: + friend const failable close(QpidConnection& qc); friend class QpidSession; const bool owner; @@ -70,6 +77,14 @@ private: }; +/** + * Close a Qpid connection. + */ +const failable close(QpidConnection& qc) { + qc.c.close(); + return true; +} + /** * Represents a Qpid session. */ @@ -78,6 +93,9 @@ public: QpidSession(QpidConnection& qc) : owner(true), s(qc.c.newSession()) { } + QpidSession(QpidConnection& qc, const bool owner) : owner(owner), s(qc.c.newSession()) { + } + QpidSession(const QpidSession& qs) : owner(false), s(qs.s) { } @@ -88,38 +106,145 @@ public: } private: - friend qpid::client::Session session(const QpidSession& qs); + friend const failable close(QpidSession& qs); + friend const failable declareQueue(const value& key, const string& name, QpidSession& qs); + friend const failable post(const value& key, const value& val, QpidSession& qs); + friend class QpidSubscription; const bool owner; qpid::client::Session s; }; -qpid::client::Session session(const QpidSession& qs) { - return qs.s; +/** + * Close a Qpid session. + */ +const failable close(QpidSession& qs) { + try { + qs.s.close(); + } catch (const qpid::Exception& e) { + return mkfailure(string("Qpid failure: ") + e.what()); + } + return true; } /** * Declare a key / AMQP queue pair. */ -const failable declareQueue(const string& key, const string& name, QpidSession& qs) { - session(qs).queueDeclare(qpid::client::arg::queue=c_str(name)); - session(qs).exchangeBind(qpid::client::arg::exchange="amq.direct", qpid::client::arg::queue=c_str(name), qpid::client::arg::bindingKey=c_str(key)); +const failable declareQueue(const value& key, const string& name, QpidSession& qs) { + const string ks(scheme::writeValue(key)); + try { + qs.s.queueDeclare(qpid::client::arg::queue=c_str(name)); + qs.s.exchangeBind(qpid::client::arg::exchange="amq.direct", qpid::client::arg::queue=c_str(name), qpid::client::arg::bindingKey=c_str(ks)); + } catch (const qpid::Exception& e) { + return mkfailure(string("Qpid failure: ") + e.what()); + } return true; } /** - * Post a key / value pair in message to an AMQP broker. + * Post a key / value pair message to an AMQP broker. */ -const failable post(const string& key, const value& val, QpidSession& qs) { +const failable post(const value& key, const value& val, QpidSession& qs) { - // Convert the value to a string + // Send in a message with the given key. + const string ks(scheme::writeValue(key)); const string vs(scheme::writeValue(val)); + try { + qpid::client::Message message; + message.getDeliveryProperties().setRoutingKey(c_str(ks)); + message.setData(c_str(vs)); + qs.s.messageTransfer(qpid::client::arg::content=message, qpid::client::arg::destination="amq.direct"); + } catch (const qpid::Exception& e) { + return mkfailure(string("Qpid failure: ") + e.what()); + } + return true; +} - // Send in a message with the given key. - qpid::client::Message message; - message.getDeliveryProperties().setRoutingKey(c_str(key)); - message.setData(c_str(vs)); - session(qs).messageTransfer(qpid::client::arg::content=message, qpid::client::arg::destination="amq.direct"); +/** + * Represents a Qpid subscription. + */ +class QpidSubscription { +public: + QpidSubscription(QpidSession& qs) : owner(true), subs(qs.s) { + } + + QpidSubscription(QpidSession& qs, const bool owner) : owner(owner), subs(qs.s) { + } + + QpidSubscription(const QpidSubscription& qsub) : owner(false), subs(qsub.subs) { + } + + ~QpidSubscription() { + if (!owner) + return; + try { + subs.stop(); + } catch (const qpid::Exception& e) { + mkfailure(string("Qpid failure: ") + e.what()); + } + } + +private: + friend const failable listen(const string& name, const lambda& l, QpidSubscription& qsub); + friend const failable stop(QpidSubscription& qsub); + + const bool owner; + qpid::client::SubscriptionManager subs; +}; + +/** + * Register a listener function with an AMQP queue. + */ +class Listener : public qpid::client::MessageListener { +public: + Listener(const lambda l, qpid::client::SubscriptionManager& subs) : l(l), subs(subs) { + } + + virtual void received(qpid::client::Message& msg) { + + // Call the listener function + const value k(scheme::readValue(msg.getDeliveryProperties().getRoutingKey().c_str())); + const value v(scheme::readValue(msg.getData().c_str())); + const bool r = l(k, v); + if (!r) { + try { + subs.cancel(msg.getDestination()); + } catch (const qpid::Exception& e) { + mkfailure(string("Qpid failure: ") + e.what()); + } + } + } + +private: + const lambda l; + qpid::client::SubscriptionManager& subs; +}; + + +const failable listen(const string& name, const lambda& l, QpidSubscription& qsub) { + debug("queue::listen"); + Listener listener(l, qsub.subs); + try { + qsub.subs.subscribe(listener, c_str(name)); + qsub.subs.run(); + } catch (const qpid::Exception& e) { + return mkfailure(string("Qpid failure: ") + e.what()); + } + debug("queue::listen::stopped"); + return true; +} + +/** + * Stop an AMQP subscription. + */ +const failable stop(QpidSubscription& qsub) { + debug("queue::stop"); + try { + qsub.subs.stop(); + } catch (const qpid::Exception& e) { + return mkfailure(string("Qpid failure: ") + e.what()); + } + debug("queue::stopped"); return true; } diff --git a/sca-cpp/trunk/components/queue/qpidd-start b/sca-cpp/trunk/components/queue/qpidd-start new file mode 100755 index 0000000000..02e048c41e --- /dev/null +++ b/sca-cpp/trunk/components/queue/qpidd-start @@ -0,0 +1,24 @@ +#!/bin/sh + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# Start qpidd +here=`readlink -f $0`; here=`dirname $here` + +qpid_prefix=`cat $here/qpidc.prefix` +$qpid_prefix/sbin/qpidd & diff --git a/sca-cpp/trunk/components/queue/qpidd-stop b/sca-cpp/trunk/components/queue/qpidd-stop new file mode 100755 index 0000000000..6fb0467cff --- /dev/null +++ b/sca-cpp/trunk/components/queue/qpidd-stop @@ -0,0 +1,26 @@ +#!/bin/sh + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# Stop qpidd +here=`readlink -f $0`; here=`dirname $here` + +qpid_prefix=`cat $here/qpidc.prefix` +qpidd="$qpid_prefix/sbin/qpidd" + +kill `ps -f | grep -v grep | grep "${qpidd}" | awk '{ print $2 }'` diff --git a/sca-cpp/trunk/components/queue/queue-listener.cpp b/sca-cpp/trunk/components/queue/queue-listener.cpp index 75c53e7b41..57a63b620c 100644 --- a/sca-cpp/trunk/components/queue/queue-listener.cpp +++ b/sca-cpp/trunk/components/queue/queue-listener.cpp @@ -28,6 +28,7 @@ #include "list.hpp" #include "value.hpp" #include "monad.hpp" +#include "parallel.hpp" #include "qpid.hpp" // Ignore conversion issues and redundant declarations in Qpid headers @@ -39,22 +40,107 @@ namespace tuscany { namespace queue { -QpidConnection qc; +/** + * A relay function that posts the AMQP messages it receives to a relay component reference. + */ +class relay { +public: + relay(const lambda&)>& rel) : rel(rel) { + } + + const bool operator()(const value& k, const value& v) const { + debug(k, "queue::relay::key"); + debug(v, "queue::relay::value"); + const value res = rel(mklist("post", isList(k)? (list)k : mklist(k), v)); + return true; + } + +private: + const lambda&)> rel; +}; + +/** + * Subscribe and listen to an AMQP queue. + */ +class subscribe { +public: + subscribe(const string& qname, const lambda& l, const QpidSubscription& qsub) : qname(qname), l(l), qsub(qsub) { + } + + const failable operator()() const { + gc_pool pool; + debug(qname, "queue::subscribe::listen"); + const failable r = listen(qname, l, const_cast(qsub)); + debug(qname, "queue::subscribe::stopped"); + return r; + } + +private: + const string qname; + const lambda l; + const QpidSubscription qsub; +}; /** - * Initialize the component. + * Listener lambda function, responsible for starting an AMQP subscription in a worker thread, and + * apply any function calls to the listener component. The only supported function is stop(), + * called to stop the listener component and shutdown the worker thread. + */ +class listener { +public: + listener(QpidConnection& qc, QpidSession& qs, QpidSubscription& qsub, worker& w) : qc(qc), qs(qs), qsub(qsub), w(w) { + } + + const value operator()(const list& params) const { + const tuscany::value func(car(params)); + + // Stop the component + if (func != "stop") + return tuscany::mkfailure(); + debug("queue::listener::stop"); + + // TODO check why stop() and close() hang in child processes + //stop(const_cast(qsub)); + //close(const_cast(qs)); + //close(const_cast(qc)); + cancel(const_cast(w)); + + debug("queue::listener::stopped"); + return failable(value(lambda&)>())); + } + +private: + QpidConnection qc; + QpidSession qs; + QpidSubscription qsub; + worker w; +}; + +/** + * Start the component. */ const failable start(const list& params) { - QpidSession qs(qc); + // Extract the relay reference and the AMQP key and queue name + const value rel = car(params); + const value pk = ((lambda)>)cadr(params))(list()); + const value key = isList(pk)? (list)pk : mklist(pk); + const value qname = ((lambda)>)caddr(params))(list()); + + // Create an AMQP session + QpidConnection qc(false); + QpidSession qs(qc, false); // Declare the configured AMQP key / queue pair - const value key = ((lambda)>)caddr(params))(list()); - const value qname = ((lambda)>)cadddr(params))(list()); declareQueue(key, qname, qs); - //TODO create a subscription and mark the current server instance busy + // Listen and relay messages in a worker thread + QpidSubscription qsub(qs, false); + worker w(3); + const lambda rl = relay(rel); + submit >(w, lambda()>(subscribe(qname, rl, qsub))); - return value(true); + // Return the listener component lambda function + return value(lambda&)>(listener(qc, qs, qsub, w))); } } @@ -64,7 +150,7 @@ extern "C" { const tuscany::value apply(const tuscany::list& params) { const tuscany::value func(car(params)); - if (func == "start") + if (func == "start" || func == "restart") return tuscany::queue::start(cdr(params)); return tuscany::mkfailure(); } diff --git a/sca-cpp/trunk/components/queue/queue-sender.cpp b/sca-cpp/trunk/components/queue/queue-sender.cpp index 15128411c2..07f8491f54 100644 --- a/sca-cpp/trunk/components/queue/queue-sender.cpp +++ b/sca-cpp/trunk/components/queue/queue-sender.cpp @@ -39,19 +39,22 @@ namespace tuscany { namespace queue { -QpidConnection qc; - /** * Post an item to a queue. */ const failable post(const list& params) { + QpidConnection qc; QpidSession qs(qc); // Post the item - const value key = ((lambda)>)cadr(params))(list()); - post(key, car(params), qs); - - return value(true); + const value pk = ((lambda)>)caddr(params))(list()); + const value key = isList(pk)? append(pk, (list)car(params)) : cons(pk, (list)car(params)); + debug(key, "queue::post::key"); + debug(cadr(params), "queue::post::value"); + const failable r = post(key, cadr(params), qs); + if (!hasContent(r)) + return mkfailure(reason(r)); + return key; } } diff --git a/sca-cpp/trunk/components/queue/queue.composite b/sca-cpp/trunk/components/queue/queue.composite index b908481bad..535680c6c3 100644 --- a/sca-cpp/trunk/components/queue/queue.composite +++ b/sca-cpp/trunk/components/queue/queue.composite @@ -22,15 +22,15 @@ targetNamespace="http://tuscany.apache.org/xmlns/sca/components" name="queue"> - + print - + - + print printq @@ -42,6 +42,15 @@ + + + + + + report + + + diff --git a/sca-cpp/trunk/components/queue/send-test b/sca-cpp/trunk/components/queue/send-test new file mode 100755 index 0000000000..ec6d9d9083 --- /dev/null +++ b/sca-cpp/trunk/components/queue/send-test @@ -0,0 +1,31 @@ +#!/bin/sh + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# Setup +./qpidd-start +sleep 1 + +# Test +./qpid-test 2>/dev/null +rc=$? + +# Cleanup +./qpidd-stop +sleep 1 +return $rc diff --git a/sca-cpp/trunk/components/queue/server-test b/sca-cpp/trunk/components/queue/server-test new file mode 100755 index 0000000000..3fc94e6f35 --- /dev/null +++ b/sca-cpp/trunk/components/queue/server-test @@ -0,0 +1,43 @@ +#!/bin/sh + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# Setup +../../modules/http/httpd-conf tmp 8090 ../../modules/http/htdocs +../../modules/server/server-conf tmp +../../modules/server/scheme-conf tmp +cat >>tmp/conf/httpd.conf </dev/null +rc=$? + +# Cleanup +../../modules/http/httpd-stop tmp +sleep 1 +./qpidd-stop +sleep 1 +return $rc diff --git a/sca-cpp/trunk/components/queue/server-test.scm b/sca-cpp/trunk/components/queue/server-test.scm index 16c4d95dc8..1a89ce8b31 100644 --- a/sca-cpp/trunk/components/queue/server-test.scm +++ b/sca-cpp/trunk/components/queue/server-test.scm @@ -17,4 +17,4 @@ ; Queue test case -(define (print x) (display x)) +(define (post key val report) (report "post" '() val)) diff --git a/sca-cpp/trunk/components/webservice/echo-test b/sca-cpp/trunk/components/webservice/echo-test index 5e7a380521..e5cd1d9a9d 100755 --- a/sca-cpp/trunk/components/webservice/echo-test +++ b/sca-cpp/trunk/components/webservice/echo-test @@ -28,7 +28,7 @@ cd $pwd sleep 1 # Test -./axis2-test +./axis2-test 2>/dev/null rc=$? # Cleanup diff --git a/sca-cpp/trunk/components/webservice/server-test b/sca-cpp/trunk/components/webservice/server-test index 7d33223b3d..8311b94220 100755 --- a/sca-cpp/trunk/components/webservice/server-test +++ b/sca-cpp/trunk/components/webservice/server-test @@ -39,7 +39,7 @@ cd $pwd sleep 2 # Test -./client-test +./client-test 2>/dev/null rc=$? # Cleanup diff --git a/sca-cpp/trunk/components/webservice/webservice-client.cpp b/sca-cpp/trunk/components/webservice/webservice-client.cpp index ae4e472e65..cb756c015e 100644 --- a/sca-cpp/trunk/components/webservice/webservice-client.cpp +++ b/sca-cpp/trunk/components/webservice/webservice-client.cpp @@ -57,7 +57,7 @@ extern "C" { const tuscany::value apply(const tuscany::list& params) { const tuscany::value func(car(params)); - if (func == "start" || func == "stop" || func == "restart") + if (func == "start" || func == "restart") return tuscany::mkfailure(); return tuscany::webservice::apply(func, cdr(params)); } diff --git a/sca-cpp/trunk/configure.ac b/sca-cpp/trunk/configure.ac index 1ac3a9e5bd..19f7e19080 100644 --- a/sca-cpp/trunk/configure.ac +++ b/sca-cpp/trunk/configure.ac @@ -341,8 +341,8 @@ AC_ARG_ENABLE(java, [AS_HELP_STRING([--enable-java], [enable Java support [defau esac ], [ AC_MSG_RESULT(no)]) if test "${want_java}" = "true"; then - LIBS="-L${JAVA_LIB} ${default_LIBS}" - #AC_CHECK_LIB([java], [JNI_OnLoad], [], [AC_MSG_ERROR([couldn't find a suitable libjava, use --with-java=PATH])]) + LIBS="-L${JAVA_LIB} -L${JAVA_LIB}/server ${default_LIBS}" + AC_CHECK_LIB([java], [JNI_CreateJavaVM], [], [AC_MSG_ERROR([couldn't find a suitable libjava, use --with-java=PATH])], [-ljvm -lverify]) AC_PROG_JAVAC AC_PROG_JAR AM_CONDITIONAL([WANT_JAVA], true) @@ -441,8 +441,11 @@ AC_ARG_ENABLE(queue, [AS_HELP_STRING([--enable-queue], [enable Queue component [ esac ], [ AC_MSG_RESULT(no)]) if test "${want_queue}" = "true"; then + if test "${want_threads}" != "true"; then + AC_MSG_ERROR([--enable-queue requires multi-threading, use --enable-threads]) + fi LIBS="-L${QPIDC_LIB} ${default_LIBS}" - #AC_CHECK_LIB([qpidclient], [], [], [AC_MSG_ERROR([couldn't find a suitable libqpidclient, use --with-qpidc=PATH])]) + AC_CHECK_LIB([qpidclient], [_init], [], [AC_MSG_ERROR([couldn't find a suitable libqpidclient, use --with-qpidc=PATH])]) AM_CONDITIONAL([WANT_QUEUE], true) AC_DEFINE([WANT_QUEUE], 1, [enable Queue component]) else @@ -451,7 +454,7 @@ fi # Configure path to Libstrophe includes and lib. AC_MSG_CHECKING([for libstrophe]) -AC_ARG_WITH([libstrophe], [AC_HELP_STRING([--with-libstrophe=PATH], [path to libstrophe source build [default=${HOME}/libstrophe]])], [ +AC_ARG_WITH([libstrophe], [AC_HELP_STRING([--with-libstrophe=PATH], [path to libstrophe source [default=${HOME}/libstrophe]])], [ LIBSTROPHE_INCLUDE="${withval}" LIBSTROPHE_LIB="${withval}" AC_MSG_RESULT("${withval}") @@ -477,8 +480,11 @@ AC_ARG_ENABLE(chat, [AS_HELP_STRING([--enable-chat], [enable Chat component [def esac ], [ AC_MSG_RESULT(no)]) if test "${want_chat}" = "true"; then + if test "${want_threads}" != "true"; then + AC_MSG_ERROR([--enable-chat requires multi-threading, use --enable-threads]) + fi LIBS="-L${LIBSTROPHE_LIB} ${default_LIBS}" - #AC_CHECK_LIB([strophe], [xmpp_initialize], [], [AC_MSG_ERROR([couldn't find a suitable libstrophe, use --with-libstrophe=PATH])], [-lexpat -lssl -lresolv]) + AC_CHECK_LIB([strophe], [xmpp_initialize], [], [AC_MSG_ERROR([couldn't find a suitable libstrophe, use --with-libstrophe=PATH])], [-lexpat -lssl -lresolv]) AM_CONDITIONAL([WANT_CHAT], true) AC_DEFINE([WANT_CHAT], 1, [enable Chat component]) else diff --git a/sca-cpp/trunk/etc/git-exclude b/sca-cpp/trunk/etc/git-exclude index 29c58979a8..8aa4146c56 100644 --- a/sca-cpp/trunk/etc/git-exclude +++ b/sca-cpp/trunk/etc/git-exclude @@ -20,6 +20,7 @@ surefire*.properties .settings/ .deployables/ .wtpmodules/ +.pydevproject .svn/ .cvs/ .dotest/ @@ -85,4 +86,5 @@ script-test axiom-test axis2-test qpid-test +xmpp-test diff --git a/sca-cpp/trunk/kernel/element.hpp b/sca-cpp/trunk/kernel/element.hpp index 0d14acc4a3..c6aa2c44eb 100644 --- a/sca-cpp/trunk/kernel/element.hpp +++ b/sca-cpp/trunk/kernel/element.hpp @@ -193,7 +193,7 @@ const value valueToElement(const value& t) { const list valuesToElements(const list& l); // Convert a name value pair - if (isList(t) && isSymbol(car(t))) { + if (isList(t) && !isNil((list)t) && isSymbol(car(t))) { const value n = car(t); const value v = cadr(t); diff --git a/sca-cpp/trunk/kernel/parallel.hpp b/sca-cpp/trunk/kernel/parallel.hpp index 9fb6c59ba7..09cf0df9a3 100644 --- a/sca-cpp/trunk/kernel/parallel.hpp +++ b/sca-cpp/trunk/kernel/parallel.hpp @@ -59,9 +59,12 @@ private: pthread_cond_init(&valueCond, NULL); } + futureValue(const futureValue& fv) : valueMutex(fv.valueMutex), valueCond(fv.valueCond), hasValue(fv.hasValue), value(fv.value) { + } + ~futureValue() { - pthread_mutex_destroy(&valueMutex); - pthread_cond_destroy(&valueCond); + //pthread_mutex_destroy(&valueMutex); + //pthread_cond_destroy(&valueCond); } bool set(const T& v) { @@ -129,18 +132,21 @@ public: /** * A bounded thread safe queue. */ -template class queue { +template class wqueue { public: - queue(int max) : max(max), size(0), tail(0), head(0), values(new (gc_anew(max)) T[max]) { + wqueue(int max) : max(max), size(0), tail(0), head(0), values(new (gc_anew(max)) T[max]) { pthread_mutex_init(&mutex, NULL); pthread_cond_init(&full, NULL); pthread_cond_init(&empty, NULL); } - ~queue() { - pthread_mutex_destroy(&mutex); - pthread_cond_destroy(&full); - pthread_cond_destroy(&empty); + wqueue(const wqueue& wq) : max(wq.max), size(wq.size), tail(wq.tail), head(wq.head), mutex(wq.mutex), full(wq.full), empty(wq.empty), values(wq.values) { + } + + ~wqueue() { + //pthread_mutex_destroy(&mutex); + //pthread_cond_destroy(&full); + //pthread_cond_destroy(&empty); } private: @@ -153,14 +159,14 @@ private: pthread_cond_t empty; gc_ptr values; - template friend const int enqueue(queue& q, const X& v); - template friend const X dequeue(queue& q); + template friend const int enqueue(wqueue& q, const X& v); + template friend const X dequeue(wqueue& q); }; /** * Adds an element to the tail of the queue. */ -template const int enqueue(queue&q, const T& v) { +template const int enqueue(wqueue&q, const T& v) { pthread_mutex_lock(&q.mutex); while(q.size == q.max) pthread_cond_wait(&q.full, &q.mutex); @@ -175,7 +181,7 @@ template const int enqueue(queue&q, const T& v) { /** * Returns the element at the head of the queue. */ -template const T dequeue(queue& q) { +template const T dequeue(wqueue& q) { pthread_mutex_lock(&q.mutex); while(q.size == 0) pthread_cond_wait(&q.empty, &q.mutex); @@ -191,7 +197,12 @@ template const T dequeue(queue& q) { * The worker thread function. */ void *workerThreadFunc(void *arg) { - queue >* work = reinterpret_cast >*>(arg); + int ost; + pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &ost); + int ot; + pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, &ot); + + wqueue >* work = reinterpret_cast >*>(arg); while(dequeue(*work)()) ; return NULL; @@ -200,28 +211,44 @@ void *workerThreadFunc(void *arg) { /** * Returns a list of worker threads. */ -const list workerThreads(queue >& queue, const int count) { +const list workerThreads(wqueue >& wqueue, const int count) { if (count == 0) return list(); pthread_t thread; - pthread_create(&thread, NULL, workerThreadFunc, &queue); - return cons(thread, workerThreads(queue, count - 1)); + pthread_create(&thread, NULL, workerThreadFunc, &wqueue); + return cons(thread, workerThreads(wqueue, count - 1)); } /** * A worker, implemented with a work queue and a pool of threads. */ class worker { +private: + + // The worker holds a reference to a sharedWorker, to avoid non-thread-safe + // copies of the queue and thread pool when a worker is copied + class sharedWorker { + public: + sharedWorker(int max) : work(wqueue >(max)), threads(workerThreads(work, max)) { + } + + wqueue > work; + const list threads; + }; + public: - worker(int max) : work(queue >(max)), threads(workerThreads(work, max)) { + worker(int max) : w(*(new (gc_new()) sharedWorker(max))) { + } + + worker(const worker& wk) : w(wk.w) { } private: - queue > work; - const list threads; + sharedWorker& w; template friend const future submit(worker& w, const lambda& func); friend const bool shutdown(worker& w); + friend const bool cancel(worker& w); }; /** @@ -238,14 +265,14 @@ template bool submitFunc(const lambda& func, const future& f template const future submit(worker& w, const lambda& func) { const future fut; const lambda f = curry(lambda, future)>(submitFunc), func, fut); - enqueue(w.work, f); + enqueue(w.w.work, f); return fut; } /** * Enqueues shutdown requests. */ -const bool shutdownEnqueue(const list& threads, queue >& work) { +const bool shutdownEnqueue(const list& threads, wqueue >& work) { if (isNil(threads)) return true; enqueue(work, result(false)); @@ -266,8 +293,23 @@ const bool shutdownJoin(const list& threads) { * Shutdown a worker. */ const bool shutdown(worker& w) { - shutdownEnqueue(w.threads, w.work); - shutdownJoin(w.threads); + shutdownEnqueue(w.w.threads, w.w.work); + shutdownJoin(w.w.threads); + return true; +} + +/** + * Cancel a worker. + */ +const bool cancel(const list& threads) { + if (isNil(threads)) + return true; + pthread_cancel(car(threads)); + return cancel(cdr(threads)); +} + +const bool cancel(worker& w) { + cancel(w.w.threads); return true; } diff --git a/sca-cpp/trunk/kernel/value.hpp b/sca-cpp/trunk/kernel/value.hpp index 4f19ee3915..1aafbe6d38 100644 --- a/sca-cpp/trunk/kernel/value.hpp +++ b/sca-cpp/trunk/kernel/value.hpp @@ -556,5 +556,29 @@ template const list mkvalues(const list& l) { return cons(car(l), mkvalues(cdr(l))); } +/** + * Convert a path string value to a list of values. + */ +const list pathTokens(const char* p) { + if (p == NULL || p[0] == '\0') + return list(); + if (p[0] == '/') + return tokenize("/", p + 1); + return tokenize("/", p); +} + +const list pathValues(const value& p) { + return mkvalues(pathTokens(c_str(p))); +} + +/** + * Convert a path represented as a list of values to a string value. + */ +const value path(const list& p) { + if (isNil(p)) + return ""; + return string("/") + car(p) + path(cdr(p)); +} + } #endif /* tuscany_value_hpp */ diff --git a/sca-cpp/trunk/modules/http/curl.hpp b/sca-cpp/trunk/modules/http/curl.hpp index 4e96411ec6..f2c9458f42 100644 --- a/sca-cpp/trunk/modules/http/curl.hpp +++ b/sca-cpp/trunk/modules/http/curl.hpp @@ -252,7 +252,7 @@ const failable entryId(const failable l) { if (!hasContent(l)) return mkfailure(reason(l)); const string ls(content(l)); - return value(string(substr(ls, find_last(ls, '/') + 1))); + return value(mklist(string(substr(ls, find_last(ls, '/') + 1)))); } /** @@ -381,18 +381,18 @@ const failable del(const string& url, const CURLSession& ch) { * HTTP client proxy function. */ struct proxy { - proxy(const string& url) : url(url) { + proxy(const string& uri) : uri(uri) { } const value operator()(const list& args) const { CURLSession cs; - failable val = evalExpr(args, url, cs); + failable val = evalExpr(args, uri, cs); if (!hasContent(val)) return value(); return content(val); } - const string url; + const string uri; }; } diff --git a/sca-cpp/trunk/modules/http/httpd.hpp b/sca-cpp/trunk/modules/http/httpd.hpp index 6b08c3b838..bd0e23f76b 100644 --- a/sca-cpp/trunk/modules/http/httpd.hpp +++ b/sca-cpp/trunk/modules/http/httpd.hpp @@ -74,30 +74,6 @@ template C& serverConf(const cmd_parms *cmd, const module* mod) { } -/** - * Convert a path string to a list of values. - */ -const list pathTokens(const char* p) { - if (p == NULL || p[0] == '\0') - return list(); - if (p[0] == '/') - return tokenize("/", p + 1); - return tokenize("/", p); -} - -const list pathValues(const char* p) { - return mkvalues(pathTokens(p)); -} - -/** - * Convert a path represented as a list of values to a string. - */ -const string path(const list& p) { - if (isNil(p)) - return ""; - return string("/") + car(p) + path(cdr(p)); -} - /** * Return the content type of a request. */ @@ -219,10 +195,10 @@ const list read(request_rec* r) { } /** - * Convert a URI value to an absolute URL. + * Convert a URI represented as a list to an absolute URL. */ -const char* url(const value& v, request_rec* r) { - const string u = string(r->uri) + "/" + v; +const char* url(const list& v, request_rec* r) { + const string u = string(r->uri) + path(v); return ap_construct_url(r->pool, c_str(u), r); } @@ -400,7 +376,7 @@ const int internalRedirect(const string& uri, request_rec* r) { /** * Put a value in the process user data. */ -const bool putUserData(const string& k, const int v, const server_rec* s) { +const bool putUserData(const string& k, const void* v, const server_rec* s) { apr_pool_userdata_set((const void *)v, c_str(k), apr_pool_cleanup_null, s->process->pool); return true; } @@ -408,10 +384,10 @@ const bool putUserData(const string& k, const int v, const server_rec* s) { /** * Return a user data value. */ -const int userData(const string& k, const server_rec* s) { - void* v = (int)0; +const void* userData(const string& k, const server_rec* s) { + void* v = NULL; apr_pool_userdata_get(&v, c_str(k), s->process->pool); - return (int)v; + return v; } } diff --git a/sca-cpp/trunk/modules/java/eval.hpp b/sca-cpp/trunk/modules/java/eval.hpp index 9140fe75b4..58a1ab5ff9 100644 --- a/sca-cpp/trunk/modules/java/eval.hpp +++ b/sca-cpp/trunk/modules/java/eval.hpp @@ -501,8 +501,8 @@ const failable evalClass(const JavaRuntime& jr, const value& expr, const // The start, stop, and restart functions are optional const value fn = car(expr); - if (fn == "start" || fn == "stop" || fn == "restart") - return value(false); + if (fn == "start" || fn == "restart" || "stop") + return value(lambda&)>()); return mkfailure(string("Couldn't find function: ") + car(expr) + " : " + lastException(jr)); } diff --git a/sca-cpp/trunk/modules/java/mod-java.cpp b/sca-cpp/trunk/modules/java/mod-java.cpp index 4b8e7dca56..510f9574b0 100644 --- a/sca-cpp/trunk/modules/java/mod-java.cpp +++ b/sca-cpp/trunk/modules/java/mod-java.cpp @@ -37,40 +37,41 @@ namespace server { namespace modeval { /** - * Start the module. + * Apply a lifecycle start or restart event. */ -const failable start(unused ServerConf& sc) { - // Start a Java runtime - sc.moduleConf = new (gc_new()) java::JavaRuntime(); - return true; -} +struct javaLifecycle { + javaLifecycle(java::JavaRuntime& jr) : jr(jr) { + } + const value operator()(const list& params) const { + const value func = car(params); + if (func == "javaRuntime") + return (gc_ptr)(value*)(void*)&jr; + return lambda&)>(); + } + java::JavaRuntime& jr; +}; -/** - * Stop the module. - */ -const failable stop(unused ServerConf& sc) { - return true; -} +const value applyLifecycle(unused const list& params) { -/** - * Restart the module. - */ -const failable restart(ServerConf& sc) { - // Start a Java runtime - sc.moduleConf = new (gc_new()) java::JavaRuntime(); - return true; + // Create a Java runtime + java::JavaRuntime& jr = *(new (gc_new()) java::JavaRuntime()); + + // Return the function to invoke on subsequent events + return failable(lambda&)>(javaLifecycle(jr))); } /** * Evaluate a Java component implementation and convert it to an applicable * lambda function. */ -const failable&)> > evalImplementation(const string& path, const value& impl, const list& px, modeval::ServerConf& sc) { +const failable&)> > evalImplementation(const string& path, const value& impl, const list& px, const lambda&)>& lifecycle) { const string itype(elementName(impl)); - if (contains(itype, ".java")) - return modjava::evalImplementation(path, impl, px, sc); + if (contains(itype, ".java")) { + const void* p = (gc_ptr)lifecycle(mklist("javaRuntime")); + return modjava::evalImplementation(path, impl, px, *(java::JavaRuntime*)p); + } if (contains(itype, ".cpp")) - return modcpp::evalImplementation(path, impl, px, sc); + return modcpp::evalImplementation(path, impl, px); return mkfailure&)> >(string("Unsupported implementation type: ") + itype); } diff --git a/sca-cpp/trunk/modules/java/mod-java.hpp b/sca-cpp/trunk/modules/java/mod-java.hpp index ccb6d9262b..e7da06e930 100644 --- a/sca-cpp/trunk/modules/java/mod-java.hpp +++ b/sca-cpp/trunk/modules/java/mod-java.hpp @@ -34,19 +34,11 @@ #include "value.hpp" #include "monad.hpp" #include "eval.hpp" -#include "../server/mod-eval.hpp" namespace tuscany { namespace server { namespace modjava { -/** - * Return the Java runtime configured in a server. - */ -java::JavaRuntime& javaRuntime(modeval::ServerConf sc) { - return *(java::JavaRuntime*)sc.moduleConf; -} - /** * Apply a Java component implementation function. */ @@ -59,11 +51,10 @@ struct applyImplementation { const value operator()(const list& params) const { const value expr = append(params, px); debug(expr, "modeval::java::applyImplementation::input"); - const failable val = java::evalClass(jr, expr, impl); + const failable res = java::evalClass(jr, expr, impl); + const value val = !hasContent(res)? mklist(value(), reason(res)) : mklist(content(res)); debug(val, "modeval::java::applyImplementation::result"); - if (!hasContent(val)) - return mklist(value(), reason(val)); - return mklist(content(val)); + return val; } }; @@ -71,12 +62,12 @@ struct applyImplementation { * Evaluate a Java component implementation and convert it to an applicable * lambda function. */ -const failable&)> > evalImplementation(const string& path, const value& impl, const list& px, modeval::ServerConf& sc) { +const failable&)> > evalImplementation(const string& path, const value& impl, const list& px, java::JavaRuntime& jr) { const string cn(attributeValue("class", impl)); - const failable jc = java::readClass(javaRuntime(sc), path, cn); + const failable jc = java::readClass(jr, path, cn); if (!hasContent(jc)) return mkfailure&)> >(reason(jc)); - return lambda&)>(applyImplementation(content(jc), px, javaRuntime(sc))); + return lambda&)>(applyImplementation(content(jc), px, jr)); } } diff --git a/sca-cpp/trunk/modules/java/org/apache/tuscany/Service.java b/sca-cpp/trunk/modules/java/org/apache/tuscany/Service.java index eedba5f46f..a00d5b1b53 100644 --- a/sca-cpp/trunk/modules/java/org/apache/tuscany/Service.java +++ b/sca-cpp/trunk/modules/java/org/apache/tuscany/Service.java @@ -26,34 +26,24 @@ package org.apache.tuscany; public interface Service { /** - * Post a new item to a resource. + * Post a new item to a collection of items. */ - String post(Iterable item); + Iterable post(Iterable collection, Iterable item); /** * Return an item. */ - Iterable get(String id); + Iterable get(Iterable id); /** - * Return all items in the resource. + * Update an item. */ - Iterable getall(); - - /** - * Update am item. - */ - boolean put(String id, Iterable item); + boolean put(Iterable id, Iterable item); /** * Delete an item. */ - boolean delete(String id); - - /** - * Delete all items in the resource. - */ - boolean deleteall(); + boolean delete(Iterable id); /** * Evaluate an expression. diff --git a/sca-cpp/trunk/modules/java/test/Client.java b/sca-cpp/trunk/modules/java/test/Client.java index ff2d3fee13..c3bd875fcc 100644 --- a/sca-cpp/trunk/modules/java/test/Client.java +++ b/sca-cpp/trunk/modules/java/test/Client.java @@ -23,16 +23,12 @@ public interface Client { String echo(String x); - Iterable getall(); + Iterable get(Iterable id); - Iterable get(String id); + Iterable post(Iterable collection, Iterable item); - String post(Iterable item); + Boolean put(Iterable id, Iterable item); - Boolean put(String id, Iterable entry); - - Boolean deleteall(); - - Boolean delete(String id); + Boolean delete(Iterable id); } diff --git a/sca-cpp/trunk/modules/java/test/ClientImpl.java b/sca-cpp/trunk/modules/java/test/ClientImpl.java index 4bd3498dd5..ade2ba302e 100644 --- a/sca-cpp/trunk/modules/java/test/ClientImpl.java +++ b/sca-cpp/trunk/modules/java/test/ClientImpl.java @@ -25,27 +25,19 @@ public class ClientImpl { return server.echo(x); } - public Iterable getall(Server server) { - return server.getall(); - } - - public Iterable get(String id, Server server) { + public Iterable get(Iterable id, Server server) { return server.get(id); } - public String post(Iterable item, Server server) { - return server.post(item); + public Iterable post(Iterable collection, Iterable item, Server server) { + return server.post(collection, item); } - public Boolean put(String id, Iterable item, Server server) { + public Boolean put(Iterable id, Iterable item, Server server) { return server.put(id, item); } - public Boolean deleteall(Server server) { - return server.deleteall(); - } - - public Boolean delete(String id, Server server) { + public Boolean delete(Iterable id, Server server) { return server.delete(id); } diff --git a/sca-cpp/trunk/modules/java/test/Server.java b/sca-cpp/trunk/modules/java/test/Server.java index 917ccfd6b9..3dfe3c84ef 100644 --- a/sca-cpp/trunk/modules/java/test/Server.java +++ b/sca-cpp/trunk/modules/java/test/Server.java @@ -23,16 +23,12 @@ public interface Server { String echo(String x); - Iterable getall(); + Iterable get(Iterable id); - Iterable get(String id); + Iterable post(Iterable collection, Iterable item); - String post(Iterable item); + Boolean put(Iterable id, Iterable item); - Boolean put(String id, Iterable entry); - - Boolean deleteall(); - - Boolean delete(String id); + Boolean delete(Iterable id); } diff --git a/sca-cpp/trunk/modules/java/test/ServerImpl.java b/sca-cpp/trunk/modules/java/test/ServerImpl.java index dd4e227123..d979c20a15 100644 --- a/sca-cpp/trunk/modules/java/test/ServerImpl.java +++ b/sca-cpp/trunk/modules/java/test/ServerImpl.java @@ -27,31 +27,25 @@ public class ServerImpl { return x; } - public Iterable getall() { - return list("Sample Feed", "123456789", - list("Item", "111", list(list("'javaClass", "services.Item"), list("'name", "Apple"), list("'currencyCode", "USD"), list("'currencySymbol", "$"), list("'price", 2.99))), - list("Item", "222", list(list("'javaClass", "services.Item"), list("'name", "Orange"), list("'currencyCode", "USD"), list("'currencySymbol", "$"), list("'price", 3.55))), - list("Item", "333", list(list("'javaClass", "services.Item"), list("'name", "Pear"), list("'currencyCode", "USD"), list("'currencySymbol", "$"), list("'price", 1.55)))); - } - - public Iterable get(String id) { + public Iterable get(Iterable id) { + if (isNil(id)) + return list("Sample Feed", "123456789", + list("Item", "111", list(list("'javaClass", "services.Item"), list("'name", "Apple"), list("'currencyCode", "USD"), list("'currencySymbol", "$"), list("'price", 2.99))), + list("Item", "222", list(list("'javaClass", "services.Item"), list("'name", "Orange"), list("'currencyCode", "USD"), list("'currencySymbol", "$"), list("'price", 3.55))), + list("Item", "333", list(list("'javaClass", "services.Item"), list("'name", "Pear"), list("'currencyCode", "USD"), list("'currencySymbol", "$"), list("'price", 1.55)))); Iterable entry = list(list("'javaClass", "services.Item"), list("'name", "Apple"), list("'currencyCode", "USD"), list("'currencySymbol", "$"), list("'price", 2.99)); return list("Item", id, entry); } - public String post(Iterable item) { - return "123456789"; + public Iterable post(Iterable collection, Iterable item) { + return list("123456789"); } - public Boolean put(String id, Iterable entry) { - return true; - } - - public Boolean deleteall() { + public Boolean put(Iterable id, Iterable item) { return true; } - public Boolean delete(String id) { + public Boolean delete(Iterable id) { return true; } } diff --git a/sca-cpp/trunk/modules/json/json.hpp b/sca-cpp/trunk/modules/json/json.hpp index 4c36b8b477..1d966d3f67 100644 --- a/sca-cpp/trunk/modules/json/json.hpp +++ b/sca-cpp/trunk/modules/json/json.hpp @@ -140,7 +140,7 @@ const bool isJSArray(const list& l) { if (isSymbol(v)) return false; if(isList(v)) { - if(isSymbol(car(v))) + if(!isNil((list)v) && isSymbol(car(v))) return false; } return true; diff --git a/sca-cpp/trunk/modules/python/client-test.py b/sca-cpp/trunk/modules/python/client-test.py index 6e3723302a..47e6cf4bda 100644 --- a/sca-cpp/trunk/modules/python/client-test.py +++ b/sca-cpp/trunk/modules/python/client-test.py @@ -22,18 +22,14 @@ def echo(x, ref): # ATOMPub test case -def getall(ref): - return ref("getall") - def get(id, ref): return ref("get", id) -def post(entry, ref): - return ref("post", entry) +def post(collection, item, ref): + return ref("post", collection, item) -def put(id, entry, ref): - return ref("put", id, entry) +def put(id, item, ref): + return ref("put", id, item) def delete(id, ref): return ref("delete", id) - diff --git a/sca-cpp/trunk/modules/python/eval.hpp b/sca-cpp/trunk/modules/python/eval.hpp index 5231b0ef60..136ecf6499 100644 --- a/sca-cpp/trunk/modules/python/eval.hpp +++ b/sca-cpp/trunk/modules/python/eval.hpp @@ -241,9 +241,9 @@ const failable evalScript(const value& expr, PyObject* script) { // The start, stop, and restart functions are optional const value fn = car(expr); - if (fn == "start" || fn == "stop" || fn == "restart") { + if (fn == "start" || fn == "restart" || fn == "stop") { PyErr_Clear(); - return value(false); + return value(lambda&)>()); } return mkfailure(string("Couldn't find function: ") + car(expr) + " : " + lastError()); diff --git a/sca-cpp/trunk/modules/python/mod-python.cpp b/sca-cpp/trunk/modules/python/mod-python.cpp index df94f2e8a1..8561a1fbf4 100644 --- a/sca-cpp/trunk/modules/python/mod-python.cpp +++ b/sca-cpp/trunk/modules/python/mod-python.cpp @@ -37,40 +37,27 @@ namespace server { namespace modeval { /** - * Start the module. + * Apply a lifecycle start or restart event. */ -const failable start(unused ServerConf& sc) { - // Start a Python runtime - sc.moduleConf = new (gc_new()) python::PythonRuntime(); - return true; -} +const value applyLifecycle(unused const list& params) { -/** - * Stop the module. - */ -const failable stop(unused ServerConf& sc) { - return true; -} + // Create a Python runtime + new (gc_new()) python::PythonRuntime(); -/** - * Restart the module. - */ -const failable restart(unused ServerConf& sc) { - // Start a Python runtime - sc.moduleConf = new (gc_new()) python::PythonRuntime(); - return true; + // Return a nil function as we don't need to handle the stop event + return failable(lambda&)>()); } /** * Evaluate a Python component implementation and convert it to an applicable * lambda function. */ -const failable&)> > evalImplementation(const string& path, const value& impl, const list& px, modeval::ServerConf& sc) { +const failable&)> > evalImplementation(const string& path, const value& impl, const list& px, unused const lambda&)>& lifecycle) { const string itype(elementName(impl)); if (contains(itype, ".python")) - return modpython::evalImplementation(path, impl, px, sc); + return modpython::evalImplementation(path, impl, px); if (contains(itype, ".cpp")) - return modcpp::evalImplementation(path, impl, px, sc); + return modcpp::evalImplementation(path, impl, px); return mkfailure&)> >(string("Unsupported implementation type: ") + itype); } diff --git a/sca-cpp/trunk/modules/python/mod-python.hpp b/sca-cpp/trunk/modules/python/mod-python.hpp index cbe2b6b97c..d13f2227ab 100644 --- a/sca-cpp/trunk/modules/python/mod-python.hpp +++ b/sca-cpp/trunk/modules/python/mod-python.hpp @@ -34,7 +34,6 @@ #include "value.hpp" #include "monad.hpp" #include "eval.hpp" -#include "../server/mod-eval.hpp" namespace tuscany { namespace server { @@ -51,11 +50,10 @@ struct applyImplementation { const value operator()(const list& params) const { const value expr = append(params, px); debug(expr, "modeval::python::applyImplementation::input"); - const failable val = python::evalScript(expr, impl); + const failable res = python::evalScript(expr, impl); + const value val = !hasContent(res)? mklist(value(), reason(res)) : mklist(content(res)); debug(val, "modeval::python::applyImplementation::result"); - if (!hasContent(val)) - return mklist(value(), reason(val)); - return mklist(content(val)); + return val; } }; @@ -63,7 +61,7 @@ struct applyImplementation { * Evaluate a Python component implementation and convert it to an applicable * lambda function. */ -const failable&)> > evalImplementation(const string& path, const value& impl, const list& px, unused modeval::ServerConf& sc) { +const failable&)> > evalImplementation(const string& path, const value& impl, const list& px) { const string fpath(path + attributeValue("script", impl)); ifstream is(fpath); if (fail(is)) diff --git a/sca-cpp/trunk/modules/python/server-test.py b/sca-cpp/trunk/modules/python/server-test.py index ac2b6829c9..61e177d0aa 100644 --- a/sca-cpp/trunk/modules/python/server-test.py +++ b/sca-cpp/trunk/modules/python/server-test.py @@ -22,25 +22,21 @@ def echo(x): # ATOMPub test case -def getall(): - return ("Sample Feed", "123456789", - ("Item", "111", (("'javaClass", "services.Item"), ("'name", "Apple"), ("'currencyCode", "USD"), ("'currencySymbol", "$"), ("'price", 2.99))), - ("Item", "222", (("'javaClass", "services.Item"), ("'name", "Orange"), ("'currencyCode", "USD"), ("'currencySymbol", "$"), ("'price", 3.55))), - ("Item", "333", (("'javaClass", "services.Item"), ("name", "Pear"), ("'currencyCode", "USD"), ("'currencySymbol", "$"), ("'price", 1.55)))) - def get(id): - entry = (("'javaClass", "services.Item"), ("'name", "Apple"), ("'currencyCode", "USD"), ("'currencySymbol", "$"), ("'price", 2.99)) - return ("Item", id, entry) - -def post(entry): - return "123456789" - -def put(id, entry): - return true - -def deleteall(): + if id == (): + return ("Sample Feed", "123456789", + ("Item", "111", (("'javaClass", "services.Item"), ("'name", "Apple"), ("'currencyCode", "USD"), ("'currencySymbol", "$"), ("'price", 2.99))), + ("Item", "222", (("'javaClass", "services.Item"), ("'name", "Orange"), ("'currencyCode", "USD"), ("'currencySymbol", "$"), ("'price", 3.55))), + ("Item", "333", (("'javaClass", "services.Item"), ("name", "Pear"), ("'currencyCode", "USD"), ("'currencySymbol", "$"), ("'price", 1.55)))) + + entry = (("'javaClass", "services.Item"), ("'name", "Apple"), ("'currencyCode", "USD"), ("'currencySymbol", "$"), ("'price", 2.99)) + return ("Item", id, entry) + +def post(collection, item): + return ("123456789",) + +def put(id, item): return true def delete(id): return true - diff --git a/sca-cpp/trunk/modules/scheme/primitive.hpp b/sca-cpp/trunk/modules/scheme/primitive.hpp index 5a13725ffd..e911c76f6c 100644 --- a/sca-cpp/trunk/modules/scheme/primitive.hpp +++ b/sca-cpp/trunk/modules/scheme/primitive.hpp @@ -172,15 +172,15 @@ const value cdddrProc(unused const list& args) { } const value startProc(unused const list& args) { - return false; + return lambda&)>(); } const value stopProc(unused const list& args) { - return false; + return lambda&)>(); } const value restartProc(unused const list& args) { - return false; + return lambda&)>(); } const value applyPrimitiveProcedure(const value& proc, list& args) { diff --git a/sca-cpp/trunk/modules/server/client-test.scm b/sca-cpp/trunk/modules/server/client-test.scm index 11c71cdfce..47b799d390 100644 --- a/sca-cpp/trunk/modules/server/client-test.scm +++ b/sca-cpp/trunk/modules/server/client-test.scm @@ -21,26 +21,18 @@ ; ATOMPub test case -(define (getall ref) - (ref "getall") -) - (define (get id ref) (ref "get" id) ) -(define (post entry ref) - (ref "post" entry) +(define (post coll entry ref) + (ref "post" coll entry) ) (define (put id entry ref) (ref "put" id entry) ) -(define (deleteall ref) - (ref deleteall) -) - (define (delete id ref) (ref "delete" id) ) diff --git a/sca-cpp/trunk/modules/server/impl-test.cpp b/sca-cpp/trunk/modules/server/impl-test.cpp index 51162d070d..3fa1d65323 100644 --- a/sca-cpp/trunk/modules/server/impl-test.cpp +++ b/sca-cpp/trunk/modules/server/impl-test.cpp @@ -38,7 +38,7 @@ const failable get(unused const list& params) { } const failable post(unused const list& params) { - return value(string("123456789")); + return value(mklist(string("123456789"))); } const failable put(unused const list& params) { diff --git a/sca-cpp/trunk/modules/server/mod-cpp.hpp b/sca-cpp/trunk/modules/server/mod-cpp.hpp index 19ba938034..23b4941c26 100644 --- a/sca-cpp/trunk/modules/server/mod-cpp.hpp +++ b/sca-cpp/trunk/modules/server/mod-cpp.hpp @@ -37,7 +37,6 @@ #include "monad.hpp" #include "dynlib.hpp" #include "../scheme/driver.hpp" -#include "mod-eval.hpp" namespace tuscany { namespace server { @@ -54,8 +53,8 @@ const list failableResult(const value& func, const list& v) { // Except for the start, stop, and restart functions, which are optional const value reason = cadr(v); if (length(reason) == 0) { - if (func == "start" || func == "stop" || func == "restart") - return mklist(false); + if (func == "start" || func == "restart" || func == "stop") + return mklist(lambda&)>()); return mklist(value(), string("Function not supported: ") + func); } return v; @@ -82,7 +81,7 @@ struct applyImplementation { * Evaluate a C++ component implementation and convert it to * an applicable lambda function. */ -const failable&)> > evalImplementation(const string& path, const value& impl, const list& px, unused modeval::ServerConf& sc) { +const failable&)> > evalImplementation(const string& path, const value& impl, const list& px) { // Configure the implementation's lambda function const value ipath(attributeValue("path", impl)); diff --git a/sca-cpp/trunk/modules/server/mod-eval.cpp b/sca-cpp/trunk/modules/server/mod-eval.cpp index 5a9b87ca2f..a94ccf5bbe 100644 --- a/sca-cpp/trunk/modules/server/mod-eval.cpp +++ b/sca-cpp/trunk/modules/server/mod-eval.cpp @@ -37,36 +37,23 @@ namespace server { namespace modeval { /** - * Start the module. + * Apply a lifecycle start or restart event. */ -const failable start(unused ServerConf& sc) { - return true; -} - -/** - * Stop the module. - */ -const failable stop(unused ServerConf& sc) { - return true; -} - -/** - * Restart the module. - */ -const failable restart(unused ServerConf& sc) { - return true; +const value applyLifecycle(unused const list& params) { + // Return a nil function as we don't need to handle any subsequent events + return failable(lambda&)>()); } /** * Evaluate a Scheme or C++ component implementation and convert it to an * applicable lambda function. */ -const failable&)> > evalImplementation(const string& path, const value& impl, const list& px, ServerConf& sc) { +const failable&)> > evalImplementation(const string& path, const value& impl, const list& px, unused const lambda&)>& lifecycle) { const string itype(elementName(impl)); if (contains(itype, ".scheme")) - return modscheme::evalImplementation(path, impl, px, sc); + return modscheme::evalImplementation(path, impl, px); if (contains(itype, ".cpp")) - return modcpp::evalImplementation(path, impl, px, sc); + return modcpp::evalImplementation(path, impl, px); return mkfailure&)> >(string("Unsupported implementation type: ") + itype); } diff --git a/sca-cpp/trunk/modules/server/mod-eval.hpp b/sca-cpp/trunk/modules/server/mod-eval.hpp index 6ab1d68ad6..03cce5aeb4 100644 --- a/sca-cpp/trunk/modules/server/mod-eval.hpp +++ b/sca-cpp/trunk/modules/server/mod-eval.hpp @@ -53,16 +53,17 @@ namespace modeval { */ class ServerConf { public: - ServerConf(server_rec* s) : s(s), moduleConf(NULL), home(""), wiringServerName(""), contributionPath(""), compositeName("") { + ServerConf(server_rec* s) : s(s), home(""), wiringServerName(""), contributionPath(""), compositeName("") { } const server_rec* s; - void* moduleConf; + lambda&)> lifecycle; string home; string wiringServerName; string contributionPath; string compositeName; list implementations; + list implTree; }; /** @@ -103,20 +104,15 @@ const failable get(request_rec* r, const lambda&)>& return httpd::writeResult(json::jsonResult(id, content(val), cx), "application/json-rpc", r); } - // Evaluate an ATOM GET request and return an ATOM feed representing a collection of resources - const list path(httpd::pathValues(r->uri)); - if (isNil(cddr(path))) { - const failable val = failableResult(impl(cons("getall", list()))); - if (!hasContent(val)) - return mkfailure(reason(val)); - return httpd::writeResult(atom::writeATOMFeed(atom::feedValuesToElements(content(val))), "application/atom+xml;type=feed", r); - } - - // Evaluate an ATOM GET and return an ATOM entry representing a resource - const failable val = failableResult(impl(cons("get", mklist(caddr(path))))); + // Evaluate the GET expression and return an ATOM entry or feed representing a resource + const list path(pathValues(r->uri)); + const failable val = failableResult(impl(cons("get", mklist(cddr(path))))); if (!hasContent(val)) return mkfailure(reason(val)); - return httpd::writeResult(atom::writeATOMEntry(atom::entryValuesToElements(content(val))), "application/atom+xml;type=entry", r); + if (isNil(cddr(path))) + return httpd::writeResult(atom::writeATOMFeed(atom::feedValuesToElements(content(val))), "application/atom+xml;type=feed", r); + else + return httpd::writeResult(atom::writeATOMEntry(atom::entryValuesToElements(content(val))), "application/atom+xml;type=entry", r); } /** @@ -157,6 +153,7 @@ const failable post(request_rec* r, const lambda&)> if (contains(ct, "application/atom+xml")) { // Read the ATOM entry + const list path(pathValues(r->uri)); const int rc = httpd::setupReadPolicy(r); if(rc != OK) return rc; @@ -164,12 +161,13 @@ const failable post(request_rec* r, const lambda&)> debug(ls, "modeval::post::input"); const value entry = atom::entryValue(content(atom::readEntry(ls))); - // Evaluate the request expression - const failable val = failableResult(impl(cons("post", mklist(entry)))); + // Evaluate the POST expression + const failable val = failableResult(impl(cons("post", mklist(cddr(path), entry)))); if (!hasContent(val)) return mkfailure(reason(val)); // Return the created resource location + debug(content(val), "modeval::post::location"); apr_table_setn(r->headers_out, "Location", apr_pstrdup(r->pool, httpd::url(content(val), r))); r->status = HTTP_CREATED; return OK; @@ -190,7 +188,7 @@ const failable put(request_rec* r, const lambda&)>& debug(r->uri, "modeval::put::url"); // Read the ATOM entry - const list path(httpd::pathValues(r->uri)); + const list path(pathValues(r->uri)); const int rc = httpd::setupReadPolicy(r); if(rc != OK) return rc; @@ -198,8 +196,8 @@ const failable put(request_rec* r, const lambda&)>& debug(ls, "modeval::put::input"); const value entry = atom::entryValue(content(atom::readEntry(ls))); - // Evaluate the ATOM PUT request and update the corresponding resource - const failable val = failableResult(impl(cons("put", mklist(caddr(path), entry)))); + // Evaluate the PUT expression and update the corresponding resource + const failable val = failableResult(impl(cons("put", mklist(cddr(path), entry)))); if (!hasContent(val)) return mkfailure(reason(val)); if (val == value(false)) @@ -214,20 +212,8 @@ const failable del(request_rec* r, const lambda&)>& debug(r->uri, "modeval::delete::url"); // Evaluate an ATOM delete request - const list path(httpd::pathValues(r->uri)); - if (isNil(cddr(path))) { - - // Delete a collection of resources - const failable val = failableResult(impl(cons("deleteall", list()))); - if (!hasContent(val)) - return mkfailure(reason(val)); - if (val == value(false)) - return HTTP_NOT_FOUND; - return OK; - } - - // Delete a resource - const failable val = failableResult(impl(cons("delete", mklist(caddr(path))))); + const list path(pathValues(r->uri)); + const failable val = failableResult(impl(cons("delete", mklist(cddr(path))))); if (!hasContent(val)) return mkfailure(reason(val)); if (val == value(false)) @@ -257,8 +243,8 @@ int handler(request_rec *r) { // Get the component implementation lambda const ServerConf& sc = httpd::serverConf(r, &mod_tuscany_eval); - const list path(httpd::pathValues(r->uri)); - const list impl(assoctree(cadr(path), sc.implementations)); + const list path(pathValues(r->uri)); + const list impl(assoctree(cadr(path), sc.implTree)); if (isNil(impl)) return HTTP_NOT_FOUND; @@ -317,7 +303,7 @@ const list propProxies(const list& props) { * Evaluate a component and convert it to an applicable lambda function. */ const value evalComponent(ServerConf& sc, server_rec& server, const value& comp) { - extern const failable&)> > evalImplementation(const string& cpath, const value& impl, const list& px, ServerConf& sc); + extern const failable&)> > evalImplementation(const string& cpath, const value& impl, const list& px, const lambda&)>& lifecycle); const value impl = scdl::implementation(comp); @@ -336,7 +322,7 @@ const value evalComponent(ServerConf& sc, server_rec& server, const value& comp) const list ppx(propProxies(scdl::properties(comp))); // Evaluate the component implementation and convert it to an applicable lambda function - const failable&)> > cimpl(evalImplementation(sc.contributionPath, impl, append(rpx, ppx), sc)); + const failable&)> > cimpl(evalImplementation(sc.contributionPath, impl, append(rpx, ppx), sc.lifecycle)); if (!hasContent(cimpl)) return reason(cimpl); return content(cimpl); @@ -362,19 +348,28 @@ const failable > readComponents(const string& path) { } /** - * Apply a list of component implementations to a (start, stop or restart) lifecycle expression. + * Apply a list of component implementations to a start or restart lifecycle expression. + * Return the functions returned by the component implementations. */ -const failable applyLifecycleExpr(const list impls, const list& expr) { +const failable > applyLifecycleExpr(const list& impls, const list& expr) { if (isNil(impls)) - return true; + return list(); // Evaluate lifecycle expression against a component implementation lambda - const lambda&)> l(cadr(car(impls))); + const lambda&)> l = cadr(car(impls)); const failable r = failableResult(l(expr)); if (!hasContent(r)) - return mkfailure(reason(r)); + return mkfailure >(reason(r)); + const lambda&)> rl = content(r); + + // Use the returned lambda function, if any, from now on + const lambda&)> al = isNil(rl)? l : rl; - return applyLifecycleExpr(cdr(impls), expr); + // Continue with the rest of the list + const failable > nr = applyLifecycleExpr(cdr(impls), expr); + if (!hasContent(nr)) + return nr; + return cons(mklist(car(car(impls)), value(al)), content(nr)); } /** @@ -388,14 +383,106 @@ const failable confComponents(const string& lifecycle, ServerConf& sc, ser const failable > comps = readComponents(sc.contributionPath + sc.compositeName); if (!hasContent(comps)) return mkfailure(reason(comps)); - const list impls = componentToImplementationAssoc(sc, server, content(comps)); + const list starts = componentToImplementationAssoc(sc, server, content(comps)); - // Store the implementation lambda functions in a tree for fast retrieval - sc.implementations = mkbtree(sort(impls)); + // Start or restart the component implementations + // Record the returned lambda functions + debug(starts, "modeval::confComponents::start"); + const failable > impls = applyLifecycleExpr(starts, mklist(c_str(lifecycle))); + if (!hasContent(impls)) + return mkfailure(reason(impls)); + sc.implementations = content(impls); debug(sc.implementations, "modeval::confComponents::implementations"); - // Start or restart the component implementations - return applyLifecycleExpr(impls, mklist(c_str(lifecycle))); + // Store the implementation lambda functions in a tree for fast retrieval + sc.implTree = mkbtree(sort(sc.implementations)); + + return true; +} + +/** + * Cleanup callback, called when the server is stopped or restarted. + */ +apr_status_t serverCleanup(void* v) { + gc_pool pool; + ServerConf& sc = *(ServerConf*)v; + debug("modeval::serverCleanup"); + + // Stop the component implementations + applyLifecycleExpr(sc.implementations, mklist("stop")); + + // Call the module lifecycle function + if (isNil(sc.lifecycle)) + return APR_SUCCESS; + debug("modeval::serverCleanup::stop"); + sc.lifecycle(mklist("stop")); + + return APR_SUCCESS; +} + +/** + * Called after all the configuration commands have been run. + * Process the server configuration and configure the deployed components. + */ +int postConfig(apr_pool_t *p, unused apr_pool_t *plog, unused apr_pool_t *ptemp, server_rec *s) { + extern const value applyLifecycle(const list&); + + gc_scoped_pool pool(p); + ServerConf& sc = httpd::serverConf(s, &mod_tuscany_eval); + + // Count the calls to post config + const string k("tuscany::modeval::postConfig"); + const int count = (int)httpd::userData(k, s); + httpd::putUserData(k, (void*)(count + 1), s); + + // Count == 0, do nothing as post config is always called twice, + // count == 1 is the first start, count > 1 is a restart + if (count == 0) + return OK; + if (count == 1) { + debug("modeval::postConfig::start"); + const failable r = failableResult(applyLifecycle(mklist("start"))); + if (!hasContent(r)) + return -1; + sc.lifecycle = content(r); + } + if (count > 1) { + debug("modeval::postConfig::restart"); + const failable r = failableResult(applyLifecycle(mklist("restart"))); + if (!hasContent(r)) + return -1; + sc.lifecycle = content(r); + } + + // Configure the deployed components + debug(sc.wiringServerName, "modeval::postConfig::wiringServerName"); + debug(sc.contributionPath, "modeval::postConfig::contributionPath"); + debug(sc.compositeName, "modeval::postConfig::compositeName"); + const failable res = confComponents(count > 1? "restart" : "start", sc, *s); + if (!hasContent(res)) { + cerr << "[Tuscany] Due to one or more errors mod_tuscany_eval loading failed. Causing apache to stop loading." << endl; + return -1; + } + + // Register a cleanup callback, called when the server is stopped or restarted + apr_pool_pre_cleanup_register(p, (void*)&sc, serverCleanup); + + return OK; +} + +/** + * Child process initialization. + */ +void childInit(apr_pool_t* p, server_rec* s) { + gc_scoped_pool pool(p); + ServerConf* sc = (ServerConf*)ap_get_module_config(s->module_config, &mod_tuscany_eval); + if(sc == NULL) { + cerr << "[Tuscany] Due to one or more errors mod_tuscany_eval loading failed. Causing apache to stop loading." << endl; + exit(APEXIT_CHILDFATAL); + } + + // Register a cleanup callback, called when the child is stopped or restarted + apr_pool_pre_cleanup_register(p, (void*)sc, serverCleanup); } /** @@ -444,51 +531,6 @@ const command_rec commands[] = { {NULL, NULL, NULL, 0, NO_ARGS, NULL} }; -int postConfig(apr_pool_t *p, unused apr_pool_t *plog, unused apr_pool_t *ptemp, server_rec *s) { - extern const failable start(ServerConf& sc); - extern const failable restart(ServerConf& sc); - gc_scoped_pool pool(p); - ServerConf& sc = httpd::serverConf(s, &mod_tuscany_eval); - - // Count the calls to post config - const string k("tuscany::modeval::postConfig"); - const int count = httpd::userData(k, s); - httpd::putUserData(k, count +1, s); - - // Count == 0, do nothing as post config is always called twice, - // count == 1 is the first start, count > 1 is a restart - if (count == 0) - return OK; - if (count == 1) { - debug("modeval::postConfig::start"); - start(sc); - } - if (count > 1) { - debug("modeval::postConfig::restart"); - restart(sc); - } - - // Configure the components deployed to the server - debug(sc.wiringServerName, "modeval::postConfig::wiringServerName"); - debug(sc.contributionPath, "modeval::postConfig::contributionPath"); - debug(sc.compositeName, "modeval::postConfig::compositeName"); - const failable res = confComponents(count > 1? "restart" : "start", sc, *s); - if (!hasContent(res)) - return -1; - return OK; -} - -/** - * Child process initialization. - */ -void childInit(apr_pool_t* p, server_rec* svr_rec) { - gc_scoped_pool pool(p); - ServerConf* c = (ServerConf*)ap_get_module_config(svr_rec->module_config, &mod_tuscany_eval); - if(c == NULL) { - cerr << "[Tuscany] Due to one or more errors mod_tuscany_eval loading failed. Causing apache to stop loading." << endl; - exit(APEXIT_CHILDFATAL); - } -} void registerHooks(unused apr_pool_t *p) { ap_hook_post_config(postConfig, NULL, NULL, APR_HOOK_MIDDLE); diff --git a/sca-cpp/trunk/modules/server/mod-scheme.hpp b/sca-cpp/trunk/modules/server/mod-scheme.hpp index e18eececf6..f5b9554c3f 100644 --- a/sca-cpp/trunk/modules/server/mod-scheme.hpp +++ b/sca-cpp/trunk/modules/server/mod-scheme.hpp @@ -34,7 +34,6 @@ #include "value.hpp" #include "monad.hpp" #include "../scheme/eval.hpp" -#include "../server/mod-eval.hpp" namespace tuscany { namespace server { @@ -61,11 +60,10 @@ struct applyImplementation { const value expr = cons(car(params), append(scheme::quotedParameters(cdr(params)), px)); debug(expr, "modeval::scheme::applyImplementation::input"); scheme::Env env = scheme::setupEnvironment(); - const value val = scheme::evalScript(expr, impl, env); + const value res = scheme::evalScript(expr, impl, env); + const value val = isNil(res)? mklist(value(), string("Could not evaluate expression")) : mklist(res); debug(val, "modeval::scheme::applyImplementation::result"); - if (isNil(val)) - return mklist(value(), string("Could not evaluate expression")); - return mklist(val); + return val; } }; @@ -73,7 +71,7 @@ struct applyImplementation { * Evaluate a Scheme component implementation and convert it to an * applicable lambda function. */ -const failable&)> > evalImplementation(const string& path, const value& impl, const list& px, unused modeval::ServerConf& sc) { +const failable&)> > evalImplementation(const string& path, const value& impl, const list& px) { const string fpath(path + attributeValue("script", impl)); ifstream is(fpath); if (fail(is)) diff --git a/sca-cpp/trunk/modules/server/mod-wiring.cpp b/sca-cpp/trunk/modules/server/mod-wiring.cpp index e21f8be773..c21b0fe254 100644 --- a/sca-cpp/trunk/modules/server/mod-wiring.cpp +++ b/sca-cpp/trunk/modules/server/mod-wiring.cpp @@ -82,7 +82,7 @@ int translateReference(request_rec *r) { // Find the requested component const ServerConf& sc = httpd::serverConf(r, &mod_tuscany_wiring); - const list rpath(httpd::pathValues(r->uri)); + const list rpath(pathValues(r->uri)); const list comp(assoctree(cadr(rpath), sc.references)); if (isNil(comp)) return HTTP_NOT_FOUND; @@ -147,20 +147,20 @@ int translateService(request_rec *r) { // Find the requested component const ServerConf& sc = httpd::serverConf(r, &mod_tuscany_wiring); - const list path(httpd::pathValues(r->uri)); - const list svc(assocPath(path, sc.services)); + const list p(pathValues(r->uri)); + const list svc(assocPath(p, sc.services)); if (isNil(svc)) return DECLINED; debug(svc, "modwiring::translateService::service"); // Build a component-name + path-info URI - const list target(cons(cadr(svc), httpd::pathInfo(path, car(svc)))); + const list target(cons(cadr(svc), httpd::pathInfo(p, car(svc)))); debug(target, "modwiring::translateService::target"); // Dispatch to the target component using a local internal redirect - const string p(httpd::path(target)); - debug(p, "modwiring::translateService::path"); - const string redir(string("/redirect:/components") + httpd::path(target)); + const string tp(path(target)); + debug(tp, "modwiring::translateService::path"); + const string redir(string("/redirect:/components") + tp); debug(redir, "modwiring::translateService::redirect"); r->filename = apr_pstrdup(r->pool, c_str(redir)); r->handler = "mod_tuscany_wiring"; @@ -216,7 +216,7 @@ const failable > readComponents(const string& path) { } /** - * Return a tree of component-name + references pairs. The references are + * Return a list of component-name + references pairs. The references are * arranged in trees of reference-name + reference-target pairs. */ const list componentReferenceToTargetTree(const value& c) { @@ -229,12 +229,8 @@ const list componentReferenceToTargetAssoc(const list& c) { return cons(componentReferenceToTargetTree(car(c)), componentReferenceToTargetAssoc(cdr(c))); } -const list componentReferenceToTargetTree(const list& c) { - return mkbtree(sort(componentReferenceToTargetAssoc(c))); -} - /** - * Return a tree of service-URI-path + component-name pairs. Service-URI-paths are + * Return a list of service-URI-path + component-name pairs. Service-URI-paths are * represented as lists of URI path fragments. */ const list defaultBindingURI(const string& cn, const string& sn) { @@ -247,7 +243,7 @@ const list bindingToComponentAssoc(const string& cn, const string& sn, co const value uri(scdl::uri(car(b))); if (isNil(uri)) return cons(mklist(defaultBindingURI(cn, sn), cn), bindingToComponentAssoc(cn, sn, cdr(b))); - return cons(mklist(httpd::pathValues(c_str(string(uri))), cn), bindingToComponentAssoc(cn, sn, cdr(b))); + return cons(mklist(pathValues(c_str(string(uri))), cn), bindingToComponentAssoc(cn, sn, cdr(b))); } const list serviceToComponentAssoc(const string& cn, const list& s) { @@ -266,10 +262,6 @@ const list uriToComponentAssoc(const list& c) { return append(serviceToComponentAssoc(scdl::name(car(c)), scdl::services(car(c))), uriToComponentAssoc(cdr(c))); } -const list uriToComponentTree(const list& c) { - return mkbtree(sort(uriToComponentAssoc(c))); -} - /** * Configure the components declared in the server's deployment composite. */ @@ -277,18 +269,55 @@ const bool confComponents(ServerConf& sc) { if (sc.contributionPath == "" || sc.compositeName == "") return true; - // Read the component configuration and store the references and service - // URIs in trees for fast retrieval later + // Read the component configuration and store the references and service URIs + // in trees for fast retrieval later const failable > comps = readComponents(sc.contributionPath + sc.compositeName); if (!hasContent(comps)) return true; - sc.references = componentReferenceToTargetTree(content(comps)); - debug(sc.references, "modwiring::confComponents::references"); - sc.services = uriToComponentTree(content(comps)); - debug(sc.services, "modwiring::confComponents::services"); + const list refs = componentReferenceToTargetAssoc(content(comps)); + debug(refs, "modwiring::confComponents::references"); + sc.references = mkbtree(sort(refs)); + + const list svcs = uriToComponentAssoc(content(comps)); + debug(svcs, "modwiring::confComponents::services"); + sc.services = mkbtree(sort(svcs)); return true; } +/** + * Called after all the configuration commands have been run. + * Process the server configuration and configure the wiring for the deployed components. + */ +int postConfig(unused apr_pool_t *p, unused apr_pool_t *plog, unused apr_pool_t *ptemp, server_rec *s) { + // Count the calls to post config, skip the first one as + // postConfig is always called twice + const string k("tuscany::modwiring::postConfig"); + const int count = (int)httpd::userData(k, s); + httpd::putUserData(k, (void*)(count + 1), s); + if (count == 0) + return OK; + + // Configure the wiring for the deployed components + ServerConf& sc = httpd::serverConf(s, &mod_tuscany_wiring); + debug(sc.wiringServerName, "modwiring::postConfig::wiringServerName"); + debug(sc.contributionPath, "modwiring::postConfig::contributionPath"); + debug(sc.compositeName, "modwiring::postConfig::compositeName"); + confComponents(sc); + return OK; +} + +/** + * Child process initialization. + */ +void childInit(apr_pool_t* p, server_rec* svr_rec) { + gc_scoped_pool pool(p); + ServerConf *conf = (ServerConf*)ap_get_module_config(svr_rec->module_config, &mod_tuscany_wiring); + if(conf == NULL) { + cerr << "[Tuscany] Due to one or more errors mod_tuscany_wiring loading failed. Causing apache to stop loading." << endl; + exit(APEXIT_CHILDFATAL); + } +} + /** * Configuration commands. */ @@ -328,33 +357,6 @@ const command_rec commands[] = { {NULL, NULL, NULL, 0, NO_ARGS, NULL} }; -int postConfig(unused apr_pool_t *p, unused apr_pool_t *plog, unused apr_pool_t *ptemp, server_rec *s) { - // Count the calls to post config, skip the first one as - // postConfig is always called twice - const string k("tuscany::modwiring::postConfig"); - const int count = httpd::userData(k, s); - httpd::putUserData(k, count +1, s); - if (count == 0) - return OK; - - // Configure the wiring for the deployed components - ServerConf& sc = httpd::serverConf(s, &mod_tuscany_wiring); - debug(sc.wiringServerName, "modwiring::postConfig::wiringServerName"); - debug(sc.contributionPath, "modwiring::postConfig::contributionPath"); - debug(sc.compositeName, "modwiring::postConfig::compositeName"); - confComponents(sc); - return OK; -} - -void childInit(apr_pool_t* p, server_rec* svr_rec) { - gc_scoped_pool pool(p); - ServerConf *conf = (ServerConf*)ap_get_module_config(svr_rec->module_config, &mod_tuscany_wiring); - if(conf == NULL) { - cerr << "[Tuscany] Due to one or more errors mod_tuscany_wiring loading failed. Causing apache to stop loading." << endl; - exit(APEXIT_CHILDFATAL); - } -} - void registerHooks(unused apr_pool_t *p) { ap_hook_post_config(postConfig, NULL, NULL, APR_HOOK_MIDDLE); ap_hook_child_init(childInit, NULL, NULL, APR_HOOK_MIDDLE); diff --git a/sca-cpp/trunk/modules/server/server-test.scm b/sca-cpp/trunk/modules/server/server-test.scm index 8864f55cfd..490f168170 100644 --- a/sca-cpp/trunk/modules/server/server-test.scm +++ b/sca-cpp/trunk/modules/server/server-test.scm @@ -21,30 +21,24 @@ ; ATOMPub test case -(define (getall) - '("Sample Feed" "123456789" - ("Item" "111" ((javaClass "services.Item") (name "Apple") (currencyCode "USD") (currencySymbol "$") (price 2.99))) - ("Item" "222" ((javaClass "services.Item") (name "Orange") (currencyCode "USD") (currencySymbol "$") (price 3.55))) - ("Item" "333" ((javaClass "services.Item") (name "Pear") (currencyCode "USD") (currencySymbol "$") (price 1.55)))) -) - (define (get id) - (define entry '((javaClass "services.Item") (name "Apple") (currencyCode "USD") (currencySymbol "$") (price 2.99))) - (cons "Item" (list id entry)) + (if (nul id) + '("Sample Feed" "123456789" + ("Item" "111" ((javaClass "services.Item") (name "Apple") (currencyCode "USD") (currencySymbol "$") (price 2.99))) + ("Item" "222" ((javaClass "services.Item") (name "Orange") (currencyCode "USD") (currencySymbol "$") (price 3.55))) + ("Item" "333" ((javaClass "services.Item") (name "Pear") (currencyCode "USD") (currencySymbol "$") (price 1.55)))) + + '("Item" "111" ((javaClass "services.Item") (name "Apple") (currencyCode "USD") (currencySymbol "$") (price 2.99)))) ) -(define (post entry) - "123456789" +(define (post coll entry) + '("123456789") ) (define (put id entry) true ) -(define (deleteall) - true -) - (define (delete id) true ) diff --git a/sca-cpp/trunk/test/store-cpp/start b/sca-cpp/trunk/test/store-cpp/start index 06410aca01..3c1da356e6 100755 --- a/sca-cpp/trunk/test/store-cpp/start +++ b/sca-cpp/trunk/test/store-cpp/start @@ -25,8 +25,5 @@ SCAContribution `pwd`/ SCAComposite store.composite EOF +../../components/cache/memcached-start ../../modules/http/httpd-start tmp - -mc="memcached -l 127.0.0.1 -m 4 -p 11211" -$mc & - diff --git a/sca-cpp/trunk/test/store-cpp/stop b/sca-cpp/trunk/test/store-cpp/stop index 07231ee7ce..a59273b8ed 100755 --- a/sca-cpp/trunk/test/store-cpp/stop +++ b/sca-cpp/trunk/test/store-cpp/stop @@ -18,6 +18,4 @@ # under the License. ../../modules/http/httpd-stop tmp -mc="memcached -l 127.0.0.1 -m 4 -p 11211" -kill `ps -f | grep -v grep | grep "$mc" | awk '{ print $2 }'` - +../../components/cache/memcached-stop diff --git a/sca-cpp/trunk/test/store-java/start b/sca-cpp/trunk/test/store-java/start index 3f669f0111..ae2743178a 100755 --- a/sca-cpp/trunk/test/store-java/start +++ b/sca-cpp/trunk/test/store-java/start @@ -27,8 +27,5 @@ EOF export CLASSPATH=`pwd`/../../modules/java/libmod-tuscany-java-1.0.jar:`pwd` +../../components/cache/memcached-start ../../modules/http/httpd-start tmp - -mc="memcached -l 127.0.0.1 -m 4 -p 11211" -$mc & - diff --git a/sca-cpp/trunk/test/store-java/stop b/sca-cpp/trunk/test/store-java/stop index 54b22172bb..79d914e937 100755 --- a/sca-cpp/trunk/test/store-java/stop +++ b/sca-cpp/trunk/test/store-java/stop @@ -20,6 +20,4 @@ export CLASSPATH=`pwd`/../../modules/java/libmod-tuscany-java-1.0.jar:`pwd` ../../modules/http/httpd-stop tmp -mc="memcached -l 127.0.0.1 -m 4 -p 11211" -kill `ps -f | grep -v grep | grep "$mc" | awk '{ print $2 }'` - +../../components/cache/memcached-stop diff --git a/sca-cpp/trunk/test/store-python/start b/sca-cpp/trunk/test/store-python/start index 33758b75fd..93e1dbe755 100755 --- a/sca-cpp/trunk/test/store-python/start +++ b/sca-cpp/trunk/test/store-python/start @@ -25,8 +25,5 @@ SCAContribution `pwd`/ SCAComposite store.composite EOF +../../components/cache/memcached-start ../../modules/http/httpd-start tmp - -mc="memcached -l 127.0.0.1 -m 4 -p 11211" -$mc & - diff --git a/sca-cpp/trunk/test/store-python/stop b/sca-cpp/trunk/test/store-python/stop index 07231ee7ce..a59273b8ed 100755 --- a/sca-cpp/trunk/test/store-python/stop +++ b/sca-cpp/trunk/test/store-python/stop @@ -18,6 +18,4 @@ # under the License. ../../modules/http/httpd-stop tmp -mc="memcached -l 127.0.0.1 -m 4 -p 11211" -kill `ps -f | grep -v grep | grep "$mc" | awk '{ print $2 }'` - +../../components/cache/memcached-stop diff --git a/sca-cpp/trunk/test/store-scheme/start b/sca-cpp/trunk/test/store-scheme/start index 398e21c8e4..db8e19c4cc 100755 --- a/sca-cpp/trunk/test/store-scheme/start +++ b/sca-cpp/trunk/test/store-scheme/start @@ -25,8 +25,5 @@ SCAContribution `pwd`/ SCAComposite store.composite EOF +../../components/cache/memcached-start ../../modules/http/httpd-start tmp - -mc="memcached -l 127.0.0.1 -m 4 -p 11211" -$mc & - diff --git a/sca-cpp/trunk/test/store-scheme/stop b/sca-cpp/trunk/test/store-scheme/stop index 07231ee7ce..a59273b8ed 100755 --- a/sca-cpp/trunk/test/store-scheme/stop +++ b/sca-cpp/trunk/test/store-scheme/stop @@ -18,6 +18,4 @@ # under the License. ../../modules/http/httpd-stop tmp -mc="memcached -l 127.0.0.1 -m 4 -p 11211" -kill `ps -f | grep -v grep | grep "$mc" | awk '{ print $2 }'` - +../../components/cache/memcached-stop -- cgit v1.2.3