From da3b69356cf3e9d14fd08321fd832edd5a4621d2 Mon Sep 17 00:00:00 2001 From: Rich Prohaska Date: Thu, 12 Jun 2008 13:53:39 +0000 Subject: [PATCH] merge branch 838 to main. addresses #838 git-svn-id: file:///svn/tokudb@4493 c7de825b-a66e-492c-adef-691d508d4ae1 --- newbrt/brt.c | 29 ++- newbrt/leafentry.c | 21 +++ newbrt/leafentry.h | 1 + newbrt/log.c | 22 ++- newbrt/log.h | 4 + src/tests/test_838.c | 418 +++++++++++++++++++++++++++++++++++++++++++ 6 files changed, 485 insertions(+), 10 deletions(-) create mode 100644 src/tests/test_838.c diff --git a/newbrt/brt.c b/newbrt/brt.c index 296aab801f1..0034e86c013 100644 --- a/newbrt/brt.c +++ b/newbrt/brt.c @@ -2703,7 +2703,7 @@ static int bessel_from_search_t (OMTVALUE lev, void *extra) { LESWITCHCALL(leafval, pair_leafval_bessel, search); } -static int brt_search_leaf_node(BRT brt, BRTNODE node, brt_search_t *search, DBT *newkey, DBT *newval, OMTCURSOR omtcursor) { +static int brt_search_leaf_node(BRT brt, BRTNODE node, brt_search_t *search, DBT *newkey, DBT *newval, TOKULOGGER logger, OMTCURSOR omtcursor) { // Now we have to convert from brt_search_t to the bessel function with a direction. What a pain... int direction; switch (search->direction) { @@ -2723,12 +2723,35 @@ static int brt_search_leaf_node(BRT brt, BRTNODE node, brt_search_t *search, DBT LEAFENTRY le = datav; if (le_is_provdel(le)) { + TXNID xid = le_any_xid(le); + TOKUTXN txn = 0; + toku_txn_find_by_xid(brt, xid, &txn); + // Provisionally deleted stuff is gone. // So we need to scan in the direction to see if we can find something while (1) { + // see if the transaction is alive + TXNID newxid = le_any_xid(le); + if (newxid != xid) { + xid = newxid; + txn = 0; + toku_txn_find_by_xid(brt, xid, &txn); + } + switch (search->direction) { case BRT_SEARCH_LEFT: - idx++; + if (txn) { + // printf("xid %llu -> %p\n", (unsigned long long) xid, txn); + idx++; + } else { + // apply a commit message for this leafentry to the node + // printf("apply commit_both %llu\n", (unsigned long long) xid); + DBT key, val; + BRT_CMD_S brtcmd = { BRT_COMMIT_BOTH, xid, .u.id= {toku_fill_dbt(&key, le_latest_key(le), le_latest_keylen(le)), + toku_fill_dbt(&val, le_latest_val(le), le_latest_vallen(le))} }; + r = brt_leaf_apply_cmd_once(brt, node, &brtcmd, logger, idx, le); + assert(r == 0); + } if (idx>=toku_omt_size(node->u.l.buffer)) return DB_NOTFOUND; break; case BRT_SEARCH_RIGHT: @@ -2760,7 +2783,7 @@ static int brt_search_node(BRT brt, BRTNODE node, brt_search_t *search, DBT *new if (node->height > 0) return brt_search_nonleaf_node(brt, node, search, newkey, newval, split, logger, omtcursor); else - return brt_search_leaf_node(brt, node, search, newkey, newval, omtcursor); + return brt_search_leaf_node(brt, node, search, newkey, newval, logger, omtcursor); } int toku_brt_search(BRT brt, brt_search_t *search, DBT *newkey, DBT *newval, TOKULOGGER logger, OMTCURSOR omtcursor, uint64_t *root_put_counter) diff --git a/newbrt/leafentry.c b/newbrt/leafentry.c index a8481829164..cf3f657b1fe 100644 --- a/newbrt/leafentry.c +++ b/newbrt/leafentry.c @@ -407,3 +407,24 @@ u_int32_t any_vallen_le_provpair (TXNID UU(xid), u_int32_t UU(klen), void *UU(kv u_int32_t le_any_vallen (LEAFENTRY le) { LESWITCHCALL(le, any_vallen); } + + +u_int64_t any_xid_le_committed (u_int32_t UU(keylen), void *UU(key), u_int32_t UU(vallen), void *UU(val)) { + return 0; +} + +u_int64_t any_xid_le_both (TXNID xid, u_int32_t UU(klen), void *UU(kval), u_int32_t UU(clen), void *UU(cval), u_int32_t UU(plen), void *UU(pval)) { + return xid; +} + +u_int64_t any_xid_le_provdel (TXNID xid, u_int32_t UU(klen), void *UU(kval), u_int32_t UU(clen), void *UU(cval)) { + return xid; +} + +u_int64_t any_xid_le_provpair (TXNID xid, u_int32_t UU(klen), void *UU(kval), u_int32_t UU(plen), void *UU(pval)) { + return xid; +} + +u_int64_t le_any_xid (LEAFENTRY le) { + LESWITCHCALL(le, any_xid); +} diff --git a/newbrt/leafentry.h b/newbrt/leafentry.h index 68022446784..6632b086d43 100644 --- a/newbrt/leafentry.h +++ b/newbrt/leafentry.h @@ -132,6 +132,7 @@ void* le_any_key (LEAFENTRY le); u_int32_t le_any_keylen (LEAFENTRY le); void* le_any_val (LEAFENTRY le); u_int32_t le_any_vallen (LEAFENTRY le); +u_int64_t le_any_xid (LEAFENTRY le); #endif diff --git a/newbrt/log.c b/newbrt/log.c index 211f312fe8f..2299c9b82ed 100644 --- a/newbrt/log.c +++ b/newbrt/log.c @@ -1001,11 +1001,10 @@ int toku_read_rollback_backwards(int fd, off_t at, struct roll_entry **item, off return 0; } - -static int find_ptr (OMTVALUE v, void *vfind) { - if (vvfind) return +1; - return 0; +static int find_xid (OMTVALUE v, void *txnv) { + TOKUTXN txn = v; + TOKUTXN txnfind = txnv; + return txn->txnid64 - txnfind->txnid64; } static int find_filenum (OMTVALUE v, void *brtv) { @@ -1022,7 +1021,7 @@ static int find_filenum (OMTVALUE v, void *brtv) { int toku_txn_note_brt (TOKUTXN txn, BRT brt) { OMTVALUE txnv; u_int32_t index; - int r = toku_omt_find_zero(brt->txns, find_ptr, txn, &txnv, &index, NULL); + int r = toku_omt_find_zero(brt->txns, find_xid, txn, &txnv, &index, NULL); if (r==0) { // It's already there. assert((TOKUTXN)txnv==txn); @@ -1060,7 +1059,7 @@ static int remove_txn (OMTVALUE brtv, u_int32_t UU(idx), void *txnv) { TOKUTXN txn = txnv; OMTVALUE txnv_again=NULL; u_int32_t index; - int r = toku_omt_find_zero(brt->txns, find_ptr, txn, &txnv_again, &index, NULL); + int r = toku_omt_find_zero(brt->txns, find_xid, txn, &txnv_again, &index, NULL); assert(r==0); assert((void*)txnv_again==txnv); r = toku_omt_delete_at(brt->txns, index); @@ -1073,3 +1072,12 @@ static void note_txn_closing (TOKUTXN txn) { toku_omt_iterate(txn->open_brts, remove_txn, txn); toku_omt_destroy(&txn->open_brts); } + +int toku_txn_find_by_xid (BRT brt, TXNID xid, TOKUTXN *txnptr) { + struct tokutxn fake_txn; fake_txn.txnid64 = xid; + uint32_t index; + OMTVALUE txnv; + int r = toku_omt_find_zero(brt->txns, find_xid, &fake_txn, &txnv, &index, NULL); + if (r == 0) *txnptr = txnv; + return r; +} diff --git a/newbrt/log.h b/newbrt/log.h index a8c14cc31d1..83e58ef1034 100644 --- a/newbrt/log.h +++ b/newbrt/log.h @@ -169,4 +169,8 @@ int toku_abort_rollback_item (TOKUTXN txn, struct roll_entry *item); int toku_txn_note_brt (TOKUTXN txn, BRT brt); int toku_txn_note_close_brt (BRT brt); +// find the TOKUTXN object by xid +// if found then return 0 and set txnptr to the address of the TOKUTXN object +int toku_txn_find_by_xid (BRT brt, TXNID xid, TOKUTXN *txnptr); + #endif diff --git a/src/tests/test_838.c b/src/tests/test_838.c new file mode 100644 index 00000000000..0d3a0bb75d0 --- /dev/null +++ b/src/tests/test_838.c @@ -0,0 +1,418 @@ +/* -*- mode: C; c-basic-offset: 4 -*- */ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include "test.h" + +// the exit value of this program is nonzero when the test fails +int testresult = 0; +int numexperiments = 20; + +// maxt is set to the longest cursor next without transactions +// we then compare this time to the time with transactions and try to be within a factor of 10 +unsigned long long maxt; + +DBT *dbt_init_static(DBT *dbt) { + memset(dbt, 0, sizeof *dbt); + return dbt; +} + +void test_838(int n) { + if (verbose) printf("%s:%d\n", __FUNCTION__, n); + int r; + + // setup test directory + system("rm -rf " ENVDIR); + mkdir(ENVDIR, 0777); + + // setup environment + DB_ENV *env; + { + r = db_env_create(&env, 0); assert(r == 0); + r = env->set_data_dir(env, ENVDIR); + r = env->set_lg_dir(env, ENVDIR); + env->set_errfile(env, stdout); + r = env->open(env, 0, DB_INIT_MPOOL + DB_PRIVATE + DB_CREATE, 0777); + assert(r == 0); + } + + // setup database + DB *db; + { + DB_TXN *txn = 0; + r = db_create(&db, env, 0); assert(r == 0); + r = db->open(db, txn, "test.db", 0, DB_BTREE, DB_CREATE, 0777); assert(r == 0); + } + + // insert, commit + { + DB_TXN *txn = 0; + int i; + for (i=0; iput(db, txn, dbt_init(&key, &k, sizeof k), dbt_init(&val, &v, sizeof v), 0); + assert(r == 0); + } + } + + // delete, commit + { + DB_TXN *txn = 0; + int i; + for (i=0; idel(db, txn, dbt_init(&key, &k, sizeof k), 0); + assert(r == 0); + } + } + + // walk + maxt = 0; + { + DB_TXN *txn = 0; + DBC *cursor; + r = db->cursor(db, txn, &cursor, 0); assert(r == 0); + int i; + for (i=0; ic_get(cursor, dbt_init_malloc(&key), dbt_init_malloc(&val), DB_FIRST); + assert(r == DB_NOTFOUND); + gettimeofday(&tnow, 0); + unsigned long long t = tnow.tv_sec * 1000000ULL + tnow.tv_usec; + t -= tstart.tv_sec * 1000000ULL + tstart.tv_usec; + if (verbose) printf("%d %llu\n", i, t); + if (t > maxt) maxt = t; + } + r = cursor->c_close(cursor); assert(r == 0); + } + + // close db + r = db->close(db, 0); assert(r == 0); + + // reopen and walk + { + DB_TXN *txn = 0; + r = db_create(&db, env, 0); assert(r == 0); + r = db->open(db, txn, "test.db", 0, DB_BTREE, DB_CREATE, 0777); assert(r == 0); + } + { + DB_TXN *txn = 0; + DBC *cursor; + r = db->cursor(db, txn, &cursor, 0); assert(r == 0); + int i; + for (i=0; ic_get(cursor, dbt_init_malloc(&key), dbt_init_malloc(&val), DB_FIRST); + assert(r == DB_NOTFOUND); + gettimeofday(&tnow, 0); + unsigned long long t = tnow.tv_sec * 1000000ULL + tnow.tv_usec; + t -= tstart.tv_sec * 1000000ULL + tstart.tv_usec; + if (verbose) printf("%d %llu\n", i, t); + if (t > maxt) maxt = t; + } + r = cursor->c_close(cursor); assert(r == 0); + + // close db + r = db->close(db, 0); assert(r == 0); + } + + // close env + r = env->close(env, 0); assert(r == 0); +} + +void test_838_txn(int n) { + if (verbose) printf("%s:%d\n", __FUNCTION__, n); + int r; + + // setup test directory + system("rm -rf " ENVDIR); + mkdir(ENVDIR, 0777); + + // setup environment + DB_ENV *env; + { + r = db_env_create(&env, 0); assert(r == 0); + r = env->set_data_dir(env, ENVDIR); + r = env->set_lg_dir(env, ENVDIR); + env->set_errfile(env, stdout); + r = env->open(env, 0, DB_INIT_MPOOL + DB_INIT_LOG + DB_INIT_LOCK + DB_INIT_TXN + DB_PRIVATE + DB_CREATE, 0777); + assert(r == 0); + } + + // setup database + DB *db; + { + DB_TXN *txn = 0; + r = env->txn_begin(env, 0, &txn, 0); assert(r == 0); + + r = db_create(&db, env, 0); assert(r == 0); + r = db->open(db, txn, "test.db", 0, DB_BTREE, DB_CREATE, 0777); assert(r == 0); + + r = txn->commit(txn, 0); assert(r == 0); + } + + // insert, commit + { + DB_TXN *txn_master; + r = env->txn_begin(env, 0, &txn_master, 0); assert(r == 0); + DB_TXN *txn; + r = env->txn_begin(env, txn_master, &txn, 0); assert(r == 0); + int i; + for (i=0; iput(db, txn, dbt_init(&key, &k, sizeof k), dbt_init(&val, &v, sizeof v), 0); + assert(r == 0); + } + r = txn->commit(txn, 0); assert(r == 0); + r = txn_master->commit(txn_master, 0); assert(r == 0); + } + + // delete, commit + { + DB_TXN *txn_master; + r = env->txn_begin(env, 0, &txn_master, 0); assert(r == 0); + DB_TXN *txn; + r = env->txn_begin(env, txn_master, &txn, 0); assert(r == 0); + int i; + for (i=0; idel(db, txn, dbt_init(&key, &k, sizeof k), 0); + assert(r == 0); + } + r = txn->commit(txn, 0); assert(r == 0); + r = txn_master->commit(txn_master, 0); assert(r == 0); + } + + // walk + { + DB_TXN *txn; + r = env->txn_begin(env, 0, &txn, 0); assert(r == 0); + DBC *cursor; + r = db->cursor(db, txn, &cursor, 0); assert(r == 0); + int i; + for (i=0; ic_get(cursor, dbt_init_malloc(&key), dbt_init_malloc(&val), DB_FIRST); + assert(r == DB_NOTFOUND); + gettimeofday(&tnow, 0); + unsigned long long t = tnow.tv_sec * 1000000ULL + tnow.tv_usec; + t -= tstart.tv_sec * 1000000ULL + tstart.tv_usec; + if (verbose) printf("%d %llu\n", i, t); + + // the first cursor op takes a long time as it needs to clean out the provisionally + // deleted messages + if (i > 0 && t > 10*maxt) + testresult = 1; + } + r = cursor->c_close(cursor); assert(r == 0); + r = txn->commit(txn, 0); assert(r == 0); + } + + // close db + r = db->close(db, 0); assert(r == 0); + + // reopen and walk + { + DB_TXN *txn = 0; + r = env->txn_begin(env, 0, &txn, 0); assert(r == 0); + + r = db_create(&db, env, 0); assert(r == 0); + r = db->open(db, txn, "test.db", 0, DB_BTREE, DB_CREATE, 0777); assert(r == 0); + + r = txn->commit(txn, 0); assert(r == 0); + } + { + DB_TXN *txn; + r = env->txn_begin(env, 0, &txn, 0); assert(r == 0); + DBC *cursor; + r = db->cursor(db, txn, &cursor, 0); assert(r == 0); + int i; + for (i=0; ic_get(cursor, dbt_init_malloc(&key), dbt_init_malloc(&val), DB_FIRST); + assert(r == DB_NOTFOUND); + gettimeofday(&tnow, 0); + unsigned long long t = tnow.tv_sec * 1000000ULL + tnow.tv_usec; + t -= tstart.tv_sec * 1000000ULL + tstart.tv_usec; + if (verbose) printf("%d %llu\n", i, t); + if (i > 0 && t > 10*maxt) + testresult = 1; + } + r = cursor->c_close(cursor); assert(r == 0); + r = txn->commit(txn, 0); assert(r == 0); + + // close db + r = db->close(db, 0); assert(r == 0); + } + + // close env + r = env->close(env, 0); assert(r == 0); +} + +void test_838_defer_delete_commit(int n) { + if (verbose) printf("%s:%d\n", __FUNCTION__, n); + int r; + + // setup test directory + system("rm -rf " ENVDIR); + mkdir(ENVDIR, 0777); + + // setup environment + DB_ENV *env; + { + r = db_env_create(&env, 0); assert(r == 0); + r = env->set_data_dir(env, ENVDIR); + r = env->set_lg_dir(env, ENVDIR); + env->set_errfile(env, stdout); + r = env->open(env, 0, DB_INIT_MPOOL + DB_INIT_LOG + DB_INIT_LOCK + DB_INIT_TXN + DB_PRIVATE + DB_CREATE, 0777); + assert(r == 0); + } + + // setup database + DB *db; + { + DB_TXN *txn = 0; + r = env->txn_begin(env, 0, &txn, 0); assert(r == 0); + + r = db_create(&db, env, 0); assert(r == 0); + r = db->open(db, txn, "test.db", 0, DB_BTREE, DB_CREATE, 0777); assert(r == 0); + + r = txn->commit(txn, 0); assert(r == 0); + } + + // insert, commit + { + DB_TXN *txn_master; + r = env->txn_begin(env, 0, &txn_master, 0); assert(r == 0); + DB_TXN *txn; + r = env->txn_begin(env, txn_master, &txn, 0); assert(r == 0); + int i; + for (i=0; iput(db, txn, dbt_init(&key, &k, sizeof k), dbt_init(&val, &v, sizeof v), 0); + assert(r == 0); + } + r = txn->commit(txn, 0); assert(r == 0); + r = txn_master->commit(txn_master, 0); assert(r == 0); + } + + // delete + DB_TXN *txn_master_delete; + r = env->txn_begin(env, 0, &txn_master_delete, 0); assert(r == 0); + DB_TXN *txn_delete; + r = env->txn_begin(env, txn_master_delete, &txn_delete, 0); assert(r == 0); + int i; + for (i=0; idel(db, txn_delete, dbt_init(&key, &k, sizeof k), 0); + assert(r == 0); + } + + // walk + { + DB_TXN *txn; + r = env->txn_begin(env, 0, &txn, 0); assert(r == 0); + DBC *cursor; + r = db->cursor(db, txn, &cursor, 0); assert(r == 0); + int i; + for (i=0; ic_get(cursor, dbt_init_malloc(&key), dbt_init_malloc(&val), DB_FIRST); + assert(r == DB_LOCK_NOTGRANTED); + gettimeofday(&tnow, 0); + unsigned long long t = tnow.tv_sec * 1000000ULL + tnow.tv_usec; + t -= tstart.tv_sec * 1000000ULL + tstart.tv_usec; + if (verbose) printf("%d %llu\n", i, t); + + // the first cursor op takes a long time as it needs to clean out the provisionally + // deleted messages + if (i > 0 && t > 10*maxt) + testresult = 1; + } + r = cursor->c_close(cursor); assert(r == 0); + r = txn->commit(txn, 0); assert(r == 0); + } + + // delete commit + r = txn_delete->commit(txn_delete, 0); assert(r == 0); + r = txn_master_delete->commit(txn_master_delete, 0); assert(r == 0); + + // close db + r = db->close(db, 0); assert(r == 0); + + // reopen and walk + { + DB_TXN *txn = 0; + r = env->txn_begin(env, 0, &txn, 0); assert(r == 0); + + r = db_create(&db, env, 0); assert(r == 0); + r = db->open(db, txn, "test.db", 0, DB_BTREE, DB_CREATE, 0777); assert(r == 0); + + r = txn->commit(txn, 0); assert(r == 0); + } + { + DB_TXN *txn; + r = env->txn_begin(env, 0, &txn, 0); assert(r == 0); + DBC *cursor; + r = db->cursor(db, txn, &cursor, 0); assert(r == 0); + int i; + for (i=0; ic_get(cursor, dbt_init_malloc(&key), dbt_init_malloc(&val), DB_FIRST); + assert(r == DB_NOTFOUND); + gettimeofday(&tnow, 0); + unsigned long long t = tnow.tv_sec * 1000000ULL + tnow.tv_usec; + t -= tstart.tv_sec * 1000000ULL + tstart.tv_usec; + if (verbose) printf("%d %llu\n", i, t); + if (i > 0 && t > 10*maxt) + testresult = 1; + } + r = cursor->c_close(cursor); assert(r == 0); + r = txn->commit(txn, 0); assert(r == 0); + + // close db + r = db->close(db, 0); assert(r == 0); + } + + // close env + r = env->close(env, 0); assert(r == 0); +} + +int main(int argc, const char *argv[]) { + parse_args(argc, argv); + + int n; + for (n=100000; n<=100000; n *= 10) { + test_838(n); + test_838_txn(n); + test_838_defer_delete_commit(n); + } + return testresult; +}