/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. */ /* $Rev$ $Date$ */ /** * AMQP queue listener component implementation. */ #define WANT_HTTPD_LOG 1 #include "string.hpp" #include "function.hpp" #include "list.hpp" #include "value.hpp" #include "monad.hpp" #include "parallel.hpp" #include "qpid.hpp" // Ignore conversion issues and redundant declarations in Qpid headers #ifdef WANT_MAINTAINER_WARNINGS #pragma GCC diagnostic ignored "-Wconversion" #pragma GCC diagnostic ignored "-Wredundant-decls" #endif namespace tuscany { namespace queue { /** * Start the component. */ const failable start(const list& params) { // Extract the relay reference and the AMQP key and queue name const value rel = car(params); const value pk = ((lvvlambda)cadr(params))(nilListValue); const value key = isList(pk)? (list)pk : mklist(pk); const value qname = ((lvvlambda)caddr(params))(nilListValue); // Create an AMQP session QpidConnection qc(false); QpidSession qs(qc, false); // Declare the configured AMQP key / queue pair declareQueue(key, qname, qs); // Listen and relay messages in a worker thread QpidSubscription qsub(qs, false); const worker w(3); const lambda rl = [rel](const value& k, const value& v) -> const bool { // A relay function that posts the AMQP messages it receives to a relay component reference. debug(k, "queue::relay::key"); debug(v, "queue::relay::value"); const value res = rel(mklist("post", isList(k)? (list)k : mklist(k), v)); return true; }; // Subscribe and listen to the AMQP queue. const lambda()> subscribe = [qname, rl, qsub]() -> const failable { const gc_pool pool; debug(qname, "queue::subscribe::listen"); const failable r = listen(qname, rl, const_cast(qsub)); debug(qname, "queue::subscribe::stopped"); return r; }; submit >(w, subscribe); // Return the listener component lambda function const lvvlambda listener = [qc, qs, qsub, w](const list& params) -> const value { const tuscany::value func(car(params)); // Stop the component if (func != "stop") return mkfailure(); debug("queue::listener::stop"); // TODO check why stop() and close() hang in child processes stop(const_cast(qsub)); close(const_cast(qs)); close(const_cast(qc)); cancel(const_cast(w)); debug("queue::listener::stopped"); return failable(value(lvvlambda())); }; return value(listener); } } } extern "C" { const tuscany::value apply(const tuscany::list& params) { const tuscany::value func(car(params)); if (func == "start") return tuscany::queue::start(cdr(params)); return tuscany::mkfailure(); } }