diff options
Diffstat (limited to 'sca-cpp/trunk/components/queue/qpid.hpp')
-rw-r--r-- | sca-cpp/trunk/components/queue/qpid.hpp | 153 |
1 files changed, 139 insertions, 14 deletions
diff --git a/sca-cpp/trunk/components/queue/qpid.hpp b/sca-cpp/trunk/components/queue/qpid.hpp index e6be319be0..8b466cedcc 100644 --- a/sca-cpp/trunk/components/queue/qpid.hpp +++ b/sca-cpp/trunk/components/queue/qpid.hpp @@ -34,6 +34,8 @@ #include <qpid/client/Connection.h> #include <qpid/client/Session.h> +#include <qpid/client/MessageListener.h> +#include <qpid/client/SubscriptionManager.h> #include "string.hpp" #include "list.hpp" @@ -53,6 +55,10 @@ public: c.open("localhost", 5672); } + QpidConnection(const bool owner) : owner(owner) { + c.open("localhost", 5672); + } + QpidConnection(const QpidConnection& qc) : owner(false), c(qc.c) { } @@ -63,6 +69,7 @@ public: } private: + friend const failable<bool> close(QpidConnection& qc); friend class QpidSession; const bool owner; @@ -71,6 +78,14 @@ private: }; /** + * Close a Qpid connection. + */ +const failable<bool> close(QpidConnection& qc) { + qc.c.close(); + return true; +} + +/** * Represents a Qpid session. */ class QpidSession { @@ -78,6 +93,9 @@ public: QpidSession(QpidConnection& qc) : owner(true), s(qc.c.newSession()) { } + QpidSession(QpidConnection& qc, const bool owner) : owner(owner), s(qc.c.newSession()) { + } + QpidSession(const QpidSession& qs) : owner(false), s(qs.s) { } @@ -88,38 +106,145 @@ public: } private: - friend qpid::client::Session session(const QpidSession& qs); + friend const failable<bool> close(QpidSession& qs); + friend const failable<bool> declareQueue(const value& key, const string& name, QpidSession& qs); + friend const failable<bool> post(const value& key, const value& val, QpidSession& qs); + friend class QpidSubscription; const bool owner; qpid::client::Session s; }; -qpid::client::Session session(const QpidSession& qs) { - return qs.s; +/** + * Close a Qpid session. + */ +const failable<bool> close(QpidSession& qs) { + try { + qs.s.close(); + } catch (const qpid::Exception& e) { + return mkfailure<bool>(string("Qpid failure: ") + e.what()); + } + return true; } /** * Declare a key / AMQP queue pair. */ -const failable<bool> declareQueue(const string& key, const string& name, QpidSession& qs) { - session(qs).queueDeclare(qpid::client::arg::queue=c_str(name)); - session(qs).exchangeBind(qpid::client::arg::exchange="amq.direct", qpid::client::arg::queue=c_str(name), qpid::client::arg::bindingKey=c_str(key)); +const failable<bool> declareQueue(const value& key, const string& name, QpidSession& qs) { + const string ks(scheme::writeValue(key)); + try { + qs.s.queueDeclare(qpid::client::arg::queue=c_str(name)); + qs.s.exchangeBind(qpid::client::arg::exchange="amq.direct", qpid::client::arg::queue=c_str(name), qpid::client::arg::bindingKey=c_str(ks)); + } catch (const qpid::Exception& e) { + return mkfailure<bool>(string("Qpid failure: ") + e.what()); + } return true; } /** - * Post a key / value pair in message to an AMQP broker. + * Post a key / value pair message to an AMQP broker. */ -const failable<bool> post(const string& key, const value& val, QpidSession& qs) { +const failable<bool> post(const value& key, const value& val, QpidSession& qs) { - // Convert the value to a string + // Send in a message with the given key. + const string ks(scheme::writeValue(key)); const string vs(scheme::writeValue(val)); + try { + qpid::client::Message message; + message.getDeliveryProperties().setRoutingKey(c_str(ks)); + message.setData(c_str(vs)); + qs.s.messageTransfer(qpid::client::arg::content=message, qpid::client::arg::destination="amq.direct"); + } catch (const qpid::Exception& e) { + return mkfailure<bool>(string("Qpid failure: ") + e.what()); + } + return true; +} - // Send in a message with the given key. - qpid::client::Message message; - message.getDeliveryProperties().setRoutingKey(c_str(key)); - message.setData(c_str(vs)); - session(qs).messageTransfer(qpid::client::arg::content=message, qpid::client::arg::destination="amq.direct"); +/** + * Represents a Qpid subscription. + */ +class QpidSubscription { +public: + QpidSubscription(QpidSession& qs) : owner(true), subs(qs.s) { + } + + QpidSubscription(QpidSession& qs, const bool owner) : owner(owner), subs(qs.s) { + } + + QpidSubscription(const QpidSubscription& qsub) : owner(false), subs(qsub.subs) { + } + + ~QpidSubscription() { + if (!owner) + return; + try { + subs.stop(); + } catch (const qpid::Exception& e) { + mkfailure<bool>(string("Qpid failure: ") + e.what()); + } + } + +private: + friend const failable<bool> listen(const string& name, const lambda<bool(const value&, const value&)>& l, QpidSubscription& qsub); + friend const failable<bool> stop(QpidSubscription& qsub); + + const bool owner; + qpid::client::SubscriptionManager subs; +}; + +/** + * Register a listener function with an AMQP queue. + */ +class Listener : public qpid::client::MessageListener { +public: + Listener(const lambda<bool(const value&, const value&)> l, qpid::client::SubscriptionManager& subs) : l(l), subs(subs) { + } + + virtual void received(qpid::client::Message& msg) { + + // Call the listener function + const value k(scheme::readValue(msg.getDeliveryProperties().getRoutingKey().c_str())); + const value v(scheme::readValue(msg.getData().c_str())); + const bool r = l(k, v); + if (!r) { + try { + subs.cancel(msg.getDestination()); + } catch (const qpid::Exception& e) { + mkfailure<bool>(string("Qpid failure: ") + e.what()); + } + } + } + +private: + const lambda<bool(const value&, const value&)> l; + qpid::client::SubscriptionManager& subs; +}; + + +const failable<bool> listen(const string& name, const lambda<bool(const value&, const value&)>& l, QpidSubscription& qsub) { + debug("queue::listen"); + Listener listener(l, qsub.subs); + try { + qsub.subs.subscribe(listener, c_str(name)); + qsub.subs.run(); + } catch (const qpid::Exception& e) { + return mkfailure<bool>(string("Qpid failure: ") + e.what()); + } + debug("queue::listen::stopped"); + return true; +} + +/** + * Stop an AMQP subscription. + */ +const failable<bool> stop(QpidSubscription& qsub) { + debug("queue::stop"); + try { + qsub.subs.stop(); + } catch (const qpid::Exception& e) { + return mkfailure<bool>(string("Qpid failure: ") + e.what()); + } + debug("queue::stopped"); return true; } |