/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. */ /* $Rev$ $Date$ */ #ifndef tuscany_http_hpp #define tuscany_http_hpp /** * CURL HTTP client functions. */ #include #include #include #include #include #include #include #include #include #include "string.hpp" #include "gc.hpp" #include "list.hpp" #include "value.hpp" #include "element.hpp" #include "monad.hpp" #include "parallel.hpp" #include "../atom/atom.hpp" #include "../rss/rss.hpp" #include "../json/json.hpp" namespace tuscany { namespace http { /** * CURL library runtime, one per process. */ class CURLRuntime { public: CURLRuntime() { curl_global_init(CURL_GLOBAL_ALL); } } curlRuntime; /** * Represents a CURL session handle. */ class CURLSession { public: CURLSession() : h(NULL), p(NULL), sock(NULL), wpollset(NULL), wpollfd(NULL), rpollset(NULL), rpollfd(NULL), owner(false), ca(""), cert(""), key("") { } CURLSession(const string& ca, const string& cert, const string& key) : h(curl_easy_init()), p(gc_pool(mkpool())), sock(NULL), wpollset(NULL), wpollfd(NULL), rpollset(NULL), rpollfd(NULL), owner(true), ca(ca), cert(cert), key(key) { } CURLSession(const CURLSession& c) : h(c.h), p(c.p), sock(c.sock), wpollset(c.wpollset), wpollfd(c.wpollfd), rpollset(c.rpollset), rpollfd(c.rpollfd), owner(false), ca(c.ca), cert(c.cert), key(c.key) { } ~CURLSession() { if (!owner) return; if (h == NULL) return; curl_easy_cleanup(h); destroy(p); } private: CURL* h; gc_pool p; apr_socket_t* sock; apr_pollset_t* wpollset; apr_pollfd_t* wpollfd; apr_pollset_t* rpollset; apr_pollfd_t* rpollfd; bool owner; friend CURL* handle(const CURLSession& cs); friend apr_socket_t* sock(const CURLSession& cs); friend const failable setup(const string& url, const CURLSession& cs); friend const failable connect(const string& url, CURLSession& cs); friend const failable send(const char* c, const size_t l, const CURLSession& cs); friend const failable recv(char* c, const size_t l, const CURLSession& cs); public: string ca; string cert; string key; }; /** * Returns the CURL handle used by a CURL session. */ CURL* handle(const CURLSession& cs) { return cs.h; } /** * Return an apr_socket_t for the socket used by a CURL session. */ apr_socket_t* sock(const CURLSession& cs) { return cs.sock; } /** * Convert a socket fd to an apr_socket_t. */ apr_socket_t* sock(const int sd, const gc_pool& p) { int fd = sd; apr_socket_t* s = NULL; apr_os_sock_put(&s, &fd, pool(p)); return s; } /** * Convert a CURL return code to an error string. */ const string curlreason(CURLcode rc) { return curl_easy_strerror(rc); } /** * Convert an APR status to an error string. */ const string apreason(apr_status_t rc) { char buf[256]; return apr_strerror(rc, buf, sizeof(buf)); } /** * Escape a URI or a query argument. */ const char escape_c2x[] = "0123456789ABCDEF"; const string escape(const string& unesc, const char* reserv) { char* copy = (char*)apr_palloc(gc_current_pool(), 3 * length(unesc) + 3); const unsigned char* s = (const unsigned char *)c_str(unesc); unsigned char* d = (unsigned char*)copy; unsigned c; while ((c = *s)) { if (!apr_isalnum(c) && !strchr(reserv, c)) { *d++ = '%'; *d++ = escape_c2x[c >> 4]; *d++ = escape_c2x[c & 0xf]; } else { *d++ = (unsigned char)c; } ++s; } *d = '\0'; return copy; } const string escapeURI(const string& uri) { debug(uri, "http::escapeURI::uri"); const string e = escape(uri, "?$-_.+!*'(),:@&=/~%"); debug(e, "http::escapeURI::result"); return e; } const string escapeArg(const string& arg) { debug(arg, "http::escapeArg::arg"); const string e = escape(arg, "-_.~"); debug(e, "http::escapeArg::result"); return e; } /** * Setup a CURL session */ const failable setup(const string& url, const CURLSession& cs) { // Init CURL session CURL* ch = handle(cs); curl_easy_reset(ch); curl_easy_setopt(ch, CURLOPT_USERAGENT, "libcurl/1.0"); // Setup protocol options curl_easy_setopt(ch, CURLOPT_TCP_NODELAY, true); curl_easy_setopt(ch, CURLOPT_FOLLOWLOCATION, true); curl_easy_setopt(ch, CURLOPT_POSTREDIR, CURL_REDIR_POST_ALL); // Setup SSL options if (cs.ca != "") { debug(cs.ca, "http::setup::ca"); curl_easy_setopt(ch, CURLOPT_CAINFO, c_str(cs.ca)); curl_easy_setopt(ch, CURLOPT_SSL_VERIFYPEER, true); curl_easy_setopt(ch, CURLOPT_SSL_VERIFYHOST, 2); } else curl_easy_setopt(ch, CURLOPT_SSL_VERIFYPEER, false); if (cs.cert != "") { debug(cs.cert, "http::setup::cert"); curl_easy_setopt(ch, CURLOPT_SSLCERT, c_str(cs.cert)); curl_easy_setopt(ch, CURLOPT_SSLCERTTYPE, "PEM"); } if (cs.key != "") { debug(cs.key, "http::setup::key"); curl_easy_setopt(ch, CURLOPT_SSLKEY, c_str(cs.key)); curl_easy_setopt(ch, CURLOPT_SSLKEYTYPE, "PEM"); } // Set target URL curl_easy_setopt(ch, CURLOPT_URL, c_str(escapeURI(url))); return ch; } /** * Context passed to the read callback function. */ class CURLReadContext { public: CURLReadContext(const list& ilist) : ilist(ilist) { } list ilist; }; /** * Called by CURL to read data to send. */ size_t readCallback(void *ptr, size_t size, size_t nmemb, void *data) { CURLReadContext& rcx = *static_cast(data); if (isNil(rcx.ilist)) return 0; const list f(fragment(rcx.ilist, size * nmemb)); const string s = car(f); rcx.ilist = cdr(f); memcpy(ptr, c_str(s), length(s)); return length(s); } /** * Context passed to CURL write callback function. */ template class CURLWriteContext { public: CURLWriteContext(const lambda& reduce, const R& accum) : reduce(reduce), accum(accum) { } const lambda reduce; R accum; }; /** * Called by CURL to write received data. */ template size_t writeCallback(void *ptr, size_t size, size_t nmemb, void *data) { CURLWriteContext& wcx = *(static_cast*> (data)); const size_t realsize = size * nmemb; wcx.accum = wcx.reduce(string((const char*)ptr, realsize), wcx.accum); return realsize; } /** * Apply an HTTP verb to a list containing a list of headers and a list of content, and * a reduce function used to process the response. */ curl_slist* headers(curl_slist* cl, const list& h) { if (isNil(h)) return cl; return headers(curl_slist_append(cl, c_str(string(car(h)))), cdr(h)); } template const failable > apply(const list >& hdr, const lambda& reduce, const R& initial, const string& url, const string& verb, const CURLSession& cs) { debug(url, "http::apply::url"); debug(verb, "http::apply::verb"); // Setup the CURL session const failable fch = setup(url, cs); if (!hasContent(fch)) return mkfailure>(reason(fch)); CURL* ch = content(fch); // Set the request headers curl_slist* hl = headers(NULL, car(hdr)); if (hl != NULL) curl_easy_setopt(ch, CURLOPT_HTTPHEADER, hl); // Convert request body to a string // TODO use HTTP chunking instead ostringstream os; write(cadr(hdr), os); const string s = str(os); const size_t sz = length(s); // Setup the read, write header and write data callbacks CURLReadContext rcx(mklist(s)); curl_easy_setopt(ch, CURLOPT_READFUNCTION, (size_t (*)(void*, size_t, size_t, void*))readCallback); curl_easy_setopt(ch, CURLOPT_READDATA, &rcx); CURLWriteContext hcx(reduce, initial); curl_easy_setopt(ch, CURLOPT_HEADERFUNCTION, (size_t (*)(void*, size_t, size_t, void*))(writeCallback)); curl_easy_setopt(ch, CURLOPT_HEADERDATA, &hcx); CURLWriteContext wcx(reduce, initial); curl_easy_setopt(ch, CURLOPT_WRITEFUNCTION, (size_t (*)(void*, size_t, size_t, void*))(writeCallback)); curl_easy_setopt(ch, CURLOPT_WRITEDATA, &wcx); // Apply the HTTP verb if (verb == "POST") { curl_easy_setopt(ch, CURLOPT_POST, true); curl_easy_setopt(ch, CURLOPT_POSTFIELDSIZE, sz); } else if (verb == "PUT") { curl_easy_setopt(ch, CURLOPT_UPLOAD, true); curl_easy_setopt(ch, CURLOPT_INFILESIZE, sz); } else if (verb == "DELETE") curl_easy_setopt(ch, CURLOPT_CUSTOMREQUEST, "DELETE"); const CURLcode rc = curl_easy_perform(ch); // Free the headers if (hl != NULL) curl_slist_free_all(hl); // Return the HTTP return code or content if (rc) return mkfailure >(string(curl_easy_strerror(rc))); long httprc; curl_easy_getinfo (ch, CURLINFO_RESPONSE_CODE, &httprc); if (httprc != 200 && httprc != 201) { ostringstream es; es << "HTTP code " << httprc; return mkfailure >(str(es)); } return mklist(hcx.accum, wcx.accum); } /** * Evaluate an expression remotely, at the given URL. */ const failable evalExpr(const value& expr, const string& url, const CURLSession& cs) { debug(url, "http::evalExpr::url"); debug(expr, "http::evalExpr::input"); // Convert expression to a JSON-RPC request js::JSContext cx; const failable > jsreq = json::jsonRequest(1, car(expr), cdr(expr), cx); if (!hasContent(jsreq)) return mkfailure(reason(jsreq)); // POST it to the URL const list h = mklist("Content-Type: application/json-rpc"); const failable > > res = apply >(mklist >(h, content(jsreq)), rcons, list(), url, "POST", cs); if (!hasContent(res)) return mkfailure(reason(res)); // Parse and return JSON-RPC result const failable rval = json::jsonResultValue(cadr >(content(res)), cx); debug(rval, "http::evalExpr::result"); if (!hasContent(rval)) return mkfailure(reason(rval)); return content(rval); } /** * Find and return a header. */ const failable header(const char* prefix, const list& h) { if (isNil(h)) return mkfailure(string("Couldn't find header: ") + prefix); const string s = car(h); if (find(s, prefix) != 0) return header(prefix, cdr(h)); const string l(substr(s, length(prefix))); return substr(l, 0, find_first_of(l, "\r\n")); } /** * Find and return a location header. */ const failable location(const list& h) { return header("Location: ", h); } /** * Convert a location to an entry id. */ const failable entryId(const failable l) { if (!hasContent(l)) return mkfailure(reason(l)); const string ls(content(l)); return value(mklist(string(substr(ls, find_last(ls, '/') + 1)))); } /** * Find and return a content-type header. */ const failable contentType(const list& h) { return header("Content-Type: ", h); } /** * HTTP GET, return the resource at the given URL. */ template const failable > get(const lambda& reduce, const R& initial, const string& url, const CURLSession& cs) { debug(url, "http::get::url"); const list > req = mklist(list(), list()); return apply(req, reduce, initial, url, "GET", cs); } /** * HTTP GET, return a list of values representing the resource at the given URL. */ const failable getcontent(const string& url, const CURLSession& cs) { debug(url, "http::get::url"); // Get the contents of the resource at the given URL const failable > > res = get>(rcons, list(), url, cs); if (!hasContent(res)) return mkfailure(reason(res)); const list ls(reverse(cadr(content(res)))); // Return the content as a list of values const value val(mkvalues(ls)); debug(val, "http::get::result"); return val; } /** * HTTP GET, return a list of values representing the resource at the given URL. */ const failable get(const string& url, const CURLSession& cs) { debug(url, "http::get::url"); // Get the contents of the resource at the given URL const failable > > res = get >(rcons, list(), url, cs); if (!hasContent(res)) return mkfailure(reason(res)); const string ct(content(contentType(car(content(res))))); debug(ct, "http::get::contentType"); const list ls(reverse(cadr(content(res)))); debug(ls, "http::get::content"); if (atom::isATOMEntry(ls)) { // Read an ATOM entry const value val(elementsToValues(content(atom::readATOMEntry(ls)))); debug(val, "http::get::result"); return val; } if (contains(ct, "application/atom+xml") || atom::isATOMFeed(ls)) { // Read an ATOM feed const value val(elementsToValues(content(atom::readATOMFeed(ls)))); debug(val, "http::get::result"); return val; } if (contains(ct, "application/rss+xml") || rss::isRSSFeed(ls)) { // Read an RSS feed const value val(elementsToValues(content(rss::readRSSFeed(ls)))); debug(val, "http::get::result"); return val; } if (contains(ct, "text/javascript") || contains(ct, "application/json") || json::isJSON(ls)) { // Read a JSON document js::JSContext cx; const value val(json::jsonValues(content(json::readJSON(ls, cx)))); debug(val, "http::get::result"); return val; } if (contains(ct, "application/x-javascript")) { // Read a JSON document enclosed in a javascript function call // Extract the JSON out of the enclosing parenthesis ostringstream os; write(ls, os); const string s = str(os); const size_t fp = find(s, '('); const size_t lp = find_last(s, ')'); const list jls = mklist(substr(s, fp + 1, lp - (fp + 1))); debug(jls, "http::get::javascript::content"); js::JSContext cx; const value val(json::jsonValues(content(json::readJSON(jls, cx)))); debug(val, "http::get::result"); return val; } if (contains(ct, "text/xml") || contains(ct, "application/xml") || isXML(ls)) { // Read an XML document const value val(elementsToValues(readXML(ls))); debug(val, "http::get::result"); return val; } // Return the content type and a content list const value val(mklist(ct, mkvalues(ls))); debug(val, "http::get::result"); return val; } /** * HTTP POST. */ const failable post(const value& val, const string& url, const CURLSession& cs) { // Convert value to an ATOM entry const failable > entry = atom::writeATOMEntry(valuesToElements(val)); if (!hasContent(entry)) return mkfailure(reason(entry)); debug(url, "http::post::url"); debug(content(entry), "http::post::input"); // POST it to the URL const list h = mklist("Content-Type: application/atom+xml"); const list > req = mklist >(h, content(entry)); const failable > > res = apply>(req, rcons, list(), url, "POST", cs); if (!hasContent(res)) return mkfailure(reason(res)); // Return the new entry id from the HTTP location header const failable eid(entryId(location(car(content(res))))); debug(eid, "http::post::result"); return eid; } /** * HTTP PUT. */ const failable put(const value& val, const string& url, const CURLSession& cs) { // Convert value to an ATOM entry const failable > entry = atom::writeATOMEntry(valuesToElements(val)); if (!hasContent(entry)) return mkfailure(reason(entry)); debug(url, "http::put::url"); debug(content(entry), "http::put::input"); // PUT it to the URL const list h = mklist("Content-Type: application/atom+xml"); const list > req = mklist >(h, content(entry)); const failable > > res = apply >(req, rcons, list(), url, "PUT", cs); if (!hasContent(res)) return mkfailure(reason(res)); debug(true, "http::put::result"); return value(true); } /** * HTTP DELETE. */ const failable del(const string& url, const CURLSession& cs) { debug(url, "http::delete::url"); const list > req = mklist(list(), list()); const failable > > res = apply >(req, rcons, list(), url, "DELETE", cs); if (!hasContent(res)) return mkfailure(reason(res)); debug(true, "http::delete::result"); return value(true); } /** * Returns the current host name. */ const string hostname() { char h[256]; if (gethostname(h, 256) == -1) return "localhost"; return h; } /** * Create an APR pollfd for a socket. */ apr_pollfd_t* pollfd(apr_socket_t* s, const int e, const gc_pool& p) { apr_pollfd_t* pfd = gc_new(p); pfd->p = pool(p); pfd->desc_type = APR_POLL_SOCKET; pfd->reqevents = (apr_int16_t)e; pfd->rtnevents = (apr_int16_t)e; pfd->desc.s = s; pfd->client_data = NULL; return pfd; } /** * Connect to a URL. */ const failable connect(const string& url, CURLSession& cs) { debug(url, "http::connect::url"); // Setup the CURL session const failable fch = setup(url, cs); if (!hasContent(fch)) return mkfailure(reason(fch)); CURL* ch = content(fch); // Connect curl_easy_setopt(ch, CURLOPT_CONNECT_ONLY, true); const CURLcode rc = curl_easy_perform(ch); if (rc) return mkfailure(string(curl_easy_strerror(rc))); // Convert the connected socket to an apr_socket_t int sd; const CURLcode grc = curl_easy_getinfo(ch, CURLINFO_LASTSOCKET, &sd); if (grc) return mkfailure(string(curl_easy_strerror(grc))); cs.sock = sock(sd, cs.p); // Create pollsets and pollfds which can be used to poll the socket apr_status_t rpcrc = apr_pollset_create(&cs.rpollset, 1, pool(cs.p), 0); if (rpcrc != APR_SUCCESS) return mkfailure(apreason(rpcrc)); cs.rpollfd = pollfd(cs.sock, APR_POLLIN, cs.p); apr_pollset_add(cs.rpollset, cs.rpollfd); apr_status_t wpcrc = apr_pollset_create(&cs.wpollset, 1, pool(cs.p), 0); if (wpcrc != APR_SUCCESS) return mkfailure(apreason(wpcrc)); cs.wpollfd = pollfd(cs.sock, APR_POLLOUT, cs.p); apr_pollset_add(cs.wpollset, cs.wpollfd); return true; } /** * Send an array of chars. */ const failable send(const char* c, const size_t l, const CURLSession& cs) { // Send the data size_t wl = 0; const CURLcode rc = curl_easy_send(cs.h, c, (size_t)l, &wl); if (rc == CURLE_OK && wl == (size_t)l) return true; if (rc != CURLE_AGAIN) return mkfailure(curlreason(rc)); // If the socket was not ready, wait for it to become ready const apr_pollfd_t* pollfds; apr_int32_t pollcount; apr_status_t pollrc = apr_pollset_poll(cs.wpollset, -1, &pollcount, &pollfds); if (pollrc != APR_SUCCESS) return mkfailure(apreason(pollrc)); // Send what's left return send(c + wl, l - wl, cs); } /** * Receive an array of chars. */ const failable recv(char* c, const size_t l, const CURLSession& cs) { // Receive data size_t rl; const CURLcode rc = curl_easy_recv(cs.h, c, (size_t)l, &rl); if (rc == CURLE_OK) return (size_t)rl; if (rc == 1) return 0; if (rc != CURLE_AGAIN) return mkfailure(curlreason(rc)); // If the socket was not ready, wait for it to become ready const apr_pollfd_t* pollfds; apr_int32_t pollcount; apr_status_t pollrc = apr_pollset_poll(cs.rpollset, -1, &pollcount, &pollfds); if (pollrc != APR_SUCCESS) return mkfailure(apreason(pollrc)); // Receive again return recv(c, l, cs); } /** * Converts a list of key value pairs to a query string. */ ostringstream& queryString(const list > args, ostringstream& os) { if (isNil(args)) return os; debug(car(args), "http::queryString::arg"); os << car(car(args)) << "=" << c_str(cadr(car(args))); if (!isNil(cdr(args))) os << "&"; return queryString(cdr(args), os); } const string queryString(const list > args) { ostringstream os; return str(queryString(args, os)); } /** * Filter path segment in a list of arguments. */ const bool filterPath(const value& arg) { return isString(arg); } /** * Filter query string arguments in a list of arguments. */ const bool filterQuery(const value& arg) { return isList(arg); } /** * Escape a query string argument. */ const value escapeQuery(const value& arg) { return arg; //return mklist(car(arg), escapeArg(cadr(arg))); } /** * HTTP client proxy function. */ struct proxy { proxy(const string& uri, const string& ca, const string& cert, const string& key, const gc_pool& p) : p(p), uri(uri), ca(ca), cert(cert), key(key), cs(*(new (gc_new(p)) CURLSession(ca, cert, key))) { } const value operator()(const list& args) const { const value fun = car(args); if (fun == "get") { const list lp = filter(filterPath, cadr(args)); debug(lp, "http::queryString::arg"); const list lq = map(escapeQuery, filter(filterQuery, cadr(args))); const value p = path(lp); const value q = queryString(lq); const failable val = get(uri + p + (q != ""? string("?") + q : string("")), cs); return content(val); } if (fun == "post") { const failable val = post(caddr(args), uri + path(cadr(args)), cs); return content(val); } if (fun == "put") { const failable val = put(caddr(args), uri + path(cadr(args)), cs); return content(val); } if (fun == "delete") { const failable val = del(uri + path(cadr(args)), cs); return content(val); } const failable val = evalExpr(args, uri, cs); return content(val); } const gc_pool p; const string uri; const string ca; const string cert; const string key; const CURLSession& cs; }; } } #endif /* tuscany_http_hpp */