summaryrefslogtreecommitdiffstats
path: root/sca-cpp/trunk/components/queue
diff options
context:
space:
mode:
Diffstat (limited to 'sca-cpp/trunk/components/queue')
-rw-r--r--sca-cpp/trunk/components/queue/Makefile.am11
-rw-r--r--sca-cpp/trunk/components/queue/qpid.hpp135
-rw-r--r--sca-cpp/trunk/components/queue/queue-listener.cpp72
-rw-r--r--sca-cpp/trunk/components/queue/queue-sender.cpp69
-rw-r--r--sca-cpp/trunk/components/queue/queue.composite7
-rw-r--r--sca-cpp/trunk/components/queue/server-test.scm3
6 files changed, 294 insertions, 3 deletions
diff --git a/sca-cpp/trunk/components/queue/Makefile.am b/sca-cpp/trunk/components/queue/Makefile.am
index b2a9b3819e..cee3ce7820 100644
--- a/sca-cpp/trunk/components/queue/Makefile.am
+++ b/sca-cpp/trunk/components/queue/Makefile.am
@@ -17,4 +17,15 @@
if WANT_QUEUE
+INCLUDES = -I${QPIDC_INCLUDE}
+
+compdir=$(prefix)/components/queue
+comp_LTLIBRARIES = libqueue-sender.la libqueue-listener.la
+
+libqueue_sender_la_SOURCES = queue-sender.cpp
+libqueue_sender_la_LDFLAGS = -L${QPIDC_LIB} -R${QPIDC_LIB} -lqpidclient
+
+libqueue_listener_la_SOURCES = queue-listener.cpp
+libqueue_listener_la_LDFLAGS = -L${QPIDC_LIB} -R${QPIDC_LIB} -lqpidclient
+
endif
diff --git a/sca-cpp/trunk/components/queue/qpid.hpp b/sca-cpp/trunk/components/queue/qpid.hpp
new file mode 100644
index 0000000000..e6be319be0
--- /dev/null
+++ b/sca-cpp/trunk/components/queue/qpid.hpp
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/* $Rev$ $Date$ */
+
+#ifndef tuscany_queue_hpp
+#define tuscany_queue_hpp
+
+/**
+ * AMQP queue access functions.
+ */
+
+// Ignore conversion issues and redundant declarations in Qpid headers
+#ifdef WANT_MAINTAINER_MODE
+#pragma GCC diagnostic ignored "-Wconversion"
+#pragma GCC diagnostic ignored "-Wredundant-decls"
+#endif
+
+#include <qpid/client/Connection.h>
+#include <qpid/client/Session.h>
+
+#include "string.hpp"
+#include "list.hpp"
+#include "value.hpp"
+#include "monad.hpp"
+#include "../../modules/scheme/eval.hpp"
+
+namespace tuscany {
+namespace queue {
+
+/**
+ * Represents a Qpid connection.
+ */
+class QpidConnection {
+public:
+ QpidConnection() : owner(true) {
+ c.open("localhost", 5672);
+ }
+
+ QpidConnection(const QpidConnection& qc) : owner(false), c(qc.c) {
+ }
+
+ ~QpidConnection() {
+ if (!owner)
+ return;
+ c.close();
+ }
+
+private:
+ friend class QpidSession;
+
+ const bool owner;
+ qpid::client::Connection c;
+
+};
+
+/**
+ * Represents a Qpid session.
+ */
+class QpidSession {
+public:
+ QpidSession(QpidConnection& qc) : owner(true), s(qc.c.newSession()) {
+ }
+
+ QpidSession(const QpidSession& qs) : owner(false), s(qs.s) {
+ }
+
+ ~QpidSession() {
+ if (!owner)
+ return;
+ s.close();
+ }
+
+private:
+ friend qpid::client::Session session(const QpidSession& qs);
+
+ const bool owner;
+ qpid::client::Session s;
+};
+
+qpid::client::Session session(const QpidSession& qs) {
+ return qs.s;
+}
+
+/**
+ * Declare a key / AMQP queue pair.
+ */
+const failable<bool> declareQueue(const string& key, const string& name, QpidSession& qs) {
+ session(qs).queueDeclare(qpid::client::arg::queue=c_str(name));
+ session(qs).exchangeBind(qpid::client::arg::exchange="amq.direct", qpid::client::arg::queue=c_str(name), qpid::client::arg::bindingKey=c_str(key));
+ return true;
+}
+
+/**
+ * Post a key / value pair in message to an AMQP broker.
+ */
+const failable<bool> post(const string& key, const value& val, QpidSession& qs) {
+
+ // Convert the value to a string
+ const string vs(scheme::writeValue(val));
+
+ // Send in a message with the given key.
+ qpid::client::Message message;
+ message.getDeliveryProperties().setRoutingKey(c_str(key));
+ message.setData(c_str(vs));
+ session(qs).messageTransfer(qpid::client::arg::content=message, qpid::client::arg::destination="amq.direct");
+ return true;
+}
+
+}
+}
+
+// Re-enable conversion and redundant declarations warnings
+#ifdef WANT_MAINTAINER_MODE
+#pragma GCC diagnostic warning "-Wconversion"
+#pragma GCC diagnostic warning "-Wredundant-decls"
+#endif
+
+#endif /* tuscany_qpid_hpp */
diff --git a/sca-cpp/trunk/components/queue/queue-listener.cpp b/sca-cpp/trunk/components/queue/queue-listener.cpp
new file mode 100644
index 0000000000..6f1c54873c
--- /dev/null
+++ b/sca-cpp/trunk/components/queue/queue-listener.cpp
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/* $Rev$ $Date$ */
+
+/**
+ * AMQP queue listener component implementation.
+ */
+
+#include "string.hpp"
+#include "function.hpp"
+#include "list.hpp"
+#include "value.hpp"
+#include "monad.hpp"
+#include "qpid.hpp"
+
+// Ignore conversion issues and redundant declarations in Qpid headers
+#ifdef WANT_MAINTAINER_MODE
+#pragma GCC diagnostic ignored "-Wconversion"
+#pragma GCC diagnostic ignored "-Wredundant-decls"
+#endif
+
+namespace tuscany {
+namespace queue {
+
+QpidConnection qc;
+
+/**
+ * Initialize the component.
+ */
+const failable<value> init(const list<value>& params) {
+ QpidSession qs(qc);
+
+ // Declare the configured AMQP key / queue pair
+ const value key = ((lambda<value(list<value>)>)caddr(params))(list<value>());
+ const value qname = ((lambda<value(list<value>)>)cadddr(params))(list<value>());
+ declareQueue(key, qname, qs);
+
+ //TODO create a subscription and mark the current server instance busy
+
+ return value(true);
+}
+
+}
+}
+
+extern "C" {
+
+const tuscany::value apply(const tuscany::list<tuscany::value>& params) {
+ const tuscany::value func(car(params));
+ if (func == "init")
+ return tuscany::queue::init(cdr(params));
+ return tuscany::mkfailure<tuscany::value>(tuscany::string("Function not supported: ") + func);
+}
+
+}
diff --git a/sca-cpp/trunk/components/queue/queue-sender.cpp b/sca-cpp/trunk/components/queue/queue-sender.cpp
new file mode 100644
index 0000000000..fbbbd0376b
--- /dev/null
+++ b/sca-cpp/trunk/components/queue/queue-sender.cpp
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/* $Rev$ $Date$ */
+
+/**
+ * AMQP queue sender component implementation.
+ */
+
+#include "string.hpp"
+#include "function.hpp"
+#include "list.hpp"
+#include "value.hpp"
+#include "monad.hpp"
+#include "qpid.hpp"
+
+// Ignore conversion issues and redundant declarations in Qpid headers
+#ifdef WANT_MAINTAINER_MODE
+#pragma GCC diagnostic ignored "-Wconversion"
+#pragma GCC diagnostic ignored "-Wredundant-decls"
+#endif
+
+namespace tuscany {
+namespace queue {
+
+QpidConnection qc;
+
+/**
+ * Post an item to a queue.
+ */
+const failable<value> post(const list<value>& params) {
+ QpidSession qs(qc);
+
+ // Post the item
+ const value key = ((lambda<value(list<value>)>)cadr(params))(list<value>());
+ post(key, car(params), qs);
+
+ return value(true);
+}
+
+}
+}
+
+extern "C" {
+
+const tuscany::value apply(const tuscany::list<tuscany::value>& params) {
+ const tuscany::value func(car(params));
+ if (func == "post")
+ return tuscany::queue::post(cdr(params));
+ return tuscany::mkfailure<tuscany::value>(tuscany::string("Function not supported: ") + func);
+}
+
+}
diff --git a/sca-cpp/trunk/components/queue/queue.composite b/sca-cpp/trunk/components/queue/queue.composite
index c60427dcc5..b908481bad 100644
--- a/sca-cpp/trunk/components/queue/queue.composite
+++ b/sca-cpp/trunk/components/queue/queue.composite
@@ -24,7 +24,7 @@
<component name="queue-sender">
<implementation.cpp path=".libs" library="libqueue-sender"/>
- <property name="uri">amqp://localhost:5672/print</property>
+ <property name="key">print</property>
<service name="queue-sender">
<t:binding.http uri="print-sender"/>
</service>
@@ -32,12 +32,13 @@
<component name="queue-listener">
<implementation.cpp path=".libs" library="libqueue-listener"/>
- <property name="uri">amqp://localhost:5672/print</property>
+ <property name="key">print</property>
+ <property name="queue">printq</property>
<reference name="relay" target="print"/>
</component>
<component name="print">
- <t:implementation.scheme script="print.scm"/>
+ <t:implementation.scheme script="server-test.scm"/>
<service name="print">
<t:binding.http uri="print"/>
</service>
diff --git a/sca-cpp/trunk/components/queue/server-test.scm b/sca-cpp/trunk/components/queue/server-test.scm
new file mode 100644
index 0000000000..77b7fc9c51
--- /dev/null
+++ b/sca-cpp/trunk/components/queue/server-test.scm
@@ -0,0 +1,3 @@
+; Queue test case
+
+(define (print x) (display x))