diff options
Diffstat (limited to '')
73 files changed, 1779 insertions, 700 deletions
diff --git a/sca-cpp/trunk/INSTALL b/sca-cpp/trunk/INSTALL index d6e5229683..2fe522d7e4 100644 --- a/sca-cpp/trunk/INSTALL +++ b/sca-cpp/trunk/INSTALL @@ -55,7 +55,7 @@ build it from source at git://code.stanziq.com//libstrophe To configure the Tuscany SCA build do this: -./autogen.sh +./bootstrap ./configure --prefix=<install dir> To enable debugging and strict warning compile options, add: @@ -64,7 +64,8 @@ To enable debugging and strict warning compile options, add: To enable gprof profiling, add: --enable-profiling -To enable multi-threading with the HTTPD worker MPM, add: +To enable multi-threading (required by the Queue and Chat components and +for running with the HTTPD worker or event multi-threaded MPMs): --enable-threads To enable support for Python component implementations: @@ -99,6 +100,7 @@ dependencies installed under $HOME: --with-libcurl=/usr --with-libxml2=/usr \ --with-js-include=/usr/include/xulrunner-1.9.1.7/unstable \ --with-js-lib=/usr/lib/xulrunner-1.9.1.7 \ +--enable-threads \ --enable-python --with-python=/usr \ --enable-java --with-java=/usr/lib/jvm/default-java \ --enable-webservice --with-axis2c=$HOME/axis2c-1.6.0-bin \ @@ -135,7 +137,7 @@ Building dependencies from source Here are example build and install steps for some of the dependencies. -Apache HTTPD, including APR: +Apache HTTPD, including APR, using the HTTP prefork MPM (recommended): ./configure --enable-ssl --enable-proxy --enable-rewrite --with-included-apr \ --with-mpm=prefork --prefix=$HOME/httpd-2.2.13-bin make @@ -150,6 +152,11 @@ make install export AXIS2C_HOME=$HOME/axis2c-1.6.0-bin Apache Qpid/C++: +git clone git://git.apache.org/qpid.git +cd qpid +git checkout -b 0.6-release origin/0.6-release +cd qpid/cpp +./bootstrap ./configure --prefix=$HOME/qpidc-0.6-bin make make install @@ -162,6 +169,7 @@ git submodule update aclocal automake --add-missing --foreign --copy autoconf -./configure +./configure --prefix=$HOME/libstrophe-bin make +make install diff --git a/sca-cpp/trunk/autogen.sh b/sca-cpp/trunk/bootstrap index af38864985..af38864985 100755 --- a/sca-cpp/trunk/autogen.sh +++ b/sca-cpp/trunk/bootstrap diff --git a/sca-cpp/trunk/components/cache/client-test.cpp b/sca-cpp/trunk/components/cache/client-test.cpp index c8b23b391e..ddf093a6dc 100644 --- a/sca-cpp/trunk/components/cache/client-test.cpp +++ b/sca-cpp/trunk/components/cache/client-test.cpp @@ -36,7 +36,7 @@ namespace tuscany { namespace cache { -const string url("http://localhost:8090/mcache"); +const string uri("http://localhost:8090/mcache"); bool testCache() { http::CURLSession cs; @@ -46,10 +46,12 @@ bool testCache() { + (list<value>() + "price" + string("$2.99")); const list<value> a = mklist<value>(string("item"), string("cart-53d67a61-aa5e-4e5e-8401-39edeba8b83b"), i); - const failable<value> id = http::post(a, url, cs); + const failable<value> id = http::post(a, uri, cs); assert(hasContent(id)); + + const string p = path(content(id)); { - const failable<value> val = http::get(url + "/" + content(id), cs); + const failable<value> val = http::get(uri + p, cs); assert(hasContent(val)); assert(content(val) == a); } @@ -60,22 +62,22 @@ bool testCache() { const list<value> b = mklist<value>(string("item"), string("cart-53d67a61-aa5e-4e5e-8401-39edeba8b83b"), j); { - const failable<value> r = http::put(b, url + "/" + content(id), cs); + const failable<value> r = http::put(b, uri + p, cs); assert(hasContent(r)); assert(content(r) == value(true)); } { - const failable<value> val = http::get(url + "/" + content(id), cs); + const failable<value> val = http::get(uri + p, cs); assert(hasContent(val)); assert(content(val) == b); } { - const failable<value> r = http::del(url + "/" + content(id), cs); + const failable<value> r = http::del(uri + p, cs); assert(hasContent(r)); assert(content(r) == value(true)); } { - const failable<value> val = http::get(url + "/" + content(id), cs); + const failable<value> val = http::get(uri + p, cs); assert(!hasContent(val)); } @@ -83,13 +85,13 @@ bool testCache() { } struct getLoop { - const value id; + const string path; const value entry; http::CURLSession cs; - getLoop(const value& id, const value& entry, http::CURLSession cs) : id(id), entry(entry), cs(cs) { + getLoop(const string& path, const value& entry, http::CURLSession cs) : path(path), entry(entry), cs(cs) { } const bool operator()() const { - const failable<value> val = http::get(url + "/" + id, cs); + const failable<value> val = http::get(uri + path, cs); assert(hasContent(val)); assert(content(val) == entry); return true; @@ -103,10 +105,11 @@ bool testGetPerf() { const value a = mklist<value>(string("item"), string("cart-53d67a61-aa5e-4e5e-8401-39edeba8b83b"), i); http::CURLSession cs; - const failable<value> id = http::post(a, url, cs); + const failable<value> id = http::post(a, uri, cs); assert(hasContent(id)); + const string p = path(content(id)); - const lambda<bool()> gl = getLoop(content(id), a, cs); + const lambda<bool()> gl = getLoop(p, a, cs); cout << "Cache get test " << time(gl, 5, 200) << " ms" << endl; return true; diff --git a/sca-cpp/trunk/components/cache/mcache-test.cpp b/sca-cpp/trunk/components/cache/mcache-test.cpp index 90f17ffd48..316372c5be 100644 --- a/sca-cpp/trunk/components/cache/mcache-test.cpp +++ b/sca-cpp/trunk/components/cache/mcache-test.cpp @@ -33,33 +33,36 @@ namespace tuscany { namespace cache { bool testMemCached() { - MemCached ch; + MemCached ch("127.0.0.1", 11211); + const value k = mklist<value>("a"); - assert(hasContent(post("a", string("AAA"), ch))); - assert((get("a", ch)) == value(string("AAA"))); - assert(hasContent(put("a", string("aaa"), ch))); - assert((get("a", ch)) == value(string("aaa"))); - assert(hasContent(del("a", ch))); - assert(!hasContent(get("a", ch))); + assert(hasContent(post(k, string("AAA"), ch))); + assert((get(k, ch)) == value(string("AAA"))); + assert(hasContent(put(k, string("aaa"), ch))); + assert((get(k, ch)) == value(string("aaa"))); + assert(hasContent(del(k, ch))); + assert(!hasContent(get(k, ch))); return true; } struct getLoop { + const value k; MemCached& ch; - getLoop(MemCached& ch) : ch(ch) { + getLoop(const value& k, MemCached& ch) : k(k), ch(ch) { } const bool operator()() const { - assert((get("c", ch)) == value(string("CCC"))); + assert((get(k, ch)) == value(string("CCC"))); return true; } }; bool testGetPerf() { - MemCached ch; - assert(hasContent(post("c", string("CCC"), ch))); + const value k = mklist<value>("c"); + MemCached ch("127.0.0.1", 11211); + assert(hasContent(post(k, string("CCC"), ch))); - const lambda<bool()> gl = getLoop(ch); + const lambda<bool()> gl = getLoop(k, ch); cout << "Memcached get test " << time(gl, 5, 200) << " ms" << endl; return true; } diff --git a/sca-cpp/trunk/components/cache/mcache.cpp b/sca-cpp/trunk/components/cache/mcache.cpp index b1dc750ccb..50b642106f 100644 --- a/sca-cpp/trunk/components/cache/mcache.cpp +++ b/sca-cpp/trunk/components/cache/mcache.cpp @@ -34,14 +34,12 @@ #include "mcache.hpp" namespace tuscany { -namespace cache { - -cache::MemCached ch; +namespace mcache { /** * Get an item from the cache. */ -const failable<value> get(const list<value>& params) { +const failable<value> get(const list<value>& params, cache::MemCached& ch) { return cache::get(car(params), ch); } @@ -56,9 +54,9 @@ const value uuidValue() { return value(string(buf, APR_UUID_FORMATTED_LENGTH)); } -const failable<value> post(const list<value>& params) { - const value id = uuidValue(); - const failable<bool> val = cache::post(id, car(params), ch); +const failable<value> post(const list<value>& params, cache::MemCached& ch) { + const value id = append<value>(car(params), mklist(uuidValue())); + const failable<bool> val = cache::post(id, cadr(params), ch); if (!hasContent(val)) return mkfailure<value>(reason(val)); return id; @@ -67,7 +65,7 @@ const failable<value> post(const list<value>& params) { /** * Put an item into the cache. */ -const failable<value> put(const list<value>& params) { +const failable<value> put(const list<value>& params, cache::MemCached& ch) { const failable<bool> val = cache::put(car(params), cadr(params), ch); if (!hasContent(val)) return mkfailure<value>(reason(val)); @@ -77,13 +75,49 @@ const failable<value> put(const list<value>& params) { /** * Delete an item from the cache. */ -const failable<value> del(const list<value>& params) { +const failable<value> del(const list<value>& params, cache::MemCached& ch) { const failable<bool> val = cache::del(car(params), ch); if (!hasContent(val)) return mkfailure<value>(reason(val)); return value(content(val)); } +/** + * Component implementation lambda function. + */ +class applyCache { +public: + applyCache(cache::MemCached& ch) : ch(ch) { + } + + const value operator()(const list<value>& params) const { + const value func(car(params)); + if (func == "get") + return get(cdr(params), ch); + if (func == "post") + return post(cdr(params), ch); + if (func == "put") + return put(cdr(params), ch); + if (func == "delete") + return del(cdr(params), ch); + return tuscany::mkfailure<tuscany::value>(); + } + +private: + cache::MemCached& ch; +}; + +/** + * Start the component. + */ +const failable<value> start(unused const list<value>& params) { + // Connect to memcached + cache::MemCached& ch = *(new (gc_new<cache::MemCached>()) cache::MemCached("127.0.0.1", 11211)); + + // Return the component implementation lambda function + return value(lambda<value(const list<value>&)>(applyCache(ch))); +} + } } @@ -91,14 +125,8 @@ extern "C" { const tuscany::value apply(const tuscany::list<tuscany::value>& params) { const tuscany::value func(car(params)); - if (func == "get") - return tuscany::cache::get(cdr(params)); - if (func == "post") - return tuscany::cache::post(cdr(params)); - if (func == "put") - return tuscany::cache::put(cdr(params)); - if (func == "delete") - return tuscany::cache::del(cdr(params)); + if (func == "start" || func == "restart") + return tuscany::mcache::start(cdr(params)); return tuscany::mkfailure<tuscany::value>(); } diff --git a/sca-cpp/trunk/components/cache/mcache.hpp b/sca-cpp/trunk/components/cache/mcache.hpp index 9659c11788..4751975099 100644 --- a/sca-cpp/trunk/components/cache/mcache.hpp +++ b/sca-cpp/trunk/components/cache/mcache.hpp @@ -48,23 +48,28 @@ namespace cache { */ class MemCached { public: - MemCached() { - apr_pool_create(&pool, NULL); - apr_memcache_create(pool, 1, 0, &mc); - init("localhost", 11211); + MemCached() : owner(false) { } - MemCached(const string host, const int port) { + MemCached(const string host, const int port) : owner(true) { apr_pool_create(&pool, NULL); apr_memcache_create(pool, 1, 0, &mc); init(host, port); } + MemCached(const MemCached& c) : owner(false) { + pool = c.pool; + mc = c.mc; + } + ~MemCached() { + if (!owner) + return; apr_pool_destroy(pool); } private: + bool owner; apr_pool_t* pool; apr_memcache_t* mc; @@ -86,7 +91,6 @@ private: return mkfailure<bool>("Could not add server"); return true; } - }; /** diff --git a/sca-cpp/trunk/components/cache/memcached-start b/sca-cpp/trunk/components/cache/memcached-start new file mode 100755 index 0000000000..cd27faf046 --- /dev/null +++ b/sca-cpp/trunk/components/cache/memcached-start @@ -0,0 +1,21 @@ +#!/bin/sh + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# Start memcached +memcached -l 127.0.0.1 -m 4 -p 11211 & diff --git a/sca-cpp/trunk/components/cache/memcached-stop b/sca-cpp/trunk/components/cache/memcached-stop new file mode 100755 index 0000000000..b999228b46 --- /dev/null +++ b/sca-cpp/trunk/components/cache/memcached-stop @@ -0,0 +1,23 @@ +#!/bin/sh + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# Stop memcached +mc="memcached -l 127.0.0.1 -m 4 -p 11211" + +kill `ps -f | grep -v grep | grep "${mc}" | awk '{ print $2 }'` diff --git a/sca-cpp/trunk/components/cache/memcached-test b/sca-cpp/trunk/components/cache/memcached-test index df21d32a57..d4b9c04eda 100755 --- a/sca-cpp/trunk/components/cache/memcached-test +++ b/sca-cpp/trunk/components/cache/memcached-test @@ -18,14 +18,13 @@ # under the License. # Setup -mc="memcached -l 127.0.0.1 -m 4 -p 11211" -$mc & +./memcached-start sleep 1 # Test -./mcache-test +./mcache-test 2>/dev/null rc=$? # Cleanup -kill `ps -f | grep -v grep | grep "$mc" | awk '{ print $2 }'` +./memcached-stop return $rc diff --git a/sca-cpp/trunk/components/cache/server-test b/sca-cpp/trunk/components/cache/server-test index 821724295d..4942f547bc 100755 --- a/sca-cpp/trunk/components/cache/server-test +++ b/sca-cpp/trunk/components/cache/server-test @@ -26,18 +26,16 @@ SCAContribution `pwd`/ SCAComposite mcache.composite EOF +./memcached-start ../../modules/http/httpd-start tmp - -mc="memcached -l 127.0.0.1 -m 4 -p 11211" -$mc & sleep 2 # Test -./client-test +./client-test 2>/dev/null rc=$? # Cleanup -kill `ps -f | grep -v grep | grep "$mc" | awk '{ print $2 }'` ../../modules/http/httpd-stop tmp +./memcached-stop sleep 2 return $rc diff --git a/sca-cpp/trunk/components/chat/Makefile.am b/sca-cpp/trunk/components/chat/Makefile.am index ea63a475e2..11e3179e8a 100644 --- a/sca-cpp/trunk/components/chat/Makefile.am +++ b/sca-cpp/trunk/components/chat/Makefile.am @@ -17,15 +17,20 @@ if WANT_CHAT -INCLUDES = -I${LIBSTROPHE_INCLUDE} +noinst_PROGRAMS = xmpp-test client-test + +INCLUDES = -I${LIBSTROPHE_INCLUDE} -I${LIBSTROPHE_INCLUDE}/src compdir=$(prefix)/components/chat -comp_LTLIBRARIES = libchat-sender.la libchat-listener.la +comp_LTLIBRARIES = libchatter.la + +libchatter_la_SOURCES = chatter.cpp +libchatter_la_LDFLAGS = -L${LIBSTROPHE_LIB} -R${LIBSTROPHE_LIB} -lstrophe -lexpat -lssl -lresolv -libchat_sender_la_SOURCES = chat-sender.cpp -libchat_sender_la_LDFLAGS = -L${LIBSTROPHE_LIB} -R${LIBSTROPHE_LIB} -lstrophe -lexpat -lssl -lresolv +xmpp_test_SOURCES = xmpp-test.cpp +xmpp_test_LDFLAGS = -L${LIBSTROPHE_LIB} -R${LIBSTROPHE_LIB} -lstrophe -lexpat -lssl -lresolv -libchat_listener_la_SOURCES = chat-listener.cpp -libchat_listener_la_LDFLAGS = -L${LIBSTROPHE_LIB} -R${LIBSTROPHE_LIB} -lstrophe -lexpat -lssl -lresolv +client_test_SOURCES = client-test.cpp +client_test_LDFLAGS = -lxml2 -lcurl -lmozjs -L${LIBSTROPHE_LIB} -R${LIBSTROPHE_LIB} -lstrophe -lexpat -lssl -lresolv endif diff --git a/sca-cpp/trunk/components/chat/chat-listener.cpp b/sca-cpp/trunk/components/chat/chat-listener.cpp deleted file mode 100644 index 884965839a..0000000000 --- a/sca-cpp/trunk/components/chat/chat-listener.cpp +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -/* $Rev$ $Date$ */ - -/** - * XMPP chat listener component implementation. - */ - -#include "string.hpp" -#include "function.hpp" -#include "list.hpp" -#include "value.hpp" -#include "monad.hpp" -#include "xmpp.hpp" - -namespace tuscany { -namespace chat { - -/** - * Initialize the component. - */ -const failable<value> start(unused const list<value>& params) { - //TODO establish a session with an XMPP server for a JID and mark the current server instance busy - - return value(true); -} - -} -} - -extern "C" { - -const tuscany::value apply(const tuscany::list<tuscany::value>& params) { - const tuscany::value func(car(params)); - if (func == "start") - return tuscany::chat::start(cdr(params)); - return tuscany::mkfailure<tuscany::value>(); -} - -} diff --git a/sca-cpp/trunk/components/chat/chat-sender.cpp b/sca-cpp/trunk/components/chat/chat-sender.cpp deleted file mode 100644 index bd67bf7315..0000000000 --- a/sca-cpp/trunk/components/chat/chat-sender.cpp +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -/* $Rev$ $Date$ */ - -/** - * XMPP chat sender component implementation. - */ - -#include "string.hpp" -#include "function.hpp" -#include "list.hpp" -#include "value.hpp" -#include "monad.hpp" -#include "xmpp.hpp" - -namespace tuscany { -namespace chat { - -/** - * Post an item to an XMPP JID. - */ -const failable<value> post(unused const list<value>& params) { - - //TODO post the item - - return value(true); -} - -} -} - -extern "C" { - -const tuscany::value apply(const tuscany::list<tuscany::value>& params) { - const tuscany::value func(car(params)); - if (func == "post") - return tuscany::chat::post(cdr(params)); - return tuscany::mkfailure<tuscany::value>(); -} - -} diff --git a/sca-cpp/trunk/components/chat/chat.composite b/sca-cpp/trunk/components/chat/chat.composite index 2c19e2d0de..f02eed1418 100644 --- a/sca-cpp/trunk/components/chat/chat.composite +++ b/sca-cpp/trunk/components/chat/chat.composite @@ -22,17 +22,22 @@ targetNamespace="http://tuscany.apache.org/xmlns/sca/components" name="chat"> - <component name="chat-sender"> - <implementation.cpp path=".libs" library="libchat-sender"/> - <property name="jid">sample@apache.org</property> - <service name="chat-sender"> + <component name="print-sender"> + <implementation.cpp path=".libs" library="libchatter"/> + <property name="jid">sca1@localhost</property> + <property name="password">sca1</property> + <service name="print-sender"> <t:binding.http uri="print-sender"/> </service> </component> - <component name="chat-listener"> - <implementation.cpp path=".libs" library="libchat-listener"/> - <property name="jid">sample@apache.org</property> + <component name="print-chatter"> + <implementation.cpp path=".libs" library="libchatter"/> + <property name="jid">sca2@localhost</property> + <property name="password">sca2</property> + <service name="print-chatter"> + <t:binding.http uri="print-chatter"/> + </service> <reference name="relay" target="print"/> </component> @@ -41,6 +46,7 @@ <service name="print"> <t:binding.http uri="print"/> </service> + <reference name="report" target="print-sender"/> </component> </composite> diff --git a/sca-cpp/trunk/components/chat/chatter.cpp b/sca-cpp/trunk/components/chat/chatter.cpp new file mode 100644 index 0000000000..5a6d4909df --- /dev/null +++ b/sca-cpp/trunk/components/chat/chatter.cpp @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/* $Rev$ $Date$ */ + +/** + * XMPP chatter component implementation. + */ + +#include "string.hpp" +#include "function.hpp" +#include "list.hpp" +#include "value.hpp" +#include "monad.hpp" +#include "parallel.hpp" +#include "xmpp.hpp" + +namespace tuscany { +namespace chat { + +/** + * Post an item to an XMPP JID. + */ +const failable<value> post(const list<value>& params, XMPPClient& xc) { + const value to = car<value>(car(params)); + const value val = cadr(params); + debug(to, "chat::post::jid"); + debug(val, "chat::post::value"); + const failable<bool> r = post(to, val, xc); + if (!hasContent(r)) + return mkfailure<value>(reason(r)); + return value(mklist<value>(to)); +} + +/** + * A relay function that posts the XMPP messages it receives to a relay component reference. + */ +class relay { +public: + relay(const lambda<value(const list<value>&)>& rel) : rel(rel) { + } + + const failable<bool> operator()(const value& jid, const value& val, unused XMPPClient& xc) const { + if (isNil(rel)) + return true; + debug(jid, "chat::relay::jid"); + debug(val, "chat::relay::value"); + const value res = rel(mklist<value>("post", mklist<value>(jid), val)); + return true; + } + +private: + const lambda<value(const list<value>&)> rel; +}; + +/** + * Subscribe and listen to an XMPP session. + */ +class subscribe { +public: + subscribe(const lambda<failable<bool>(const value&, const value&, XMPPClient&)>& l, XMPPClient& xc) : l(l), xc(xc) { + } + + const failable<bool> operator()() const { + gc_pool pool; + debug("chat::subscribe::listen"); + const failable<bool> r = listen(l, const_cast<XMPPClient&>(xc)); + debug("chat::subscribe::stopped"); + return r; + } + +private: + const lambda<failable<bool>(const value&, const value&, XMPPClient&)> l; + XMPPClient xc; +}; + +/** + * Chatter component lambda function + */ +class chatter { +public: + chatter(XMPPClient& xc, worker& w) : xc(xc), w(w) { + } + + const value operator()(const list<value>& params) const { + const tuscany::value func(car(params)); + if (func == "post") + return post(cdr(params), const_cast<XMPPClient&>(xc)); + + // Stop the chatter component + if (func != "stop") + return tuscany::mkfailure<tuscany::value>(); + debug("chat::chatter::stop"); + + // Disconnect and shutdown the worker thread + disconnect(const_cast<XMPPClient&>(xc)); + shutdown(const_cast<worker&>(w)); + debug("chat::chatter::stopped"); + + return failable<value>(value(lambda<value(const list<value>&)>())); + } + +private: + const XMPPClient xc; + worker w; +}; + +/** + * Start the component. + */ +const failable<value> start(const list<value>& params) { + // Extract the relay reference and the XMPP JID and password + const bool hasRelay = !isNil(cddr(params)); + const value rel = hasRelay? car(params) : value(lambda<value(const list<value>&)>()); + const list<value> props = hasRelay? cdr(params) : params; + const value jid = ((lambda<value(list<value>)>)car(props))(list<value>()); + const value pass = ((lambda<value(list<value>)>)cadr(props))(list<value>()); + + // Create an XMPP client session + XMPPClient xc(jid, pass, false); + const failable<bool> r = connect(xc); + if (!hasContent(r)) + return mkfailure<value>(reason(r)); + + // Listen and relay messages in a worker thread + worker w(3); + const lambda<failable<bool>(const value&, const value&, XMPPClient&)> rl = relay(rel); + submit<failable<bool> >(w, lambda<failable<bool>()>(subscribe(rl, xc))); + + // Return the chatter component lambda function + return value(lambda<value(const list<value>&)>(chatter(xc, w))); +} + +} +} + +extern "C" { + +const tuscany::value apply(const tuscany::list<tuscany::value>& params) { + const tuscany::value func(car(params)); + if (func == "start" || func == "restart") + return tuscany::chat::start(cdr(params)); + return tuscany::mkfailure<tuscany::value>(); +} + +} diff --git a/sca-cpp/trunk/components/chat/client-test.cpp b/sca-cpp/trunk/components/chat/client-test.cpp new file mode 100644 index 0000000000..f9ca2cabd6 --- /dev/null +++ b/sca-cpp/trunk/components/chat/client-test.cpp @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/* $Rev$ $Date$ */ + +/** + * Test chat component. + */ + +#include <assert.h> +#include "stream.hpp" +#include "string.hpp" +#include "list.hpp" +#include "element.hpp" +#include "value.hpp" +#include "monad.hpp" +#include "perf.hpp" +#include "parallel.hpp" +#include "../../modules/http/curl.hpp" +#include "xmpp.hpp" + +namespace tuscany { +namespace chat { + +const value jid1("sca1@localhost"); +const value pass1("sca1"); +const value jid2("sca2@localhost"); +const value pass2("sca2"); +const value jid3("sca3@localhost"); +const value pass3("sca3"); + +const list<value> item = list<value>() + + (list<value>() + "name" + string("Apple")) + + (list<value>() + "price" + string("$2.99")); +const list<value> entry = mklist<value>(string("item"), string("cart-53d67a61-aa5e-4e5e-8401-39edeba8b83b"), item); + +worker w(2); +bool received; + +const failable<bool> listener(const value& from, const value& val, unused XMPPClient& xc) { + assert(contains(from, "sca2@localhost")); + assert(val == entry); + received = true; + return false; +} + +struct subscribe { + XMPPClient& xc; + subscribe(XMPPClient& xc) : xc(xc) { + } + const failable<bool> operator()() const { + const lambda<failable<bool>(const value&, const value&, XMPPClient&)> l(listener); + listen(l, xc); + return true; + } +}; + +bool testListen() { + received = false; + XMPPClient& xc = *(new (gc_new<XMPPClient>()) XMPPClient(jid3, pass3)); + const failable<bool> c = connect(xc); + assert(hasContent(c)); + const lambda<failable<bool>()> subs = subscribe(xc); + submit(w, subs); + return true; +} + +bool testPost() { + gc_scoped_pool pool; + http::CURLSession ch; + const failable<value> id = http::post(entry, "http://localhost:8090/print-sender/sca2@localhost", ch); + assert(hasContent(id)); + return true; +} + +bool testReceived() { + shutdown(w); + assert(received == true); + return true; +} + +} +} + +int main() { + tuscany::cout << "Testing..." << tuscany::endl; + + tuscany::chat::testListen(); + tuscany::chat::testPost(); + tuscany::chat::testReceived(); + + tuscany::cout << "OK" << tuscany::endl; + + return 0; +} diff --git a/sca-cpp/trunk/components/chat/server-test b/sca-cpp/trunk/components/chat/server-test new file mode 100755 index 0000000000..919a798fa5 --- /dev/null +++ b/sca-cpp/trunk/components/chat/server-test @@ -0,0 +1,39 @@ +#!/bin/sh + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# Setup +../../modules/http/httpd-conf tmp 8090 ../../modules/http/htdocs +../../modules/server/server-conf tmp +../../modules/server/scheme-conf tmp +cat >>tmp/conf/httpd.conf <<EOF +SCAContribution `pwd`/ +SCAComposite chat.composite +EOF + +../../modules/http/httpd-start tmp +sleep 2 + +# Test +./client-test 2>/dev/null +rc=$? + +# Cleanup +../../modules/http/httpd-stop tmp +sleep 1 +return $rc diff --git a/sca-cpp/trunk/components/chat/server-test.scm b/sca-cpp/trunk/components/chat/server-test.scm index 17a42ed795..a6023708e1 100644 --- a/sca-cpp/trunk/components/chat/server-test.scm +++ b/sca-cpp/trunk/components/chat/server-test.scm @@ -17,4 +17,4 @@ ; Chat test case -(define (print x) (display x)) +(define (post key val report) (report "post" '("sca3@localhost") val)) diff --git a/sca-cpp/trunk/components/chat/xmpp-test.cpp b/sca-cpp/trunk/components/chat/xmpp-test.cpp new file mode 100644 index 0000000000..6b7fa3439f --- /dev/null +++ b/sca-cpp/trunk/components/chat/xmpp-test.cpp @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/* $Rev$ $Date$ */ + +/** + * Test XMPP support functions. + */ + +#include <assert.h> +#include "stream.hpp" +#include "string.hpp" +#include "list.hpp" +#include "element.hpp" +#include "monad.hpp" +#include "value.hpp" +#include "perf.hpp" +#include "parallel.hpp" +#include "xmpp.hpp" + +namespace tuscany { +namespace chat { + +const value jid1("sca1@localhost"); +const value pass1("sca1"); +const value jid2("sca2@localhost"); +const value pass2("sca2"); + +worker w(2); +bool received; + +const failable<bool> listener(const value& from, const value& val, unused XMPPClient& xc) { + assert(contains(from, "sca1@localhost")); + assert(val == "hey"); + received = true; + return false; +} + +struct subscribe { + XMPPClient& xc; + subscribe(XMPPClient& xc) : xc(xc) { + } + const failable<bool> operator()() const { + const lambda<failable<bool>(const value&, const value&, XMPPClient&)> l(listener); + listen(l, xc); + return true; + } +}; + +bool testListen() { + received = false; + XMPPClient& xc = *(new (gc_new<XMPPClient>()) XMPPClient(jid2, pass2)); + const failable<bool> c = connect(xc); + assert(hasContent(c)); + const lambda<failable<bool>()> subs = subscribe(xc); + submit(w, subs); + return true; +} + +bool testPost() { + XMPPClient xc(jid1, pass1); + const failable<bool> c = connect(xc); + assert(hasContent(c)); + const failable<bool> p = post(jid2, "hey", xc); + assert(hasContent(p)); + return true; +} + +bool testReceived() { + shutdown(w); + assert(received == true); + return true; +} + +} +} + +int main() { + tuscany::cout << "Testing..." << tuscany::endl; + + tuscany::chat::testListen(); + tuscany::chat::testPost(); + tuscany::chat::testReceived(); + + tuscany::cout << "OK" << tuscany::endl; + return 0; +} diff --git a/sca-cpp/trunk/components/chat/xmpp.hpp b/sca-cpp/trunk/components/chat/xmpp.hpp index b9508f0a53..0f84d5ec4a 100644 --- a/sca-cpp/trunk/components/chat/xmpp.hpp +++ b/sca-cpp/trunk/components/chat/xmpp.hpp @@ -26,11 +26,303 @@ * XMPP support functions. */ +#include <apr_uuid.h> + #include "strophe.h" +extern "C" { +#include "common.h" +} +#include "string.hpp" +#include "list.hpp" +#include "value.hpp" +#include "monad.hpp" +#include "../../modules/scheme/eval.hpp" namespace tuscany { namespace chat { +/** + * XMPP runtime, one per process. + */ +class XMPPRuntime { +public: + XMPPRuntime() { + xmpp_initialize(); + log = xmpp_get_default_logger(XMPP_LEVEL_DEBUG); + } + + ~XMPPRuntime() { + xmpp_shutdown(); + } + +private: + friend class XMPPClient; + xmpp_log_t* log; + +} xmppRuntime; + +/** + * Represents an XMPP client. + */ +const string resourceUUID() { + apr_uuid_t uuid; + apr_uuid_get(&uuid); + char buf[APR_UUID_FORMATTED_LENGTH]; + apr_uuid_format(buf, &uuid); + return string(buf, APR_UUID_FORMATTED_LENGTH); +} + +class XMPPClient { +public: + XMPPClient(const string& jid, const string& pass, bool owner = true) : owner(owner), ctx(xmpp_ctx_new(NULL, xmppRuntime.log)), conn(xmpp_conn_new(ctx)), connecting(false), connected(false), disconnecting(false) { + xmpp_conn_set_jid(conn, c_str(jid + "/" + resourceUUID())); + xmpp_conn_set_pass(conn, c_str(pass)); + } + + XMPPClient(const XMPPClient& xc) : owner(false), ctx(xc.ctx), conn(xc.conn), listener(xc.listener), connecting(xc.connecting), connected(xc.connected), disconnecting(xc.disconnecting) { + } + + ~XMPPClient() { + extern const failable<bool> disconnect(XMPPClient& xc); + if (!owner) + return; + if (!disconnecting) + disconnect(*this); + xmpp_conn_release(conn); + xmpp_ctx_free(ctx); + } + +private: + friend int versionHandler(xmpp_conn_t* const conn, xmpp_stanza_t* const stanza, void* const udata); + friend void connHandler(xmpp_conn_t * const conn, const xmpp_conn_event_t status, const int err, xmpp_stream_error_t* const errstream, void *const udata); + friend int messageHandler(xmpp_conn_t* const conn, xmpp_stanza_t* const stanza, void* const udata); + friend const failable<bool> connect(XMPPClient& xc); + friend const failable<int> send(const char* data, const int len, XMPPClient& xc); + friend const failable<int> send(xmpp_stanza_t* const stanza, XMPPClient& xc); + friend const failable<bool> post(const value& to, const value& val, XMPPClient& xc); + friend const failable<bool> disconnect(XMPPClient& xc); + friend const failable<bool> listen(const lambda<failable<bool>(const value&, const value&, XMPPClient&)>& listener, XMPPClient& xc); + + const bool owner; + xmpp_ctx_t* ctx; + xmpp_conn_t* conn; + lambda<failable<bool>(const value&, const value&, XMPPClient&)> listener; + bool connecting; + bool connected; + bool disconnecting; +}; + +/** + * Make a text stanza. + */ +xmpp_stanza_t* textStanza(const char* text, xmpp_ctx_t* ctx) { + xmpp_stanza_t* stanza = xmpp_stanza_new(ctx); + xmpp_stanza_set_text(stanza, text); + return stanza; +} + +/** + * Make a named stanza. + */ +xmpp_stanza_t* namedStanza(const char* ns, const char* name, xmpp_ctx_t* ctx) { + xmpp_stanza_t* stanza = xmpp_stanza_new(ctx); + xmpp_stanza_set_name(stanza, name); + if (ns != NULL) + xmpp_stanza_set_ns(stanza, ns); + return stanza; +} + +/** + * Make a named stanza using a qualified name. + */ +xmpp_stanza_t* namedStanza(const char* name, xmpp_ctx_t* ctx) { + xmpp_stanza_t* stanza = xmpp_stanza_new(ctx); + xmpp_stanza_set_name(stanza, name); + return stanza; +} + +/** + * XMPP version handler. + */ +int versionHandler(xmpp_conn_t* const conn, xmpp_stanza_t* const stanza, void* const udata) { + XMPPClient& xc = *(XMPPClient*)udata; + + // Build version reply stanza + xmpp_stanza_t* reply = namedStanza("iq", xc.ctx); + xmpp_stanza_set_type(reply, "result"); + xmpp_stanza_set_id(reply, xmpp_stanza_get_id(stanza)); + xmpp_stanza_set_attribute(reply, "to", xmpp_stanza_get_attribute(stanza, "from")); + xmpp_stanza_t* query = namedStanza(xmpp_stanza_get_ns(xmpp_stanza_get_children(stanza)), "query", xc.ctx); + xmpp_stanza_add_child(reply, query); + xmpp_stanza_t* name = namedStanza("name", xc.ctx); + xmpp_stanza_add_child(query, name); + xmpp_stanza_add_child(name, textStanza("Apache Tuscany", xc.ctx)); + xmpp_stanza_t* version = namedStanza("version", xc.ctx); + xmpp_stanza_add_child(query, version); + xmpp_stanza_add_child(version, textStanza("1.0", xc.ctx)); + + // Send it + xmpp_send(conn, reply); + xmpp_stanza_release(reply); + return 1; +} + +/** + * XMPP message handler + */ +int messageHandler(unused xmpp_conn_t* const conn, xmpp_stanza_t* const stanza, void* const udata) { + // Ignore noise + if(xmpp_stanza_get_child_by_name(stanza, "body") == NULL) + return 1; + if(!strcmp(xmpp_stanza_get_attribute(stanza, "type"), "error")) + return 1; + + // Call the client listener function + XMPPClient& xc = *(XMPPClient*)udata; + const char* from = xmpp_stanza_get_attribute(stanza, "from"); + const char* text = xmpp_stanza_get_text(xmpp_stanza_get_child_by_name(stanza, "body")); + if (isNil(xc.listener)) + return 1; + const value val(scheme::readValue(text)); + debug(from, "chat::messageHandler::from"); + debug(val, "chat::messageHandler::body"); + const failable<bool> r = xc.listener(value(string(from)), val, xc); + if (!hasContent(r) || !content(r)) { + // Stop listening + xc.listener = lambda<failable<bool>(const value&, const value&, XMPPClient&)>(); + return 0; + } + return 1; +} + +/** + * XMPP connection handler. + */ +void connHandler(xmpp_conn_t * const conn, const xmpp_conn_event_t status, unused const int err, unused xmpp_stream_error_t* const errstream, void *const udata) { + XMPPClient& xc = *(XMPPClient*)udata; + xc.connecting = false; + + if (status == XMPP_CONN_CONNECT) { + debug("chat::connHandler::connected"); + xmpp_handler_add(conn, versionHandler, "jabber:iq:version", "iq", NULL, &xc); + + // Send a <presence/> stanza so that we appear online to contacts + xmpp_stanza_t* pres = xmpp_stanza_new(xc.ctx); + xmpp_stanza_set_name(pres, "presence"); + xmpp_send(conn, pres); + xmpp_stanza_release(pres); + xc.connected = true; + return; + } + + debug("chat::connHandler::disconnected"); + xc.connected = false; + if (xc.ctx->loop_status == XMPP_LOOP_RUNNING) + xc.ctx->loop_status = XMPP_LOOP_QUIT; +} + +/** + * Connect to an XMPP server. + */ +const failable<bool> connect(XMPPClient& xc) { + xc.connecting = true; + xmpp_connect_client(xc.conn, NULL, 0, connHandler, &xc); + while(xc.connecting) + xmpp_run_once(xc.ctx, 20L); + if (!xc.connected) + return mkfailure<bool>("Couldn't connect to XMPP server"); + return true; +} + +/** + * Send a buffer on an XMPP session. + */ +const failable<int> send(const char* data, const int len, XMPPClient& xc) { + if (len == 0) + return 0; + const int written = xc.conn->tls? tls_write(xc.conn->tls, data, len) : sock_write(xc.conn->sock, data, len); + if (written < 0) { + xc.conn->error = xc.conn->tls? tls_error(xc.conn->tls) : sock_error(); + return mkfailure<int>("Couldn't send stanza to XMPP server"); + } + return send(data + written, len - written, xc); +} + +/** + * Send a string on an XMPP session. + */ +const failable<int> send(const string& data, XMPPClient& xc) { + return send(c_str(data), length(data), xc); +} + +/** + * Send a stanza on an XMPP session. + */ +const failable<int> send(xmpp_stanza_t* const stanza, XMPPClient& xc) { + char *buf; + size_t len; + const int rc = xmpp_stanza_to_text(stanza, &buf, &len); + if (rc != 0) + return mkfailure<int>("Couldn't convert stanza to text"); + const failable<int> r = send(buf, len, xc); + if (!hasContent(r)) { + xmpp_free(xc.conn->ctx, buf); + return r; + } + xmpp_debug(xc.conn->ctx, "conn", "SENT: %s", buf); + xmpp_free(xc.conn->ctx, buf); + return content(r); +} + +/** + * Post a message to an XMPP jid. + */ +const failable<bool> post(const value& to, const value& val, XMPPClient& xc) { + debug(to, "chat::post::to"); + debug(val, "chat::post::body"); + + // Convert the value to a string + const string vs(scheme::writeValue(val)); + + // Build message stanza + xmpp_stanza_t* stanza = namedStanza("message", xc.ctx); + xmpp_stanza_set_type(stanza, "chat"); + xmpp_stanza_set_attribute(stanza, "to", c_str(string(to))); + xmpp_stanza_t* body = namedStanza("body", xc.ctx); + xmpp_stanza_add_child(stanza, body); + xmpp_stanza_add_child(body, textStanza(c_str(vs), xc.ctx)); + + // Send it + const failable<int> r = send(stanza, xc); + xmpp_stanza_release(stanza); + if (!hasContent(r)) + return mkfailure<bool>(reason(r)); + return true; +} + +/** + * Disconnect an XMPP session. + */ +const failable<bool> disconnect(XMPPClient& xc) { + xc.disconnecting = true; + const failable<int> r = send("</stream:stream>", xc); + if (!hasContent(r)) + return mkfailure<bool>(reason(r)); + return true; +} + +/** + * Listen to messages received by an XMPP client. + */ +const failable<bool> listen(const lambda<failable<bool>(const value&, const value&, XMPPClient&)>& listener, XMPPClient& xc) { + debug("chat::listen"); + xc.listener = listener; + xmpp_handler_add(xc.conn, messageHandler, NULL, "message", NULL, &xc); + while(xc.connected && !isNil(xc.listener)) + xmpp_run_once(xc.ctx, 1000L); + return true; +} + } } diff --git a/sca-cpp/trunk/components/queue/Makefile.am b/sca-cpp/trunk/components/queue/Makefile.am index 6bbdc119b4..09ff0e54a4 100644 --- a/sca-cpp/trunk/components/queue/Makefile.am +++ b/sca-cpp/trunk/components/queue/Makefile.am @@ -35,11 +35,11 @@ qpid_test_SOURCES = qpid-test.cpp qpid_test_LDFLAGS = -L${QPIDC_LIB} -R${QPIDC_LIB} -lqpidclient client_test_SOURCES = client-test.cpp -client_test_LDFLAGS = -lxml2 -lcurl -lmozjs +client_test_LDFLAGS = -lxml2 -lcurl -lmozjs -L${QPIDC_LIB} -R${QPIDC_LIB} -lqpidclient qpidc.prefix: $(top_builddir)/config.status echo ${QPIDC_PREFIX} >qpidc.prefix -#TESTS = qpid-test server-test +TESTS = send-test server-test endif diff --git a/sca-cpp/trunk/components/queue/client-test.cpp b/sca-cpp/trunk/components/queue/client-test.cpp index 121a739e0d..a448d1fccd 100644 --- a/sca-cpp/trunk/components/queue/client-test.cpp +++ b/sca-cpp/trunk/components/queue/client-test.cpp @@ -32,28 +32,54 @@ #include "monad.hpp" #include "perf.hpp" #include "../../modules/http/curl.hpp" +#include "qpid.hpp" + +// Ignore conversion issues and redundant declarations in Qpid headers +#ifdef WANT_MAINTAINER_MODE +#pragma GCC diagnostic ignored "-Wconversion" +#pragma GCC diagnostic ignored "-Wredundant-decls" +#endif namespace tuscany { namespace queue { +const value key(mklist<value>(string("report"))); +const string qname("reportq"); + +const list<value> item = list<value>() + + (list<value>() + "name" + string("Apple")) + + (list<value>() + "price" + string("$2.99")); +const list<value> entry = mklist<value>(string("item"), string("cart-53d67a61-aa5e-4e5e-8401-39edeba8b83b"), item); + +bool testDeclareQueue() { + QpidConnection qc; + QpidSession qs(qc); + const failable<bool> r = declareQueue(key, qname, qs); + assert(hasContent(r)); + return true; +} + +const bool listener(const value& k, const value& v) { + cerr << "k " << k << " v " << v << endl; + assert(k == key); + assert(v == entry); + return false; +} + +bool testListen() { + QpidConnection qc; + QpidSession qs(qc); + QpidSubscription qsub(qs); + const lambda<bool(const value&, const value&)> l(listener); + listen(qname, l, qsub); + return true; +} bool testPost() { - http::CURLSession cs; - - const value func = "http://ws.apache.org/axis2/c/samples/echoString"; - const list<value> arg = mklist<value>( - list<value>() + "ns1:echoString" - + (list<value>() + "@xmlns:ns1" + string("http://ws.apache.org/axis2/services/echo")) - + (list<value>() + "text" + string("Hello World!"))); - - const failable<value> rval = http::evalExpr(mklist<value>(func, arg), "http://localhost:8090/echo-client", cs); - assert(hasContent(rval)); - - const list<value> r = mklist<value>( - list<value>() + "ns1:echoString" - + (list<value>() + "@xmlns:ns1" + string("http://ws.apache.org/axis2/c/samples")) - + (list<value>() + "text" + string("Hello World!"))); - assert(content(rval) == r); + gc_scoped_pool pool; + http::CURLSession ch; + const failable<value> id = http::post(entry, "http://localhost:8090/print-sender", ch); + assert(hasContent(id)); return true; } @@ -63,7 +89,9 @@ bool testPost() { int main() { tuscany::cout << "Testing..." << tuscany::endl; + tuscany::queue::testDeclareQueue(); tuscany::queue::testPost(); + tuscany::queue::testListen(); tuscany::cout << "OK" << tuscany::endl; diff --git a/sca-cpp/trunk/components/queue/qpid-test.cpp b/sca-cpp/trunk/components/queue/qpid-test.cpp index 97d8d2f5d5..1a650157b2 100644 --- a/sca-cpp/trunk/components/queue/qpid-test.cpp +++ b/sca-cpp/trunk/components/queue/qpid-test.cpp @@ -33,20 +33,51 @@ #include "perf.hpp" #include "qpid.hpp" +// Ignore conversion issues and redundant declarations in Qpid headers +#ifdef WANT_MAINTAINER_MODE +#pragma GCC diagnostic ignored "-Wconversion" +#pragma GCC diagnostic ignored "-Wredundant-decls" +#endif + namespace tuscany { namespace queue { -bool testPost() { +const value key(mklist<value>("test")); +const string qname("testq"); + +bool testDeclareQueue() { QpidConnection qc; + QpidSession qs(qc); + const failable<bool> r = declareQueue(key, qname, qs); + assert(hasContent(r)); + return true; +} + +const list<value> item = list<value>() + + (list<value>() + "name" + string("Apple")) + + (list<value>() + "price" + string("$2.99")); +const list<value> entry = mklist<value>(string("item"), string("cart-53d67a61-aa5e-4e5e-8401-39edeba8b83b"), item); +bool testPost() { + QpidConnection qc; QpidSession qs(qc); + const failable<bool> r = post(key, entry, qs); + assert(hasContent(r)); + return true; +} - // Post the item - const list<value> params; - const value key = ((lambda<value(list<value>)>)cadr(params))(list<value>()); - post(key, car(params), qs); +const bool listener(const value& k, const value& v) { + assert(k == key); + assert(v == entry); + return false; +} - return value(true); +bool testListen() { + QpidConnection qc; + QpidSession qs(qc); + QpidSubscription qsub(qs); + const lambda<bool(const value&, const value&)> l(listener); + listen(qname, l, qsub); return true; } @@ -56,7 +87,9 @@ bool testPost() { int main() { tuscany::cout << "Testing..." << tuscany::endl; + tuscany::queue::testDeclareQueue(); tuscany::queue::testPost(); + tuscany::queue::testListen(); tuscany::cout << "OK" << tuscany::endl; diff --git a/sca-cpp/trunk/components/queue/qpid.hpp b/sca-cpp/trunk/components/queue/qpid.hpp index e6be319be0..8b466cedcc 100644 --- a/sca-cpp/trunk/components/queue/qpid.hpp +++ b/sca-cpp/trunk/components/queue/qpid.hpp @@ -34,6 +34,8 @@ #include <qpid/client/Connection.h> #include <qpid/client/Session.h> +#include <qpid/client/MessageListener.h> +#include <qpid/client/SubscriptionManager.h> #include "string.hpp" #include "list.hpp" @@ -53,6 +55,10 @@ public: c.open("localhost", 5672); } + QpidConnection(const bool owner) : owner(owner) { + c.open("localhost", 5672); + } + QpidConnection(const QpidConnection& qc) : owner(false), c(qc.c) { } @@ -63,6 +69,7 @@ public: } private: + friend const failable<bool> close(QpidConnection& qc); friend class QpidSession; const bool owner; @@ -71,6 +78,14 @@ private: }; /** + * Close a Qpid connection. + */ +const failable<bool> close(QpidConnection& qc) { + qc.c.close(); + return true; +} + +/** * Represents a Qpid session. */ class QpidSession { @@ -78,6 +93,9 @@ public: QpidSession(QpidConnection& qc) : owner(true), s(qc.c.newSession()) { } + QpidSession(QpidConnection& qc, const bool owner) : owner(owner), s(qc.c.newSession()) { + } + QpidSession(const QpidSession& qs) : owner(false), s(qs.s) { } @@ -88,38 +106,145 @@ public: } private: - friend qpid::client::Session session(const QpidSession& qs); + friend const failable<bool> close(QpidSession& qs); + friend const failable<bool> declareQueue(const value& key, const string& name, QpidSession& qs); + friend const failable<bool> post(const value& key, const value& val, QpidSession& qs); + friend class QpidSubscription; const bool owner; qpid::client::Session s; }; -qpid::client::Session session(const QpidSession& qs) { - return qs.s; +/** + * Close a Qpid session. + */ +const failable<bool> close(QpidSession& qs) { + try { + qs.s.close(); + } catch (const qpid::Exception& e) { + return mkfailure<bool>(string("Qpid failure: ") + e.what()); + } + return true; } /** * Declare a key / AMQP queue pair. */ -const failable<bool> declareQueue(const string& key, const string& name, QpidSession& qs) { - session(qs).queueDeclare(qpid::client::arg::queue=c_str(name)); - session(qs).exchangeBind(qpid::client::arg::exchange="amq.direct", qpid::client::arg::queue=c_str(name), qpid::client::arg::bindingKey=c_str(key)); +const failable<bool> declareQueue(const value& key, const string& name, QpidSession& qs) { + const string ks(scheme::writeValue(key)); + try { + qs.s.queueDeclare(qpid::client::arg::queue=c_str(name)); + qs.s.exchangeBind(qpid::client::arg::exchange="amq.direct", qpid::client::arg::queue=c_str(name), qpid::client::arg::bindingKey=c_str(ks)); + } catch (const qpid::Exception& e) { + return mkfailure<bool>(string("Qpid failure: ") + e.what()); + } return true; } /** - * Post a key / value pair in message to an AMQP broker. + * Post a key / value pair message to an AMQP broker. */ -const failable<bool> post(const string& key, const value& val, QpidSession& qs) { +const failable<bool> post(const value& key, const value& val, QpidSession& qs) { - // Convert the value to a string + // Send in a message with the given key. + const string ks(scheme::writeValue(key)); const string vs(scheme::writeValue(val)); + try { + qpid::client::Message message; + message.getDeliveryProperties().setRoutingKey(c_str(ks)); + message.setData(c_str(vs)); + qs.s.messageTransfer(qpid::client::arg::content=message, qpid::client::arg::destination="amq.direct"); + } catch (const qpid::Exception& e) { + return mkfailure<bool>(string("Qpid failure: ") + e.what()); + } + return true; +} - // Send in a message with the given key. - qpid::client::Message message; - message.getDeliveryProperties().setRoutingKey(c_str(key)); - message.setData(c_str(vs)); - session(qs).messageTransfer(qpid::client::arg::content=message, qpid::client::arg::destination="amq.direct"); +/** + * Represents a Qpid subscription. + */ +class QpidSubscription { +public: + QpidSubscription(QpidSession& qs) : owner(true), subs(qs.s) { + } + + QpidSubscription(QpidSession& qs, const bool owner) : owner(owner), subs(qs.s) { + } + + QpidSubscription(const QpidSubscription& qsub) : owner(false), subs(qsub.subs) { + } + + ~QpidSubscription() { + if (!owner) + return; + try { + subs.stop(); + } catch (const qpid::Exception& e) { + mkfailure<bool>(string("Qpid failure: ") + e.what()); + } + } + +private: + friend const failable<bool> listen(const string& name, const lambda<bool(const value&, const value&)>& l, QpidSubscription& qsub); + friend const failable<bool> stop(QpidSubscription& qsub); + + const bool owner; + qpid::client::SubscriptionManager subs; +}; + +/** + * Register a listener function with an AMQP queue. + */ +class Listener : public qpid::client::MessageListener { +public: + Listener(const lambda<bool(const value&, const value&)> l, qpid::client::SubscriptionManager& subs) : l(l), subs(subs) { + } + + virtual void received(qpid::client::Message& msg) { + + // Call the listener function + const value k(scheme::readValue(msg.getDeliveryProperties().getRoutingKey().c_str())); + const value v(scheme::readValue(msg.getData().c_str())); + const bool r = l(k, v); + if (!r) { + try { + subs.cancel(msg.getDestination()); + } catch (const qpid::Exception& e) { + mkfailure<bool>(string("Qpid failure: ") + e.what()); + } + } + } + +private: + const lambda<bool(const value&, const value&)> l; + qpid::client::SubscriptionManager& subs; +}; + + +const failable<bool> listen(const string& name, const lambda<bool(const value&, const value&)>& l, QpidSubscription& qsub) { + debug("queue::listen"); + Listener listener(l, qsub.subs); + try { + qsub.subs.subscribe(listener, c_str(name)); + qsub.subs.run(); + } catch (const qpid::Exception& e) { + return mkfailure<bool>(string("Qpid failure: ") + e.what()); + } + debug("queue::listen::stopped"); + return true; +} + +/** + * Stop an AMQP subscription. + */ +const failable<bool> stop(QpidSubscription& qsub) { + debug("queue::stop"); + try { + qsub.subs.stop(); + } catch (const qpid::Exception& e) { + return mkfailure<bool>(string("Qpid failure: ") + e.what()); + } + debug("queue::stopped"); return true; } diff --git a/sca-cpp/trunk/components/queue/qpidd-start b/sca-cpp/trunk/components/queue/qpidd-start new file mode 100755 index 0000000000..02e048c41e --- /dev/null +++ b/sca-cpp/trunk/components/queue/qpidd-start @@ -0,0 +1,24 @@ +#!/bin/sh + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# Start qpidd +here=`readlink -f $0`; here=`dirname $here` + +qpid_prefix=`cat $here/qpidc.prefix` +$qpid_prefix/sbin/qpidd & diff --git a/sca-cpp/trunk/components/queue/qpidd-stop b/sca-cpp/trunk/components/queue/qpidd-stop new file mode 100755 index 0000000000..6fb0467cff --- /dev/null +++ b/sca-cpp/trunk/components/queue/qpidd-stop @@ -0,0 +1,26 @@ +#!/bin/sh + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# Stop qpidd +here=`readlink -f $0`; here=`dirname $here` + +qpid_prefix=`cat $here/qpidc.prefix` +qpidd="$qpid_prefix/sbin/qpidd" + +kill `ps -f | grep -v grep | grep "${qpidd}" | awk '{ print $2 }'` diff --git a/sca-cpp/trunk/components/queue/queue-listener.cpp b/sca-cpp/trunk/components/queue/queue-listener.cpp index 75c53e7b41..57a63b620c 100644 --- a/sca-cpp/trunk/components/queue/queue-listener.cpp +++ b/sca-cpp/trunk/components/queue/queue-listener.cpp @@ -28,6 +28,7 @@ #include "list.hpp" #include "value.hpp" #include "monad.hpp" +#include "parallel.hpp" #include "qpid.hpp" // Ignore conversion issues and redundant declarations in Qpid headers @@ -39,22 +40,107 @@ namespace tuscany { namespace queue { -QpidConnection qc; +/** + * A relay function that posts the AMQP messages it receives to a relay component reference. + */ +class relay { +public: + relay(const lambda<value(const list<value>&)>& rel) : rel(rel) { + } + + const bool operator()(const value& k, const value& v) const { + debug(k, "queue::relay::key"); + debug(v, "queue::relay::value"); + const value res = rel(mklist<value>("post", isList(k)? (list<value>)k : mklist<value>(k), v)); + return true; + } + +private: + const lambda<value(const list<value>&)> rel; +}; + +/** + * Subscribe and listen to an AMQP queue. + */ +class subscribe { +public: + subscribe(const string& qname, const lambda<bool(const value&, const value&)>& l, const QpidSubscription& qsub) : qname(qname), l(l), qsub(qsub) { + } + + const failable<bool> operator()() const { + gc_pool pool; + debug(qname, "queue::subscribe::listen"); + const failable<bool> r = listen(qname, l, const_cast<QpidSubscription&>(qsub)); + debug(qname, "queue::subscribe::stopped"); + return r; + } + +private: + const string qname; + const lambda<bool(const value&, const value&)> l; + const QpidSubscription qsub; +}; /** - * Initialize the component. + * Listener lambda function, responsible for starting an AMQP subscription in a worker thread, and + * apply any function calls to the listener component. The only supported function is stop(), + * called to stop the listener component and shutdown the worker thread. + */ +class listener { +public: + listener(QpidConnection& qc, QpidSession& qs, QpidSubscription& qsub, worker& w) : qc(qc), qs(qs), qsub(qsub), w(w) { + } + + const value operator()(const list<value>& params) const { + const tuscany::value func(car(params)); + + // Stop the component + if (func != "stop") + return tuscany::mkfailure<tuscany::value>(); + debug("queue::listener::stop"); + + // TODO check why stop() and close() hang in child processes + //stop(const_cast<QpidSubscription&>(qsub)); + //close(const_cast<QpidSession&>(qs)); + //close(const_cast<QpidConnection&>(qc)); + cancel(const_cast<worker&>(w)); + + debug("queue::listener::stopped"); + return failable<value>(value(lambda<value(const list<value>&)>())); + } + +private: + QpidConnection qc; + QpidSession qs; + QpidSubscription qsub; + worker w; +}; + +/** + * Start the component. */ const failable<value> start(const list<value>& params) { - QpidSession qs(qc); + // Extract the relay reference and the AMQP key and queue name + const value rel = car(params); + const value pk = ((lambda<value(list<value>)>)cadr(params))(list<value>()); + const value key = isList(pk)? (list<value>)pk : mklist<value>(pk); + const value qname = ((lambda<value(list<value>)>)caddr(params))(list<value>()); + + // Create an AMQP session + QpidConnection qc(false); + QpidSession qs(qc, false); // Declare the configured AMQP key / queue pair - const value key = ((lambda<value(list<value>)>)caddr(params))(list<value>()); - const value qname = ((lambda<value(list<value>)>)cadddr(params))(list<value>()); declareQueue(key, qname, qs); - //TODO create a subscription and mark the current server instance busy + // Listen and relay messages in a worker thread + QpidSubscription qsub(qs, false); + worker w(3); + const lambda<bool(const value&, const value&)> rl = relay(rel); + submit<failable<bool> >(w, lambda<failable<bool>()>(subscribe(qname, rl, qsub))); - return value(true); + // Return the listener component lambda function + return value(lambda<value(const list<value>&)>(listener(qc, qs, qsub, w))); } } @@ -64,7 +150,7 @@ extern "C" { const tuscany::value apply(const tuscany::list<tuscany::value>& params) { const tuscany::value func(car(params)); - if (func == "start") + if (func == "start" || func == "restart") return tuscany::queue::start(cdr(params)); return tuscany::mkfailure<tuscany::value>(); } diff --git a/sca-cpp/trunk/components/queue/queue-sender.cpp b/sca-cpp/trunk/components/queue/queue-sender.cpp index 15128411c2..07f8491f54 100644 --- a/sca-cpp/trunk/components/queue/queue-sender.cpp +++ b/sca-cpp/trunk/components/queue/queue-sender.cpp @@ -39,19 +39,22 @@ namespace tuscany { namespace queue { -QpidConnection qc; - /** * Post an item to a queue. */ const failable<value> post(const list<value>& params) { + QpidConnection qc; QpidSession qs(qc); // Post the item - const value key = ((lambda<value(list<value>)>)cadr(params))(list<value>()); - post(key, car(params), qs); - - return value(true); + const value pk = ((lambda<value(list<value>)>)caddr(params))(list<value>()); + const value key = isList(pk)? append<value>(pk, (list<value>)car(params)) : cons<value>(pk, (list<value>)car(params)); + debug(key, "queue::post::key"); + debug(cadr(params), "queue::post::value"); + const failable<bool> r = post(key, cadr(params), qs); + if (!hasContent(r)) + return mkfailure<value>(reason(r)); + return key; } } diff --git a/sca-cpp/trunk/components/queue/queue.composite b/sca-cpp/trunk/components/queue/queue.composite index b908481bad..535680c6c3 100644 --- a/sca-cpp/trunk/components/queue/queue.composite +++ b/sca-cpp/trunk/components/queue/queue.composite @@ -22,15 +22,15 @@ targetNamespace="http://tuscany.apache.org/xmlns/sca/components" name="queue"> - <component name="queue-sender"> + <component name="print-sender"> <implementation.cpp path=".libs" library="libqueue-sender"/> <property name="key">print</property> - <service name="queue-sender"> + <service name="print-sender"> <t:binding.http uri="print-sender"/> </service> </component> - <component name="queue-listener"> + <component name="print-listener"> <implementation.cpp path=".libs" library="libqueue-listener"/> <property name="key">print</property> <property name="queue">printq</property> @@ -42,6 +42,15 @@ <service name="print"> <t:binding.http uri="print"/> </service> + <reference name="report" target="report-sender"/> + </component> + + <component name="report-sender"> + <implementation.cpp path=".libs" library="libqueue-sender"/> + <property name="key">report</property> + <service name="report-sender"> + <t:binding.http uri="report-sender"/> + </service> </component> </composite> diff --git a/sca-cpp/trunk/components/queue/send-test b/sca-cpp/trunk/components/queue/send-test new file mode 100755 index 0000000000..ec6d9d9083 --- /dev/null +++ b/sca-cpp/trunk/components/queue/send-test @@ -0,0 +1,31 @@ +#!/bin/sh + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# Setup +./qpidd-start +sleep 1 + +# Test +./qpid-test 2>/dev/null +rc=$? + +# Cleanup +./qpidd-stop +sleep 1 +return $rc diff --git a/sca-cpp/trunk/components/queue/server-test b/sca-cpp/trunk/components/queue/server-test new file mode 100755 index 0000000000..3fc94e6f35 --- /dev/null +++ b/sca-cpp/trunk/components/queue/server-test @@ -0,0 +1,43 @@ +#!/bin/sh + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# Setup +../../modules/http/httpd-conf tmp 8090 ../../modules/http/htdocs +../../modules/server/server-conf tmp +../../modules/server/scheme-conf tmp +cat >>tmp/conf/httpd.conf <<EOF +SCAContribution `pwd`/ +SCAComposite queue.composite +EOF + +./qpidd-start +sleep 1 +../../modules/http/httpd-start tmp +sleep 2 + +# Test +./client-test 2>/dev/null +rc=$? + +# Cleanup +../../modules/http/httpd-stop tmp +sleep 1 +./qpidd-stop +sleep 1 +return $rc diff --git a/sca-cpp/trunk/components/queue/server-test.scm b/sca-cpp/trunk/components/queue/server-test.scm index 16c4d95dc8..1a89ce8b31 100644 --- a/sca-cpp/trunk/components/queue/server-test.scm +++ b/sca-cpp/trunk/components/queue/server-test.scm @@ -17,4 +17,4 @@ ; Queue test case -(define (print x) (display x)) +(define (post key val report) (report "post" '() val)) diff --git a/sca-cpp/trunk/components/webservice/echo-test b/sca-cpp/trunk/components/webservice/echo-test index 5e7a380521..e5cd1d9a9d 100755 --- a/sca-cpp/trunk/components/webservice/echo-test +++ b/sca-cpp/trunk/components/webservice/echo-test @@ -28,7 +28,7 @@ cd $pwd sleep 1 # Test -./axis2-test +./axis2-test 2>/dev/null rc=$? # Cleanup diff --git a/sca-cpp/trunk/components/webservice/server-test b/sca-cpp/trunk/components/webservice/server-test index 7d33223b3d..8311b94220 100755 --- a/sca-cpp/trunk/components/webservice/server-test +++ b/sca-cpp/trunk/components/webservice/server-test @@ -39,7 +39,7 @@ cd $pwd sleep 2 # Test -./client-test +./client-test 2>/dev/null rc=$? # Cleanup diff --git a/sca-cpp/trunk/components/webservice/webservice-client.cpp b/sca-cpp/trunk/components/webservice/webservice-client.cpp index ae4e472e65..cb756c015e 100644 --- a/sca-cpp/trunk/components/webservice/webservice-client.cpp +++ b/sca-cpp/trunk/components/webservice/webservice-client.cpp @@ -57,7 +57,7 @@ extern "C" { const tuscany::value apply(const tuscany::list<tuscany::value>& params) { const tuscany::value func(car(params)); - if (func == "start" || func == "stop" || func == "restart") + if (func == "start" || func == "restart") return tuscany::mkfailure<tuscany::value>(); return tuscany::webservice::apply(func, cdr(params)); } diff --git a/sca-cpp/trunk/configure.ac b/sca-cpp/trunk/configure.ac index 1ac3a9e5bd..19f7e19080 100644 --- a/sca-cpp/trunk/configure.ac +++ b/sca-cpp/trunk/configure.ac @@ -341,8 +341,8 @@ AC_ARG_ENABLE(java, [AS_HELP_STRING([--enable-java], [enable Java support [defau esac ], [ AC_MSG_RESULT(no)]) if test "${want_java}" = "true"; then - LIBS="-L${JAVA_LIB} ${default_LIBS}" - #AC_CHECK_LIB([java], [JNI_OnLoad], [], [AC_MSG_ERROR([couldn't find a suitable libjava, use --with-java=PATH])]) + LIBS="-L${JAVA_LIB} -L${JAVA_LIB}/server ${default_LIBS}" + AC_CHECK_LIB([java], [JNI_CreateJavaVM], [], [AC_MSG_ERROR([couldn't find a suitable libjava, use --with-java=PATH])], [-ljvm -lverify]) AC_PROG_JAVAC AC_PROG_JAR AM_CONDITIONAL([WANT_JAVA], true) @@ -441,8 +441,11 @@ AC_ARG_ENABLE(queue, [AS_HELP_STRING([--enable-queue], [enable Queue component [ esac ], [ AC_MSG_RESULT(no)]) if test "${want_queue}" = "true"; then + if test "${want_threads}" != "true"; then + AC_MSG_ERROR([--enable-queue requires multi-threading, use --enable-threads]) + fi LIBS="-L${QPIDC_LIB} ${default_LIBS}" - #AC_CHECK_LIB([qpidclient], [], [], [AC_MSG_ERROR([couldn't find a suitable libqpidclient, use --with-qpidc=PATH])]) + AC_CHECK_LIB([qpidclient], [_init], [], [AC_MSG_ERROR([couldn't find a suitable libqpidclient, use --with-qpidc=PATH])]) AM_CONDITIONAL([WANT_QUEUE], true) AC_DEFINE([WANT_QUEUE], 1, [enable Queue component]) else @@ -451,7 +454,7 @@ fi # Configure path to Libstrophe includes and lib. AC_MSG_CHECKING([for libstrophe]) -AC_ARG_WITH([libstrophe], [AC_HELP_STRING([--with-libstrophe=PATH], [path to libstrophe source build [default=${HOME}/libstrophe]])], [ +AC_ARG_WITH([libstrophe], [AC_HELP_STRING([--with-libstrophe=PATH], [path to libstrophe source [default=${HOME}/libstrophe]])], [ LIBSTROPHE_INCLUDE="${withval}" LIBSTROPHE_LIB="${withval}" AC_MSG_RESULT("${withval}") @@ -477,8 +480,11 @@ AC_ARG_ENABLE(chat, [AS_HELP_STRING([--enable-chat], [enable Chat component [def esac ], [ AC_MSG_RESULT(no)]) if test "${want_chat}" = "true"; then + if test "${want_threads}" != "true"; then + AC_MSG_ERROR([--enable-chat requires multi-threading, use --enable-threads]) + fi LIBS="-L${LIBSTROPHE_LIB} ${default_LIBS}" - #AC_CHECK_LIB([strophe], [xmpp_initialize], [], [AC_MSG_ERROR([couldn't find a suitable libstrophe, use --with-libstrophe=PATH])], [-lexpat -lssl -lresolv]) + AC_CHECK_LIB([strophe], [xmpp_initialize], [], [AC_MSG_ERROR([couldn't find a suitable libstrophe, use --with-libstrophe=PATH])], [-lexpat -lssl -lresolv]) AM_CONDITIONAL([WANT_CHAT], true) AC_DEFINE([WANT_CHAT], 1, [enable Chat component]) else diff --git a/sca-cpp/trunk/etc/git-exclude b/sca-cpp/trunk/etc/git-exclude index 29c58979a8..8aa4146c56 100644 --- a/sca-cpp/trunk/etc/git-exclude +++ b/sca-cpp/trunk/etc/git-exclude @@ -20,6 +20,7 @@ surefire*.properties .settings/ .deployables/ .wtpmodules/ +.pydevproject .svn/ .cvs/ .dotest/ @@ -85,4 +86,5 @@ script-test axiom-test axis2-test qpid-test +xmpp-test diff --git a/sca-cpp/trunk/kernel/element.hpp b/sca-cpp/trunk/kernel/element.hpp index 0d14acc4a3..c6aa2c44eb 100644 --- a/sca-cpp/trunk/kernel/element.hpp +++ b/sca-cpp/trunk/kernel/element.hpp @@ -193,7 +193,7 @@ const value valueToElement(const value& t) { const list<value> valuesToElements(const list<value>& l); // Convert a name value pair - if (isList(t) && isSymbol(car<value>(t))) { + if (isList(t) && !isNil((list<value>)t) && isSymbol(car<value>(t))) { const value n = car<value>(t); const value v = cadr<value>(t); diff --git a/sca-cpp/trunk/kernel/parallel.hpp b/sca-cpp/trunk/kernel/parallel.hpp index 9fb6c59ba7..09cf0df9a3 100644 --- a/sca-cpp/trunk/kernel/parallel.hpp +++ b/sca-cpp/trunk/kernel/parallel.hpp @@ -59,9 +59,12 @@ private: pthread_cond_init(&valueCond, NULL); } + futureValue(const futureValue& fv) : valueMutex(fv.valueMutex), valueCond(fv.valueCond), hasValue(fv.hasValue), value(fv.value) { + } + ~futureValue() { - pthread_mutex_destroy(&valueMutex); - pthread_cond_destroy(&valueCond); + //pthread_mutex_destroy(&valueMutex); + //pthread_cond_destroy(&valueCond); } bool set(const T& v) { @@ -129,18 +132,21 @@ public: /** * A bounded thread safe queue. */ -template<typename T> class queue { +template<typename T> class wqueue { public: - queue(int max) : max(max), size(0), tail(0), head(0), values(new (gc_anew<T>(max)) T[max]) { + wqueue(int max) : max(max), size(0), tail(0), head(0), values(new (gc_anew<T>(max)) T[max]) { pthread_mutex_init(&mutex, NULL); pthread_cond_init(&full, NULL); pthread_cond_init(&empty, NULL); } - ~queue() { - pthread_mutex_destroy(&mutex); - pthread_cond_destroy(&full); - pthread_cond_destroy(&empty); + wqueue(const wqueue& wq) : max(wq.max), size(wq.size), tail(wq.tail), head(wq.head), mutex(wq.mutex), full(wq.full), empty(wq.empty), values(wq.values) { + } + + ~wqueue() { + //pthread_mutex_destroy(&mutex); + //pthread_cond_destroy(&full); + //pthread_cond_destroy(&empty); } private: @@ -153,14 +159,14 @@ private: pthread_cond_t empty; gc_ptr<T> values; - template<typename X> friend const int enqueue(queue<X>& q, const X& v); - template<typename X> friend const X dequeue(queue<X>& q); + template<typename X> friend const int enqueue(wqueue<X>& q, const X& v); + template<typename X> friend const X dequeue(wqueue<X>& q); }; /** * Adds an element to the tail of the queue. */ -template<typename T> const int enqueue(queue<T>&q, const T& v) { +template<typename T> const int enqueue(wqueue<T>&q, const T& v) { pthread_mutex_lock(&q.mutex); while(q.size == q.max) pthread_cond_wait(&q.full, &q.mutex); @@ -175,7 +181,7 @@ template<typename T> const int enqueue(queue<T>&q, const T& v) { /** * Returns the element at the head of the queue. */ -template<typename T> const T dequeue(queue<T>& q) { +template<typename T> const T dequeue(wqueue<T>& q) { pthread_mutex_lock(&q.mutex); while(q.size == 0) pthread_cond_wait(&q.empty, &q.mutex); @@ -191,7 +197,12 @@ template<typename T> const T dequeue(queue<T>& q) { * The worker thread function. */ void *workerThreadFunc(void *arg) { - queue<lambda<bool()> >* work = reinterpret_cast<queue<lambda<bool()> >*>(arg); + int ost; + pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &ost); + int ot; + pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, &ot); + + wqueue<lambda<bool()> >* work = reinterpret_cast<wqueue<lambda<bool()> >*>(arg); while(dequeue(*work)()) ; return NULL; @@ -200,28 +211,44 @@ void *workerThreadFunc(void *arg) { /** * Returns a list of worker threads. */ -const list<pthread_t> workerThreads(queue<lambda<bool()> >& queue, const int count) { +const list<pthread_t> workerThreads(wqueue<lambda<bool()> >& wqueue, const int count) { if (count == 0) return list<pthread_t>(); pthread_t thread; - pthread_create(&thread, NULL, workerThreadFunc, &queue); - return cons(thread, workerThreads(queue, count - 1)); + pthread_create(&thread, NULL, workerThreadFunc, &wqueue); + return cons(thread, workerThreads(wqueue, count - 1)); } /** * A worker, implemented with a work queue and a pool of threads. */ class worker { +private: + + // The worker holds a reference to a sharedWorker, to avoid non-thread-safe + // copies of the queue and thread pool when a worker is copied + class sharedWorker { + public: + sharedWorker(int max) : work(wqueue<lambda<bool()> >(max)), threads(workerThreads(work, max)) { + } + + wqueue<lambda<bool()> > work; + const list<pthread_t> threads; + }; + public: - worker(int max) : work(queue<lambda<bool()> >(max)), threads(workerThreads(work, max)) { + worker(int max) : w(*(new (gc_new<sharedWorker>()) sharedWorker(max))) { + } + + worker(const worker& wk) : w(wk.w) { } private: - queue<lambda<bool()> > work; - const list<pthread_t> threads; + sharedWorker& w; template<typename X> friend const future<X> submit(worker& w, const lambda<X()>& func); friend const bool shutdown(worker& w); + friend const bool cancel(worker& w); }; /** @@ -238,14 +265,14 @@ template<typename R> bool submitFunc(const lambda<R()>& func, const future<R>& f template<typename R> const future<R> submit(worker& w, const lambda<R()>& func) { const future<R> fut; const lambda<bool()> f = curry(lambda<bool(const lambda<R()>, future<R>)>(submitFunc<R>), func, fut); - enqueue(w.work, f); + enqueue(w.w.work, f); return fut; } /** * Enqueues shutdown requests. */ -const bool shutdownEnqueue(const list<pthread_t>& threads, queue<lambda<bool()> >& work) { +const bool shutdownEnqueue(const list<pthread_t>& threads, wqueue<lambda<bool()> >& work) { if (isNil(threads)) return true; enqueue(work, result(false)); @@ -266,8 +293,23 @@ const bool shutdownJoin(const list<pthread_t>& threads) { * Shutdown a worker. */ const bool shutdown(worker& w) { - shutdownEnqueue(w.threads, w.work); - shutdownJoin(w.threads); + shutdownEnqueue(w.w.threads, w.w.work); + shutdownJoin(w.w.threads); + return true; +} + +/** + * Cancel a worker. + */ +const bool cancel(const list<pthread_t>& threads) { + if (isNil(threads)) + return true; + pthread_cancel(car(threads)); + return cancel(cdr(threads)); +} + +const bool cancel(worker& w) { + cancel(w.w.threads); return true; } diff --git a/sca-cpp/trunk/kernel/value.hpp b/sca-cpp/trunk/kernel/value.hpp index 4f19ee3915..1aafbe6d38 100644 --- a/sca-cpp/trunk/kernel/value.hpp +++ b/sca-cpp/trunk/kernel/value.hpp @@ -556,5 +556,29 @@ template<typename T> const list<value> mkvalues(const list<T>& l) { return cons<value>(car(l), mkvalues(cdr(l))); } +/** + * Convert a path string value to a list of values. + */ +const list<string> pathTokens(const char* p) { + if (p == NULL || p[0] == '\0') + return list<string>(); + if (p[0] == '/') + return tokenize("/", p + 1); + return tokenize("/", p); +} + +const list<value> pathValues(const value& p) { + return mkvalues(pathTokens(c_str(p))); +} + +/** + * Convert a path represented as a list of values to a string value. + */ +const value path(const list<value>& p) { + if (isNil(p)) + return ""; + return string("/") + car(p) + path(cdr(p)); +} + } #endif /* tuscany_value_hpp */ diff --git a/sca-cpp/trunk/modules/http/curl.hpp b/sca-cpp/trunk/modules/http/curl.hpp index 4e96411ec6..f2c9458f42 100644 --- a/sca-cpp/trunk/modules/http/curl.hpp +++ b/sca-cpp/trunk/modules/http/curl.hpp @@ -252,7 +252,7 @@ const failable<value> entryId(const failable<string> l) { if (!hasContent(l)) return mkfailure<value>(reason(l)); const string ls(content(l)); - return value(string(substr(ls, find_last(ls, '/') + 1))); + return value(mklist<value>(string(substr(ls, find_last(ls, '/') + 1)))); } /** @@ -381,18 +381,18 @@ const failable<value, string> del(const string& url, const CURLSession& ch) { * HTTP client proxy function. */ struct proxy { - proxy(const string& url) : url(url) { + proxy(const string& uri) : uri(uri) { } const value operator()(const list<value>& args) const { CURLSession cs; - failable<value> val = evalExpr(args, url, cs); + failable<value> val = evalExpr(args, uri, cs); if (!hasContent(val)) return value(); return content(val); } - const string url; + const string uri; }; } diff --git a/sca-cpp/trunk/modules/http/httpd.hpp b/sca-cpp/trunk/modules/http/httpd.hpp index 6b08c3b838..bd0e23f76b 100644 --- a/sca-cpp/trunk/modules/http/httpd.hpp +++ b/sca-cpp/trunk/modules/http/httpd.hpp @@ -75,30 +75,6 @@ template<typename C> C& serverConf(const cmd_parms *cmd, const module* mod) { /** - * Convert a path string to a list of values. - */ -const list<string> pathTokens(const char* p) { - if (p == NULL || p[0] == '\0') - return list<string>(); - if (p[0] == '/') - return tokenize("/", p + 1); - return tokenize("/", p); -} - -const list<value> pathValues(const char* p) { - return mkvalues(pathTokens(p)); -} - -/** - * Convert a path represented as a list of values to a string. - */ -const string path(const list<value>& p) { - if (isNil(p)) - return ""; - return string("/") + car(p) + path(cdr(p)); -} - -/** * Return the content type of a request. */ const char* optional(const char* s) { @@ -219,10 +195,10 @@ const list<string> read(request_rec* r) { } /** - * Convert a URI value to an absolute URL. + * Convert a URI represented as a list to an absolute URL. */ -const char* url(const value& v, request_rec* r) { - const string u = string(r->uri) + "/" + v; +const char* url(const list<value>& v, request_rec* r) { + const string u = string(r->uri) + path(v); return ap_construct_url(r->pool, c_str(u), r); } @@ -400,7 +376,7 @@ const int internalRedirect(const string& uri, request_rec* r) { /** * Put a value in the process user data. */ -const bool putUserData(const string& k, const int v, const server_rec* s) { +const bool putUserData(const string& k, const void* v, const server_rec* s) { apr_pool_userdata_set((const void *)v, c_str(k), apr_pool_cleanup_null, s->process->pool); return true; } @@ -408,10 +384,10 @@ const bool putUserData(const string& k, const int v, const server_rec* s) { /** * Return a user data value. */ -const int userData(const string& k, const server_rec* s) { - void* v = (int)0; +const void* userData(const string& k, const server_rec* s) { + void* v = NULL; apr_pool_userdata_get(&v, c_str(k), s->process->pool); - return (int)v; + return v; } } diff --git a/sca-cpp/trunk/modules/java/eval.hpp b/sca-cpp/trunk/modules/java/eval.hpp index 9140fe75b4..58a1ab5ff9 100644 --- a/sca-cpp/trunk/modules/java/eval.hpp +++ b/sca-cpp/trunk/modules/java/eval.hpp @@ -501,8 +501,8 @@ const failable<value> evalClass(const JavaRuntime& jr, const value& expr, const // The start, stop, and restart functions are optional const value fn = car<value>(expr); - if (fn == "start" || fn == "stop" || fn == "restart") - return value(false); + if (fn == "start" || fn == "restart" || "stop") + return value(lambda<value(const list<value>&)>()); return mkfailure<value>(string("Couldn't find function: ") + car<value>(expr) + " : " + lastException(jr)); } diff --git a/sca-cpp/trunk/modules/java/mod-java.cpp b/sca-cpp/trunk/modules/java/mod-java.cpp index 4b8e7dca56..510f9574b0 100644 --- a/sca-cpp/trunk/modules/java/mod-java.cpp +++ b/sca-cpp/trunk/modules/java/mod-java.cpp @@ -37,40 +37,41 @@ namespace server { namespace modeval { /** - * Start the module. + * Apply a lifecycle start or restart event. */ -const failable<bool> start(unused ServerConf& sc) { - // Start a Java runtime - sc.moduleConf = new (gc_new<java::JavaRuntime>()) java::JavaRuntime(); - return true; -} +struct javaLifecycle { + javaLifecycle(java::JavaRuntime& jr) : jr(jr) { + } + const value operator()(const list<value>& params) const { + const value func = car(params); + if (func == "javaRuntime") + return (gc_ptr<value>)(value*)(void*)&jr; + return lambda<value(const list<value>&)>(); + } + java::JavaRuntime& jr; +}; -/** - * Stop the module. - */ -const failable<bool> stop(unused ServerConf& sc) { - return true; -} +const value applyLifecycle(unused const list<value>& params) { -/** - * Restart the module. - */ -const failable<bool> restart(ServerConf& sc) { - // Start a Java runtime - sc.moduleConf = new (gc_new<java::JavaRuntime>()) java::JavaRuntime(); - return true; + // Create a Java runtime + java::JavaRuntime& jr = *(new (gc_new<java::JavaRuntime>()) java::JavaRuntime()); + + // Return the function to invoke on subsequent events + return failable<value>(lambda<value(const list<value>&)>(javaLifecycle(jr))); } /** * Evaluate a Java component implementation and convert it to an applicable * lambda function. */ -const failable<lambda<value(const list<value>&)> > evalImplementation(const string& path, const value& impl, const list<value>& px, modeval::ServerConf& sc) { +const failable<lambda<value(const list<value>&)> > evalImplementation(const string& path, const value& impl, const list<value>& px, const lambda<value(const list<value>&)>& lifecycle) { const string itype(elementName(impl)); - if (contains(itype, ".java")) - return modjava::evalImplementation(path, impl, px, sc); + if (contains(itype, ".java")) { + const void* p = (gc_ptr<value>)lifecycle(mklist<value>("javaRuntime")); + return modjava::evalImplementation(path, impl, px, *(java::JavaRuntime*)p); + } if (contains(itype, ".cpp")) - return modcpp::evalImplementation(path, impl, px, sc); + return modcpp::evalImplementation(path, impl, px); return mkfailure<lambda<value(const list<value>&)> >(string("Unsupported implementation type: ") + itype); } diff --git a/sca-cpp/trunk/modules/java/mod-java.hpp b/sca-cpp/trunk/modules/java/mod-java.hpp index ccb6d9262b..e7da06e930 100644 --- a/sca-cpp/trunk/modules/java/mod-java.hpp +++ b/sca-cpp/trunk/modules/java/mod-java.hpp @@ -34,20 +34,12 @@ #include "value.hpp" #include "monad.hpp" #include "eval.hpp" -#include "../server/mod-eval.hpp" namespace tuscany { namespace server { namespace modjava { /** - * Return the Java runtime configured in a server. - */ -java::JavaRuntime& javaRuntime(modeval::ServerConf sc) { - return *(java::JavaRuntime*)sc.moduleConf; -} - -/** * Apply a Java component implementation function. */ struct applyImplementation { @@ -59,11 +51,10 @@ struct applyImplementation { const value operator()(const list<value>& params) const { const value expr = append<value>(params, px); debug(expr, "modeval::java::applyImplementation::input"); - const failable<value> val = java::evalClass(jr, expr, impl); + const failable<value> res = java::evalClass(jr, expr, impl); + const value val = !hasContent(res)? mklist<value>(value(), reason(res)) : mklist<value>(content(res)); debug(val, "modeval::java::applyImplementation::result"); - if (!hasContent(val)) - return mklist<value>(value(), reason(val)); - return mklist<value>(content(val)); + return val; } }; @@ -71,12 +62,12 @@ struct applyImplementation { * Evaluate a Java component implementation and convert it to an applicable * lambda function. */ -const failable<lambda<value(const list<value>&)> > evalImplementation(const string& path, const value& impl, const list<value>& px, modeval::ServerConf& sc) { +const failable<lambda<value(const list<value>&)> > evalImplementation(const string& path, const value& impl, const list<value>& px, java::JavaRuntime& jr) { const string cn(attributeValue("class", impl)); - const failable<java::JavaClass> jc = java::readClass(javaRuntime(sc), path, cn); + const failable<java::JavaClass> jc = java::readClass(jr, path, cn); if (!hasContent(jc)) return mkfailure<lambda<value(const list<value>&)> >(reason(jc)); - return lambda<value(const list<value>&)>(applyImplementation(content(jc), px, javaRuntime(sc))); + return lambda<value(const list<value>&)>(applyImplementation(content(jc), px, jr)); } } diff --git a/sca-cpp/trunk/modules/java/org/apache/tuscany/Service.java b/sca-cpp/trunk/modules/java/org/apache/tuscany/Service.java index eedba5f46f..a00d5b1b53 100644 --- a/sca-cpp/trunk/modules/java/org/apache/tuscany/Service.java +++ b/sca-cpp/trunk/modules/java/org/apache/tuscany/Service.java @@ -26,34 +26,24 @@ package org.apache.tuscany; public interface Service { /** - * Post a new item to a resource. + * Post a new item to a collection of items. */ - String post(Iterable<?> item); + Iterable<String> post(Iterable<String> collection, Iterable<?> item); /** * Return an item. */ - Iterable<?> get(String id); + Iterable<?> get(Iterable<String> id); /** - * Return all items in the resource. + * Update an item. */ - Iterable<?> getall(); - - /** - * Update am item. - */ - boolean put(String id, Iterable<?> item); + boolean put(Iterable<String> id, Iterable<?> item); /** * Delete an item. */ - boolean delete(String id); - - /** - * Delete all items in the resource. - */ - boolean deleteall(); + boolean delete(Iterable<String> id); /** * Evaluate an expression. diff --git a/sca-cpp/trunk/modules/java/test/Client.java b/sca-cpp/trunk/modules/java/test/Client.java index ff2d3fee13..c3bd875fcc 100644 --- a/sca-cpp/trunk/modules/java/test/Client.java +++ b/sca-cpp/trunk/modules/java/test/Client.java @@ -23,16 +23,12 @@ public interface Client { String echo(String x); - Iterable<?> getall(); + Iterable<?> get(Iterable<String> id); - Iterable<?> get(String id); + Iterable<String> post(Iterable<String> collection, Iterable<?> item); - String post(Iterable<?> item); + Boolean put(Iterable<String> id, Iterable<?> item); - Boolean put(String id, Iterable<?> entry); - - Boolean deleteall(); - - Boolean delete(String id); + Boolean delete(Iterable<String> id); } diff --git a/sca-cpp/trunk/modules/java/test/ClientImpl.java b/sca-cpp/trunk/modules/java/test/ClientImpl.java index 4bd3498dd5..ade2ba302e 100644 --- a/sca-cpp/trunk/modules/java/test/ClientImpl.java +++ b/sca-cpp/trunk/modules/java/test/ClientImpl.java @@ -25,27 +25,19 @@ public class ClientImpl { return server.echo(x); } - public Iterable<?> getall(Server server) { - return server.getall(); - } - - public Iterable<?> get(String id, Server server) { + public Iterable<?> get(Iterable<String> id, Server server) { return server.get(id); } - public String post(Iterable<?> item, Server server) { - return server.post(item); + public Iterable<String> post(Iterable<String> collection, Iterable<?> item, Server server) { + return server.post(collection, item); } - public Boolean put(String id, Iterable<?> item, Server server) { + public Boolean put(Iterable<String> id, Iterable<?> item, Server server) { return server.put(id, item); } - public Boolean deleteall(Server server) { - return server.deleteall(); - } - - public Boolean delete(String id, Server server) { + public Boolean delete(Iterable<String> id, Server server) { return server.delete(id); } diff --git a/sca-cpp/trunk/modules/java/test/Server.java b/sca-cpp/trunk/modules/java/test/Server.java index 917ccfd6b9..3dfe3c84ef 100644 --- a/sca-cpp/trunk/modules/java/test/Server.java +++ b/sca-cpp/trunk/modules/java/test/Server.java @@ -23,16 +23,12 @@ public interface Server { String echo(String x); - Iterable<?> getall(); + Iterable<?> get(Iterable<String> id); - Iterable<?> get(String id); + Iterable<String> post(Iterable<String> collection, Iterable<?> item); - String post(Iterable<?> item); + Boolean put(Iterable<String> id, Iterable<?> item); - Boolean put(String id, Iterable<?> entry); - - Boolean deleteall(); - - Boolean delete(String id); + Boolean delete(Iterable<String> id); } diff --git a/sca-cpp/trunk/modules/java/test/ServerImpl.java b/sca-cpp/trunk/modules/java/test/ServerImpl.java index dd4e227123..d979c20a15 100644 --- a/sca-cpp/trunk/modules/java/test/ServerImpl.java +++ b/sca-cpp/trunk/modules/java/test/ServerImpl.java @@ -27,31 +27,25 @@ public class ServerImpl { return x; } - public Iterable<?> getall() { - return list("Sample Feed", "123456789", - list("Item", "111", list(list("'javaClass", "services.Item"), list("'name", "Apple"), list("'currencyCode", "USD"), list("'currencySymbol", "$"), list("'price", 2.99))), - list("Item", "222", list(list("'javaClass", "services.Item"), list("'name", "Orange"), list("'currencyCode", "USD"), list("'currencySymbol", "$"), list("'price", 3.55))), - list("Item", "333", list(list("'javaClass", "services.Item"), list("'name", "Pear"), list("'currencyCode", "USD"), list("'currencySymbol", "$"), list("'price", 1.55)))); - } - - public Iterable<?> get(String id) { + public Iterable<?> get(Iterable<String> id) { + if (isNil(id)) + return list("Sample Feed", "123456789", + list("Item", "111", list(list("'javaClass", "services.Item"), list("'name", "Apple"), list("'currencyCode", "USD"), list("'currencySymbol", "$"), list("'price", 2.99))), + list("Item", "222", list(list("'javaClass", "services.Item"), list("'name", "Orange"), list("'currencyCode", "USD"), list("'currencySymbol", "$"), list("'price", 3.55))), + list("Item", "333", list(list("'javaClass", "services.Item"), list("'name", "Pear"), list("'currencyCode", "USD"), list("'currencySymbol", "$"), list("'price", 1.55)))); Iterable<?> entry = list(list("'javaClass", "services.Item"), list("'name", "Apple"), list("'currencyCode", "USD"), list("'currencySymbol", "$"), list("'price", 2.99)); return list("Item", id, entry); } - public String post(Iterable<?> item) { - return "123456789"; + public Iterable<String> post(Iterable<String> collection, Iterable<?> item) { + return list("123456789"); } - public Boolean put(String id, Iterable<?> entry) { - return true; - } - - public Boolean deleteall() { + public Boolean put(Iterable<String> id, Iterable<?> item) { return true; } - public Boolean delete(String id) { + public Boolean delete(Iterable<String> id) { return true; } } diff --git a/sca-cpp/trunk/modules/json/json.hpp b/sca-cpp/trunk/modules/json/json.hpp index 4c36b8b477..1d966d3f67 100644 --- a/sca-cpp/trunk/modules/json/json.hpp +++ b/sca-cpp/trunk/modules/json/json.hpp @@ -140,7 +140,7 @@ const bool isJSArray(const list<value>& l) { if (isSymbol(v)) return false; if(isList(v)) { - if(isSymbol(car<value>(v))) + if(!isNil((list<value>)v) && isSymbol(car<value>(v))) return false; } return true; diff --git a/sca-cpp/trunk/modules/python/client-test.py b/sca-cpp/trunk/modules/python/client-test.py index 6e3723302a..47e6cf4bda 100644 --- a/sca-cpp/trunk/modules/python/client-test.py +++ b/sca-cpp/trunk/modules/python/client-test.py @@ -22,18 +22,14 @@ def echo(x, ref): # ATOMPub test case -def getall(ref): - return ref("getall") - def get(id, ref): return ref("get", id) -def post(entry, ref): - return ref("post", entry) +def post(collection, item, ref): + return ref("post", collection, item) -def put(id, entry, ref): - return ref("put", id, entry) +def put(id, item, ref): + return ref("put", id, item) def delete(id, ref): return ref("delete", id) - diff --git a/sca-cpp/trunk/modules/python/eval.hpp b/sca-cpp/trunk/modules/python/eval.hpp index 5231b0ef60..136ecf6499 100644 --- a/sca-cpp/trunk/modules/python/eval.hpp +++ b/sca-cpp/trunk/modules/python/eval.hpp @@ -241,9 +241,9 @@ const failable<value> evalScript(const value& expr, PyObject* script) { // The start, stop, and restart functions are optional const value fn = car<value>(expr); - if (fn == "start" || fn == "stop" || fn == "restart") { + if (fn == "start" || fn == "restart" || fn == "stop") { PyErr_Clear(); - return value(false); + return value(lambda<value(const list<value>&)>()); } return mkfailure<value>(string("Couldn't find function: ") + car<value>(expr) + " : " + lastError()); diff --git a/sca-cpp/trunk/modules/python/mod-python.cpp b/sca-cpp/trunk/modules/python/mod-python.cpp index df94f2e8a1..8561a1fbf4 100644 --- a/sca-cpp/trunk/modules/python/mod-python.cpp +++ b/sca-cpp/trunk/modules/python/mod-python.cpp @@ -37,40 +37,27 @@ namespace server { namespace modeval { /** - * Start the module. + * Apply a lifecycle start or restart event. */ -const failable<bool> start(unused ServerConf& sc) { - // Start a Python runtime - sc.moduleConf = new (gc_new<python::PythonRuntime>()) python::PythonRuntime(); - return true; -} +const value applyLifecycle(unused const list<value>& params) { -/** - * Stop the module. - */ -const failable<bool> stop(unused ServerConf& sc) { - return true; -} + // Create a Python runtime + new (gc_new<python::PythonRuntime>()) python::PythonRuntime(); -/** - * Restart the module. - */ -const failable<bool> restart(unused ServerConf& sc) { - // Start a Python runtime - sc.moduleConf = new (gc_new<python::PythonRuntime>()) python::PythonRuntime(); - return true; + // Return a nil function as we don't need to handle the stop event + return failable<value>(lambda<value(const list<value>&)>()); } /** * Evaluate a Python component implementation and convert it to an applicable * lambda function. */ -const failable<lambda<value(const list<value>&)> > evalImplementation(const string& path, const value& impl, const list<value>& px, modeval::ServerConf& sc) { +const failable<lambda<value(const list<value>&)> > evalImplementation(const string& path, const value& impl, const list<value>& px, unused const lambda<value(const list<value>&)>& lifecycle) { const string itype(elementName(impl)); if (contains(itype, ".python")) - return modpython::evalImplementation(path, impl, px, sc); + return modpython::evalImplementation(path, impl, px); if (contains(itype, ".cpp")) - return modcpp::evalImplementation(path, impl, px, sc); + return modcpp::evalImplementation(path, impl, px); return mkfailure<lambda<value(const list<value>&)> >(string("Unsupported implementation type: ") + itype); } diff --git a/sca-cpp/trunk/modules/python/mod-python.hpp b/sca-cpp/trunk/modules/python/mod-python.hpp index cbe2b6b97c..d13f2227ab 100644 --- a/sca-cpp/trunk/modules/python/mod-python.hpp +++ b/sca-cpp/trunk/modules/python/mod-python.hpp @@ -34,7 +34,6 @@ #include "value.hpp" #include "monad.hpp" #include "eval.hpp" -#include "../server/mod-eval.hpp" namespace tuscany { namespace server { @@ -51,11 +50,10 @@ struct applyImplementation { const value operator()(const list<value>& params) const { const value expr = append<value>(params, px); debug(expr, "modeval::python::applyImplementation::input"); - const failable<value> val = python::evalScript(expr, impl); + const failable<value> res = python::evalScript(expr, impl); + const value val = !hasContent(res)? mklist<value>(value(), reason(res)) : mklist<value>(content(res)); debug(val, "modeval::python::applyImplementation::result"); - if (!hasContent(val)) - return mklist<value>(value(), reason(val)); - return mklist<value>(content(val)); + return val; } }; @@ -63,7 +61,7 @@ struct applyImplementation { * Evaluate a Python component implementation and convert it to an applicable * lambda function. */ -const failable<lambda<value(const list<value>&)> > evalImplementation(const string& path, const value& impl, const list<value>& px, unused modeval::ServerConf& sc) { +const failable<lambda<value(const list<value>&)> > evalImplementation(const string& path, const value& impl, const list<value>& px) { const string fpath(path + attributeValue("script", impl)); ifstream is(fpath); if (fail(is)) diff --git a/sca-cpp/trunk/modules/python/server-test.py b/sca-cpp/trunk/modules/python/server-test.py index ac2b6829c9..61e177d0aa 100644 --- a/sca-cpp/trunk/modules/python/server-test.py +++ b/sca-cpp/trunk/modules/python/server-test.py @@ -22,25 +22,21 @@ def echo(x): # ATOMPub test case -def getall(): - return ("Sample Feed", "123456789", - ("Item", "111", (("'javaClass", "services.Item"), ("'name", "Apple"), ("'currencyCode", "USD"), ("'currencySymbol", "$"), ("'price", 2.99))), - ("Item", "222", (("'javaClass", "services.Item"), ("'name", "Orange"), ("'currencyCode", "USD"), ("'currencySymbol", "$"), ("'price", 3.55))), - ("Item", "333", (("'javaClass", "services.Item"), ("name", "Pear"), ("'currencyCode", "USD"), ("'currencySymbol", "$"), ("'price", 1.55)))) - def get(id): - entry = (("'javaClass", "services.Item"), ("'name", "Apple"), ("'currencyCode", "USD"), ("'currencySymbol", "$"), ("'price", 2.99)) - return ("Item", id, entry) - -def post(entry): - return "123456789" - -def put(id, entry): - return true - -def deleteall(): + if id == (): + return ("Sample Feed", "123456789", + ("Item", "111", (("'javaClass", "services.Item"), ("'name", "Apple"), ("'currencyCode", "USD"), ("'currencySymbol", "$"), ("'price", 2.99))), + ("Item", "222", (("'javaClass", "services.Item"), ("'name", "Orange"), ("'currencyCode", "USD"), ("'currencySymbol", "$"), ("'price", 3.55))), + ("Item", "333", (("'javaClass", "services.Item"), ("name", "Pear"), ("'currencyCode", "USD"), ("'currencySymbol", "$"), ("'price", 1.55)))) + + entry = (("'javaClass", "services.Item"), ("'name", "Apple"), ("'currencyCode", "USD"), ("'currencySymbol", "$"), ("'price", 2.99)) + return ("Item", id, entry) + +def post(collection, item): + return ("123456789",) + +def put(id, item): return true def delete(id): return true - diff --git a/sca-cpp/trunk/modules/scheme/primitive.hpp b/sca-cpp/trunk/modules/scheme/primitive.hpp index 5a13725ffd..e911c76f6c 100644 --- a/sca-cpp/trunk/modules/scheme/primitive.hpp +++ b/sca-cpp/trunk/modules/scheme/primitive.hpp @@ -172,15 +172,15 @@ const value cdddrProc(unused const list<value>& args) { } const value startProc(unused const list<value>& args) { - return false; + return lambda<value(const list<value>&)>(); } const value stopProc(unused const list<value>& args) { - return false; + return lambda<value(const list<value>&)>(); } const value restartProc(unused const list<value>& args) { - return false; + return lambda<value(const list<value>&)>(); } const value applyPrimitiveProcedure(const value& proc, list<value>& args) { diff --git a/sca-cpp/trunk/modules/server/client-test.scm b/sca-cpp/trunk/modules/server/client-test.scm index 11c71cdfce..47b799d390 100644 --- a/sca-cpp/trunk/modules/server/client-test.scm +++ b/sca-cpp/trunk/modules/server/client-test.scm @@ -21,26 +21,18 @@ ; ATOMPub test case -(define (getall ref) - (ref "getall") -) - (define (get id ref) (ref "get" id) ) -(define (post entry ref) - (ref "post" entry) +(define (post coll entry ref) + (ref "post" coll entry) ) (define (put id entry ref) (ref "put" id entry) ) -(define (deleteall ref) - (ref deleteall) -) - (define (delete id ref) (ref "delete" id) ) diff --git a/sca-cpp/trunk/modules/server/impl-test.cpp b/sca-cpp/trunk/modules/server/impl-test.cpp index 51162d070d..3fa1d65323 100644 --- a/sca-cpp/trunk/modules/server/impl-test.cpp +++ b/sca-cpp/trunk/modules/server/impl-test.cpp @@ -38,7 +38,7 @@ const failable<value> get(unused const list<value>& params) { } const failable<value> post(unused const list<value>& params) { - return value(string("123456789")); + return value(mklist<value>(string("123456789"))); } const failable<value> put(unused const list<value>& params) { diff --git a/sca-cpp/trunk/modules/server/mod-cpp.hpp b/sca-cpp/trunk/modules/server/mod-cpp.hpp index 19ba938034..23b4941c26 100644 --- a/sca-cpp/trunk/modules/server/mod-cpp.hpp +++ b/sca-cpp/trunk/modules/server/mod-cpp.hpp @@ -37,7 +37,6 @@ #include "monad.hpp" #include "dynlib.hpp" #include "../scheme/driver.hpp" -#include "mod-eval.hpp" namespace tuscany { namespace server { @@ -54,8 +53,8 @@ const list<value> failableResult(const value& func, const list<value>& v) { // Except for the start, stop, and restart functions, which are optional const value reason = cadr(v); if (length(reason) == 0) { - if (func == "start" || func == "stop" || func == "restart") - return mklist<value>(false); + if (func == "start" || func == "restart" || func == "stop") + return mklist<value>(lambda<value(const list<value>&)>()); return mklist<value>(value(), string("Function not supported: ") + func); } return v; @@ -82,7 +81,7 @@ struct applyImplementation { * Evaluate a C++ component implementation and convert it to * an applicable lambda function. */ -const failable<lambda<value(const list<value>&)> > evalImplementation(const string& path, const value& impl, const list<value>& px, unused modeval::ServerConf& sc) { +const failable<lambda<value(const list<value>&)> > evalImplementation(const string& path, const value& impl, const list<value>& px) { // Configure the implementation's lambda function const value ipath(attributeValue("path", impl)); diff --git a/sca-cpp/trunk/modules/server/mod-eval.cpp b/sca-cpp/trunk/modules/server/mod-eval.cpp index 5a9b87ca2f..a94ccf5bbe 100644 --- a/sca-cpp/trunk/modules/server/mod-eval.cpp +++ b/sca-cpp/trunk/modules/server/mod-eval.cpp @@ -37,36 +37,23 @@ namespace server { namespace modeval { /** - * Start the module. + * Apply a lifecycle start or restart event. */ -const failable<bool> start(unused ServerConf& sc) { - return true; -} - -/** - * Stop the module. - */ -const failable<bool> stop(unused ServerConf& sc) { - return true; -} - -/** - * Restart the module. - */ -const failable<bool> restart(unused ServerConf& sc) { - return true; +const value applyLifecycle(unused const list<value>& params) { + // Return a nil function as we don't need to handle any subsequent events + return failable<value>(lambda<value(const list<value>&)>()); } /** * Evaluate a Scheme or C++ component implementation and convert it to an * applicable lambda function. */ -const failable<lambda<value(const list<value>&)> > evalImplementation(const string& path, const value& impl, const list<value>& px, ServerConf& sc) { +const failable<lambda<value(const list<value>&)> > evalImplementation(const string& path, const value& impl, const list<value>& px, unused const lambda<value(const list<value>&)>& lifecycle) { const string itype(elementName(impl)); if (contains(itype, ".scheme")) - return modscheme::evalImplementation(path, impl, px, sc); + return modscheme::evalImplementation(path, impl, px); if (contains(itype, ".cpp")) - return modcpp::evalImplementation(path, impl, px, sc); + return modcpp::evalImplementation(path, impl, px); return mkfailure<lambda<value(const list<value>&)> >(string("Unsupported implementation type: ") + itype); } diff --git a/sca-cpp/trunk/modules/server/mod-eval.hpp b/sca-cpp/trunk/modules/server/mod-eval.hpp index 6ab1d68ad6..03cce5aeb4 100644 --- a/sca-cpp/trunk/modules/server/mod-eval.hpp +++ b/sca-cpp/trunk/modules/server/mod-eval.hpp @@ -53,16 +53,17 @@ namespace modeval { */ class ServerConf { public: - ServerConf(server_rec* s) : s(s), moduleConf(NULL), home(""), wiringServerName(""), contributionPath(""), compositeName("") { + ServerConf(server_rec* s) : s(s), home(""), wiringServerName(""), contributionPath(""), compositeName("") { } const server_rec* s; - void* moduleConf; + lambda<value(const list<value>&)> lifecycle; string home; string wiringServerName; string contributionPath; string compositeName; list<value> implementations; + list<value> implTree; }; /** @@ -103,20 +104,15 @@ const failable<int> get(request_rec* r, const lambda<value(const list<value>&)>& return httpd::writeResult(json::jsonResult(id, content(val), cx), "application/json-rpc", r); } - // Evaluate an ATOM GET request and return an ATOM feed representing a collection of resources - const list<value> path(httpd::pathValues(r->uri)); - if (isNil(cddr(path))) { - const failable<value> val = failableResult(impl(cons<value>("getall", list<value>()))); - if (!hasContent(val)) - return mkfailure<int>(reason(val)); - return httpd::writeResult(atom::writeATOMFeed(atom::feedValuesToElements(content(val))), "application/atom+xml;type=feed", r); - } - - // Evaluate an ATOM GET and return an ATOM entry representing a resource - const failable<value> val = failableResult(impl(cons<value>("get", mklist<value>(caddr(path))))); + // Evaluate the GET expression and return an ATOM entry or feed representing a resource + const list<value> path(pathValues(r->uri)); + const failable<value> val = failableResult(impl(cons<value>("get", mklist<value>(cddr(path))))); if (!hasContent(val)) return mkfailure<int>(reason(val)); - return httpd::writeResult(atom::writeATOMEntry(atom::entryValuesToElements(content(val))), "application/atom+xml;type=entry", r); + if (isNil(cddr(path))) + return httpd::writeResult(atom::writeATOMFeed(atom::feedValuesToElements(content(val))), "application/atom+xml;type=feed", r); + else + return httpd::writeResult(atom::writeATOMEntry(atom::entryValuesToElements(content(val))), "application/atom+xml;type=entry", r); } /** @@ -157,6 +153,7 @@ const failable<int> post(request_rec* r, const lambda<value(const list<value>&)> if (contains(ct, "application/atom+xml")) { // Read the ATOM entry + const list<value> path(pathValues(r->uri)); const int rc = httpd::setupReadPolicy(r); if(rc != OK) return rc; @@ -164,12 +161,13 @@ const failable<int> post(request_rec* r, const lambda<value(const list<value>&)> debug(ls, "modeval::post::input"); const value entry = atom::entryValue(content(atom::readEntry(ls))); - // Evaluate the request expression - const failable<value> val = failableResult(impl(cons<value>("post", mklist<value>(entry)))); + // Evaluate the POST expression + const failable<value> val = failableResult(impl(cons<value>("post", mklist<value>(cddr(path), entry)))); if (!hasContent(val)) return mkfailure<int>(reason(val)); // Return the created resource location + debug(content(val), "modeval::post::location"); apr_table_setn(r->headers_out, "Location", apr_pstrdup(r->pool, httpd::url(content(val), r))); r->status = HTTP_CREATED; return OK; @@ -190,7 +188,7 @@ const failable<int> put(request_rec* r, const lambda<value(const list<value>&)>& debug(r->uri, "modeval::put::url"); // Read the ATOM entry - const list<value> path(httpd::pathValues(r->uri)); + const list<value> path(pathValues(r->uri)); const int rc = httpd::setupReadPolicy(r); if(rc != OK) return rc; @@ -198,8 +196,8 @@ const failable<int> put(request_rec* r, const lambda<value(const list<value>&)>& debug(ls, "modeval::put::input"); const value entry = atom::entryValue(content(atom::readEntry(ls))); - // Evaluate the ATOM PUT request and update the corresponding resource - const failable<value> val = failableResult(impl(cons<value>("put", mklist<value>(caddr(path), entry)))); + // Evaluate the PUT expression and update the corresponding resource + const failable<value> val = failableResult(impl(cons<value>("put", mklist<value>(cddr(path), entry)))); if (!hasContent(val)) return mkfailure<int>(reason(val)); if (val == value(false)) @@ -214,20 +212,8 @@ const failable<int> del(request_rec* r, const lambda<value(const list<value>&)>& debug(r->uri, "modeval::delete::url"); // Evaluate an ATOM delete request - const list<value> path(httpd::pathValues(r->uri)); - if (isNil(cddr(path))) { - - // Delete a collection of resources - const failable<value> val = failableResult(impl(cons<value>("deleteall", list<value>()))); - if (!hasContent(val)) - return mkfailure<int>(reason(val)); - if (val == value(false)) - return HTTP_NOT_FOUND; - return OK; - } - - // Delete a resource - const failable<value> val = failableResult(impl(cons<value>("delete", mklist<value>(caddr(path))))); + const list<value> path(pathValues(r->uri)); + const failable<value> val = failableResult(impl(cons<value>("delete", mklist<value>(cddr(path))))); if (!hasContent(val)) return mkfailure<int>(reason(val)); if (val == value(false)) @@ -257,8 +243,8 @@ int handler(request_rec *r) { // Get the component implementation lambda const ServerConf& sc = httpd::serverConf<ServerConf>(r, &mod_tuscany_eval); - const list<value> path(httpd::pathValues(r->uri)); - const list<value> impl(assoctree<value>(cadr(path), sc.implementations)); + const list<value> path(pathValues(r->uri)); + const list<value> impl(assoctree<value>(cadr(path), sc.implTree)); if (isNil(impl)) return HTTP_NOT_FOUND; @@ -317,7 +303,7 @@ const list<value> propProxies(const list<value>& props) { * Evaluate a component and convert it to an applicable lambda function. */ const value evalComponent(ServerConf& sc, server_rec& server, const value& comp) { - extern const failable<lambda<value(const list<value>&)> > evalImplementation(const string& cpath, const value& impl, const list<value>& px, ServerConf& sc); + extern const failable<lambda<value(const list<value>&)> > evalImplementation(const string& cpath, const value& impl, const list<value>& px, const lambda<value(const list<value>&)>& lifecycle); const value impl = scdl::implementation(comp); @@ -336,7 +322,7 @@ const value evalComponent(ServerConf& sc, server_rec& server, const value& comp) const list<value> ppx(propProxies(scdl::properties(comp))); // Evaluate the component implementation and convert it to an applicable lambda function - const failable<lambda<value(const list<value>&)> > cimpl(evalImplementation(sc.contributionPath, impl, append(rpx, ppx), sc)); + const failable<lambda<value(const list<value>&)> > cimpl(evalImplementation(sc.contributionPath, impl, append(rpx, ppx), sc.lifecycle)); if (!hasContent(cimpl)) return reason(cimpl); return content(cimpl); @@ -362,19 +348,28 @@ const failable<list<value> > readComponents(const string& path) { } /** - * Apply a list of component implementations to a (start, stop or restart) lifecycle expression. + * Apply a list of component implementations to a start or restart lifecycle expression. + * Return the functions returned by the component implementations. */ -const failable<bool> applyLifecycleExpr(const list<value> impls, const list<value>& expr) { +const failable<list<value> > applyLifecycleExpr(const list<value>& impls, const list<value>& expr) { if (isNil(impls)) - return true; + return list<value>(); // Evaluate lifecycle expression against a component implementation lambda - const lambda<value(const list<value>&)> l(cadr<value>(car(impls))); + const lambda<value(const list<value>&)> l = cadr<value>(car(impls)); const failable<value> r = failableResult(l(expr)); if (!hasContent(r)) - return mkfailure<bool>(reason(r)); + return mkfailure<list<value> >(reason(r)); + const lambda<value(const list<value>&)> rl = content(r); + + // Use the returned lambda function, if any, from now on + const lambda<value(const list<value>&)> al = isNil(rl)? l : rl; - return applyLifecycleExpr(cdr(impls), expr); + // Continue with the rest of the list + const failable<list<value> > nr = applyLifecycleExpr(cdr(impls), expr); + if (!hasContent(nr)) + return nr; + return cons<value>(mklist<value>(car<value>(car(impls)), value(al)), content(nr)); } /** @@ -388,14 +383,106 @@ const failable<bool> confComponents(const string& lifecycle, ServerConf& sc, ser const failable<list<value> > comps = readComponents(sc.contributionPath + sc.compositeName); if (!hasContent(comps)) return mkfailure<bool>(reason(comps)); - const list<value> impls = componentToImplementationAssoc(sc, server, content(comps)); + const list<value> starts = componentToImplementationAssoc(sc, server, content(comps)); - // Store the implementation lambda functions in a tree for fast retrieval - sc.implementations = mkbtree(sort(impls)); + // Start or restart the component implementations + // Record the returned lambda functions + debug(starts, "modeval::confComponents::start"); + const failable<list<value> > impls = applyLifecycleExpr(starts, mklist<value>(c_str(lifecycle))); + if (!hasContent(impls)) + return mkfailure<bool>(reason(impls)); + sc.implementations = content(impls); debug(sc.implementations, "modeval::confComponents::implementations"); - // Start or restart the component implementations - return applyLifecycleExpr(impls, mklist<value>(c_str(lifecycle))); + // Store the implementation lambda functions in a tree for fast retrieval + sc.implTree = mkbtree(sort(sc.implementations)); + + return true; +} + +/** + * Cleanup callback, called when the server is stopped or restarted. + */ +apr_status_t serverCleanup(void* v) { + gc_pool pool; + ServerConf& sc = *(ServerConf*)v; + debug("modeval::serverCleanup"); + + // Stop the component implementations + applyLifecycleExpr(sc.implementations, mklist<value>("stop")); + + // Call the module lifecycle function + if (isNil(sc.lifecycle)) + return APR_SUCCESS; + debug("modeval::serverCleanup::stop"); + sc.lifecycle(mklist<value>("stop")); + + return APR_SUCCESS; +} + +/** + * Called after all the configuration commands have been run. + * Process the server configuration and configure the deployed components. + */ +int postConfig(apr_pool_t *p, unused apr_pool_t *plog, unused apr_pool_t *ptemp, server_rec *s) { + extern const value applyLifecycle(const list<value>&); + + gc_scoped_pool pool(p); + ServerConf& sc = httpd::serverConf<ServerConf>(s, &mod_tuscany_eval); + + // Count the calls to post config + const string k("tuscany::modeval::postConfig"); + const int count = (int)httpd::userData(k, s); + httpd::putUserData(k, (void*)(count + 1), s); + + // Count == 0, do nothing as post config is always called twice, + // count == 1 is the first start, count > 1 is a restart + if (count == 0) + return OK; + if (count == 1) { + debug("modeval::postConfig::start"); + const failable<value> r = failableResult(applyLifecycle(mklist<value>("start"))); + if (!hasContent(r)) + return -1; + sc.lifecycle = content(r); + } + if (count > 1) { + debug("modeval::postConfig::restart"); + const failable<value> r = failableResult(applyLifecycle(mklist<value>("restart"))); + if (!hasContent(r)) + return -1; + sc.lifecycle = content(r); + } + + // Configure the deployed components + debug(sc.wiringServerName, "modeval::postConfig::wiringServerName"); + debug(sc.contributionPath, "modeval::postConfig::contributionPath"); + debug(sc.compositeName, "modeval::postConfig::compositeName"); + const failable<bool> res = confComponents(count > 1? "restart" : "start", sc, *s); + if (!hasContent(res)) { + cerr << "[Tuscany] Due to one or more errors mod_tuscany_eval loading failed. Causing apache to stop loading." << endl; + return -1; + } + + // Register a cleanup callback, called when the server is stopped or restarted + apr_pool_pre_cleanup_register(p, (void*)&sc, serverCleanup); + + return OK; +} + +/** + * Child process initialization. + */ +void childInit(apr_pool_t* p, server_rec* s) { + gc_scoped_pool pool(p); + ServerConf* sc = (ServerConf*)ap_get_module_config(s->module_config, &mod_tuscany_eval); + if(sc == NULL) { + cerr << "[Tuscany] Due to one or more errors mod_tuscany_eval loading failed. Causing apache to stop loading." << endl; + exit(APEXIT_CHILDFATAL); + } + + // Register a cleanup callback, called when the child is stopped or restarted + apr_pool_pre_cleanup_register(p, (void*)sc, serverCleanup); } /** @@ -444,51 +531,6 @@ const command_rec commands[] = { {NULL, NULL, NULL, 0, NO_ARGS, NULL} }; -int postConfig(apr_pool_t *p, unused apr_pool_t *plog, unused apr_pool_t *ptemp, server_rec *s) { - extern const failable<bool> start(ServerConf& sc); - extern const failable<bool> restart(ServerConf& sc); - gc_scoped_pool pool(p); - ServerConf& sc = httpd::serverConf<ServerConf>(s, &mod_tuscany_eval); - - // Count the calls to post config - const string k("tuscany::modeval::postConfig"); - const int count = httpd::userData(k, s); - httpd::putUserData(k, count +1, s); - - // Count == 0, do nothing as post config is always called twice, - // count == 1 is the first start, count > 1 is a restart - if (count == 0) - return OK; - if (count == 1) { - debug("modeval::postConfig::start"); - start(sc); - } - if (count > 1) { - debug("modeval::postConfig::restart"); - restart(sc); - } - - // Configure the components deployed to the server - debug(sc.wiringServerName, "modeval::postConfig::wiringServerName"); - debug(sc.contributionPath, "modeval::postConfig::contributionPath"); - debug(sc.compositeName, "modeval::postConfig::compositeName"); - const failable<bool> res = confComponents(count > 1? "restart" : "start", sc, *s); - if (!hasContent(res)) - return -1; - return OK; -} - -/** - * Child process initialization. - */ -void childInit(apr_pool_t* p, server_rec* svr_rec) { - gc_scoped_pool pool(p); - ServerConf* c = (ServerConf*)ap_get_module_config(svr_rec->module_config, &mod_tuscany_eval); - if(c == NULL) { - cerr << "[Tuscany] Due to one or more errors mod_tuscany_eval loading failed. Causing apache to stop loading." << endl; - exit(APEXIT_CHILDFATAL); - } -} void registerHooks(unused apr_pool_t *p) { ap_hook_post_config(postConfig, NULL, NULL, APR_HOOK_MIDDLE); diff --git a/sca-cpp/trunk/modules/server/mod-scheme.hpp b/sca-cpp/trunk/modules/server/mod-scheme.hpp index e18eececf6..f5b9554c3f 100644 --- a/sca-cpp/trunk/modules/server/mod-scheme.hpp +++ b/sca-cpp/trunk/modules/server/mod-scheme.hpp @@ -34,7 +34,6 @@ #include "value.hpp" #include "monad.hpp" #include "../scheme/eval.hpp" -#include "../server/mod-eval.hpp" namespace tuscany { namespace server { @@ -61,11 +60,10 @@ struct applyImplementation { const value expr = cons<value>(car(params), append(scheme::quotedParameters(cdr(params)), px)); debug(expr, "modeval::scheme::applyImplementation::input"); scheme::Env env = scheme::setupEnvironment(); - const value val = scheme::evalScript(expr, impl, env); + const value res = scheme::evalScript(expr, impl, env); + const value val = isNil(res)? mklist<value>(value(), string("Could not evaluate expression")) : mklist<value>(res); debug(val, "modeval::scheme::applyImplementation::result"); - if (isNil(val)) - return mklist<value>(value(), string("Could not evaluate expression")); - return mklist<value>(val); + return val; } }; @@ -73,7 +71,7 @@ struct applyImplementation { * Evaluate a Scheme component implementation and convert it to an * applicable lambda function. */ -const failable<lambda<value(const list<value>&)> > evalImplementation(const string& path, const value& impl, const list<value>& px, unused modeval::ServerConf& sc) { +const failable<lambda<value(const list<value>&)> > evalImplementation(const string& path, const value& impl, const list<value>& px) { const string fpath(path + attributeValue("script", impl)); ifstream is(fpath); if (fail(is)) diff --git a/sca-cpp/trunk/modules/server/mod-wiring.cpp b/sca-cpp/trunk/modules/server/mod-wiring.cpp index e21f8be773..c21b0fe254 100644 --- a/sca-cpp/trunk/modules/server/mod-wiring.cpp +++ b/sca-cpp/trunk/modules/server/mod-wiring.cpp @@ -82,7 +82,7 @@ int translateReference(request_rec *r) { // Find the requested component const ServerConf& sc = httpd::serverConf<ServerConf>(r, &mod_tuscany_wiring); - const list<value> rpath(httpd::pathValues(r->uri)); + const list<value> rpath(pathValues(r->uri)); const list<value> comp(assoctree(cadr(rpath), sc.references)); if (isNil(comp)) return HTTP_NOT_FOUND; @@ -147,20 +147,20 @@ int translateService(request_rec *r) { // Find the requested component const ServerConf& sc = httpd::serverConf<ServerConf>(r, &mod_tuscany_wiring); - const list<value> path(httpd::pathValues(r->uri)); - const list<value> svc(assocPath(path, sc.services)); + const list<value> p(pathValues(r->uri)); + const list<value> svc(assocPath(p, sc.services)); if (isNil(svc)) return DECLINED; debug(svc, "modwiring::translateService::service"); // Build a component-name + path-info URI - const list<value> target(cons<value>(cadr(svc), httpd::pathInfo(path, car(svc)))); + const list<value> target(cons<value>(cadr(svc), httpd::pathInfo(p, car(svc)))); debug(target, "modwiring::translateService::target"); // Dispatch to the target component using a local internal redirect - const string p(httpd::path(target)); - debug(p, "modwiring::translateService::path"); - const string redir(string("/redirect:/components") + httpd::path(target)); + const string tp(path(target)); + debug(tp, "modwiring::translateService::path"); + const string redir(string("/redirect:/components") + tp); debug(redir, "modwiring::translateService::redirect"); r->filename = apr_pstrdup(r->pool, c_str(redir)); r->handler = "mod_tuscany_wiring"; @@ -216,7 +216,7 @@ const failable<list<value> > readComponents(const string& path) { } /** - * Return a tree of component-name + references pairs. The references are + * Return a list of component-name + references pairs. The references are * arranged in trees of reference-name + reference-target pairs. */ const list<value> componentReferenceToTargetTree(const value& c) { @@ -229,12 +229,8 @@ const list<value> componentReferenceToTargetAssoc(const list<value>& c) { return cons<value>(componentReferenceToTargetTree(car(c)), componentReferenceToTargetAssoc(cdr(c))); } -const list<value> componentReferenceToTargetTree(const list<value>& c) { - return mkbtree(sort(componentReferenceToTargetAssoc(c))); -} - /** - * Return a tree of service-URI-path + component-name pairs. Service-URI-paths are + * Return a list of service-URI-path + component-name pairs. Service-URI-paths are * represented as lists of URI path fragments. */ const list<value> defaultBindingURI(const string& cn, const string& sn) { @@ -247,7 +243,7 @@ const list<value> bindingToComponentAssoc(const string& cn, const string& sn, co const value uri(scdl::uri(car(b))); if (isNil(uri)) return cons<value>(mklist<value>(defaultBindingURI(cn, sn), cn), bindingToComponentAssoc(cn, sn, cdr(b))); - return cons<value>(mklist<value>(httpd::pathValues(c_str(string(uri))), cn), bindingToComponentAssoc(cn, sn, cdr(b))); + return cons<value>(mklist<value>(pathValues(c_str(string(uri))), cn), bindingToComponentAssoc(cn, sn, cdr(b))); } const list<value> serviceToComponentAssoc(const string& cn, const list<value>& s) { @@ -266,10 +262,6 @@ const list<value> uriToComponentAssoc(const list<value>& c) { return append<value>(serviceToComponentAssoc(scdl::name(car(c)), scdl::services(car(c))), uriToComponentAssoc(cdr(c))); } -const list<value> uriToComponentTree(const list<value>& c) { - return mkbtree(sort(uriToComponentAssoc(c))); -} - /** * Configure the components declared in the server's deployment composite. */ @@ -277,19 +269,56 @@ const bool confComponents(ServerConf& sc) { if (sc.contributionPath == "" || sc.compositeName == "") return true; - // Read the component configuration and store the references and service - // URIs in trees for fast retrieval later + // Read the component configuration and store the references and service URIs + // in trees for fast retrieval later const failable<list<value> > comps = readComponents(sc.contributionPath + sc.compositeName); if (!hasContent(comps)) return true; - sc.references = componentReferenceToTargetTree(content(comps)); - debug(sc.references, "modwiring::confComponents::references"); - sc.services = uriToComponentTree(content(comps)); - debug(sc.services, "modwiring::confComponents::services"); + const list<value> refs = componentReferenceToTargetAssoc(content(comps)); + debug(refs, "modwiring::confComponents::references"); + sc.references = mkbtree(sort(refs)); + + const list<value> svcs = uriToComponentAssoc(content(comps)); + debug(svcs, "modwiring::confComponents::services"); + sc.services = mkbtree(sort(svcs)); return true; } /** + * Called after all the configuration commands have been run. + * Process the server configuration and configure the wiring for the deployed components. + */ +int postConfig(unused apr_pool_t *p, unused apr_pool_t *plog, unused apr_pool_t *ptemp, server_rec *s) { + // Count the calls to post config, skip the first one as + // postConfig is always called twice + const string k("tuscany::modwiring::postConfig"); + const int count = (int)httpd::userData(k, s); + httpd::putUserData(k, (void*)(count + 1), s); + if (count == 0) + return OK; + + // Configure the wiring for the deployed components + ServerConf& sc = httpd::serverConf<ServerConf>(s, &mod_tuscany_wiring); + debug(sc.wiringServerName, "modwiring::postConfig::wiringServerName"); + debug(sc.contributionPath, "modwiring::postConfig::contributionPath"); + debug(sc.compositeName, "modwiring::postConfig::compositeName"); + confComponents(sc); + return OK; +} + +/** + * Child process initialization. + */ +void childInit(apr_pool_t* p, server_rec* svr_rec) { + gc_scoped_pool pool(p); + ServerConf *conf = (ServerConf*)ap_get_module_config(svr_rec->module_config, &mod_tuscany_wiring); + if(conf == NULL) { + cerr << "[Tuscany] Due to one or more errors mod_tuscany_wiring loading failed. Causing apache to stop loading." << endl; + exit(APEXIT_CHILDFATAL); + } +} + +/** * Configuration commands. */ const char *confHome(cmd_parms *cmd, unused void *c, const char *arg) { @@ -328,33 +357,6 @@ const command_rec commands[] = { {NULL, NULL, NULL, 0, NO_ARGS, NULL} }; -int postConfig(unused apr_pool_t *p, unused apr_pool_t *plog, unused apr_pool_t *ptemp, server_rec *s) { - // Count the calls to post config, skip the first one as - // postConfig is always called twice - const string k("tuscany::modwiring::postConfig"); - const int count = httpd::userData(k, s); - httpd::putUserData(k, count +1, s); - if (count == 0) - return OK; - - // Configure the wiring for the deployed components - ServerConf& sc = httpd::serverConf<ServerConf>(s, &mod_tuscany_wiring); - debug(sc.wiringServerName, "modwiring::postConfig::wiringServerName"); - debug(sc.contributionPath, "modwiring::postConfig::contributionPath"); - debug(sc.compositeName, "modwiring::postConfig::compositeName"); - confComponents(sc); - return OK; -} - -void childInit(apr_pool_t* p, server_rec* svr_rec) { - gc_scoped_pool pool(p); - ServerConf *conf = (ServerConf*)ap_get_module_config(svr_rec->module_config, &mod_tuscany_wiring); - if(conf == NULL) { - cerr << "[Tuscany] Due to one or more errors mod_tuscany_wiring loading failed. Causing apache to stop loading." << endl; - exit(APEXIT_CHILDFATAL); - } -} - void registerHooks(unused apr_pool_t *p) { ap_hook_post_config(postConfig, NULL, NULL, APR_HOOK_MIDDLE); ap_hook_child_init(childInit, NULL, NULL, APR_HOOK_MIDDLE); diff --git a/sca-cpp/trunk/modules/server/server-test.scm b/sca-cpp/trunk/modules/server/server-test.scm index 8864f55cfd..490f168170 100644 --- a/sca-cpp/trunk/modules/server/server-test.scm +++ b/sca-cpp/trunk/modules/server/server-test.scm @@ -21,30 +21,24 @@ ; ATOMPub test case -(define (getall) - '("Sample Feed" "123456789" - ("Item" "111" ((javaClass "services.Item") (name "Apple") (currencyCode "USD") (currencySymbol "$") (price 2.99))) - ("Item" "222" ((javaClass "services.Item") (name "Orange") (currencyCode "USD") (currencySymbol "$") (price 3.55))) - ("Item" "333" ((javaClass "services.Item") (name "Pear") (currencyCode "USD") (currencySymbol "$") (price 1.55)))) -) - (define (get id) - (define entry '((javaClass "services.Item") (name "Apple") (currencyCode "USD") (currencySymbol "$") (price 2.99))) - (cons "Item" (list id entry)) + (if (nul id) + '("Sample Feed" "123456789" + ("Item" "111" ((javaClass "services.Item") (name "Apple") (currencyCode "USD") (currencySymbol "$") (price 2.99))) + ("Item" "222" ((javaClass "services.Item") (name "Orange") (currencyCode "USD") (currencySymbol "$") (price 3.55))) + ("Item" "333" ((javaClass "services.Item") (name "Pear") (currencyCode "USD") (currencySymbol "$") (price 1.55)))) + + '("Item" "111" ((javaClass "services.Item") (name "Apple") (currencyCode "USD") (currencySymbol "$") (price 2.99)))) ) -(define (post entry) - "123456789" +(define (post coll entry) + '("123456789") ) (define (put id entry) true ) -(define (deleteall) - true -) - (define (delete id) true ) diff --git a/sca-cpp/trunk/test/store-cpp/start b/sca-cpp/trunk/test/store-cpp/start index 06410aca01..3c1da356e6 100755 --- a/sca-cpp/trunk/test/store-cpp/start +++ b/sca-cpp/trunk/test/store-cpp/start @@ -25,8 +25,5 @@ SCAContribution `pwd`/ SCAComposite store.composite EOF +../../components/cache/memcached-start ../../modules/http/httpd-start tmp - -mc="memcached -l 127.0.0.1 -m 4 -p 11211" -$mc & - diff --git a/sca-cpp/trunk/test/store-cpp/stop b/sca-cpp/trunk/test/store-cpp/stop index 07231ee7ce..a59273b8ed 100755 --- a/sca-cpp/trunk/test/store-cpp/stop +++ b/sca-cpp/trunk/test/store-cpp/stop @@ -18,6 +18,4 @@ # under the License. ../../modules/http/httpd-stop tmp -mc="memcached -l 127.0.0.1 -m 4 -p 11211" -kill `ps -f | grep -v grep | grep "$mc" | awk '{ print $2 }'` - +../../components/cache/memcached-stop diff --git a/sca-cpp/trunk/test/store-java/start b/sca-cpp/trunk/test/store-java/start index 3f669f0111..ae2743178a 100755 --- a/sca-cpp/trunk/test/store-java/start +++ b/sca-cpp/trunk/test/store-java/start @@ -27,8 +27,5 @@ EOF export CLASSPATH=`pwd`/../../modules/java/libmod-tuscany-java-1.0.jar:`pwd` +../../components/cache/memcached-start ../../modules/http/httpd-start tmp - -mc="memcached -l 127.0.0.1 -m 4 -p 11211" -$mc & - diff --git a/sca-cpp/trunk/test/store-java/stop b/sca-cpp/trunk/test/store-java/stop index 54b22172bb..79d914e937 100755 --- a/sca-cpp/trunk/test/store-java/stop +++ b/sca-cpp/trunk/test/store-java/stop @@ -20,6 +20,4 @@ export CLASSPATH=`pwd`/../../modules/java/libmod-tuscany-java-1.0.jar:`pwd` ../../modules/http/httpd-stop tmp -mc="memcached -l 127.0.0.1 -m 4 -p 11211" -kill `ps -f | grep -v grep | grep "$mc" | awk '{ print $2 }'` - +../../components/cache/memcached-stop diff --git a/sca-cpp/trunk/test/store-python/start b/sca-cpp/trunk/test/store-python/start index 33758b75fd..93e1dbe755 100755 --- a/sca-cpp/trunk/test/store-python/start +++ b/sca-cpp/trunk/test/store-python/start @@ -25,8 +25,5 @@ SCAContribution `pwd`/ SCAComposite store.composite EOF +../../components/cache/memcached-start ../../modules/http/httpd-start tmp - -mc="memcached -l 127.0.0.1 -m 4 -p 11211" -$mc & - diff --git a/sca-cpp/trunk/test/store-python/stop b/sca-cpp/trunk/test/store-python/stop index 07231ee7ce..a59273b8ed 100755 --- a/sca-cpp/trunk/test/store-python/stop +++ b/sca-cpp/trunk/test/store-python/stop @@ -18,6 +18,4 @@ # under the License. ../../modules/http/httpd-stop tmp -mc="memcached -l 127.0.0.1 -m 4 -p 11211" -kill `ps -f | grep -v grep | grep "$mc" | awk '{ print $2 }'` - +../../components/cache/memcached-stop diff --git a/sca-cpp/trunk/test/store-scheme/start b/sca-cpp/trunk/test/store-scheme/start index 398e21c8e4..db8e19c4cc 100755 --- a/sca-cpp/trunk/test/store-scheme/start +++ b/sca-cpp/trunk/test/store-scheme/start @@ -25,8 +25,5 @@ SCAContribution `pwd`/ SCAComposite store.composite EOF +../../components/cache/memcached-start ../../modules/http/httpd-start tmp - -mc="memcached -l 127.0.0.1 -m 4 -p 11211" -$mc & - diff --git a/sca-cpp/trunk/test/store-scheme/stop b/sca-cpp/trunk/test/store-scheme/stop index 07231ee7ce..a59273b8ed 100755 --- a/sca-cpp/trunk/test/store-scheme/stop +++ b/sca-cpp/trunk/test/store-scheme/stop @@ -18,6 +18,4 @@ # under the License. ../../modules/http/httpd-stop tmp -mc="memcached -l 127.0.0.1 -m 4 -p 11211" -kill `ps -f | grep -v grep | grep "$mc" | awk '{ print $2 }'` - +../../components/cache/memcached-stop |