diff options
Diffstat (limited to 'sca-cpp/branches/cpp-contrib/components/queue')
12 files changed, 931 insertions, 0 deletions
diff --git a/sca-cpp/branches/cpp-contrib/components/queue/Makefile.am b/sca-cpp/branches/cpp-contrib/components/queue/Makefile.am new file mode 100644 index 0000000000..09ff0e54a4 --- /dev/null +++ b/sca-cpp/branches/cpp-contrib/components/queue/Makefile.am @@ -0,0 +1,45 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +if WANT_QUEUE + +noinst_PROGRAMS = qpid-test client-test + +INCLUDES = -I${QPIDC_INCLUDE} + +compdir=$(prefix)/components/queue +comp_LTLIBRARIES = libqueue-sender.la libqueue-listener.la +comp_DATA = qpidc.prefix + +libqueue_sender_la_SOURCES = queue-sender.cpp +libqueue_sender_la_LDFLAGS = -L${QPIDC_LIB} -R${QPIDC_LIB} -lqpidclient + +libqueue_listener_la_SOURCES = queue-listener.cpp +libqueue_listener_la_LDFLAGS = -L${QPIDC_LIB} -R${QPIDC_LIB} -lqpidclient + +qpid_test_SOURCES = qpid-test.cpp +qpid_test_LDFLAGS = -L${QPIDC_LIB} -R${QPIDC_LIB} -lqpidclient + +client_test_SOURCES = client-test.cpp +client_test_LDFLAGS = -lxml2 -lcurl -lmozjs -L${QPIDC_LIB} -R${QPIDC_LIB} -lqpidclient + +qpidc.prefix: $(top_builddir)/config.status + echo ${QPIDC_PREFIX} >qpidc.prefix + +TESTS = send-test server-test + +endif diff --git a/sca-cpp/branches/cpp-contrib/components/queue/client-test.cpp b/sca-cpp/branches/cpp-contrib/components/queue/client-test.cpp new file mode 100644 index 0000000000..a448d1fccd --- /dev/null +++ b/sca-cpp/branches/cpp-contrib/components/queue/client-test.cpp @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/* $Rev$ $Date$ */ + +/** + * Test queue component. + */ + +#include <assert.h> +#include "stream.hpp" +#include "string.hpp" +#include "list.hpp" +#include "element.hpp" +#include "value.hpp" +#include "monad.hpp" +#include "perf.hpp" +#include "../../modules/http/curl.hpp" +#include "qpid.hpp" + +// Ignore conversion issues and redundant declarations in Qpid headers +#ifdef WANT_MAINTAINER_MODE +#pragma GCC diagnostic ignored "-Wconversion" +#pragma GCC diagnostic ignored "-Wredundant-decls" +#endif + +namespace tuscany { +namespace queue { + +const value key(mklist<value>(string("report"))); +const string qname("reportq"); + +const list<value> item = list<value>() + + (list<value>() + "name" + string("Apple")) + + (list<value>() + "price" + string("$2.99")); +const list<value> entry = mklist<value>(string("item"), string("cart-53d67a61-aa5e-4e5e-8401-39edeba8b83b"), item); + +bool testDeclareQueue() { + QpidConnection qc; + QpidSession qs(qc); + const failable<bool> r = declareQueue(key, qname, qs); + assert(hasContent(r)); + return true; +} + +const bool listener(const value& k, const value& v) { + cerr << "k " << k << " v " << v << endl; + assert(k == key); + assert(v == entry); + return false; +} + +bool testListen() { + QpidConnection qc; + QpidSession qs(qc); + QpidSubscription qsub(qs); + const lambda<bool(const value&, const value&)> l(listener); + listen(qname, l, qsub); + return true; +} + +bool testPost() { + gc_scoped_pool pool; + http::CURLSession ch; + const failable<value> id = http::post(entry, "http://localhost:8090/print-sender", ch); + assert(hasContent(id)); + return true; +} + +} +} + +int main() { + tuscany::cout << "Testing..." << tuscany::endl; + + tuscany::queue::testDeclareQueue(); + tuscany::queue::testPost(); + tuscany::queue::testListen(); + + tuscany::cout << "OK" << tuscany::endl; + + return 0; +} diff --git a/sca-cpp/branches/cpp-contrib/components/queue/qpid-test.cpp b/sca-cpp/branches/cpp-contrib/components/queue/qpid-test.cpp new file mode 100644 index 0000000000..1a650157b2 --- /dev/null +++ b/sca-cpp/branches/cpp-contrib/components/queue/qpid-test.cpp @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/* $Rev$ $Date$ */ + +/** + * Test Qpid support functions. + */ + +#include <assert.h> +#include "stream.hpp" +#include "string.hpp" +#include "list.hpp" +#include "element.hpp" +#include "monad.hpp" +#include "value.hpp" +#include "perf.hpp" +#include "qpid.hpp" + +// Ignore conversion issues and redundant declarations in Qpid headers +#ifdef WANT_MAINTAINER_MODE +#pragma GCC diagnostic ignored "-Wconversion" +#pragma GCC diagnostic ignored "-Wredundant-decls" +#endif + +namespace tuscany { +namespace queue { + +const value key(mklist<value>("test")); +const string qname("testq"); + +bool testDeclareQueue() { + QpidConnection qc; + QpidSession qs(qc); + const failable<bool> r = declareQueue(key, qname, qs); + assert(hasContent(r)); + return true; +} + +const list<value> item = list<value>() + + (list<value>() + "name" + string("Apple")) + + (list<value>() + "price" + string("$2.99")); +const list<value> entry = mklist<value>(string("item"), string("cart-53d67a61-aa5e-4e5e-8401-39edeba8b83b"), item); + +bool testPost() { + QpidConnection qc; + QpidSession qs(qc); + const failable<bool> r = post(key, entry, qs); + assert(hasContent(r)); + return true; +} + +const bool listener(const value& k, const value& v) { + assert(k == key); + assert(v == entry); + return false; +} + +bool testListen() { + QpidConnection qc; + QpidSession qs(qc); + QpidSubscription qsub(qs); + const lambda<bool(const value&, const value&)> l(listener); + listen(qname, l, qsub); + return true; +} + +} +} + +int main() { + tuscany::cout << "Testing..." << tuscany::endl; + + tuscany::queue::testDeclareQueue(); + tuscany::queue::testPost(); + tuscany::queue::testListen(); + + tuscany::cout << "OK" << tuscany::endl; + + return 0; +} diff --git a/sca-cpp/branches/cpp-contrib/components/queue/qpid.hpp b/sca-cpp/branches/cpp-contrib/components/queue/qpid.hpp new file mode 100644 index 0000000000..8b466cedcc --- /dev/null +++ b/sca-cpp/branches/cpp-contrib/components/queue/qpid.hpp @@ -0,0 +1,260 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/* $Rev$ $Date$ */ + +#ifndef tuscany_queue_hpp +#define tuscany_queue_hpp + +/** + * AMQP queue access functions. + */ + +// Ignore conversion issues and redundant declarations in Qpid headers +#ifdef WANT_MAINTAINER_MODE +#pragma GCC diagnostic ignored "-Wconversion" +#pragma GCC diagnostic ignored "-Wredundant-decls" +#endif + +#include <qpid/client/Connection.h> +#include <qpid/client/Session.h> +#include <qpid/client/MessageListener.h> +#include <qpid/client/SubscriptionManager.h> + +#include "string.hpp" +#include "list.hpp" +#include "value.hpp" +#include "monad.hpp" +#include "../../modules/scheme/eval.hpp" + +namespace tuscany { +namespace queue { + +/** + * Represents a Qpid connection. + */ +class QpidConnection { +public: + QpidConnection() : owner(true) { + c.open("localhost", 5672); + } + + QpidConnection(const bool owner) : owner(owner) { + c.open("localhost", 5672); + } + + QpidConnection(const QpidConnection& qc) : owner(false), c(qc.c) { + } + + ~QpidConnection() { + if (!owner) + return; + c.close(); + } + +private: + friend const failable<bool> close(QpidConnection& qc); + friend class QpidSession; + + const bool owner; + qpid::client::Connection c; + +}; + +/** + * Close a Qpid connection. + */ +const failable<bool> close(QpidConnection& qc) { + qc.c.close(); + return true; +} + +/** + * Represents a Qpid session. + */ +class QpidSession { +public: + QpidSession(QpidConnection& qc) : owner(true), s(qc.c.newSession()) { + } + + QpidSession(QpidConnection& qc, const bool owner) : owner(owner), s(qc.c.newSession()) { + } + + QpidSession(const QpidSession& qs) : owner(false), s(qs.s) { + } + + ~QpidSession() { + if (!owner) + return; + s.close(); + } + +private: + friend const failable<bool> close(QpidSession& qs); + friend const failable<bool> declareQueue(const value& key, const string& name, QpidSession& qs); + friend const failable<bool> post(const value& key, const value& val, QpidSession& qs); + friend class QpidSubscription; + + const bool owner; + qpid::client::Session s; +}; + +/** + * Close a Qpid session. + */ +const failable<bool> close(QpidSession& qs) { + try { + qs.s.close(); + } catch (const qpid::Exception& e) { + return mkfailure<bool>(string("Qpid failure: ") + e.what()); + } + return true; +} + +/** + * Declare a key / AMQP queue pair. + */ +const failable<bool> declareQueue(const value& key, const string& name, QpidSession& qs) { + const string ks(scheme::writeValue(key)); + try { + qs.s.queueDeclare(qpid::client::arg::queue=c_str(name)); + qs.s.exchangeBind(qpid::client::arg::exchange="amq.direct", qpid::client::arg::queue=c_str(name), qpid::client::arg::bindingKey=c_str(ks)); + } catch (const qpid::Exception& e) { + return mkfailure<bool>(string("Qpid failure: ") + e.what()); + } + return true; +} + +/** + * Post a key / value pair message to an AMQP broker. + */ +const failable<bool> post(const value& key, const value& val, QpidSession& qs) { + + // Send in a message with the given key. + const string ks(scheme::writeValue(key)); + const string vs(scheme::writeValue(val)); + try { + qpid::client::Message message; + message.getDeliveryProperties().setRoutingKey(c_str(ks)); + message.setData(c_str(vs)); + qs.s.messageTransfer(qpid::client::arg::content=message, qpid::client::arg::destination="amq.direct"); + } catch (const qpid::Exception& e) { + return mkfailure<bool>(string("Qpid failure: ") + e.what()); + } + return true; +} + +/** + * Represents a Qpid subscription. + */ +class QpidSubscription { +public: + QpidSubscription(QpidSession& qs) : owner(true), subs(qs.s) { + } + + QpidSubscription(QpidSession& qs, const bool owner) : owner(owner), subs(qs.s) { + } + + QpidSubscription(const QpidSubscription& qsub) : owner(false), subs(qsub.subs) { + } + + ~QpidSubscription() { + if (!owner) + return; + try { + subs.stop(); + } catch (const qpid::Exception& e) { + mkfailure<bool>(string("Qpid failure: ") + e.what()); + } + } + +private: + friend const failable<bool> listen(const string& name, const lambda<bool(const value&, const value&)>& l, QpidSubscription& qsub); + friend const failable<bool> stop(QpidSubscription& qsub); + + const bool owner; + qpid::client::SubscriptionManager subs; +}; + +/** + * Register a listener function with an AMQP queue. + */ +class Listener : public qpid::client::MessageListener { +public: + Listener(const lambda<bool(const value&, const value&)> l, qpid::client::SubscriptionManager& subs) : l(l), subs(subs) { + } + + virtual void received(qpid::client::Message& msg) { + + // Call the listener function + const value k(scheme::readValue(msg.getDeliveryProperties().getRoutingKey().c_str())); + const value v(scheme::readValue(msg.getData().c_str())); + const bool r = l(k, v); + if (!r) { + try { + subs.cancel(msg.getDestination()); + } catch (const qpid::Exception& e) { + mkfailure<bool>(string("Qpid failure: ") + e.what()); + } + } + } + +private: + const lambda<bool(const value&, const value&)> l; + qpid::client::SubscriptionManager& subs; +}; + + +const failable<bool> listen(const string& name, const lambda<bool(const value&, const value&)>& l, QpidSubscription& qsub) { + debug("queue::listen"); + Listener listener(l, qsub.subs); + try { + qsub.subs.subscribe(listener, c_str(name)); + qsub.subs.run(); + } catch (const qpid::Exception& e) { + return mkfailure<bool>(string("Qpid failure: ") + e.what()); + } + debug("queue::listen::stopped"); + return true; +} + +/** + * Stop an AMQP subscription. + */ +const failable<bool> stop(QpidSubscription& qsub) { + debug("queue::stop"); + try { + qsub.subs.stop(); + } catch (const qpid::Exception& e) { + return mkfailure<bool>(string("Qpid failure: ") + e.what()); + } + debug("queue::stopped"); + return true; +} + +} +} + +// Re-enable conversion and redundant declarations warnings +#ifdef WANT_MAINTAINER_MODE +#pragma GCC diagnostic warning "-Wconversion" +#pragma GCC diagnostic warning "-Wredundant-decls" +#endif + +#endif /* tuscany_qpid_hpp */ diff --git a/sca-cpp/branches/cpp-contrib/components/queue/qpidd-start b/sca-cpp/branches/cpp-contrib/components/queue/qpidd-start new file mode 100755 index 0000000000..02e048c41e --- /dev/null +++ b/sca-cpp/branches/cpp-contrib/components/queue/qpidd-start @@ -0,0 +1,24 @@ +#!/bin/sh + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# Start qpidd +here=`readlink -f $0`; here=`dirname $here` + +qpid_prefix=`cat $here/qpidc.prefix` +$qpid_prefix/sbin/qpidd & diff --git a/sca-cpp/branches/cpp-contrib/components/queue/qpidd-stop b/sca-cpp/branches/cpp-contrib/components/queue/qpidd-stop new file mode 100755 index 0000000000..6fb0467cff --- /dev/null +++ b/sca-cpp/branches/cpp-contrib/components/queue/qpidd-stop @@ -0,0 +1,26 @@ +#!/bin/sh + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# Stop qpidd +here=`readlink -f $0`; here=`dirname $here` + +qpid_prefix=`cat $here/qpidc.prefix` +qpidd="$qpid_prefix/sbin/qpidd" + +kill `ps -f | grep -v grep | grep "${qpidd}" | awk '{ print $2 }'` diff --git a/sca-cpp/branches/cpp-contrib/components/queue/queue-listener.cpp b/sca-cpp/branches/cpp-contrib/components/queue/queue-listener.cpp new file mode 100644 index 0000000000..d714101583 --- /dev/null +++ b/sca-cpp/branches/cpp-contrib/components/queue/queue-listener.cpp @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/* $Rev$ $Date$ */ + +/** + * AMQP queue listener component implementation. + */ + +#include "string.hpp" +#include "function.hpp" +#include "list.hpp" +#include "value.hpp" +#include "monad.hpp" +#include "parallel.hpp" +#include "qpid.hpp" + +// Ignore conversion issues and redundant declarations in Qpid headers +#ifdef WANT_MAINTAINER_MODE +#pragma GCC diagnostic ignored "-Wconversion" +#pragma GCC diagnostic ignored "-Wredundant-decls" +#endif + +namespace tuscany { +namespace queue { + +/** + * A relay function that posts the AMQP messages it receives to a relay component reference. + */ +class relay { +public: + relay(const lambda<value(const list<value>&)>& rel) : rel(rel) { + } + + const bool operator()(const value& k, const value& v) const { + debug(k, "queue::relay::key"); + debug(v, "queue::relay::value"); + const value res = rel(mklist<value>("post", isList(k)? (list<value>)k : mklist<value>(k), v)); + return true; + } + +private: + const lambda<value(const list<value>&)> rel; +}; + +/** + * Subscribe and listen to an AMQP queue. + */ +class subscribe { +public: + subscribe(const string& qname, const lambda<bool(const value&, const value&)>& l, const QpidSubscription& qsub) : qname(qname), l(l), qsub(qsub) { + } + + const failable<bool> operator()() const { + gc_pool pool; + debug(qname, "queue::subscribe::listen"); + const failable<bool> r = listen(qname, l, const_cast<QpidSubscription&>(qsub)); + debug(qname, "queue::subscribe::stopped"); + return r; + } + +private: + const string qname; + const lambda<bool(const value&, const value&)> l; + const QpidSubscription qsub; +}; + +/** + * Listener lambda function, responsible for starting an AMQP subscription in a worker thread, and + * apply any function calls to the listener component. The only supported function is stop(), + * called to stop the listener component and shutdown the worker thread. + */ +class listener { +public: + listener(QpidConnection& qc, QpidSession& qs, QpidSubscription& qsub, worker& w) : qc(qc), qs(qs), qsub(qsub), w(w) { + } + + const value operator()(const list<value>& params) const { + const tuscany::value func(car(params)); + + // Stop the component + if (func != "stop") + return tuscany::mkfailure<tuscany::value>(); + debug("queue::listener::stop"); + + // TODO check why stop() and close() hang in child processes + stop(const_cast<QpidSubscription&>(qsub)); + close(const_cast<QpidSession&>(qs)); + close(const_cast<QpidConnection&>(qc)); + cancel(const_cast<worker&>(w)); + + debug("queue::listener::stopped"); + return failable<value>(value(lambda<value(const list<value>&)>())); + } + +private: + QpidConnection qc; + QpidSession qs; + QpidSubscription qsub; + worker w; +}; + +/** + * Start the component. + */ +const failable<value> start(const list<value>& params) { + // Extract the relay reference and the AMQP key and queue name + const value rel = car(params); + const value pk = ((lambda<value(list<value>)>)cadr(params))(list<value>()); + const value key = isList(pk)? (list<value>)pk : mklist<value>(pk); + const value qname = ((lambda<value(list<value>)>)caddr(params))(list<value>()); + + // Create an AMQP session + QpidConnection qc(false); + QpidSession qs(qc, false); + + // Declare the configured AMQP key / queue pair + declareQueue(key, qname, qs); + + // Listen and relay messages in a worker thread + QpidSubscription qsub(qs, false); + worker w(3); + const lambda<bool(const value&, const value&)> rl = relay(rel); + submit<failable<bool> >(w, lambda<failable<bool>()>(subscribe(qname, rl, qsub))); + + // Return the listener component lambda function + return value(lambda<value(const list<value>&)>(listener(qc, qs, qsub, w))); +} + +} +} + +extern "C" { + +const tuscany::value apply(const tuscany::list<tuscany::value>& params) { + const tuscany::value func(car(params)); + if (func == "start") + return tuscany::queue::start(cdr(params)); + return tuscany::mkfailure<tuscany::value>(); +} + +} diff --git a/sca-cpp/branches/cpp-contrib/components/queue/queue-sender.cpp b/sca-cpp/branches/cpp-contrib/components/queue/queue-sender.cpp new file mode 100644 index 0000000000..07f8491f54 --- /dev/null +++ b/sca-cpp/branches/cpp-contrib/components/queue/queue-sender.cpp @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/* $Rev$ $Date$ */ + +/** + * AMQP queue sender component implementation. + */ + +#include "string.hpp" +#include "function.hpp" +#include "list.hpp" +#include "value.hpp" +#include "monad.hpp" +#include "qpid.hpp" + +// Ignore conversion issues and redundant declarations in Qpid headers +#ifdef WANT_MAINTAINER_MODE +#pragma GCC diagnostic ignored "-Wconversion" +#pragma GCC diagnostic ignored "-Wredundant-decls" +#endif + +namespace tuscany { +namespace queue { + +/** + * Post an item to a queue. + */ +const failable<value> post(const list<value>& params) { + QpidConnection qc; + QpidSession qs(qc); + + // Post the item + const value pk = ((lambda<value(list<value>)>)caddr(params))(list<value>()); + const value key = isList(pk)? append<value>(pk, (list<value>)car(params)) : cons<value>(pk, (list<value>)car(params)); + debug(key, "queue::post::key"); + debug(cadr(params), "queue::post::value"); + const failable<bool> r = post(key, cadr(params), qs); + if (!hasContent(r)) + return mkfailure<value>(reason(r)); + return key; +} + +} +} + +extern "C" { + +const tuscany::value apply(const tuscany::list<tuscany::value>& params) { + const tuscany::value func(car(params)); + if (func == "post") + return tuscany::queue::post(cdr(params)); + return tuscany::mkfailure<tuscany::value>(); +} + +} diff --git a/sca-cpp/branches/cpp-contrib/components/queue/queue.composite b/sca-cpp/branches/cpp-contrib/components/queue/queue.composite new file mode 100644 index 0000000000..535680c6c3 --- /dev/null +++ b/sca-cpp/branches/cpp-contrib/components/queue/queue.composite @@ -0,0 +1,56 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. +--> +<composite xmlns="http://docs.oasis-open.org/ns/opencsa/sca/200912" + xmlns:t="http://tuscany.apache.org/xmlns/sca/1.1" + targetNamespace="http://tuscany.apache.org/xmlns/sca/components" + name="queue"> + + <component name="print-sender"> + <implementation.cpp path=".libs" library="libqueue-sender"/> + <property name="key">print</property> + <service name="print-sender"> + <t:binding.http uri="print-sender"/> + </service> + </component> + + <component name="print-listener"> + <implementation.cpp path=".libs" library="libqueue-listener"/> + <property name="key">print</property> + <property name="queue">printq</property> + <reference name="relay" target="print"/> + </component> + + <component name="print"> + <t:implementation.scheme script="server-test.scm"/> + <service name="print"> + <t:binding.http uri="print"/> + </service> + <reference name="report" target="report-sender"/> + </component> + + <component name="report-sender"> + <implementation.cpp path=".libs" library="libqueue-sender"/> + <property name="key">report</property> + <service name="report-sender"> + <t:binding.http uri="report-sender"/> + </service> + </component> + +</composite> diff --git a/sca-cpp/branches/cpp-contrib/components/queue/send-test b/sca-cpp/branches/cpp-contrib/components/queue/send-test new file mode 100755 index 0000000000..ec6d9d9083 --- /dev/null +++ b/sca-cpp/branches/cpp-contrib/components/queue/send-test @@ -0,0 +1,31 @@ +#!/bin/sh + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# Setup +./qpidd-start +sleep 1 + +# Test +./qpid-test 2>/dev/null +rc=$? + +# Cleanup +./qpidd-stop +sleep 1 +return $rc diff --git a/sca-cpp/branches/cpp-contrib/components/queue/server-test b/sca-cpp/branches/cpp-contrib/components/queue/server-test new file mode 100755 index 0000000000..3fc94e6f35 --- /dev/null +++ b/sca-cpp/branches/cpp-contrib/components/queue/server-test @@ -0,0 +1,43 @@ +#!/bin/sh + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# Setup +../../modules/http/httpd-conf tmp 8090 ../../modules/http/htdocs +../../modules/server/server-conf tmp +../../modules/server/scheme-conf tmp +cat >>tmp/conf/httpd.conf <<EOF +SCAContribution `pwd`/ +SCAComposite queue.composite +EOF + +./qpidd-start +sleep 1 +../../modules/http/httpd-start tmp +sleep 2 + +# Test +./client-test 2>/dev/null +rc=$? + +# Cleanup +../../modules/http/httpd-stop tmp +sleep 1 +./qpidd-stop +sleep 1 +return $rc diff --git a/sca-cpp/branches/cpp-contrib/components/queue/server-test.scm b/sca-cpp/branches/cpp-contrib/components/queue/server-test.scm new file mode 100644 index 0000000000..1a89ce8b31 --- /dev/null +++ b/sca-cpp/branches/cpp-contrib/components/queue/server-test.scm @@ -0,0 +1,20 @@ +; Licensed to the Apache Software Foundation (ASF) under one +; or more contributor license agreements. See the NOTICE file +; distributed with this work for additional information +; regarding copyright ownership. The ASF licenses this file +; to you under the Apache License, Version 2.0 (the +; "License"); you may not use this file except in compliance +; with the License. You may obtain a copy of the License at +; +; http://www.apache.org/licenses/LICENSE-2.0 +; +; Unless required by applicable law or agreed to in writing, +; software distributed under the License is distributed on an +; "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +; KIND, either express or implied. See the License for the +; specific language governing permissions and limitations +; under the License. + +; Queue test case + +(define (post key val report) (report "post" '() val)) |