summaryrefslogtreecommitdiffstats
path: root/sca-cpp/trunk/components/queue/qpid.hpp
diff options
context:
space:
mode:
authorjsdelfino <jsdelfino@13f79535-47bb-0310-9956-ffa450edef68>2010-02-17 04:14:31 +0000
committerjsdelfino <jsdelfino@13f79535-47bb-0310-9956-ffa450edef68>2010-02-17 04:14:31 +0000
commit00438314438f3dde00b532ac5d8d28ccc35c7096 (patch)
tree80dbbb010c5125455a164c77670b8694231f123f /sca-cpp/trunk/components/queue/qpid.hpp
parent50063bc212e8e93d014519ef0e4d4cabef0b6be2 (diff)
Working queue and chat components. Added a few useful start/stop scripts. Fixed lifecycle code to call start/stop/restart functions before APR pools are cleaned up in both parent and child processes. Minor build script improvements.
git-svn-id: http://svn.us.apache.org/repos/asf/tuscany@910819 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'sca-cpp/trunk/components/queue/qpid.hpp')
-rw-r--r--sca-cpp/trunk/components/queue/qpid.hpp153
1 files changed, 139 insertions, 14 deletions
diff --git a/sca-cpp/trunk/components/queue/qpid.hpp b/sca-cpp/trunk/components/queue/qpid.hpp
index e6be319be0..8b466cedcc 100644
--- a/sca-cpp/trunk/components/queue/qpid.hpp
+++ b/sca-cpp/trunk/components/queue/qpid.hpp
@@ -34,6 +34,8 @@
#include <qpid/client/Connection.h>
#include <qpid/client/Session.h>
+#include <qpid/client/MessageListener.h>
+#include <qpid/client/SubscriptionManager.h>
#include "string.hpp"
#include "list.hpp"
@@ -53,6 +55,10 @@ public:
c.open("localhost", 5672);
}
+ QpidConnection(const bool owner) : owner(owner) {
+ c.open("localhost", 5672);
+ }
+
QpidConnection(const QpidConnection& qc) : owner(false), c(qc.c) {
}
@@ -63,6 +69,7 @@ public:
}
private:
+ friend const failable<bool> close(QpidConnection& qc);
friend class QpidSession;
const bool owner;
@@ -71,6 +78,14 @@ private:
};
/**
+ * Close a Qpid connection.
+ */
+const failable<bool> close(QpidConnection& qc) {
+ qc.c.close();
+ return true;
+}
+
+/**
* Represents a Qpid session.
*/
class QpidSession {
@@ -78,6 +93,9 @@ public:
QpidSession(QpidConnection& qc) : owner(true), s(qc.c.newSession()) {
}
+ QpidSession(QpidConnection& qc, const bool owner) : owner(owner), s(qc.c.newSession()) {
+ }
+
QpidSession(const QpidSession& qs) : owner(false), s(qs.s) {
}
@@ -88,38 +106,145 @@ public:
}
private:
- friend qpid::client::Session session(const QpidSession& qs);
+ friend const failable<bool> close(QpidSession& qs);
+ friend const failable<bool> declareQueue(const value& key, const string& name, QpidSession& qs);
+ friend const failable<bool> post(const value& key, const value& val, QpidSession& qs);
+ friend class QpidSubscription;
const bool owner;
qpid::client::Session s;
};
-qpid::client::Session session(const QpidSession& qs) {
- return qs.s;
+/**
+ * Close a Qpid session.
+ */
+const failable<bool> close(QpidSession& qs) {
+ try {
+ qs.s.close();
+ } catch (const qpid::Exception& e) {
+ return mkfailure<bool>(string("Qpid failure: ") + e.what());
+ }
+ return true;
}
/**
* Declare a key / AMQP queue pair.
*/
-const failable<bool> declareQueue(const string& key, const string& name, QpidSession& qs) {
- session(qs).queueDeclare(qpid::client::arg::queue=c_str(name));
- session(qs).exchangeBind(qpid::client::arg::exchange="amq.direct", qpid::client::arg::queue=c_str(name), qpid::client::arg::bindingKey=c_str(key));
+const failable<bool> declareQueue(const value& key, const string& name, QpidSession& qs) {
+ const string ks(scheme::writeValue(key));
+ try {
+ qs.s.queueDeclare(qpid::client::arg::queue=c_str(name));
+ qs.s.exchangeBind(qpid::client::arg::exchange="amq.direct", qpid::client::arg::queue=c_str(name), qpid::client::arg::bindingKey=c_str(ks));
+ } catch (const qpid::Exception& e) {
+ return mkfailure<bool>(string("Qpid failure: ") + e.what());
+ }
return true;
}
/**
- * Post a key / value pair in message to an AMQP broker.
+ * Post a key / value pair message to an AMQP broker.
*/
-const failable<bool> post(const string& key, const value& val, QpidSession& qs) {
+const failable<bool> post(const value& key, const value& val, QpidSession& qs) {
- // Convert the value to a string
+ // Send in a message with the given key.
+ const string ks(scheme::writeValue(key));
const string vs(scheme::writeValue(val));
+ try {
+ qpid::client::Message message;
+ message.getDeliveryProperties().setRoutingKey(c_str(ks));
+ message.setData(c_str(vs));
+ qs.s.messageTransfer(qpid::client::arg::content=message, qpid::client::arg::destination="amq.direct");
+ } catch (const qpid::Exception& e) {
+ return mkfailure<bool>(string("Qpid failure: ") + e.what());
+ }
+ return true;
+}
- // Send in a message with the given key.
- qpid::client::Message message;
- message.getDeliveryProperties().setRoutingKey(c_str(key));
- message.setData(c_str(vs));
- session(qs).messageTransfer(qpid::client::arg::content=message, qpid::client::arg::destination="amq.direct");
+/**
+ * Represents a Qpid subscription.
+ */
+class QpidSubscription {
+public:
+ QpidSubscription(QpidSession& qs) : owner(true), subs(qs.s) {
+ }
+
+ QpidSubscription(QpidSession& qs, const bool owner) : owner(owner), subs(qs.s) {
+ }
+
+ QpidSubscription(const QpidSubscription& qsub) : owner(false), subs(qsub.subs) {
+ }
+
+ ~QpidSubscription() {
+ if (!owner)
+ return;
+ try {
+ subs.stop();
+ } catch (const qpid::Exception& e) {
+ mkfailure<bool>(string("Qpid failure: ") + e.what());
+ }
+ }
+
+private:
+ friend const failable<bool> listen(const string& name, const lambda<bool(const value&, const value&)>& l, QpidSubscription& qsub);
+ friend const failable<bool> stop(QpidSubscription& qsub);
+
+ const bool owner;
+ qpid::client::SubscriptionManager subs;
+};
+
+/**
+ * Register a listener function with an AMQP queue.
+ */
+class Listener : public qpid::client::MessageListener {
+public:
+ Listener(const lambda<bool(const value&, const value&)> l, qpid::client::SubscriptionManager& subs) : l(l), subs(subs) {
+ }
+
+ virtual void received(qpid::client::Message& msg) {
+
+ // Call the listener function
+ const value k(scheme::readValue(msg.getDeliveryProperties().getRoutingKey().c_str()));
+ const value v(scheme::readValue(msg.getData().c_str()));
+ const bool r = l(k, v);
+ if (!r) {
+ try {
+ subs.cancel(msg.getDestination());
+ } catch (const qpid::Exception& e) {
+ mkfailure<bool>(string("Qpid failure: ") + e.what());
+ }
+ }
+ }
+
+private:
+ const lambda<bool(const value&, const value&)> l;
+ qpid::client::SubscriptionManager& subs;
+};
+
+
+const failable<bool> listen(const string& name, const lambda<bool(const value&, const value&)>& l, QpidSubscription& qsub) {
+ debug("queue::listen");
+ Listener listener(l, qsub.subs);
+ try {
+ qsub.subs.subscribe(listener, c_str(name));
+ qsub.subs.run();
+ } catch (const qpid::Exception& e) {
+ return mkfailure<bool>(string("Qpid failure: ") + e.what());
+ }
+ debug("queue::listen::stopped");
+ return true;
+}
+
+/**
+ * Stop an AMQP subscription.
+ */
+const failable<bool> stop(QpidSubscription& qsub) {
+ debug("queue::stop");
+ try {
+ qsub.subs.stop();
+ } catch (const qpid::Exception& e) {
+ return mkfailure<bool>(string("Qpid failure: ") + e.what());
+ }
+ debug("queue::stopped");
return true;
}