diff --git a/buildheader/Makefile b/buildheader/Makefile index 33da9879d20..9022730cc03 100644 --- a/buildheader/Makefile +++ b/buildheader/Makefile @@ -12,7 +12,7 @@ # 4. make hs BDB = 4_4 -BDBDIR = ../../berkeleydb +BDBDIR = ../../../berkeleydb MYSQL_H = -I$(BDBDIR)/db-4.1.25/build_unix/ OPTFLAGS = -O2 CFLAGS = -Wall -W -Werror -g $(OPTFLAGS) diff --git a/buildheader/db.h_4_4 b/buildheader/db.h_4_4 index 83e93d1ec0b..a137769b835 100644 --- a/buildheader/db.h_4_4 +++ b/buildheader/db.h_4_4 @@ -69,6 +69,7 @@ typedef enum { #define DB_INIT_TXN 262144 #define DB_USE_ENVIRON 2048 #define DB_USE_ENVIRON_ROOT 4096 +#define DB_READ_UNCOMMITTED 67108864 #define DB_KEYEMPTY -30997 #define DB_KEYEXIST -30996 #define DB_LOCK_DEADLOCK -30995 diff --git a/buildheader/db.h_4_5 b/buildheader/db.h_4_5 index 07dfc891f1e..e327cd4b250 100644 --- a/buildheader/db.h_4_5 +++ b/buildheader/db.h_4_5 @@ -69,6 +69,7 @@ typedef enum { #define DB_INIT_TXN 524288 #define DB_USE_ENVIRON 4096 #define DB_USE_ENVIRON_ROOT 8192 +#define DB_READ_UNCOMMITTED 134217728 #define DB_KEYEMPTY -30997 #define DB_KEYEXIST -30996 #define DB_LOCK_DEADLOCK -30995 diff --git a/buildheader/db.h_4_6 b/buildheader/db.h_4_6 index a249e2ed5b2..6219ff58533 100644 --- a/buildheader/db.h_4_6 +++ b/buildheader/db.h_4_6 @@ -69,6 +69,7 @@ typedef enum { #define DB_INIT_TXN 2097152 #define DB_USE_ENVIRON 16384 #define DB_USE_ENVIRON_ROOT 32768 +#define DB_READ_UNCOMMITTED 134217728 #define DB_KEYEMPTY -30997 #define DB_KEYEXIST -30996 #define DB_LOCK_DEADLOCK -30995 diff --git a/buildheader/make_db_h.c b/buildheader/make_db_h.c index 6d0075e1c72..bfb5469960f 100644 --- a/buildheader/make_db_h.c +++ b/buildheader/make_db_h.c @@ -81,6 +81,9 @@ void print_defines (void) { dodefine(DB_USE_ENVIRON); dodefine(DB_USE_ENVIRON_ROOT); +#ifdef DB_READ_UNCOMMITTED + dodefine(DB_READ_UNCOMMITTED); +#endif dodefine(DB_KEYEMPTY); dodefine(DB_KEYEXIST); dodefine(DB_LOCK_DEADLOCK); diff --git a/include/db.h b/include/db.h index 83e93d1ec0b..a137769b835 100644 --- a/include/db.h +++ b/include/db.h @@ -69,6 +69,7 @@ typedef enum { #define DB_INIT_TXN 262144 #define DB_USE_ENVIRON 2048 #define DB_USE_ENVIRON_ROOT 4096 +#define DB_READ_UNCOMMITTED 67108864 #define DB_KEYEMPTY -30997 #define DB_KEYEXIST -30996 #define DB_LOCK_DEADLOCK -30995 diff --git a/src/tests/test_db_txn_locks_read_uncommitted.c b/src/tests/test_db_txn_locks_read_uncommitted.c new file mode 100644 index 00000000000..a5ce2db345a --- /dev/null +++ b/src/tests/test_db_txn_locks_read_uncommitted.c @@ -0,0 +1,223 @@ +/* -*- mode: C; c-basic-offset: 4 -*- */ +#ident "Copyright (c) 2007 Tokutek Inc. All rights reserved." + +#include +#include +#include +#include +#include + +#include "test.h" + +// ENVDIR is defined in the Makefile + +static DB *db; +static DB_TXN* txns[(int)256]; +static DB_ENV* dbenv; +static DBC* cursors[(int)256]; + +static void +put(BOOL success, char txn, int _key, int _data) { + assert(txns[(int)txn]); + + int r; + DBT key; + DBT data; + + r = db->put(db, txns[(int)txn], + dbt_init(&key, &_key, sizeof(int)), + dbt_init(&data, &_data, sizeof(int)), + DB_YESOVERWRITE); + + if (success) CKERR(r); + else CKERR2s(r, DB_LOCK_DEADLOCK, DB_LOCK_NOTGRANTED); +} + +static void +init_txn (char name, u_int32_t flags) { + int r; + assert(!txns[(int)name]); + r = dbenv->txn_begin(dbenv, NULL, &txns[(int)name], DB_TXN_NOWAIT | flags); + CKERR(r); + assert(txns[(int)name]); +} + +static void +init_dbc (char name) { + int r; + + assert(!cursors[(int)name] && txns[(int)name]); + r = db->cursor(db, txns[(int)name], &cursors[(int)name], 0); + CKERR(r); + assert(cursors[(int)name]); +} + +static void +commit_txn (char name) { + int r; + assert(txns[(int)name] && !cursors[(int)name]); + + r = txns[(int)name]->commit(txns[(int)name], 0); + CKERR(r); + txns[(int)name] = NULL; +} + + +static void +close_dbc (char name) { + int r; + + assert(cursors[(int)name]); + r = cursors[(int)name]->c_close(cursors[(int)name]); + CKERR(r); + cursors[(int)name] = NULL; +} + +static void +early_commit (char name) { + assert(cursors[(int)name] && txns[(int)name]); + close_dbc(name); + commit_txn(name); +} + +static void +setup_dbs (u_int32_t dup_flags) { + int r; + + system("rm -rf " ENVDIR); + mkdir(ENVDIR, 0777); + dbenv = NULL; + db = NULL; + /* Open/create primary */ + r = db_env_create(&dbenv, 0); + CKERR(r); + u_int32_t env_txn_flags = DB_INIT_TXN | DB_INIT_LOCK; + u_int32_t env_open_flags = DB_CREATE | DB_PRIVATE | DB_INIT_MPOOL; + r = dbenv->open(dbenv, ENVDIR, env_open_flags | env_txn_flags, 0600); + CKERR(r); + + r = db_create(&db, dbenv, 0); + CKERR(r); + if (dup_flags) { + r = db->set_flags(db, dup_flags); + CKERR(r); + } + r = db->set_bt_compare( db, int_dbt_cmp); + CKERR(r); + r = db->set_dup_compare(db, int_dbt_cmp); + CKERR(r); + + char a; + for (a = 'a'; a <= 'z'; a++) init_txn(a, 0); + for (a = '0'; a <= '9'; a++) init_txn(a, DB_READ_UNCOMMITTED); + init_txn('\0', 0); + r = db->open(db, txns[(int)'\0'], "foobar.db", NULL, DB_BTREE, DB_CREATE | DB_READ_UNCOMMITTED, 0600); + CKERR(r); + commit_txn('\0'); + for (a = 'a'; a <= 'z'; a++) init_dbc(a); + for (a = '0'; a <= '9'; a++) init_dbc(a); +} + +static void +close_dbs(void) { + char a; + for (a = 'a'; a <= 'z'; a++) { + if (cursors[(int)a]) close_dbc(a); + if (txns[(int)a]) commit_txn(a); + } + for (a = '0'; a <= '9'; a++) { + if (cursors[(int)a]) close_dbc(a); + if (txns[(int)a]) commit_txn(a); + } + + int r; + r = db->close(db, 0); + CKERR(r); + db = NULL; + r = dbenv->close(dbenv, 0); + CKERR(r); + dbenv = NULL; +} + + +static void +table_scan(char txn, BOOL success) { + int r; + DBT key; + DBT data; + + assert(txns[(int)txn] && cursors[(int)txn]); + r = cursors[(int)txn]->c_get(cursors[(int)txn], + dbt_init(&key, 0, 0), + dbt_init(&data, 0, 0), + DB_FIRST); + while (r==0) { + r = cursors[(int)txn]->c_get(cursors[(int)txn], + dbt_init(&key, 0, 0), + dbt_init(&data, 0, 0), + DB_NEXT); + } + if (success) CKERR2(r, DB_NOTFOUND); + else CKERR2s(r, DB_LOCK_NOTGRANTED, DB_LOCK_DEADLOCK); +} + +static void +table_prelock(char txn, BOOL success) { + int r; +#if defined USE_TDB && USE_TDB + r = db->pre_acquire_table_lock(db, txns[(int)txn]); +#else + DBT key; + DBT data; + + assert(txns[(int)txn] && cursors[(int)txn]); + r = cursors[(int)txn]->c_get(cursors[(int)txn], + dbt_init(&key, 0, 0), + dbt_init(&data, 0, 0), + DB_FIRST | DB_RMW); + while (r==0) { + r = cursors[(int)txn]->c_get(cursors[(int)txn], + dbt_init(&key, 0, 0), + dbt_init(&data, 0, 0), + DB_NEXT | DB_RMW); + } +#endif + if (success) CKERR(r); + else CKERR2s(r, DB_LOCK_NOTGRANTED, DB_LOCK_DEADLOCK); +} + +static void +test (u_int32_t dup_flags) { + char txn; + /* ********************************************************************** */ + setup_dbs(dup_flags); + close_dbs(); + /* ********************************************************************** */ + setup_dbs(dup_flags); + table_scan('0', TRUE); + table_prelock('a', TRUE); + put(TRUE, 'a', 0, 0); + for (txn = 'b'; txn<'z'; txn++) { + table_scan(txn, FALSE); + } + for (txn = '0'; txn<'9'; txn++) { + table_scan(txn, TRUE); + } + early_commit('a'); + for (txn = 'b'; txn<'z'; txn++) { + table_scan(txn, TRUE); + } + for (txn = '0'; txn<'9'; txn++) { + table_scan(txn, TRUE); + } + close_dbs(); + /* ********************************************************************** */ +} + + +int main(int argc, const char* argv[]) { + parse_args(argc, argv); + test(0); + test(DB_DUP | DB_DUPSORT); + return 0; +} diff --git a/src/ydb-internal.h b/src/ydb-internal.h index 4a99b9ddbde..87c6b4db3e0 100644 --- a/src/ydb-internal.h +++ b/src/ydb-internal.h @@ -73,6 +73,7 @@ struct __toku_db_txn_internal { //TXNID txnid64; /* A sixty-four bit txn id. */ TOKUTXN tokutxn; toku_lth* lth; + u_int32_t flags; DB_TXN *child, *next, *prev; }; diff --git a/src/ydb.c b/src/ydb.c index a666ccde735..68b65b5e8b5 100644 --- a/src/ydb.c +++ b/src/ydb.c @@ -979,7 +979,7 @@ static int toku_txn_commit(DB_TXN * txn, u_int32_t flags) { } } //toku_ydb_notef("flags=%d\n", flags); - int nosync = (flags & DB_TXN_NOSYNC)!=0; + int nosync = (flags & DB_TXN_NOSYNC)!=0 || (txn->i->flags&DB_TXN_NOSYNC); flags &= ~DB_TXN_NOSYNC; int r2 = toku_txn_release_locks(txn); @@ -1056,7 +1056,22 @@ static int toku_txn_begin(DB_ENV *env, DB_TXN * stxn, DB_TXN ** txn, u_int32_t f HANDLE_PANICKED_ENV(env); if (!toku_logger_is_open(env->i->logger)) return toku_ydb_do_error(env, EINVAL, "Environment does not have logging enabled\n"); if (!(env->i->open_flags & DB_INIT_TXN)) return toku_ydb_do_error(env, EINVAL, "Environment does not have transactions enabled\n"); - flags=flags; + u_int32_t txn_flags = 0; + txn_flags |= DB_TXN_NOWAIT; //We do not support blocking locks. + if (flags&DB_READ_UNCOMMITTED) { + txn_flags |= DB_READ_UNCOMMITTED; + flags &= ~DB_READ_UNCOMMITTED; + } + if (flags&DB_TXN_NOWAIT) { + txn_flags |= DB_TXN_NOWAIT; + flags &= ~DB_TXN_NOWAIT; + } + if (flags&DB_TXN_NOSYNC) { + txn_flags |= DB_TXN_NOSYNC; + flags &= ~DB_TXN_NOSYNC; + } + if (flags!=0) return toku_ydb_do_error(env, EINVAL, "Invalid flags passed to DB_ENV->txn_begin\n"); + DB_TXN *MALLOC(result); if (result == 0) return ENOMEM; @@ -1073,6 +1088,7 @@ static int toku_txn_begin(DB_ENV *env, DB_TXN * stxn, DB_TXN ** txn, u_int32_t f return ENOMEM; } memset(result->i, 0, sizeof *result->i); + result->i->flags = txn_flags; int r; if (env->i->open_flags & DB_INIT_LOCK && !stxn) { @@ -1321,8 +1337,12 @@ typedef struct { BOOL tmp_dat_malloced; } C_GET_VARS; -static inline u_int32_t get_prelocked_flags(u_int32_t flags) { - return flags & (DB_PRELOCKED | DB_PRELOCKED_WRITE); +static inline u_int32_t get_prelocked_flags(u_int32_t flags, DB_TXN* txn) { + u_int32_t lock_flags = flags & (DB_PRELOCKED | DB_PRELOCKED_WRITE); + + //DB_READ_UNCOMMITTED transactions 'own' all read locks. + if (txn && txn->i->flags&DB_READ_UNCOMMITTED) lock_flags |= DB_PRELOCKED; + return lock_flags; } static void toku_c_get_fix_flags(C_GET_VARS* g) { @@ -1351,7 +1371,7 @@ static void toku_c_get_fix_flags(C_GET_VARS* g) { default: break; } - g->lock_flags = get_prelocked_flags(g->flag); + g->lock_flags = get_prelocked_flags(g->flag, g->c->i->txn); g->flag &= ~g->lock_flags; } @@ -1690,7 +1710,7 @@ static int toku_c_del_noassociate(DBC * c, u_int32_t flags) { HANDLE_PANICKED_DB(db); if (toku_c_uninitialized(c)) return EINVAL; - u_int32_t lock_flags = get_prelocked_flags(flags); + u_int32_t lock_flags = get_prelocked_flags(flags, c->i->txn); flags &= ~lock_flags; int r; @@ -1843,7 +1863,7 @@ static int locked_c_getf_next_dup(DBC *c, u_int32_t flag, void(*f)(DBT const *ke static int toku_c_getf_first(DBC *c, u_int32_t flag, void(*f)(DBT const *key, DBT const *data, void *extra), void *extra) { HANDLE_PANICKED_DB(c->dbp); - u_int32_t lock_flags = get_prelocked_flags(flag); + u_int32_t lock_flags = get_prelocked_flags(flag, c->i->txn); flag &= ~lock_flags; assert(flag==0); TOKUTXN txn = c->i->txn ? c->i->txn->i->tokutxn : NULL; @@ -1878,7 +1898,7 @@ cleanup: static int toku_c_getf_last(DBC *c, u_int32_t flag, void(*f)(DBT const *key, DBT const *data, void *extra), void *extra) { HANDLE_PANICKED_DB(c->dbp); - u_int32_t lock_flags = get_prelocked_flags(flag); + u_int32_t lock_flags = get_prelocked_flags(flag, c->i->txn); flag &= ~lock_flags; assert(flag==0); TOKUTXN txn = c->i->txn ? c->i->txn->i->tokutxn : NULL; @@ -1914,7 +1934,7 @@ cleanup: static int toku_c_getf_next(DBC *c, u_int32_t flag, void(*f)(DBT const *key, DBT const *data, void *extra), void *extra) { HANDLE_PANICKED_DB(c->dbp); if (toku_c_uninitialized(c)) return toku_c_getf_first(c, flag, f, extra); - u_int32_t lock_flags = get_prelocked_flags(flag); + u_int32_t lock_flags = get_prelocked_flags(flag, c->i->txn); flag &= ~lock_flags; assert(flag==0); TOKUTXN txn = c->i->txn ? c->i->txn->i->tokutxn : NULL; @@ -1959,7 +1979,7 @@ static int toku_c_getf_next(DBC *c, u_int32_t flag, void(*f)(DBT const *key, DBT static int toku_c_getf_prev(DBC *c, u_int32_t flag, void(*f)(DBT const *key, DBT const *data, void *extra), void *extra) { HANDLE_PANICKED_DB(c->dbp); if (toku_c_uninitialized(c)) return toku_c_getf_last(c, flag, f, extra); - u_int32_t lock_flags = get_prelocked_flags(flag); + u_int32_t lock_flags = get_prelocked_flags(flag, c->i->txn); flag &= ~lock_flags; assert(flag==0); TOKUTXN txn = c->i->txn ? c->i->txn->i->tokutxn : NULL; @@ -2004,7 +2024,7 @@ static int toku_c_getf_prev(DBC *c, u_int32_t flag, void(*f)(DBT const *key, DBT static int toku_c_getf_next_dup(DBC *c, u_int32_t flag, void(*f)(DBT const *key, DBT const *data, void *extra), void *extra) { HANDLE_PANICKED_DB(c->dbp); if (toku_c_uninitialized(c)) return EINVAL; - u_int32_t lock_flags = get_prelocked_flags(flag); + u_int32_t lock_flags = get_prelocked_flags(flag, c->i->txn); flag &= ~lock_flags; assert(flag==0); TOKUTXN txn = c->i->txn ? c->i->txn->i->tokutxn : NULL; @@ -2057,7 +2077,7 @@ static int toku_c_getf_heavi(DBC *c, u_int32_t flags, if (direction==0) return EINVAL; DBC *tmp_c = NULL; int r; - u_int32_t lock_flags = get_prelocked_flags(flags); + u_int32_t lock_flags = get_prelocked_flags(flags, c->i->txn); flags &= ~lock_flags; assert(flags==0); struct heavi_wrapper wrapper; @@ -2221,7 +2241,7 @@ finish: static int toku_db_get_noassociate(DB * db, DB_TXN * txn, DBT * key, DBT * data, u_int32_t flags) { int r; - u_int32_t lock_flags = get_prelocked_flags(flags); + u_int32_t lock_flags = get_prelocked_flags(flags, txn); flags &= ~lock_flags; if (flags!=0 && flags!=DB_GET_BOTH) return EINVAL; @@ -2236,7 +2256,7 @@ static int toku_db_get_noassociate(DB * db, DB_TXN * txn, DBT * key, DBT * data, static int toku_db_del_noassociate(DB * db, DB_TXN * txn, DBT * key, u_int32_t flags) { int r; - u_int32_t lock_flags = get_prelocked_flags(flags); + u_int32_t lock_flags = get_prelocked_flags(flags, txn); flags &= ~lock_flags; if (flags!=0 && flags!=DB_DELETE_ANY) return EINVAL; //DB_DELETE_ANY supresses the BDB DB->del return value indicating that the key was not found prior to the delete @@ -2592,7 +2612,7 @@ static int toku_db_delboth_noassociate(DB *db, DB_TXN *txn, DBT *key, DBT *val, HANDLE_PANICKED_DB(db); int r; - u_int32_t lock_flags = get_prelocked_flags(flags); + u_int32_t lock_flags = get_prelocked_flags(flags, txn); flags &= ~lock_flags; u_int32_t delete_any = flags&DB_DELETE_ANY; flags &= ~DB_DELETE_ANY; @@ -2645,7 +2665,7 @@ static int toku_db_get (DB * db, DB_TXN * txn, DBT * key, DBT * data, u_int32_t if ((db->i->open_flags & DB_THREAD) && db_thread_need_flags(data)) return EINVAL; - u_int32_t lock_flags = get_prelocked_flags(flags); + u_int32_t lock_flags = get_prelocked_flags(flags, txn); flags &= ~lock_flags; if (flags != 0 && flags != DB_GET_BOTH) return EINVAL; // We aren't ready to handle flags such as DB_READ_COMMITTED or DB_READ_UNCOMMITTED or DB_RMW @@ -2811,6 +2831,8 @@ static int toku_db_open(DB * db, DB_TXN * txn, const char *fname, const char *db int is_db_excl = flags & DB_EXCL; flags&=~DB_EXCL; int is_db_create = flags & DB_CREATE; flags&=~DB_CREATE; int is_db_rdonly = flags & DB_RDONLY; flags&=~DB_RDONLY; + //We support READ_UNCOMMITTED whether or not the flag is provided. + flags&=~DB_READ_UNCOMMITTED; if (dbtype != DB_UNKNOWN && dbtype != DB_BTREE) return EINVAL; if (flags & ~DB_THREAD) return EINVAL; // unknown flags @@ -2935,7 +2957,7 @@ static int toku_db_put_noassociate(DB * db, DB_TXN * txn, DBT * key, DBT * data, if (key->size >= limit || data->size >= limit) return toku_ydb_do_error(db->dbenv, EINVAL, "The largest key or data item allowed is %d bytes", limit); } - u_int32_t lock_flags = get_prelocked_flags(flags); + u_int32_t lock_flags = get_prelocked_flags(flags, txn); flags &= ~lock_flags; if (flags == DB_YESOVERWRITE) { @@ -3191,6 +3213,8 @@ cleanup: int toku_db_pre_acquire_read_lock(DB *db, DB_TXN *txn, const DBT *key_left, const DBT *val_left, const DBT *key_right, const DBT *val_right) { HANDLE_PANICKED_DB(db); if (!db->i->lt || !txn) return EINVAL; + //READ_UNCOMMITTED transactions do not need read locks. + if (txn->i->flags&DB_READ_UNCOMMITTED) return 0; DB_TXN* txn_anc = toku_txn_ancestor(txn); int r;