diff options
Diffstat (limited to '')
-rw-r--r-- | sca-cpp/trunk/components/queue/Makefile.am | 2 | ||||
-rw-r--r-- | sca-cpp/trunk/components/queue/client-test.cpp | 24 | ||||
-rw-r--r-- | sca-cpp/trunk/components/queue/qpid-test.cpp | 14 | ||||
-rw-r--r-- | sca-cpp/trunk/components/queue/qpid.hpp | 27 | ||||
-rw-r--r-- | sca-cpp/trunk/components/queue/queue-listener.cpp | 106 | ||||
-rw-r--r-- | sca-cpp/trunk/components/queue/queue-sender.cpp | 2 |
6 files changed, 62 insertions, 113 deletions
diff --git a/sca-cpp/trunk/components/queue/Makefile.am b/sca-cpp/trunk/components/queue/Makefile.am index c44722a523..81a7128b4a 100644 --- a/sca-cpp/trunk/components/queue/Makefile.am +++ b/sca-cpp/trunk/components/queue/Makefile.am @@ -48,7 +48,7 @@ qpid_test_SOURCES = qpid-test.cpp qpid_test_LDFLAGS = -L${QPIDC_LIB} -R${QPIDC_LIB} -lqpidclient -lqpidcommon client_test_SOURCES = client-test.cpp -client_test_LDFLAGS = -lxml2 -lcurl -lmozjs -L${QPIDC_LIB} -R${QPIDC_LIB} -lqpidclient -lqpidcommon +client_test_LDFLAGS = -lxml2 -lcurl -ljansson -L${QPIDC_LIB} -R${QPIDC_LIB} -lqpidclient -lqpidcommon dist_noinst_SCRIPTS = send-test server-test noinst_PROGRAMS = qpid-test client-test diff --git a/sca-cpp/trunk/components/queue/client-test.cpp b/sca-cpp/trunk/components/queue/client-test.cpp index 30bfe07bf7..f1399bb22c 100644 --- a/sca-cpp/trunk/components/queue/client-test.cpp +++ b/sca-cpp/trunk/components/queue/client-test.cpp @@ -46,15 +46,15 @@ namespace queue { const value key(mklist<value>(string("report"))); const string qname("reportq"); -const list<value> item = list<value>() + "content" + (list<value>() + "item" - + (list<value>() + "name" + string("Apple")) - + (list<value>() + "price" + string("$2.99"))); -const list<value> entry = list<value>() + (list<value>() + "entry" - + (list<value>() + "title" + string("item")) - + (list<value>() + "id" + string("cart-53d67a61-aa5e-4e5e-8401-39edeba8b83b")) +const list<value> item = nilListValue + "content" + (nilListValue + "item" + + (nilListValue + "name" + string("Apple")) + + (nilListValue + "price" + string("$2.99"))); +const list<value> entry = nilListValue + (nilListValue + "entry" + + (nilListValue + "title" + string("item")) + + (nilListValue + "id" + string("cart-53d67a61-aa5e-4e5e-8401-39edeba8b83b")) + item); -bool testDeclareQueue() { +const bool testDeclareQueue() { QpidConnection qc; QpidSession qs(qc); const failable<bool> r = declareQueue(key, qname, qs); @@ -69,18 +69,18 @@ const bool listener(const value& k, const value& v) { return false; } -bool testListen() { +const bool testListen() { QpidConnection qc; QpidSession qs(qc); QpidSubscription qsub(qs); - const lambda<bool(const value&, const value&)> l(listener); + const lambda<const bool(const value&, const value&)> l(listener); listen(qname, l, qsub); return true; } -bool testPost() { - gc_scoped_pool pool; - http::CURLSession ch("", "", "", ""); +const bool testPost() { + const gc_scoped_pool pool; + const http::CURLSession ch("", "", "", "", 0); const failable<value> id = http::post(entry, "http://localhost:8090/print-sender", ch); assert(hasContent(id)); return true; diff --git a/sca-cpp/trunk/components/queue/qpid-test.cpp b/sca-cpp/trunk/components/queue/qpid-test.cpp index 27db7734b0..87fc39c8bd 100644 --- a/sca-cpp/trunk/components/queue/qpid-test.cpp +++ b/sca-cpp/trunk/components/queue/qpid-test.cpp @@ -45,7 +45,7 @@ namespace queue { const value key(mklist<value>("test")); const string qname("testq"); -bool testDeclareQueue() { +const bool testDeclareQueue() { QpidConnection qc; QpidSession qs(qc); const failable<bool> r = declareQueue(key, qname, qs); @@ -53,12 +53,12 @@ bool testDeclareQueue() { return true; } -const list<value> item = list<value>() - + (list<value>() + "name" + string("Apple")) - + (list<value>() + "price" + string("$2.99")); +const list<value> item = nilListValue + + (nilListValue + "name" + string("Apple")) + + (nilListValue + "price" + string("$2.99")); const list<value> entry = mklist<value>(string("item"), string("cart-53d67a61-aa5e-4e5e-8401-39edeba8b83b"), item); -bool testPost() { +const bool testPost() { QpidConnection qc; QpidSession qs(qc); const failable<bool> r = post(key, entry, qs); @@ -72,11 +72,11 @@ const bool listener(const value& k, const value& v) { return false; } -bool testListen() { +const bool testListen() { QpidConnection qc; QpidSession qs(qc); QpidSubscription qsub(qs); - const lambda<bool(const value&, const value&)> l(listener); + const lambda<const bool(const value&, const value&)> l(listener); listen(qname, l, qsub); return true; } diff --git a/sca-cpp/trunk/components/queue/qpid.hpp b/sca-cpp/trunk/components/queue/qpid.hpp index ef53c529e8..77361461c6 100644 --- a/sca-cpp/trunk/components/queue/qpid.hpp +++ b/sca-cpp/trunk/components/queue/qpid.hpp @@ -65,14 +65,7 @@ public: debug("queue::qpidonnection::copy"); } - const QpidConnection& operator=(const QpidConnection& qc) { - debug("queue::qpidonnection::operator="); - if(this == &c) - return *this; - owner = false; - c = qc.c; - return *this; - } + QpidConnection& operator=(const QpidConnection& qc) = delete; ~QpidConnection() { debug("queue::~qpidonnection"); @@ -148,7 +141,7 @@ const failable<bool> close(QpidSession& qs) { * Declare a key / AMQP queue pair. */ const failable<bool> declareQueue(const value& key, const string& name, QpidSession& qs) { - const string ks(scheme::writeValue(key)); + const string ks(write(content(scheme::writeValue(key)))); try { qs.s.queueDeclare(qpid::client::arg::queue=c_str(name)); qs.s.exchangeBind(qpid::client::arg::exchange="amq.direct", qpid::client::arg::queue=c_str(name), qpid::client::arg::bindingKey=c_str(ks)); @@ -164,8 +157,8 @@ const failable<bool> declareQueue(const value& key, const string& name, QpidSess const failable<bool> post(const value& key, const value& val, QpidSession& qs) { // Send in a message with the given key. - const string ks(scheme::writeValue(key)); - const string vs(scheme::writeValue(val)); + const string ks(write(content(scheme::writeValue(key)))); + const string vs(write(content(scheme::writeValue(val)))); try { qpid::client::Message message; message.getDeliveryProperties().setRoutingKey(c_str(ks)); @@ -202,7 +195,7 @@ public: } private: - friend const failable<bool> listen(const string& name, const lambda<bool(const value&, const value&)>& l, QpidSubscription& qsub); + friend const failable<bool> listen(const string& name, const lambda<const bool(const value&, const value&)>& l, QpidSubscription& qsub); friend const failable<bool> stop(QpidSubscription& qsub); const bool owner; @@ -214,14 +207,14 @@ private: */ class Listener : public qpid::client::MessageListener { public: - Listener(const lambda<bool(const value&, const value&)> l, qpid::client::SubscriptionManager& subs) : l(l), subs(subs) { + Listener(const lambda<const bool(const value&, const value&)> l, qpid::client::SubscriptionManager& subs) : l(l), subs(subs) { } virtual void received(qpid::client::Message& msg) { // Call the listener function - const value k(scheme::readValue(msg.getDeliveryProperties().getRoutingKey().c_str())); - const value v(scheme::readValue(msg.getData().c_str())); + const value k(content(scheme::readValue(msg.getDeliveryProperties().getRoutingKey().c_str()))); + const value v(content(scheme::readValue(msg.getData().c_str()))); const bool r = l(k, v); if (!r) { try { @@ -233,12 +226,12 @@ public: } private: - const lambda<bool(const value&, const value&)> l; + const lambda<const bool(const value&, const value&)> l; qpid::client::SubscriptionManager& subs; }; -const failable<bool> listen(const string& name, const lambda<bool(const value&, const value&)>& l, QpidSubscription& qsub) { +const failable<bool> listen(const string& name, const lambda<const bool(const value&, const value&)>& l, QpidSubscription& qsub) { debug("queue::listen"); Listener listener(l, qsub.subs); try { diff --git a/sca-cpp/trunk/components/queue/queue-listener.cpp b/sca-cpp/trunk/components/queue/queue-listener.cpp index 483d0de65a..f6793ecae7 100644 --- a/sca-cpp/trunk/components/queue/queue-listener.cpp +++ b/sca-cpp/trunk/components/queue/queue-listener.cpp @@ -42,57 +42,45 @@ namespace tuscany { namespace queue { /** - * A relay function that posts the AMQP messages it receives to a relay component reference. + * Start the component. */ -class relay { -public: - relay(const lambda<value(const list<value>&)>& rel) : rel(rel) { - } +const failable<value> start(const list<value>& params) { + // Extract the relay reference and the AMQP key and queue name + const value rel = car(params); + const value pk = ((lvvlambda)cadr(params))(nilListValue); + const value key = isList(pk)? (list<value>)pk : mklist<value>(pk); + const value qname = ((lvvlambda)caddr(params))(nilListValue); + + // Create an AMQP session + QpidConnection qc(false); + QpidSession qs(qc, false); + + // Declare the configured AMQP key / queue pair + declareQueue(key, qname, qs); - const bool operator()(const value& k, const value& v) const { + // Listen and relay messages in a worker thread + QpidSubscription qsub(qs, false); + const worker w(3); + const lambda<const bool(const value&, const value&)> rl = [rel](const value& k, const value& v) -> const bool { + // A relay function that posts the AMQP messages it receives to a relay component reference. debug(k, "queue::relay::key"); debug(v, "queue::relay::value"); const value res = rel(mklist<value>("post", isList(k)? (list<value>)k : mklist<value>(k), v)); return true; - } + }; -private: - const lambda<value(const list<value>&)> rel; -}; - -/** - * Subscribe and listen to an AMQP queue. - */ -class subscribe { -public: - subscribe(const string& qname, const lambda<bool(const value&, const value&)>& l, const QpidSubscription& qsub) : qname(qname), l(l), qsub(qsub) { - } - - const failable<bool> operator()() const { - gc_pool pool; + // Subscribe and listen to the AMQP queue. + const lambda<const failable<bool>()> subscribe = [qname, rl, qsub]() -> const failable<bool> { + const gc_pool pool; debug(qname, "queue::subscribe::listen"); - const failable<bool> r = listen(qname, l, const_cast<QpidSubscription&>(qsub)); + const failable<bool> r = listen(qname, rl, const_cast<QpidSubscription&>(qsub)); debug(qname, "queue::subscribe::stopped"); return r; - } - -private: - const string qname; - const lambda<bool(const value&, const value&)> l; - const QpidSubscription qsub; -}; - -/** - * Listener lambda function, responsible for starting an AMQP subscription in a worker thread, and - * apply any function calls to the listener component. The only supported function is stop(), - * called to stop the listener component and shutdown the worker thread. - */ -class listener { -public: - listener(QpidConnection& qc, QpidSession& qs, QpidSubscription& qsub, worker& w) : qc(qc), qs(qs), qsub(qsub), w(w) { - } + }; + submit<failable<bool> >(w, subscribe); - const value operator()(const list<value>& params) const { + // Return the listener component lambda function + const lvvlambda listener = [qc, qs, qsub, w](const list<value>& params) -> const value { const tuscany::value func(car(params)); // Stop the component @@ -107,41 +95,9 @@ public: cancel(const_cast<worker&>(w)); debug("queue::listener::stopped"); - return failable<value>(value(lambda<value(const list<value>&)>())); - } - -private: - QpidConnection qc; - QpidSession qs; - QpidSubscription qsub; - worker w; -}; - -/** - * Start the component. - */ -const failable<value> start(const list<value>& params) { - // Extract the relay reference and the AMQP key and queue name - const value rel = car(params); - const value pk = ((lambda<value(const list<value>&)>)cadr(params))(list<value>()); - const value key = isList(pk)? (list<value>)pk : mklist<value>(pk); - const value qname = ((lambda<value(const list<value>&)>)caddr(params))(list<value>()); - - // Create an AMQP session - QpidConnection qc(false); - QpidSession qs(qc, false); - - // Declare the configured AMQP key / queue pair - declareQueue(key, qname, qs); - - // Listen and relay messages in a worker thread - QpidSubscription qsub(qs, false); - worker w(3); - const lambda<bool(const value&, const value&)> rl = relay(rel); - submit<failable<bool> >(w, lambda<failable<bool>()>(subscribe(qname, rl, qsub))); - - // Return the listener component lambda function - return value(lambda<value(const list<value>&)>(listener(qc, qs, qsub, w))); + return failable<value>(value(lvvlambda())); + }; + return value(listener); } } diff --git a/sca-cpp/trunk/components/queue/queue-sender.cpp b/sca-cpp/trunk/components/queue/queue-sender.cpp index 202a0e4435..a479f255ce 100644 --- a/sca-cpp/trunk/components/queue/queue-sender.cpp +++ b/sca-cpp/trunk/components/queue/queue-sender.cpp @@ -48,7 +48,7 @@ const failable<value> post(const list<value>& params) { QpidSession qs(qc); // Post the item - const value pk = ((lambda<value(const list<value>&)>)caddr(params))(list<value>()); + const value pk = ((lvvlambda)caddr(params))(nilListValue); const value key = isList(pk)? append<value>(pk, (list<value>)car(params)) : cons<value>(pk, (list<value>)car(params)); debug(key, "queue::post::key"); debug(cadr(params), "queue::post::value"); |