mirror of
https://github.com/MariaDB/server.git
synced 2025-01-22 06:44:16 +01:00
filter some operations during recovery close[t:1993]
git-svn-id: file:///svn/toku/tokudb@14421 c7de825b-a66e-492c-adef-691d508d4ae1
This commit is contained in:
parent
d081460bcf
commit
8e214a919d
10 changed files with 189 additions and 148 deletions
38
newbrt/brt.c
38
newbrt/brt.c
|
@ -2593,10 +2593,13 @@ toku_brt_broadcast_commit_all (BRT brt)
|
|||
return r;
|
||||
}
|
||||
|
||||
int toku_brt_insert (BRT brt, DBT *key, DBT *val, TOKUTXN txn)
|
||||
// Effect: Insert the key-val pair into brt.
|
||||
{
|
||||
int r;
|
||||
int toku_brt_insert (BRT brt, DBT *key, DBT *val, TOKUTXN txn) {
|
||||
return toku_brt_maybe_insert(brt, key, val, txn, ZERO_LSN);
|
||||
}
|
||||
|
||||
int toku_brt_maybe_insert (BRT brt, DBT *key, DBT *val, TOKUTXN txn, LSN oplsn) {
|
||||
int r = 0;
|
||||
XIDS message_xids;
|
||||
TXNID xid = toku_txn_get_txnid(txn);
|
||||
if (txn && (brt->h->txnid_that_created_or_locked_when_empty != xid)) {
|
||||
|
@ -2626,13 +2629,21 @@ int toku_brt_insert (BRT brt, DBT *key, DBT *val, TOKUTXN txn)
|
|||
if (r!=0) return r;
|
||||
}
|
||||
|
||||
LSN treelsn = toku_brt_checkpoint_lsn(brt);
|
||||
if (oplsn.lsn != 0 && oplsn.lsn <= treelsn.lsn) {
|
||||
r = 0;
|
||||
} else {
|
||||
BRT_MSG_S brtcmd = { BRT_INSERT, message_xids, .u.id={key,val}};
|
||||
r = toku_brt_root_put_cmd(brt, &brtcmd);
|
||||
if (r!=0) return r;
|
||||
}
|
||||
return r;
|
||||
}
|
||||
|
||||
int toku_brt_delete(BRT brt, DBT *key, TOKUTXN txn) {
|
||||
return toku_brt_maybe_delete(brt, key, txn, ZERO_LSN);
|
||||
}
|
||||
|
||||
int toku_brt_maybe_delete(BRT brt, DBT *key, TOKUTXN txn, LSN oplsn) {
|
||||
int r;
|
||||
XIDS message_xids;
|
||||
TXNID xid = toku_txn_get_txnid(txn);
|
||||
|
@ -2655,9 +2666,15 @@ int toku_brt_delete(BRT brt, DBT *key, TOKUTXN txn) {
|
|||
r = toku_log_enq_delete_any(logger, (LSN*)0, 0, toku_cachefile_filenum(brt->cf), xid, keybs);
|
||||
if (r!=0) return r;
|
||||
}
|
||||
|
||||
LSN treelsn = toku_brt_checkpoint_lsn(brt);
|
||||
if (oplsn.lsn != 0 && oplsn.lsn <= treelsn.lsn) {
|
||||
r = 0;
|
||||
} else {
|
||||
DBT val;
|
||||
BRT_MSG_S brtcmd = { BRT_DELETE_ANY, message_xids, .u.id={key, toku_init_dbt(&val)}};
|
||||
r = toku_brt_root_put_cmd(brt, &brtcmd);
|
||||
}
|
||||
return r;
|
||||
}
|
||||
|
||||
|
@ -4504,6 +4521,10 @@ toku_brt_lookup (BRT brt, DBT *k, DBT *v, BRT_GET_CALLBACK_FUNCTION getf, void *
|
|||
/* ********************************* delete **************************************/
|
||||
|
||||
int toku_brt_delete_both(BRT brt, DBT *key, DBT *val, TOKUTXN txn) {
|
||||
return toku_brt_maybe_delete_both(brt, key, val, txn, ZERO_LSN);
|
||||
}
|
||||
|
||||
int toku_brt_maybe_delete_both(BRT brt, DBT *key, DBT *val, TOKUTXN txn, LSN oplsn) {
|
||||
//{ unsigned i; printf("del %p keylen=%d key={", brt->db, key->size); for(i=0; i<key->size; i++) printf("%d,", ((char*)key->data)[i]); printf("} datalen=%d data={", val->size); for(i=0; i<val->size; i++) printf("%d,", ((char*)val->data)[i]); printf("}\n"); }
|
||||
int r;
|
||||
XIDS message_xids;
|
||||
|
@ -4530,8 +4551,13 @@ int toku_brt_delete_both(BRT brt, DBT *key, DBT *val, TOKUTXN txn) {
|
|||
if (r!=0) return r;
|
||||
}
|
||||
|
||||
LSN treelsn = toku_brt_checkpoint_lsn(brt);
|
||||
if (oplsn.lsn != 0 && oplsn.lsn <= treelsn.lsn) {
|
||||
r = 0;
|
||||
} else {
|
||||
BRT_MSG_S brtcmd = { BRT_DELETE_BOTH, message_xids, .u.id={key,val}};
|
||||
r = toku_brt_root_put_cmd(brt, &brtcmd);
|
||||
}
|
||||
return r;
|
||||
}
|
||||
|
||||
|
@ -4880,6 +4906,10 @@ toku_brt_note_table_lock (BRT brt, TOKUTXN txn)
|
|||
return 0;
|
||||
}
|
||||
|
||||
LSN toku_brt_checkpoint_lsn(BRT brt) {
|
||||
return brt->h->checkpoint_lsn;
|
||||
}
|
||||
|
||||
//Wrapper functions for upgrading from version 10.
|
||||
#include "backwards_10.h"
|
||||
void
|
||||
|
|
31
newbrt/brt.h
31
newbrt/brt.h
|
@ -51,10 +51,33 @@ int toku_brt_open(BRT, const char *fname, const char *fname_in_env, int is_creat
|
|||
int toku_brt_remove_subdb(BRT brt, const char *dbname, u_int32_t flags);
|
||||
|
||||
int toku_brt_broadcast_commit_all (BRT brt);
|
||||
int toku_brt_insert (BRT, DBT *, DBT *, TOKUTXN);
|
||||
int toku_brt_lookup (BRT brt, DBT *k, DBT *v, BRT_GET_CALLBACK_FUNCTION getf, void *getf_v);
|
||||
int toku_brt_delete (BRT brt, DBT *k, TOKUTXN);
|
||||
int toku_brt_delete_both (BRT brt, DBT *k, DBT *v, TOKUTXN); // Delete a pair only if both k and v are equal according to the comparison function.
|
||||
|
||||
// Effect: Insert a key and data pair into a brt
|
||||
// Returns 0 if successfull
|
||||
int toku_brt_insert (BRT brt, DBT *k, DBT *v, TOKUTXN txn);
|
||||
|
||||
// Effect: Insert a key and data pair into a brt if the oplsn is newer than the brt lsn. This function is called during recovery.
|
||||
// Returns 0 if successfull
|
||||
int toku_brt_maybe_insert (BRT brt, DBT *k, DBT *v, TOKUTXN txn, LSN oplsn);
|
||||
|
||||
// Effect: Delete a key from a brt
|
||||
// Returns 0 if successfull
|
||||
int toku_brt_delete (BRT brt, DBT *k, TOKUTXN txn);
|
||||
|
||||
// Effect: Delete a key from a brt if the oplsn is newer than the brt lsn. This function is called during recovery.
|
||||
// Returns 0 if successfull
|
||||
int toku_brt_maybe_delete (BRT brt, DBT *k, TOKUTXN txn, LSN oplsn);
|
||||
|
||||
// Effect: Delete a pair only if both k and v are equal according to the comparison function.
|
||||
// Returns 0 if successfull
|
||||
int toku_brt_delete_both (BRT brt, DBT *k, DBT *v, TOKUTXN txn);
|
||||
|
||||
// Effect: Delete a pair only if both k and v are equal according to the comparison function and the
|
||||
// oplsn is newer than the brt lsn. This function is called by recovery.
|
||||
// Returns 0 if successfull
|
||||
int toku_brt_maybe_delete_both (BRT brt, DBT *k, DBT *v, TOKUTXN txn, LSN oplsn);
|
||||
|
||||
int toku_brt_db_delay_closed (BRT brt, DB* db, int (*close_db)(DB*, u_int32_t), u_int32_t close_flags);
|
||||
int toku_close_brt (BRT, TOKULOGGER, char **error_string);
|
||||
|
||||
|
@ -76,6 +99,8 @@ int toku_brt_truncate (BRT brt);
|
|||
// effect: remove everything from the tree
|
||||
// returns: 0 if success
|
||||
|
||||
LSN toku_brt_checkpoint_lsn(BRT brt);
|
||||
|
||||
// create and initialize a cache table
|
||||
// cachesize is the upper limit on the size of the size of the values in the table
|
||||
// pass 0 if you want the default
|
||||
|
|
|
@ -217,10 +217,10 @@ generate_log_struct (void) {
|
|||
fprintf(hf, "};\n");
|
||||
fprintf(hf, "int toku_rollback_%s (", lt->name);
|
||||
DO_FIELDS(ft, lt, fprintf(hf, "%s %s,", ft->type, ft->name));
|
||||
fprintf(hf, "TOKUTXN txn, YIELDF yield, void*yield_v);\n");
|
||||
fprintf(hf, "TOKUTXN txn, YIELDF yield, void*yield_v, LSN oplsn);\n");
|
||||
fprintf(hf, "int toku_commit_%s (", lt->name);
|
||||
DO_FIELDS(ft, lt, fprintf(hf, "%s %s,", ft->type, ft->name));
|
||||
fprintf(hf, "TOKUTXN txn, YIELDF yield, void*yield_v);\n");
|
||||
fprintf(hf, "TOKUTXN txn, YIELDF yield, void*yield_v, LSN oplsn);\n");
|
||||
});
|
||||
fprintf(hf, "struct log_entry {\n");
|
||||
fprintf(hf, " enum lt_cmd cmd;\n");
|
||||
|
|
|
@ -143,7 +143,7 @@ static void recover_yield(voidfp UU(f), void *UU(extra)) {
|
|||
// nothing
|
||||
}
|
||||
|
||||
static void toku_recover_commit (LSN UU(lsn), TXNID xid, RECOVER_ENV env) {
|
||||
static void toku_recover_commit (LSN lsn, TXNID xid, RECOVER_ENV env) {
|
||||
int r;
|
||||
|
||||
// find the transaction by transaction id
|
||||
|
@ -152,7 +152,7 @@ static void toku_recover_commit (LSN UU(lsn), TXNID xid, RECOVER_ENV env) {
|
|||
assert(r == 0);
|
||||
|
||||
// commit the transaction
|
||||
r = toku_txn_commit_txn(txn, TRUE, recover_yield, NULL);
|
||||
r = toku_txn_commit_with_lsn(txn, TRUE, recover_yield, NULL, lsn);
|
||||
assert(r == 0);
|
||||
|
||||
// close the transaction
|
||||
|
@ -164,7 +164,7 @@ static int toku_recover_backward_commit (struct logtype_commit *UU(l), RECOVER_E
|
|||
return 0;
|
||||
}
|
||||
|
||||
static void toku_recover_xabort (LSN UU(lsn), TXNID xid, RECOVER_ENV env) {
|
||||
static void toku_recover_xabort (LSN lsn, TXNID xid, RECOVER_ENV env) {
|
||||
int r;
|
||||
|
||||
// find the transaction by transaction id
|
||||
|
@ -173,7 +173,7 @@ static void toku_recover_xabort (LSN UU(lsn), TXNID xid, RECOVER_ENV env) {
|
|||
assert(r == 0);
|
||||
|
||||
// abort the transaction
|
||||
r = toku_txn_abort_txn(txn, recover_yield, NULL);
|
||||
r = toku_txn_abort_with_lsn(txn, recover_yield, NULL, lsn);
|
||||
assert(r == 0);
|
||||
|
||||
// close the transaction
|
||||
|
@ -285,21 +285,21 @@ static int toku_recover_backward_fcreate (struct logtype_fcreate *UU(l), RECOVER
|
|||
return 0;
|
||||
}
|
||||
|
||||
static void toku_recover_enq_insert (LSN lsn __attribute__((__unused__)), FILENUM filenum, TXNID xid, BYTESTRING key, BYTESTRING val, RECOVER_ENV env) {
|
||||
static void toku_recover_enq_insert (LSN lsn, FILENUM filenum, TXNID xid, BYTESTRING key, BYTESTRING val, RECOVER_ENV env) {
|
||||
struct cf_pair *pair = NULL;
|
||||
int r = find_cachefile(&env->fmap, filenum, &pair);
|
||||
if (r!=0) {
|
||||
// if we didn't find a cachefile, then we don't have to do anything.
|
||||
return;
|
||||
}
|
||||
// TODO compare file LSN with this XID
|
||||
|
||||
TOKUTXN txn;
|
||||
r = toku_txnid2txn(env->logger, xid, &txn);
|
||||
assert(r == 0);
|
||||
DBT keydbt, valdbt;
|
||||
toku_fill_dbt(&keydbt, key.data, key.len);
|
||||
toku_fill_dbt(&valdbt, val.data, val.len);
|
||||
r = toku_brt_insert(pair->brt, &keydbt, &valdbt, txn);
|
||||
r = toku_brt_maybe_insert(pair->brt, &keydbt, &valdbt, txn, lsn);
|
||||
assert(r == 0);
|
||||
}
|
||||
|
||||
|
@ -308,21 +308,21 @@ static int toku_recover_backward_enq_insert (struct logtype_enq_insert *UU(l), R
|
|||
return 0;
|
||||
}
|
||||
|
||||
static void toku_recover_enq_delete_both (LSN lsn __attribute__((__unused__)), FILENUM filenum, TXNID xid, BYTESTRING key, BYTESTRING val, RECOVER_ENV env) {
|
||||
static void toku_recover_enq_delete_both (LSN lsn, FILENUM filenum, TXNID xid, BYTESTRING key, BYTESTRING val, RECOVER_ENV env) {
|
||||
struct cf_pair *pair = NULL;
|
||||
int r = find_cachefile(&env->fmap, filenum, &pair);
|
||||
if (r!=0) {
|
||||
// if we didn't find a cachefile, then we don't have to do anything.
|
||||
return;
|
||||
}
|
||||
// TODO compare file LSN with this XID
|
||||
|
||||
TOKUTXN txn;
|
||||
r = toku_txnid2txn(env->logger, xid, &txn);
|
||||
assert(r == 0);
|
||||
DBT keydbt, valdbt;
|
||||
toku_fill_dbt(&keydbt, key.data, key.len);
|
||||
toku_fill_dbt(&valdbt, val.data, val.len);
|
||||
r = toku_brt_delete_both(pair->brt, &keydbt, &valdbt, txn);
|
||||
r = toku_brt_maybe_delete_both(pair->brt, &keydbt, &valdbt, txn, lsn);
|
||||
assert(r == 0);
|
||||
}
|
||||
|
||||
|
@ -331,20 +331,20 @@ static int toku_recover_backward_enq_delete_both (struct logtype_enq_delete_both
|
|||
return 0;
|
||||
}
|
||||
|
||||
static void toku_recover_enq_delete_any (LSN lsn __attribute__((__unused__)), FILENUM filenum, TXNID xid, BYTESTRING key, RECOVER_ENV env) {
|
||||
static void toku_recover_enq_delete_any (LSN lsn, FILENUM filenum, TXNID xid, BYTESTRING key, RECOVER_ENV env) {
|
||||
struct cf_pair *pair = NULL;
|
||||
int r = find_cachefile(&env->fmap, filenum, &pair);
|
||||
if (r!=0) {
|
||||
// if we didn't find a cachefile, then we don't have to do anything.
|
||||
return;
|
||||
}
|
||||
// TODO compare file LSN with this XID
|
||||
|
||||
TOKUTXN txn;
|
||||
r = toku_txnid2txn(env->logger, xid, &txn);
|
||||
assert(r == 0);
|
||||
DBT keydbt;
|
||||
toku_fill_dbt(&keydbt, key.data, key.len);
|
||||
r = toku_brt_delete(pair->brt, &keydbt, txn);
|
||||
r = toku_brt_maybe_delete(pair->brt, &keydbt, txn, lsn);
|
||||
assert(r == 0);
|
||||
}
|
||||
|
||||
|
|
|
@ -16,7 +16,8 @@ toku_commit_fcreate (TXNID UU(xid),
|
|||
BYTESTRING UU(bs_fname),
|
||||
TOKUTXN UU(txn),
|
||||
YIELDF UU(yield),
|
||||
void *UU(yield_v))
|
||||
void *UU(yield_v),
|
||||
LSN UU(oplsn))
|
||||
{
|
||||
return 0;
|
||||
}
|
||||
|
@ -27,7 +28,8 @@ toku_rollback_fcreate (TXNID UU(xid),
|
|||
BYTESTRING bs_fname,
|
||||
TOKUTXN txn,
|
||||
YIELDF yield,
|
||||
void* yield_v)
|
||||
void* yield_v,
|
||||
LSN UU(oplsn))
|
||||
{
|
||||
yield(toku_checkpoint_safe_client_lock, yield_v);
|
||||
char *fname = fixup_fname(&bs_fname);
|
||||
|
@ -59,12 +61,21 @@ static int find_brt_from_filenum (OMTVALUE v, void *filenumvp) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
static int do_insertion (enum brt_msg_type type, FILENUM filenum, BYTESTRING key, BYTESTRING *data,TOKUTXN txn) {
|
||||
static int do_insertion (enum brt_msg_type type, FILENUM filenum, BYTESTRING key, BYTESTRING *data, TOKUTXN txn, LSN oplsn) {
|
||||
CACHEFILE cf;
|
||||
//printf("%s:%d committing insert %s %s\n", __FILE__, __LINE__, key.data, data.data);
|
||||
int r = toku_cachefile_of_filenum(txn->logger->ct, filenum, &cf);
|
||||
assert(r==0);
|
||||
|
||||
OMTVALUE brtv=NULL;
|
||||
r = toku_omt_find_zero(txn->open_brts, find_brt_from_filenum, &filenum, &brtv, NULL, NULL);
|
||||
assert(r==0);
|
||||
BRT brt = brtv;
|
||||
|
||||
LSN treelsn = toku_brt_checkpoint_lsn(brt);
|
||||
if (oplsn.lsn != 0 && oplsn.lsn <= treelsn.lsn)
|
||||
return 0;
|
||||
|
||||
DBT key_dbt,data_dbt;
|
||||
XIDS xids = toku_txn_get_xids(txn);
|
||||
BRT_MSG_S brtcmd = { type, xids,
|
||||
|
@ -72,11 +83,7 @@ static int do_insertion (enum brt_msg_type type, FILENUM filenum, BYTESTRING key
|
|||
data
|
||||
? toku_fill_dbt(&data_dbt, data->data, data->len)
|
||||
: toku_init_dbt(&data_dbt) }};
|
||||
OMTVALUE brtv=NULL;
|
||||
r = toku_omt_find_zero(txn->open_brts, find_brt_from_filenum, &filenum, &brtv, NULL, NULL);
|
||||
|
||||
assert(r==0);
|
||||
BRT brt = brtv;
|
||||
r = toku_brt_root_put_cmd(brt, &brtcmd);
|
||||
return r;
|
||||
}
|
||||
|
@ -90,11 +97,11 @@ static int do_nothing_with_filenum(TOKUTXN txn, FILENUM filenum) {
|
|||
}
|
||||
|
||||
|
||||
int toku_commit_cmdinsert (FILENUM filenum, BYTESTRING key, TOKUTXN txn, YIELDF UU(yield), void *UU(yieldv)) {
|
||||
int toku_commit_cmdinsert (FILENUM filenum, BYTESTRING key, TOKUTXN txn, YIELDF UU(yield), void *UU(yieldv), LSN oplsn) {
|
||||
#if TOKU_DO_COMMIT_CMD_INSERT
|
||||
return do_insertion (BRT_COMMIT_ANY, filenum, key, 0, txn);
|
||||
return do_insertion (BRT_COMMIT_ANY, filenum, key, 0, txn, oplsn);
|
||||
#else
|
||||
key = key;
|
||||
key = key; oplsn = oplsn;
|
||||
return do_nothing_with_filenum(txn, filenum);
|
||||
#endif
|
||||
}
|
||||
|
@ -105,12 +112,13 @@ toku_commit_cmdinsertboth (FILENUM filenum,
|
|||
BYTESTRING data,
|
||||
TOKUTXN txn,
|
||||
YIELDF UU(yield),
|
||||
void * UU(yieldv))
|
||||
void * UU(yieldv),
|
||||
LSN oplsn)
|
||||
{
|
||||
#if TOKU_DO_COMMIT_CMD_INSERT
|
||||
return do_insertion (BRT_COMMIT_BOTH, filenum, key, &data, txn);
|
||||
return do_insertion (BRT_COMMIT_BOTH, filenum, key, &data, txn, oplsn);
|
||||
#else
|
||||
key = key; data = data;
|
||||
key = key; data = data; oplsn = oplsn;
|
||||
return do_nothing_with_filenum(txn, filenum);
|
||||
#endif
|
||||
}
|
||||
|
@ -120,9 +128,10 @@ toku_rollback_cmdinsert (FILENUM filenum,
|
|||
BYTESTRING key,
|
||||
TOKUTXN txn,
|
||||
YIELDF UU(yield),
|
||||
void * UU(yieldv))
|
||||
void * UU(yieldv),
|
||||
LSN oplsn)
|
||||
{
|
||||
return do_insertion (BRT_ABORT_ANY, filenum, key, 0, txn);
|
||||
return do_insertion (BRT_ABORT_ANY, filenum, key, 0, txn, oplsn);
|
||||
}
|
||||
|
||||
int
|
||||
|
@ -131,9 +140,10 @@ toku_rollback_cmdinsertboth (FILENUM filenum,
|
|||
BYTESTRING data,
|
||||
TOKUTXN txn,
|
||||
YIELDF UU(yield),
|
||||
void * UU(yieldv))
|
||||
void * UU(yieldv),
|
||||
LSN oplsn)
|
||||
{
|
||||
return do_insertion (BRT_ABORT_BOTH, filenum, key, &data, txn);
|
||||
return do_insertion (BRT_ABORT_BOTH, filenum, key, &data, txn, oplsn);
|
||||
}
|
||||
|
||||
int
|
||||
|
@ -142,10 +152,11 @@ toku_commit_cmddeleteboth (FILENUM filenum,
|
|||
BYTESTRING data,
|
||||
TOKUTXN txn,
|
||||
YIELDF UU(yield),
|
||||
void * UU(yieldv))
|
||||
void * UU(yieldv),
|
||||
LSN oplsn)
|
||||
{
|
||||
#if TOKU_DO_COMMIT_CMD_DELETE_BOTH
|
||||
return do_insertion (BRT_COMMIT_BOTH, filenum, key, &data, txn);
|
||||
return do_insertion (BRT_COMMIT_BOTH, filenum, key, &data, txn, oplsn);
|
||||
#else
|
||||
xid = xid; key = key; data = data;
|
||||
return do_nothing_with_filenum(txn, filenum);
|
||||
|
@ -158,9 +169,10 @@ toku_rollback_cmddeleteboth (FILENUM filenum,
|
|||
BYTESTRING data,
|
||||
TOKUTXN txn,
|
||||
YIELDF UU(yield),
|
||||
void * UU(yieldv))
|
||||
void * UU(yieldv),
|
||||
LSN oplsn)
|
||||
{
|
||||
return do_insertion (BRT_ABORT_BOTH, filenum, key, &data, txn);
|
||||
return do_insertion (BRT_ABORT_BOTH, filenum, key, &data, txn, oplsn);
|
||||
}
|
||||
|
||||
int
|
||||
|
@ -168,10 +180,11 @@ toku_commit_cmddelete (FILENUM filenum,
|
|||
BYTESTRING key,
|
||||
TOKUTXN txn,
|
||||
YIELDF UU(yield),
|
||||
void * UU(yieldv))
|
||||
void * UU(yieldv),
|
||||
LSN oplsn)
|
||||
{
|
||||
#if TOKU_DO_COMMIT_CMD_DELETE
|
||||
return do_insertion (BRT_COMMIT_ANY, filenum, key, 0, txn);
|
||||
return do_insertion (BRT_COMMIT_ANY, filenum, key, 0, txn, oplsn);
|
||||
#else
|
||||
xid = xid; key = key;
|
||||
return do_nothing_with_filenum(txn, filenum);
|
||||
|
@ -183,16 +196,18 @@ toku_rollback_cmddelete (FILENUM filenum,
|
|||
BYTESTRING key,
|
||||
TOKUTXN txn,
|
||||
YIELDF UU(yield),
|
||||
void * UU(yieldv))
|
||||
void * UU(yieldv),
|
||||
LSN oplsn)
|
||||
{
|
||||
return do_insertion (BRT_ABORT_ANY, filenum, key, 0, txn);
|
||||
return do_insertion (BRT_ABORT_ANY, filenum, key, 0, txn, oplsn);
|
||||
}
|
||||
|
||||
int
|
||||
toku_commit_fileentries (int fd,
|
||||
TOKUTXN txn,
|
||||
YIELDF yield,
|
||||
void * yieldv)
|
||||
void * yieldv,
|
||||
LSN oplsn)
|
||||
{
|
||||
BREAD f = create_bread_from_fd_initialize_at(fd);
|
||||
int r=0;
|
||||
|
@ -202,7 +217,7 @@ toku_commit_fileentries (int fd,
|
|||
struct roll_entry *item;
|
||||
r = toku_read_rollback_backwards(f, &item, ma);
|
||||
if (r!=0) goto finish;
|
||||
r = toku_commit_rollback_item(txn, item, yield, yieldv);
|
||||
r = toku_commit_rollback_item(txn, item, yield, yieldv, oplsn);
|
||||
if (r!=0) goto finish;
|
||||
memarena_clear(ma);
|
||||
count++;
|
||||
|
@ -218,7 +233,8 @@ int
|
|||
toku_rollback_fileentries (int fd,
|
||||
TOKUTXN txn,
|
||||
YIELDF yield,
|
||||
void * yieldv)
|
||||
void * yieldv,
|
||||
LSN oplsn)
|
||||
{
|
||||
BREAD f = create_bread_from_fd_initialize_at(fd);
|
||||
assert(f);
|
||||
|
@ -229,7 +245,7 @@ toku_rollback_fileentries (int fd,
|
|||
struct roll_entry *item;
|
||||
r = toku_read_rollback_backwards(f, &item, ma);
|
||||
if (r!=0) goto finish;
|
||||
r = toku_abort_rollback_item(txn, item, yield, yieldv);
|
||||
r = toku_abort_rollback_item(txn, item, yield, yieldv, oplsn);
|
||||
if (r!=0) goto finish;
|
||||
memarena_clear(ma);
|
||||
count++;
|
||||
|
@ -245,12 +261,13 @@ int
|
|||
toku_commit_rollinclude (BYTESTRING bs,
|
||||
TOKUTXN txn,
|
||||
YIELDF yield,
|
||||
void * yieldv) {
|
||||
void * yieldv,
|
||||
LSN oplsn) {
|
||||
int r;
|
||||
char *fname = fixup_fname(&bs);
|
||||
int fd = open(fname, O_RDONLY+O_BINARY);
|
||||
assert(fd>=0);
|
||||
r = toku_commit_fileentries(fd, txn, yield, yieldv);
|
||||
r = toku_commit_fileentries(fd, txn, yield, yieldv, oplsn);
|
||||
assert(r==0);
|
||||
r = close(fd);
|
||||
assert(r==0);
|
||||
|
@ -263,13 +280,14 @@ int
|
|||
toku_rollback_rollinclude (BYTESTRING bs,
|
||||
TOKUTXN txn,
|
||||
YIELDF yield,
|
||||
void * yieldv)
|
||||
void * yieldv,
|
||||
LSN oplsn)
|
||||
{
|
||||
int r;
|
||||
char *fname = fixup_fname(&bs);
|
||||
int fd = open(fname, O_RDONLY+O_BINARY);
|
||||
assert(fd>=0);
|
||||
r = toku_rollback_fileentries(fd, txn, yield, yieldv);
|
||||
r = toku_rollback_fileentries(fd, txn, yield, yieldv, oplsn);
|
||||
assert(r==0);
|
||||
r = close(fd);
|
||||
assert(r==0);
|
||||
|
@ -282,7 +300,8 @@ int
|
|||
toku_rollback_tablelock_on_empty_table (FILENUM filenum,
|
||||
TOKUTXN txn,
|
||||
YIELDF yield,
|
||||
void* yield_v)
|
||||
void* yield_v,
|
||||
LSN UU(oplsn))
|
||||
{
|
||||
yield(toku_checkpoint_safe_client_lock, yield_v);
|
||||
// on rollback we have to make the file be empty, since we locked an empty table, and then may have done things to it.
|
||||
|
@ -307,7 +326,7 @@ toku_rollback_tablelock_on_empty_table (FILENUM filenum,
|
|||
}
|
||||
|
||||
int
|
||||
toku_commit_tablelock_on_empty_table (FILENUM filenum, TOKUTXN txn, YIELDF UU(yield), void* UU(yield_v))
|
||||
toku_commit_tablelock_on_empty_table (FILENUM filenum, TOKUTXN txn, YIELDF UU(yield), void* UU(yield_v), LSN UU(oplsn))
|
||||
{
|
||||
return do_nothing_with_filenum(txn, filenum);
|
||||
}
|
||||
|
|
|
@ -7,15 +7,15 @@
|
|||
|
||||
static void note_txn_closing (TOKUTXN txn);
|
||||
|
||||
int toku_commit_rollback_item (TOKUTXN txn, struct roll_entry *item, YIELDF yield, void*yieldv) {
|
||||
int toku_commit_rollback_item (TOKUTXN txn, struct roll_entry *item, YIELDF yield, void*yieldv, LSN lsn) {
|
||||
int r=0;
|
||||
rolltype_dispatch_assign(item, toku_commit_, r, txn, yield, yieldv);
|
||||
rolltype_dispatch_assign(item, toku_commit_, r, txn, yield, yieldv, lsn);
|
||||
return r;
|
||||
}
|
||||
|
||||
int toku_abort_rollback_item (TOKUTXN txn, struct roll_entry *item, YIELDF yield, void*yieldv) {
|
||||
int toku_abort_rollback_item (TOKUTXN txn, struct roll_entry *item, YIELDF yield, void*yieldv, LSN lsn) {
|
||||
int r=0;
|
||||
rolltype_dispatch_assign(item, toku_rollback_, r, txn, yield, yieldv);
|
||||
rolltype_dispatch_assign(item, toku_rollback_, r, txn, yield, yieldv, lsn);
|
||||
if (r!=0) return r;
|
||||
return 0;
|
||||
}
|
||||
|
@ -104,7 +104,7 @@ static int note_brt_used_in_txns_parent(OMTVALUE brtv, u_int32_t UU(index), void
|
|||
return r;
|
||||
}
|
||||
|
||||
int toku_rollback_commit(TOKUTXN txn, YIELDF yield, void*yieldv) {
|
||||
int toku_rollback_commit(TOKUTXN txn, YIELDF yield, void*yieldv, LSN lsn) {
|
||||
int r=0;
|
||||
if (txn->parent!=0) {
|
||||
// First we must put a rollinclude entry into the parent if we have a rollentry file.
|
||||
|
@ -169,7 +169,7 @@ int toku_rollback_commit(TOKUTXN txn, YIELDF yield, void*yieldv) {
|
|||
int count=0;
|
||||
while ((item=txn->newest_logentry)) {
|
||||
txn->newest_logentry = item->prev;
|
||||
r = toku_commit_rollback_item(txn, item, yield, yieldv);
|
||||
r = toku_commit_rollback_item(txn, item, yield, yieldv, lsn);
|
||||
if (r!=0) return r;
|
||||
count++;
|
||||
if (count%2 == 0) yield(NULL, yieldv);
|
||||
|
@ -178,19 +178,19 @@ int toku_rollback_commit(TOKUTXN txn, YIELDF yield, void*yieldv) {
|
|||
|
||||
// Read stuff out of the file and execute it.
|
||||
if (txn->rollentry_filename) {
|
||||
r = toku_commit_fileentries(txn->rollentry_fd, txn, yield, yieldv);
|
||||
r = toku_commit_fileentries(txn->rollentry_fd, txn, yield, yieldv, lsn);
|
||||
}
|
||||
}
|
||||
return r;
|
||||
}
|
||||
|
||||
int toku_rollback_abort(TOKUTXN txn, YIELDF yield, void*yieldv) {
|
||||
int toku_rollback_abort(TOKUTXN txn, YIELDF yield, void*yieldv, LSN lsn) {
|
||||
struct roll_entry *item;
|
||||
int count=0;
|
||||
int r=0;
|
||||
while ((item=txn->newest_logentry)) {
|
||||
txn->newest_logentry = item->prev;
|
||||
r = toku_abort_rollback_item(txn, item, yield, yieldv);
|
||||
r = toku_abort_rollback_item(txn, item, yield, yieldv, lsn);
|
||||
if (r!=0)
|
||||
return r;
|
||||
count++;
|
||||
|
@ -198,7 +198,7 @@ int toku_rollback_abort(TOKUTXN txn, YIELDF yield, void*yieldv) {
|
|||
}
|
||||
// Read stuff out of the file and roll it back.
|
||||
if (txn->rollentry_filename) {
|
||||
r = toku_rollback_fileentries(txn->rollentry_fd, txn, yield, yieldv);
|
||||
r = toku_rollback_fileentries(txn->rollentry_fd, txn, yield, yieldv, lsn);
|
||||
assert(r==0);
|
||||
}
|
||||
return 0;
|
||||
|
|
|
@ -9,12 +9,12 @@
|
|||
|
||||
// these routines in rollback.c
|
||||
|
||||
int toku_rollback_commit(TOKUTXN txn, YIELDF yield, void*yieldv);
|
||||
int toku_rollback_abort(TOKUTXN txn, YIELDF yield, void*yieldv);
|
||||
int toku_rollback_commit(TOKUTXN txn, YIELDF yield, void*yieldv, LSN lsn);
|
||||
int toku_rollback_abort(TOKUTXN txn, YIELDF yield, void*yieldv, LSN lsn);
|
||||
void toku_rollback_txn_close (TOKUTXN txn);
|
||||
|
||||
int toku_commit_rollback_item (TOKUTXN txn, struct roll_entry *item, YIELDF yield, void*yieldv);
|
||||
int toku_abort_rollback_item (TOKUTXN txn, struct roll_entry *item, YIELDF yield, void*yieldv);
|
||||
int toku_commit_rollback_item (TOKUTXN txn, struct roll_entry *item, YIELDF yield, void*yieldv, LSN lsn);
|
||||
int toku_abort_rollback_item (TOKUTXN txn, struct roll_entry *item, YIELDF yield, void*yieldv, LSN lsn);
|
||||
|
||||
void *toku_malloc_in_rollback(TOKUTXN txn, size_t size);
|
||||
void *toku_memdup_in_rollback(TOKUTXN txn, const void *v, size_t len);
|
||||
|
@ -29,8 +29,8 @@ int toku_logger_txn_rolltmp_raw_count(TOKUTXN txn, u_int64_t *raw_count);
|
|||
int toku_txn_find_by_xid (BRT brt, TXNID xid, TOKUTXN *txnptr);
|
||||
|
||||
// these routines in roll.c
|
||||
int toku_rollback_fileentries (int fd, TOKUTXN txn, YIELDF yield, void *yieldv);
|
||||
int toku_commit_fileentries (int fd, TOKUTXN txn, YIELDF yield,void *yieldv);
|
||||
int toku_rollback_fileentries (int fd, TOKUTXN txn, YIELDF yield, void *yieldv, LSN lsn);
|
||||
int toku_commit_fileentries (int fd, TOKUTXN txn, YIELDF yield,void *yieldv, LSN lsn);
|
||||
|
||||
//Heaviside function to find a TOKUTXN by TOKUTXN (used to find the index)
|
||||
int find_xid (OMTVALUE v, void *txnv);
|
||||
|
|
78
newbrt/txn.c
78
newbrt/txn.c
|
@ -7,58 +7,7 @@
|
|||
#include "txn.h"
|
||||
|
||||
int toku_txn_begin_txn (TOKUTXN parent_tokutxn, TOKUTXN *tokutxn, TOKULOGGER logger) {
|
||||
if (logger->is_panicked) return EINVAL;
|
||||
TAGMALLOC(TOKUTXN, result);
|
||||
if (result==0)
|
||||
return errno;
|
||||
int r;
|
||||
r = toku_log_xbegin(logger, &result->first_lsn, 0, parent_tokutxn ? parent_tokutxn->txnid64 : 0);
|
||||
if (r!=0) goto died;
|
||||
r = toku_omt_create(&result->open_brts);
|
||||
if (r!=0) goto died;
|
||||
result->txnid64 = result->first_lsn.lsn;
|
||||
XIDS parent_xids;
|
||||
if (parent_tokutxn==NULL)
|
||||
parent_xids = xids_get_root_xids();
|
||||
else
|
||||
parent_xids = parent_tokutxn->xids;
|
||||
if ((r=xids_create_child(parent_xids, &result->xids, result->txnid64)))
|
||||
goto died;
|
||||
result->logger = logger;
|
||||
result->parent = parent_tokutxn;
|
||||
result->oldest_logentry = result->newest_logentry = 0;
|
||||
|
||||
result->rollentry_arena = memarena_create();
|
||||
|
||||
if (toku_omt_size(logger->live_txns) == 0) {
|
||||
assert(logger->oldest_living_xid == TXNID_NONE_LIVING);
|
||||
logger->oldest_living_xid = result->txnid64;
|
||||
}
|
||||
assert(logger->oldest_living_xid <= result->txnid64);
|
||||
|
||||
{
|
||||
//Add txn to list (omt) of live transactions
|
||||
u_int32_t idx;
|
||||
r = toku_omt_insert(logger->live_txns, result, find_xid, result, &idx);
|
||||
if (r!=0) goto died;
|
||||
|
||||
if (logger->oldest_living_xid == result->txnid64)
|
||||
assert(idx == 0);
|
||||
else
|
||||
assert(idx > 0);
|
||||
}
|
||||
|
||||
result->rollentry_resident_bytecount=0;
|
||||
result->rollentry_raw_count = 0;
|
||||
result->rollentry_filename = 0;
|
||||
result->rollentry_fd = -1;
|
||||
result->rollentry_filesize = 0;
|
||||
*tokutxn = result;
|
||||
return 0;
|
||||
died:
|
||||
// TODO memory leak
|
||||
toku_logger_panic(logger, r);
|
||||
return r;
|
||||
return toku_txn_begin_with_xid(parent_tokutxn, tokutxn, logger, 0);
|
||||
}
|
||||
|
||||
int toku_txn_begin_with_xid (TOKUTXN parent_tokutxn, TOKUTXN *tokutxn, TOKULOGGER logger, TXNID xid) {
|
||||
|
@ -67,6 +16,10 @@ int toku_txn_begin_with_xid (TOKUTXN parent_tokutxn, TOKUTXN *tokutxn, TOKULOGGE
|
|||
if (result==0)
|
||||
return errno;
|
||||
int r;
|
||||
if (xid == 0) {
|
||||
r = toku_log_xbegin(logger, &result->first_lsn, 0, parent_tokutxn ? parent_tokutxn->txnid64 : 0);
|
||||
if (r!=0) goto died;
|
||||
} else
|
||||
result->first_lsn.lsn = xid;
|
||||
r = toku_omt_create(&result->open_brts);
|
||||
if (r!=0) goto died;
|
||||
|
@ -109,6 +62,7 @@ int toku_txn_begin_with_xid (TOKUTXN parent_tokutxn, TOKUTXN *tokutxn, TOKULOGGE
|
|||
result->rollentry_filesize = 0;
|
||||
*tokutxn = result;
|
||||
return 0;
|
||||
|
||||
died:
|
||||
// TODO memory leak
|
||||
toku_logger_panic(logger, r);
|
||||
|
@ -117,18 +71,26 @@ died:
|
|||
|
||||
|
||||
// Doesn't close the txn, just performs the commit operations.
|
||||
int toku_txn_commit_txn (TOKUTXN txn, int nosync, YIELDF yield, void*yieldv) {
|
||||
int toku_txn_commit_txn(TOKUTXN txn, int nosync, YIELDF yield, void *yieldv) {
|
||||
return toku_txn_commit_with_lsn(txn, nosync, yield, yieldv, ZERO_LSN);
|
||||
}
|
||||
|
||||
int toku_txn_commit_with_lsn(TOKUTXN txn, int nosync, YIELDF yield, void *yieldv, LSN oplsn) {
|
||||
int r;
|
||||
// panic handled in log_commit
|
||||
r = toku_log_commit(txn->logger, (LSN*)0, (txn->parent==0) && !nosync, txn->txnid64); // exits holding neither of the tokulogger locks.
|
||||
if (r!=0)
|
||||
return r;
|
||||
r = toku_rollback_commit(txn, yield, yieldv);
|
||||
r = toku_rollback_commit(txn, yield, yieldv, oplsn);
|
||||
return r;
|
||||
}
|
||||
|
||||
// Doesn't close the txn, just performs the abort operations.
|
||||
int toku_txn_abort_txn(TOKUTXN txn, YIELDF yield, void*yieldv) {
|
||||
int toku_txn_abort_txn(TOKUTXN txn, YIELDF yield, void *yieldv) {
|
||||
return toku_txn_abort_with_lsn(txn, yield, yieldv, ZERO_LSN);
|
||||
}
|
||||
|
||||
int toku_txn_abort_with_lsn(TOKUTXN txn, YIELDF yield, void *yieldv, LSN oplsn) {
|
||||
//printf("%s:%d aborting\n", __FILE__, __LINE__);
|
||||
// Must undo everything. Must undo it all in reverse order.
|
||||
// Build the reverse list
|
||||
|
@ -137,7 +99,7 @@ int toku_txn_abort_txn(TOKUTXN txn, YIELDF yield, void*yieldv) {
|
|||
r = toku_log_xabort(txn->logger, (LSN*)0, 0, txn->txnid64);
|
||||
if (r!=0)
|
||||
return r;
|
||||
r = toku_rollback_abort(txn, yield, yieldv);
|
||||
r = toku_rollback_abort(txn, yield, yieldv, oplsn);
|
||||
return r;
|
||||
}
|
||||
|
||||
|
@ -155,6 +117,10 @@ BOOL toku_txnid_older(TXNID a, TXNID b) {
|
|||
return (BOOL)(a < b); // TODO need modulo 64 arithmetic
|
||||
}
|
||||
|
||||
BOOL toku_txnid_newer(TXNID a, TXNID b) {
|
||||
return (BOOL)(a > b); // TODO need modulo 64 arithmetic
|
||||
}
|
||||
|
||||
BOOL toku_txnid_eq(TXNID a, TXNID b) {
|
||||
return (BOOL)(a == b);
|
||||
}
|
||||
|
|
|
@ -7,8 +7,13 @@
|
|||
|
||||
int toku_txn_begin_txn (TOKUTXN parent_tokutxn, TOKUTXN *tokutxn, TOKULOGGER logger);
|
||||
int toku_txn_begin_with_xid (TOKUTXN parent_tokutxn, TOKUTXN *tokutxn, TOKULOGGER logger, TXNID xid);
|
||||
int toku_txn_commit_txn (TOKUTXN txn, int nosync, YIELDF yield, void*yieldv);
|
||||
int toku_txn_abort_txn(TOKUTXN txn, YIELDF yield, void*yieldv);
|
||||
|
||||
int toku_txn_commit_txn (TOKUTXN txn, int nosync, YIELDF yield, void *yieldv);
|
||||
int toku_txn_commit_with_lsn(TOKUTXN txn, int nosync, YIELDF yield, void *yieldv, LSN oplsn);
|
||||
|
||||
int toku_txn_abort_txn(TOKUTXN txn, YIELDF yield, void *yieldv);
|
||||
int toku_txn_abort_with_lsn(TOKUTXN txn, YIELDF yield, void *yieldv, LSN oplsn);
|
||||
|
||||
void toku_txn_close_txn(TOKUTXN txn);
|
||||
XIDS toku_txn_get_xids (TOKUTXN);
|
||||
|
||||
|
|
|
@ -8,18 +8,12 @@ const int envflags = DB_INIT_MPOOL|DB_CREATE|DB_THREAD |DB_INIT_LOCK|DB_INIT_LOG
|
|||
|
||||
char *namea="a.db";
|
||||
|
||||
static void checkpoint_callback_2_do_abort(void *UU(extra)) {
|
||||
abort();
|
||||
}
|
||||
|
||||
static void run_test (void) {
|
||||
int r;
|
||||
|
||||
system("rm -rf " ENVDIR);
|
||||
toku_os_mkdir(ENVDIR, S_IRWXU+S_IRWXG+S_IRWXO);
|
||||
|
||||
db_env_set_checkpoint_callback2(checkpoint_callback_2_do_abort, NULL);
|
||||
|
||||
DB_ENV *env;
|
||||
r = db_env_create(&env, 0); CKERR(r);
|
||||
r = env->open(env, ENVDIR, envflags, S_IRWXU+S_IRWXG+S_IRWXO); CKERR(r);
|
||||
|
@ -84,6 +78,8 @@ static void run_recover (void) {
|
|||
|
||||
r = cursor->c_close(cursor); CKERR(r);
|
||||
|
||||
r = txn->commit(txn, 0); CKERR(r);
|
||||
r = db->close(db, 0); CKERR(r);
|
||||
r = env->close(env, 0); CKERR(r);
|
||||
exit(0);
|
||||
}
|
||||
|
|
Loading…
Add table
Reference in a new issue