/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. */ /* $Rev$ $Date$ */ /** * AMQP queue listener component implementation. */ #define WANT_HTTPD_LOG 1 #include "string.hpp" #include "function.hpp" #include "list.hpp" #include "value.hpp" #include "monad.hpp" #include "parallel.hpp" #include "qpid.hpp" // Ignore conversion issues and redundant declarations in Qpid headers #ifdef WANT_MAINTAINER_WARNINGS #pragma GCC diagnostic ignored "-Wconversion" #pragma GCC diagnostic ignored "-Wredundant-decls" #endif namespace tuscany { namespace queue { /** * A relay function that posts the AMQP messages it receives to a relay component reference. */ class relay { public: relay(const lambda&)>& rel) : rel(rel) { } const bool operator()(const value& k, const value& v) const { debug(k, "queue::relay::key"); debug(v, "queue::relay::value"); const value res = rel(mklist("post", isList(k)? (list)k : mklist(k), v)); return true; } private: const lambda&)> rel; }; /** * Subscribe and listen to an AMQP queue. */ class subscribe { public: subscribe(const string& qname, const lambda& l, const QpidSubscription& qsub) : qname(qname), l(l), qsub(qsub) { } const failable operator()() const { gc_pool pool; debug(qname, "queue::subscribe::listen"); const failable r = listen(qname, l, const_cast(qsub)); debug(qname, "queue::subscribe::stopped"); return r; } private: const string qname; const lambda l; const QpidSubscription qsub; }; /** * Listener lambda function, responsible for starting an AMQP subscription in a worker thread, and * apply any function calls to the listener component. The only supported function is stop(), * called to stop the listener component and shutdown the worker thread. */ class listener { public: listener(QpidConnection& qc, QpidSession& qs, QpidSubscription& qsub, worker& w) : qc(qc), qs(qs), qsub(qsub), w(w) { } const value operator()(const list& params) const { const tuscany::value func(car(params)); // Stop the component if (func != "stop") return mkfailure(); debug("queue::listener::stop"); // TODO check why stop() and close() hang in child processes stop(const_cast(qsub)); close(const_cast(qs)); close(const_cast(qc)); cancel(const_cast(w)); debug("queue::listener::stopped"); return failable(value(lambda&)>())); } private: QpidConnection qc; QpidSession qs; QpidSubscription qsub; worker w; }; /** * Start the component. */ const failable start(const list& params) { // Extract the relay reference and the AMQP key and queue name const value rel = car(params); const value pk = ((lambda&)>)cadr(params))(list()); const value key = isList(pk)? (list)pk : mklist(pk); const value qname = ((lambda&)>)caddr(params))(list()); // Create an AMQP session QpidConnection qc(false); QpidSession qs(qc, false); // Declare the configured AMQP key / queue pair declareQueue(key, qname, qs); // Listen and relay messages in a worker thread QpidSubscription qsub(qs, false); worker w(3); const lambda rl = relay(rel); submit >(w, lambda()>(subscribe(qname, rl, qsub))); // Return the listener component lambda function return value(lambda&)>(listener(qc, qs, qsub, w))); } } } extern "C" { const tuscany::value apply(const tuscany::list& params) { const tuscany::value func(car(params)); if (func == "start") return tuscany::queue::start(cdr(params)); return tuscany::mkfailure(); } }