Closes #2155 closes[t:2155] Forward recovery split

* Before 'checkpoint begin'
  * fcreate
   * Will not create (treated as fopen)
   * Will not delete file
   * Will force rollback entry of fcreate (if file exists)
 * After checkpoint begin
  * fcreate
   * Will create
   * Will first delete file
   * Will force rollback entry of fcreate (if file exists)



git-svn-id: file:///svn/toku/tokudb@15844 c7de825b-a66e-492c-adef-691d508d4ae1
This commit is contained in:
Yoni Fogel 2013-04-16 23:58:52 -04:00
parent 7d6f6fef69
commit 6b7c5bdfff

View file

@ -17,7 +17,7 @@ int toku_recover_trace = 0;
#endif
struct backward_scan_state {
enum backward_state { BS_INIT, BS_SAW_CKPT_END, BS_SAW_CKPT } bs;
enum backward_state { BS_INIT, BS_SAW_CKPT_END, BS_SAW_CKPT, BS_FORWARD_SAW_CKPT_BEGIN } bs;
LSN checkpoint_lsn;
int n_live_txns;
TXNID min_live_txn;
@ -273,7 +273,7 @@ abort_on_upgrade(DB* UU(pdb),
// Open the file if it is not already open. If it is already open, then do nothing.
static int internal_toku_recover_fopen_or_fcreate (RECOVER_ENV renv, int flags, int mode, char *fixedfname, FILENUM filenum, u_int32_t treeflags, u_int32_t descriptor_version, BYTESTRING* descriptor) {
static int internal_toku_recover_fopen_or_fcreate (RECOVER_ENV renv, int flags, int mode, char *fixedfname, FILENUM filenum, u_int32_t treeflags, u_int32_t descriptor_version, BYTESTRING* descriptor, int recovery_force_fcreate) {
int r;
// already open
@ -312,13 +312,13 @@ static int internal_toku_recover_fopen_or_fcreate (RECOVER_ENV renv, int flags,
//Create fake DB for comparison functions.
DB *XCALLOC(fake_db);
if (flags&O_CREAT && descriptor_version > 0) {
if (descriptor_version > 0) {
DBT descriptor_dbt;
toku_fill_dbt(&descriptor_dbt, descriptor->data, descriptor->len);
r = toku_brt_set_descriptor(brt, descriptor_version, &descriptor_dbt, abort_on_upgrade);
if (r!=0) goto close_brt;
}
r = toku_brt_open_recovery(brt, fixedfname, fixedfname, (flags & O_CREAT) != 0, FALSE, renv->ct, NULL, fake_db, (flags&O_CREAT));
r = toku_brt_open_recovery(brt, fixedfname, fixedfname, (flags & O_CREAT) != 0, FALSE, renv->ct, NULL, fake_db, recovery_force_fcreate);
if (r != 0) {
close_brt:;
//Note: If brt_open fails, then close_brt will NOT write a header to disk.
@ -337,7 +337,7 @@ close_brt:;
static int toku_recover_fopen (struct logtype_fopen *l, RECOVER_ENV renv) {
char *fixedfname = fixup_fname(&l->iname);
return internal_toku_recover_fopen_or_fcreate(renv, 0, 0, fixedfname, l->filenum, l->treeflags, 0, NULL);
return internal_toku_recover_fopen_or_fcreate(renv, 0, 0, fixedfname, l->filenum, l->treeflags, 0, NULL, 0);
}
static int
@ -370,7 +370,17 @@ static int toku_recover_backward_fopen (struct logtype_fopen *l, RECOVER_ENV ren
static int toku_recover_fcreate (struct logtype_fcreate *l, RECOVER_ENV renv) {
char *fixedfname = fixup_fname(&l->iname);
create_dir_from_file(fixedfname);
return internal_toku_recover_fopen_or_fcreate(renv, O_CREAT|O_TRUNC, l->mode, fixedfname, l->filenum, l->treeflags, l->descriptor_version, &l->descriptor);
struct backward_scan_state *bs = &renv->bs;
int flags;
if (bs->bs == BS_SAW_CKPT) {
//Treat as fopen
flags = 0;
}
else {
assert(bs->bs == BS_FORWARD_SAW_CKPT_BEGIN);
flags = O_CREAT|O_TRUNC;
}
return internal_toku_recover_fopen_or_fcreate(renv, flags, l->mode, fixedfname, l->filenum, l->treeflags, l->descriptor_version, &l->descriptor, 1);
}
static int toku_recover_backward_fcreate (struct logtype_fcreate *UU(l), RECOVER_ENV UU(renv)) {
@ -527,12 +537,27 @@ static int toku_recover_backward_fclose (struct logtype_fclose *l, RECOVER_ENV r
if (renv->bs.bs == BS_SAW_CKPT) {
// tree open
char *fixedfname = fixup_fname(&l->iname);
internal_toku_recover_fopen_or_fcreate(renv, 0, 0, fixedfname, l->filenum, l->treeflags, 0, NULL);
internal_toku_recover_fopen_or_fcreate(renv, 0, 0, fixedfname, l->filenum, l->treeflags, 0, NULL, 0);
}
return 0;
}
static int toku_recover_begin_checkpoint (struct logtype_begin_checkpoint *UU(l), RECOVER_ENV UU(renv)) {
struct backward_scan_state *bs = &renv->bs;
switch (bs->bs) {
case BS_SAW_CKPT:
assert(l->lsn.lsn <= bs->checkpoint_lsn.lsn);
if (l->lsn.lsn == bs->checkpoint_lsn.lsn)
bs->bs = BS_FORWARD_SAW_CKPT_BEGIN;
return 0; // ignore it
case BS_FORWARD_SAW_CKPT_BEGIN:
assert(l->lsn.lsn > bs->checkpoint_lsn.lsn);
return 0; // ignore it
default:
break;
}
fprintf(stderr, "%s: %d Unknown checkpoint state %d\n", __FILE__, __LINE__, (int)bs->bs);
abort();
// nothing
return 0;
}
@ -553,6 +578,8 @@ static int toku_recover_backward_begin_checkpoint (struct logtype_begin_checkpoi
return 0;
case BS_SAW_CKPT:
return 0; // ignore it
default:
break;
}
fprintf(stderr, "%s: %d Unknown checkpoint state %d\n", __FILE__, __LINE__, (int)bs->bs);
abort();
@ -575,6 +602,8 @@ static int toku_recover_backward_end_checkpoint (struct logtype_end_checkpoint *
abort();
case BS_SAW_CKPT:
return 0;
default:
break;
}
fprintf(stderr, "%s: %d Unknown checkpoint state %d\n", __FILE__, __LINE__, (int)bs->bs);
abort();
@ -587,7 +616,7 @@ static int toku_recover_fassociate (struct logtype_fassociate *UU(l), RECOVER_EN
static int toku_recover_backward_fassociate (struct logtype_fassociate *l, RECOVER_ENV renv) {
char *fixedfname = fixup_fname(&l->iname);
return internal_toku_recover_fopen_or_fcreate(renv, 0, 0, fixedfname, l->filenum, l->treeflags, 0, NULL);
return internal_toku_recover_fopen_or_fcreate(renv, 0, 0, fixedfname, l->filenum, l->treeflags, 0, NULL, 0);
}
static int toku_recover_xstillopen (struct logtype_xstillopen *UU(l), RECOVER_ENV UU(renv)) {
@ -609,6 +638,8 @@ static int toku_recover_backward_xstillopen (struct logtype_xstillopen *l, RECOV
return 0;
case BS_SAW_CKPT:
return 0; // ignore live txns from older checkpoints
default:
break;
}
fprintf(stderr, "%s: %d Unknown checkpoint state %d\n", __FILE__, __LINE__, (int)bs->bs);
abort();
@ -651,6 +682,8 @@ static int toku_recover_backward_xbegin (struct logtype_xbegin *l, RECOVER_ENV r
fprintf(stderr, "Scanning back at xbegin %" PRIu64 " (looking for %" PRIu64 ")\n", l->lsn.lsn, bs->min_live_txn);
return 0;
}
default:
break;
}
fprintf(stderr, "%s: %d Unknown checkpoint state %d\n", __FILE__, __LINE__, (int)bs->bs);
abort();