diff --git a/newbrt/brttypes.h b/newbrt/brttypes.h index 9741558ed2b..047acd38b20 100644 --- a/newbrt/brttypes.h +++ b/newbrt/brttypes.h @@ -4,7 +4,9 @@ #ident "Copyright (c) 2007 Tokutek Inc. All rights reserved." #include +#ifndef _XOPEN_SOURCE #define _XOPEN_SOURCE 500 +#endif #define _FILE_OFFSET_BITS 64 typedef struct brt *BRT; diff --git a/src/Makefile b/src/Makefile index d7e399ec215..53562bae062 100644 --- a/src/Makefile +++ b/src/Makefile @@ -6,7 +6,7 @@ LIBNAME=libdb # OPTFLAGS = -O2 CFLAGS = -W -Wall -Werror -g -fPIC $(OPTFLAGS) CPPFLAGS = -I../include -I../newbrt -CPPFLAGS += -D_FILE_OFFSET_BITS=64 -D_LARGEFILE64_SOURCE +CPPFLAGS += -D_GNU_SOURCE -D_THREAD_SAFE -D_FILE_OFFSET_BITS=64 -D_LARGEFILE64_SOURCE ifeq ($(OSX),OSX) diff --git a/src/ydb-internal.h b/src/ydb-internal.h index 7ac88733e82..de709b8b084 100644 --- a/src/ydb-internal.h +++ b/src/ydb-internal.h @@ -32,6 +32,44 @@ struct __toku_db_internal { int associate_is_immutable; // If this DB is a secondary then this field indicates that the index never changes due to updates. }; +#if DB_VERSION_MAJOR == 4 && DB_VERSION_MINOR == 1 +typedef void (*toku_env_errcall_t)(const char *, char *); +#elif DB_VERSION_MAJOR == 4 && DB_VERSION_MINOR >= 3 +typedef void (*toku_env_errcall_t)(const DB_ENV *, const char *, const char *); +#else +#error +#endif + +struct __toku_db_env_internal { + int is_panicked; + int ref_count; + u_int32_t open_flags; + int open_mode; + toku_env_errcall_t errcall; + void *errfile; + const char *errpfx; + char *dir; /* A malloc'd copy of the directory. */ + char *tmp_dir; + char *lg_dir; + char **data_dirs; + u_int32_t n_data_dirs; + //void (*noticecall)(DB_ENV *, db_notices); + unsigned long cachetable_size; + CACHETABLE cachetable; + TOKULOGGER logger; +}; + +struct __toku_db_txn_internal { + //TXNID txnid64; /* A sixty-four bit txn id. */ + TOKUTXN tokutxn; + DB_TXN *parent; +}; + +struct __toku_dbc_internal { + BRT_CURSOR c; + DB_TXN *txn; +}; + typedef struct __toku_lock_tree { DB* db; //Some Red Black tree @@ -868,4 +906,6 @@ DUPSORT db +++++ */ + + #endif diff --git a/src/ydb.c b/src/ydb.c index 777c7ebda18..84c03f5124d 100644 --- a/src/ydb.c +++ b/src/ydb.c @@ -1,3 +1,5 @@ +int rfp = 0; + /* -*- mode: C; c-basic-offset: 4 -*- */ #ident "Copyright (c) 2007 Tokutek Inc. All rights reserved." @@ -19,6 +21,7 @@ const char *toku_copyright_string = "Copyright (c) 2007 Tokutek Inc. All rights #include #include #include +#include #include "ydb-internal.h" @@ -27,41 +30,85 @@ const char *toku_copyright_string = "Copyright (c) 2007 Tokutek Inc. All rights #include "log.h" #include "memory.h" -struct __toku_db_txn_internal { - //TXNID txnid64; /* A sixty-four bit txn id. */ - TOKUTXN tokutxn; - DB_TXN *parent; -}; +/* the ydb big lock serializes access to the tokudb + every call (including methods) into the tokudb library gets the lock + no internal function should invoke a method through an object */ +static pthread_mutex_t ydb_big_lock = PTHREAD_ERRORCHECK_MUTEX_INITIALIZER_NP; + +static inline void ydb_lock() { + int r = pthread_mutex_lock(&ydb_big_lock); assert(r == 0); +} + +static inline void ydb_unlock() { + int r = pthread_mutex_unlock(&ydb_big_lock); assert(r == 0); +} + +/* the ydb reference is used to cleanup the library when there are no more references to it */ +static int toku_ydb_refs = 0; + +static inline void ydb_add_ref() { + ++toku_ydb_refs; +} + +static inline void ydb_unref() { + assert(toku_ydb_refs > 0); + if (--toku_ydb_refs == 0) { + /* call global destructors */ + toku_malloc_cleanup(); + } +} + +/* env methods */ +static int toku_env_close(DB_ENV *env, u_int32_t flags); + +static inline void env_add_ref(DB_ENV *env) { + env->i->ref_count += 1; +} + +static inline void env_unref(DB_ENV *env) { + assert(env->i->ref_count > 0); + if (--env->i->ref_count == 0) + toku_env_close(env, 0); +} + +static inline int env_opened(DB_ENV *env) { + return env->i->cachetable != 0; +} + +static int env_is_panicked(DB_ENV *dbenv) { + if (dbenv==0) return 0; + return dbenv->i->is_panicked || toku_logger_panicked(dbenv->i->logger); +} + +#define HANDLE_PANICKED_ENV(env) ({ if (env_is_panicked(env)) return EINVAL; }) +#define HANDLE_PANICKED_DB(db) HANDLE_PANICKED_ENV(db->dbenv) + + +/* db methods */ +static inline int db_opened(DB *db) { + return db->i->full_fname != 0; +} + +static int toku_db_put(DB * db, DB_TXN * txn, DBT * key, DBT * data, u_int32_t flags); +static int toku_db_get (DB * db, DB_TXN * txn, DBT * key, DBT * data, u_int32_t flags); +static int toku_db_pget (DB *db, DB_TXN *txn, DBT *key, DBT *pkey, DBT *data, u_int32_t flags); +static int toku_db_cursor(DB *db, DB_TXN * txn, DBC **c, u_int32_t flags); + +/* txn methods */ + +/* cursor methods */ +static int toku_c_get(DBC * c, DBT * key, DBT * data, u_int32_t flag); +static int toku_c_get_noassociate(DBC * c, DBT * key, DBT * data, u_int32_t flag); +static int toku_c_pget(DBC * c, DBT *key, DBT *pkey, DBT *data, u_int32_t flag); +static int toku_c_del(DBC *c, u_int32_t flags); +static int toku_c_count(DBC *cursor, db_recno_t *count, u_int32_t flags); +static int toku_c_close(DBC * c); + +/* misc */ static char *construct_full_name(const char *dir, const char *fname); static int do_associated_inserts (DB_TXN *txn, DBT *key, DBT *data, DB *secondary); -#if DB_VERSION_MAJOR == 4 && DB_VERSION_MINOR == 1 -typedef void (*toku_env_errcall_t)(const char *, char *); -#elif DB_VERSION_MAJOR == 4 && DB_VERSION_MINOR >= 3 -typedef void (*toku_env_errcall_t)(const DB_ENV *, const char *, const char *); -#else -#error -#endif - -struct __toku_db_env_internal { - int is_panicked; - int ref_count; - u_int32_t open_flags; - int open_mode; - toku_env_errcall_t errcall; - void *errfile; - const char *errpfx; - char *dir; /* A malloc'd copy of the directory. */ - char *tmp_dir; - char *lg_dir; - char **data_dirs; - u_int32_t n_data_dirs; - //void (*noticecall)(DB_ENV *, db_notices); - unsigned long cachetable_size; - CACHETABLE cachetable; - TOKULOGGER logger; -}; // If errcall is set, call it with the format string and optionally the stderrstring (if include_stderrstring). The prefix is passed as a separate argument. // If errfile is set, print to the errfile: prefix, fmt string, maybe include the stderr string. @@ -96,14 +143,6 @@ void toku_do_error_all_cases(const DB_ENV * env, int error, int include_stderrst } } - -static int env_is_panicked(DB_ENV *dbenv) { - if (dbenv==0) return 0; - return dbenv->i->is_panicked || toku_logger_panicked(dbenv->i->logger); -} -#define HANDLE_PANICKED_ENV(env) ({ if (env_is_panicked(env)) return EINVAL; }) -#define HANDLE_PANICKED_DB(db) HANDLE_PANICKED_ENV(db->dbenv) - // Handle all the error cases (but don't do the default thing.) static int do_error (DB_ENV *dbenv, int error, const char *string, ...) { if (toku_logger_panicked(dbenv->i->logger)) dbenv->i->is_panicked=1; @@ -114,14 +153,6 @@ static int do_error (DB_ENV *dbenv, int error, const char *string, ...) { return error; } - -static void toku_db_env_err(const DB_ENV * env, int error, const char *fmt, ...) { - va_list ap; - va_start(ap, fmt); - toku_do_error_all_cases(env, error, 1, 1, fmt, ap); - va_end(ap); -} - #define barf() ({ fprintf(stderr, "YDB: BARF %s:%d in %s\n", __FILE__, __LINE__, __func__); }) #define barff(fmt,...) ({ fprintf(stderr, "YDB: BARF %s:%d in %s, ", __FILE__, __LINE__, __func__); fprintf(stderr, fmt, __VA_ARGS__); }) #define note() ({ fprintf(svtderr, "YDB: Note %s:%d in %s\n", __FILE__, __LINE__, __func__); }) @@ -146,43 +177,7 @@ static void print_flags(u_int32_t flags) { } #endif -/* TODO make these thread safe */ - -/* a count of the open env handles */ -static int toku_ydb_refs = 0; - -static void ydb_add_ref() { - toku_ydb_refs += 1; -} - -static void ydb_unref() { - assert(toku_ydb_refs > 0); - toku_ydb_refs -= 1; - if (toku_ydb_refs == 0) { - /* call global destructors */ - toku_malloc_cleanup(); - } -} - -static void db_env_add_ref(DB_ENV *env) { - env->i->ref_count += 1; -} - -static void db_env_unref(DB_ENV *env) { - env->i->ref_count -= 1; - if (env->i->ref_count == 0) - env->close(env, 0); -} - -static inline int db_env_opened(DB_ENV *env) { - return env->i->cachetable != 0; -} - -static inline int db_opened(DB *db) { - return db->i->full_fname != 0; -} - -static int db_env_parse_config_line(DB_ENV* dbenv, char *command, char *value) { +static int env_parse_config_line(DB_ENV* dbenv, char *command, char *value) { int r; if (!strcmp(command, "set_data_dir")) { @@ -199,7 +194,7 @@ static int db_env_parse_config_line(DB_ENV* dbenv, char *command, char *value) { return r; } -static int db_env_read_config(DB_ENV *env) { +static int env_read_config(DB_ENV *env) { HANDLE_PANICKED_ENV(env); const char* config_name = "DB_CONFIG"; char* full_name = NULL; @@ -289,7 +284,7 @@ static int db_env_read_config(DB_ENV *env) { } //Parse the line. if (strlen(command) == 0 || command[0] == '#') continue; //Ignore Comments. - r = db_env_parse_config_line(env, command, value < end ? value : ""); + r = env_parse_config_line(env, command, value < end ? value : ""); if (r != 0) goto parseerror; } if (0) { @@ -307,11 +302,11 @@ cleanup: return r ? r : r2; } -static int toku_db_env_open(DB_ENV * env, const char *home, u_int32_t flags, int mode) { +static int toku_env_open(DB_ENV * env, const char *home, u_int32_t flags, int mode) { HANDLE_PANICKED_ENV(env); int r; - if (db_env_opened(env)) { + if (env_opened(env)) { return do_error(env, EINVAL, "The environment is already open\n"); } @@ -338,11 +333,7 @@ static int toku_db_env_open(DB_ENV * env, const char *home, u_int32_t flags, int } } - - if (!(flags & DB_PRIVATE)) { - // This means that we don't have to do anything with shared memory. - // And that's good enough for mysql. return do_error(env, EINVAL, "TokuDB requires DB_PRIVATE when opening an env\n"); } @@ -358,7 +349,7 @@ static int toku_db_env_open(DB_ENV * env, const char *home, u_int32_t flags, int env->i->dir = NULL; return r; } - if ((r = db_env_read_config(env)) != 0) { + if ((r = env_read_config(env)) != 0) { goto died1; } @@ -387,7 +378,7 @@ static int toku_db_env_open(DB_ENV * env, const char *home, u_int32_t flags, int return 0; } -static int toku_db_env_close(DB_ENV * env, u_int32_t flags) { +static int toku_env_close(DB_ENV * env, u_int32_t flags) { // Even if the env is panicedk, try to close as much as we can. int is_panicked = env_is_panicked(env); int r0=0,r1=0; @@ -418,21 +409,23 @@ static int toku_db_env_close(DB_ENV * env, u_int32_t flags) { return 0; } -static int toku_db_env_log_archive(DB_ENV * env, char **list[], u_int32_t flags) { +static int toku_env_log_archive(DB_ENV * env, char **list[], u_int32_t flags) { env=env; flags=flags; // Suppress compiler warnings. *list = NULL; return 0; } -static int toku_db_env_log_flush(DB_ENV * env, const DB_LSN * lsn) { +static int toku_env_log_flush(DB_ENV * env, const DB_LSN * lsn) { HANDLE_PANICKED_ENV(env); env=env; lsn=lsn; barf(); return 1; } -static int toku_db_env_set_cachesize(DB_ENV * env, u_int32_t gbytes, u_int32_t bytes, int ncache __attribute__((__unused__))) { +static int toku_env_set_cachesize(DB_ENV * env, u_int32_t gbytes, u_int32_t bytes, int ncache) { HANDLE_PANICKED_ENV(env); + if (ncache != 1) + return EINVAL; u_int64_t cs64 = ((u_int64_t) gbytes << 30) + bytes; unsigned long cs = cs64; if (cs64 > cs) @@ -443,7 +436,7 @@ static int toku_db_env_set_cachesize(DB_ENV * env, u_int32_t gbytes, u_int32_t b #if DB_VERSION_MAJOR == 4 && DB_VERSION_MINOR >= 3 -static int toku_db_env_get_cachesize(DB_ENV * env, u_int32_t *gbytes, u_int32_t *bytes, int *ncache) { +static int toku_env_get_cachesize(DB_ENV * env, u_int32_t *gbytes, u_int32_t *bytes, int *ncache) { HANDLE_PANICKED_ENV(env); *gbytes = env->i->cachetable_size >> 30; *bytes = env->i->cachetable_size & ((1<<30)-1); @@ -451,16 +444,20 @@ static int toku_db_env_get_cachesize(DB_ENV * env, u_int32_t *gbytes, u_int32_t return 0; } +static int locked_env_get_cachesize(DB_ENV *env, u_int32_t *gbytes, u_int32_t *bytes, int *ncache) { + ydb_lock(); int r = toku_env_get_cachesize(env, gbytes, bytes, ncache); ydb_unlock(); return r; +} + #endif -static int toku_db_env_set_data_dir(DB_ENV * env, const char *dir) { +static int toku_env_set_data_dir(DB_ENV * env, const char *dir) { HANDLE_PANICKED_ENV(env); u_int32_t i; int r; char** temp; char* new_dir; - if (db_env_opened(env) || !dir) { + if (env_opened(env) || !dir) { return do_error(env, EINVAL, "You cannot set the data dir after opening the env\n"); } @@ -492,19 +489,19 @@ static int toku_db_env_set_data_dir(DB_ENV * env, const char *dir) { return 0; } -static void toku_db_env_set_errcall(DB_ENV * env, toku_env_errcall_t errcall) { +static void toku_env_set_errcall(DB_ENV * env, toku_env_errcall_t errcall) { env->i->errcall = errcall; } -static void toku_db_env_set_errfile(DB_ENV*env, FILE*errfile) { +static void toku_env_set_errfile(DB_ENV*env, FILE*errfile) { env->i->errfile = errfile; } -static void toku_db_env_set_errpfx(DB_ENV * env, const char *errpfx) { +static void toku_env_set_errpfx(DB_ENV * env, const char *errpfx) { env->i->errpfx = errpfx; } -static int toku_db_env_set_flags(DB_ENV * env, u_int32_t flags, int onoff) { +static int toku_env_set_flags(DB_ENV * env, u_int32_t flags, int onoff) { HANDLE_PANICKED_ENV(env); if (flags != 0 && onoff) { return do_error(env, EINVAL, "TokuDB does not (yet) support any nonzero ENV flags\n"); @@ -512,15 +509,15 @@ static int toku_db_env_set_flags(DB_ENV * env, u_int32_t flags, int onoff) { return 0; } -static int toku_db_env_set_lg_bsize(DB_ENV * env, u_int32_t bsize) { +static int toku_env_set_lg_bsize(DB_ENV * env, u_int32_t bsize) { HANDLE_PANICKED_ENV(env); bsize=bsize; return do_error(env, EINVAL, "TokuDB does not (yet) support ENV->set_lg_bsize\n"); } -static int toku_db_env_set_lg_dir(DB_ENV * env, const char *dir) { +static int toku_env_set_lg_dir(DB_ENV * env, const char *dir) { HANDLE_PANICKED_ENV(env); - if (db_env_opened(env)) { + if (env_opened(env)) { return do_error(env, EINVAL, "Cannot set log dir after opening the env\n"); } @@ -535,33 +532,37 @@ static int toku_db_env_set_lg_dir(DB_ENV * env, const char *dir) { return 0; } -static int toku_db_env_set_lg_max(DB_ENV * env, u_int32_t lg_max) { +static int toku_env_set_lg_max(DB_ENV * env, u_int32_t lg_max) { HANDLE_PANICKED_ENV(env); lg_max=lg_max; return do_error(env, EINVAL, "TokuDB does not (yet) support set_lg_max\n"); } -static int toku_db_env_set_lk_detect(DB_ENV * env, u_int32_t detect) { +static int toku_env_set_lk_detect(DB_ENV * env, u_int32_t detect) { HANDLE_PANICKED_ENV(env); detect=detect; return do_error(env, EINVAL, "TokuDB does not (yet) support set_lk_detect\n"); } #if DB_VERSION_MAJOR == 4 && DB_VERSION_MINOR <= 4 -static int toku_db_env_set_lk_max(DB_ENV * env, u_int32_t lk_max) { +static int toku_env_set_lk_max(DB_ENV * env, u_int32_t lk_max) { HANDLE_PANICKED_ENV(env); lk_max=lk_max; return 0; } + +static int locked_env_set_lk_max(DB_ENV * env, u_int32_t lk_max) { + ydb_lock(); int r = toku_env_set_lk_max(env, lk_max); ydb_unlock(); return r; +} #endif -//void __toku_db_env_set_noticecall (DB_ENV *env, void (*noticecall)(DB_ENV *, db_notices)) { +//void __toku_env_set_noticecall (DB_ENV *env, void (*noticecall)(DB_ENV *, db_notices)) { // env->i->noticecall = noticecall; //} -static int toku_db_env_set_tmp_dir(DB_ENV * env, const char *tmp_dir) { +static int toku_env_set_tmp_dir(DB_ENV * env, const char *tmp_dir) { HANDLE_PANICKED_ENV(env); - if (db_env_opened(env)) { + if (env_opened(env)) { return do_error(env, EINVAL, "Cannot set the tmp dir after opening an env\n"); } if (!tmp_dir) { @@ -573,18 +574,18 @@ static int toku_db_env_set_tmp_dir(DB_ENV * env, const char *tmp_dir) { return env->i->tmp_dir ? 0 : ENOMEM; } -static int toku_db_env_set_verbose(DB_ENV * env, u_int32_t which, int onoff) { +static int toku_env_set_verbose(DB_ENV * env, u_int32_t which, int onoff) { HANDLE_PANICKED_ENV(env); which=which; onoff=onoff; return 1; } -static int toku_db_env_txn_checkpoint(DB_ENV * env, u_int32_t kbyte, u_int32_t min, u_int32_t flags) { +static int toku_env_txn_checkpoint(DB_ENV * env, u_int32_t kbyte, u_int32_t min, u_int32_t flags) { env=env; kbyte=kbyte; min=min; flags=flags; return 0; } -static int toku_db_env_txn_stat(DB_ENV * env, DB_TXN_STAT ** statp, u_int32_t flags) { +static int toku_env_txn_stat(DB_ENV * env, DB_TXN_STAT ** statp, u_int32_t flags) { HANDLE_PANICKED_ENV(env); statp=statp;flags=flags; return 1; @@ -599,41 +600,114 @@ void toku_default_errcall(const DB_ENV *env, const char *errpfx, const char *msg fprintf(stderr, "YDB: %s: %s", errpfx, msg); } -static int toku_txn_begin(DB_ENV * env, DB_TXN * stxn, DB_TXN ** txn, u_int32_t flags); +#if _THREAD_SAFE -int db_env_create(DB_ENV ** envp, u_int32_t flags) { +static void locked_env_err(const DB_ENV * env, int error, const char *fmt, ...) { + ydb_lock(); + va_list ap; + va_start(ap, fmt); + toku_do_error_all_cases(env, error, 1, 1, fmt, ap); + va_end(ap); + ydb_unlock(); +} + +static int locked_env_open(DB_ENV * env, const char *home, u_int32_t flags, int mode) { + ydb_lock(); int r = toku_env_open(env, home, flags, mode); ydb_unlock(); return r; +} + +static int locked_env_close(DB_ENV * env, u_int32_t flags) { + ydb_lock(); int r = toku_env_close(env, flags); ydb_unlock(); return r; +} + +static int locked_env_log_archive(DB_ENV * env, char **list[], u_int32_t flags) { + ydb_lock(); int r = toku_env_log_archive(env, list, flags); ydb_unlock(); return r; +} + +static int locked_env_log_flush(DB_ENV * env, const DB_LSN * lsn) { + ydb_lock(); int r = toku_env_log_flush(env, lsn); ydb_unlock(); return r; +} + +static int locked_env_set_cachesize(DB_ENV *env, u_int32_t gbytes, u_int32_t bytes, int ncache) { + ydb_lock(); int r = toku_env_set_cachesize(env, gbytes, bytes, ncache); ydb_unlock(); return r; +} + +static int locked_env_set_data_dir(DB_ENV * env, const char *dir) { + ydb_lock(); int r = toku_env_set_data_dir(env, dir); ydb_unlock(); return r; +} + +static int locked_env_set_flags(DB_ENV * env, u_int32_t flags, int onoff) { + ydb_lock(); int r = toku_env_set_flags(env, flags, onoff); ydb_unlock(); return r; +} + +static int locked_env_set_lg_bsize(DB_ENV * env, u_int32_t bsize) { + ydb_lock(); int r = toku_env_set_lg_bsize(env, bsize); ydb_unlock(); return r; +} + +static int locked_env_set_lg_dir(DB_ENV * env, const char *dir) { + ydb_lock(); int r = toku_env_set_lg_dir(env, dir); ydb_unlock(); return r; +} + +static int locked_env_set_lg_max(DB_ENV * env, u_int32_t lg_max) { + ydb_lock(); int r = toku_env_set_lg_max(env, lg_max); ydb_unlock(); return r; +} + +static int locked_env_set_lk_detect(DB_ENV * env, u_int32_t detect) { + ydb_lock(); int r = toku_env_set_lk_detect(env, detect); ydb_unlock(); return r; +} + +static int locked_env_set_tmp_dir(DB_ENV * env, const char *tmp_dir) { + ydb_lock(); int r = toku_env_set_tmp_dir(env, tmp_dir); ydb_unlock(); return r; +} + +static int locked_env_set_verbose(DB_ENV * env, u_int32_t which, int onoff) { + ydb_lock(); int r = toku_env_set_verbose(env, which, onoff); ydb_unlock(); return r; +} + +static int locked_env_txn_checkpoint(DB_ENV * env, u_int32_t kbyte, u_int32_t min, u_int32_t flags) { + ydb_lock(); int r = toku_env_txn_checkpoint(env, kbyte, min, flags); ydb_unlock(); return r; +} + +static int locked_env_txn_stat(DB_ENV * env, DB_TXN_STAT ** statp, u_int32_t flags) { + ydb_lock(); int r = toku_env_txn_stat(env, statp, flags); ydb_unlock(); return r; +} + +static int locked_txn_begin(DB_ENV * env, DB_TXN * stxn, DB_TXN ** txn, u_int32_t flags); + +#endif + +static int toku_env_create(DB_ENV ** envp, u_int32_t flags) { if (flags!=0) return EINVAL; DB_ENV *MALLOC(result); if (result == 0) return ENOMEM; memset(result, 0, sizeof *result); - result->err = toku_db_env_err; - result->open = toku_db_env_open; - result->close = toku_db_env_close; - result->txn_checkpoint = toku_db_env_txn_checkpoint; - result->log_flush = toku_db_env_log_flush; - result->set_errcall = toku_db_env_set_errcall; - result->set_errfile = toku_db_env_set_errfile; - result->set_errpfx = toku_db_env_set_errpfx; - //result->set_noticecall = toku_db_env_set_noticecall; - result->set_flags = toku_db_env_set_flags; - result->set_data_dir = toku_db_env_set_data_dir; - result->set_tmp_dir = toku_db_env_set_tmp_dir; - result->set_verbose = toku_db_env_set_verbose; - result->set_lg_bsize = toku_db_env_set_lg_bsize; - result->set_lg_dir = toku_db_env_set_lg_dir; - result->set_lg_max = toku_db_env_set_lg_max; - result->set_cachesize = toku_db_env_set_cachesize; + result->err = locked_env_err; + result->open = locked_env_open; + result->close = locked_env_close; + result->txn_checkpoint = locked_env_txn_checkpoint; + result->log_flush = locked_env_log_flush; + result->set_errcall = toku_env_set_errcall; + result->set_errfile = toku_env_set_errfile; + result->set_errpfx = toku_env_set_errpfx; + //result->set_noticecall = locked_env_set_noticecall; + result->set_flags = locked_env_set_flags; + result->set_data_dir = locked_env_set_data_dir; + result->set_tmp_dir = locked_env_set_tmp_dir; + result->set_verbose = locked_env_set_verbose; + result->set_lg_bsize = locked_env_set_lg_bsize; + result->set_lg_dir = locked_env_set_lg_dir; + result->set_lg_max = locked_env_set_lg_max; + result->set_cachesize = locked_env_set_cachesize; #if DB_VERSION_MAJOR == 4 && DB_VERSION_MINOR >= 3 - result->get_cachesize = toku_db_env_get_cachesize; + result->get_cachesize = locked_env_get_cachesize; #endif - result->set_lk_detect = toku_db_env_set_lk_detect; + result->set_lk_detect = locked_env_set_lk_detect; #if DB_VERSION_MAJOR == 4 && DB_VERSION_MINOR <= 4 - result->set_lk_max = toku_db_env_set_lk_max; + result->set_lk_max = locked_env_set_lk_max; #endif - result->log_archive = toku_db_env_log_archive; - result->txn_stat = toku_db_env_txn_stat; - result->txn_begin = toku_txn_begin; + result->log_archive = locked_env_log_archive; + result->txn_stat = locked_env_txn_stat; + result->txn_begin = locked_txn_begin; MALLOC(result->i); if (result->i == 0) { @@ -662,7 +736,11 @@ int db_env_create(DB_ENV ** envp, u_int32_t flags) { return 0; } -static int toku_db_txn_commit(DB_TXN * txn, u_int32_t flags) { +int db_env_create(DB_ENV ** envp, u_int32_t flags) { + ydb_lock(); int r = toku_env_create(envp, flags); ydb_unlock(); return r; +} + +static int toku_txn_commit(DB_TXN * txn, u_int32_t flags) { HANDLE_PANICKED_ENV(txn->mgrp); //notef("flags=%d\n", flags); int r; @@ -683,10 +761,11 @@ static int toku_db_txn_commit(DB_TXN * txn, u_int32_t flags) { return r; // The txn is no good after the commit. } -static u_int32_t toku_db_txn_id(DB_TXN * txn) { +static u_int32_t toku_txn_id(DB_TXN * txn) { HANDLE_PANICKED_ENV(txn->mgrp); barf(); abort(); + return -1; } static TXNID next_txn = 0; @@ -699,7 +778,29 @@ static int toku_txn_abort(DB_TXN * txn) { return r; } -static int toku_txn_begin(DB_ENV * env, DB_TXN * stxn, DB_TXN ** txn, u_int32_t flags) { +#if _THREAD_SAFE + +static int toku_txn_begin(DB_ENV *env, DB_TXN * stxn, DB_TXN ** txn, u_int32_t flags); + +static int locked_txn_begin(DB_ENV *env, DB_TXN * stxn, DB_TXN ** txn, u_int32_t flags) { + ydb_lock(); int r = toku_txn_begin(env, stxn, txn, flags); ydb_unlock(); return r; +} + +static u_int32_t locked_txn_id(DB_TXN *txn) { + ydb_lock(); u_int32_t r = toku_txn_id(txn); ydb_unlock(); return r; +} + +static int locked_txn_commit(DB_TXN *txn, u_int32_t flags) { + ydb_lock(); int r = toku_txn_commit(txn, flags); ydb_unlock(); return r; +} + +static int locked_txn_abort(DB_TXN *txn) { + ydb_lock(); int r = toku_txn_abort(txn); ydb_unlock(); return r; +} + +#endif + +static int toku_txn_begin(DB_ENV *env, DB_TXN * stxn, DB_TXN ** txn, u_int32_t flags) { HANDLE_PANICKED_ENV(env); if (!toku_logger_is_open(env->i->logger)) return do_error(env, EINVAL, "Environment does not have logging enabled\n"); flags=flags; @@ -709,9 +810,9 @@ static int toku_txn_begin(DB_ENV * env, DB_TXN * stxn, DB_TXN ** txn, u_int32_t memset(result, 0, sizeof *result); //notef("parent=%p flags=0x%x\n", stxn, flags); result->mgrp = env; - result->abort = toku_txn_abort; - result->commit = toku_db_txn_commit; - result->id = toku_db_txn_id; + result->abort = locked_txn_abort; + result->commit = locked_txn_commit; + result->id = locked_txn_id; MALLOC(result->i); assert(result->i); result->i->parent = stxn; @@ -730,31 +831,31 @@ int txn_commit(DB_TXN * txn, u_int32_t flags) { #endif int log_compare(const DB_LSN * a, const DB_LSN * b) { + ydb_lock(); fprintf(stderr, "%s:%d log_compare(%p,%p)\n", __FILE__, __LINE__, a, b); abort(); + ydb_unlock(); } static int maybe_do_associate_create (DB_TXN*txn, DB*primary, DB*secondary) { DBC *dbc; - int r = secondary->cursor(secondary, txn, &dbc, 0); + int r = toku_db_cursor(secondary, txn, &dbc, 0); if (r!=0) return r; DBT key,data; - r = dbc->c_get(dbc, &key, &data, DB_FIRST); + r = toku_c_get(dbc, &key, &data, DB_FIRST); { - int r2=dbc->c_close(dbc); + int r2=toku_c_close(dbc); if (r!=DB_NOTFOUND) { return r2; } } /* Now we know the secondary is empty. */ - r = primary->cursor(primary, txn, &dbc, 0); + r = toku_db_cursor(primary, txn, &dbc, 0); if (r!=0) return r; - for (r = dbc->c_get(dbc, &key, &data, DB_FIRST); - r==0; - r = dbc->c_get(dbc, &key, &data, DB_NEXT)) { + for (r = toku_c_get(dbc, &key, &data, DB_FIRST); r==0; r = toku_c_get(dbc, &key, &data, DB_NEXT)) { r = do_associated_inserts(txn, &key, &data, secondary); if (r!=0) { - dbc->c_close(dbc); + toku_c_close(dbc); return r; } } @@ -816,7 +917,7 @@ static int toku_db_close(DB * db, u_int32_t flags) { return r; // printf("%s:%d %d=__toku_db_close(%p)\n", __FILE__, __LINE__, r, db); int is_panicked = env_is_panicked(db->dbenv); // Even if panicked, let's close as much as we can. - db_env_unref(db->dbenv); + env_unref(db->dbenv); toku_free(db->i->database_name); toku_free(db->i->full_fname); toku_free(db->i); @@ -826,11 +927,6 @@ static int toku_db_close(DB * db, u_int32_t flags) { return r; } -struct __toku_dbc_internal { - BRT_CURSOR c; - DB_TXN *txn; -}; - static int verify_secondary_key(DB *secondary, DBT *pkey, DBT *data, DBT *skey) { int r = 0; DBT idx; @@ -851,8 +947,6 @@ static int verify_secondary_key(DB *secondary, DBT *pkey, DBT *data, DBT *skey) return r; } -static int toku_c_get_noassociate(DBC * c, DBT * key, DBT * data, u_int32_t flag); - static inline int keyeq(DBC *c, DBT *a, DBT *b) { DB *db = c->dbp; return db->i->brt->compare_fun(db, a, b) == 0; @@ -983,53 +1077,9 @@ static int toku_c_get_noassociate(DBC * c, DBT * key, DBT * data, u_int32_t flag return r; } -static int toku_c_count(DBC *cursor, db_recno_t *count, u_int32_t flags) { - int r; - DBC *count_cursor = 0; - DBT currentkey; memset(¤tkey, 0, sizeof currentkey); currentkey.flags = DB_DBT_REALLOC; - DBT currentval; memset(¤tval, 0, sizeof currentval); currentval.flags = DB_DBT_REALLOC; - DBT key; memset(&key, 0, sizeof key); key.flags = DB_DBT_REALLOC; - DBT val; memset(&val, 0, sizeof val); val.flags = DB_DBT_REALLOC; - - if (flags != 0) { - r = EINVAL; goto finish; - } - - r = cursor->c_get(cursor, ¤tkey, ¤tval, DB_CURRENT+256); - if (r != 0) goto finish; - - r = cursor->dbp->cursor(cursor->dbp, 0, &count_cursor, 0); - if (r != 0) goto finish; - - *count = 0; - r = count_cursor->c_get(count_cursor, ¤tkey, ¤tval, DB_SET); - if (r != 0) { - r = 0; goto finish; /* success, the current key must be deleted and there are no more */ - } - - for (;;) { - *count += 1; - r = count_cursor->c_get(count_cursor, &key, &val, DB_NEXT); - if (r != 0) break; - if (!keyeq(count_cursor, ¤tkey, &key)) break; - } - r = 0; /* success, we found at least one before the end */ -finish: - if (key.data) toku_free(key.data); - if (val.data) toku_free(val.data); - if (currentkey.data) toku_free(currentkey.data); - if (currentval.data) toku_free(currentval.data); - if (count_cursor) { - int rr = count_cursor->c_close(count_cursor); assert(rr == 0); - } - return r; -} - static int toku_c_del_noassociate(DBC * c, u_int32_t flags) { HANDLE_PANICKED_DB(c->dbp); - int r; - - r = toku_brt_cursor_delete(c->i->c, flags); + int r = toku_brt_cursor_delete(c->i->c, flags); return r; } @@ -1080,8 +1130,6 @@ static int toku_c_pget(DBC * c, DBT *key, DBT *pkey, DBT *data, u_int32_t flag) HANDLE_PANICKED_DB(db); DB *pdb = db->i->primary; - - if (!pdb) return EINVAL; //c_pget does not work on a primary. // If data and primary_key are both zeroed, the temporary storage used to fill in data is different in the two cases because they come from different trees. assert(db->i->brt!=pdb->i->brt); // Make sure they realy are different trees. @@ -1135,7 +1183,7 @@ delete_silently_and_retry: r = toku_c_get_noassociate(c, key, pkey, flag); if (r != 0) goto died3; - r = pdb->get(pdb, c->i->txn, pkey, data, 0); + r = toku_db_get(pdb, c->i->txn, pkey, data, 0); if (r == DB_NOTFOUND) goto delete_silently_and_retry; if (r != 0) goto died3; r = verify_secondary_key(db, pkey, data, key); @@ -1189,6 +1237,48 @@ static int toku_c_close(DBC * c) { return r; } +static int toku_c_count(DBC *cursor, db_recno_t *count, u_int32_t flags) { + int r; + DBC *count_cursor = 0; + DBT currentkey; memset(¤tkey, 0, sizeof currentkey); currentkey.flags = DB_DBT_REALLOC; + DBT currentval; memset(¤tval, 0, sizeof currentval); currentval.flags = DB_DBT_REALLOC; + DBT key; memset(&key, 0, sizeof key); key.flags = DB_DBT_REALLOC; + DBT val; memset(&val, 0, sizeof val); val.flags = DB_DBT_REALLOC; + + if (flags != 0) { + r = EINVAL; goto finish; + } + + r = toku_c_get(cursor, ¤tkey, ¤tval, DB_CURRENT+256); + if (r != 0) goto finish; + + r = toku_db_cursor(cursor->dbp, 0, &count_cursor, 0); + if (r != 0) goto finish; + + *count = 0; + r = toku_c_get(count_cursor, ¤tkey, ¤tval, DB_SET); + if (r != 0) { + r = 0; goto finish; /* success, the current key must be deleted and there are no more */ + } + + for (;;) { + *count += 1; + r = toku_c_get(count_cursor, &key, &val, DB_NEXT); + if (r != 0) break; + if (!keyeq(count_cursor, ¤tkey, &key)) break; + } + r = 0; /* success, we found at least one before the end */ +finish: + if (key.data) toku_free(key.data); + if (val.data) toku_free(val.data); + if (currentkey.data) toku_free(currentkey.data); + if (currentval.data) toku_free(currentval.data); + if (count_cursor) { + int rr = toku_c_close(count_cursor); assert(rr == 0); + } + return r; +} + static int toku_db_get_noassociate(DB * db, DB_TXN * txn, DBT * key, DBT * data, u_int32_t flags) { int r; unsigned int brtflags; @@ -1201,10 +1291,10 @@ static int toku_db_get_noassociate(DB * db, DB_TXN * txn, DBT * key, DBT * data, // We aren't ready to handle flags such as DB_READ_COMMITTED or DB_READ_UNCOMMITTED or DB_RMW DBC *dbc; - r = db->cursor(db, txn, &dbc, 0); + r = toku_db_cursor(db, txn, &dbc, 0); if (r!=0) return r; r = toku_c_get_noassociate(dbc, key, data, flags == DB_GET_BOTH ? DB_GET_BOTH : DB_SET); - int r2 = dbc->c_close(dbc); + int r2 = toku_c_close(dbc); if (r!=0) return r; return r2; } else { @@ -1246,13 +1336,13 @@ static int do_associated_deletes(DB_TXN *txn, DBT *key, DBT *data, DB *secondary if ((brtflags & TOKU_DB_DUPSORT) || (brtflags & TOKU_DB_DUP)) { //If the secondary has duplicates we need to use cursor deletes. DBC *dbc; - r = secondary->cursor(secondary, txn, &dbc, 0); + r = toku_db_cursor(secondary, txn, &dbc, 0); if (r!=0) goto cleanup; r = toku_c_get_noassociate(dbc, &idx, key, DB_GET_BOTH); if (r!=0) goto cleanup; r = toku_c_del_noassociate(dbc, 0); cleanup: - r2 = dbc->c_close(dbc); + r2 = toku_c_close(dbc); } else r = toku_db_del_noassociate(secondary, txn, &idx, DB_DELETE_ANY); if (idx.flags & DB_DBT_APPMALLOC) { @@ -1278,9 +1368,8 @@ static int toku_c_del(DBC * c, u_int32_t flags) { memset(&data, 0, sizeof(data)); if (db->i->primary == 0) { pdb = db; - r = c->c_get(c, &pkey, &data, DB_CURRENT); - } - else { + r = toku_c_get(c, &pkey, &data, DB_CURRENT); + } else { DBT skey; pdb = db->i->primary; memset(&skey, 0, sizeof(skey)); @@ -1364,13 +1453,41 @@ static int toku_c_put(DBC *dbc, DBT *key, DBT *data, u_int32_t flags) { //Flags must NOT be 0. else return EINVAL; finish: - //Insert new data with the key we got from c_get. - r = db->put(db, dbc->i->txn, put_key, put_data, DB_YESOVERWRITE); // when doing the put, it should do an overwrite. + //Insert new data with the key we got from c_get + r = toku_db_put(db, dbc->i->txn, put_key, put_data, DB_YESOVERWRITE); // when doing the put, it should do an overwrite. if (r!=0) goto cleanup; r = toku_c_get(dbc, get_key, get_data, DB_GET_BOTH); goto cleanup; } +#if _THREAD_SAFE + +static int locked_c_pget(DBC * c, DBT *key, DBT *pkey, DBT *data, u_int32_t flag) { + ydb_lock(); int r = toku_c_pget(c, key, pkey, data, flag); ydb_unlock(); return r; +} + +static int locked_c_get(DBC * c, DBT * key, DBT * data, u_int32_t flag) { + ydb_lock(); int r = toku_c_get(c, key, data, flag); ydb_unlock(); return r; +} + +static int locked_c_close(DBC * c) { + ydb_lock(); int r = toku_c_close(c); ydb_unlock(); return r; +} + +static int locked_c_count(DBC *cursor, db_recno_t *count, u_int32_t flags) { + ydb_lock(); int r = toku_c_count(cursor, count, flags); ydb_unlock(); return r; +} + +static int locked_c_del(DBC * c, u_int32_t flags) { + ydb_lock(); int r = toku_c_del(c, flags); ydb_unlock(); return r; +} + +static int locked_c_put(DBC *dbc, DBT *key, DBT *data, u_int32_t flags) { + ydb_lock(); int r = toku_c_put(dbc, key, data, flags); ydb_unlock(); return r; +} + +#endif + static int toku_db_cursor(DB * db, DB_TXN * txn, DBC ** c, u_int32_t flags) { HANDLE_PANICKED_DB(db); if (flags != 0) @@ -1379,12 +1496,12 @@ static int toku_db_cursor(DB * db, DB_TXN * txn, DBC ** c, u_int32_t flags) { if (result == 0) return ENOMEM; memset(result, 0, sizeof *result); - result->c_get = toku_c_get; - result->c_pget = toku_c_pget; - result->c_put = toku_c_put; - result->c_close = toku_c_close; - result->c_del = toku_c_del; - result->c_count = toku_c_count; + result->c_get = locked_c_get; + result->c_pget = locked_c_pget; + result->c_put = locked_c_put; + result->c_close = locked_c_close; + result->c_del = locked_c_del; + result->c_count = locked_c_count; MALLOC(result->i); assert(result->i); result->dbp = db; @@ -1395,7 +1512,7 @@ static int toku_db_cursor(DB * db, DB_TXN * txn, DBC ** c, u_int32_t flags) { return 0; } -static int toku_db_del(DB * db, DB_TXN * txn, DBT * key, u_int32_t flags) { +static int toku_db_del(DB *db, DB_TXN *txn, DBT *key, u_int32_t flags) { HANDLE_PANICKED_DB(db); int r; @@ -1426,7 +1543,7 @@ static int toku_db_del(DB * db, DB_TXN * txn, DBT * key, u_int32_t flags) { if (r!=0) goto cleanup; while (r==0) { - r = dbc->c_del(dbc, 0); + r = toku_c_del(dbc, 0); if (r==0) found = TRUE; if (r!=0 && r!=DB_KEYEMPTY) goto cleanup; r = toku_c_get_noassociate(dbc, key, &data, DB_NEXT_DUP); @@ -1437,20 +1554,20 @@ static int toku_db_del(DB * db, DB_TXN * txn, DBT * key, u_int32_t flags) { } } cleanup: - r2 = dbc->c_close(dbc); + r2 = toku_c_close(dbc); if (r != 0) return r; return r2; } if (db->i->primary == 0) { pdb = db; - r = db->get(db, txn, key, &data, 0); + r = toku_db_get(db, txn, key, &data, 0); pdb_key = key; } else { memset(&pkey, 0, sizeof(pkey)); pdb = db->i->primary; - r = db->pget(db, txn, key, &pkey, &data, 0); + r = toku_db_pget(db, txn, key, &pkey, &data, 0); pdb_key = &pkey; } if (r != 0) return r; @@ -1500,21 +1617,23 @@ static int toku_db_pget (DB *db, DB_TXN *txn, DBT *key, DBT *pkey, DBT *data, u_ assert(db->i->brt != db->i->primary->i->brt); // Make sure they realy are different trees. assert(db!=db->i->primary); - r = db->cursor(db, txn, &dbc, 0); + r = toku_db_cursor(db, txn, &dbc, 0); if (r!=0) return r; - r = dbc->c_pget(dbc, key, pkey, data, DB_SET); + r = toku_c_pget(dbc, key, pkey, data, DB_SET); if (r==DB_KEYEMPTY) r = DB_NOTFOUND; - r2 = dbc->c_close(dbc); + r2 = toku_c_close(dbc); if (r!=0) return r; return r2; } +#if 0 static int toku_db_key_range(DB * db, DB_TXN * txn, DBT * dbt, DB_KEY_RANGE * kr, u_int32_t flags) { HANDLE_PANICKED_DB(db); txn=txn; dbt=dbt; kr=kr; flags=flags; barf(); abort(); } +#endif static int construct_full_name_in_buf(const char *dir, const char *fname, char* full, int length) { int l; @@ -1760,17 +1879,17 @@ static int toku_db_remove(DB * db, const char *fname, const char *dbname, u_int3 //TODO: Verify the target db is not open //TODO: Use master database (instead of manual edit) when implemented. - if ((r = db->open(db, NULL, fname, dbname, DB_BTREE, 0, 0777)) != 0) goto cleanup; + if ((r = toku_db_open(db, NULL, fname, dbname, DB_BTREE, 0, 0777)) != 0) goto cleanup; r = toku_brt_remove_subdb(db->i->brt, dbname, flags); cleanup: - r2 = db->close(db, 0); + r2 = toku_db_close(db, 0); return r ? r : r2; } //TODO: Verify db file not in use. (all dbs in the file must be unused) r = find_db_file(db->dbenv, fname, &full_name); if (r!=0) return r; assert(full_name); - r2 = db->close(db, 0); + r2 = toku_db_close(db, 0); if (r == 0 && r2 == 0) { if (unlink(full_name) != 0) r = errno; } @@ -1803,11 +1922,7 @@ static int toku_db_set_dup_compare(DB *db, int (*dup_compare)(DB *, const DBT *, return r; } -static void toku_db_set_errfile (DB*db, FILE *errfile) { - db->dbenv->set_errfile(db->dbenv, errfile); -} - -static int toku_db_set_flags(DB * db, u_int32_t flags) { +static int toku_db_set_flags(DB *db, u_int32_t flags) { HANDLE_PANICKED_DB(db); /* the following matches BDB */ @@ -1853,12 +1968,14 @@ static int toku_db_set_pagesize(DB *db, u_int32_t pagesize) { return r; } +#if 0 static int toku_db_stat(DB * db, void *v, u_int32_t flags) { HANDLE_PANICKED_DB(db); v=v; flags=flags; barf(); abort(); } +#endif static int toku_db_fd(DB *db, int *fdp) { HANDLE_PANICKED_DB(db); @@ -1866,7 +1983,80 @@ static int toku_db_fd(DB *db, int *fdp) { return toku_brt_get_fd(db->i->brt, fdp); } -int db_create(DB ** db, DB_ENV * env, u_int32_t flags) { +#if _THREAD_SAFE + +static int locked_db_associate (DB *primary, DB_TXN *txn, DB *secondary, + int (*callback)(DB *secondary, const DBT *key, const DBT *data, DBT *result), u_int32_t flags) { + ydb_lock(); int r = toku_db_associate(primary, txn, secondary, callback, flags); ydb_unlock(); return r; +} + +static int locked_db_close(DB * db, u_int32_t flags) { + ydb_lock(); int r = toku_db_close(db, flags); ydb_unlock(); return r; +} + +static int locked_db_cursor(DB *db, DB_TXN *txn, DBC **c, u_int32_t flags) { + ydb_lock(); int r = toku_db_cursor(db, txn, c, flags); ydb_unlock(); return r; +} + +static int locked_db_del(DB * db, DB_TXN * txn, DBT * key, u_int32_t flags) { + ydb_lock(); int r = toku_db_del(db, txn, key, flags); ydb_unlock(); return r; +} + +static int locked_db_get (DB * db, DB_TXN * txn, DBT * key, DBT * data, u_int32_t flags) { + ydb_lock(); int r = toku_db_get(db, txn, key, data, flags); ydb_unlock(); return r; +} + +static int locked_db_pget (DB *db, DB_TXN *txn, DBT *key, DBT *pkey, DBT *data, u_int32_t flags) { + ydb_lock(); int r = toku_db_pget(db, txn, key, pkey, data, flags); ydb_unlock(); return r; +} + +static int locked_db_open(DB *db, DB_TXN *txn, const char *fname, const char *dbname, DBTYPE dbtype, u_int32_t flags, int mode) { + ydb_lock(); int r = toku_db_open(db, txn, fname, dbname, dbtype, flags, mode); ydb_unlock(); return r; +} + +static int locked_db_put(DB * db, DB_TXN * txn, DBT * key, DBT * data, u_int32_t flags) { + ydb_lock(); int r = toku_db_put(db, txn, key, data, flags); ydb_unlock(); return r; +} + +static int locked_db_remove(DB * db, const char *fname, const char *dbname, u_int32_t flags) { + ydb_lock(); int r = toku_db_remove(db, fname, dbname, flags); ydb_unlock(); return r; +} + +static int locked_db_rename(DB * db, const char *namea, const char *nameb, const char *namec, u_int32_t flags) { + ydb_lock(); int r = toku_db_rename(db, namea, nameb, namec, flags); ydb_unlock(); return r; +} + +static int locked_db_set_bt_compare(DB * db, int (*bt_compare) (DB *, const DBT *, const DBT *)) { + ydb_lock(); int r = toku_db_set_bt_compare(db, bt_compare); ydb_unlock(); return r; +} + +static int locked_db_set_dup_compare(DB * db, int (*dup_compare) (DB *, const DBT *, const DBT *)) { + ydb_lock(); int r = toku_db_set_dup_compare(db, dup_compare); ydb_unlock(); return r; +} + +static void locked_db_set_errfile (DB *db, FILE *errfile) { + db->dbenv->set_errfile(db->dbenv, errfile); +} + +static int locked_db_set_flags(DB *db, u_int32_t flags) { + ydb_lock(); int r = toku_db_set_flags(db, flags); ydb_unlock(); return r; +} + +static int locked_db_get_flags(DB *db, u_int32_t *flags) { + ydb_lock(); int r = toku_db_get_flags(db, flags); ydb_unlock(); return r; +} + +static int locked_db_set_pagesize(DB *db, u_int32_t pagesize) { + ydb_lock(); int r = toku_db_set_pagesize(db, pagesize); ydb_unlock(); return r; +} + +static int locked_db_fd(DB *db, int *fdp) { + ydb_lock(); int r = toku_db_fd(db, fdp); ydb_unlock(); return r; +} + +#endif + +static int toku_db_create(DB ** db, DB_ENV * env, u_int32_t flags) { int r; if (flags) return EINVAL; @@ -1874,51 +2064,51 @@ int db_create(DB ** db, DB_ENV * env, u_int32_t flags) { /* if the env already exists then add a ref to it otherwise create one */ if (env) { - if (!db_env_opened(env)) + if (!env_opened(env)) return EINVAL; - db_env_add_ref(env); + env_add_ref(env); } else { - r = db_env_create(&env, 0); + r = toku_env_create(&env, 0); if (r != 0) return r; - r = env->open(env, ".", DB_PRIVATE + DB_INIT_MPOOL, 0); + r = toku_env_open(env, ".", DB_PRIVATE + DB_INIT_MPOOL, 0); if (r != 0) { - env->close(env, 0); + toku_env_close(env, 0); return r; } - assert(db_env_opened(env)); + assert(env_opened(env)); } DB *MALLOC(result); if (result == 0) { - db_env_unref(env); + env_unref(env); return ENOMEM; } memset(result, 0, sizeof *result); result->dbenv = env; - result->associate = toku_db_associate; - result->close = toku_db_close; - result->cursor = toku_db_cursor; - result->del = toku_db_del; - result->get = toku_db_get; - result->key_range = toku_db_key_range; - result->open = toku_db_open; - result->pget = toku_db_pget; - result->put = toku_db_put; - result->remove = toku_db_remove; - result->rename = toku_db_rename; - result->set_bt_compare = toku_db_set_bt_compare; - result->set_dup_compare = toku_db_set_dup_compare; - result->set_errfile = toku_db_set_errfile; - result->set_pagesize = toku_db_set_pagesize; - result->set_flags = toku_db_set_flags; - result->get_flags = toku_db_get_flags; - result->stat = toku_db_stat; - result->fd = toku_db_fd; + result->associate = locked_db_associate; + result->close = locked_db_close; + result->cursor = locked_db_cursor; + result->del = locked_db_del; + result->get = locked_db_get; + // result->key_range = locked_db_key_range; + result->open = locked_db_open; + result->pget = locked_db_pget; + result->put = locked_db_put; + result->remove = locked_db_remove; + result->rename = locked_db_rename; + result->set_bt_compare = locked_db_set_bt_compare; + result->set_dup_compare = locked_db_set_dup_compare; + result->set_errfile = locked_db_set_errfile; + result->set_pagesize = locked_db_set_pagesize; + result->set_flags = locked_db_set_flags; + result->get_flags = locked_db_get_flags; + // result->stat = locked_db_stat; + result->fd = locked_db_fd; MALLOC(result->i); if (result->i == 0) { toku_free(result); - db_env_unref(env); + env_unref(env); return ENOMEM; } memset(result->i, 0, sizeof *result->i); @@ -1938,7 +2128,7 @@ int db_create(DB ** db, DB_ENV * env, u_int32_t flags) { if (r != 0) { toku_free(result->i); toku_free(result); - db_env_unref(env); + env_unref(env); return ENOMEM; } ydb_add_ref(); @@ -1946,6 +2136,12 @@ int db_create(DB ** db, DB_ENV * env, u_int32_t flags) { return 0; } +int db_create(DB ** db, DB_ENV * env, u_int32_t flags) { + ydb_lock(); int r = toku_db_create(db, env, flags); ydb_unlock(); return r; +} + +/* need db_strerror_r for multiple threads */ + char *db_strerror(int error) { char *errorstr; if (error >= 0) { @@ -1958,7 +2154,7 @@ char *db_strerror(int error) { return "Database Bad Format (probably a corrupted database)"; } - static char unknown_result[100]; // Race condition if two threads call this at the same time. However even in a bad case, it should be some sort of nul-terminated string. + static char unknown_result[100]; // Race condition if two threads call this at the same time. However even in a bad case, it should be some sort of null-terminated string. errorstr = unknown_result; snprintf(errorstr, sizeof unknown_result, "Unknown error code: %d", error); return errorstr;