panic brt's if recovery fails close[t:2035]

git-svn-id: file:///svn/toku/tokudb@14783 c7de825b-a66e-492c-adef-691d508d4ae1
This commit is contained in:
Rich Prohaska 2013-04-16 23:58:03 -04:00 committed by Yoni Fogel
parent a39e28d36b
commit 46ae126764
4 changed files with 167 additions and 7 deletions

View file

@ -3249,9 +3249,11 @@ toku_brtheader_close (CACHEFILE cachefile, void *header_v, char **malloced_error
assert(list_empty(&h->live_brts));
assert(list_empty(&h->zombie_brts));
toku_brtheader_unlock(h);
LSN lsn = ZERO_LSN;
int r = 0;
if (h->fname) { //Otherwise header has never fully been created.
if (h->panic) {
r = h->panic;
} else if (h->fname) { //Otherwise header has never fully been created.
LSN lsn = ZERO_LSN;
//Get LSN
if (oplsn_valid) {
//Use recovery-specified lsn
@ -4991,6 +4993,20 @@ LSN toku_brt_checkpoint_lsn(BRT brt) {
return brt->h->checkpoint_lsn;
}
static int toku_brt_header_set_panic(struct brt_header *h, int panic, char *panic_string) {
if (h->panic == 0) {
h->panic = panic;
if (h->panic_string)
toku_free(h->panic_string);
h->panic_string = toku_strdup(panic_string);
}
return 0;
}
int toku_brt_set_panic(BRT brt, int panic, char *panic_string) {
return toku_brt_header_set_panic(brt->h, panic, panic_string);
}
//Wrapper functions for upgrading from version 10.
#include "backwards_10.h"
void

View file

@ -84,6 +84,8 @@ int toku_brt_db_delay_closed (BRT brt, DB* db, int (*close_db)(DB*, u_int32_t),
int toku_close_brt (BRT, TOKULOGGER, char **error_string);
int toku_close_brt_lsn (BRT brt, TOKULOGGER logger, char **error_string, BOOL oplsn_valid, LSN oplsn);
int toku_brt_set_panic(BRT brt, int panic, char *panic_string);
int toku_dump_brt (FILE *,BRT brt);
void brt_fsync (BRT); /* fsync, but don't clear the caches. */

View file

@ -76,10 +76,21 @@ static void file_map_close_dictionaries(struct file_map *fmap, BOOL recovery_suc
assert(r == 0);
struct file_map_tuple *tuple = v;
assert(tuple->brt);
assert(recovery_succeeded);
if (!recovery_succeeded) {
// don't update the brt on close
toku_brt_set_panic(tuple->brt, DB_RUNRECOVERY, "recovery failed");
}
//Logging is already back on. No need to pass LSN into close.
r = toku_close_brt(tuple->brt, 0, 0);
char *error_string = NULL;
r = toku_close_brt(tuple->brt, NULL, &error_string);
if (!recovery_succeeded) {
printf("%s:%d %d %s\n", __FUNCTION__, __LINE__, r, error_string);
assert(r != 0);
} else
assert(r == 0);
if (error_string)
toku_free(error_string);
file_map_tuple_destroy(tuple);
toku_free(tuple);
}

View file

@ -0,0 +1,131 @@
// verify that DB_RUNRECOVERY is returned when there is a missing db file
#include <sys/stat.h>
#include "test.h"
const int envflags = DB_INIT_MPOOL|DB_CREATE|DB_THREAD |DB_INIT_LOCK|DB_INIT_LOG|DB_INIT_TXN;
#define NAMEA "a.db"
const char *namea=NAMEA;
#define NAMEB "b.db"
const char *nameb=NAMEB;
// needed to get .bdb versions to compile
#ifndef DB_CLOSE_DONT_TRIM_LOG
#define DB_CLOSE_DONT_TRIM_LOG 0
#endif
static void run_test (void) {
int r;
system("rm -rf " ENVDIR);
toku_os_mkdir(ENVDIR, S_IRWXU+S_IRWXG+S_IRWXO);
DB_ENV *env;
r = db_env_create(&env, 0); CKERR(r);
r = env->open(env, ENVDIR, envflags, S_IRWXU+S_IRWXG+S_IRWXO); CKERR(r);
DB *dba;
r = db_create(&dba, env, 0); CKERR(r);
r = dba->open(dba, NULL, namea, NULL, DB_BTREE, DB_AUTO_COMMIT|DB_CREATE, 0666); CKERR(r);
r = dba->close(dba, 0); CKERR(r);
DB *dbb;
r = db_create(&dbb, env, 0); CKERR(r);
r = dbb->open(dbb, NULL, nameb, NULL, DB_BTREE, DB_AUTO_COMMIT|DB_CREATE, 0666); CKERR(r);
r = dbb->close(dbb, 0); CKERR(r);
r = env->txn_checkpoint(env, 0, 0, 0); CKERR(r);
DB_TXN *txn;
r = env->txn_begin(env, NULL, &txn, 0); CKERR(r);
r = db_create(&dba, env, 0); CKERR(r);
r = dba->open(dba, NULL, namea, NULL, DB_UNKNOWN, DB_AUTO_COMMIT, 0666); CKERR(r);
r = db_create(&dbb, env, 0); CKERR(r);
r = dbb->open(dbb, NULL, nameb, NULL, DB_UNKNOWN, DB_AUTO_COMMIT, 0666); CKERR(r);
r = env->txn_checkpoint(env, 0, 0, 0); CKERR(r);
r = txn->commit(txn, 0); CKERR(r);
abort();
}
static void run_recover (void) {
DB_ENV *env;
int r;
r = rename(ENVDIR "/" NAMEB, ENVDIR "/" NAMEB ".save" ); printf("r=%d error=%d\n", r, errno); CKERR(r);
r = db_env_create(&env, 0); CKERR(r);
r = env->open(env, ENVDIR, envflags + DB_RECOVER, S_IRWXU+S_IRWXG+S_IRWXO);
assert(r == DB_RUNRECOVERY);
r = rename(ENVDIR "/" NAMEB ".save" , ENVDIR "/" NAMEB); printf("r=%d error=%d\n", r, errno); CKERR(r);
r = env->open(env, ENVDIR, envflags + DB_RECOVER, S_IRWXU+S_IRWXG+S_IRWXO); CKERR(r);
r = env->close(env, 0); CKERR(r);
exit(0);
}
static void run_no_recover (void) {
DB_ENV *env;
int r;
r = db_env_create(&env, 0); CKERR(r);
r = env->open(env, ENVDIR, envflags & ~DB_RECOVER, S_IRWXU+S_IRWXG+S_IRWXO); CKERR(r);
r = env->close(env, 0); CKERR(r);
exit(0);
}
const char *cmd;
BOOL do_test=FALSE, do_recover=FALSE, do_recover_only=FALSE, do_no_recover = FALSE;
static void test_parse_args (int argc, char *argv[]) {
int resultcode;
cmd = argv[0];
argc--; argv++;
while (argc>0) {
if (strcmp(argv[0], "-v") == 0) {
verbose++;
} else if (strcmp(argv[0],"-q")==0) {
verbose--;
if (verbose<0) verbose=0;
} else if (strcmp(argv[0], "--test")==0) {
do_test=TRUE;
} else if (strcmp(argv[0], "--recover") == 0) {
do_recover=TRUE;
} else if (strcmp(argv[0], "--recover-only") == 0) {
do_recover_only=TRUE;
} else if (strcmp(argv[0], "--no-recover") == 0) {
do_no_recover=TRUE;
} else if (strcmp(argv[0], "-h")==0) {
resultcode=0;
do_usage:
fprintf(stderr, "Usage:\n%s [-v|-q]* [-h] {--test | --recover } \n", cmd);
exit(resultcode);
} else {
fprintf(stderr, "Unknown arg: %s\n", argv[0]);
resultcode=1;
goto do_usage;
}
argc--;
argv++;
}
}
int test_main (int argc, char *argv[]) {
test_parse_args(argc, argv);
if (do_test) {
run_test();
} else if (do_recover) {
run_recover();
} else if (do_recover_only) {
run_recover();
} else if (do_no_recover) {
run_no_recover();
}
return 0;
}