diff --git a/newbrt/log.c b/newbrt/log.c index e8c836913eb..dd0062a095d 100644 --- a/newbrt/log.c +++ b/newbrt/log.c @@ -446,7 +446,14 @@ int toku_logger_commit (TOKUTXN txn, int nosync) { if (txn->oldest_logentry) { // There are some entries, so link them in. txn->oldest_logentry->prev = txn->parent->newest_logentry; + if (txn->parent->newest_logentry) { + txn->parent->newest_logentry->next = txn->oldest_logentry; + } else { + txn->parent->oldest_logentry = txn->oldest_logentry; + } txn->parent->newest_logentry = txn->newest_logentry; + txn->parent->rollentry_resident_bytecount += txn->rollentry_resident_bytecount; + txn->rollentry_resident_bytecount = 0; } if (txn->parent->oldest_logentry==0) { txn->parent->oldest_logentry = txn->oldest_logentry; @@ -459,6 +466,9 @@ int toku_logger_commit (TOKUTXN txn, int nosync) { r = toku_omt_iterate(txn->open_brts, note_brt_used_in_parent_txn, txn->parent); assert(r==0); + r = toku_maybe_spill_rollbacks(txn->parent); + assert(r==0); + } else { // do the commit calls and free everything // we do the commit calls in reverse order too. diff --git a/src/tests/medium-nested-commit-commit.c b/src/tests/medium-nested-commit-commit.c new file mode 100644 index 00000000000..948324085ad --- /dev/null +++ b/src/tests/medium-nested-commit-commit.c @@ -0,0 +1,115 @@ +/* Test to see if a medium-size nested transaction (the nested pieces are not too big, but the whole thing is so big that it's rollbacks spill into a file) + * can commit properly. + * Four Tests: + * big child aborts, parent aborts + * big child aborts, parent commits + * big child commits, parent aborts + * big child commits, parent commits (This test) + */ + +#include +#include +#include "test.h" + +static DB_ENV *env; +static DB *db; +static DB_TXN *xchild, *xparent; + +static void insert (int i) { + char hello[30], there[30]; + DBT key,data; + if (verbose) printf("Insert %d\n", i); + snprintf(hello, sizeof(hello), "hello%d", i); + snprintf(there, sizeof(there), "there%d", i); + int r = db->put(db, xchild, + dbt_init(&key, hello, strlen(hello)+1), + dbt_init(&data, there, strlen(there)+1), + DB_YESOVERWRITE); + CKERR(r); +} + +static void lookup (int i, int expect, int expectj) { + char hello[30], there[30]; + DBT key,data; + snprintf(hello, sizeof(hello), "hello%d", i); + memset(&data, 0, sizeof(data)); + if (verbose) printf("Looking up %d (expecting %s)\n", i, expect==0 ? "to find" : "not to find"); + int r = db->get(db, xchild, + dbt_init(&key, hello, strlen(hello)+1), + &data, + 0); + assert(expect==r); + if (expect==0) { + CKERR(r); + snprintf(there, sizeof(there), "there%d", expectj); + assert(data.size==strlen(there)+1); + assert(strcmp(data.data, there)==0); + } +} + +const int N = 50000; +const int DIV = 10; + +void test_commit_commit (void) { + int i, j, k, r; + r=env->txn_begin(env, 0, &xparent, 0); CKERR(r); + k=0; + for (j=0; jtxn_begin(env, xparent, &xchild, 0); CKERR(r); + for (i=0; icommit(xchild, 0); CKERR(r); + } + k=0; + for (j=0; jtxn_begin(env, xparent, &xchild, 0); CKERR(r); + for (i=0; icommit(xchild, 0); CKERR(r); + } + r=xparent->commit(xparent, 0); CKERR(r); + r=env->txn_begin(env, 0, &xchild, 0); CKERR(r); + for (i=0; icommit(xchild, 0); CKERR(r); +} + +void setup (void) { + DB_TXN *txn; + int r; + system("rm -rf " ENVDIR); + r=mkdir(ENVDIR, 0777); CKERR(r); + + r=db_env_create(&env, 0); CKERR(r); + r=env->set_lk_max_locks(env, N); CKERR(r); +#ifndef TOKUDB + r=env->set_lk_max_objects(env, N); CKERR(r); +#endif + env->set_errfile(env, stderr); + r=env->open(env, ENVDIR, DB_INIT_LOCK|DB_INIT_LOG|DB_INIT_MPOOL|DB_INIT_TXN|DB_CREATE|DB_PRIVATE, 0777); CKERR(r); + r=db_create(&db, env, 0); CKERR(r); + r=db->set_flags(db, DB_DUPSORT); + + r=env->txn_begin(env, 0, &txn, 0); assert(r==0); + r=db->open(db, txn, "foo.db", 0, DB_BTREE, DB_CREATE, 0777); CKERR(r); + r=txn->commit(txn, 0); assert(r==0); +} + +void shutdown (void) { + int r; + r=db->close(db, 0); CKERR(r); + r=env->close(env, 0); CKERR(r); +} + +int main (int argc, const char *argv[]) { + parse_args(argc, argv); + setup(); + test_commit_commit(); + shutdown(); + return 0; +}