From 9e1b9e73145e00ea591bd1e0e9777625bad66dc9 Mon Sep 17 00:00:00 2001 From: jsdelfino Date: Thu, 3 Jan 2013 07:41:14 +0000 Subject: Add support for HTTP patch and application of patch scripts to server and data store components. git-svn-id: http://svn.us.apache.org/repos/asf/tuscany@1428192 13f79535-47bb-0310-9956-ffa450edef68 --- sca-cpp/trunk/components/cache/client-test.cpp | 26 +++ sca-cpp/trunk/components/cache/datacache.cpp | 16 ++ sca-cpp/trunk/components/cache/memcache-test.cpp | 2 + sca-cpp/trunk/components/cache/memcache.cpp | 35 ++++ sca-cpp/trunk/components/cache/memcache.hpp | 23 ++- sca-cpp/trunk/components/cache/partitioner.cpp | 19 ++ sca-cpp/trunk/components/constdb/client-test.cpp | 26 +++ sca-cpp/trunk/components/constdb/constdb.cpp | 35 ++++ sca-cpp/trunk/components/constdb/tinycdb-test.cpp | 2 + sca-cpp/trunk/components/constdb/tinycdb.hpp | 43 ++++- sca-cpp/trunk/components/filedb/client-test.cpp | 26 +++ sca-cpp/trunk/components/filedb/file-test.cpp | 3 + sca-cpp/trunk/components/filedb/filedb.cpp | 35 ++++ sca-cpp/trunk/components/filedb/filedb.hpp | 27 ++- sca-cpp/trunk/components/sqldb/client-test.cpp | 26 +++ sca-cpp/trunk/components/sqldb/pgsql-conf | 6 +- sca-cpp/trunk/components/sqldb/pgsql-standby-conf | 7 +- .../trunk/components/sqldb/pgsql-standby-test.cpp | 3 + sca-cpp/trunk/components/sqldb/pgsql-test.cpp | 2 + sca-cpp/trunk/components/sqldb/pgsql.hpp | 207 ++++++++++++++++++--- sca-cpp/trunk/components/sqldb/sqldb.cpp | 61 ++++++ 21 files changed, 591 insertions(+), 39 deletions(-) (limited to 'sca-cpp/trunk/components') diff --git a/sca-cpp/trunk/components/cache/client-test.cpp b/sca-cpp/trunk/components/cache/client-test.cpp index 5e9be6c14a..3c8a261f84 100644 --- a/sca-cpp/trunk/components/cache/client-test.cpp +++ b/sca-cpp/trunk/components/cache/client-test.cpp @@ -81,6 +81,32 @@ const bool testCache(const string& uri) { assert(hasContent(val)); assert(content(val) == b); } + + const list k = nilListValue + "content" + (nilListValue + "item" + + (nilListValue + "name" + string("Apple")) + + (nilListValue + "price" + string("$3.99"))); + const list c = nilListValue + (nilListValue + "entry" + + (nilListValue + "title" + string("item")) + + (nilListValue + "id" + string("cart-53d67a61-aa5e-4e5e-8401-39edeba8b83b")) + + k); + + { + const list s = nilListValue + "content" + + (nilListValue + "patch" + string("(define (patch id e) (tree-subst-assoc '(price) '(price \"$3.99\") e))")); + const list ps = nilListValue + (nilListValue + "entry" + + (nilListValue + "title" + string("item")) + + (nilListValue + "id" + string("cart-53d67a61-aa5e-4e5e-8401-39edeba8b83b")) + + s); + + const failable r = http::patch(ps, uri + p, cs); + assert(hasContent(r)); + assert(content(r) == trueValue); + } + { + const failable val = http::get(uri + p, cs); + assert(hasContent(val)); + assert(content(val) == c); + } { const failable r = http::del(uri + p, cs); assert(hasContent(r)); diff --git a/sca-cpp/trunk/components/cache/datacache.cpp b/sca-cpp/trunk/components/cache/datacache.cpp index 975ca43dce..4fafd9e345 100644 --- a/sca-cpp/trunk/components/cache/datacache.cpp +++ b/sca-cpp/trunk/components/cache/datacache.cpp @@ -94,6 +94,20 @@ const failable put(const value& key, const value& val, unused const lvvla return trueValue; } +/** + * Patch an item in the cache. + */ +const failable patch(const value& key, const value& val, unused const lvvlambda& rcache1, const lvvlambda& wcache1, unused const lvvlambda& rcache2, const lvvlambda& wcache2) { + + // Update level1 cache + wcache1(mklist("patch", key, val)); + + // Update level2 cache + wcache2(mklist("patch", key, val)); + + return trueValue; +} + /** * Delete an item from the cache. */ @@ -121,6 +135,8 @@ const tuscany::value apply(const tuscany::list& params) { return tuscany::datacache::post(cadr(params), caddr(params), cadddr(params), caddddr(params), cadddddr(params), caddddddr(params)); if (func == "put") return tuscany::datacache::put(cadr(params), caddr(params), cadddr(params), caddddr(params), cadddddr(params), caddddddr(params)); + if (func == "patch") + return tuscany::datacache::patch(cadr(params), caddr(params), cadddr(params), caddddr(params), cadddddr(params), caddddddr(params)); if (func == "delete") return tuscany::datacache::del(cadr(params), caddr(params), cadddr(params), caddddr(params), cadddddr(params)); return tuscany::mkfailure(); diff --git a/sca-cpp/trunk/components/cache/memcache-test.cpp b/sca-cpp/trunk/components/cache/memcache-test.cpp index 6c6adb0541..10eda45eae 100644 --- a/sca-cpp/trunk/components/cache/memcache-test.cpp +++ b/sca-cpp/trunk/components/cache/memcache-test.cpp @@ -40,6 +40,8 @@ bool testMemCached() { assert(get(k, ch) == value(string("AAA"))); assert(hasContent(put(k, string("aaa"), ch))); assert(get(k, ch) == value(string("aaa"))); + assert(hasContent(patch(k, string("bbb"), ch))); + assert(get(k, ch) == value(string("bbb"))); assert(hasContent(del(k, ch))); assert(!hasContent(get(k, ch))); diff --git a/sca-cpp/trunk/components/cache/memcache.cpp b/sca-cpp/trunk/components/cache/memcache.cpp index 2e4597efd3..e1a3c7e9af 100644 --- a/sca-cpp/trunk/components/cache/memcache.cpp +++ b/sca-cpp/trunk/components/cache/memcache.cpp @@ -62,6 +62,39 @@ const failable put(const list& params, const memcache::MemCached& return value(content(val)); } +/** + * Patch an item in the cache. + */ +const failable patch(const list& params, const memcache::MemCached& ch) { + // Read patch + value p = assoc("patch", assoc("content", car(cadr(params)))); + if (isNil(p)) + return mkfailure("Couldn't read patch script"); + const string script = cadr(p); + debug(script, "memcache::patch::script"); + istringstream is(script); + + // Get existing value from cache + const failable ival = memcache::get(car(params), ch); + if (!hasContent(ival) && rcode(ival) != 404) + return mkfailure(ival); + + // Apply patch + scheme::Env env = scheme::setupEnvironment(); + const value pval = scheme::evalScript(cons("patch", scheme::quotedParameters(mklist(car(params), hasContent(ival)? content(ival) : (value)list()))), is, env); + if (isNil(pval)) { + ostringstream os; + os << "Couldn't patch memcached entry: " << car(params); + return mkfailure(str(os), 404, false); + } + + // Push patched value to cache + const failable val = memcache::patch(car(params), pval, ch); + if (!hasContent(val)) + return mkfailure(val); + return value(content(val)); +} + /** * Delete an item from the cache. */ @@ -98,6 +131,8 @@ const failable start(const list& params) { return post(cdr(params), ch); if (func == "put") return put(cdr(params), ch); + if (func == "patch") + return patch(cdr(params), ch); if (func == "delete") return del(cdr(params), ch); return mkfailure(); diff --git a/sca-cpp/trunk/components/cache/memcache.hpp b/sca-cpp/trunk/components/cache/memcache.hpp index 00ee9c6291..7962d1caa0 100644 --- a/sca-cpp/trunk/components/cache/memcache.hpp +++ b/sca-cpp/trunk/components/cache/memcache.hpp @@ -73,6 +73,7 @@ private: friend const failable post(const value& key, const value& val, const MemCached& cache); friend const failable put(const value& key, const value& val, const MemCached& cache); + friend const failable patch(const value& key, const value& val, const MemCached& cache); friend const failable get(const value& key, const MemCached& cache); friend const failable del(const value& key, const MemCached& cache); @@ -176,6 +177,26 @@ const failable put(const value& key, const value& val, const MemCached& ca return true; } +/** + * Patch an item in the cache. If the item doesn't exist it is added. + */ +const failable patch(const value& key, const value& val, const MemCached& cache) { + debug(key, "memcache::patch::key"); + debug(val, "memcache::patch::value"); + + const string ks(write(content(scheme::writeValue(key)))); + const string vs(write(content(scheme::writeValue(val)))); + const apr_status_t rc = apr_memcache_set(cache.mc, nospaces(c_str(ks)), const_cast(c_str(vs)), length(vs), 0, 27); + if (rc != APR_SUCCESS) { + ostringstream os; + os << "Couldn't set memcached entry: " << key; + return mkfailure(str(os)); + } + + debug(true, "memcache::patch::result"); + return true; +} + /** * Get an item from the cache. */ @@ -209,7 +230,7 @@ const failable del(const value& key, const MemCached& cache) { if (rc != APR_SUCCESS) { ostringstream os; os << "Couldn't delete memcached entry: " << key; - return mkfailure(str(os)); + return rc == APR_NOTFOUND? mkfailure(str(os), 404, false) : mkfailure(str(os)); } debug(true, "memcache::delete::result"); diff --git a/sca-cpp/trunk/components/cache/partitioner.cpp b/sca-cpp/trunk/components/cache/partitioner.cpp index a38c053358..8a56a7f932 100644 --- a/sca-cpp/trunk/components/cache/partitioner.cpp +++ b/sca-cpp/trunk/components/cache/partitioner.cpp @@ -142,6 +142,23 @@ const failable put(const value& key, const value& val, const lvvlambda& s return trueValue; } +/** + * Patch an item in a partition. + */ +const failable patch(const value& key, const value& val, const lvvlambda& selector, const list& partitions) { + + // Select partition + const failable > p = partition(key, selector, partitions); + if (!hasContent(p)) + return mkfailure(p); + + // Path item in selected partition + const lvvlambda l = car(content(p)); + l(mklist("patch", key, val)); + + return trueValue; +} + /** * Delete an item from a partition. */ @@ -172,6 +189,8 @@ const tuscany::value apply(const tuscany::list& params) { return tuscany::partitioner::post(cadr(params), caddr(params), cadddr(params), cddddr(params)); if (func == "put") return tuscany::partitioner::put(cadr(params), caddr(params), cadddr(params), cddddr(params)); + if (func == "patch") + return tuscany::partitioner::patch(cadr(params), caddr(params), cadddr(params), cddddr(params)); if (func == "delete") return tuscany::partitioner::del(cadr(params), caddr(params), cdddr(params)); return tuscany::mkfailure(); diff --git a/sca-cpp/trunk/components/constdb/client-test.cpp b/sca-cpp/trunk/components/constdb/client-test.cpp index b796ef01dd..165e3d8836 100644 --- a/sca-cpp/trunk/components/constdb/client-test.cpp +++ b/sca-cpp/trunk/components/constdb/client-test.cpp @@ -77,6 +77,32 @@ const bool testConstDb() { assert(hasContent(val)); assert(content(val) == b); } + + const list k = nilListValue + "content" + (nilListValue + "item" + + (nilListValue + "name" + string("Apple")) + + (nilListValue + "price" + string("$3.99"))); + const list c = nilListValue + (nilListValue + "entry" + + (nilListValue + "title" + string("item")) + + (nilListValue + "id" + string("cart-53d67a61-aa5e-4e5e-8401-39edeba8b83b")) + + k); + + { + const list s = nilListValue + "content" + + (nilListValue + "patch" + string("(define (patch id e) (tree-subst-assoc '(price) '(price \"$3.99\") e))")); + const list ps = nilListValue + (nilListValue + "entry" + + (nilListValue + "title" + string("item")) + + (nilListValue + "id" + string("cart-53d67a61-aa5e-4e5e-8401-39edeba8b83b")) + + s); + + const failable r = http::patch(ps, uri + p, cs); + assert(hasContent(r)); + assert(content(r) == trueValue); + } + { + const failable val = http::get(uri + p, cs); + assert(hasContent(val)); + assert(content(val) == c); + } { const failable r = http::del(uri + p, cs); assert(hasContent(r)); diff --git a/sca-cpp/trunk/components/constdb/constdb.cpp b/sca-cpp/trunk/components/constdb/constdb.cpp index a9a5bc5817..2d34ed2b73 100644 --- a/sca-cpp/trunk/components/constdb/constdb.cpp +++ b/sca-cpp/trunk/components/constdb/constdb.cpp @@ -62,6 +62,39 @@ const failable put(const list& params, const tinycdb::TinyCDB& cdb return value(content(val)); } +/** + * Patch an item in the database. + */ +const failable patch(const list& params, const tinycdb::TinyCDB& cdb) { + // Read patch + value p = assoc("patch", assoc("content", car(cadr(params)))); + if (isNil(p)) + return mkfailure("Couldn't read patch script"); + const string script = cadr(p); + debug(script, "tinycdb::patch::script"); + istringstream is(script); + + // Get existing value from database + const failable ival = tinycdb::get(car(params), cdb); + if (!hasContent(ival) && rcode(ival) != 404) + return mkfailure(ival); + + // Apply patch + scheme::Env env = scheme::setupEnvironment(); + const value pval = scheme::evalScript(cons("patch", scheme::quotedParameters(mklist(car(params), hasContent(ival)? content(ival) : (value)list()))), is, env); + if (isNil(pval)) { + ostringstream os; + os << "Couldn't patch tinycdb entry: " << car(params); + return mkfailure(str(os), 404, false); + } + + // Push patched value to database + const failable val = tinycdb::patch(car(params), pval, cdb); + if (!hasContent(val)) + return mkfailure(val); + return value(content(val)); +} + /** * Delete an item from the database. */ @@ -89,6 +122,8 @@ const failable start(unused const list& params) { return post(cdr(params), cdb); if (func == "put") return put(cdr(params), cdb); + if (func == "patch") + return patch(cdr(params), cdb); if (func == "delete") return del(cdr(params), cdb); return mkfailure(); diff --git a/sca-cpp/trunk/components/constdb/tinycdb-test.cpp b/sca-cpp/trunk/components/constdb/tinycdb-test.cpp index 6cc9e9eabb..41bfe12772 100644 --- a/sca-cpp/trunk/components/constdb/tinycdb-test.cpp +++ b/sca-cpp/trunk/components/constdb/tinycdb-test.cpp @@ -40,6 +40,8 @@ const bool testTinyCDB() { assert((get(k, cdb)) == value(string("AAA"))); assert(hasContent(put(k, string("aaa"), cdb))); assert((get(k, cdb)) == value(string("aaa"))); + assert(hasContent(patch(k, string("bbb"), cdb))); + assert((get(k, cdb)) == value(string("bbb"))); assert(hasContent(del(k, cdb))); assert(!hasContent(get(k, cdb))); diff --git a/sca-cpp/trunk/components/constdb/tinycdb.hpp b/sca-cpp/trunk/components/constdb/tinycdb.hpp index ce1dcbb011..3da5f3c216 100644 --- a/sca-cpp/trunk/components/constdb/tinycdb.hpp +++ b/sca-cpp/trunk/components/constdb/tinycdb.hpp @@ -388,6 +388,37 @@ const failable put(const value& key, const value& val, const TinyCDB& cdb) return r; } +/** + * Patch an item in the database. If the item doesn't exist it is added. + */ +const failable patch(const value& key, const value& val, const TinyCDB& cdb) { + debug(key, "tinycdb::patch::key"); + debug(val, "tinycdb::patch::value"); + debug(dbname(cdb), "tinycdb::patch::dbname"); + + const string ks(write(content(scheme::writeValue(key)))); + const string vs(write(content(scheme::writeValue(val)))); + + // Process each entry and skip existing key + const lambda(buffer&, const unsigned int, const unsigned int)> update = [ks](buffer& buf, const unsigned int klen, unused const unsigned int vlen) -> const failable { + if (ks == string((char*)buf, klen)) + return false; + return true; + }; + + // Add the new entry to the db + const lambda(struct cdb_make&)> finish = [ks, vs](struct cdb_make& cdbm) -> const failable { + if (cdb_make_add(&cdbm, c_str(ks), (unsigned int)length(ks), c_str(vs), (unsigned int)length(vs)) == -1) + return mkfailure(string("Couldn't add tinycdb entry: ") + ks); + return true; + }; + + // Rewrite the db + const failable r = rewrite(update, finish, cdb); + debug(r, "tinycdb::patch::result"); + return r; +} + /** * Get an item from the database. */ @@ -425,11 +456,14 @@ const failable del(const value& key, const TinyCDB& cdb) { debug(dbname(cdb), "tinycdb::delete::dbname"); const string ks(write(content(scheme::writeValue(key)))); + bool found = false; // Process each entry and skip existing key - const lambda(buffer&, const unsigned int, const unsigned int)> update = [ks](buffer& buf, const unsigned int klen, unused const unsigned int vlen) -> const failable { - if (ks == string((char*)buf, klen)) + const lambda(buffer&, const unsigned int, const unsigned int)> update = [ks, &found](buffer& buf, const unsigned int klen, unused const unsigned int vlen) -> const failable { + if (ks == string((char*)buf, klen)) { + found = true; return false; + } return true; }; @@ -440,6 +474,11 @@ const failable del(const value& key, const TinyCDB& cdb) { // Rewrite the db const failable r = rewrite(update, finish, cdb); + if (!hasContent(r) || !found) { + ostringstream os; + os << "Couldn't delete tinycdb entry: " << key; + return hasContent(r)? mkfailure(str(os), 404, false) : r; + } debug(r, "tinycdb::delete::result"); return r; } diff --git a/sca-cpp/trunk/components/filedb/client-test.cpp b/sca-cpp/trunk/components/filedb/client-test.cpp index 5694d97522..96c0212096 100644 --- a/sca-cpp/trunk/components/filedb/client-test.cpp +++ b/sca-cpp/trunk/components/filedb/client-test.cpp @@ -77,6 +77,32 @@ const bool testFileDB() { assert(hasContent(val)); assert(content(val) == b); } + + const list k = nilListValue + "content" + (nilListValue + "item" + + (nilListValue + "name" + string("Apple")) + + (nilListValue + "price" + string("$3.99"))); + const list c = nilListValue + (nilListValue + "entry" + + (nilListValue + "title" + string("item")) + + (nilListValue + "id" + string("cart-53d67a61-aa5e-4e5e-8401-39edeba8b83b")) + + k); + + { + const list s = nilListValue + "content" + + (nilListValue + "patch" + string("(define (patch id e) (tree-subst-assoc '(price) '(price \"$3.99\") e))")); + const list ps = nilListValue + (nilListValue + "entry" + + (nilListValue + "title" + string("item")) + + (nilListValue + "id" + string("cart-53d67a61-aa5e-4e5e-8401-39edeba8b83b")) + + s); + + const failable r = http::patch(ps, uri + p, cs); + assert(hasContent(r)); + assert(content(r) == trueValue); + } + { + const failable val = http::get(uri + p, cs); + assert(hasContent(val)); + assert(content(val) == c); + } { const failable r = http::del(uri + p, cs); assert(hasContent(r)); diff --git a/sca-cpp/trunk/components/filedb/file-test.cpp b/sca-cpp/trunk/components/filedb/file-test.cpp index 5270967ccb..55272800f0 100644 --- a/sca-cpp/trunk/components/filedb/file-test.cpp +++ b/sca-cpp/trunk/components/filedb/file-test.cpp @@ -38,11 +38,14 @@ const bool testFileDB(const string& dbname, const string& format) { const list a = mklist(nilListValue + "ns1:a" + (nilListValue + "@xmlns:ns1" + string("http://aaa")) + (nilListValue + "text" + string("Hey!"))); const list b = mklist(nilListValue + "ns1:b" + (nilListValue + "@xmlns:ns1" + string("http://bbb")) + (nilListValue + "text" + string("Hey!"))); + const list c = mklist(nilListValue + "ns1:c" + (nilListValue + "@xmlns:ns1" + string("http://ccc")) + (nilListValue + "text" + string("Hey!"))); assert(hasContent(post(k, a, db))); assert((get(k, db)) == value(a)); assert(hasContent(put(k, b, db))); assert((get(k, db)) == value(b)); + assert(hasContent(patch(k, c, db))); + assert((get(k, db)) == value(c)); assert(hasContent(del(k, db))); assert(!hasContent(get(k, db))); assert(hasContent(post(k, a, db))); diff --git a/sca-cpp/trunk/components/filedb/filedb.cpp b/sca-cpp/trunk/components/filedb/filedb.cpp index 37cb6c5260..b28cc9bedb 100644 --- a/sca-cpp/trunk/components/filedb/filedb.cpp +++ b/sca-cpp/trunk/components/filedb/filedb.cpp @@ -62,6 +62,39 @@ const failable put(const list& params, const filedb::FileDB& db) { return value(content(val)); } +/** + * Patch an item in the database. + */ +const failable patch(const list& params, const filedb::FileDB& db) { + // Read patch + value p = assoc("patch", assoc("content", car(cadr(params)))); + if (isNil(p)) + return mkfailure("Couldn't read patch script"); + const string script = cadr(p); + debug(script, "filedb::patch::script"); + istringstream is(script); + + // Get existing value from database + const failable ival = filedb::get(car(params), db); + if (!hasContent(ival) && rcode(ival) != 404) + return mkfailure(ival); + + // Apply patch + scheme::Env env = scheme::setupEnvironment(); + const value pval = scheme::evalScript(cons("patch", scheme::quotedParameters(mklist(car(params), hasContent(ival)? content(ival) : (value)list()))), is, env); + if (isNil(pval)) { + ostringstream os; + os << "Couldn't patch file database entry: " << car(params); + return mkfailure(str(os), 404, false); + } + + // Push patched value to database + const failable val = filedb::patch(car(params), pval, db); + if (!hasContent(val)) + return mkfailure(val); + return value(content(val)); +} + /** * Delete an item from the database. */ @@ -91,6 +124,8 @@ const failable start(const list& params) { return post(cdr(params), db); if (func == "put") return put(cdr(params), db); + if (func == "patch") + return patch(cdr(params), db); if (func == "delete") return del(cdr(params), db); return mkfailure(); diff --git a/sca-cpp/trunk/components/filedb/filedb.hpp b/sca-cpp/trunk/components/filedb/filedb.hpp index 2855cebfc6..41dde88bef 100644 --- a/sca-cpp/trunk/components/filedb/filedb.hpp +++ b/sca-cpp/trunk/components/filedb/filedb.hpp @@ -82,6 +82,7 @@ private: friend const failable read(istream& is, const string& format); friend const failable post(const value& key, const value& val, const FileDB& db); friend const failable put(const value& key, const value& val, const FileDB& db); + friend const failable patch(const value& key, const value& val, const FileDB& db); friend const failable get(const value& key, const FileDB& db); friend const failable del(const value& key, const FileDB& db); }; @@ -207,6 +208,30 @@ const failable put(const value& key, const value& val, const FileDB& db) { return r; } +/** + * Patch an item in the database. If the item doesn't exist it is added. + */ +const failable patch(const value& key, const value& val, const FileDB& db) { + debug(key, "filedb::patch::key"); + debug(val, "filedb::patch::value"); + debug(db.name, "filedb::patch::dbname"); + + if (isList(key)) + mkdirs(key, db.name); + const string fn = filename(key, db.name); + debug(fn, "filedb::patch::filename"); + ofstream os(fn); + if (os.fail()) { + ostringstream os; + os << "Couldn't patch file database entry: " << key; + return mkfailure(str(os)); + } + const failable r = write(val, os, db.format); + + debug(r, "filedb::patch::result"); + return r; +} + /** * Get an item from the database. */ @@ -241,7 +266,7 @@ const failable del(const value& key, const FileDB& db) { if (rc == -1) { ostringstream os; os << "Couldn't delete file database entry: " << key; - return mkfailure(str(os)); + return errno == ENOENT? mkfailure(str(os), 404, false) : mkfailure(str(os)); } debug(true, "filedb::delete::result"); diff --git a/sca-cpp/trunk/components/sqldb/client-test.cpp b/sca-cpp/trunk/components/sqldb/client-test.cpp index c9fbb7d5bb..fcdb8d3c2a 100644 --- a/sca-cpp/trunk/components/sqldb/client-test.cpp +++ b/sca-cpp/trunk/components/sqldb/client-test.cpp @@ -77,6 +77,32 @@ const bool testSqlDb() { assert(hasContent(val)); assert(content(val) == b); } + + const list k = nilListValue + "content" + (nilListValue + "item" + + (nilListValue + "name" + string("Apple")) + + (nilListValue + "price" + string("$3.99"))); + const list c = nilListValue + (nilListValue + "entry" + + (nilListValue + "title" + string("item")) + + (nilListValue + "id" + string("cart-53d67a61-aa5e-4e5e-8401-39edeba8b83b")) + + k); + + { + const list s = nilListValue + "content" + + (nilListValue + "patch" + string("(define (patch id e) (tree-subst-assoc '(price) '(price \"$3.99\") e))")); + const list ps = nilListValue + (nilListValue + "entry" + + (nilListValue + "title" + string("item")) + + (nilListValue + "id" + string("cart-53d67a61-aa5e-4e5e-8401-39edeba8b83b")) + + s); + + const failable r = http::patch(ps, uri + p, cs); + assert(hasContent(r)); + assert(content(r) == trueValue); + } + { + const failable val = http::get(uri + p, cs); + assert(hasContent(val)); + assert(content(val) == c); + } { const failable r = http::del(uri + p, cs); assert(hasContent(r)); diff --git a/sca-cpp/trunk/components/sqldb/pgsql-conf b/sca-cpp/trunk/components/sqldb/pgsql-conf index 8adbb902c9..020ce129fb 100755 --- a/sca-cpp/trunk/components/sqldb/pgsql-conf +++ b/sca-cpp/trunk/components/sqldb/pgsql-conf @@ -96,6 +96,7 @@ archive_command = '$here/pgsql-archive $root $host $bport %p %f' wal_level = hot_standby max_wal_senders = 5 wal_keep_segments = 32 +#synchronous_standby_names = '*' EOF @@ -166,7 +167,6 @@ cat >$root/sqldb/data/pgbouncer.conf <$root/sqldb/data/recovery.conf << EOF # Start in standby mode standby_mode = 'on' -primary_conninfo = 'host=$mhost port=$mport user=standby' +primary_conninfo = 'host=$mhost port=$mport user=standby application_name=$host:$port' # Failover trigger_file = '$root/sqldb/failover' @@ -165,7 +165,6 @@ cat >$root/sqldb/data/pgbouncer.conf <(string("Couldn't execute postgresql column select statement: ") + pgfailure(kr, conn)); + const string rs = string("Couldn't execute postgresql column select statement: ") + pgfailure(kr, conn); + PQclear(kr); + mkfailure(rs); return; } if (PQntuples(kr) != 2) { + const string rs = "Couldn't find postgresql table key and value column names"; PQclear(kr); - mkfailure(string("Couldn't find postgresql table key and value column names")); + mkfailure(rs); return; } kname = c_str(string(PQgetvalue(kr, 0, 0))); @@ -95,12 +98,10 @@ public: PGSql& operator=(const PGSql& c) = delete; ~PGSql() { - debug("pgsql::~pgsql"); if (!owner) return; if (conn == NULL) return; - debug(conn, "pgsql::~pgsql::conn"); PQfinish(conn); } @@ -113,8 +114,12 @@ private: const char* vname; friend const failable setup(const PGSql& pgsql); + friend const failable begin(const PGSql& pgsql); + friend const failable commit(const PGSql& pgsql); + friend const failable rollback(const PGSql& pgsql); friend const failable post(const value& key, const value& val, const PGSql& pgsql); friend const failable put(const value& key, const value& val, const PGSql& pgsql); + friend const failable patch(const value& key, const value& val, const PGSql& pgsql); friend const failable get(const value& key, const PGSql& pgsql); friend const failable del(const value& key, const PGSql& pgsql); }; @@ -133,6 +138,69 @@ const failable setup(const PGSql& pgsql) { return true; } +/** + * Begin a database transaction. + */ +const failable begin(const PGSql& pgsql) { + debug("pgsql::begin"); + debug(pgsql.conninfo, "pgsql::begin::conninfo"); + debug(pgsql.table, "pgsql::begin::table"); + setup(pgsql); + + PGresult* const r = PQexec(pgsql.conn, "begin transaction isolation level repeatable read"); + if (PQresultStatus(r) != PGRES_COMMAND_OK) { + const string rs = string("Couldn't execute begin SQL statement: ") + pgfailure(r, pgsql.conn); + PQclear(r); + return mkfailure(rs); + } + PQclear(r); + + debug(true, "pgsql::begin::result"); + return true; +} + +/** + * Commit a database transaction. + */ +const failable commit(const PGSql& pgsql) { + debug("pgsql::commit"); + debug(pgsql.conninfo, "pgsql::commit::conninfo"); + debug(pgsql.table, "pgsql::commit::table"); + setup(pgsql); + + PGresult* const r = PQexec(pgsql.conn, "commit"); + if (PQresultStatus(r) != PGRES_COMMAND_OK) { + const string rs = string("Couldn't execute commit SQL statement: ") + pgfailure(r, pgsql.conn); + PQclear(r); + return mkfailure(rs); + } + PQclear(r); + + debug(true, "pgsql::commit::result"); + return true; +} + +/** + * Rollback a database transaction. + */ +const failable rollback(const PGSql& pgsql) { + debug("pgsql::rollback"); + debug(pgsql.conninfo, "pgsql::rollback::conninfo"); + debug(pgsql.table, "pgsql::rollback::table"); + setup(pgsql); + + PGresult* const r = PQexec(pgsql.conn, "rollback"); + if (PQresultStatus(r) != PGRES_COMMAND_OK) { + const string rs = string("Couldn't execute rollback SQL statement: ") + pgfailure(r, pgsql.conn); + PQclear(r); + return mkfailure(rs); + } + PQclear(r); + + debug(true, "pgsql::rollback::result"); + return true; +} + /** * Post a new item to the database. */ @@ -147,8 +215,11 @@ const failable post(const value& key, const value& val, const PGSql& pgsql const string vs(write(content(scheme::writeValue(val)))); const char* const params[2] = { c_str(ks), c_str(vs) }; PGresult* const r = PQexecParams(pgsql.conn, c_str(string("insert into ") + pgsql.table + string(" values($1, $2);")), 2, NULL, params, NULL, NULL, 0); - if (PQresultStatus(r) != PGRES_COMMAND_OK) - return mkfailure(string("Couldn't execute insert postgresql SQL statement: ") + pgfailure(r, pgsql.conn)); + if (PQresultStatus(r) != PGRES_COMMAND_OK) { + const string rs = string("Couldn't execute insert postgresql SQL statement: ") + pgfailure(r, pgsql.conn); + PQclear(r); + return mkfailure(rs); + } PQclear(r); debug(true, "pgsql::post::result"); @@ -169,10 +240,13 @@ const failable put(const value& key, const value& val, const PGSql& pgsql) const string vs(write(content(scheme::writeValue(val)))); const char* const params[2] = { c_str(ks), c_str(vs) }; PGresult* const r = PQexecParams(pgsql.conn, c_str(string("update ") + pgsql.table + string(" set ") + pgsql.vname + string(" = $2 where ") + pgsql.kname + string(" = $1;")), 2, NULL, params, NULL, NULL, 0); - if (PQresultStatus(r) != PGRES_COMMAND_OK) - return mkfailure(string("Couldn't execute update postgresql SQL statement: ") + pgfailure(r, pgsql.conn)); - const string t = PQcmdTuples(r); - if (t != "0") { + if (PQresultStatus(r) != PGRES_COMMAND_OK) { + const string rs = string("Couldn't execute update postgresql SQL statement: ") + pgfailure(r, pgsql.conn); + PQclear(r); + return mkfailure(rs); + } + const char* const t = PQcmdTuples(r); + if (t != NULL && strcmp(t, "0")) { PQclear(r); debug(true, "pgsql::put::result"); return true; @@ -180,14 +254,68 @@ const failable put(const value& key, const value& val, const PGSql& pgsql) PQclear(r); PGresult* const pr = PQexecParams(pgsql.conn, c_str(string("insert into ") + pgsql.table + string(" values($1, $2);")), 2, NULL, params, NULL, NULL, 0); - if (PQresultStatus(pr) != PGRES_COMMAND_OK) - return mkfailure(string("Couldn't execute insert postgresql SQL statement: ") + pgfailure(pr, pgsql.conn)); + if (PQresultStatus(pr) != PGRES_COMMAND_OK) { + const string rs = string("Couldn't execute insert postgresql SQL statement: ") + pgfailure(pr, pgsql.conn); + PQclear(pr); + return mkfailure(rs); + } PQclear(pr); debug(true, "pgsql::put::result"); return true; } +/** + * Patch an item in the database. If the item doesn't exist it is added. + */ +const failable patch(const value& key, const value& val, const PGSql& pgsql) { + debug(key, "pgsql::patch::key"); + debug(val, "pgsql::patch::value"); + debug(pgsql.conninfo, "pgsql::patch::conninfo"); + debug(pgsql.table, "pgsql::patch::table"); + setup(pgsql); + + const string ks(write(content(scheme::writeValue(key)))); + const string vs(write(content(scheme::writeValue(val)))); + const char* const params[2] = { c_str(ks), c_str(vs) }; + PGresult* const r = PQexecParams(pgsql.conn, c_str(string("update ") + pgsql.table + string(" set ") + pgsql.vname + string(" = $2 where ") + pgsql.kname + string(" = $1;")), 2, NULL, params, NULL, NULL, 0); + if (PQresultStatus(r) != PGRES_COMMAND_OK) { + const string rs = string("Couldn't execute update postgresql SQL statement: ") + pgfailure(r, pgsql.conn); + const char* const st = PQresultErrorField(r, PG_DIAG_SQLSTATE); + if (st != NULL && !strncmp(st, "40", 2)) { + + // Report a transaction serialization conflict + PQclear(r); + return mkfailure(rs, 409); + } + PQclear(r); + return mkfailure(rs); + } + const char* const t = PQcmdTuples(r); + if (t != NULL && strcmp(t, "0")) { + PQclear(r); + debug(true, "pgsql::patch::result"); + return true; + } + PQclear(r); + + PGresult* const pr = PQexecParams(pgsql.conn, c_str(string("insert into ") + pgsql.table + string(" values($1, $2);")), 2, NULL, params, NULL, NULL, 0); + if (PQresultStatus(pr) != PGRES_COMMAND_OK) { + const string rs = string("Couldn't execute insert postgresql SQL statement: ") + pgfailure(pr, pgsql.conn); + const char* const st = PQresultErrorField(pr, PG_DIAG_SQLSTATE); + if (st != NULL && !strncmp(st, "40", 2)) { + PQclear(pr); + return mkfailure(rs, 40); + } + PQclear(pr); + return mkfailure(rs); + } + PQclear(pr); + + debug(true, "pgsql::patch::result"); + return true; +} + /** * Convert a key to an item id. */ @@ -200,14 +328,14 @@ const list keyid(const list& key) { } /** - * Convert a key to an param name / value assoc. + * Convert a key to a (param name, value) assoc. */ -const list > keyparams(const list& key) { +const list keyparams(const list& key) { if (isNil(key)) return nilListValue; if (!isList(car(key))) return keyparams(cdr(key)); - return cons >((list)car(key), keyparams(cdr(key))); + return cons(car(key), keyparams(cdr(key))); } /** @@ -216,9 +344,8 @@ const list > keyparams(const list& key) { const list getitems(PGresult* const r, const int i, const int n) { if (i == n) return nilListValue; - const value key(content(scheme::readValue(string(PQgetvalue(r, i, 0))))); const value val(content(scheme::readValue(string(PQgetvalue(r, i, 1))))); - return cons(mklist(key, val), getitems(r, i + 1, n)); + return cons(val, getitems(r, i + 1, n)); } /** @@ -250,12 +377,13 @@ const failable get(const value& key, const PGSql& pgsql) { // Get item and id and get parameters from the key const bool lk = isList(key); - const list > kparams = lk? keyparams(key) : list >(); + const list kparams = lk? keyparams(key) : nilListValue; const list regex = assoc("regex", kparams); const list like = assoc("like", kparams); const list textsearch = assoc("textsearch", kparams); const list limit = assoc("limit", kparams); const list offset = assoc("offset", kparams); + const list rank = assoc("rank", kparams); const list id = lk? keyid(key) : nilListValue; const list atable = assoc("table", kparams); const string table = isNil(atable)? pgsql.table : (string)cadr(atable); @@ -265,14 +393,20 @@ const failable get(const value& key, const PGSql& pgsql) { const string vname = isNil(avname)? pgsql.vname : (string)cadr(avname); // Build the SQL query - const char* sqlparams[5]; + const char* sqlparams[6]; int p = 0; int w = 0; ostringstream sqlos; - sqlos << "select data.*"; + sqlos << "select data." << kname << ", data." << vname; if (!isNil(textsearch)) { - // Text search, setup result ranking - sqlos << ", ts_rank_cd(to_tsvector(data." << vname << "), tsquery, 32) as rank"; + // Text search, setup text result ranking + sqlos << ", ts_rank_cd(to_tsvector(data." << vname << "), tsquery, 32) as tsrank"; + } + if (!isNil(rank)) { + // Ranking, setup rank expression + const string rs = (string)cadr(rank); + sqlparams[p++] = c_str(rs); + sqlos << ", $" << p << " as rank"; } sqlos << " from " << table << " data"; if (!isNil(textsearch)) { @@ -305,9 +439,13 @@ const failable get(const value& key, const PGSql& pgsql) { if (!isNil(textsearch)) { // Text search, apply the query sqlos << (w == 0? " where" : " and"); - sqlos << " tsquery @@ to_tsvector(data." << vname << ") order by rank desc"; + sqlos << " tsquery @@ to_tsvector(data." << vname << ")"; w++; } + if (!isNil(textsearch) || !isNil(rank)) { + // Result ordering + sqlos << " order by" << (isNil(rank)? "" : " rank desc") << ((isNil(rank) || isNil(textsearch))? "" : ",") << (isNil(textsearch)? "" : " tsrank desc"); + } if (!isNil(offset)) { // Result pagination offset sqlos << " offset " << atoi(c_str((string)cadr(offset))); @@ -320,8 +458,11 @@ const failable get(const value& key, const PGSql& pgsql) { const string sqls = str(sqlos); debug(sqls, "pgsql::get::sqls"); PGresult* r = PQexecParams(pgsql.conn, c_str(sqls), p, NULL, sqlparams, NULL, NULL, 0); - if (PQresultStatus(r) != PGRES_TUPLES_OK) - return mkfailure(string("Couldn't execute select postgresql SQL statement: ") + pgfailure(r, pgsql.conn)); + if (PQresultStatus(r) != PGRES_TUPLES_OK) { + const string rs = string("Couldn't execute select postgresql SQL statement: ") + pgfailure(r, pgsql.conn); + PQclear(r); + return mkfailure(rs); + } const int n = PQntuples(r); if (n < 1) { PQclear(r); @@ -330,7 +471,7 @@ const failable get(const value& key, const PGSql& pgsql) { return mkfailure(str(os), 404, false); } - // Return a collection of key / item pairs + // Return a collection of items if (l != 1) { const list lval = getitems(r, 0, n); PQclear(r); @@ -357,8 +498,18 @@ const failable del(const value& key, const PGSql& pgsql) { const string ks(write(content(scheme::writeValue(key)))); const char* const params[1] = { c_str(ks) }; PGresult* const r = PQexecParams(pgsql.conn, c_str(string("delete from ") + pgsql.table + string(" where ") + pgsql.kname + string(" = $1;")), 1, NULL, params, NULL, NULL, 0); - if (PQresultStatus(r) != PGRES_COMMAND_OK) - return mkfailure(string("Couldn't execute delete postgresql SQL statement: ") + pgfailure(r, pgsql.conn)); + if (PQresultStatus(r) != PGRES_COMMAND_OK) { + const string rs = string("Couldn't execute delete postgresql SQL statement: ") + pgfailure(r, pgsql.conn); + PQclear(r); + return mkfailure(rs); + } + const char* const t = PQcmdTuples(r); + if (t != NULL && !strcmp(t, "0")) { + PQclear(r); + ostringstream os; + os << "Couldn't delete postgresql entry: " << key; + return mkfailure(str(os), 404, false); + } PQclear(r); debug(true, "pgsql::delete::result"); diff --git a/sca-cpp/trunk/components/sqldb/sqldb.cpp b/sca-cpp/trunk/components/sqldb/sqldb.cpp index 1288dd553b..75be2c0624 100644 --- a/sca-cpp/trunk/components/sqldb/sqldb.cpp +++ b/sca-cpp/trunk/components/sqldb/sqldb.cpp @@ -63,6 +63,65 @@ const failable put(const list& params, const pgsql::PGSql& pg) { return value(content(val)); } +/** + * Patch an item in the database. + */ +const failable patch(const list& params, const pgsql::PGSql& pg) { + // Read patch + value p = assoc("patch", assoc("content", car(cadr(params)))); + if (isNil(p)) + return mkfailure("Couldn't read patch script"); + const string script = cadr(p); + debug(script, "pgsql::patch::script"); + + const lambda(const value&, const pgsql::PGSql&, const string&, const int)> tryPatch = [&tryPatch](const value& key, const pgsql::PGSql& pg, const string& script, const int count) -> const failable { + + // Begin database transaction + const failable brc = pgsql::begin(pg); + if (!hasContent(brc)) + return mkfailure(brc); + + // Get existing value from database + const failable ival = pgsql::get(key, pg); + if (!hasContent(ival) && rcode(ival) != 404) { + pgsql::rollback(pg); + return mkfailure(ival); + } + + // Apply patch + istringstream is(script); + scheme::Env env = scheme::setupEnvironment(); + const value pval = scheme::evalScript(cons("patch", scheme::quotedParameters(mklist(key, hasContent(ival)? content(ival) : (value)list()))), is, env); + if (isNil(pval)) { + ostringstream os; + os << "Couldn't patch postgresql entry: " << key; + return mkfailure(str(os), 404, false); + } + + // Push patched value to database + const failable val = pgsql::patch(key, pval, pg); + if (!hasContent(val)) { + pgsql::rollback(pg); + + // Retry on a transaction serialization error + if (rcode(val) == 409 && count > 0) + return tryPatch(key, pg, script, count - 1); + return mkfailure(val); + } + + // Commit database transaction + const failable crc = pgsql::commit(pg); + if (!hasContent(crc)) + return mkfailure(crc); + + return value(content(val)); + }; + + // Try patching the entry and automatically retry a few times on transaction + // serialization errors + return tryPatch(car(params), pg, script, 5); +} + /** * Delete an item from the database. */ @@ -98,6 +157,8 @@ const failable start(const list& params) { return post(cdr(params), *pg); if (func == "put") return put(cdr(params), *pg); + if (func == "patch") + return patch(cdr(params), *pg); if (func == "delete") return del(cdr(params), *pg); return mkfailure(); -- cgit v1.2.3