mirror of
https://github.com/MariaDB/server.git
synced 2025-01-22 06:44:16 +01:00
[t:4877], [t:4966], [t:4952], [t:4881], [t:4918], merge to main
git-svn-id: file:///svn/toku/tokudb@44130 c7de825b-a66e-492c-adef-691d508d4ae1
This commit is contained in:
parent
3609e305c1
commit
900d290cda
25 changed files with 812 additions and 655 deletions
|
@ -1,2 +0,0 @@
|
|||
default:\
|
||||
:skip=/windows/,/dbg/,/opt/,/build/,/gccdbg/,/gccopt/,/iccdbg/,/iccopt/,/clangdbg/,/NightlyRelease/,/NightlyCoverage/,/NightlyDebug/,/Debug/,/Release/,/Coverage/,/cov/:
|
|
@ -8,6 +8,8 @@ file(GLOB_RECURSE all_srcs
|
|||
src/*.c
|
||||
utils/*.c
|
||||
db-benchmark-test/*.c
|
||||
)
|
||||
list(APPEND all_srcs
|
||||
${CMAKE_CURRENT_BINARY_DIR}/ft/log_code.c
|
||||
${CMAKE_CURRENT_BINARY_DIR}/ft/log_print.c
|
||||
)
|
||||
|
@ -20,6 +22,8 @@ file(GLOB_RECURSE all_hdrs
|
|||
src/*.h
|
||||
utils/*.h
|
||||
db-benchmark-test/*.h
|
||||
)
|
||||
list(APPEND all_hdrs
|
||||
${CMAKE_CURRENT_BINARY_DIR}/toku_include/config.h
|
||||
${CMAKE_CURRENT_BINARY_DIR}/buildheader/db.h
|
||||
${CMAKE_CURRENT_BINARY_DIR}/ft/log_header.h
|
||||
|
@ -76,14 +80,21 @@ endif ()
|
|||
option(USE_GTAGS "Build the gtags database." ON)
|
||||
if (USE_GTAGS)
|
||||
find_program(GTAGS "gtags")
|
||||
find_program(MKID "mkid")
|
||||
if (NOT GTAGS MATCHES NOTFOUND)
|
||||
## todo: use global -u instead of gtags each time
|
||||
file(WRITE "${CMAKE_CURRENT_BINARY_DIR}/gtags.files" "")
|
||||
foreach(file ${all_srcs} ${all_hdrs})
|
||||
file(APPEND "${CMAKE_CURRENT_BINARY_DIR}/gtags.files" "${file}\n")
|
||||
endforeach(file)
|
||||
if (NOT MKID MATCHES NOTFOUND)
|
||||
set(idutils_option "-I")
|
||||
endif ()
|
||||
add_custom_command(
|
||||
OUTPUT "${CMAKE_CURRENT_SOURCE_DIR}/GTAGS"
|
||||
OUTPUT "${CMAKE_CURRENT_SOURCE_DIR}/GRTAGS"
|
||||
OUTPUT "${CMAKE_CURRENT_SOURCE_DIR}/GPATH"
|
||||
OUTPUT "${CMAKE_CURRENT_SOURCE_DIR}/GSYMS"
|
||||
COMMAND ${GTAGS} --gtagsconf "${CMAKE_CURRENT_SOURCE_DIR}/.globalrc"
|
||||
COMMAND ${GTAGS} -i ${idutils_option} -f "${CMAKE_CURRENT_BINARY_DIR}/gtags.files"
|
||||
DEPENDS ${all_srcs} ${all_hdrs} install_tdb_h generate_config_h generate_log_code
|
||||
WORKING_DIRECTORY "${CMAKE_CURRENT_SOURCE_DIR}")
|
||||
add_custom_target(build_GTAGS ALL DEPENDS
|
||||
|
|
|
@ -59,6 +59,8 @@ set(FT_SOURCES
|
|||
quicklz.c
|
||||
recover.c
|
||||
rollback.c
|
||||
rollback-apply.c
|
||||
rollback-ct-callbacks.c
|
||||
roll.c
|
||||
sort.c
|
||||
sub_block.c
|
||||
|
|
|
@ -3246,14 +3246,14 @@ log_open_txn (OMTVALUE txnv, u_int32_t UU(index), void *UU(extra)) {
|
|||
int r = toku_log_xstillopen(logger, NULL, 0,
|
||||
toku_txn_get_txnid(txn),
|
||||
toku_txn_get_txnid(toku_logger_txn_parent(txn)),
|
||||
txn->rollentry_raw_count,
|
||||
txn->roll_info.rollentry_raw_count,
|
||||
open_filenums,
|
||||
txn->force_fsync_on_commit,
|
||||
txn->num_rollback_nodes,
|
||||
txn->num_rollentries,
|
||||
txn->spilled_rollback_head,
|
||||
txn->spilled_rollback_tail,
|
||||
txn->current_rollback);
|
||||
txn->roll_info.num_rollback_nodes,
|
||||
txn->roll_info.num_rollentries,
|
||||
txn->roll_info.spilled_rollback_head,
|
||||
txn->roll_info.spilled_rollback_tail,
|
||||
txn->roll_info.current_rollback);
|
||||
assert(r==0);
|
||||
return 0;
|
||||
}
|
||||
|
@ -3263,14 +3263,14 @@ log_open_txn (OMTVALUE txnv, u_int32_t UU(index), void *UU(extra)) {
|
|||
int r = toku_log_xstillopenprepared(logger, NULL, 0,
|
||||
toku_txn_get_txnid(txn),
|
||||
&xa_xid,
|
||||
txn->rollentry_raw_count,
|
||||
txn->roll_info.rollentry_raw_count,
|
||||
open_filenums,
|
||||
txn->force_fsync_on_commit,
|
||||
txn->num_rollback_nodes,
|
||||
txn->num_rollentries,
|
||||
txn->spilled_rollback_head,
|
||||
txn->spilled_rollback_tail,
|
||||
txn->current_rollback);
|
||||
txn->roll_info.num_rollback_nodes,
|
||||
txn->roll_info.num_rollentries,
|
||||
txn->roll_info.spilled_rollback_head,
|
||||
txn->roll_info.spilled_rollback_tail,
|
||||
txn->roll_info.current_rollback);
|
||||
assert(r==0);
|
||||
return 0;
|
||||
}
|
||||
|
|
123
ft/ft-ops.c
123
ft/ft-ops.c
|
@ -2687,45 +2687,44 @@ toku_ft_log_put_multiple (TOKUTXN txn, FT_HANDLE src_ft, FT_HANDLE *brts, int nu
|
|||
}
|
||||
|
||||
int
|
||||
toku_ft_maybe_insert (FT_HANDLE brt, DBT *key, DBT *val, TOKUTXN txn, BOOL oplsn_valid, LSN oplsn, BOOL do_logging, enum ft_msg_type type) {
|
||||
toku_ft_maybe_insert (FT_HANDLE ft_h, DBT *key, DBT *val, TOKUTXN txn, BOOL oplsn_valid, LSN oplsn, BOOL do_logging, enum ft_msg_type type) {
|
||||
assert(type==FT_INSERT || type==FT_INSERT_NO_OVERWRITE);
|
||||
int r = 0;
|
||||
XIDS message_xids = xids_get_root_xids(); //By default use committed messages
|
||||
TXNID xid = toku_txn_get_txnid(txn);
|
||||
if (txn) {
|
||||
if (brt->ft->txnid_that_created_or_locked_when_empty != xid) {
|
||||
if (ft_h->ft->txnid_that_created_or_locked_when_empty != xid) {
|
||||
BYTESTRING keybs = {key->size, key->data};
|
||||
r = toku_logger_save_rollback_cmdinsert(txn, toku_cachefile_filenum(brt->ft->cf), &keybs);
|
||||
if (r!=0) return r;
|
||||
r = toku_txn_note_ft(txn, brt->ft);
|
||||
r = toku_logger_save_rollback_cmdinsert(txn, toku_cachefile_filenum(ft_h->ft->cf), &keybs);
|
||||
if (r!=0) return r;
|
||||
toku_txn_maybe_note_ft(txn, ft_h->ft);
|
||||
//We have transactions, and this is not 2440. We must send the full root-to-leaf-path
|
||||
message_xids = toku_txn_get_xids(txn);
|
||||
}
|
||||
else if (txn->ancestor_txnid64 != brt->ft->h->root_xid_that_created) {
|
||||
else if (txn->ancestor_txnid64 != ft_h->ft->h->root_xid_that_created) {
|
||||
//We have transactions, and this is 2440, however the txn doing 2440 did not create the dictionary. We must send the full root-to-leaf-path
|
||||
message_xids = toku_txn_get_xids(txn);
|
||||
}
|
||||
}
|
||||
TOKULOGGER logger = toku_txn_logger(txn);
|
||||
if (do_logging && logger &&
|
||||
brt->ft->txnid_that_suppressed_recovery_logs == TXNID_NONE) {
|
||||
ft_h->ft->txnid_that_suppressed_recovery_logs == TXNID_NONE) {
|
||||
BYTESTRING keybs = {.len=key->size, .data=key->data};
|
||||
BYTESTRING valbs = {.len=val->size, .data=val->data};
|
||||
if (type == FT_INSERT) {
|
||||
r = toku_log_enq_insert(logger, (LSN*)0, 0, toku_cachefile_filenum(brt->ft->cf), xid, keybs, valbs);
|
||||
r = toku_log_enq_insert(logger, (LSN*)0, 0, toku_cachefile_filenum(ft_h->ft->cf), xid, keybs, valbs);
|
||||
}
|
||||
else {
|
||||
r = toku_log_enq_insert_no_overwrite(logger, (LSN*)0, 0, toku_cachefile_filenum(brt->ft->cf), xid, keybs, valbs);
|
||||
r = toku_log_enq_insert_no_overwrite(logger, (LSN*)0, 0, toku_cachefile_filenum(ft_h->ft->cf), xid, keybs, valbs);
|
||||
}
|
||||
if (r!=0) return r;
|
||||
}
|
||||
|
||||
LSN treelsn;
|
||||
if (oplsn_valid && oplsn.lsn <= (treelsn = toku_ft_checkpoint_lsn(brt->ft)).lsn) {
|
||||
if (oplsn_valid && oplsn.lsn <= (treelsn = toku_ft_checkpoint_lsn(ft_h->ft)).lsn) {
|
||||
r = 0;
|
||||
} else {
|
||||
r = toku_ft_send_insert(brt, key, val, message_xids, type);
|
||||
r = toku_ft_send_insert(ft_h, key, val, message_xids, type);
|
||||
}
|
||||
return r;
|
||||
}
|
||||
|
@ -2740,7 +2739,7 @@ ft_send_update_msg(FT_HANDLE brt, FT_MSG_S *msg, TOKUTXN txn) {
|
|||
}
|
||||
|
||||
int
|
||||
toku_ft_maybe_update(FT_HANDLE brt, const DBT *key, const DBT *update_function_extra,
|
||||
toku_ft_maybe_update(FT_HANDLE ft_h, const DBT *key, const DBT *update_function_extra,
|
||||
TOKUTXN txn, BOOL oplsn_valid, LSN oplsn,
|
||||
BOOL do_logging) {
|
||||
int r = 0;
|
||||
|
@ -2749,32 +2748,31 @@ toku_ft_maybe_update(FT_HANDLE brt, const DBT *key, const DBT *update_function_e
|
|||
if (txn) {
|
||||
BYTESTRING keybs = { key->size, key->data };
|
||||
r = toku_logger_save_rollback_cmdupdate(
|
||||
txn, toku_cachefile_filenum(brt->ft->cf), &keybs);
|
||||
if (r != 0) { goto cleanup; }
|
||||
r = toku_txn_note_ft(txn, brt->ft);
|
||||
txn, toku_cachefile_filenum(ft_h->ft->cf), &keybs);
|
||||
if (r != 0) { goto cleanup; }
|
||||
toku_txn_maybe_note_ft(txn, ft_h->ft);
|
||||
}
|
||||
|
||||
TOKULOGGER logger = toku_txn_logger(txn);
|
||||
if (do_logging && logger &&
|
||||
brt->ft->txnid_that_suppressed_recovery_logs == TXNID_NONE) {
|
||||
ft_h->ft->txnid_that_suppressed_recovery_logs == TXNID_NONE) {
|
||||
BYTESTRING keybs = {.len=key->size, .data=key->data};
|
||||
BYTESTRING extrabs = {.len=update_function_extra->size,
|
||||
.data=update_function_extra->data};
|
||||
r = toku_log_enq_update(logger, NULL, 0,
|
||||
toku_cachefile_filenum(brt->ft->cf),
|
||||
toku_cachefile_filenum(ft_h->ft->cf),
|
||||
xid, keybs, extrabs);
|
||||
if (r != 0) { goto cleanup; }
|
||||
}
|
||||
|
||||
LSN treelsn;
|
||||
if (oplsn_valid &&
|
||||
oplsn.lsn <= (treelsn = toku_ft_checkpoint_lsn(brt->ft)).lsn) {
|
||||
oplsn.lsn <= (treelsn = toku_ft_checkpoint_lsn(ft_h->ft)).lsn) {
|
||||
r = 0;
|
||||
} else {
|
||||
FT_MSG_S msg = { FT_UPDATE, ZERO_MSN, NULL,
|
||||
.u.id = { key, update_function_extra }};
|
||||
r = ft_send_update_msg(brt, &msg, txn);
|
||||
r = ft_send_update_msg(ft_h, &msg, txn);
|
||||
}
|
||||
|
||||
cleanup:
|
||||
|
@ -2782,7 +2780,7 @@ cleanup:
|
|||
}
|
||||
|
||||
int
|
||||
toku_ft_maybe_update_broadcast(FT_HANDLE brt, const DBT *update_function_extra,
|
||||
toku_ft_maybe_update_broadcast(FT_HANDLE ft_h, const DBT *update_function_extra,
|
||||
TOKUTXN txn, BOOL oplsn_valid, LSN oplsn,
|
||||
BOOL do_logging, BOOL is_resetting_op) {
|
||||
int r = 0;
|
||||
|
@ -2790,19 +2788,18 @@ toku_ft_maybe_update_broadcast(FT_HANDLE brt, const DBT *update_function_extra,
|
|||
TXNID xid = toku_txn_get_txnid(txn);
|
||||
u_int8_t resetting = is_resetting_op ? 1 : 0;
|
||||
if (txn) {
|
||||
r = toku_logger_save_rollback_cmdupdatebroadcast(txn, toku_cachefile_filenum(brt->ft->cf), resetting);
|
||||
if (r != 0) { goto cleanup; }
|
||||
r = toku_txn_note_ft(txn, brt->ft);
|
||||
r = toku_logger_save_rollback_cmdupdatebroadcast(txn, toku_cachefile_filenum(ft_h->ft->cf), resetting);
|
||||
if (r != 0) { goto cleanup; }
|
||||
toku_txn_maybe_note_ft(txn, ft_h->ft);
|
||||
}
|
||||
|
||||
TOKULOGGER logger = toku_txn_logger(txn);
|
||||
if (do_logging && logger &&
|
||||
brt->ft->txnid_that_suppressed_recovery_logs == TXNID_NONE) {
|
||||
ft_h->ft->txnid_that_suppressed_recovery_logs == TXNID_NONE) {
|
||||
BYTESTRING extrabs = {.len=update_function_extra->size,
|
||||
.data=update_function_extra->data};
|
||||
r = toku_log_enq_updatebroadcast(logger, NULL, 0,
|
||||
toku_cachefile_filenum(brt->ft->cf),
|
||||
toku_cachefile_filenum(ft_h->ft->cf),
|
||||
xid, extrabs, resetting);
|
||||
if (r != 0) { goto cleanup; }
|
||||
}
|
||||
|
@ -2810,14 +2807,14 @@ toku_ft_maybe_update_broadcast(FT_HANDLE brt, const DBT *update_function_extra,
|
|||
//TODO(yoni): remove treelsn here and similar calls (no longer being used)
|
||||
LSN treelsn;
|
||||
if (oplsn_valid &&
|
||||
oplsn.lsn <= (treelsn = toku_ft_checkpoint_lsn(brt->ft)).lsn) {
|
||||
oplsn.lsn <= (treelsn = toku_ft_checkpoint_lsn(ft_h->ft)).lsn) {
|
||||
r = 0;
|
||||
} else {
|
||||
DBT nullkey;
|
||||
const DBT *nullkeyp = toku_init_dbt(&nullkey);
|
||||
FT_MSG_S msg = { FT_UPDATE_BROADCAST_ALL, ZERO_MSN, NULL,
|
||||
.u.id = { nullkeyp, update_function_extra }};
|
||||
r = ft_send_update_msg(brt, &msg, txn);
|
||||
r = ft_send_update_msg(ft_h, &msg, txn);
|
||||
}
|
||||
|
||||
cleanup:
|
||||
|
@ -2884,38 +2881,37 @@ toku_ft_log_del_multiple (TOKUTXN txn, FT_HANDLE src_ft, FT_HANDLE *brts, int nu
|
|||
}
|
||||
|
||||
int
|
||||
toku_ft_maybe_delete(FT_HANDLE brt, DBT *key, TOKUTXN txn, BOOL oplsn_valid, LSN oplsn, BOOL do_logging) {
|
||||
toku_ft_maybe_delete(FT_HANDLE ft_h, DBT *key, TOKUTXN txn, BOOL oplsn_valid, LSN oplsn, BOOL do_logging) {
|
||||
int r;
|
||||
XIDS message_xids = xids_get_root_xids(); //By default use committed messages
|
||||
TXNID xid = toku_txn_get_txnid(txn);
|
||||
if (txn) {
|
||||
if (brt->ft->txnid_that_created_or_locked_when_empty != xid) {
|
||||
if (ft_h->ft->txnid_that_created_or_locked_when_empty != xid) {
|
||||
BYTESTRING keybs = {key->size, key->data};
|
||||
r = toku_logger_save_rollback_cmddelete(txn, toku_cachefile_filenum(brt->ft->cf), &keybs);
|
||||
if (r!=0) return r;
|
||||
r = toku_txn_note_ft(txn, brt->ft);
|
||||
r = toku_logger_save_rollback_cmddelete(txn, toku_cachefile_filenum(ft_h->ft->cf), &keybs);
|
||||
if (r!=0) return r;
|
||||
toku_txn_maybe_note_ft(txn, ft_h->ft);
|
||||
//We have transactions, and this is not 2440. We must send the full root-to-leaf-path
|
||||
message_xids = toku_txn_get_xids(txn);
|
||||
}
|
||||
else if (txn->ancestor_txnid64 != brt->ft->h->root_xid_that_created) {
|
||||
else if (txn->ancestor_txnid64 != ft_h->ft->h->root_xid_that_created) {
|
||||
//We have transactions, and this is 2440, however the txn doing 2440 did not create the dictionary. We must send the full root-to-leaf-path
|
||||
message_xids = toku_txn_get_xids(txn);
|
||||
}
|
||||
}
|
||||
TOKULOGGER logger = toku_txn_logger(txn);
|
||||
if (do_logging && logger &&
|
||||
brt->ft->txnid_that_suppressed_recovery_logs == TXNID_NONE) {
|
||||
ft_h->ft->txnid_that_suppressed_recovery_logs == TXNID_NONE) {
|
||||
BYTESTRING keybs = {.len=key->size, .data=key->data};
|
||||
r = toku_log_enq_delete_any(logger, (LSN*)0, 0, toku_cachefile_filenum(brt->ft->cf), xid, keybs);
|
||||
r = toku_log_enq_delete_any(logger, (LSN*)0, 0, toku_cachefile_filenum(ft_h->ft->cf), xid, keybs);
|
||||
if (r!=0) return r;
|
||||
}
|
||||
|
||||
LSN treelsn;
|
||||
if (oplsn_valid && oplsn.lsn <= (treelsn = toku_ft_checkpoint_lsn(brt->ft)).lsn) {
|
||||
if (oplsn_valid && oplsn.lsn <= (treelsn = toku_ft_checkpoint_lsn(ft_h->ft)).lsn) {
|
||||
r = 0;
|
||||
} else {
|
||||
r = toku_ft_send_delete(brt, key, message_xids);
|
||||
r = toku_ft_send_delete(ft_h, key, message_xids);
|
||||
}
|
||||
return r;
|
||||
}
|
||||
|
@ -3072,7 +3068,7 @@ verify_builtin_comparisons_consistent(FT_HANDLE t, u_int32_t flags) {
|
|||
|
||||
int
|
||||
toku_ft_change_descriptor(
|
||||
FT_HANDLE t,
|
||||
FT_HANDLE ft_h,
|
||||
const DBT* old_descriptor,
|
||||
const DBT* new_descriptor,
|
||||
BOOL do_log,
|
||||
|
@ -3092,19 +3088,18 @@ toku_ft_change_descriptor(
|
|||
// put information into rollback file
|
||||
r = toku_logger_save_rollback_change_fdescriptor(
|
||||
txn,
|
||||
toku_cachefile_filenum(t->ft->cf),
|
||||
toku_cachefile_filenum(ft_h->ft->cf),
|
||||
&old_desc_bs
|
||||
);
|
||||
if (r != 0) { goto cleanup; }
|
||||
r = toku_txn_note_ft(txn, t->ft);
|
||||
if (r != 0) { goto cleanup; }
|
||||
toku_txn_maybe_note_ft(txn, ft_h->ft);
|
||||
|
||||
if (do_log) {
|
||||
TOKULOGGER logger = toku_txn_logger(txn);
|
||||
TXNID xid = toku_txn_get_txnid(txn);
|
||||
r = toku_log_change_fdescriptor(
|
||||
logger, NULL, 0,
|
||||
toku_cachefile_filenum(t->ft->cf),
|
||||
toku_cachefile_filenum(ft_h->ft->cf),
|
||||
xid,
|
||||
old_desc_bs,
|
||||
new_desc_bs,
|
||||
|
@ -3115,8 +3110,8 @@ toku_ft_change_descriptor(
|
|||
|
||||
// write new_descriptor to header
|
||||
new_d.dbt = *new_descriptor;
|
||||
fd = toku_cachefile_get_fd (t->ft->cf);
|
||||
r = toku_update_descriptor(t->ft, &new_d, fd);
|
||||
fd = toku_cachefile_get_fd (ft_h->ft->cf);
|
||||
r = toku_update_descriptor(ft_h->ft, &new_d, fd);
|
||||
// very infrequent operation, worth precise threadsafe count
|
||||
if (r == 0) {
|
||||
STATUS_VALUE(FT_DESCRIPTOR_SET)++;
|
||||
|
@ -3124,7 +3119,7 @@ toku_ft_change_descriptor(
|
|||
if (r!=0) goto cleanup;
|
||||
|
||||
if (update_cmp_descriptor) {
|
||||
toku_ft_update_cmp_descriptor(t->ft);
|
||||
toku_ft_update_cmp_descriptor(ft_h->ft);
|
||||
}
|
||||
cleanup:
|
||||
return r;
|
||||
|
@ -3148,7 +3143,7 @@ toku_ft_handle_inherit_options(FT_HANDLE t, FT ft) {
|
|||
// fname_in_env is the iname, relative to the env_dir (data_dir is already in iname as prefix).
|
||||
// The checkpointed version (checkpoint_lsn) of the dictionary must be no later than max_acceptable_lsn .
|
||||
static int
|
||||
ft_handle_open(FT_HANDLE t, const char *fname_in_env, int is_create, int only_create, CACHETABLE cachetable, TOKUTXN txn, FILENUM use_filenum, DICTIONARY_ID use_dictionary_id, LSN max_acceptable_lsn) {
|
||||
ft_handle_open(FT_HANDLE ft_h, const char *fname_in_env, int is_create, int only_create, CACHETABLE cachetable, TOKUTXN txn, FILENUM use_filenum, DICTIONARY_ID use_dictionary_id, LSN max_acceptable_lsn) {
|
||||
int r;
|
||||
BOOL txn_created = FALSE;
|
||||
char *fname_in_cwd = NULL;
|
||||
|
@ -3157,8 +3152,8 @@ ft_handle_open(FT_HANDLE t, const char *fname_in_env, int is_create, int only_cr
|
|||
BOOL did_create = FALSE;
|
||||
toku_ft_open_close_lock();
|
||||
|
||||
if (t->did_set_flags) {
|
||||
r = verify_builtin_comparisons_consistent(t, t->options.flags);
|
||||
if (ft_h->did_set_flags) {
|
||||
r = verify_builtin_comparisons_consistent(ft_h, ft_h->options.flags);
|
||||
if (r!=0) { goto exit; }
|
||||
}
|
||||
if (txn && txn->logger->is_panicked) {
|
||||
|
@ -3184,21 +3179,21 @@ ft_handle_open(FT_HANDLE t, const char *fname_in_env, int is_create, int only_cr
|
|||
assert_zero(r);
|
||||
}
|
||||
txn_created = (BOOL)(txn!=NULL);
|
||||
r = toku_logger_log_fcreate(txn, fname_in_env, reserved_filenum, mode, t->options.flags, t->options.nodesize, t->options.basementnodesize, t->options.compression_method);
|
||||
r = toku_logger_log_fcreate(txn, fname_in_env, reserved_filenum, mode, ft_h->options.flags, ft_h->options.nodesize, ft_h->options.basementnodesize, ft_h->options.compression_method);
|
||||
assert_zero(r); // only possible failure is panic, which we check above
|
||||
r = ft_create_file(t, fname_in_cwd, &fd);
|
||||
r = ft_create_file(ft_h, fname_in_cwd, &fd);
|
||||
assert_zero(r);
|
||||
}
|
||||
if (r) { goto exit; }
|
||||
r=toku_cachetable_openfd_with_filenum(&cf, cachetable, fd, fname_in_env, reserved_filenum);
|
||||
if (r) { goto exit; }
|
||||
}
|
||||
assert(t->options.nodesize>0);
|
||||
assert(ft_h->options.nodesize>0);
|
||||
BOOL was_already_open;
|
||||
if (is_create) {
|
||||
r = toku_read_ft_and_store_in_cachefile(t, cf, max_acceptable_lsn, &ft, &was_already_open);
|
||||
r = toku_read_ft_and_store_in_cachefile(ft_h, cf, max_acceptable_lsn, &ft, &was_already_open);
|
||||
if (r==TOKUDB_DICTIONARY_NO_HEADER) {
|
||||
r = toku_create_new_ft(&ft, &t->options, cf, txn);
|
||||
r = toku_create_new_ft(&ft, &ft_h->options, cf, txn);
|
||||
if (r) { goto exit; }
|
||||
}
|
||||
else if (r!=0) {
|
||||
|
@ -3213,21 +3208,21 @@ ft_handle_open(FT_HANDLE t, const char *fname_in_env, int is_create, int only_cr
|
|||
// so it is ok for toku_read_ft_and_store_in_cachefile to have read
|
||||
// the header via toku_read_ft_and_store_in_cachefile
|
||||
} else {
|
||||
r = toku_read_ft_and_store_in_cachefile(t, cf, max_acceptable_lsn, &ft, &was_already_open);
|
||||
r = toku_read_ft_and_store_in_cachefile(ft_h, cf, max_acceptable_lsn, &ft, &was_already_open);
|
||||
if (r) { goto exit; }
|
||||
}
|
||||
if (!t->did_set_flags) {
|
||||
r = verify_builtin_comparisons_consistent(t, t->options.flags);
|
||||
if (!ft_h->did_set_flags) {
|
||||
r = verify_builtin_comparisons_consistent(ft_h, ft_h->options.flags);
|
||||
if (r) { goto exit; }
|
||||
} else if (t->options.flags != ft->h->flags) { /* if flags have been set then flags must match */
|
||||
} else if (ft_h->options.flags != ft->h->flags) { /* if flags have been set then flags must match */
|
||||
r = EINVAL;
|
||||
goto exit;
|
||||
}
|
||||
toku_ft_handle_inherit_options(t, ft);
|
||||
toku_ft_handle_inherit_options(ft_h, ft);
|
||||
|
||||
if (!was_already_open) {
|
||||
if (!did_create) { //Only log the fopen that OPENs the file. If it was already open, don't log.
|
||||
r = toku_logger_log_fopen(txn, fname_in_env, toku_cachefile_filenum(cf), t->options.flags);
|
||||
r = toku_logger_log_fopen(txn, fname_in_env, toku_cachefile_filenum(cf), ft_h->options.flags);
|
||||
assert_zero(r);
|
||||
}
|
||||
}
|
||||
|
@ -3257,12 +3252,11 @@ ft_handle_open(FT_HANDLE t, const char *fname_in_env, int is_create, int only_cr
|
|||
// with the brt, the function is not allowed to fail
|
||||
// Code that handles failure (located below "exit"),
|
||||
// depends on this
|
||||
toku_ft_note_ft_handle_open(ft, t);
|
||||
toku_ft_note_ft_handle_open(ft, ft_h);
|
||||
if (txn_created) {
|
||||
assert(txn);
|
||||
toku_ft_suppress_rollbacks(ft, txn);
|
||||
r = toku_txn_note_ft(txn, ft);
|
||||
assert_zero(r);
|
||||
toku_txn_maybe_note_ft(txn, ft);
|
||||
}
|
||||
|
||||
//Opening a brt may restore to previous checkpoint. Truncate if necessary.
|
||||
|
@ -5506,10 +5500,7 @@ toku_ft_remove_on_commit(FT_HANDLE handle, TOKUTXN txn) {
|
|||
cf = handle->ft->cf;
|
||||
FT ft = toku_cachefile_get_userdata(cf);
|
||||
|
||||
// TODO: toku_txn_note_ft should return void
|
||||
// Assert success here because note_ft also asserts success internally.
|
||||
r = toku_txn_note_ft(txn, ft);
|
||||
assert(r == 0);
|
||||
toku_txn_maybe_note_ft(txn, ft);
|
||||
|
||||
// If the txn commits, the commit MUST be in the log before the file is actually unlinked
|
||||
toku_txn_force_fsync_on_commit(txn);
|
||||
|
|
54
ft/ft.c
54
ft/ft.c
|
@ -776,11 +776,11 @@ toku_dictionary_redirect_abort(FT old_h, FT new_h, TOKUTXN txn) {
|
|||
*****/
|
||||
|
||||
int
|
||||
toku_dictionary_redirect (const char *dst_fname_in_env, FT_HANDLE old_ft, TOKUTXN txn) {
|
||||
toku_dictionary_redirect (const char *dst_fname_in_env, FT_HANDLE old_ft_h, TOKUTXN txn) {
|
||||
// Input args:
|
||||
// new file name for dictionary (relative to env)
|
||||
// old_ft is a live brt of open handle ({DB, BRT} pair) that currently refers to old dictionary file.
|
||||
// (old_ft may be one of many handles to the dictionary.)
|
||||
// old_ft_h is a live brt of open handle ({DB, BRT} pair) that currently refers to old dictionary file.
|
||||
// (old_ft_h may be one of many handles to the dictionary.)
|
||||
// txn that created the loader
|
||||
// Requires:
|
||||
// ydb_lock is held.
|
||||
|
@ -797,11 +797,11 @@ toku_dictionary_redirect (const char *dst_fname_in_env, FT_HANDLE old_ft, TOKUTX
|
|||
// If the txn aborts, then this operation will be undone
|
||||
int r;
|
||||
|
||||
FT old_h = old_ft->ft;
|
||||
FT old_ft = old_ft_h->ft;
|
||||
|
||||
// dst file should not be open. (implies that dst and src are different because src must be open.)
|
||||
{
|
||||
CACHETABLE ct = toku_cachefile_get_cachetable(old_h->cf);
|
||||
CACHETABLE ct = toku_cachefile_get_cachetable(old_ft->cf);
|
||||
CACHEFILE cf;
|
||||
r = toku_cachefile_of_iname_in_env(ct, dst_fname_in_env, &cf);
|
||||
if (r==0) {
|
||||
|
@ -813,25 +813,24 @@ toku_dictionary_redirect (const char *dst_fname_in_env, FT_HANDLE old_ft, TOKUTX
|
|||
}
|
||||
|
||||
if (txn) {
|
||||
r = toku_txn_note_ft(txn, old_h); // mark old brt as touched by this txn
|
||||
assert_zero(r);
|
||||
toku_txn_maybe_note_ft(txn, old_ft); // mark old ft as touched by this txn
|
||||
}
|
||||
|
||||
FT new_h;
|
||||
r = dictionary_redirect_internal(dst_fname_in_env, old_h, txn, &new_h);
|
||||
FT new_ft;
|
||||
r = dictionary_redirect_internal(dst_fname_in_env, old_ft, txn, &new_ft);
|
||||
assert_zero(r);
|
||||
|
||||
// make rollback log entry
|
||||
if (txn) {
|
||||
r = toku_txn_note_ft(txn, new_h); // mark new brt as touched by this txn
|
||||
toku_txn_maybe_note_ft(txn, new_ft); // mark new ft as touched by this txn
|
||||
|
||||
FILENUM old_filenum = toku_cachefile_filenum(old_h->cf);
|
||||
FILENUM new_filenum = toku_cachefile_filenum(new_h->cf);
|
||||
FILENUM old_filenum = toku_cachefile_filenum(old_ft->cf);
|
||||
FILENUM new_filenum = toku_cachefile_filenum(new_ft->cf);
|
||||
r = toku_logger_save_rollback_dictionary_redirect(txn, old_filenum, new_filenum);
|
||||
assert_zero(r);
|
||||
|
||||
TXNID xid = toku_txn_get_txnid(txn);
|
||||
toku_ft_suppress_rollbacks(new_h, txn);
|
||||
toku_ft_suppress_rollbacks(new_ft, txn);
|
||||
r = toku_log_suppress_rollback(txn->logger, NULL, 0, new_filenum, xid);
|
||||
assert_zero(r);
|
||||
}
|
||||
|
@ -849,29 +848,14 @@ static int find_xid (OMTVALUE v, void *txnv) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
// returns if ref was added
|
||||
BOOL
|
||||
toku_ft_maybe_add_txn_ref(FT h, TOKUTXN txn) {
|
||||
BOOL ref_added = FALSE;
|
||||
OMTVALUE txnv;
|
||||
u_int32_t index;
|
||||
toku_ft_grab_reflock(h);
|
||||
// Does brt already know about transaction txn?
|
||||
int r = toku_omt_find_zero(h->txns, find_xid, txn, &txnv, &index);
|
||||
if (r==0) {
|
||||
// It's already there.
|
||||
assert((TOKUTXN)txnv==txn);
|
||||
ref_added = FALSE;
|
||||
goto exit;
|
||||
}
|
||||
// Otherwise it's not there.
|
||||
// Insert reference to transaction into brt
|
||||
r = toku_omt_insert_at(h->txns, txn, index);
|
||||
// Insert reference to transaction into ft
|
||||
void
|
||||
toku_ft_add_txn_ref(FT ft, TOKUTXN txn) {
|
||||
toku_ft_grab_reflock(ft);
|
||||
uint32_t idx;
|
||||
int r = toku_omt_insert(ft->txns, txn, find_xid, txn, &idx);
|
||||
assert(r==0);
|
||||
ref_added = TRUE;
|
||||
exit:
|
||||
toku_ft_release_reflock(h);
|
||||
return ref_added;
|
||||
toku_ft_release_reflock(ft);
|
||||
}
|
||||
|
||||
static void
|
||||
|
|
5
ft/ft.h
5
ft/ft.h
|
@ -12,6 +12,7 @@
|
|||
#include "cachetable.h"
|
||||
#include "log.h"
|
||||
#include "ft-search.h"
|
||||
#include "ft-ops.h"
|
||||
#include "compress.h"
|
||||
|
||||
// remove a ft, transactionless.
|
||||
|
@ -68,8 +69,8 @@ void toku_reset_root_xid_that_created(FT h, TXNID new_root_xid_that_created);
|
|||
// This redefines which xid created the dictionary.
|
||||
|
||||
|
||||
BOOL
|
||||
toku_ft_maybe_add_txn_ref(FT h, TOKUTXN txn);
|
||||
void
|
||||
toku_ft_add_txn_ref(FT h, TOKUTXN txn);
|
||||
void
|
||||
toku_ft_remove_txn_ref(FT h, TOKUTXN txn);
|
||||
|
||||
|
|
|
@ -104,24 +104,7 @@ struct tokulogger {
|
|||
int toku_logger_find_next_unused_log_file(const char *directory, long long *result);
|
||||
int toku_logger_find_logfiles (const char *directory, char ***resultp, int *n_logfiles);
|
||||
|
||||
struct tokutxn {
|
||||
u_int64_t txnid64; /* this happens to be the first lsn */
|
||||
u_int64_t ancestor_txnid64; /* this is the lsn of root transaction */
|
||||
u_int64_t snapshot_txnid64; /* this is the lsn of the snapshot */
|
||||
TOKULOGGER logger;
|
||||
TOKUTXN parent;
|
||||
DB_TXN* container_db_txn; // reference to DB_TXN that contains this tokutxn
|
||||
time_t starttime; // timestamp in seconds of transaction start
|
||||
|
||||
u_int64_t rollentry_raw_count; // the total count of every byte in the transaction and all its children.
|
||||
OMT open_fts; // a collection of the brts that we touched. Indexed by filenum.
|
||||
TXN_SNAPSHOT_TYPE snapshot_type;
|
||||
OMT live_root_txn_list; // the root txns live when the root ancestor (self if a root) started
|
||||
XIDS xids; //Represents the xid list
|
||||
BOOL force_fsync_on_commit; //This transaction NEEDS an fsync once (if) it commits. (commit means root txn)
|
||||
TXN_PROGRESS_POLL_FUNCTION progress_poll_fun;
|
||||
void * progress_poll_fun_extra;
|
||||
|
||||
struct txn_roll_info {
|
||||
// these are number of rollback nodes and rollback entries for this txn.
|
||||
//
|
||||
// the current rollback node below has sequence number num_rollback_nodes - 1
|
||||
|
@ -131,35 +114,76 @@ struct tokutxn {
|
|||
// rollback nodes for this transaction is non-zero, then we will use
|
||||
// the number of rollback nodes to know which sequence number to assign
|
||||
// to a new one we create
|
||||
uint64_t num_rollback_nodes;
|
||||
uint64_t num_rollentries;
|
||||
uint64_t num_rollentries_processed;
|
||||
uint64_t num_rollback_nodes;
|
||||
uint64_t num_rollentries;
|
||||
uint64_t num_rollentries_processed;
|
||||
uint64_t rollentry_raw_count; // the total count of every byte in the transaction and all its children.
|
||||
|
||||
// spilled rollback nodes are rollback nodes that were gorged by this
|
||||
// transaction, retired, and saved in a list.
|
||||
|
||||
// the spilled rollback head is the block number of the first rollback node
|
||||
// that makes up the rollback log chain
|
||||
BLOCKNUM spilled_rollback_head;
|
||||
uint32_t spilled_rollback_head_hash;
|
||||
BLOCKNUM spilled_rollback_head;
|
||||
uint32_t spilled_rollback_head_hash;
|
||||
// the spilled rollback is the block number of the last rollback node that
|
||||
// makes up the rollback log chain.
|
||||
BLOCKNUM spilled_rollback_tail;
|
||||
uint32_t spilled_rollback_tail_hash;
|
||||
BLOCKNUM spilled_rollback_tail;
|
||||
uint32_t spilled_rollback_tail_hash;
|
||||
// the current rollback node block number we may use. if this is ROLLBACK_NONE,
|
||||
// then we need to create one and set it here before using it.
|
||||
BLOCKNUM current_rollback;
|
||||
uint32_t current_rollback_hash;
|
||||
BLOCKNUM current_rollback;
|
||||
uint32_t current_rollback_hash;
|
||||
};
|
||||
|
||||
BOOL recovered_from_checkpoint;
|
||||
struct tokutxn {
|
||||
// These don't change after create:
|
||||
const time_t starttime; // timestamp in seconds of transaction start
|
||||
const u_int64_t txnid64; // this happens to be the first lsn
|
||||
const u_int64_t ancestor_txnid64; // this is the lsn of root transaction
|
||||
const u_int64_t snapshot_txnid64; // this is the lsn of the snapshot
|
||||
const TXN_SNAPSHOT_TYPE snapshot_type;
|
||||
const BOOL recovered_from_checkpoint;
|
||||
const TOKULOGGER logger;
|
||||
const TOKUTXN parent;
|
||||
// These don't either but they're created in a way that's hard to make
|
||||
// strictly const.
|
||||
DB_TXN *container_db_txn; // reference to DB_TXN that contains this tokutxn
|
||||
OMT live_root_txn_list; // the root txns live when the root ancestor (self if a root) started.
|
||||
XIDS xids; // Represents the xid list
|
||||
|
||||
// These are not read until a commit, prepare, or abort starts, and
|
||||
// they're "monotonic" (only go false->true) during operation:
|
||||
BOOL checkpoint_needed_before_commit;
|
||||
BOOL do_fsync;
|
||||
BOOL force_fsync_on_commit; //This transaction NEEDS an fsync once (if) it commits. (commit means root txn)
|
||||
|
||||
// Not used until commit, prepare, or abort starts:
|
||||
LSN do_fsync_lsn;
|
||||
TOKU_XA_XID xa_xid; // for prepared transactions
|
||||
TXN_PROGRESS_POLL_FUNCTION progress_poll_fun;
|
||||
void *progress_poll_fun_extra;
|
||||
|
||||
toku_mutex_t txn_lock;
|
||||
// Protected by the txn lock:
|
||||
OMT open_fts; // a collection of the brts that we touched. Indexed by filenum.
|
||||
struct txn_roll_info roll_info; // Info used to manage rollback entries
|
||||
|
||||
// Protected by the txn manager lock:
|
||||
TOKUTXN_STATE state;
|
||||
LSN do_fsync_lsn;
|
||||
BOOL do_fsync;
|
||||
TOKU_XA_XID xa_xid; // for prepared transactions
|
||||
struct toku_list prepared_txns_link; // list of prepared transactions
|
||||
};
|
||||
|
||||
static inline int
|
||||
txn_has_current_rollback_log(TOKUTXN txn) {
|
||||
return txn->roll_info.current_rollback.b != ROLLBACK_NONE.b;
|
||||
}
|
||||
|
||||
static inline int
|
||||
txn_has_spilled_rollback_logs(TOKUTXN txn) {
|
||||
return txn->roll_info.spilled_rollback_tail.b != ROLLBACK_NONE.b;
|
||||
}
|
||||
|
||||
struct txninfo {
|
||||
uint64_t rollentry_raw_count; // the total count of every byte in the transaction and all its children.
|
||||
uint32_t num_fts;
|
||||
|
|
|
@ -573,6 +573,7 @@ generate_rollbacks (void) {
|
|||
|
||||
fprintf(hf, ");\n");
|
||||
fprintf(cf, ") {\n");
|
||||
fprintf(cf, " toku_txn_lock(txn);\n");
|
||||
fprintf(cf, " ROLLBACK_LOG_NODE log;\n");
|
||||
fprintf(cf, " toku_get_and_pin_rollback_log_for_new_entry(txn, &log);\n");
|
||||
// 'memdup' all BYTESTRINGS here
|
||||
|
@ -601,19 +602,20 @@ generate_rollbacks (void) {
|
|||
fprintf(cf, " struct roll_entry *v;\n");
|
||||
fprintf(cf, " size_t mem_needed = sizeof(v->u.%s) + __builtin_offsetof(struct roll_entry, u.%s);\n", lt->name, lt->name);
|
||||
fprintf(cf, " v = toku_malloc_in_rollback(log, mem_needed);\n");
|
||||
fprintf(cf, " if (v==0) return errno;\n");
|
||||
fprintf(cf, " assert(v);\n");
|
||||
fprintf(cf, " v->cmd = (enum rt_cmd)%u;\n", lt->command_and_flags&0xff);
|
||||
DO_FIELDS(field_type, lt, fprintf(cf, " v->u.%s.%s = %s;\n", lt->name, field_type->name, field_type->name));
|
||||
fprintf(cf, " v->prev = log->newest_logentry;\n");
|
||||
fprintf(cf, " if (log->oldest_logentry==NULL) log->oldest_logentry=v;\n");
|
||||
fprintf(cf, " log->newest_logentry = v;\n");
|
||||
fprintf(cf, " log->rollentry_resident_bytecount += rollback_fsize;\n");
|
||||
fprintf(cf, " txn->rollentry_raw_count += rollback_fsize;\n");
|
||||
fprintf(cf, " txn->num_rollentries++;\n");
|
||||
fprintf(cf, " txn->roll_info.rollentry_raw_count += rollback_fsize;\n");
|
||||
fprintf(cf, " txn->roll_info.num_rollentries++;\n");
|
||||
fprintf(cf, " log->dirty = TRUE;\n");
|
||||
fprintf(cf, " // spill and unpin assert success internally\n");
|
||||
fprintf(cf, " toku_maybe_spill_rollbacks(txn, log);\n");
|
||||
fprintf(cf, " toku_rollback_log_unpin(txn, log);\n");
|
||||
fprintf(cf, " toku_txn_unlock(txn);\n");
|
||||
fprintf(cf, " return 0;\n}\n");
|
||||
});
|
||||
|
||||
|
|
12
ft/recover.c
12
ft/recover.c
|
@ -480,7 +480,7 @@ recover_transaction(TOKUTXN *txnp, TXNID xid, TXNID parentxid, TOKULOGGER logger
|
|||
assert(r == 0);
|
||||
assert(txn==NULL);
|
||||
}
|
||||
r = toku_txn_begin_with_xid(parent, &txn, logger, xid, TXN_SNAPSHOT_NONE, NULL);
|
||||
r = toku_txn_begin_with_xid(parent, &txn, logger, xid, TXN_SNAPSHOT_NONE, NULL, true);
|
||||
assert(r == 0);
|
||||
if (txnp) *txnp = txn;
|
||||
return 0;
|
||||
|
@ -623,10 +623,9 @@ static int toku_recover_suppress_rollback (struct logtype_suppress_rollback *UU(
|
|||
r = toku_txnid2txn(renv->logger, l->xid, &txn);
|
||||
assert(r == 0);
|
||||
assert(txn!=NULL);
|
||||
FT h = tuple->ft_handle->ft;
|
||||
toku_ft_suppress_rollbacks(h, txn);
|
||||
r = toku_txn_note_ft(txn, tuple->ft_handle->ft);
|
||||
assert(r==0);
|
||||
FT ft = tuple->ft_handle->ft;
|
||||
toku_ft_suppress_rollbacks(ft, txn);
|
||||
toku_txn_maybe_note_ft(txn, ft);
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
@ -884,8 +883,7 @@ static int toku_recover_enq_insert (struct logtype_enq_insert *l, RECOVER_ENV re
|
|||
toku_fill_dbt(&valdbt, l->value.data, l->value.len);
|
||||
r = toku_ft_maybe_insert(tuple->ft_handle, &keydbt, &valdbt, txn, TRUE, l->lsn, FALSE, FT_INSERT);
|
||||
assert(r == 0);
|
||||
r = toku_txn_note_ft(txn, tuple->ft_handle->ft);
|
||||
assert(r == 0);
|
||||
toku_txn_maybe_note_ft(txn, tuple->ft_handle->ft);
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
|
|
@ -7,6 +7,7 @@
|
|||
/* rollback and rollforward routines. */
|
||||
|
||||
#include "includes.h"
|
||||
#include "rollback-apply.h"
|
||||
#include "xids.h"
|
||||
|
||||
// functionality provided by roll.c is exposed by an autogenerated
|
||||
|
|
222
ft/rollback-apply.c
Normal file
222
ft/rollback-apply.c
Normal file
|
@ -0,0 +1,222 @@
|
|||
/* -*- mode: C; c-basic-offset: 4; indent-tabs-mode: nil -*- */
|
||||
// vim: expandtab:ts=8:sw=4:softtabstop=4:
|
||||
#ident "$Id$"
|
||||
#ident "Copyright (c) 2007-2012 Tokutek Inc. All rights reserved."
|
||||
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
|
||||
|
||||
#include "fttypes.h"
|
||||
#include "log-internal.h"
|
||||
#include "rollback-apply.h"
|
||||
|
||||
static void
|
||||
poll_txn_progress_function(TOKUTXN txn, uint8_t is_commit, uint8_t stall_for_checkpoint) {
|
||||
if (txn->progress_poll_fun) {
|
||||
TOKU_TXN_PROGRESS_S progress = {
|
||||
.entries_total = txn->roll_info.num_rollentries,
|
||||
.entries_processed = txn->roll_info.num_rollentries_processed,
|
||||
.is_commit = is_commit,
|
||||
.stalled_on_checkpoint = stall_for_checkpoint};
|
||||
txn->progress_poll_fun(&progress, txn->progress_poll_fun_extra);
|
||||
}
|
||||
}
|
||||
|
||||
int toku_commit_rollback_item (TOKUTXN txn, struct roll_entry *item, LSN lsn) {
|
||||
int r=0;
|
||||
rolltype_dispatch_assign(item, toku_commit_, r, txn, lsn);
|
||||
txn->roll_info.num_rollentries_processed++;
|
||||
if (txn->roll_info.num_rollentries_processed % 1024 == 0) {
|
||||
poll_txn_progress_function(txn, TRUE, FALSE);
|
||||
}
|
||||
return r;
|
||||
}
|
||||
|
||||
int toku_abort_rollback_item (TOKUTXN txn, struct roll_entry *item, LSN lsn) {
|
||||
int r=0;
|
||||
rolltype_dispatch_assign(item, toku_rollback_, r, txn, lsn);
|
||||
txn->roll_info.num_rollentries_processed++;
|
||||
if (txn->roll_info.num_rollentries_processed % 1024 == 0) {
|
||||
poll_txn_progress_function(txn, FALSE, FALSE);
|
||||
}
|
||||
return r;
|
||||
}
|
||||
|
||||
static int
|
||||
note_ft_used_in_txns_parent(OMTVALUE ftv, u_int32_t UU(index), void *txnv) {
|
||||
TOKUTXN child = txnv;
|
||||
TOKUTXN parent = child->parent;
|
||||
FT ft = ftv;
|
||||
toku_txn_maybe_note_ft(parent, ft);
|
||||
if (ft->txnid_that_created_or_locked_when_empty == toku_txn_get_txnid(child)) {
|
||||
//Pass magic "no rollback needed" flag to parent.
|
||||
ft->txnid_that_created_or_locked_when_empty = toku_txn_get_txnid(parent);
|
||||
}
|
||||
if (ft->txnid_that_suppressed_recovery_logs == toku_txn_get_txnid(child)) {
|
||||
//Pass magic "no recovery needed" flag to parent.
|
||||
ft->txnid_that_suppressed_recovery_logs = toku_txn_get_txnid(parent);
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int
|
||||
apply_txn(TOKUTXN txn, LSN lsn, apply_rollback_item func) {
|
||||
int r = 0;
|
||||
// do the commit/abort calls and free everything
|
||||
// we do the commit/abort calls in reverse order too.
|
||||
struct roll_entry *item;
|
||||
//printf("%s:%d abort\n", __FILE__, __LINE__);
|
||||
|
||||
BLOCKNUM next_log = ROLLBACK_NONE;
|
||||
uint32_t next_log_hash = 0;
|
||||
|
||||
BOOL is_current = FALSE;
|
||||
if (txn_has_current_rollback_log(txn)) {
|
||||
next_log = txn->roll_info.current_rollback;
|
||||
next_log_hash = txn->roll_info.current_rollback_hash;
|
||||
is_current = TRUE;
|
||||
}
|
||||
else if (txn_has_spilled_rollback_logs(txn)) {
|
||||
next_log = txn->roll_info.spilled_rollback_tail;
|
||||
next_log_hash = txn->roll_info.spilled_rollback_tail_hash;
|
||||
}
|
||||
|
||||
uint64_t last_sequence = txn->roll_info.num_rollback_nodes;
|
||||
BOOL found_head = FALSE;
|
||||
while (next_log.b != ROLLBACK_NONE.b) {
|
||||
ROLLBACK_LOG_NODE log;
|
||||
//pin log
|
||||
toku_get_and_pin_rollback_log(txn, next_log, next_log_hash, &log);
|
||||
toku_rollback_verify_contents(log, txn->txnid64, last_sequence - 1);
|
||||
|
||||
toku_maybe_prefetch_previous_rollback_log(txn, log);
|
||||
|
||||
last_sequence = log->sequence;
|
||||
if (func) {
|
||||
while ((item=log->newest_logentry)) {
|
||||
log->newest_logentry = item->prev;
|
||||
r = func(txn, item, lsn);
|
||||
if (r!=0) return r;
|
||||
}
|
||||
}
|
||||
if (next_log.b == txn->roll_info.spilled_rollback_head.b) {
|
||||
assert(!found_head);
|
||||
found_head = TRUE;
|
||||
assert(log->sequence == 0);
|
||||
}
|
||||
next_log = log->previous;
|
||||
next_log_hash = log->previous_hash;
|
||||
{
|
||||
//Clean up transaction structure to prevent
|
||||
//toku_txn_close from double-freeing
|
||||
if (is_current) {
|
||||
txn->roll_info.current_rollback = ROLLBACK_NONE;
|
||||
txn->roll_info.current_rollback_hash = 0;
|
||||
is_current = FALSE;
|
||||
}
|
||||
else {
|
||||
txn->roll_info.spilled_rollback_tail = next_log;
|
||||
txn->roll_info.spilled_rollback_tail_hash = next_log_hash;
|
||||
}
|
||||
if (found_head) {
|
||||
assert(next_log.b == ROLLBACK_NONE.b);
|
||||
txn->roll_info.spilled_rollback_head = next_log;
|
||||
txn->roll_info.spilled_rollback_head_hash = next_log_hash;
|
||||
}
|
||||
}
|
||||
toku_rollback_log_unpin_and_remove(txn, log);
|
||||
}
|
||||
return r;
|
||||
}
|
||||
|
||||
//Commit each entry in the rollback log.
|
||||
//If the transaction has a parent, it just promotes its information to its parent.
|
||||
int toku_rollback_commit(TOKUTXN txn, LSN lsn) {
|
||||
int r=0;
|
||||
if (txn->parent!=0) {
|
||||
// First we must put a rollinclude entry into the parent if we spilled
|
||||
|
||||
if (txn_has_spilled_rollback_logs(txn)) {
|
||||
uint64_t num_nodes = txn->roll_info.num_rollback_nodes;
|
||||
if (txn_has_current_rollback_log(txn)) {
|
||||
num_nodes--; //Don't count the in-progress rollback log.
|
||||
}
|
||||
r = toku_logger_save_rollback_rollinclude(txn->parent, txn->txnid64, num_nodes,
|
||||
txn->roll_info.spilled_rollback_head, txn->roll_info.spilled_rollback_head_hash,
|
||||
txn->roll_info.spilled_rollback_tail, txn->roll_info.spilled_rollback_tail_hash);
|
||||
if (r!=0) return r;
|
||||
//Remove ownership from child.
|
||||
txn->roll_info.spilled_rollback_head = ROLLBACK_NONE;
|
||||
txn->roll_info.spilled_rollback_head_hash = 0;
|
||||
txn->roll_info.spilled_rollback_tail = ROLLBACK_NONE;
|
||||
txn->roll_info.spilled_rollback_tail_hash = 0;
|
||||
}
|
||||
// if we're commiting a child rollback, put its entries into the parent
|
||||
// by pinning both child and parent and then linking the child log entry
|
||||
// list to the end of the parent log entry list.
|
||||
if (txn_has_current_rollback_log(txn)) {
|
||||
//Pin parent log
|
||||
ROLLBACK_LOG_NODE parent_log;
|
||||
toku_get_and_pin_rollback_log_for_new_entry(txn->parent, &parent_log);
|
||||
|
||||
//Pin child log
|
||||
ROLLBACK_LOG_NODE child_log;
|
||||
toku_get_and_pin_rollback_log(txn, txn->roll_info.current_rollback,
|
||||
txn->roll_info.current_rollback_hash, &child_log);
|
||||
toku_rollback_verify_contents(child_log, txn->txnid64, txn->roll_info.num_rollback_nodes - 1);
|
||||
|
||||
// Append the list to the front of the parent.
|
||||
if (child_log->oldest_logentry) {
|
||||
// There are some entries, so link them in.
|
||||
child_log->oldest_logentry->prev = parent_log->newest_logentry;
|
||||
if (!parent_log->oldest_logentry) {
|
||||
parent_log->oldest_logentry = child_log->oldest_logentry;
|
||||
}
|
||||
parent_log->newest_logentry = child_log->newest_logentry;
|
||||
parent_log->rollentry_resident_bytecount += child_log->rollentry_resident_bytecount;
|
||||
txn->parent->roll_info.rollentry_raw_count += txn->roll_info.rollentry_raw_count;
|
||||
child_log->rollentry_resident_bytecount = 0;
|
||||
}
|
||||
if (parent_log->oldest_logentry==NULL) {
|
||||
parent_log->oldest_logentry = child_log->oldest_logentry;
|
||||
}
|
||||
child_log->newest_logentry = child_log->oldest_logentry = 0;
|
||||
// Put all the memarena data into the parent.
|
||||
if (memarena_total_size_in_use(child_log->rollentry_arena) > 0) {
|
||||
// If there are no bytes to move, then just leave things alone, and let the memory be reclaimed on txn is closed.
|
||||
memarena_move_buffers(parent_log->rollentry_arena, child_log->rollentry_arena);
|
||||
}
|
||||
toku_rollback_log_unpin_and_remove(txn, child_log);
|
||||
txn->roll_info.current_rollback = ROLLBACK_NONE;
|
||||
txn->roll_info.current_rollback_hash = 0;
|
||||
|
||||
toku_maybe_spill_rollbacks(txn->parent, parent_log);
|
||||
toku_rollback_log_unpin(txn->parent, parent_log);
|
||||
assert(r == 0);
|
||||
}
|
||||
|
||||
// Note the open brts, the omts must be merged
|
||||
r = toku_omt_iterate(txn->open_fts, note_ft_used_in_txns_parent, txn);
|
||||
assert(r==0);
|
||||
|
||||
// Merge the list of headers that must be checkpointed before commit
|
||||
if (txn->checkpoint_needed_before_commit) {
|
||||
txn->parent->checkpoint_needed_before_commit = TRUE;
|
||||
}
|
||||
|
||||
//If this transaction needs an fsync (if it commits)
|
||||
//save that in the parent. Since the commit really happens in the root txn.
|
||||
txn->parent->force_fsync_on_commit |= txn->force_fsync_on_commit;
|
||||
txn->parent->roll_info.num_rollentries += txn->roll_info.num_rollentries;
|
||||
} else {
|
||||
r = apply_txn(txn, lsn, toku_commit_rollback_item);
|
||||
assert(r==0);
|
||||
}
|
||||
|
||||
return r;
|
||||
}
|
||||
|
||||
int toku_rollback_abort(TOKUTXN txn, LSN lsn) {
|
||||
int r;
|
||||
r = apply_txn(txn, lsn, toku_abort_rollback_item);
|
||||
assert(r==0);
|
||||
return r;
|
||||
}
|
25
ft/rollback-apply.h
Normal file
25
ft/rollback-apply.h
Normal file
|
@ -0,0 +1,25 @@
|
|||
/* -*- mode: C; c-basic-offset: 4; indent-tabs-mode: nil -*- */
|
||||
// vim: expandtab:ts=8:sw=4:softtabstop=4:
|
||||
#ifndef ROLLBACK_APPLY_H
|
||||
#define ROLLBACK_APPLY_H
|
||||
|
||||
#ident "$Id$"
|
||||
#ident "Copyright (c) 2007-2012 Tokutek Inc. All rights reserved."
|
||||
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
|
||||
|
||||
#if defined(__cplusplus) || defined(__cilkplusplus)
|
||||
extern "C" {
|
||||
#endif
|
||||
|
||||
typedef int(*apply_rollback_item)(TOKUTXN txn, struct roll_entry *item, LSN lsn);
|
||||
int toku_commit_rollback_item (TOKUTXN txn, struct roll_entry *item, LSN lsn);
|
||||
int toku_abort_rollback_item (TOKUTXN txn, struct roll_entry *item, LSN lsn);
|
||||
|
||||
int toku_rollback_commit(TOKUTXN txn, LSN lsn);
|
||||
int toku_rollback_abort(TOKUTXN txn, LSN lsn);
|
||||
|
||||
#if defined(__cplusplus) || defined(__cilkplusplus)
|
||||
};
|
||||
#endif
|
||||
|
||||
#endif // ROLLBACK_APPLY_H
|
123
ft/rollback-ct-callbacks.c
Normal file
123
ft/rollback-ct-callbacks.c
Normal file
|
@ -0,0 +1,123 @@
|
|||
/* -*- mode: C; c-basic-offset: 4; indent-tabs-mode: nil -*- */
|
||||
// vim: expandtab:ts=8:sw=4:softtabstop=4:
|
||||
#ident "$Id$"
|
||||
#ident "Copyright (c) 2007-2012 Tokutek Inc. All rights reserved."
|
||||
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
|
||||
|
||||
#include "rollback-ct-callbacks.h"
|
||||
|
||||
#include <toku_portability.h>
|
||||
#include <memory.h>
|
||||
#include "ft-internal.h"
|
||||
#include "fttypes.h"
|
||||
#include "memarena.h"
|
||||
#include "rollback.h"
|
||||
|
||||
|
||||
// Cleanup the rollback memory
|
||||
static void
|
||||
rollback_log_destroy(ROLLBACK_LOG_NODE log) {
|
||||
memarena_close(&log->rollentry_arena);
|
||||
toku_free(log);
|
||||
}
|
||||
|
||||
// Write something out. Keep trying even if partial writes occur.
|
||||
// On error: Return negative with errno set.
|
||||
// On success return nbytes.
|
||||
void toku_rollback_flush_callback (CACHEFILE cachefile, int fd, BLOCKNUM logname,
|
||||
void *rollback_v, void** UU(disk_data), void *extraargs, PAIR_ATTR size, PAIR_ATTR* new_size,
|
||||
BOOL write_me, BOOL keep_me, BOOL for_checkpoint, BOOL UU(is_clone)) {
|
||||
int r;
|
||||
ROLLBACK_LOG_NODE log = rollback_v;
|
||||
FT h = extraargs;
|
||||
|
||||
assert(h->cf == cachefile);
|
||||
assert(log->blocknum.b==logname.b);
|
||||
|
||||
if (write_me && !h->panic) {
|
||||
int n_workitems, n_threads;
|
||||
toku_cachefile_get_workqueue_load(cachefile, &n_workitems, &n_threads);
|
||||
|
||||
r = toku_serialize_rollback_log_to(fd, log->blocknum, log, h, n_workitems, n_threads, for_checkpoint);
|
||||
if (r) {
|
||||
if (h->panic==0) {
|
||||
char *e = strerror(r);
|
||||
int l = 200 + strlen(e);
|
||||
char s[l];
|
||||
h->panic=r;
|
||||
snprintf(s, l-1, "While writing data to disk, error %d (%s)", r, e);
|
||||
h->panic_string = toku_strdup(s);
|
||||
}
|
||||
}
|
||||
}
|
||||
*new_size = size;
|
||||
if (!keep_me) {
|
||||
rollback_log_destroy(log);
|
||||
}
|
||||
}
|
||||
|
||||
int toku_rollback_fetch_callback (CACHEFILE cachefile, int fd, BLOCKNUM logname, u_int32_t fullhash,
|
||||
void **rollback_pv, void** UU(disk_data), PAIR_ATTR *sizep, int * UU(dirtyp), void *extraargs) {
|
||||
int r;
|
||||
FT h = extraargs;
|
||||
assert(h->cf == cachefile);
|
||||
|
||||
ROLLBACK_LOG_NODE *result = (ROLLBACK_LOG_NODE*)rollback_pv;
|
||||
r = toku_deserialize_rollback_log_from(fd, logname, fullhash, result, h);
|
||||
if (r==0) {
|
||||
*sizep = rollback_memory_size(*result);
|
||||
}
|
||||
return r;
|
||||
}
|
||||
|
||||
void toku_rollback_pe_est_callback(
|
||||
void* rollback_v,
|
||||
void* UU(disk_data),
|
||||
long* bytes_freed_estimate,
|
||||
enum partial_eviction_cost *cost,
|
||||
void* UU(write_extraargs)
|
||||
)
|
||||
{
|
||||
assert(rollback_v != NULL);
|
||||
*bytes_freed_estimate = 0;
|
||||
*cost = PE_CHEAP;
|
||||
}
|
||||
|
||||
// callback for partially evicting a cachetable entry
|
||||
int toku_rollback_pe_callback (
|
||||
void *rollback_v,
|
||||
PAIR_ATTR UU(old_attr),
|
||||
PAIR_ATTR* new_attr,
|
||||
void* UU(extraargs)
|
||||
)
|
||||
{
|
||||
assert(rollback_v != NULL);
|
||||
*new_attr = old_attr;
|
||||
return 0;
|
||||
}
|
||||
|
||||
// partial fetch is never required for a rollback log node
|
||||
BOOL toku_rollback_pf_req_callback(void* UU(ftnode_pv), void* UU(read_extraargs)) {
|
||||
return FALSE;
|
||||
}
|
||||
|
||||
// a rollback node should never be partial fetched,
|
||||
// because we always say it is not required.
|
||||
// (pf req callback always returns false)
|
||||
int toku_rollback_pf_callback(void* UU(ftnode_pv), void* UU(disk_data), void* UU(read_extraargs), int UU(fd), PAIR_ATTR* UU(sizep)) {
|
||||
assert(FALSE);
|
||||
return 0;
|
||||
}
|
||||
|
||||
// the cleaner thread should never choose a rollback node for cleaning
|
||||
int toku_rollback_cleaner_callback (
|
||||
void* UU(ftnode_pv),
|
||||
BLOCKNUM UU(blocknum),
|
||||
u_int32_t UU(fullhash),
|
||||
void* UU(extraargs)
|
||||
)
|
||||
{
|
||||
assert(FALSE);
|
||||
return 0;
|
||||
}
|
||||
|
56
ft/rollback-ct-callbacks.h
Normal file
56
ft/rollback-ct-callbacks.h
Normal file
|
@ -0,0 +1,56 @@
|
|||
/* -*- mode: C; c-basic-offset: 4; indent-tabs-mode: nil -*- */
|
||||
// vim: expandtab:ts=8:sw=4:softtabstop=4:
|
||||
#ifndef ROLLBACK_CT_CALLBACKS_H
|
||||
#define ROLLBACK_CT_CALLBACKS_H
|
||||
|
||||
#ident "$Id$"
|
||||
#ident "Copyright (c) 2007-2012 Tokutek Inc. All rights reserved."
|
||||
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
|
||||
|
||||
#if defined(__cplusplus) || defined(__cilkplusplus)
|
||||
extern "C" {
|
||||
#endif
|
||||
|
||||
#include "cachetable.h"
|
||||
#include "fttypes.h"
|
||||
|
||||
void toku_rollback_flush_callback(CACHEFILE cachefile, int fd, BLOCKNUM logname, void *rollback_v, void** UU(disk_data), void *extraargs, PAIR_ATTR size, PAIR_ATTR* new_size, BOOL write_me, BOOL keep_me, BOOL for_checkpoint, BOOL UU(is_clone));
|
||||
int toku_rollback_fetch_callback(CACHEFILE cachefile, int fd, BLOCKNUM logname, u_int32_t fullhash, void **rollback_pv, void** UU(disk_data), PAIR_ATTR *sizep, int * UU(dirtyp), void *extraargs);
|
||||
void toku_rollback_pe_est_callback(
|
||||
void* rollback_v,
|
||||
void* UU(disk_data),
|
||||
long* bytes_freed_estimate,
|
||||
enum partial_eviction_cost *cost,
|
||||
void* UU(write_extraargs)
|
||||
);
|
||||
int toku_rollback_pe_callback (
|
||||
void *rollback_v,
|
||||
PAIR_ATTR UU(old_attr),
|
||||
PAIR_ATTR* new_attr,
|
||||
void* UU(extraargs)
|
||||
) ;
|
||||
BOOL toku_rollback_pf_req_callback(void* UU(ftnode_pv), void* UU(read_extraargs)) ;
|
||||
int toku_rollback_pf_callback(void* UU(ftnode_pv), void* UU(disk_data), void* UU(read_extraargs), int UU(fd), PAIR_ATTR* UU(sizep));
|
||||
int toku_rollback_cleaner_callback (
|
||||
void* UU(ftnode_pv),
|
||||
BLOCKNUM UU(blocknum),
|
||||
u_int32_t UU(fullhash),
|
||||
void* UU(extraargs)
|
||||
);
|
||||
|
||||
static inline CACHETABLE_WRITE_CALLBACK get_write_callbacks_for_rollback_log(FT h) {
|
||||
CACHETABLE_WRITE_CALLBACK wc;
|
||||
wc.flush_callback = toku_rollback_flush_callback;
|
||||
wc.pe_est_callback = toku_rollback_pe_est_callback;
|
||||
wc.pe_callback = toku_rollback_pe_callback;
|
||||
wc.cleaner_callback = toku_rollback_cleaner_callback;
|
||||
wc.clone_callback = NULL;
|
||||
wc.write_extraargs = h;
|
||||
return wc;
|
||||
}
|
||||
|
||||
#if defined(__cplusplus) || defined(__cilkplusplus)
|
||||
};
|
||||
#endif
|
||||
|
||||
#endif // ROLLBACK_CT_CALLBACKS_H
|
414
ft/rollback.c
414
ft/rollback.c
|
@ -5,48 +5,7 @@
|
|||
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
|
||||
|
||||
#include "includes.h"
|
||||
|
||||
static void
|
||||
poll_txn_progress_function(TOKUTXN txn, uint8_t is_commit, uint8_t stall_for_checkpoint) {
|
||||
if (txn->progress_poll_fun) {
|
||||
TOKU_TXN_PROGRESS_S progress = {
|
||||
.entries_total = txn->num_rollentries,
|
||||
.entries_processed = txn->num_rollentries_processed,
|
||||
.is_commit = is_commit,
|
||||
.stalled_on_checkpoint = stall_for_checkpoint};
|
||||
txn->progress_poll_fun(&progress, txn->progress_poll_fun_extra);
|
||||
}
|
||||
}
|
||||
|
||||
int toku_commit_rollback_item (TOKUTXN txn, struct roll_entry *item, LSN lsn) {
|
||||
int r=0;
|
||||
rolltype_dispatch_assign(item, toku_commit_, r, txn, lsn);
|
||||
txn->num_rollentries_processed++;
|
||||
if (txn->num_rollentries_processed % 1024 == 0) {
|
||||
poll_txn_progress_function(txn, TRUE, FALSE);
|
||||
}
|
||||
return r;
|
||||
}
|
||||
|
||||
int toku_abort_rollback_item (TOKUTXN txn, struct roll_entry *item, LSN lsn) {
|
||||
int r=0;
|
||||
rolltype_dispatch_assign(item, toku_rollback_, r, txn, lsn);
|
||||
txn->num_rollentries_processed++;
|
||||
if (txn->num_rollentries_processed % 1024 == 0) {
|
||||
poll_txn_progress_function(txn, FALSE, FALSE);
|
||||
}
|
||||
return r;
|
||||
}
|
||||
|
||||
static inline int
|
||||
txn_has_current_rollback_log(TOKUTXN txn) {
|
||||
return txn->current_rollback.b != ROLLBACK_NONE.b;
|
||||
}
|
||||
|
||||
static inline int
|
||||
txn_has_spilled_rollback_logs(TOKUTXN txn) {
|
||||
return txn->spilled_rollback_tail.b != ROLLBACK_NONE.b;
|
||||
}
|
||||
#include "rollback-ct-callbacks.h"
|
||||
|
||||
static void rollback_unpin_remove_callback(CACHEKEY* cachekey, BOOL for_checkpoint, void* extra) {
|
||||
FT h = extra;
|
||||
|
@ -67,77 +26,6 @@ void toku_rollback_log_unpin_and_remove(TOKUTXN txn, ROLLBACK_LOG_NODE log) {
|
|||
assert(r == 0);
|
||||
}
|
||||
|
||||
static int
|
||||
apply_txn (TOKUTXN txn, LSN lsn,
|
||||
apply_rollback_item func) {
|
||||
int r = 0;
|
||||
// do the commit/abort calls and free everything
|
||||
// we do the commit/abort calls in reverse order too.
|
||||
struct roll_entry *item;
|
||||
//printf("%s:%d abort\n", __FILE__, __LINE__);
|
||||
|
||||
BLOCKNUM next_log = ROLLBACK_NONE;
|
||||
uint32_t next_log_hash = 0;
|
||||
|
||||
BOOL is_current = FALSE;
|
||||
if (txn_has_current_rollback_log(txn)) {
|
||||
next_log = txn->current_rollback;
|
||||
next_log_hash = txn->current_rollback_hash;
|
||||
is_current = TRUE;
|
||||
}
|
||||
else if (txn_has_spilled_rollback_logs(txn)) {
|
||||
next_log = txn->spilled_rollback_tail;
|
||||
next_log_hash = txn->spilled_rollback_tail_hash;
|
||||
}
|
||||
|
||||
uint64_t last_sequence = txn->num_rollback_nodes;
|
||||
BOOL found_head = FALSE;
|
||||
while (next_log.b != ROLLBACK_NONE.b) {
|
||||
ROLLBACK_LOG_NODE log;
|
||||
//pin log
|
||||
toku_get_and_pin_rollback_log(txn, next_log, next_log_hash, &log);
|
||||
toku_rollback_verify_contents(log, txn->txnid64, last_sequence - 1);
|
||||
|
||||
toku_maybe_prefetch_previous_rollback_log(txn, log);
|
||||
|
||||
last_sequence = log->sequence;
|
||||
if (func) {
|
||||
while ((item=log->newest_logentry)) {
|
||||
log->newest_logentry = item->prev;
|
||||
r = func(txn, item, lsn);
|
||||
if (r!=0) return r;
|
||||
}
|
||||
}
|
||||
if (next_log.b == txn->spilled_rollback_head.b) {
|
||||
assert(!found_head);
|
||||
found_head = TRUE;
|
||||
assert(log->sequence == 0);
|
||||
}
|
||||
next_log = log->previous;
|
||||
next_log_hash = log->previous_hash;
|
||||
{
|
||||
//Clean up transaction structure to prevent
|
||||
//toku_txn_close from double-freeing
|
||||
if (is_current) {
|
||||
txn->current_rollback = ROLLBACK_NONE;
|
||||
txn->current_rollback_hash = 0;
|
||||
is_current = FALSE;
|
||||
}
|
||||
else {
|
||||
txn->spilled_rollback_tail = next_log;
|
||||
txn->spilled_rollback_tail_hash = next_log_hash;
|
||||
}
|
||||
if (found_head) {
|
||||
assert(next_log.b == ROLLBACK_NONE.b);
|
||||
txn->spilled_rollback_head = next_log;
|
||||
txn->spilled_rollback_head_hash = next_log_hash;
|
||||
}
|
||||
}
|
||||
toku_rollback_log_unpin_and_remove(txn, log);
|
||||
}
|
||||
return r;
|
||||
}
|
||||
|
||||
int
|
||||
toku_find_xid_by_xid (OMTVALUE v, void *xidv) {
|
||||
TXNID xid = (TXNID) v;
|
||||
|
@ -166,118 +54,6 @@ void *toku_memdup_in_rollback(ROLLBACK_LOG_NODE log, const void *v, size_t len)
|
|||
return r;
|
||||
}
|
||||
|
||||
static int note_ft_used_in_txns_parent(OMTVALUE hv, u_int32_t UU(index), void*txnv) {
|
||||
TOKUTXN child = txnv;
|
||||
TOKUTXN parent = child->parent;
|
||||
FT h = hv;
|
||||
int r = toku_txn_note_ft(parent, h);
|
||||
if (r==0 &&
|
||||
h->txnid_that_created_or_locked_when_empty == toku_txn_get_txnid(child)) {
|
||||
//Pass magic "no rollback needed" flag to parent.
|
||||
h->txnid_that_created_or_locked_when_empty = toku_txn_get_txnid(parent);
|
||||
}
|
||||
if (r==0 &&
|
||||
h->txnid_that_suppressed_recovery_logs == toku_txn_get_txnid(child)) {
|
||||
//Pass magic "no recovery needed" flag to parent.
|
||||
h->txnid_that_suppressed_recovery_logs = toku_txn_get_txnid(parent);
|
||||
}
|
||||
return r;
|
||||
}
|
||||
|
||||
//Commit each entry in the rollback log.
|
||||
//If the transaction has a parent, it just promotes its information to its parent.
|
||||
int toku_rollback_commit(TOKUTXN txn, LSN lsn) {
|
||||
int r=0;
|
||||
if (txn->parent!=0) {
|
||||
// First we must put a rollinclude entry into the parent if we spilled
|
||||
|
||||
if (txn_has_spilled_rollback_logs(txn)) {
|
||||
uint64_t num_nodes = txn->num_rollback_nodes;
|
||||
if (txn_has_current_rollback_log(txn)) {
|
||||
num_nodes--; //Don't count the in-progress rollback log.
|
||||
}
|
||||
r = toku_logger_save_rollback_rollinclude(txn->parent, txn->txnid64, num_nodes,
|
||||
txn->spilled_rollback_head, txn->spilled_rollback_head_hash,
|
||||
txn->spilled_rollback_tail, txn->spilled_rollback_tail_hash);
|
||||
if (r!=0) return r;
|
||||
//Remove ownership from child.
|
||||
txn->spilled_rollback_head = ROLLBACK_NONE;
|
||||
txn->spilled_rollback_head_hash = 0;
|
||||
txn->spilled_rollback_tail = ROLLBACK_NONE;
|
||||
txn->spilled_rollback_tail_hash = 0;
|
||||
}
|
||||
// if we're commiting a child rollback, put its entries into the parent
|
||||
// by pinning both child and parent and then linking the child log entry
|
||||
// list to the end of the parent log entry list.
|
||||
if (txn_has_current_rollback_log(txn)) {
|
||||
//Pin parent log
|
||||
ROLLBACK_LOG_NODE parent_log;
|
||||
toku_get_and_pin_rollback_log_for_new_entry(txn->parent, &parent_log);
|
||||
|
||||
//Pin child log
|
||||
ROLLBACK_LOG_NODE child_log;
|
||||
toku_get_and_pin_rollback_log(txn, txn->current_rollback,
|
||||
txn->current_rollback_hash, &child_log);
|
||||
toku_rollback_verify_contents(child_log, txn->txnid64, txn->num_rollback_nodes - 1);
|
||||
|
||||
// Append the list to the front of the parent.
|
||||
if (child_log->oldest_logentry) {
|
||||
// There are some entries, so link them in.
|
||||
child_log->oldest_logentry->prev = parent_log->newest_logentry;
|
||||
if (!parent_log->oldest_logentry) {
|
||||
parent_log->oldest_logentry = child_log->oldest_logentry;
|
||||
}
|
||||
parent_log->newest_logentry = child_log->newest_logentry;
|
||||
parent_log->rollentry_resident_bytecount += child_log->rollentry_resident_bytecount;
|
||||
txn->parent->rollentry_raw_count += txn->rollentry_raw_count;
|
||||
child_log->rollentry_resident_bytecount = 0;
|
||||
}
|
||||
if (parent_log->oldest_logentry==NULL) {
|
||||
parent_log->oldest_logentry = child_log->oldest_logentry;
|
||||
}
|
||||
child_log->newest_logentry = child_log->oldest_logentry = 0;
|
||||
// Put all the memarena data into the parent.
|
||||
if (memarena_total_size_in_use(child_log->rollentry_arena) > 0) {
|
||||
// If there are no bytes to move, then just leave things alone, and let the memory be reclaimed on txn is closed.
|
||||
memarena_move_buffers(parent_log->rollentry_arena, child_log->rollentry_arena);
|
||||
}
|
||||
toku_rollback_log_unpin_and_remove(txn, child_log);
|
||||
txn->current_rollback = ROLLBACK_NONE;
|
||||
txn->current_rollback_hash = 0;
|
||||
|
||||
toku_maybe_spill_rollbacks(txn->parent, parent_log);
|
||||
toku_rollback_log_unpin(txn->parent, parent_log);
|
||||
assert(r == 0);
|
||||
}
|
||||
|
||||
// Note the open brts, the omts must be merged
|
||||
r = toku_omt_iterate(txn->open_fts, note_ft_used_in_txns_parent, txn);
|
||||
assert(r==0);
|
||||
|
||||
// Merge the list of headers that must be checkpointed before commit
|
||||
if (txn->checkpoint_needed_before_commit) {
|
||||
txn->parent->checkpoint_needed_before_commit = TRUE;
|
||||
}
|
||||
|
||||
//If this transaction needs an fsync (if it commits)
|
||||
//save that in the parent. Since the commit really happens in the root txn.
|
||||
txn->parent->force_fsync_on_commit |= txn->force_fsync_on_commit;
|
||||
txn->parent->num_rollentries += txn->num_rollentries;
|
||||
} else {
|
||||
r = apply_txn(txn, lsn, toku_commit_rollback_item);
|
||||
assert(r==0);
|
||||
}
|
||||
|
||||
return r;
|
||||
}
|
||||
|
||||
int toku_rollback_abort(TOKUTXN txn, LSN lsn) {
|
||||
int r;
|
||||
r = apply_txn(txn, lsn, toku_abort_rollback_item);
|
||||
assert(r==0);
|
||||
return r;
|
||||
}
|
||||
|
||||
static inline PAIR_ATTR make_rollback_pair_attr(long size) {
|
||||
PAIR_ATTR result={
|
||||
.size = size,
|
||||
|
@ -290,133 +66,13 @@ static inline PAIR_ATTR make_rollback_pair_attr(long size) {
|
|||
return result;
|
||||
}
|
||||
|
||||
|
||||
// Write something out. Keep trying even if partial writes occur.
|
||||
// On error: Return negative with errno set.
|
||||
// On success return nbytes.
|
||||
|
||||
static PAIR_ATTR
|
||||
PAIR_ATTR
|
||||
rollback_memory_size(ROLLBACK_LOG_NODE log) {
|
||||
size_t size = sizeof(*log);
|
||||
size += memarena_total_memory_size(log->rollentry_arena);
|
||||
return make_rollback_pair_attr(size);
|
||||
}
|
||||
|
||||
// Cleanup the rollback memory
|
||||
static void
|
||||
rollback_log_destroy(ROLLBACK_LOG_NODE log) {
|
||||
memarena_close(&log->rollentry_arena);
|
||||
toku_free(log);
|
||||
}
|
||||
|
||||
static void rollback_flush_callback (CACHEFILE cachefile, int fd, BLOCKNUM logname,
|
||||
void *rollback_v, void** UU(disk_data), void *extraargs, PAIR_ATTR size, PAIR_ATTR* new_size,
|
||||
BOOL write_me, BOOL keep_me, BOOL for_checkpoint, BOOL UU(is_clone)) {
|
||||
int r;
|
||||
ROLLBACK_LOG_NODE log = rollback_v;
|
||||
FT h = extraargs;
|
||||
|
||||
assert(h->cf == cachefile);
|
||||
assert(log->blocknum.b==logname.b);
|
||||
|
||||
if (write_me && !h->panic) {
|
||||
int n_workitems, n_threads;
|
||||
toku_cachefile_get_workqueue_load(cachefile, &n_workitems, &n_threads);
|
||||
|
||||
r = toku_serialize_rollback_log_to(fd, log->blocknum, log, h, n_workitems, n_threads, for_checkpoint);
|
||||
if (r) {
|
||||
if (h->panic==0) {
|
||||
char *e = strerror(r);
|
||||
int l = 200 + strlen(e);
|
||||
char s[l];
|
||||
h->panic=r;
|
||||
snprintf(s, l-1, "While writing data to disk, error %d (%s)", r, e);
|
||||
h->panic_string = toku_strdup(s);
|
||||
}
|
||||
}
|
||||
}
|
||||
*new_size = size;
|
||||
if (!keep_me) {
|
||||
rollback_log_destroy(log);
|
||||
}
|
||||
}
|
||||
|
||||
static int rollback_fetch_callback (CACHEFILE cachefile, int fd, BLOCKNUM logname, u_int32_t fullhash,
|
||||
void **rollback_pv, void** UU(disk_data), PAIR_ATTR *sizep, int * UU(dirtyp), void *extraargs) {
|
||||
int r;
|
||||
FT h = extraargs;
|
||||
assert(h->cf == cachefile);
|
||||
|
||||
ROLLBACK_LOG_NODE *result = (ROLLBACK_LOG_NODE*)rollback_pv;
|
||||
r = toku_deserialize_rollback_log_from(fd, logname, fullhash, result, h);
|
||||
if (r==0) {
|
||||
*sizep = rollback_memory_size(*result);
|
||||
}
|
||||
return r;
|
||||
}
|
||||
|
||||
static void rollback_pe_est_callback(
|
||||
void* rollback_v,
|
||||
void* UU(disk_data),
|
||||
long* bytes_freed_estimate,
|
||||
enum partial_eviction_cost *cost,
|
||||
void* UU(write_extraargs)
|
||||
)
|
||||
{
|
||||
assert(rollback_v != NULL);
|
||||
*bytes_freed_estimate = 0;
|
||||
*cost = PE_CHEAP;
|
||||
}
|
||||
|
||||
// callback for partially evicting a cachetable entry
|
||||
static int rollback_pe_callback (
|
||||
void *rollback_v,
|
||||
PAIR_ATTR UU(old_attr),
|
||||
PAIR_ATTR* new_attr,
|
||||
void* UU(extraargs)
|
||||
)
|
||||
{
|
||||
assert(rollback_v != NULL);
|
||||
*new_attr = old_attr;
|
||||
return 0;
|
||||
}
|
||||
|
||||
// partial fetch is never required for a rollback log node
|
||||
static BOOL rollback_pf_req_callback(void* UU(ftnode_pv), void* UU(read_extraargs)) {
|
||||
return FALSE;
|
||||
}
|
||||
|
||||
// a rollback node should never be partial fetched,
|
||||
// because we always say it is not required.
|
||||
// (pf req callback always returns false)
|
||||
static int rollback_pf_callback(void* UU(ftnode_pv), void* UU(disk_data), void* UU(read_extraargs), int UU(fd), PAIR_ATTR* UU(sizep)) {
|
||||
assert(FALSE);
|
||||
return 0;
|
||||
}
|
||||
|
||||
// the cleaner thread should never choose a rollback node for cleaning
|
||||
static int rollback_cleaner_callback (
|
||||
void* UU(ftnode_pv),
|
||||
BLOCKNUM UU(blocknum),
|
||||
u_int32_t UU(fullhash),
|
||||
void* UU(extraargs)
|
||||
)
|
||||
{
|
||||
assert(FALSE);
|
||||
return 0;
|
||||
}
|
||||
|
||||
static inline CACHETABLE_WRITE_CALLBACK get_write_callbacks_for_rollback_log(FT h) {
|
||||
CACHETABLE_WRITE_CALLBACK wc;
|
||||
wc.flush_callback = rollback_flush_callback;
|
||||
wc.pe_est_callback = rollback_pe_est_callback;
|
||||
wc.pe_callback = rollback_pe_callback;
|
||||
wc.cleaner_callback = rollback_cleaner_callback;
|
||||
wc.clone_callback = NULL;
|
||||
wc.write_extraargs = h;
|
||||
return wc;
|
||||
}
|
||||
|
||||
// create and pin a new rollback log node. chain it to the other rollback nodes
|
||||
// by providing a previous blocknum/ hash and assigning the new rollback log
|
||||
// node the next sequence number
|
||||
|
@ -433,7 +89,7 @@ static void rollback_log_create (TOKUTXN txn, BLOCKNUM previous, uint32_t previo
|
|||
log->layout_version_read_from_disk = FT_LAYOUT_VERSION;
|
||||
log->dirty = TRUE;
|
||||
log->txnid = txn->txnid64;
|
||||
log->sequence = txn->num_rollback_nodes++;
|
||||
log->sequence = txn->roll_info.num_rollback_nodes++;
|
||||
toku_allocate_blocknum(h->blocktable, &log->blocknum, h);
|
||||
log->hash = toku_cachetable_hash(cf, log->blocknum);
|
||||
log->previous = previous;
|
||||
|
@ -448,8 +104,8 @@ static void rollback_log_create (TOKUTXN txn, BLOCKNUM previous, uint32_t previo
|
|||
log, rollback_memory_size(log),
|
||||
get_write_callbacks_for_rollback_log(h));
|
||||
assert(r == 0);
|
||||
txn->current_rollback = log->blocknum;
|
||||
txn->current_rollback_hash = log->hash;
|
||||
txn->roll_info.current_rollback = log->blocknum;
|
||||
txn->roll_info.current_rollback_hash = log->hash;
|
||||
}
|
||||
|
||||
void toku_rollback_log_unpin(TOKUTXN txn, ROLLBACK_LOG_NODE log) {
|
||||
|
@ -466,19 +122,19 @@ void toku_rollback_log_unpin(TOKUTXN txn, ROLLBACK_LOG_NODE log) {
|
|||
// Maybe there is no current after (if it spilled)
|
||||
void toku_maybe_spill_rollbacks(TOKUTXN txn, ROLLBACK_LOG_NODE log) {
|
||||
if (log->rollentry_resident_bytecount > txn->logger->write_block_size) {
|
||||
assert(log->blocknum.b == txn->current_rollback.b);
|
||||
assert(log->blocknum.b == txn->roll_info.current_rollback.b);
|
||||
//spill
|
||||
if (!txn_has_spilled_rollback_logs(txn)) {
|
||||
//First spilled. Copy to head.
|
||||
txn->spilled_rollback_head = txn->current_rollback;
|
||||
txn->spilled_rollback_head_hash = txn->current_rollback_hash;
|
||||
txn->roll_info.spilled_rollback_head = txn->roll_info.current_rollback;
|
||||
txn->roll_info.spilled_rollback_head_hash = txn->roll_info.current_rollback_hash;
|
||||
}
|
||||
//Unconditionally copy to tail. Old tail does not need to be cached anymore.
|
||||
txn->spilled_rollback_tail = txn->current_rollback;
|
||||
txn->spilled_rollback_tail_hash = txn->current_rollback_hash;
|
||||
txn->roll_info.spilled_rollback_tail = txn->roll_info.current_rollback;
|
||||
txn->roll_info.spilled_rollback_tail_hash = txn->roll_info.current_rollback_hash;
|
||||
|
||||
txn->current_rollback = ROLLBACK_NONE;
|
||||
txn->current_rollback_hash = 0;
|
||||
txn->roll_info.current_rollback = ROLLBACK_NONE;
|
||||
txn->roll_info.current_rollback_hash = 0;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -493,20 +149,31 @@ static int find_filenum (OMTVALUE v, void *hv) {
|
|||
}
|
||||
|
||||
//Notify a transaction that it has touched a brt.
|
||||
int toku_txn_note_ft (TOKUTXN txn, FT h) {
|
||||
BOOL ref_added = toku_ft_maybe_add_txn_ref(h, txn);
|
||||
// Insert reference to brt into transaction
|
||||
if (ref_added) {
|
||||
int r = toku_omt_insert(txn->open_fts, h, find_filenum, h, 0);
|
||||
assert(r==0);
|
||||
void toku_txn_maybe_note_ft (TOKUTXN txn, FT ft) {
|
||||
toku_txn_lock(txn);
|
||||
OMTVALUE ftv;
|
||||
uint32_t idx;
|
||||
int r = toku_omt_find_zero(txn->open_fts, find_filenum, ft, &ftv, &idx);
|
||||
if (r == 0) {
|
||||
// already there
|
||||
assert((FT) ftv == ft);
|
||||
goto exit;
|
||||
}
|
||||
return 0;
|
||||
r = toku_omt_insert_at(txn->open_fts, ft, idx);
|
||||
assert_zero(r);
|
||||
// TODO(leif): if there's anything that locks the reflock and then
|
||||
// the txn lock, this may deadlock, because it grabs the reflock.
|
||||
toku_ft_add_txn_ref(ft, txn);
|
||||
exit:
|
||||
toku_txn_unlock(txn);
|
||||
}
|
||||
|
||||
// Return the number of bytes that went into the rollback data structure (the uncompressed count if there is compression)
|
||||
int toku_logger_txn_rollback_raw_count(TOKUTXN txn, u_int64_t *raw_count)
|
||||
{
|
||||
*raw_count = txn->rollentry_raw_count;
|
||||
toku_txn_lock(txn);
|
||||
*raw_count = txn->roll_info.rollentry_raw_count;
|
||||
toku_txn_unlock(txn);
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -522,9 +189,9 @@ void toku_maybe_prefetch_previous_rollback_log(TOKUTXN txn, ROLLBACK_LOG_NODE lo
|
|||
BOOL doing_prefetch = FALSE;
|
||||
r = toku_cachefile_prefetch(cf, name, hash,
|
||||
get_write_callbacks_for_rollback_log(h),
|
||||
rollback_fetch_callback,
|
||||
rollback_pf_req_callback,
|
||||
rollback_pf_callback,
|
||||
toku_rollback_fetch_callback,
|
||||
toku_rollback_pf_req_callback,
|
||||
toku_rollback_pf_callback,
|
||||
h,
|
||||
&doing_prefetch);
|
||||
assert(r == 0);
|
||||
|
@ -545,9 +212,9 @@ void toku_get_and_pin_rollback_log(TOKUTXN txn, BLOCKNUM blocknum, uint32_t hash
|
|||
int r = toku_cachetable_get_and_pin(cf, blocknum, hash,
|
||||
&value, NULL,
|
||||
get_write_callbacks_for_rollback_log(h),
|
||||
rollback_fetch_callback,
|
||||
rollback_pf_req_callback,
|
||||
rollback_pf_callback,
|
||||
toku_rollback_fetch_callback,
|
||||
toku_rollback_pf_req_callback,
|
||||
toku_rollback_pf_callback,
|
||||
TRUE, // may_modify_value
|
||||
h
|
||||
);
|
||||
|
@ -561,15 +228,14 @@ void toku_get_and_pin_rollback_log_for_new_entry (TOKUTXN txn, ROLLBACK_LOG_NODE
|
|||
ROLLBACK_LOG_NODE pinned_log;
|
||||
invariant(txn->state == TOKUTXN_LIVE); // #3258
|
||||
if (txn_has_current_rollback_log(txn)) {
|
||||
toku_get_and_pin_rollback_log(txn, txn->current_rollback, txn->current_rollback_hash, &pinned_log);
|
||||
toku_rollback_verify_contents(pinned_log, txn->txnid64, txn->num_rollback_nodes - 1);
|
||||
toku_get_and_pin_rollback_log(txn, txn->roll_info.current_rollback, txn->roll_info.current_rollback_hash, &pinned_log);
|
||||
toku_rollback_verify_contents(pinned_log, txn->txnid64, txn->roll_info.num_rollback_nodes - 1);
|
||||
} else {
|
||||
// create a new log for this transaction to use.
|
||||
// this call asserts success internally
|
||||
rollback_log_create(txn, txn->spilled_rollback_tail, txn->spilled_rollback_tail_hash, &pinned_log);
|
||||
rollback_log_create(txn, txn->roll_info.spilled_rollback_tail, txn->roll_info.spilled_rollback_tail_hash, &pinned_log);
|
||||
}
|
||||
assert(pinned_log->txnid == txn->txnid64);
|
||||
assert(pinned_log->blocknum.b != ROLLBACK_NONE.b);
|
||||
*log = pinned_log;
|
||||
}
|
||||
|
||||
|
|
|
@ -14,8 +14,6 @@ extern "C" {
|
|||
#endif
|
||||
|
||||
void toku_poll_txn_progress_function(TOKUTXN txn, uint8_t is_commit, uint8_t stall_for_checkpoint);
|
||||
int toku_rollback_commit(TOKUTXN txn, LSN lsn);
|
||||
int toku_rollback_abort(TOKUTXN txn, LSN lsn);
|
||||
|
||||
// these functions assert internally that they succeed
|
||||
|
||||
|
@ -38,10 +36,6 @@ void toku_maybe_prefetch_previous_rollback_log(TOKUTXN txn, ROLLBACK_LOG_NODE lo
|
|||
// unpin and rmove a rollback log from the cachetable
|
||||
void toku_rollback_log_unpin_and_remove(TOKUTXN txn, ROLLBACK_LOG_NODE log);
|
||||
|
||||
typedef int(*apply_rollback_item)(TOKUTXN txn, struct roll_entry *item, LSN lsn);
|
||||
int toku_commit_rollback_item (TOKUTXN txn, struct roll_entry *item, LSN lsn);
|
||||
int toku_abort_rollback_item (TOKUTXN txn, struct roll_entry *item, LSN lsn);
|
||||
|
||||
void *toku_malloc_in_rollback(ROLLBACK_LOG_NODE log, size_t size);
|
||||
void *toku_memdup_in_rollback(ROLLBACK_LOG_NODE log, const void *v, size_t len);
|
||||
|
||||
|
@ -56,12 +50,14 @@ void *toku_memdup_in_rollback(ROLLBACK_LOG_NODE log, const void *v, size_t len);
|
|||
// if necessary.
|
||||
void toku_maybe_spill_rollbacks(TOKUTXN txn, ROLLBACK_LOG_NODE log);
|
||||
|
||||
int toku_txn_note_ft (TOKUTXN txn, FT h);
|
||||
void toku_txn_maybe_note_ft (TOKUTXN txn, FT h);
|
||||
int toku_logger_txn_rollback_raw_count(TOKUTXN txn, u_int64_t *raw_count);
|
||||
|
||||
int toku_find_pair_by_xid (OMTVALUE v, void *txnv);
|
||||
int toku_find_xid_by_xid (OMTVALUE v, void *xidv);
|
||||
|
||||
PAIR_ATTR rollback_memory_size(ROLLBACK_LOG_NODE log);
|
||||
|
||||
// A high-level rollback log is made up of a chain of rollback log nodes.
|
||||
// Each rollback log node is represented (separately) in the cachetable by
|
||||
// this structure. Each portion of the rollback log chain has a block num
|
||||
|
|
161
ft/txn.c
161
ft/txn.c
|
@ -10,6 +10,7 @@
|
|||
#include "checkpoint.h"
|
||||
#include "ule.h"
|
||||
#include <valgrind/helgrind.h>
|
||||
#include "rollback-apply.h"
|
||||
#include "txn_manager.h"
|
||||
|
||||
///////////////////////////////////////////////////////////////////////////////////
|
||||
|
@ -50,6 +51,18 @@ toku_txn_get_status(TXN_STATUS s) {
|
|||
*s = txn_status;
|
||||
}
|
||||
|
||||
void
|
||||
toku_txn_lock(TOKUTXN txn)
|
||||
{
|
||||
toku_mutex_lock(&txn->txn_lock);
|
||||
}
|
||||
|
||||
void
|
||||
toku_txn_unlock(TOKUTXN txn)
|
||||
{
|
||||
toku_mutex_unlock(&txn->txn_lock);
|
||||
}
|
||||
|
||||
int
|
||||
toku_txn_begin_txn (
|
||||
DB_TXN *container_db_txn,
|
||||
|
@ -59,7 +72,7 @@ toku_txn_begin_txn (
|
|||
TXN_SNAPSHOT_TYPE snapshot_type
|
||||
)
|
||||
{
|
||||
int r = toku_txn_begin_with_xid(parent_tokutxn, tokutxn, logger, TXNID_NONE, snapshot_type, container_db_txn);
|
||||
int r = toku_txn_begin_with_xid(parent_tokutxn, tokutxn, logger, TXNID_NONE, snapshot_type, container_db_txn, false);
|
||||
return r;
|
||||
}
|
||||
|
||||
|
@ -70,14 +83,11 @@ toku_txn_begin_with_xid (
|
|||
TOKULOGGER logger,
|
||||
TXNID xid,
|
||||
TXN_SNAPSHOT_TYPE snapshot_type,
|
||||
DB_TXN *container_db_txn
|
||||
DB_TXN *container_db_txn,
|
||||
bool for_recovery
|
||||
)
|
||||
{
|
||||
int r = toku_txn_create_txn(tokutxn, parent_tokutxn, logger, xid, snapshot_type, container_db_txn);
|
||||
if (r == 0) {
|
||||
toku_txn_manager_start_txn((*tokutxn)->logger->txn_manager, *tokutxn);
|
||||
}
|
||||
return r;
|
||||
return toku_txn_manager_start_txn(tokutxn, logger->txn_manager, parent_tokutxn, logger, xid, snapshot_type, container_db_txn, for_recovery);
|
||||
}
|
||||
|
||||
DB_TXN *
|
||||
|
@ -102,45 +112,71 @@ toku_txn_create_txn (
|
|||
TOKULOGGER logger,
|
||||
TXNID xid,
|
||||
TXN_SNAPSHOT_TYPE snapshot_type,
|
||||
DB_TXN *container_db_txn
|
||||
XIDS xids,
|
||||
DB_TXN *container_db_txn,
|
||||
bool for_checkpoint
|
||||
)
|
||||
{
|
||||
if (logger->is_panicked) return EINVAL;
|
||||
if (logger->is_panicked) {
|
||||
return EINVAL;
|
||||
}
|
||||
assert(logger->rollback_cachefile);
|
||||
TOKUTXN XMALLOC(result);
|
||||
result->starttime = time(NULL); // getting timestamp in seconds is a cheap call
|
||||
int r;
|
||||
r = toku_omt_create(&result->open_fts);
|
||||
assert_zero(r);
|
||||
|
||||
result->logger = logger;
|
||||
result->parent = parent_tokutxn;
|
||||
result->num_rollentries = 0;
|
||||
result->num_rollentries_processed = 0;
|
||||
result->progress_poll_fun = NULL;
|
||||
result->progress_poll_fun_extra = NULL;
|
||||
result->spilled_rollback_head = ROLLBACK_NONE;
|
||||
result->spilled_rollback_tail = ROLLBACK_NONE;
|
||||
result->spilled_rollback_head_hash = 0;
|
||||
result->spilled_rollback_tail_hash = 0;
|
||||
result->current_rollback = ROLLBACK_NONE;
|
||||
result->current_rollback_hash = 0;
|
||||
result->num_rollback_nodes = 0;
|
||||
result->snapshot_type = snapshot_type;
|
||||
result->snapshot_txnid64 = TXNID_NONE;
|
||||
result->container_db_txn = container_db_txn;
|
||||
TXNID snapshot_txnid64;
|
||||
if (snapshot_type == TXN_SNAPSHOT_NONE) {
|
||||
snapshot_txnid64 = TXNID_NONE;
|
||||
} else if (parent_tokutxn == NULL || snapshot_type == TXN_SNAPSHOT_CHILD) {
|
||||
snapshot_txnid64 = xid;
|
||||
} else if (snapshot_type == TXN_SNAPSHOT_ROOT) {
|
||||
snapshot_txnid64 = parent_tokutxn->snapshot_txnid64;
|
||||
} else {
|
||||
assert(false);
|
||||
}
|
||||
|
||||
result->rollentry_raw_count = 0;
|
||||
result->force_fsync_on_commit = FALSE;
|
||||
result->recovered_from_checkpoint = FALSE;
|
||||
result->checkpoint_needed_before_commit = FALSE;
|
||||
result->state = TOKUTXN_LIVE;
|
||||
OMT open_fts;
|
||||
{
|
||||
int r = toku_omt_create(&open_fts);
|
||||
assert_zero(r);
|
||||
}
|
||||
|
||||
struct txn_roll_info roll_info = {
|
||||
.num_rollback_nodes = 0,
|
||||
.num_rollentries = 0,
|
||||
.num_rollentries_processed = 0,
|
||||
.rollentry_raw_count = 0,
|
||||
.spilled_rollback_head = ROLLBACK_NONE,
|
||||
.spilled_rollback_tail = ROLLBACK_NONE,
|
||||
.spilled_rollback_head_hash = 0,
|
||||
.spilled_rollback_tail_hash = 0,
|
||||
.current_rollback = ROLLBACK_NONE,
|
||||
.current_rollback_hash = 0,
|
||||
};
|
||||
|
||||
struct tokutxn new_txn = {
|
||||
.starttime = time(NULL),
|
||||
.open_fts = open_fts,
|
||||
.logger = logger,
|
||||
.parent = parent_tokutxn,
|
||||
.progress_poll_fun = NULL,
|
||||
.progress_poll_fun_extra = NULL,
|
||||
.snapshot_type = snapshot_type,
|
||||
.snapshot_txnid64 = snapshot_txnid64,
|
||||
.container_db_txn = container_db_txn,
|
||||
.force_fsync_on_commit = FALSE,
|
||||
.recovered_from_checkpoint = for_checkpoint,
|
||||
.checkpoint_needed_before_commit = FALSE,
|
||||
.state = TOKUTXN_LIVE,
|
||||
.do_fsync = FALSE,
|
||||
.txnid64 = xid,
|
||||
.ancestor_txnid64 = (parent_tokutxn ? parent_tokutxn->ancestor_txnid64 : xid),
|
||||
.xids = xids,
|
||||
.roll_info = roll_info
|
||||
};
|
||||
|
||||
|
||||
TOKUTXN result = toku_xmemdup(&new_txn, sizeof new_txn);
|
||||
toku_mutex_init(&result->txn_lock, NULL);
|
||||
invalidate_xa_xid(&result->xa_xid);
|
||||
result->do_fsync = FALSE;
|
||||
|
||||
result->txnid64 = xid;
|
||||
result->xids = NULL;
|
||||
|
||||
*tokutxn = result;
|
||||
|
||||
STATUS_VALUE(TXN_BEGIN)++;
|
||||
|
@ -154,31 +190,27 @@ toku_txn_create_txn (
|
|||
//Used on recovery to recover a transaction.
|
||||
int
|
||||
toku_txn_load_txninfo (TOKUTXN txn, TXNINFO info) {
|
||||
#define COPY_FROM_INFO(field) txn->field = info->field
|
||||
COPY_FROM_INFO(rollentry_raw_count);
|
||||
txn->roll_info.rollentry_raw_count = info->rollentry_raw_count;
|
||||
uint32_t i;
|
||||
for (i = 0; i < info->num_fts; i++) {
|
||||
FT h = info->open_fts[i];
|
||||
int r = toku_txn_note_ft(txn, h);
|
||||
assert_zero(r);
|
||||
FT ft = info->open_fts[i];
|
||||
toku_txn_maybe_note_ft(txn, ft);
|
||||
}
|
||||
COPY_FROM_INFO(force_fsync_on_commit );
|
||||
COPY_FROM_INFO(num_rollback_nodes);
|
||||
COPY_FROM_INFO(num_rollentries);
|
||||
txn->force_fsync_on_commit = info->force_fsync_on_commit;
|
||||
txn->roll_info.num_rollback_nodes = info->num_rollback_nodes;
|
||||
txn->roll_info.num_rollentries = info->num_rollentries;
|
||||
|
||||
CACHEFILE rollback_cachefile = txn->logger->rollback_cachefile;
|
||||
|
||||
COPY_FROM_INFO(spilled_rollback_head);
|
||||
txn->spilled_rollback_head_hash = toku_cachetable_hash(rollback_cachefile,
|
||||
txn->spilled_rollback_head);
|
||||
COPY_FROM_INFO(spilled_rollback_tail);
|
||||
txn->spilled_rollback_tail_hash = toku_cachetable_hash(rollback_cachefile,
|
||||
txn->spilled_rollback_tail);
|
||||
COPY_FROM_INFO(current_rollback);
|
||||
txn->current_rollback_hash = toku_cachetable_hash(rollback_cachefile,
|
||||
txn->current_rollback);
|
||||
#undef COPY_FROM_INFO
|
||||
txn->recovered_from_checkpoint = TRUE;
|
||||
txn->roll_info.spilled_rollback_head = info->spilled_rollback_head;
|
||||
txn->roll_info.spilled_rollback_head_hash = toku_cachetable_hash(rollback_cachefile,
|
||||
txn->roll_info.spilled_rollback_head);
|
||||
txn->roll_info.spilled_rollback_tail = info->spilled_rollback_tail;
|
||||
txn->roll_info.spilled_rollback_tail_hash = toku_cachetable_hash(rollback_cachefile,
|
||||
txn->roll_info.spilled_rollback_tail);
|
||||
txn->roll_info.current_rollback = info->current_rollback;
|
||||
txn->roll_info.current_rollback_hash = toku_cachetable_hash(rollback_cachefile,
|
||||
txn->roll_info.current_rollback);
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -223,7 +255,7 @@ int toku_txn_commit_with_lsn(TOKUTXN txn, int nosync, LSN oplsn,
|
|||
// recovery to properly recommit this transaction if the commit
|
||||
// does not make it to disk. In the case of MySQL, that would be the
|
||||
// binary log.
|
||||
txn->do_fsync = !txn->parent && (txn->force_fsync_on_commit || (!nosync && txn->num_rollentries>0));
|
||||
txn->do_fsync = !txn->parent && (txn->force_fsync_on_commit || (!nosync && txn->roll_info.num_rollentries>0));
|
||||
|
||||
txn->progress_poll_fun = poll;
|
||||
txn->progress_poll_fun_extra = poll_extra;
|
||||
|
@ -274,7 +306,7 @@ int toku_txn_prepare_txn (TOKUTXN txn, TOKU_XA_XID *xa_xid) {
|
|||
if (txn->parent) return 0; // nothing to do if there's a parent.
|
||||
toku_txn_manager_add_prepared_txn(txn->logger->txn_manager, txn);
|
||||
// Do we need to do an fsync?
|
||||
txn->do_fsync = (txn->force_fsync_on_commit || txn->num_rollentries>0);
|
||||
txn->do_fsync = (txn->force_fsync_on_commit || txn->roll_info.num_rollentries>0);
|
||||
copy_xid(&txn->xa_xid, xa_xid);
|
||||
// This list will go away with #4683, so we wn't need the ydb lock for this anymore.
|
||||
return toku_log_xprepare(txn->logger, &txn->do_fsync_lsn, 0, txn->txnid64, xa_xid);
|
||||
|
@ -350,9 +382,9 @@ static void note_txn_closing (TOKUTXN txn) {
|
|||
}
|
||||
|
||||
void toku_txn_complete_txn(TOKUTXN txn) {
|
||||
assert(txn->spilled_rollback_head.b == ROLLBACK_NONE.b);
|
||||
assert(txn->spilled_rollback_tail.b == ROLLBACK_NONE.b);
|
||||
assert(txn->current_rollback.b == ROLLBACK_NONE.b);
|
||||
assert(txn->roll_info.spilled_rollback_head.b == ROLLBACK_NONE.b);
|
||||
assert(txn->roll_info.spilled_rollback_tail.b == ROLLBACK_NONE.b);
|
||||
assert(txn->roll_info.current_rollback.b == ROLLBACK_NONE.b);
|
||||
toku_txn_manager_finish_txn(txn->logger->txn_manager, txn);
|
||||
// note that here is another place we depend on
|
||||
// this function being called with the multi operation lock
|
||||
|
@ -364,6 +396,7 @@ void toku_txn_destroy_txn(TOKUTXN txn) {
|
|||
toku_omt_destroy(&txn->open_fts);
|
||||
}
|
||||
xids_destroy(&txn->xids);
|
||||
toku_mutex_destroy(&txn->txn_lock);
|
||||
toku_free(txn);
|
||||
|
||||
STATUS_VALUE(TXN_CLOSE)++;
|
||||
|
|
8
ft/txn.h
8
ft/txn.h
|
@ -11,6 +11,9 @@
|
|||
extern "C" {
|
||||
#endif
|
||||
|
||||
void toku_txn_lock(TOKUTXN txn);
|
||||
void toku_txn_unlock(TOKUTXN txn);
|
||||
|
||||
int toku_txn_begin_txn (
|
||||
DB_TXN *container_db_txn,
|
||||
TOKUTXN parent_tokutxn,
|
||||
|
@ -29,11 +32,12 @@ int toku_txn_begin_with_xid (
|
|||
TOKULOGGER logger,
|
||||
TXNID xid,
|
||||
TXN_SNAPSHOT_TYPE snapshot_type,
|
||||
DB_TXN *container_db_txn
|
||||
DB_TXN *container_db_txn,
|
||||
bool for_recovery
|
||||
);
|
||||
|
||||
// Allocate and initialize a txn
|
||||
int toku_txn_create_txn(TOKUTXN *txn_ptr, TOKUTXN parent, TOKULOGGER logger, TXNID xid, TXN_SNAPSHOT_TYPE snapshot_type, DB_TXN *container_db_txn);
|
||||
int toku_txn_create_txn(TOKUTXN *txn_ptr, TOKUTXN parent, TOKULOGGER logger, TXNID xid, TXN_SNAPSHOT_TYPE snapshot_type, XIDS xids, DB_TXN *container_db_txn, bool for_checkpoint);
|
||||
|
||||
int toku_txn_load_txninfo (TOKUTXN txn, TXNINFO info);
|
||||
|
||||
|
|
|
@ -305,8 +305,16 @@ live_list_reverse_note_txn_start(TOKUTXN txn) {
|
|||
return r;
|
||||
}
|
||||
|
||||
void toku_txn_manager_start_txn(TXN_MANAGER txn_manager, TOKUTXN txn) {
|
||||
TOKUTXN parent = txn->parent;
|
||||
int toku_txn_manager_start_txn(
|
||||
TOKUTXN *txnp,
|
||||
TXN_MANAGER txn_manager,
|
||||
TOKUTXN parent,
|
||||
TOKULOGGER logger,
|
||||
TXNID xid,
|
||||
TXN_SNAPSHOT_TYPE snapshot_type,
|
||||
DB_TXN *container_db_txn,
|
||||
bool for_recovery)
|
||||
{
|
||||
int r;
|
||||
// we take the txn_manager_lock before writing to the log
|
||||
// we may be able to move this lock acquisition
|
||||
|
@ -316,20 +324,28 @@ void toku_txn_manager_start_txn(TXN_MANAGER txn_manager, TOKUTXN txn) {
|
|||
if (garbage_collection_debug) {
|
||||
verify_snapshot_system(txn_manager);
|
||||
}
|
||||
if (txn->txnid64 == TXNID_NONE) {
|
||||
if (xid == TXNID_NONE) {
|
||||
LSN first_lsn;
|
||||
r = toku_log_xbegin(txn->logger, &first_lsn, 0, parent ? parent->txnid64 : 0);
|
||||
r = toku_log_xbegin(logger, &first_lsn, 0, parent ? parent->txnid64 : 0);
|
||||
assert_zero(r);
|
||||
txn->txnid64 = first_lsn.lsn;
|
||||
xid = first_lsn.lsn;
|
||||
}
|
||||
XIDS parent_xids;
|
||||
if (parent == NULL)
|
||||
parent_xids = xids_get_root_xids();
|
||||
else
|
||||
parent_xids = parent->xids;
|
||||
r = xids_create_child(parent_xids, &txn->xids, txn->txnid64);
|
||||
XIDS xids;
|
||||
r = xids_create_child(parent_xids, &xids, xid);
|
||||
assert_zero(r);
|
||||
|
||||
TOKUTXN txn;
|
||||
r = toku_txn_create_txn(&txn, parent, logger, xid, snapshot_type, xids, container_db_txn, for_recovery);
|
||||
if (r != 0) {
|
||||
// logger is panicked
|
||||
return r;
|
||||
}
|
||||
|
||||
if (toku_omt_size(txn_manager->live_txns) == 0) {
|
||||
assert(txn_manager->oldest_living_xid == TXNID_NONE_LIVING);
|
||||
txn_manager->oldest_living_xid = txn->txnid64;
|
||||
|
@ -367,10 +383,6 @@ void toku_txn_manager_start_txn(TXN_MANAGER txn_manager, TOKUTXN txn) {
|
|||
toku_omt_size(txn_manager->live_root_txns)
|
||||
); //We know it is the newest one.
|
||||
assert_zero(r);
|
||||
txn->ancestor_txnid64 = txn->txnid64;
|
||||
}
|
||||
else {
|
||||
txn->ancestor_txnid64 = parent->ancestor_txnid64;
|
||||
}
|
||||
|
||||
// setup information for snapshot reads
|
||||
|
@ -380,7 +392,6 @@ void toku_txn_manager_start_txn(TXN_MANAGER txn_manager, TOKUTXN txn) {
|
|||
if (parent == NULL || txn->snapshot_type == TXN_SNAPSHOT_CHILD) {
|
||||
r = setup_live_root_txn_list(txn_manager, txn);
|
||||
assert_zero(r);
|
||||
txn->snapshot_txnid64 = txn->txnid64;
|
||||
r = snapshot_txnids_note_txn(txn_manager, txn);
|
||||
assert_zero(r);
|
||||
r = live_list_reverse_note_txn_start(txn);
|
||||
|
@ -390,7 +401,6 @@ void toku_txn_manager_start_txn(TXN_MANAGER txn_manager, TOKUTXN txn) {
|
|||
// of the root transaction
|
||||
else if (txn->snapshot_type == TXN_SNAPSHOT_ROOT) {
|
||||
txn->live_root_txn_list = parent->live_root_txn_list;
|
||||
txn->snapshot_txnid64 = parent->snapshot_txnid64;
|
||||
}
|
||||
else {
|
||||
assert(FALSE);
|
||||
|
@ -401,6 +411,9 @@ void toku_txn_manager_start_txn(TXN_MANAGER txn_manager, TOKUTXN txn) {
|
|||
verify_snapshot_system(txn_manager);
|
||||
}
|
||||
toku_mutex_unlock(&txn_manager->txn_manager_lock);
|
||||
|
||||
*txnp = txn;
|
||||
return 0;
|
||||
}
|
||||
|
||||
// For each xid on the closing txn's live list, find the corresponding entry in the reverse live list.
|
||||
|
|
|
@ -34,7 +34,16 @@ void toku_txn_manager_destroy(TXN_MANAGER txn_manager);
|
|||
TXNID toku_txn_manager_get_oldest_living_xid(TXN_MANAGER txn_manager, time_t * oldest_living_starttime);
|
||||
|
||||
// Assign a txnid. Log the txn begin in the recovery log. Initialize the txn live lists.
|
||||
void toku_txn_manager_start_txn(TXN_MANAGER txn_manager, TOKUTXN txn);
|
||||
// Also, create the txn.
|
||||
int toku_txn_manager_start_txn(
|
||||
TOKUTXN *txnp,
|
||||
TXN_MANAGER txn_manager,
|
||||
TOKUTXN parent,
|
||||
TOKULOGGER logger,
|
||||
TXNID xid,
|
||||
TXN_SNAPSHOT_TYPE snapshot_type,
|
||||
DB_TXN *container_db_txn,
|
||||
bool for_recovery);
|
||||
void toku_txn_manager_finish_txn(TXN_MANAGER txn_manager, TOKUTXN txn);
|
||||
|
||||
void toku_txn_manager_clone_state_for_gc(
|
||||
|
|
14
src/ydb.c
14
src/ydb.c
|
@ -520,7 +520,7 @@ maybe_upgrade_persistent_environment_dictionary(DB_ENV * env, DB_TXN * txn, LSN
|
|||
const uint32_t curr_env_ver_d = toku_htod32(FT_LAYOUT_VERSION);
|
||||
toku_fill_dbt(&key, curr_env_ver_key, strlen(curr_env_ver_key));
|
||||
toku_fill_dbt(&val, &curr_env_ver_d, sizeof(curr_env_ver_d));
|
||||
r = toku_db_put(persistent_environment, txn, &key, &val, 0, TRUE);
|
||||
r = toku_db_put(persistent_environment, txn, &key, &val, 0, FALSE);
|
||||
assert_zero(r);
|
||||
|
||||
// although the variable name is last_lsn_of_v13, this key really represents
|
||||
|
@ -529,19 +529,19 @@ maybe_upgrade_persistent_environment_dictionary(DB_ENV * env, DB_TXN * txn, LSN
|
|||
uint64_t last_lsn_of_v13_d = toku_htod64(last_lsn_of_clean_shutdown_read_from_log.lsn);
|
||||
toku_fill_dbt(&key, last_lsn_of_v13_key, strlen(last_lsn_of_v13_key));
|
||||
toku_fill_dbt(&val, &last_lsn_of_v13_d, sizeof(last_lsn_of_v13_d));
|
||||
r = toku_db_put(persistent_environment, txn, &key, &val, 0, TRUE);
|
||||
r = toku_db_put(persistent_environment, txn, &key, &val, 0, FALSE);
|
||||
assert_zero(r);
|
||||
|
||||
time_t upgrade_v19_time_d = toku_htod64(time(NULL));
|
||||
toku_fill_dbt(&key, upgrade_v19_time_key, strlen(upgrade_v19_time_key));
|
||||
toku_fill_dbt(&val, &upgrade_v19_time_d, sizeof(upgrade_v19_time_d));
|
||||
r = toku_db_put(persistent_environment, txn, &key, &val, DB_NOOVERWRITE, TRUE);
|
||||
r = toku_db_put(persistent_environment, txn, &key, &val, DB_NOOVERWRITE, FALSE);
|
||||
assert_zero(r);
|
||||
|
||||
uint64_t upgrade_v19_footprint_d = toku_htod64(toku_log_upgrade_get_footprint());
|
||||
toku_fill_dbt(&key, upgrade_v19_footprint_key, strlen(upgrade_v19_footprint_key));
|
||||
toku_fill_dbt(&val, &upgrade_v19_footprint_d, sizeof(upgrade_v19_footprint_d));
|
||||
r = toku_db_put(persistent_environment, txn, &key, &val, DB_NOOVERWRITE, TRUE);
|
||||
r = toku_db_put(persistent_environment, txn, &key, &val, DB_NOOVERWRITE, FALSE);
|
||||
assert_zero(r);
|
||||
}
|
||||
return r;
|
||||
|
@ -959,18 +959,18 @@ env_open(DB_ENV * env, const char *home, u_int32_t flags, int mode) {
|
|||
|
||||
toku_fill_dbt(&key, orig_env_ver_key, strlen(orig_env_ver_key));
|
||||
toku_fill_dbt(&val, &environment_version, sizeof(environment_version));
|
||||
r = toku_db_put(env->i->persistent_environment, txn, &key, &val, 0, TRUE);
|
||||
r = toku_db_put(env->i->persistent_environment, txn, &key, &val, 0, FALSE);
|
||||
assert_zero(r);
|
||||
|
||||
toku_fill_dbt(&key, curr_env_ver_key, strlen(curr_env_ver_key));
|
||||
toku_fill_dbt(&val, &environment_version, sizeof(environment_version));
|
||||
r = toku_db_put(env->i->persistent_environment, txn, &key, &val, 0, TRUE);
|
||||
r = toku_db_put(env->i->persistent_environment, txn, &key, &val, 0, FALSE);
|
||||
assert_zero(r);
|
||||
|
||||
time_t creation_time_d = toku_htod64(time(NULL));
|
||||
toku_fill_dbt(&key, creation_time_key, strlen(creation_time_key));
|
||||
toku_fill_dbt(&val, &creation_time_d, sizeof(creation_time_d));
|
||||
r = toku_db_put(env->i->persistent_environment, txn, &key, &val, 0, TRUE);
|
||||
r = toku_db_put(env->i->persistent_environment, txn, &key, &val, 0, FALSE);
|
||||
assert_zero(r);
|
||||
}
|
||||
else {
|
||||
|
|
|
@ -488,21 +488,18 @@ toku_txn_begin(DB_ENV *env, DB_TXN * stxn, DB_TXN ** txn, u_int32_t flags) {
|
|||
break;
|
||||
}
|
||||
}
|
||||
r = toku_txn_create_txn(&db_txn_struct_i(result)->tokutxn,
|
||||
stxn ? db_txn_struct_i(stxn)->tokutxn : 0,
|
||||
env->i->logger,
|
||||
TXNID_NONE,
|
||||
snapshot_type,
|
||||
result
|
||||
);
|
||||
r = toku_txn_manager_start_txn(&db_txn_struct_i(result)->tokutxn,
|
||||
toku_logger_get_txn_manager(env->i->logger),
|
||||
stxn ? db_txn_struct_i(stxn)->tokutxn : 0,
|
||||
env->i->logger,
|
||||
TXNID_NONE,
|
||||
snapshot_type,
|
||||
result,
|
||||
false);
|
||||
if (r != 0) {
|
||||
toku_free(result);
|
||||
return r;
|
||||
}
|
||||
toku_txn_manager_start_txn(
|
||||
toku_logger_get_txn_manager(env->i->logger),
|
||||
db_txn_struct_i(result)->tokutxn
|
||||
);
|
||||
|
||||
//Add to the list of children for the parent.
|
||||
if (result->parent) {
|
||||
|
|
|
@ -8,6 +8,7 @@
|
|||
#include "ydb-internal.h"
|
||||
#include "indexer.h"
|
||||
#include <ft/log_header.h>
|
||||
#include <ft/checkpoint.h>
|
||||
#include "ydb_row_lock.h"
|
||||
#include "ydb_write.h"
|
||||
#include "ydb_db.h"
|
||||
|
@ -125,7 +126,7 @@ db_put_check_overwrite_constraint(DB *db, DB_TXN *txn, DBT *key,
|
|||
|
||||
|
||||
int
|
||||
toku_db_del(DB *db, DB_TXN *txn, DBT *key, u_int32_t flags, BOOL holds_ydb_lock) {
|
||||
toku_db_del(DB *db, DB_TXN *txn, DBT *key, u_int32_t flags, BOOL holds_mo_lock) {
|
||||
HANDLE_PANICKED_DB(db);
|
||||
HANDLE_DB_ILLEGAL_WORKING_PARENT_TXN(db, txn);
|
||||
|
||||
|
@ -152,9 +153,9 @@ toku_db_del(DB *db, DB_TXN *txn, DBT *key, u_int32_t flags, BOOL holds_ydb_lock)
|
|||
}
|
||||
if (r == 0) {
|
||||
//Do the actual deleting.
|
||||
if (!holds_ydb_lock) toku_ydb_lock();
|
||||
if (!holds_mo_lock) toku_multi_operation_client_lock();
|
||||
r = toku_ft_delete(db->i->ft_handle, key, txn ? db_txn_struct_i(txn)->tokutxn : 0);
|
||||
if (!holds_ydb_lock) toku_ydb_unlock();
|
||||
if (!holds_mo_lock) toku_multi_operation_client_unlock();
|
||||
}
|
||||
|
||||
if (r == 0) {
|
||||
|
@ -168,7 +169,7 @@ toku_db_del(DB *db, DB_TXN *txn, DBT *key, u_int32_t flags, BOOL holds_ydb_lock)
|
|||
|
||||
|
||||
int
|
||||
toku_db_put(DB *db, DB_TXN *txn, DBT *key, DBT *val, u_int32_t flags, BOOL holds_ydb_lock) {
|
||||
toku_db_put(DB *db, DB_TXN *txn, DBT *key, DBT *val, u_int32_t flags, BOOL holds_mo_lock) {
|
||||
HANDLE_PANICKED_DB(db);
|
||||
HANDLE_DB_ILLEGAL_WORKING_PARENT_TXN(db, txn);
|
||||
int r = 0;
|
||||
|
@ -193,9 +194,9 @@ toku_db_put(DB *db, DB_TXN *txn, DBT *key, DBT *val, u_int32_t flags, BOOL holds
|
|||
if (flags==DB_NOOVERWRITE_NO_ERROR) {
|
||||
type = FT_INSERT_NO_OVERWRITE;
|
||||
}
|
||||
if (!holds_ydb_lock) toku_ydb_lock();
|
||||
if (!holds_mo_lock) toku_multi_operation_client_lock();
|
||||
r = toku_ft_maybe_insert(db->i->ft_handle, key, val, ttxn, FALSE, ZERO_LSN, TRUE, type);
|
||||
if (!holds_ydb_lock) toku_ydb_unlock();
|
||||
if (!holds_mo_lock) toku_multi_operation_client_unlock();
|
||||
}
|
||||
|
||||
if (r == 0) {
|
||||
|
@ -232,10 +233,10 @@ toku_db_update(DB *db, DB_TXN *txn,
|
|||
}
|
||||
|
||||
TOKUTXN ttxn = txn ? db_txn_struct_i(txn)->tokutxn : NULL;
|
||||
toku_ydb_lock();
|
||||
toku_multi_operation_client_lock();
|
||||
r = toku_ft_maybe_update(db->i->ft_handle, key, update_function_extra, ttxn,
|
||||
FALSE, ZERO_LSN, TRUE);
|
||||
toku_ydb_unlock();
|
||||
toku_multi_operation_client_unlock();
|
||||
|
||||
cleanup:
|
||||
if (r == 0)
|
||||
|
@ -287,10 +288,10 @@ toku_db_update_broadcast(DB *db, DB_TXN *txn,
|
|||
}
|
||||
|
||||
TOKUTXN ttxn = txn ? db_txn_struct_i(txn)->tokutxn : NULL;
|
||||
toku_ydb_lock();
|
||||
toku_multi_operation_client_lock();
|
||||
r = toku_ft_maybe_update_broadcast(db->i->ft_handle, update_function_extra, ttxn,
|
||||
FALSE, ZERO_LSN, TRUE, is_resetting_op);
|
||||
toku_ydb_unlock();
|
||||
toku_multi_operation_client_unlock();
|
||||
|
||||
cleanup:
|
||||
if (r == 0)
|
||||
|
|
|
@ -35,8 +35,8 @@ typedef struct {
|
|||
void ydb_write_layer_get_status(YDB_WRITE_LAYER_STATUS statp);
|
||||
|
||||
|
||||
int toku_db_del(DB *db, DB_TXN *txn, DBT *key, u_int32_t flags, BOOL holds_ydb_lock);
|
||||
int toku_db_put(DB *db, DB_TXN *txn, DBT *key, DBT *val, u_int32_t flags, BOOL holds_ydb_lock);
|
||||
int toku_db_del(DB *db, DB_TXN *txn, DBT *key, u_int32_t flags, BOOL holds_mo_lock);
|
||||
int toku_db_put(DB *db, DB_TXN *txn, DBT *key, DBT *val, u_int32_t flags, BOOL holds_mo_lock);
|
||||
int autotxn_db_del(DB* db, DB_TXN* txn, DBT* key, u_int32_t flags);
|
||||
int autotxn_db_put(DB* db, DB_TXN* txn, DBT* key, DBT* data, u_int32_t flags);
|
||||
int autotxn_db_update(DB *db, DB_TXN *txn, const DBT *key, const DBT *update_function_extra, u_int32_t flags);
|
||||
|
|
Loading…
Add table
Reference in a new issue