diff --git a/storage/tokudb/ha_tokudb.cc b/storage/tokudb/ha_tokudb.cc index 25064fa4b7f..811e689a6a6 100644 --- a/storage/tokudb/ha_tokudb.cc +++ b/storage/tokudb/ha_tokudb.cc @@ -1205,6 +1205,8 @@ ha_tokudb::ha_tokudb(handlerton * hton, TABLE_SHARE * table_arg):handler(hton, t mult_del_flags[i] = DB_DELETE_ANY; mult_dbt_flags[i] = DB_DBT_REALLOC; } + num_DBs_locked_in_bulk = false; + lock_count = 0; } // @@ -2958,13 +2960,18 @@ void ha_tokudb::start_bulk_insert(ha_rows rows) { delay_updating_ai_metadata = true; ai_metadata_update_required = false; abort_loader = false; + + rw_rdlock(&share->num_DBs_lock); + uint curr_num_DBs = table->s->keys + test(hidden_primary_key); + num_DBs_locked_in_bulk = true; + lock_count = 0; + if (share->try_table_lock) { if (get_prelock_empty(thd) && may_table_be_empty()) { if (using_ignore || get_load_save_space(thd)) { acquire_table_lock(transaction, lock_write); } else { - uint curr_num_DBs = table->s->keys + test(hidden_primary_key); mult_dbt_flags[primary_key] = 0; if (!thd_test_options(thd, OPTION_RELAXED_UNIQUE_CHECKS) && !hidden_primary_key) { mult_put_flags[primary_key] = DB_NOOVERWRITE; @@ -3002,6 +3009,13 @@ void ha_tokudb::start_bulk_insert(ha_rows rows) { share->try_table_lock = false; // RFP what good is the mutex? pthread_mutex_unlock(&share->mutex); } + for (uint i = 0; i < curr_num_DBs; i++) { + DB* curr_DB = share->key_file[i]; + int error = curr_DB->pre_acquire_fileops_shared_lock(curr_DB, transaction); + if (!error) { + mult_put_flags[i] |= DB_PRELOCKED_FILE_READ; + } + } DBUG_VOID_RETURN; } @@ -3016,6 +3030,7 @@ int ha_tokudb::end_bulk_insert(bool abort) { THD* thd = ha_thd(); tokudb_trx_data* trx = (tokudb_trx_data *) thd_data_get(thd, tokudb_hton->slot); bool using_loader = (loader != NULL); + uint curr_num_DBs = table->s->keys + test(hidden_primary_key); if (ai_metadata_update_required) { pthread_mutex_lock(&share->mutex); error = update_max_auto_inc(share->status_block, share->last_auto_increment); @@ -3067,6 +3082,17 @@ int ha_tokudb::end_bulk_insert(bool abort) { } cleanup: + if (num_DBs_locked_in_bulk) { + rw_unlock(&share->num_DBs_lock); + } + num_DBs_locked_in_bulk = false; + lock_count = 0; + + for (uint i = 0; i < curr_num_DBs; i++) { + u_int32_t prelocked_read_flag = DB_PRELOCKED_FILE_READ; + mult_put_flags[i] &= ~(prelocked_read_flag); + } + if (loader) { error = sprintf(write_status_msg, "aborting bulk load"); thd_proc_info(thd, write_status_msg); @@ -3451,7 +3477,13 @@ void ha_tokudb::test_row_packing(uchar* record, DBT* pk_key, DBT* pk_val) { // // set the put flags for the main dictionary // -void ha_tokudb::set_main_dict_put_flags(THD* thd, u_int32_t* put_flags) { +void ha_tokudb::set_main_dict_put_flags( + THD* thd, + u_int32_t* put_flags, + bool no_overwrite_no_error_allowed + ) +{ + u_int32_t old_prelock_flags = (*put_flags)&(DB_PRELOCKED_FILE_READ); // // optimization for "REPLACE INTO..." (and "INSERT IGNORE") command // if the command is "REPLACE INTO" and the only table @@ -3464,43 +3496,43 @@ void ha_tokudb::set_main_dict_put_flags(THD* thd, u_int32_t* put_flags) { // consistency between indexes // if (hidden_primary_key){ - *put_flags = DB_YESOVERWRITE; + *put_flags = DB_YESOVERWRITE|old_prelock_flags; } else if (thd_test_options(thd, OPTION_RELAXED_UNIQUE_CHECKS) && !is_replace_into(thd) && !is_insert_ignore(thd) ) { - *put_flags = DB_YESOVERWRITE; + *put_flags = DB_YESOVERWRITE|old_prelock_flags; } else if (do_ignore_flag_optimization(thd,table,share->replace_into_fast) && is_replace_into(thd) ) { - *put_flags = DB_YESOVERWRITE; + *put_flags = DB_YESOVERWRITE|old_prelock_flags; } else if (do_ignore_flag_optimization(thd,table,share->replace_into_fast) && - is_insert_ignore(thd) + is_insert_ignore(thd) && no_overwrite_no_error_allowed ) { - *put_flags = DB_NOOVERWRITE_NO_ERROR; + *put_flags = DB_NOOVERWRITE_NO_ERROR|old_prelock_flags; } else { - *put_flags = DB_NOOVERWRITE; + *put_flags = DB_NOOVERWRITE|old_prelock_flags; } } int ha_tokudb::insert_row_to_main_dictionary(uchar* record, DBT* pk_key, DBT* pk_val, DB_TXN* txn) { int error = 0; - u_int32_t put_flags = 0; + u_int32_t put_flags = mult_put_flags[primary_key]; THD *thd = ha_thd(); uint curr_num_DBs = table->s->keys + test(hidden_primary_key); ulonglong wait_lock_time = get_write_lock_wait_time(thd); assert(curr_num_DBs == 1); - set_main_dict_put_flags(thd,&put_flags); + set_main_dict_put_flags(thd, &put_flags, true); lockretryN(wait_lock_time){ error = share->file->put( @@ -3527,14 +3559,7 @@ int ha_tokudb::insert_rows_to_dictionaries_mult(DBT* pk_key, DBT* pk_val, DB_TXN uint curr_num_DBs = table->s->keys + test(hidden_primary_key); ulonglong wait_lock_time = get_write_lock_wait_time(thd); - set_main_dict_put_flags(thd, &mult_put_flags[primary_key]); - if (mult_put_flags[primary_key] == DB_NOOVERWRITE_NO_ERROR) { - // - //hopefully temporary, right now, put_multiple does not - // support use of DB_NOOVERWRITE_NO_ERROR as put_flag - // - mult_put_flags[primary_key] = DB_NOOVERWRITE; - } + set_main_dict_put_flags(thd, &mult_put_flags[primary_key], false); lockretryN(wait_lock_time){ error = db_env->put_multiple( @@ -3622,7 +3647,17 @@ int ha_tokudb::write_row(uchar * record) { // // grab reader lock on numDBs_lock // - rw_rdlock(&share->num_DBs_lock); + if (!num_DBs_locked_in_bulk) { + rw_rdlock(&share->num_DBs_lock); + } + else { + lock_count++; + if (lock_count >= 2000) { + rw_unlock(&share->num_DBs_lock); + rw_rdlock(&share->num_DBs_lock); + lock_count = 0; + } + } curr_num_DBs = share->num_DBs; if (hidden_primary_key) { @@ -3688,7 +3723,9 @@ int ha_tokudb::write_row(uchar * record) { track_progress(thd); } cleanup: - rw_unlock(&share->num_DBs_lock); + if (!num_DBs_locked_in_bulk) { + rw_unlock(&share->num_DBs_lock); + } if (error == DB_KEYEXIST) { error = HA_ERR_FOUND_DUPP_KEY; } @@ -3858,14 +3895,7 @@ int ha_tokudb::update_row(const uchar * old_row, uchar * new_row) { error = pack_old_row_for_update(&old_prim_row, old_row, primary_key); if (error) { goto cleanup; } - set_main_dict_put_flags(thd, &mult_put_flags[primary_key]); - if (mult_put_flags[primary_key] == DB_NOOVERWRITE_NO_ERROR) { - // - //hopefully temporary, right now, put_multiple does not - // support use of DB_NOOVERWRITE_NO_ERROR as put_flag - // - mult_put_flags[primary_key] = DB_NOOVERWRITE; - } + set_main_dict_put_flags(thd, &mult_put_flags[primary_key], false); lockretryN(wait_lock_time){ error = db_env->update_multiple( db_env, diff --git a/storage/tokudb/ha_tokudb.h b/storage/tokudb/ha_tokudb.h index a1a19a7dfa1..65dccd027ec 100644 --- a/storage/tokudb/ha_tokudb.h +++ b/storage/tokudb/ha_tokudb.h @@ -281,6 +281,9 @@ private: DB_LOADER* loader; bool abort_loader; int loader_error; + + bool num_DBs_locked_in_bulk; + u_int32_t lock_count; bool fix_rec_buff_for_blob(ulong length); bool fix_rec_update_buff_for_blob(ulong length); @@ -348,7 +351,7 @@ private: int is_index_unique(bool* is_unique, DB_TXN* txn, DB* db, KEY* key_info); int is_val_unique(bool* is_unique, uchar* record, KEY* key_info, uint dict_index, DB_TXN* txn); int do_uniqueness_checks(uchar* record, DB_TXN* txn, THD* thd); - void set_main_dict_put_flags(THD* thd, u_int32_t* put_flags); + void set_main_dict_put_flags(THD* thd, u_int32_t* put_flags, bool no_overwrite_no_error_allowed); int insert_row_to_main_dictionary(uchar* record, DBT* pk_key, DBT* pk_val, DB_TXN* txn); int insert_rows_to_dictionaries_mult(DBT* pk_key, DBT* pk_val, DB_TXN* txn, THD* thd); void test_row_packing(uchar* record, DBT* pk_key, DBT* pk_val);