summaryrefslogtreecommitdiffstats
path: root/sca-cpp/branches/lightweight-sca/components/queue/queue-listener.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'sca-cpp/branches/lightweight-sca/components/queue/queue-listener.cpp')
-rw-r--r--sca-cpp/branches/lightweight-sca/components/queue/queue-listener.cpp159
1 files changed, 159 insertions, 0 deletions
diff --git a/sca-cpp/branches/lightweight-sca/components/queue/queue-listener.cpp b/sca-cpp/branches/lightweight-sca/components/queue/queue-listener.cpp
new file mode 100644
index 0000000000..483d0de65a
--- /dev/null
+++ b/sca-cpp/branches/lightweight-sca/components/queue/queue-listener.cpp
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/* $Rev$ $Date$ */
+
+/**
+ * AMQP queue listener component implementation.
+ */
+
+#define WANT_HTTPD_LOG 1
+#include "string.hpp"
+#include "function.hpp"
+#include "list.hpp"
+#include "value.hpp"
+#include "monad.hpp"
+#include "parallel.hpp"
+#include "qpid.hpp"
+
+// Ignore conversion issues and redundant declarations in Qpid headers
+#ifdef WANT_MAINTAINER_WARNINGS
+#pragma GCC diagnostic ignored "-Wconversion"
+#pragma GCC diagnostic ignored "-Wredundant-decls"
+#endif
+
+namespace tuscany {
+namespace queue {
+
+/**
+ * A relay function that posts the AMQP messages it receives to a relay component reference.
+ */
+class relay {
+public:
+ relay(const lambda<value(const list<value>&)>& rel) : rel(rel) {
+ }
+
+ const bool operator()(const value& k, const value& v) const {
+ debug(k, "queue::relay::key");
+ debug(v, "queue::relay::value");
+ const value res = rel(mklist<value>("post", isList(k)? (list<value>)k : mklist<value>(k), v));
+ return true;
+ }
+
+private:
+ const lambda<value(const list<value>&)> rel;
+};
+
+/**
+ * Subscribe and listen to an AMQP queue.
+ */
+class subscribe {
+public:
+ subscribe(const string& qname, const lambda<bool(const value&, const value&)>& l, const QpidSubscription& qsub) : qname(qname), l(l), qsub(qsub) {
+ }
+
+ const failable<bool> operator()() const {
+ gc_pool pool;
+ debug(qname, "queue::subscribe::listen");
+ const failable<bool> r = listen(qname, l, const_cast<QpidSubscription&>(qsub));
+ debug(qname, "queue::subscribe::stopped");
+ return r;
+ }
+
+private:
+ const string qname;
+ const lambda<bool(const value&, const value&)> l;
+ const QpidSubscription qsub;
+};
+
+/**
+ * Listener lambda function, responsible for starting an AMQP subscription in a worker thread, and
+ * apply any function calls to the listener component. The only supported function is stop(),
+ * called to stop the listener component and shutdown the worker thread.
+ */
+class listener {
+public:
+ listener(QpidConnection& qc, QpidSession& qs, QpidSubscription& qsub, worker& w) : qc(qc), qs(qs), qsub(qsub), w(w) {
+ }
+
+ const value operator()(const list<value>& params) const {
+ const tuscany::value func(car(params));
+
+ // Stop the component
+ if (func != "stop")
+ return mkfailure<value>();
+ debug("queue::listener::stop");
+
+ // TODO check why stop() and close() hang in child processes
+ stop(const_cast<QpidSubscription&>(qsub));
+ close(const_cast<QpidSession&>(qs));
+ close(const_cast<QpidConnection&>(qc));
+ cancel(const_cast<worker&>(w));
+
+ debug("queue::listener::stopped");
+ return failable<value>(value(lambda<value(const list<value>&)>()));
+ }
+
+private:
+ QpidConnection qc;
+ QpidSession qs;
+ QpidSubscription qsub;
+ worker w;
+};
+
+/**
+ * Start the component.
+ */
+const failable<value> start(const list<value>& params) {
+ // Extract the relay reference and the AMQP key and queue name
+ const value rel = car(params);
+ const value pk = ((lambda<value(const list<value>&)>)cadr(params))(list<value>());
+ const value key = isList(pk)? (list<value>)pk : mklist<value>(pk);
+ const value qname = ((lambda<value(const list<value>&)>)caddr(params))(list<value>());
+
+ // Create an AMQP session
+ QpidConnection qc(false);
+ QpidSession qs(qc, false);
+
+ // Declare the configured AMQP key / queue pair
+ declareQueue(key, qname, qs);
+
+ // Listen and relay messages in a worker thread
+ QpidSubscription qsub(qs, false);
+ worker w(3);
+ const lambda<bool(const value&, const value&)> rl = relay(rel);
+ submit<failable<bool> >(w, lambda<failable<bool>()>(subscribe(qname, rl, qsub)));
+
+ // Return the listener component lambda function
+ return value(lambda<value(const list<value>&)>(listener(qc, qs, qsub, w)));
+}
+
+}
+}
+
+extern "C" {
+
+const tuscany::value apply(const tuscany::list<tuscany::value>& params) {
+ const tuscany::value func(car(params));
+ if (func == "start")
+ return tuscany::queue::start(cdr(params));
+ return tuscany::mkfailure<tuscany::value>();
+}
+
+}