summaryrefslogtreecommitdiffstats
path: root/sca-cpp/trunk/components/queue/queue-listener.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'sca-cpp/trunk/components/queue/queue-listener.cpp')
-rw-r--r--sca-cpp/trunk/components/queue/queue-listener.cpp106
1 files changed, 31 insertions, 75 deletions
diff --git a/sca-cpp/trunk/components/queue/queue-listener.cpp b/sca-cpp/trunk/components/queue/queue-listener.cpp
index 483d0de65a..f6793ecae7 100644
--- a/sca-cpp/trunk/components/queue/queue-listener.cpp
+++ b/sca-cpp/trunk/components/queue/queue-listener.cpp
@@ -42,57 +42,45 @@ namespace tuscany {
namespace queue {
/**
- * A relay function that posts the AMQP messages it receives to a relay component reference.
+ * Start the component.
*/
-class relay {
-public:
- relay(const lambda<value(const list<value>&)>& rel) : rel(rel) {
- }
+const failable<value> start(const list<value>& params) {
+ // Extract the relay reference and the AMQP key and queue name
+ const value rel = car(params);
+ const value pk = ((lvvlambda)cadr(params))(nilListValue);
+ const value key = isList(pk)? (list<value>)pk : mklist<value>(pk);
+ const value qname = ((lvvlambda)caddr(params))(nilListValue);
+
+ // Create an AMQP session
+ QpidConnection qc(false);
+ QpidSession qs(qc, false);
+
+ // Declare the configured AMQP key / queue pair
+ declareQueue(key, qname, qs);
- const bool operator()(const value& k, const value& v) const {
+ // Listen and relay messages in a worker thread
+ QpidSubscription qsub(qs, false);
+ const worker w(3);
+ const lambda<const bool(const value&, const value&)> rl = [rel](const value& k, const value& v) -> const bool {
+ // A relay function that posts the AMQP messages it receives to a relay component reference.
debug(k, "queue::relay::key");
debug(v, "queue::relay::value");
const value res = rel(mklist<value>("post", isList(k)? (list<value>)k : mklist<value>(k), v));
return true;
- }
+ };
-private:
- const lambda<value(const list<value>&)> rel;
-};
-
-/**
- * Subscribe and listen to an AMQP queue.
- */
-class subscribe {
-public:
- subscribe(const string& qname, const lambda<bool(const value&, const value&)>& l, const QpidSubscription& qsub) : qname(qname), l(l), qsub(qsub) {
- }
-
- const failable<bool> operator()() const {
- gc_pool pool;
+ // Subscribe and listen to the AMQP queue.
+ const lambda<const failable<bool>()> subscribe = [qname, rl, qsub]() -> const failable<bool> {
+ const gc_pool pool;
debug(qname, "queue::subscribe::listen");
- const failable<bool> r = listen(qname, l, const_cast<QpidSubscription&>(qsub));
+ const failable<bool> r = listen(qname, rl, const_cast<QpidSubscription&>(qsub));
debug(qname, "queue::subscribe::stopped");
return r;
- }
-
-private:
- const string qname;
- const lambda<bool(const value&, const value&)> l;
- const QpidSubscription qsub;
-};
-
-/**
- * Listener lambda function, responsible for starting an AMQP subscription in a worker thread, and
- * apply any function calls to the listener component. The only supported function is stop(),
- * called to stop the listener component and shutdown the worker thread.
- */
-class listener {
-public:
- listener(QpidConnection& qc, QpidSession& qs, QpidSubscription& qsub, worker& w) : qc(qc), qs(qs), qsub(qsub), w(w) {
- }
+ };
+ submit<failable<bool> >(w, subscribe);
- const value operator()(const list<value>& params) const {
+ // Return the listener component lambda function
+ const lvvlambda listener = [qc, qs, qsub, w](const list<value>& params) -> const value {
const tuscany::value func(car(params));
// Stop the component
@@ -107,41 +95,9 @@ public:
cancel(const_cast<worker&>(w));
debug("queue::listener::stopped");
- return failable<value>(value(lambda<value(const list<value>&)>()));
- }
-
-private:
- QpidConnection qc;
- QpidSession qs;
- QpidSubscription qsub;
- worker w;
-};
-
-/**
- * Start the component.
- */
-const failable<value> start(const list<value>& params) {
- // Extract the relay reference and the AMQP key and queue name
- const value rel = car(params);
- const value pk = ((lambda<value(const list<value>&)>)cadr(params))(list<value>());
- const value key = isList(pk)? (list<value>)pk : mklist<value>(pk);
- const value qname = ((lambda<value(const list<value>&)>)caddr(params))(list<value>());
-
- // Create an AMQP session
- QpidConnection qc(false);
- QpidSession qs(qc, false);
-
- // Declare the configured AMQP key / queue pair
- declareQueue(key, qname, qs);
-
- // Listen and relay messages in a worker thread
- QpidSubscription qsub(qs, false);
- worker w(3);
- const lambda<bool(const value&, const value&)> rl = relay(rel);
- submit<failable<bool> >(w, lambda<failable<bool>()>(subscribe(qname, rl, qsub)));
-
- // Return the listener component lambda function
- return value(lambda<value(const list<value>&)>(listener(qc, qs, qsub, w)));
+ return failable<value>(value(lvvlambda()));
+ };
+ return value(listener);
}
}