summaryrefslogtreecommitdiffstats
path: root/sca-cpp/branches/lightweight-sca/components/queue
diff options
context:
space:
mode:
Diffstat (limited to 'sca-cpp/branches/lightweight-sca/components/queue')
-rw-r--r--sca-cpp/branches/lightweight-sca/components/queue/Makefile.am57
-rw-r--r--sca-cpp/branches/lightweight-sca/components/queue/client-test.cpp102
-rw-r--r--sca-cpp/branches/lightweight-sca/components/queue/qpid-test.cpp97
-rw-r--r--sca-cpp/branches/lightweight-sca/components/queue/qpid.hpp277
-rwxr-xr-xsca-cpp/branches/lightweight-sca/components/queue/qpidd-start24
-rwxr-xr-xsca-cpp/branches/lightweight-sca/components/queue/qpidd-stop30
-rw-r--r--sca-cpp/branches/lightweight-sca/components/queue/queue-listener.componentType29
-rw-r--r--sca-cpp/branches/lightweight-sca/components/queue/queue-listener.cpp159
-rw-r--r--sca-cpp/branches/lightweight-sca/components/queue/queue-sender.componentType28
-rw-r--r--sca-cpp/branches/lightweight-sca/components/queue/queue-sender.cpp73
-rw-r--r--sca-cpp/branches/lightweight-sca/components/queue/queue.composite55
-rwxr-xr-xsca-cpp/branches/lightweight-sca/components/queue/send-test31
-rwxr-xr-xsca-cpp/branches/lightweight-sca/components/queue/server-test45
-rw-r--r--sca-cpp/branches/lightweight-sca/components/queue/server-test.scm20
14 files changed, 1027 insertions, 0 deletions
diff --git a/sca-cpp/branches/lightweight-sca/components/queue/Makefile.am b/sca-cpp/branches/lightweight-sca/components/queue/Makefile.am
new file mode 100644
index 0000000000..c44722a523
--- /dev/null
+++ b/sca-cpp/branches/lightweight-sca/components/queue/Makefile.am
@@ -0,0 +1,57 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+if WANT_QUEUE
+
+INCLUDES = -I${QPIDC_INCLUDE}
+
+incl_HEADERS = *.hpp
+incldir = $(prefix)/include/components/queue
+
+dist_comp_SCRIPTS = qpidd-start qpidd-stop
+compdir=$(prefix)/components/queue
+
+comp_DATA = qpidc.prefix
+qpidc.prefix: $(top_builddir)/config.status
+ echo ${QPIDC_PREFIX} >qpidc.prefix
+
+EXTRA_DIST = queue.composite queue-sender.componentType queue-listener.componentType *.scm
+
+comp_LTLIBRARIES = libqueue-sender.la libqueue-listener.la
+noinst_DATA = libqueue-sender${libsuffix} libqueue-listener${libsuffix}
+
+libqueue_sender_la_SOURCES = queue-sender.cpp
+libqueue_sender_la_LDFLAGS = -L${QPIDC_LIB} -R${QPIDC_LIB} -lqpidclient -lqpidcommon
+libqueue-sender${libsuffix}:
+ ln -s .libs/libqueue-sender${libsuffix}
+
+libqueue_listener_la_SOURCES = queue-listener.cpp
+libqueue_listener_la_LDFLAGS = -L${QPIDC_LIB} -R${QPIDC_LIB} -lqpidclient -lqpidcommon
+libqueue-listener${libsuffix}:
+ ln -s .libs/libqueue-listener${libsuffix}
+
+qpid_test_SOURCES = qpid-test.cpp
+qpid_test_LDFLAGS = -L${QPIDC_LIB} -R${QPIDC_LIB} -lqpidclient -lqpidcommon
+
+client_test_SOURCES = client-test.cpp
+client_test_LDFLAGS = -lxml2 -lcurl -lmozjs -L${QPIDC_LIB} -R${QPIDC_LIB} -lqpidclient -lqpidcommon
+
+dist_noinst_SCRIPTS = send-test server-test
+noinst_PROGRAMS = qpid-test client-test
+TESTS = send-test server-test
+
+endif
diff --git a/sca-cpp/branches/lightweight-sca/components/queue/client-test.cpp b/sca-cpp/branches/lightweight-sca/components/queue/client-test.cpp
new file mode 100644
index 0000000000..30bfe07bf7
--- /dev/null
+++ b/sca-cpp/branches/lightweight-sca/components/queue/client-test.cpp
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/* $Rev$ $Date$ */
+
+/**
+ * Test queue component.
+ */
+
+#include <assert.h>
+#include "stream.hpp"
+#include "string.hpp"
+#include "list.hpp"
+#include "element.hpp"
+#include "value.hpp"
+#include "monad.hpp"
+#include "perf.hpp"
+#include "../../modules/http/http.hpp"
+#include "qpid.hpp"
+
+// Ignore conversion issues and redundant declarations in Qpid headers
+#ifdef WANT_MAINTAINER_WARNINGS
+#pragma GCC diagnostic ignored "-Wconversion"
+#pragma GCC diagnostic ignored "-Wredundant-decls"
+#endif
+
+namespace tuscany {
+namespace queue {
+
+const value key(mklist<value>(string("report")));
+const string qname("reportq");
+
+const list<value> item = list<value>() + "content" + (list<value>() + "item"
+ + (list<value>() + "name" + string("Apple"))
+ + (list<value>() + "price" + string("$2.99")));
+const list<value> entry = list<value>() + (list<value>() + "entry"
+ + (list<value>() + "title" + string("item"))
+ + (list<value>() + "id" + string("cart-53d67a61-aa5e-4e5e-8401-39edeba8b83b"))
+ + item);
+
+bool testDeclareQueue() {
+ QpidConnection qc;
+ QpidSession qs(qc);
+ const failable<bool> r = declareQueue(key, qname, qs);
+ assert(hasContent(r));
+ return true;
+}
+
+const bool listener(const value& k, const value& v) {
+ cerr << "k " << k << " v " << v << endl;
+ assert(k == key);
+ assert(v == entry);
+ return false;
+}
+
+bool testListen() {
+ QpidConnection qc;
+ QpidSession qs(qc);
+ QpidSubscription qsub(qs);
+ const lambda<bool(const value&, const value&)> l(listener);
+ listen(qname, l, qsub);
+ return true;
+}
+
+bool testPost() {
+ gc_scoped_pool pool;
+ http::CURLSession ch("", "", "", "");
+ const failable<value> id = http::post(entry, "http://localhost:8090/print-sender", ch);
+ assert(hasContent(id));
+ return true;
+}
+
+}
+}
+
+int main() {
+ tuscany::cout << "Testing..." << tuscany::endl;
+
+ tuscany::queue::testDeclareQueue();
+ tuscany::queue::testPost();
+ tuscany::queue::testListen();
+
+ tuscany::cout << "OK" << tuscany::endl;
+
+ return 0;
+}
diff --git a/sca-cpp/branches/lightweight-sca/components/queue/qpid-test.cpp b/sca-cpp/branches/lightweight-sca/components/queue/qpid-test.cpp
new file mode 100644
index 0000000000..27db7734b0
--- /dev/null
+++ b/sca-cpp/branches/lightweight-sca/components/queue/qpid-test.cpp
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/* $Rev$ $Date$ */
+
+/**
+ * Test Qpid support functions.
+ */
+
+#include <assert.h>
+#include "stream.hpp"
+#include "string.hpp"
+#include "list.hpp"
+#include "element.hpp"
+#include "monad.hpp"
+#include "value.hpp"
+#include "perf.hpp"
+#include "qpid.hpp"
+
+// Ignore conversion issues and redundant declarations in Qpid headers
+#ifdef WANT_MAINTAINER_WARNINGS
+#pragma GCC diagnostic ignored "-Wconversion"
+#pragma GCC diagnostic ignored "-Wredundant-decls"
+#endif
+
+namespace tuscany {
+namespace queue {
+
+const value key(mklist<value>("test"));
+const string qname("testq");
+
+bool testDeclareQueue() {
+ QpidConnection qc;
+ QpidSession qs(qc);
+ const failable<bool> r = declareQueue(key, qname, qs);
+ assert(hasContent(r));
+ return true;
+}
+
+const list<value> item = list<value>()
+ + (list<value>() + "name" + string("Apple"))
+ + (list<value>() + "price" + string("$2.99"));
+const list<value> entry = mklist<value>(string("item"), string("cart-53d67a61-aa5e-4e5e-8401-39edeba8b83b"), item);
+
+bool testPost() {
+ QpidConnection qc;
+ QpidSession qs(qc);
+ const failable<bool> r = post(key, entry, qs);
+ assert(hasContent(r));
+ return true;
+}
+
+const bool listener(const value& k, const value& v) {
+ assert(k == key);
+ assert(v == entry);
+ return false;
+}
+
+bool testListen() {
+ QpidConnection qc;
+ QpidSession qs(qc);
+ QpidSubscription qsub(qs);
+ const lambda<bool(const value&, const value&)> l(listener);
+ listen(qname, l, qsub);
+ return true;
+}
+
+}
+}
+
+int main() {
+ tuscany::cout << "Testing..." << tuscany::endl;
+
+ tuscany::queue::testDeclareQueue();
+ tuscany::queue::testPost();
+ tuscany::queue::testListen();
+
+ tuscany::cout << "OK" << tuscany::endl;
+
+ return 0;
+}
diff --git a/sca-cpp/branches/lightweight-sca/components/queue/qpid.hpp b/sca-cpp/branches/lightweight-sca/components/queue/qpid.hpp
new file mode 100644
index 0000000000..ef53c529e8
--- /dev/null
+++ b/sca-cpp/branches/lightweight-sca/components/queue/qpid.hpp
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/* $Rev$ $Date$ */
+
+#ifndef tuscany_qpid_hpp
+#define tuscany_qpid_hpp
+
+/**
+ * AMQP queue access functions.
+ */
+
+// Ignore conversion issues and redundant declarations in Qpid headers
+#ifdef WANT_MAINTAINER_WARNINGS
+#pragma GCC diagnostic ignored "-Wconversion"
+#pragma GCC diagnostic ignored "-Wredundant-decls"
+#endif
+
+#include <qpid/client/Connection.h>
+#include <qpid/client/Session.h>
+#include <qpid/client/MessageListener.h>
+#include <qpid/client/SubscriptionManager.h>
+
+#include "string.hpp"
+#include "list.hpp"
+#include "value.hpp"
+#include "monad.hpp"
+#include "../../modules/scheme/eval.hpp"
+
+namespace tuscany {
+namespace queue {
+
+/**
+ * Represents a Qpid connection.
+ */
+class QpidConnection {
+public:
+ QpidConnection() : owner(true) {
+ debug("queue::qpidonnection");
+ c.open("localhost", 5672);
+ }
+
+ QpidConnection(const bool owner) : owner(owner) {
+ debug("queue::qpidonnection");
+ c.open("localhost", 5672);
+ }
+
+ QpidConnection(const QpidConnection& qc) : owner(false), c(qc.c) {
+ debug("queue::qpidonnection::copy");
+ }
+
+ const QpidConnection& operator=(const QpidConnection& qc) {
+ debug("queue::qpidonnection::operator=");
+ if(this == &c)
+ return *this;
+ owner = false;
+ c = qc.c;
+ return *this;
+ }
+
+ ~QpidConnection() {
+ debug("queue::~qpidonnection");
+ if (!owner)
+ return;
+ c.close();
+ }
+
+private:
+ friend const failable<bool> close(QpidConnection& qc);
+ friend class QpidSession;
+
+ const bool owner;
+ qpid::client::Connection c;
+
+};
+
+/**
+ * Close a Qpid connection.
+ */
+const failable<bool> close(QpidConnection& qc) {
+ qc.c.close();
+ return true;
+}
+
+/**
+ * Represents a Qpid session.
+ */
+class QpidSession {
+public:
+ QpidSession(QpidConnection& qc) : owner(true), s(qc.c.newSession()) {
+ debug("queue::qpidsession");
+ }
+
+ QpidSession(QpidConnection& qc, const bool owner) : owner(owner), s(qc.c.newSession()) {
+ debug("queue::qpidsession");
+ }
+
+ QpidSession(const QpidSession& qs) : owner(false), s(qs.s) {
+ debug("queue::qpidsession::copy");
+ }
+
+ ~QpidSession() {
+ debug("queue::~qpidsession");
+ if (!owner)
+ return;
+ s.close();
+ }
+
+private:
+ friend const failable<bool> close(QpidSession& qs);
+ friend const failable<bool> declareQueue(const value& key, const string& name, QpidSession& qs);
+ friend const failable<bool> post(const value& key, const value& val, QpidSession& qs);
+ friend class QpidSubscription;
+
+ const bool owner;
+ qpid::client::Session s;
+};
+
+/**
+ * Close a Qpid session.
+ */
+const failable<bool> close(QpidSession& qs) {
+ try {
+ qs.s.close();
+ } catch (const qpid::Exception& e) {
+ return mkfailure<bool>(string("Qpid failure: ") + e.what());
+ }
+ return true;
+}
+
+/**
+ * Declare a key / AMQP queue pair.
+ */
+const failable<bool> declareQueue(const value& key, const string& name, QpidSession& qs) {
+ const string ks(scheme::writeValue(key));
+ try {
+ qs.s.queueDeclare(qpid::client::arg::queue=c_str(name));
+ qs.s.exchangeBind(qpid::client::arg::exchange="amq.direct", qpid::client::arg::queue=c_str(name), qpid::client::arg::bindingKey=c_str(ks));
+ } catch (const qpid::Exception& e) {
+ return mkfailure<bool>(string("Qpid failure: ") + e.what());
+ }
+ return true;
+}
+
+/**
+ * Post a key / value pair message to an AMQP broker.
+ */
+const failable<bool> post(const value& key, const value& val, QpidSession& qs) {
+
+ // Send in a message with the given key.
+ const string ks(scheme::writeValue(key));
+ const string vs(scheme::writeValue(val));
+ try {
+ qpid::client::Message message;
+ message.getDeliveryProperties().setRoutingKey(c_str(ks));
+ message.setData(c_str(vs));
+ qs.s.messageTransfer(qpid::client::arg::content=message, qpid::client::arg::destination="amq.direct");
+ } catch (const qpid::Exception& e) {
+ return mkfailure<bool>(string("Qpid failure: ") + e.what());
+ }
+ return true;
+}
+
+/**
+ * Represents a Qpid subscription.
+ */
+class QpidSubscription {
+public:
+ QpidSubscription(QpidSession& qs) : owner(true), subs(qs.s) {
+ }
+
+ QpidSubscription(QpidSession& qs, const bool owner) : owner(owner), subs(qs.s) {
+ }
+
+ QpidSubscription(const QpidSubscription& qsub) : owner(false), subs(qsub.subs) {
+ }
+
+ ~QpidSubscription() {
+ if (!owner)
+ return;
+ try {
+ subs.stop();
+ } catch (const qpid::Exception& e) {
+ mkfailure<bool>(string("Qpid failure: ") + e.what());
+ }
+ }
+
+private:
+ friend const failable<bool> listen(const string& name, const lambda<bool(const value&, const value&)>& l, QpidSubscription& qsub);
+ friend const failable<bool> stop(QpidSubscription& qsub);
+
+ const bool owner;
+ qpid::client::SubscriptionManager subs;
+};
+
+/**
+ * Register a listener function with an AMQP queue.
+ */
+class Listener : public qpid::client::MessageListener {
+public:
+ Listener(const lambda<bool(const value&, const value&)> l, qpid::client::SubscriptionManager& subs) : l(l), subs(subs) {
+ }
+
+ virtual void received(qpid::client::Message& msg) {
+
+ // Call the listener function
+ const value k(scheme::readValue(msg.getDeliveryProperties().getRoutingKey().c_str()));
+ const value v(scheme::readValue(msg.getData().c_str()));
+ const bool r = l(k, v);
+ if (!r) {
+ try {
+ subs.cancel(msg.getDestination());
+ } catch (const qpid::Exception& e) {
+ mkfailure<bool>(string("Qpid failure: ") + e.what());
+ }
+ }
+ }
+
+private:
+ const lambda<bool(const value&, const value&)> l;
+ qpid::client::SubscriptionManager& subs;
+};
+
+
+const failable<bool> listen(const string& name, const lambda<bool(const value&, const value&)>& l, QpidSubscription& qsub) {
+ debug("queue::listen");
+ Listener listener(l, qsub.subs);
+ try {
+ qsub.subs.subscribe(listener, c_str(name));
+ qsub.subs.run();
+ } catch (const qpid::Exception& e) {
+ return mkfailure<bool>(string("Qpid failure: ") + e.what());
+ }
+ debug("queue::listen::stopped");
+ return true;
+}
+
+/**
+ * Stop an AMQP subscription.
+ */
+const failable<bool> stop(QpidSubscription& qsub) {
+ debug("queue::stop");
+ try {
+ qsub.subs.stop();
+ } catch (const qpid::Exception& e) {
+ return mkfailure<bool>(string("Qpid failure: ") + e.what());
+ }
+ debug("queue::stopped");
+ return true;
+}
+
+}
+}
+
+// Re-enable conversion and redundant declarations warnings
+#ifdef WANT_MAINTAINER_WARNINGS
+#pragma GCC diagnostic warning "-Wconversion"
+#pragma GCC diagnostic warning "-Wredundant-decls"
+#endif
+
+#endif /* tuscany_qpid_hpp */
diff --git a/sca-cpp/branches/lightweight-sca/components/queue/qpidd-start b/sca-cpp/branches/lightweight-sca/components/queue/qpidd-start
new file mode 100755
index 0000000000..a65065182c
--- /dev/null
+++ b/sca-cpp/branches/lightweight-sca/components/queue/qpidd-start
@@ -0,0 +1,24 @@
+#!/bin/sh
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Start qpidd
+here=`echo "import os; print os.path.realpath('$0')" | python`; here=`dirname $here`
+
+qpid_prefix=`cat $here/qpidc.prefix`
+$qpid_prefix/sbin/qpidd &
diff --git a/sca-cpp/branches/lightweight-sca/components/queue/qpidd-stop b/sca-cpp/branches/lightweight-sca/components/queue/qpidd-stop
new file mode 100755
index 0000000000..c8af680d78
--- /dev/null
+++ b/sca-cpp/branches/lightweight-sca/components/queue/qpidd-stop
@@ -0,0 +1,30 @@
+#!/bin/sh
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Stop qpidd
+here=`echo "import os; print os.path.realpath('$0')" | python`; here=`dirname $here`
+
+qpid_prefix=`cat $here/qpidc.prefix`
+qpidd="$qpid_prefix/sbin/qpidd"
+
+k=`ps -ef | grep -v grep | grep "${qpidd}" | awk '{ print $2 }'`
+if [ "$k" != "" ]; then
+ kill $k
+fi
+
diff --git a/sca-cpp/branches/lightweight-sca/components/queue/queue-listener.componentType b/sca-cpp/branches/lightweight-sca/components/queue/queue-listener.componentType
new file mode 100644
index 0000000000..1e94f9a2df
--- /dev/null
+++ b/sca-cpp/branches/lightweight-sca/components/queue/queue-listener.componentType
@@ -0,0 +1,29 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+-->
+<componentType xmlns="http://docs.oasis-open.org/ns/opencsa/sca/200912"
+ xmlns:xsd="http://www.w3.org/2001/XMLSchema"
+ xmlns:t="http://tuscany.apache.org/xmlns/sca/1.1"
+ targetNamespace="http://tuscany.apache.org/xmlns/sca/components">
+
+ <reference name="relay"/>
+ <property name="key" type="xsd:string"/>
+ <property name="queue" type="xsd:string"/>
+
+</composite>
diff --git a/sca-cpp/branches/lightweight-sca/components/queue/queue-listener.cpp b/sca-cpp/branches/lightweight-sca/components/queue/queue-listener.cpp
new file mode 100644
index 0000000000..483d0de65a
--- /dev/null
+++ b/sca-cpp/branches/lightweight-sca/components/queue/queue-listener.cpp
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/* $Rev$ $Date$ */
+
+/**
+ * AMQP queue listener component implementation.
+ */
+
+#define WANT_HTTPD_LOG 1
+#include "string.hpp"
+#include "function.hpp"
+#include "list.hpp"
+#include "value.hpp"
+#include "monad.hpp"
+#include "parallel.hpp"
+#include "qpid.hpp"
+
+// Ignore conversion issues and redundant declarations in Qpid headers
+#ifdef WANT_MAINTAINER_WARNINGS
+#pragma GCC diagnostic ignored "-Wconversion"
+#pragma GCC diagnostic ignored "-Wredundant-decls"
+#endif
+
+namespace tuscany {
+namespace queue {
+
+/**
+ * A relay function that posts the AMQP messages it receives to a relay component reference.
+ */
+class relay {
+public:
+ relay(const lambda<value(const list<value>&)>& rel) : rel(rel) {
+ }
+
+ const bool operator()(const value& k, const value& v) const {
+ debug(k, "queue::relay::key");
+ debug(v, "queue::relay::value");
+ const value res = rel(mklist<value>("post", isList(k)? (list<value>)k : mklist<value>(k), v));
+ return true;
+ }
+
+private:
+ const lambda<value(const list<value>&)> rel;
+};
+
+/**
+ * Subscribe and listen to an AMQP queue.
+ */
+class subscribe {
+public:
+ subscribe(const string& qname, const lambda<bool(const value&, const value&)>& l, const QpidSubscription& qsub) : qname(qname), l(l), qsub(qsub) {
+ }
+
+ const failable<bool> operator()() const {
+ gc_pool pool;
+ debug(qname, "queue::subscribe::listen");
+ const failable<bool> r = listen(qname, l, const_cast<QpidSubscription&>(qsub));
+ debug(qname, "queue::subscribe::stopped");
+ return r;
+ }
+
+private:
+ const string qname;
+ const lambda<bool(const value&, const value&)> l;
+ const QpidSubscription qsub;
+};
+
+/**
+ * Listener lambda function, responsible for starting an AMQP subscription in a worker thread, and
+ * apply any function calls to the listener component. The only supported function is stop(),
+ * called to stop the listener component and shutdown the worker thread.
+ */
+class listener {
+public:
+ listener(QpidConnection& qc, QpidSession& qs, QpidSubscription& qsub, worker& w) : qc(qc), qs(qs), qsub(qsub), w(w) {
+ }
+
+ const value operator()(const list<value>& params) const {
+ const tuscany::value func(car(params));
+
+ // Stop the component
+ if (func != "stop")
+ return mkfailure<value>();
+ debug("queue::listener::stop");
+
+ // TODO check why stop() and close() hang in child processes
+ stop(const_cast<QpidSubscription&>(qsub));
+ close(const_cast<QpidSession&>(qs));
+ close(const_cast<QpidConnection&>(qc));
+ cancel(const_cast<worker&>(w));
+
+ debug("queue::listener::stopped");
+ return failable<value>(value(lambda<value(const list<value>&)>()));
+ }
+
+private:
+ QpidConnection qc;
+ QpidSession qs;
+ QpidSubscription qsub;
+ worker w;
+};
+
+/**
+ * Start the component.
+ */
+const failable<value> start(const list<value>& params) {
+ // Extract the relay reference and the AMQP key and queue name
+ const value rel = car(params);
+ const value pk = ((lambda<value(const list<value>&)>)cadr(params))(list<value>());
+ const value key = isList(pk)? (list<value>)pk : mklist<value>(pk);
+ const value qname = ((lambda<value(const list<value>&)>)caddr(params))(list<value>());
+
+ // Create an AMQP session
+ QpidConnection qc(false);
+ QpidSession qs(qc, false);
+
+ // Declare the configured AMQP key / queue pair
+ declareQueue(key, qname, qs);
+
+ // Listen and relay messages in a worker thread
+ QpidSubscription qsub(qs, false);
+ worker w(3);
+ const lambda<bool(const value&, const value&)> rl = relay(rel);
+ submit<failable<bool> >(w, lambda<failable<bool>()>(subscribe(qname, rl, qsub)));
+
+ // Return the listener component lambda function
+ return value(lambda<value(const list<value>&)>(listener(qc, qs, qsub, w)));
+}
+
+}
+}
+
+extern "C" {
+
+const tuscany::value apply(const tuscany::list<tuscany::value>& params) {
+ const tuscany::value func(car(params));
+ if (func == "start")
+ return tuscany::queue::start(cdr(params));
+ return tuscany::mkfailure<tuscany::value>();
+}
+
+}
diff --git a/sca-cpp/branches/lightweight-sca/components/queue/queue-sender.componentType b/sca-cpp/branches/lightweight-sca/components/queue/queue-sender.componentType
new file mode 100644
index 0000000000..fc06bf2dcf
--- /dev/null
+++ b/sca-cpp/branches/lightweight-sca/components/queue/queue-sender.componentType
@@ -0,0 +1,28 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+-->
+<componentType xmlns="http://docs.oasis-open.org/ns/opencsa/sca/200912"
+ xmlns:xsd="http://www.w3.org/2001/XMLSchema"
+ xmlns:t="http://tuscany.apache.org/xmlns/sca/1.1"
+ targetNamespace="http://tuscany.apache.org/xmlns/sca/components">
+
+ <service name="sender"/>
+ <property name="key" type="xsd:string"/>
+
+</composite>
diff --git a/sca-cpp/branches/lightweight-sca/components/queue/queue-sender.cpp b/sca-cpp/branches/lightweight-sca/components/queue/queue-sender.cpp
new file mode 100644
index 0000000000..202a0e4435
--- /dev/null
+++ b/sca-cpp/branches/lightweight-sca/components/queue/queue-sender.cpp
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/* $Rev$ $Date$ */
+
+/**
+ * AMQP queue sender component implementation.
+ */
+
+#define WANT_HTTPD_LOG 1
+#include "string.hpp"
+#include "function.hpp"
+#include "list.hpp"
+#include "value.hpp"
+#include "monad.hpp"
+#include "qpid.hpp"
+
+// Ignore conversion issues and redundant declarations in Qpid headers
+#ifdef WANT_MAINTAINER_WARNINGS
+#pragma GCC diagnostic ignored "-Wconversion"
+#pragma GCC diagnostic ignored "-Wredundant-decls"
+#endif
+
+namespace tuscany {
+namespace queue {
+
+/**
+ * Post an item to a queue.
+ */
+const failable<value> post(const list<value>& params) {
+ QpidConnection qc;
+ QpidSession qs(qc);
+
+ // Post the item
+ const value pk = ((lambda<value(const list<value>&)>)caddr(params))(list<value>());
+ const value key = isList(pk)? append<value>(pk, (list<value>)car(params)) : cons<value>(pk, (list<value>)car(params));
+ debug(key, "queue::post::key");
+ debug(cadr(params), "queue::post::value");
+ const failable<bool> r = post(key, cadr(params), qs);
+ if (!hasContent(r))
+ return mkfailure<value>(r);
+ return key;
+}
+
+}
+}
+
+extern "C" {
+
+const tuscany::value apply(const tuscany::list<tuscany::value>& params) {
+ const tuscany::value func(car(params));
+ if (func == "post")
+ return tuscany::queue::post(cdr(params));
+ return tuscany::mkfailure<tuscany::value>();
+}
+
+}
diff --git a/sca-cpp/branches/lightweight-sca/components/queue/queue.composite b/sca-cpp/branches/lightweight-sca/components/queue/queue.composite
new file mode 100644
index 0000000000..9b6939e4bc
--- /dev/null
+++ b/sca-cpp/branches/lightweight-sca/components/queue/queue.composite
@@ -0,0 +1,55 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+-->
+<composite xmlns="http://docs.oasis-open.org/ns/opencsa/sca/200912"
+ targetNamespace="http://tuscany.apache.org/xmlns/sca/components"
+ name="queue">
+
+ <component name="print-sender">
+ <implementation.cpp path="." library="libqueue-sender"/>
+ <property name="key">print</property>
+ <service name="print-sender">
+ <binding.http uri="print-sender"/>
+ </service>
+ </component>
+
+ <component name="print-listener">
+ <implementation.cpp path="." library="libqueue-listener"/>
+ <property name="key">print</property>
+ <property name="queue">printq</property>
+ <reference name="relay" target="print"/>
+ </component>
+
+ <component name="print">
+ <implementation.scheme script="server-test.scm"/>
+ <service name="print">
+ <binding.http uri="print"/>
+ </service>
+ <reference name="report" target="report-sender"/>
+ </component>
+
+ <component name="report-sender">
+ <implementation.cpp path="." library="libqueue-sender"/>
+ <property name="key">report</property>
+ <service name="report-sender">
+ <binding.http uri="report-sender"/>
+ </service>
+ </component>
+
+</composite>
diff --git a/sca-cpp/branches/lightweight-sca/components/queue/send-test b/sca-cpp/branches/lightweight-sca/components/queue/send-test
new file mode 100755
index 0000000000..f2cc53d851
--- /dev/null
+++ b/sca-cpp/branches/lightweight-sca/components/queue/send-test
@@ -0,0 +1,31 @@
+#!/bin/sh
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Setup
+./qpidd-start
+sleep 1
+
+# Test
+./qpid-test 2>/dev/null
+rc=$?
+
+# Cleanup
+./qpidd-stop
+sleep 1
+exit $rc
diff --git a/sca-cpp/branches/lightweight-sca/components/queue/server-test b/sca-cpp/branches/lightweight-sca/components/queue/server-test
new file mode 100755
index 0000000000..269d3f9376
--- /dev/null
+++ b/sca-cpp/branches/lightweight-sca/components/queue/server-test
@@ -0,0 +1,45 @@
+#!/bin/sh
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Setup
+rm -rf tmp
+../../modules/http/httpd-conf tmp localhost 8090 ../../modules/http/htdocs
+../../modules/http/httpd-event-conf tmp
+../../modules/server/server-conf tmp
+../../modules/server/scheme-conf tmp
+cat >>tmp/conf/httpd.conf <<EOF
+SCAContribution `pwd`/
+SCAComposite queue.composite
+EOF
+
+./qpidd-start
+sleep 1
+../../modules/http/httpd-start tmp
+sleep 2
+
+# Test
+./client-test 2>/dev/null
+rc=$?
+
+# Cleanup
+../../modules/http/httpd-stop tmp
+sleep 1
+./qpidd-stop
+sleep 1
+exit $rc
diff --git a/sca-cpp/branches/lightweight-sca/components/queue/server-test.scm b/sca-cpp/branches/lightweight-sca/components/queue/server-test.scm
new file mode 100644
index 0000000000..1a89ce8b31
--- /dev/null
+++ b/sca-cpp/branches/lightweight-sca/components/queue/server-test.scm
@@ -0,0 +1,20 @@
+; Licensed to the Apache Software Foundation (ASF) under one
+; or more contributor license agreements. See the NOTICE file
+; distributed with this work for additional information
+; regarding copyright ownership. The ASF licenses this file
+; to you under the Apache License, Version 2.0 (the
+; "License"); you may not use this file except in compliance
+; with the License. You may obtain a copy of the License at
+;
+; http://www.apache.org/licenses/LICENSE-2.0
+;
+; Unless required by applicable law or agreed to in writing,
+; software distributed under the License is distributed on an
+; "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+; KIND, either express or implied. See the License for the
+; specific language governing permissions and limitations
+; under the License.
+
+; Queue test case
+
+(define (post key val report) (report "post" '() val))