summaryrefslogtreecommitdiffstats
path: root/sca-cpp/branches/lightweight-sca/components/queue/qpid.hpp
diff options
context:
space:
mode:
Diffstat (limited to 'sca-cpp/branches/lightweight-sca/components/queue/qpid.hpp')
-rw-r--r--sca-cpp/branches/lightweight-sca/components/queue/qpid.hpp277
1 files changed, 277 insertions, 0 deletions
diff --git a/sca-cpp/branches/lightweight-sca/components/queue/qpid.hpp b/sca-cpp/branches/lightweight-sca/components/queue/qpid.hpp
new file mode 100644
index 0000000000..ef53c529e8
--- /dev/null
+++ b/sca-cpp/branches/lightweight-sca/components/queue/qpid.hpp
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/* $Rev$ $Date$ */
+
+#ifndef tuscany_qpid_hpp
+#define tuscany_qpid_hpp
+
+/**
+ * AMQP queue access functions.
+ */
+
+// Ignore conversion issues and redundant declarations in Qpid headers
+#ifdef WANT_MAINTAINER_WARNINGS
+#pragma GCC diagnostic ignored "-Wconversion"
+#pragma GCC diagnostic ignored "-Wredundant-decls"
+#endif
+
+#include <qpid/client/Connection.h>
+#include <qpid/client/Session.h>
+#include <qpid/client/MessageListener.h>
+#include <qpid/client/SubscriptionManager.h>
+
+#include "string.hpp"
+#include "list.hpp"
+#include "value.hpp"
+#include "monad.hpp"
+#include "../../modules/scheme/eval.hpp"
+
+namespace tuscany {
+namespace queue {
+
+/**
+ * Represents a Qpid connection.
+ */
+class QpidConnection {
+public:
+ QpidConnection() : owner(true) {
+ debug("queue::qpidonnection");
+ c.open("localhost", 5672);
+ }
+
+ QpidConnection(const bool owner) : owner(owner) {
+ debug("queue::qpidonnection");
+ c.open("localhost", 5672);
+ }
+
+ QpidConnection(const QpidConnection& qc) : owner(false), c(qc.c) {
+ debug("queue::qpidonnection::copy");
+ }
+
+ const QpidConnection& operator=(const QpidConnection& qc) {
+ debug("queue::qpidonnection::operator=");
+ if(this == &c)
+ return *this;
+ owner = false;
+ c = qc.c;
+ return *this;
+ }
+
+ ~QpidConnection() {
+ debug("queue::~qpidonnection");
+ if (!owner)
+ return;
+ c.close();
+ }
+
+private:
+ friend const failable<bool> close(QpidConnection& qc);
+ friend class QpidSession;
+
+ const bool owner;
+ qpid::client::Connection c;
+
+};
+
+/**
+ * Close a Qpid connection.
+ */
+const failable<bool> close(QpidConnection& qc) {
+ qc.c.close();
+ return true;
+}
+
+/**
+ * Represents a Qpid session.
+ */
+class QpidSession {
+public:
+ QpidSession(QpidConnection& qc) : owner(true), s(qc.c.newSession()) {
+ debug("queue::qpidsession");
+ }
+
+ QpidSession(QpidConnection& qc, const bool owner) : owner(owner), s(qc.c.newSession()) {
+ debug("queue::qpidsession");
+ }
+
+ QpidSession(const QpidSession& qs) : owner(false), s(qs.s) {
+ debug("queue::qpidsession::copy");
+ }
+
+ ~QpidSession() {
+ debug("queue::~qpidsession");
+ if (!owner)
+ return;
+ s.close();
+ }
+
+private:
+ friend const failable<bool> close(QpidSession& qs);
+ friend const failable<bool> declareQueue(const value& key, const string& name, QpidSession& qs);
+ friend const failable<bool> post(const value& key, const value& val, QpidSession& qs);
+ friend class QpidSubscription;
+
+ const bool owner;
+ qpid::client::Session s;
+};
+
+/**
+ * Close a Qpid session.
+ */
+const failable<bool> close(QpidSession& qs) {
+ try {
+ qs.s.close();
+ } catch (const qpid::Exception& e) {
+ return mkfailure<bool>(string("Qpid failure: ") + e.what());
+ }
+ return true;
+}
+
+/**
+ * Declare a key / AMQP queue pair.
+ */
+const failable<bool> declareQueue(const value& key, const string& name, QpidSession& qs) {
+ const string ks(scheme::writeValue(key));
+ try {
+ qs.s.queueDeclare(qpid::client::arg::queue=c_str(name));
+ qs.s.exchangeBind(qpid::client::arg::exchange="amq.direct", qpid::client::arg::queue=c_str(name), qpid::client::arg::bindingKey=c_str(ks));
+ } catch (const qpid::Exception& e) {
+ return mkfailure<bool>(string("Qpid failure: ") + e.what());
+ }
+ return true;
+}
+
+/**
+ * Post a key / value pair message to an AMQP broker.
+ */
+const failable<bool> post(const value& key, const value& val, QpidSession& qs) {
+
+ // Send in a message with the given key.
+ const string ks(scheme::writeValue(key));
+ const string vs(scheme::writeValue(val));
+ try {
+ qpid::client::Message message;
+ message.getDeliveryProperties().setRoutingKey(c_str(ks));
+ message.setData(c_str(vs));
+ qs.s.messageTransfer(qpid::client::arg::content=message, qpid::client::arg::destination="amq.direct");
+ } catch (const qpid::Exception& e) {
+ return mkfailure<bool>(string("Qpid failure: ") + e.what());
+ }
+ return true;
+}
+
+/**
+ * Represents a Qpid subscription.
+ */
+class QpidSubscription {
+public:
+ QpidSubscription(QpidSession& qs) : owner(true), subs(qs.s) {
+ }
+
+ QpidSubscription(QpidSession& qs, const bool owner) : owner(owner), subs(qs.s) {
+ }
+
+ QpidSubscription(const QpidSubscription& qsub) : owner(false), subs(qsub.subs) {
+ }
+
+ ~QpidSubscription() {
+ if (!owner)
+ return;
+ try {
+ subs.stop();
+ } catch (const qpid::Exception& e) {
+ mkfailure<bool>(string("Qpid failure: ") + e.what());
+ }
+ }
+
+private:
+ friend const failable<bool> listen(const string& name, const lambda<bool(const value&, const value&)>& l, QpidSubscription& qsub);
+ friend const failable<bool> stop(QpidSubscription& qsub);
+
+ const bool owner;
+ qpid::client::SubscriptionManager subs;
+};
+
+/**
+ * Register a listener function with an AMQP queue.
+ */
+class Listener : public qpid::client::MessageListener {
+public:
+ Listener(const lambda<bool(const value&, const value&)> l, qpid::client::SubscriptionManager& subs) : l(l), subs(subs) {
+ }
+
+ virtual void received(qpid::client::Message& msg) {
+
+ // Call the listener function
+ const value k(scheme::readValue(msg.getDeliveryProperties().getRoutingKey().c_str()));
+ const value v(scheme::readValue(msg.getData().c_str()));
+ const bool r = l(k, v);
+ if (!r) {
+ try {
+ subs.cancel(msg.getDestination());
+ } catch (const qpid::Exception& e) {
+ mkfailure<bool>(string("Qpid failure: ") + e.what());
+ }
+ }
+ }
+
+private:
+ const lambda<bool(const value&, const value&)> l;
+ qpid::client::SubscriptionManager& subs;
+};
+
+
+const failable<bool> listen(const string& name, const lambda<bool(const value&, const value&)>& l, QpidSubscription& qsub) {
+ debug("queue::listen");
+ Listener listener(l, qsub.subs);
+ try {
+ qsub.subs.subscribe(listener, c_str(name));
+ qsub.subs.run();
+ } catch (const qpid::Exception& e) {
+ return mkfailure<bool>(string("Qpid failure: ") + e.what());
+ }
+ debug("queue::listen::stopped");
+ return true;
+}
+
+/**
+ * Stop an AMQP subscription.
+ */
+const failable<bool> stop(QpidSubscription& qsub) {
+ debug("queue::stop");
+ try {
+ qsub.subs.stop();
+ } catch (const qpid::Exception& e) {
+ return mkfailure<bool>(string("Qpid failure: ") + e.what());
+ }
+ debug("queue::stopped");
+ return true;
+}
+
+}
+}
+
+// Re-enable conversion and redundant declarations warnings
+#ifdef WANT_MAINTAINER_WARNINGS
+#pragma GCC diagnostic warning "-Wconversion"
+#pragma GCC diagnostic warning "-Wredundant-decls"
+#endif
+
+#endif /* tuscany_qpid_hpp */