157ca678de
git-svn-id: http://svn.us.apache.org/repos/asf/tuscany@1428191 13f79535-47bb-0310-9956-ffa450edef68
268 lines
7.3 KiB
C++
268 lines
7.3 KiB
C++
/*
|
|
* Licensed to the Apache Software Foundation (ASF) under one
|
|
* or more contributor license agreements. See the NOTICE file
|
|
* distributed with this work for additional information
|
|
* regarding copyright ownership. The ASF licenses this file
|
|
* to you under the Apache License, Version 2.0 (the
|
|
* "License"); you may not use this file except in compliance
|
|
* with the License. You may obtain a copy of the License at
|
|
*
|
|
* http://www.apache.org/licenses/LICENSE-2.0
|
|
*
|
|
* Unless required by applicable law or agreed to in writing,
|
|
* software distributed under the License is distributed on an
|
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
|
* KIND, either express or implied. See the License for the
|
|
* specific language governing permissions and limitations
|
|
* under the License.
|
|
*/
|
|
|
|
/* $Rev$ $Date$ */
|
|
|
|
#ifndef tuscany_qpid_hpp
|
|
#define tuscany_qpid_hpp
|
|
|
|
/**
|
|
* AMQP queue access functions.
|
|
*/
|
|
|
|
// Ignore conversion issues and redundant declarations in Qpid headers
|
|
#ifdef WANT_MAINTAINER_WARNINGS
|
|
#pragma GCC diagnostic ignored "-Wconversion"
|
|
#pragma GCC diagnostic ignored "-Wredundant-decls"
|
|
#endif
|
|
|
|
#include <qpid/client/Connection.h>
|
|
#include <qpid/client/Session.h>
|
|
#include <qpid/client/MessageListener.h>
|
|
#include <qpid/client/SubscriptionManager.h>
|
|
|
|
#include "string.hpp"
|
|
#include "list.hpp"
|
|
#include "value.hpp"
|
|
#include "monad.hpp"
|
|
#include "../../modules/scheme/eval.hpp"
|
|
|
|
namespace tuscany {
|
|
namespace queue {
|
|
|
|
/**
|
|
* Represents a Qpid connection.
|
|
*/
|
|
class QpidConnection {
|
|
public:
|
|
QpidConnection() : owner(true) {
|
|
debug("queue::qpidonnection");
|
|
c.open("localhost", 5672);
|
|
}
|
|
|
|
QpidConnection(const bool owner) : owner(owner) {
|
|
debug("queue::qpidonnection");
|
|
c.open("localhost", 5672);
|
|
}
|
|
|
|
QpidConnection(const QpidConnection& qc) : owner(false), c(qc.c) {
|
|
debug("queue::qpidonnection::copy");
|
|
}
|
|
|
|
QpidConnection& operator=(const QpidConnection& qc) = delete;
|
|
|
|
~QpidConnection() {
|
|
if (!owner)
|
|
return;
|
|
c.close();
|
|
}
|
|
|
|
private:
|
|
friend const failable<bool> close(QpidConnection& qc);
|
|
friend class QpidSession;
|
|
|
|
const bool owner;
|
|
qpid::client::Connection c;
|
|
|
|
};
|
|
|
|
/**
|
|
* Close a Qpid connection.
|
|
*/
|
|
const failable<bool> close(QpidConnection& qc) {
|
|
qc.c.close();
|
|
return true;
|
|
}
|
|
|
|
/**
|
|
* Represents a Qpid session.
|
|
*/
|
|
class QpidSession {
|
|
public:
|
|
QpidSession(QpidConnection& qc) : owner(true), s(qc.c.newSession()) {
|
|
debug("queue::qpidsession");
|
|
}
|
|
|
|
QpidSession(QpidConnection& qc, const bool owner) : owner(owner), s(qc.c.newSession()) {
|
|
debug("queue::qpidsession");
|
|
}
|
|
|
|
QpidSession(const QpidSession& qs) : owner(false), s(qs.s) {
|
|
debug("queue::qpidsession::copy");
|
|
}
|
|
|
|
~QpidSession() {
|
|
if (!owner)
|
|
return;
|
|
s.close();
|
|
}
|
|
|
|
private:
|
|
friend const failable<bool> close(QpidSession& qs);
|
|
friend const failable<bool> declareQueue(const value& key, const string& name, QpidSession& qs);
|
|
friend const failable<bool> post(const value& key, const value& val, QpidSession& qs);
|
|
friend class QpidSubscription;
|
|
|
|
const bool owner;
|
|
qpid::client::Session s;
|
|
};
|
|
|
|
/**
|
|
* Close a Qpid session.
|
|
*/
|
|
const failable<bool> close(QpidSession& qs) {
|
|
try {
|
|
qs.s.close();
|
|
} catch (const qpid::Exception& e) {
|
|
return mkfailure<bool>(string("Qpid failure: ") + e.what());
|
|
}
|
|
return true;
|
|
}
|
|
|
|
/**
|
|
* Declare a key / AMQP queue pair.
|
|
*/
|
|
const failable<bool> declareQueue(const value& key, const string& name, QpidSession& qs) {
|
|
const string ks(write(content(scheme::writeValue(key))));
|
|
try {
|
|
qs.s.queueDeclare(qpid::client::arg::queue=c_str(name));
|
|
qs.s.exchangeBind(qpid::client::arg::exchange="amq.direct", qpid::client::arg::queue=c_str(name), qpid::client::arg::bindingKey=c_str(ks));
|
|
} catch (const qpid::Exception& e) {
|
|
return mkfailure<bool>(string("Qpid failure: ") + e.what());
|
|
}
|
|
return true;
|
|
}
|
|
|
|
/**
|
|
* Post a key / value pair message to an AMQP broker.
|
|
*/
|
|
const failable<bool> post(const value& key, const value& val, QpidSession& qs) {
|
|
|
|
// Send in a message with the given key.
|
|
const string ks(write(content(scheme::writeValue(key))));
|
|
const string vs(write(content(scheme::writeValue(val))));
|
|
try {
|
|
qpid::client::Message message;
|
|
message.getDeliveryProperties().setRoutingKey(c_str(ks));
|
|
message.setData(c_str(vs));
|
|
qs.s.messageTransfer(qpid::client::arg::content=message, qpid::client::arg::destination="amq.direct");
|
|
} catch (const qpid::Exception& e) {
|
|
return mkfailure<bool>(string("Qpid failure: ") + e.what());
|
|
}
|
|
return true;
|
|
}
|
|
|
|
/**
|
|
* Represents a Qpid subscription.
|
|
*/
|
|
class QpidSubscription {
|
|
public:
|
|
QpidSubscription(QpidSession& qs) : owner(true), subs(qs.s) {
|
|
}
|
|
|
|
QpidSubscription(QpidSession& qs, const bool owner) : owner(owner), subs(qs.s) {
|
|
}
|
|
|
|
QpidSubscription(const QpidSubscription& qsub) : owner(false), subs(qsub.subs) {
|
|
}
|
|
|
|
~QpidSubscription() {
|
|
if (!owner)
|
|
return;
|
|
try {
|
|
subs.stop();
|
|
} catch (const qpid::Exception& e) {
|
|
mkfailure<bool>(string("Qpid failure: ") + e.what());
|
|
}
|
|
}
|
|
|
|
private:
|
|
friend const failable<bool> listen(const string& name, const lambda<const bool(const value&, const value&)>& l, QpidSubscription& qsub);
|
|
friend const failable<bool> stop(QpidSubscription& qsub);
|
|
|
|
const bool owner;
|
|
qpid::client::SubscriptionManager subs;
|
|
};
|
|
|
|
/**
|
|
* Register a listener function with an AMQP queue.
|
|
*/
|
|
class Listener : public qpid::client::MessageListener {
|
|
public:
|
|
Listener(const lambda<const bool(const value&, const value&)> l, qpid::client::SubscriptionManager& subs) : l(l), subs(subs) {
|
|
}
|
|
|
|
virtual void received(qpid::client::Message& msg) {
|
|
|
|
// Call the listener function
|
|
const value k(content(scheme::readValue(msg.getDeliveryProperties().getRoutingKey().c_str())));
|
|
const value v(content(scheme::readValue(msg.getData().c_str())));
|
|
const bool r = l(k, v);
|
|
if (!r) {
|
|
try {
|
|
subs.cancel(msg.getDestination());
|
|
} catch (const qpid::Exception& e) {
|
|
mkfailure<bool>(string("Qpid failure: ") + e.what());
|
|
}
|
|
}
|
|
}
|
|
|
|
private:
|
|
const lambda<const bool(const value&, const value&)> l;
|
|
qpid::client::SubscriptionManager& subs;
|
|
};
|
|
|
|
|
|
const failable<bool> listen(const string& name, const lambda<const bool(const value&, const value&)>& l, QpidSubscription& qsub) {
|
|
debug("queue::listen");
|
|
Listener listener(l, qsub.subs);
|
|
try {
|
|
qsub.subs.subscribe(listener, c_str(name));
|
|
qsub.subs.run();
|
|
} catch (const qpid::Exception& e) {
|
|
return mkfailure<bool>(string("Qpid failure: ") + e.what());
|
|
}
|
|
debug("queue::listen::stopped");
|
|
return true;
|
|
}
|
|
|
|
/**
|
|
* Stop an AMQP subscription.
|
|
*/
|
|
const failable<bool> stop(QpidSubscription& qsub) {
|
|
debug("queue::stop");
|
|
try {
|
|
qsub.subs.stop();
|
|
} catch (const qpid::Exception& e) {
|
|
return mkfailure<bool>(string("Qpid failure: ") + e.what());
|
|
}
|
|
debug("queue::stopped");
|
|
return true;
|
|
}
|
|
|
|
}
|
|
}
|
|
|
|
// Re-enable conversion and redundant declarations warnings
|
|
#ifdef WANT_MAINTAINER_WARNINGS
|
|
#pragma GCC diagnostic warning "-Wconversion"
|
|
#pragma GCC diagnostic warning "-Wredundant-decls"
|
|
#endif
|
|
|
|
#endif /* tuscany_qpid_hpp */
|