summaryrefslogtreecommitdiffstats
path: root/sca-cpp/trunk/components/queue
diff options
context:
space:
mode:
Diffstat (limited to 'sca-cpp/trunk/components/queue')
-rw-r--r--sca-cpp/trunk/components/queue/Makefile.am2
-rw-r--r--sca-cpp/trunk/components/queue/client-test.cpp24
-rw-r--r--sca-cpp/trunk/components/queue/qpid-test.cpp14
-rw-r--r--sca-cpp/trunk/components/queue/qpid.hpp27
-rw-r--r--sca-cpp/trunk/components/queue/queue-listener.cpp106
-rw-r--r--sca-cpp/trunk/components/queue/queue-sender.cpp2
6 files changed, 62 insertions, 113 deletions
diff --git a/sca-cpp/trunk/components/queue/Makefile.am b/sca-cpp/trunk/components/queue/Makefile.am
index c44722a523..81a7128b4a 100644
--- a/sca-cpp/trunk/components/queue/Makefile.am
+++ b/sca-cpp/trunk/components/queue/Makefile.am
@@ -48,7 +48,7 @@ qpid_test_SOURCES = qpid-test.cpp
qpid_test_LDFLAGS = -L${QPIDC_LIB} -R${QPIDC_LIB} -lqpidclient -lqpidcommon
client_test_SOURCES = client-test.cpp
-client_test_LDFLAGS = -lxml2 -lcurl -lmozjs -L${QPIDC_LIB} -R${QPIDC_LIB} -lqpidclient -lqpidcommon
+client_test_LDFLAGS = -lxml2 -lcurl -ljansson -L${QPIDC_LIB} -R${QPIDC_LIB} -lqpidclient -lqpidcommon
dist_noinst_SCRIPTS = send-test server-test
noinst_PROGRAMS = qpid-test client-test
diff --git a/sca-cpp/trunk/components/queue/client-test.cpp b/sca-cpp/trunk/components/queue/client-test.cpp
index 30bfe07bf7..f1399bb22c 100644
--- a/sca-cpp/trunk/components/queue/client-test.cpp
+++ b/sca-cpp/trunk/components/queue/client-test.cpp
@@ -46,15 +46,15 @@ namespace queue {
const value key(mklist<value>(string("report")));
const string qname("reportq");
-const list<value> item = list<value>() + "content" + (list<value>() + "item"
- + (list<value>() + "name" + string("Apple"))
- + (list<value>() + "price" + string("$2.99")));
-const list<value> entry = list<value>() + (list<value>() + "entry"
- + (list<value>() + "title" + string("item"))
- + (list<value>() + "id" + string("cart-53d67a61-aa5e-4e5e-8401-39edeba8b83b"))
+const list<value> item = nilListValue + "content" + (nilListValue + "item"
+ + (nilListValue + "name" + string("Apple"))
+ + (nilListValue + "price" + string("$2.99")));
+const list<value> entry = nilListValue + (nilListValue + "entry"
+ + (nilListValue + "title" + string("item"))
+ + (nilListValue + "id" + string("cart-53d67a61-aa5e-4e5e-8401-39edeba8b83b"))
+ item);
-bool testDeclareQueue() {
+const bool testDeclareQueue() {
QpidConnection qc;
QpidSession qs(qc);
const failable<bool> r = declareQueue(key, qname, qs);
@@ -69,18 +69,18 @@ const bool listener(const value& k, const value& v) {
return false;
}
-bool testListen() {
+const bool testListen() {
QpidConnection qc;
QpidSession qs(qc);
QpidSubscription qsub(qs);
- const lambda<bool(const value&, const value&)> l(listener);
+ const lambda<const bool(const value&, const value&)> l(listener);
listen(qname, l, qsub);
return true;
}
-bool testPost() {
- gc_scoped_pool pool;
- http::CURLSession ch("", "", "", "");
+const bool testPost() {
+ const gc_scoped_pool pool;
+ const http::CURLSession ch("", "", "", "", 0);
const failable<value> id = http::post(entry, "http://localhost:8090/print-sender", ch);
assert(hasContent(id));
return true;
diff --git a/sca-cpp/trunk/components/queue/qpid-test.cpp b/sca-cpp/trunk/components/queue/qpid-test.cpp
index 27db7734b0..87fc39c8bd 100644
--- a/sca-cpp/trunk/components/queue/qpid-test.cpp
+++ b/sca-cpp/trunk/components/queue/qpid-test.cpp
@@ -45,7 +45,7 @@ namespace queue {
const value key(mklist<value>("test"));
const string qname("testq");
-bool testDeclareQueue() {
+const bool testDeclareQueue() {
QpidConnection qc;
QpidSession qs(qc);
const failable<bool> r = declareQueue(key, qname, qs);
@@ -53,12 +53,12 @@ bool testDeclareQueue() {
return true;
}
-const list<value> item = list<value>()
- + (list<value>() + "name" + string("Apple"))
- + (list<value>() + "price" + string("$2.99"));
+const list<value> item = nilListValue
+ + (nilListValue + "name" + string("Apple"))
+ + (nilListValue + "price" + string("$2.99"));
const list<value> entry = mklist<value>(string("item"), string("cart-53d67a61-aa5e-4e5e-8401-39edeba8b83b"), item);
-bool testPost() {
+const bool testPost() {
QpidConnection qc;
QpidSession qs(qc);
const failable<bool> r = post(key, entry, qs);
@@ -72,11 +72,11 @@ const bool listener(const value& k, const value& v) {
return false;
}
-bool testListen() {
+const bool testListen() {
QpidConnection qc;
QpidSession qs(qc);
QpidSubscription qsub(qs);
- const lambda<bool(const value&, const value&)> l(listener);
+ const lambda<const bool(const value&, const value&)> l(listener);
listen(qname, l, qsub);
return true;
}
diff --git a/sca-cpp/trunk/components/queue/qpid.hpp b/sca-cpp/trunk/components/queue/qpid.hpp
index ef53c529e8..77361461c6 100644
--- a/sca-cpp/trunk/components/queue/qpid.hpp
+++ b/sca-cpp/trunk/components/queue/qpid.hpp
@@ -65,14 +65,7 @@ public:
debug("queue::qpidonnection::copy");
}
- const QpidConnection& operator=(const QpidConnection& qc) {
- debug("queue::qpidonnection::operator=");
- if(this == &c)
- return *this;
- owner = false;
- c = qc.c;
- return *this;
- }
+ QpidConnection& operator=(const QpidConnection& qc) = delete;
~QpidConnection() {
debug("queue::~qpidonnection");
@@ -148,7 +141,7 @@ const failable<bool> close(QpidSession& qs) {
* Declare a key / AMQP queue pair.
*/
const failable<bool> declareQueue(const value& key, const string& name, QpidSession& qs) {
- const string ks(scheme::writeValue(key));
+ const string ks(write(content(scheme::writeValue(key))));
try {
qs.s.queueDeclare(qpid::client::arg::queue=c_str(name));
qs.s.exchangeBind(qpid::client::arg::exchange="amq.direct", qpid::client::arg::queue=c_str(name), qpid::client::arg::bindingKey=c_str(ks));
@@ -164,8 +157,8 @@ const failable<bool> declareQueue(const value& key, const string& name, QpidSess
const failable<bool> post(const value& key, const value& val, QpidSession& qs) {
// Send in a message with the given key.
- const string ks(scheme::writeValue(key));
- const string vs(scheme::writeValue(val));
+ const string ks(write(content(scheme::writeValue(key))));
+ const string vs(write(content(scheme::writeValue(val))));
try {
qpid::client::Message message;
message.getDeliveryProperties().setRoutingKey(c_str(ks));
@@ -202,7 +195,7 @@ public:
}
private:
- friend const failable<bool> listen(const string& name, const lambda<bool(const value&, const value&)>& l, QpidSubscription& qsub);
+ friend const failable<bool> listen(const string& name, const lambda<const bool(const value&, const value&)>& l, QpidSubscription& qsub);
friend const failable<bool> stop(QpidSubscription& qsub);
const bool owner;
@@ -214,14 +207,14 @@ private:
*/
class Listener : public qpid::client::MessageListener {
public:
- Listener(const lambda<bool(const value&, const value&)> l, qpid::client::SubscriptionManager& subs) : l(l), subs(subs) {
+ Listener(const lambda<const bool(const value&, const value&)> l, qpid::client::SubscriptionManager& subs) : l(l), subs(subs) {
}
virtual void received(qpid::client::Message& msg) {
// Call the listener function
- const value k(scheme::readValue(msg.getDeliveryProperties().getRoutingKey().c_str()));
- const value v(scheme::readValue(msg.getData().c_str()));
+ const value k(content(scheme::readValue(msg.getDeliveryProperties().getRoutingKey().c_str())));
+ const value v(content(scheme::readValue(msg.getData().c_str())));
const bool r = l(k, v);
if (!r) {
try {
@@ -233,12 +226,12 @@ public:
}
private:
- const lambda<bool(const value&, const value&)> l;
+ const lambda<const bool(const value&, const value&)> l;
qpid::client::SubscriptionManager& subs;
};
-const failable<bool> listen(const string& name, const lambda<bool(const value&, const value&)>& l, QpidSubscription& qsub) {
+const failable<bool> listen(const string& name, const lambda<const bool(const value&, const value&)>& l, QpidSubscription& qsub) {
debug("queue::listen");
Listener listener(l, qsub.subs);
try {
diff --git a/sca-cpp/trunk/components/queue/queue-listener.cpp b/sca-cpp/trunk/components/queue/queue-listener.cpp
index 483d0de65a..f6793ecae7 100644
--- a/sca-cpp/trunk/components/queue/queue-listener.cpp
+++ b/sca-cpp/trunk/components/queue/queue-listener.cpp
@@ -42,57 +42,45 @@ namespace tuscany {
namespace queue {
/**
- * A relay function that posts the AMQP messages it receives to a relay component reference.
+ * Start the component.
*/
-class relay {
-public:
- relay(const lambda<value(const list<value>&)>& rel) : rel(rel) {
- }
+const failable<value> start(const list<value>& params) {
+ // Extract the relay reference and the AMQP key and queue name
+ const value rel = car(params);
+ const value pk = ((lvvlambda)cadr(params))(nilListValue);
+ const value key = isList(pk)? (list<value>)pk : mklist<value>(pk);
+ const value qname = ((lvvlambda)caddr(params))(nilListValue);
+
+ // Create an AMQP session
+ QpidConnection qc(false);
+ QpidSession qs(qc, false);
+
+ // Declare the configured AMQP key / queue pair
+ declareQueue(key, qname, qs);
- const bool operator()(const value& k, const value& v) const {
+ // Listen and relay messages in a worker thread
+ QpidSubscription qsub(qs, false);
+ const worker w(3);
+ const lambda<const bool(const value&, const value&)> rl = [rel](const value& k, const value& v) -> const bool {
+ // A relay function that posts the AMQP messages it receives to a relay component reference.
debug(k, "queue::relay::key");
debug(v, "queue::relay::value");
const value res = rel(mklist<value>("post", isList(k)? (list<value>)k : mklist<value>(k), v));
return true;
- }
+ };
-private:
- const lambda<value(const list<value>&)> rel;
-};
-
-/**
- * Subscribe and listen to an AMQP queue.
- */
-class subscribe {
-public:
- subscribe(const string& qname, const lambda<bool(const value&, const value&)>& l, const QpidSubscription& qsub) : qname(qname), l(l), qsub(qsub) {
- }
-
- const failable<bool> operator()() const {
- gc_pool pool;
+ // Subscribe and listen to the AMQP queue.
+ const lambda<const failable<bool>()> subscribe = [qname, rl, qsub]() -> const failable<bool> {
+ const gc_pool pool;
debug(qname, "queue::subscribe::listen");
- const failable<bool> r = listen(qname, l, const_cast<QpidSubscription&>(qsub));
+ const failable<bool> r = listen(qname, rl, const_cast<QpidSubscription&>(qsub));
debug(qname, "queue::subscribe::stopped");
return r;
- }
-
-private:
- const string qname;
- const lambda<bool(const value&, const value&)> l;
- const QpidSubscription qsub;
-};
-
-/**
- * Listener lambda function, responsible for starting an AMQP subscription in a worker thread, and
- * apply any function calls to the listener component. The only supported function is stop(),
- * called to stop the listener component and shutdown the worker thread.
- */
-class listener {
-public:
- listener(QpidConnection& qc, QpidSession& qs, QpidSubscription& qsub, worker& w) : qc(qc), qs(qs), qsub(qsub), w(w) {
- }
+ };
+ submit<failable<bool> >(w, subscribe);
- const value operator()(const list<value>& params) const {
+ // Return the listener component lambda function
+ const lvvlambda listener = [qc, qs, qsub, w](const list<value>& params) -> const value {
const tuscany::value func(car(params));
// Stop the component
@@ -107,41 +95,9 @@ public:
cancel(const_cast<worker&>(w));
debug("queue::listener::stopped");
- return failable<value>(value(lambda<value(const list<value>&)>()));
- }
-
-private:
- QpidConnection qc;
- QpidSession qs;
- QpidSubscription qsub;
- worker w;
-};
-
-/**
- * Start the component.
- */
-const failable<value> start(const list<value>& params) {
- // Extract the relay reference and the AMQP key and queue name
- const value rel = car(params);
- const value pk = ((lambda<value(const list<value>&)>)cadr(params))(list<value>());
- const value key = isList(pk)? (list<value>)pk : mklist<value>(pk);
- const value qname = ((lambda<value(const list<value>&)>)caddr(params))(list<value>());
-
- // Create an AMQP session
- QpidConnection qc(false);
- QpidSession qs(qc, false);
-
- // Declare the configured AMQP key / queue pair
- declareQueue(key, qname, qs);
-
- // Listen and relay messages in a worker thread
- QpidSubscription qsub(qs, false);
- worker w(3);
- const lambda<bool(const value&, const value&)> rl = relay(rel);
- submit<failable<bool> >(w, lambda<failable<bool>()>(subscribe(qname, rl, qsub)));
-
- // Return the listener component lambda function
- return value(lambda<value(const list<value>&)>(listener(qc, qs, qsub, w)));
+ return failable<value>(value(lvvlambda()));
+ };
+ return value(listener);
}
}
diff --git a/sca-cpp/trunk/components/queue/queue-sender.cpp b/sca-cpp/trunk/components/queue/queue-sender.cpp
index 202a0e4435..a479f255ce 100644
--- a/sca-cpp/trunk/components/queue/queue-sender.cpp
+++ b/sca-cpp/trunk/components/queue/queue-sender.cpp
@@ -48,7 +48,7 @@ const failable<value> post(const list<value>& params) {
QpidSession qs(qc);
// Post the item
- const value pk = ((lambda<value(const list<value>&)>)caddr(params))(list<value>());
+ const value pk = ((lvvlambda)caddr(params))(nilListValue);
const value key = isList(pk)? append<value>(pk, (list<value>)car(params)) : cons<value>(pk, (list<value>)car(params));
debug(key, "queue::post::key");
debug(cadr(params), "queue::post::value");