diff options
Diffstat (limited to '')
-rw-r--r-- | sca-cpp/trunk/components/queue/queue-listener.cpp | 106 |
1 files changed, 31 insertions, 75 deletions
diff --git a/sca-cpp/trunk/components/queue/queue-listener.cpp b/sca-cpp/trunk/components/queue/queue-listener.cpp index 483d0de65a..f6793ecae7 100644 --- a/sca-cpp/trunk/components/queue/queue-listener.cpp +++ b/sca-cpp/trunk/components/queue/queue-listener.cpp @@ -42,57 +42,45 @@ namespace tuscany { namespace queue { /** - * A relay function that posts the AMQP messages it receives to a relay component reference. + * Start the component. */ -class relay { -public: - relay(const lambda<value(const list<value>&)>& rel) : rel(rel) { - } +const failable<value> start(const list<value>& params) { + // Extract the relay reference and the AMQP key and queue name + const value rel = car(params); + const value pk = ((lvvlambda)cadr(params))(nilListValue); + const value key = isList(pk)? (list<value>)pk : mklist<value>(pk); + const value qname = ((lvvlambda)caddr(params))(nilListValue); + + // Create an AMQP session + QpidConnection qc(false); + QpidSession qs(qc, false); + + // Declare the configured AMQP key / queue pair + declareQueue(key, qname, qs); - const bool operator()(const value& k, const value& v) const { + // Listen and relay messages in a worker thread + QpidSubscription qsub(qs, false); + const worker w(3); + const lambda<const bool(const value&, const value&)> rl = [rel](const value& k, const value& v) -> const bool { + // A relay function that posts the AMQP messages it receives to a relay component reference. debug(k, "queue::relay::key"); debug(v, "queue::relay::value"); const value res = rel(mklist<value>("post", isList(k)? (list<value>)k : mklist<value>(k), v)); return true; - } + }; -private: - const lambda<value(const list<value>&)> rel; -}; - -/** - * Subscribe and listen to an AMQP queue. - */ -class subscribe { -public: - subscribe(const string& qname, const lambda<bool(const value&, const value&)>& l, const QpidSubscription& qsub) : qname(qname), l(l), qsub(qsub) { - } - - const failable<bool> operator()() const { - gc_pool pool; + // Subscribe and listen to the AMQP queue. + const lambda<const failable<bool>()> subscribe = [qname, rl, qsub]() -> const failable<bool> { + const gc_pool pool; debug(qname, "queue::subscribe::listen"); - const failable<bool> r = listen(qname, l, const_cast<QpidSubscription&>(qsub)); + const failable<bool> r = listen(qname, rl, const_cast<QpidSubscription&>(qsub)); debug(qname, "queue::subscribe::stopped"); return r; - } - -private: - const string qname; - const lambda<bool(const value&, const value&)> l; - const QpidSubscription qsub; -}; - -/** - * Listener lambda function, responsible for starting an AMQP subscription in a worker thread, and - * apply any function calls to the listener component. The only supported function is stop(), - * called to stop the listener component and shutdown the worker thread. - */ -class listener { -public: - listener(QpidConnection& qc, QpidSession& qs, QpidSubscription& qsub, worker& w) : qc(qc), qs(qs), qsub(qsub), w(w) { - } + }; + submit<failable<bool> >(w, subscribe); - const value operator()(const list<value>& params) const { + // Return the listener component lambda function + const lvvlambda listener = [qc, qs, qsub, w](const list<value>& params) -> const value { const tuscany::value func(car(params)); // Stop the component @@ -107,41 +95,9 @@ public: cancel(const_cast<worker&>(w)); debug("queue::listener::stopped"); - return failable<value>(value(lambda<value(const list<value>&)>())); - } - -private: - QpidConnection qc; - QpidSession qs; - QpidSubscription qsub; - worker w; -}; - -/** - * Start the component. - */ -const failable<value> start(const list<value>& params) { - // Extract the relay reference and the AMQP key and queue name - const value rel = car(params); - const value pk = ((lambda<value(const list<value>&)>)cadr(params))(list<value>()); - const value key = isList(pk)? (list<value>)pk : mklist<value>(pk); - const value qname = ((lambda<value(const list<value>&)>)caddr(params))(list<value>()); - - // Create an AMQP session - QpidConnection qc(false); - QpidSession qs(qc, false); - - // Declare the configured AMQP key / queue pair - declareQueue(key, qname, qs); - - // Listen and relay messages in a worker thread - QpidSubscription qsub(qs, false); - worker w(3); - const lambda<bool(const value&, const value&)> rl = relay(rel); - submit<failable<bool> >(w, lambda<failable<bool>()>(subscribe(qname, rl, qsub))); - - // Return the listener component lambda function - return value(lambda<value(const list<value>&)>(listener(qc, qs, qsub, w))); + return failable<value>(value(lvvlambda())); + }; + return value(listener); } } |