/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. */ /* $Rev$ $Date$ */ /** * XMPP chat sender/receiver component implementation. */ #define WANT_HTTPD_LOG 1 #include "string.hpp" #include "function.hpp" #include "list.hpp" #include "value.hpp" #include "monad.hpp" #include "parallel.hpp" #include "xmpp.hpp" namespace tuscany { namespace chat { namespace sendreceiver { /** * Post an item to an XMPP JID. */ const failable post(const list& params, XMPPClient& xc) { const value to = car(car(params)); const value val = cadr(params); debug(to, "chat::post::jid"); debug(val, "chat::post::value"); const failable r = post(to, val, xc); if (!hasContent(r)) return mkfailure(r); return value(mklist(to)); } /** * A relay function that posts the XMPP messages it receives to a relay component reference. */ class relay { public: relay(const lambda&)>& rel) : rel(rel) { } const failable operator()(const value& jid, const value& val, unused XMPPClient& xc) const { if (isNil(rel)) return true; debug(jid, "chat::relay::jid"); debug(val, "chat::relay::value"); const value res = rel(mklist("post", mklist(jid), val)); return true; } private: const lambda&)> rel; }; /** * Subscribe and listen to an XMPP session. */ class subscribe { public: subscribe(const lambda(const value&, const value&, XMPPClient&)>& l, XMPPClient& xc) : l(l), xc(xc) { } const failable operator()() const { gc_pool pool; debug("chat::subscribe::listen"); const failable r = listen(l, const_cast(xc)); debug("chat::subscribe::stopped"); return r; } private: const lambda(const value&, const value&, XMPPClient&)> l; XMPPClient xc; }; /** * Chat sender/receiver component lambda function */ class chatSenderReceiver { public: chatSenderReceiver(XMPPClient& xc, worker& w) : xc(xc), w(w) { } const value operator()(const list& params) const { const tuscany::value func(car(params)); if (func == "post") return post(cdr(params), const_cast(xc)); // Stop the chat sender/receiver component if (func != "stop") return mkfailure(); debug("chat::sendreceiver::stop"); // Disconnect and shutdown the worker thread disconnect(const_cast(xc)); cancel(const_cast(w)); debug("chat::sendreceiver::stopped"); return failable(value(lambda&)>())); } private: const XMPPClient xc; worker w; }; /** * Start the component. */ const failable start(const list& params) { // Extract the relay reference and the XMPP JID and password const bool hasRelay = !isNil(cddr(params)); const value rel = hasRelay? car(params) : value(lambda&)>()); const list props = hasRelay? cdr(params) : params; const value jid = ((lambda)>)car(props))(list()); const value pass = ((lambda)>)cadr(props))(list()); // Create an XMPP client session XMPPClient xc(jid, pass, false); const failable r = connect(xc); if (!hasContent(r)) return mkfailure(r); // Listen and relay messages in a worker thread worker w(3); const lambda(const value&, const value&, XMPPClient&)> rl = relay(rel); submit >(w, lambda()>(subscribe(rl, xc))); // Return the chat sender/receiver component lambda function return value(lambda&)>(chatSenderReceiver(xc, w))); } } } } extern "C" { const tuscany::value apply(const tuscany::list& params) { const tuscany::value func(car(params)); if (func == "start") return tuscany::chat::sendreceiver::start(cdr(params)); return tuscany::mkfailure(); } }