From b65e6e3d746ef8261ca415928df2f1c1614d40e1 Mon Sep 17 00:00:00 2001 From: Rich Prohaska Date: Thu, 5 Dec 2013 11:59:34 -0500 Subject: [PATCH] #141 redo table open and close locking to avoid table opening pileup --- storage/tokudb/ha_tokudb.cc | 201 ++++++++++++++++++-------------- storage/tokudb/ha_tokudb.h | 17 ++- storage/tokudb/hatoku_defines.h | 40 +++++++ storage/tokudb/hatoku_hton.cc | 4 +- 4 files changed, 167 insertions(+), 95 deletions(-) diff --git a/storage/tokudb/ha_tokudb.cc b/storage/tokudb/ha_tokudb.cc index 8dbdbcba028..37962ce42d0 100644 --- a/storage/tokudb/ha_tokudb.cc +++ b/storage/tokudb/ha_tokudb.cc @@ -205,14 +205,39 @@ exit: return error; } -/** @brief - Simple lock controls. The "share" it creates is a structure we will - pass to each tokudb handler. Do you have to have one of these? Well, you have - pieces that are used for locking, and they are needed to function. - - MUST have tokudb_mutex locked on input +static void free_key_and_col_info (KEY_AND_COL_INFO* kc_info) { + for (uint i = 0; i < MAX_KEY+1; i++) { + bitmap_free(&kc_info->key_filters[i]); + } -*/ + for (uint i = 0; i < MAX_KEY+1; i++) { + tokudb_my_free(kc_info->cp_info[i]); + kc_info->cp_info[i] = NULL; // 3144 + } + + tokudb_my_free(kc_info->field_lengths); + tokudb_my_free(kc_info->length_bytes); + tokudb_my_free(kc_info->blob_fields); +} + +void TOKUDB_SHARE::init(void) { + use_count = 0; + thr_lock_init(&lock); + tokudb_pthread_mutex_init(&mutex, MY_MUTEX_INIT_FAST); + my_rwlock_init(&num_DBs_lock, 0); + tokudb_pthread_cond_init(&m_openclose_cond, NULL); + m_state = CLOSED; +} + +void TOKUDB_SHARE::destroy(void) { + assert(m_state == CLOSED); + thr_lock_delete(&lock); + tokudb_pthread_mutex_destroy(&mutex); + rwlock_destroy(&num_DBs_lock); + tokudb_pthread_cond_destroy(&m_openclose_cond); +} + +// MUST have tokudb_mutex locked on input static TOKUDB_SHARE *get_share(const char *table_name, TABLE_SHARE* table_share) { TOKUDB_SHARE *share = NULL; int error = 0; @@ -234,7 +259,8 @@ static TOKUDB_SHARE *get_share(const char *table_name, TABLE_SHARE* table_share) ); assert(share); - share->use_count = 0; + share->init(); + share->table_name_length = length; share->table_name = tmp_name; strmov(share->table_name, table_name); @@ -244,54 +270,30 @@ static TOKUDB_SHARE *get_share(const char *table_name, TABLE_SHARE* table_share) goto exit; } - memset((void *) share->key_file, 0, sizeof(share->key_file)); - error = my_hash_insert(&tokudb_open_tables, (uchar *) share); if (error) { + free_key_and_col_info(&share->kc_info); goto exit; } - thr_lock_init(&share->lock); - pthread_mutex_init(&share->mutex, MY_MUTEX_INIT_FAST); - my_rwlock_init(&share->num_DBs_lock, 0); } exit: if (error) { - pthread_mutex_destroy(&share->mutex); + share->destroy(); tokudb_my_free((uchar *) share); share = NULL; } return share; } - -static void free_key_and_col_info (KEY_AND_COL_INFO* kc_info) { - for (uint i = 0; i < MAX_KEY+1; i++) { - bitmap_free(&kc_info->key_filters[i]); - } - - for (uint i = 0; i < MAX_KEY+1; i++) { - tokudb_my_free(kc_info->cp_info[i]); - kc_info->cp_info[i] = NULL; // 3144 - } - - tokudb_my_free(kc_info->field_lengths); - tokudb_my_free(kc_info->length_bytes); - tokudb_my_free(kc_info->blob_fields); -} - -// -// MUST have tokudb_mutex locked on input -// bool mutex_is_locked specifies if share->mutex is locked -// -static int free_share(TOKUDB_SHARE * share, bool mutex_is_locked) { +static int free_share(TOKUDB_SHARE * share) { int error, result = 0; - if (mutex_is_locked) { - pthread_mutex_unlock(&share->mutex); - } + tokudb_pthread_mutex_lock(&share->mutex); + DBUG_PRINT("info", ("share->use_count %u", share->use_count)); if (!--share->use_count) { - DBUG_PRINT("info", ("share->use_count %u", share->use_count)); + share->m_state = TOKUDB_SHARE::CLOSING; + tokudb_pthread_mutex_unlock(&share->mutex); // // number of open DB's may not be equal to number of keys we have because add_index @@ -316,13 +318,25 @@ static int free_share(TOKUDB_SHARE * share, bool mutex_is_locked) { error = tokudb::close_status(&share->status_block); assert(error == 0); - - my_hash_delete(&tokudb_open_tables, (uchar *) share); - thr_lock_delete(&share->lock); - pthread_mutex_destroy(&share->mutex); - rwlock_destroy(&share->num_DBs_lock); - tokudb_my_free((uchar *) share); + tokudb_pthread_mutex_lock(&tokudb_mutex); + tokudb_pthread_mutex_lock(&share->mutex); + share->m_state = TOKUDB_SHARE::CLOSED; + if (share->use_count > 0) { + tokudb_pthread_cond_broadcast(&share->m_openclose_cond); + tokudb_pthread_mutex_unlock(&share->mutex); + tokudb_pthread_mutex_unlock(&tokudb_mutex); + } else { + my_hash_delete(&tokudb_open_tables, (uchar *) share); + + tokudb_pthread_mutex_unlock(&share->mutex); + tokudb_pthread_mutex_unlock(&tokudb_mutex); + + share->destroy(); + tokudb_my_free((uchar *) share); + } + } else { + tokudb_pthread_mutex_unlock(&share->mutex); } return result; @@ -588,7 +602,7 @@ smart_dbt_callback_ir_rowread(DBT const *key, DBT const *row, void *context) { // Returns: // The value of the auto increment column in record // -ulonglong retrieve_auto_increment(uint16 type, uint32 offset,const uchar *record) +static ulonglong retrieve_auto_increment(uint16 type, uint32 offset,const uchar *record) { const uchar *key; /* Key */ ulonglong unsigned_autoinc = 0; /* Unsigned auto-increment */ @@ -1710,8 +1724,6 @@ exit: return error; } - - // // Creates and opens a handle to a table which already exists in a tokudb // database. @@ -1788,36 +1800,49 @@ int ha_tokudb::open(const char *name, int mode, uint test_if_locked) { goto exit; } - /* Init shared structure */ - pthread_mutex_lock(&tokudb_mutex); + // lookup or create share + tokudb_pthread_mutex_lock(&tokudb_mutex); share = get_share(name, table_share); assert(share); thr_lock_data_init(&share->lock, &lock, NULL); - /* Fill in shared structure, if needed */ - pthread_mutex_lock(&share->mutex); - if (!share->use_count++) { - ret_val = initialize_share( - name, - mode - ); - if (ret_val) { - free_share(share, true); - pthread_mutex_unlock(&tokudb_mutex); - goto exit; - } + tokudb_pthread_mutex_lock(&share->mutex); + tokudb_pthread_mutex_unlock(&tokudb_mutex); + share->use_count++; + while (share->m_state == TOKUDB_SHARE::OPENING || share->m_state == TOKUDB_SHARE::CLOSING) { + tokudb_pthread_cond_wait(&share->m_openclose_cond, &share->mutex); + } + if (share->m_state == TOKUDB_SHARE::CLOSED) { + share->m_state = TOKUDB_SHARE::OPENING; + tokudb_pthread_mutex_unlock(&share->mutex); + + ret_val = initialize_share(name, mode); + + tokudb_pthread_mutex_lock(&share->mutex); + if (ret_val == 0) { + share->m_state = TOKUDB_SHARE::OPENED; + } else { + share->m_state = TOKUDB_SHARE::ERROR; + share->m_error = ret_val; + } + tokudb_pthread_cond_broadcast(&share->m_openclose_cond); + } + if (share->m_state == TOKUDB_SHARE::ERROR) { + ret_val = share->m_error; + tokudb_pthread_mutex_unlock(&share->mutex); + free_share(share); + goto exit; + } else { + assert(share->m_state == TOKUDB_SHARE::OPENED); + tokudb_pthread_mutex_unlock(&share->mutex); } - pthread_mutex_unlock(&share->mutex); - pthread_mutex_unlock(&tokudb_mutex); ref_length = share->ref_length; // If second open if (tokudb_debug & TOKUDB_DEBUG_OPEN) { - pthread_mutex_lock(&share->mutex); TOKUDB_TRACE("tokudbopen:%p:share=%p:file=%p:table=%p:table->s=%p:%d\n", this, share, share->file, table, table->s, share->use_count); - pthread_mutex_unlock(&share->mutex); } key_read = false; @@ -2128,9 +2153,7 @@ int ha_tokudb::__close() { rec_update_buff = NULL; alloc_ptr = NULL; ha_tokudb::reset(); - pthread_mutex_lock(&tokudb_mutex); - int retval = free_share(share, false); - pthread_mutex_unlock(&tokudb_mutex); + int retval = free_share(share); TOKUDB_DBUG_RETURN(retval); } @@ -2926,7 +2949,7 @@ DBT *ha_tokudb::pack_ext_key( // void ha_tokudb::init_hidden_prim_key_info() { TOKUDB_DBUG_ENTER("ha_tokudb::init_prim_key_info"); - pthread_mutex_lock(&share->mutex); + tokudb_pthread_mutex_lock(&share->mutex); if (!(share->status & STATUS_PRIMARY_KEY_INIT)) { int error = 0; THD* thd = ha_thd(); @@ -2966,7 +2989,7 @@ void ha_tokudb::init_hidden_prim_key_info() { } share->status |= STATUS_PRIMARY_KEY_INIT; } - pthread_mutex_unlock(&share->mutex); + tokudb_pthread_mutex_unlock(&share->mutex); DBUG_VOID_RETURN; } @@ -3246,9 +3269,9 @@ void ha_tokudb::start_bulk_insert(ha_rows rows) { } } exit_try_table_lock: - pthread_mutex_lock(&share->mutex); + tokudb_pthread_mutex_lock(&share->mutex); share->try_table_lock = false; - pthread_mutex_unlock(&share->mutex); + tokudb_pthread_mutex_unlock(&share->mutex); } DBUG_VOID_RETURN; } @@ -3265,9 +3288,9 @@ int ha_tokudb::end_bulk_insert(bool abort) { tokudb_trx_data* trx = (tokudb_trx_data *) thd_data_get(thd, tokudb_hton->slot); bool using_loader = (loader != NULL); if (ai_metadata_update_required) { - pthread_mutex_lock(&share->mutex); + tokudb_pthread_mutex_lock(&share->mutex); error = update_max_auto_inc(share->status_block, share->last_auto_increment); - pthread_mutex_unlock(&share->mutex); + tokudb_pthread_mutex_unlock(&share->mutex); if (error) { goto cleanup; } } delay_updating_ai_metadata = false; @@ -3891,7 +3914,7 @@ int ha_tokudb::write_row(uchar * record) { // of the auto inc field. // if (share->has_auto_inc && record == table->record[0]) { - pthread_mutex_lock(&share->mutex); + tokudb_pthread_mutex_lock(&share->mutex); ulonglong curr_auto_inc = retrieve_auto_increment( table->field[share->ai_field_index]->key_type(), field_offset(table->field[share->ai_field_index], table), @@ -3906,7 +3929,7 @@ int ha_tokudb::write_row(uchar * record) { update_max_auto_inc(share->status_block, share->last_auto_increment); } } - pthread_mutex_unlock(&share->mutex); + tokudb_pthread_mutex_unlock(&share->mutex); } // @@ -4084,7 +4107,7 @@ int ha_tokudb::update_row(const uchar * old_row, uchar * new_row) { // of the auto inc field. // if (share->has_auto_inc && new_row == table->record[0]) { - pthread_mutex_lock(&share->mutex); + tokudb_pthread_mutex_lock(&share->mutex); ulonglong curr_auto_inc = retrieve_auto_increment( table->field[share->ai_field_index]->key_type(), field_offset(table->field[share->ai_field_index], table), @@ -4096,7 +4119,7 @@ int ha_tokudb::update_row(const uchar * old_row, uchar * new_row) { share->last_auto_increment = curr_auto_inc; } } - pthread_mutex_unlock(&share->mutex); + tokudb_pthread_mutex_unlock(&share->mutex); } // @@ -5377,6 +5400,8 @@ int ha_tokudb::index_prev(uchar * buf) { TOKUDB_DBUG_RETURN(error); } +volatile int tokudb_index_first_wait = 0; + // // Reads the first row from the active index (cursor) into buf, and advances cursor // Parameters: @@ -5398,6 +5423,8 @@ int ha_tokudb::index_first(uchar * buf) { ha_statistic_increment(&SSV::ha_read_first_count); + while (tokudb_index_first_wait) sleep(1); + info.ha = this; info.buf = buf; info.keynr = tokudb_active_index; @@ -6125,7 +6152,7 @@ int ha_tokudb::external_lock(THD * thd, int lock_type) { transaction = trx->sub_sp_level; } else { - pthread_mutex_lock(&share->mutex); + tokudb_pthread_mutex_lock(&share->mutex); // hate dealing with comparison of signed vs unsigned, so doing this if (deleted_rows > added_rows && share->rows < (deleted_rows - added_rows)) { share->rows = 0; @@ -6133,7 +6160,7 @@ int ha_tokudb::external_lock(THD * thd, int lock_type) { else { share->rows += (added_rows - deleted_rows); } - pthread_mutex_unlock(&share->mutex); + tokudb_pthread_mutex_unlock(&share->mutex); added_rows = 0; deleted_rows = 0; share->rows_from_locked_table = 0; @@ -7443,7 +7470,7 @@ void ha_tokudb::get_auto_increment(ulonglong offset, ulonglong increment, ulongl ulonglong nr; bool over; - pthread_mutex_lock(&share->mutex); + tokudb_pthread_mutex_lock(&share->mutex); if (share->auto_inc_create_value > share->last_auto_increment) { nr = share->auto_inc_create_value; @@ -7472,7 +7499,7 @@ void ha_tokudb::get_auto_increment(ulonglong offset, ulonglong increment, ulongl } *first_value = nr; *nb_reserved_values = nb_desired_values; - pthread_mutex_unlock(&share->mutex); + tokudb_pthread_mutex_unlock(&share->mutex); DBUG_VOID_RETURN; } @@ -7838,18 +7865,18 @@ int ha_tokudb::tokudb_add_index( // We have an accurate row count, might as well update share->rows // if(!creating_hot_index) { - pthread_mutex_lock(&share->mutex); + tokudb_pthread_mutex_lock(&share->mutex); share->rows = num_processed; - pthread_mutex_unlock(&share->mutex); + tokudb_pthread_mutex_unlock(&share->mutex); } // // now write stuff to status.tokudb // - pthread_mutex_lock(&share->mutex); + tokudb_pthread_mutex_lock(&share->mutex); for (uint i = 0; i < num_of_keys; i++) { write_key_name_to_status(share->status_block, key_info[i].name, txn); } - pthread_mutex_unlock(&share->mutex); + tokudb_pthread_mutex_unlock(&share->mutex); error = 0; cleanup: diff --git a/storage/tokudb/ha_tokudb.h b/storage/tokudb/ha_tokudb.h index f28048e279e..11179775868 100644 --- a/storage/tokudb/ha_tokudb.h +++ b/storage/tokudb/ha_tokudb.h @@ -126,6 +126,10 @@ typedef struct hot_optimize_context { // and auto increment information. // class TOKUDB_SHARE { +public: + void init(void); + void destroy(void); + public: char *table_name; uint table_name_length, use_count; @@ -184,6 +188,10 @@ public: bool replace_into_fast; rw_lock_t num_DBs_lock; uint32_t num_DBs; + + pthread_cond_t m_openclose_cond; + enum { CLOSED, OPENING, OPENED, CLOSING, ERROR } m_state; + int m_error; }; typedef struct st_filter_key_part_info { @@ -443,10 +451,7 @@ private: int write_auto_inc_create(DB* db, ulonglong val, DB_TXN* txn); void init_auto_increment(); bool can_replace_into_be_fast(TABLE_SHARE* table_share, KEY_AND_COL_INFO* kc_info, uint pk); - int initialize_share( - const char* name, - int mode - ); + int initialize_share(const char* name, int mode); void set_query_columns(uint keynr); int prelock_range (const key_range *start_key, const key_range *end_key); @@ -599,10 +604,10 @@ public: int get_status(DB_TXN* trans); void init_hidden_prim_key_info(); inline void get_auto_primary_key(uchar * to) { - pthread_mutex_lock(&share->mutex); + tokudb_pthread_mutex_lock(&share->mutex); share->auto_ident++; hpk_num_to_char(to, share->auto_ident); - pthread_mutex_unlock(&share->mutex); + tokudb_pthread_mutex_unlock(&share->mutex); } virtual void get_auto_increment(ulonglong offset, ulonglong increment, ulonglong nb_desired_values, ulonglong * first_value, ulonglong * nb_reserved_values); bool is_optimize_blocking(); diff --git a/storage/tokudb/hatoku_defines.h b/storage/tokudb/hatoku_defines.h index 5d3409d0967..66724457748 100644 --- a/storage/tokudb/hatoku_defines.h +++ b/storage/tokudb/hatoku_defines.h @@ -443,4 +443,44 @@ static inline void* tokudb_my_multi_malloc(myf myFlags, ...) { return start; } +static inline void tokudb_pthread_mutex_init(pthread_mutex_t *mutex, const pthread_mutexattr_t *attr) { + int r = pthread_mutex_init(mutex, attr); + assert(r == 0); +} + +static inline void tokudb_pthread_mutex_destroy(pthread_mutex_t *mutex) { + int r = pthread_mutex_destroy(mutex); + assert(r == 0); +} + +static inline void tokudb_pthread_mutex_lock(pthread_mutex_t *mutex) { + int r = pthread_mutex_lock(mutex); + assert(r == 0); +} + +static inline void tokudb_pthread_mutex_unlock(pthread_mutex_t *mutex) { + int r = pthread_mutex_unlock(mutex); + assert(r == 0); +} + +static inline void tokudb_pthread_cond_init(pthread_cond_t *cond, const pthread_condattr_t *attr) { + int r = pthread_cond_init(cond, attr); + assert(r == 0); +} + +static inline void tokudb_pthread_cond_destroy(pthread_cond_t *cond) { + int r = pthread_cond_destroy(cond); + assert(r == 0); +} + +static inline void tokudb_pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex) { + int r = pthread_cond_wait(cond, mutex); + assert(r == 0); +} + +static inline void tokudb_pthread_cond_broadcast(pthread_cond_t *cond) { + int r = pthread_cond_broadcast(cond); + assert(r == 0); +} + #endif diff --git a/storage/tokudb/hatoku_hton.cc b/storage/tokudb/hatoku_hton.cc index 93a2f1312eb..b0493a635ad 100644 --- a/storage/tokudb/hatoku_hton.cc +++ b/storage/tokudb/hatoku_hton.cc @@ -311,7 +311,7 @@ static int tokudb_init_func(void *p) { tokudb_hton = (handlerton *) p; - pthread_mutex_init(&tokudb_mutex, MY_MUTEX_INIT_FAST); + tokudb_pthread_mutex_init(&tokudb_mutex, MY_MUTEX_INIT_FAST); (void) my_hash_init(&tokudb_open_tables, table_alias_charset, 32, 0, 0, (my_hash_get_key) tokudb_get_key, 0, 0); tokudb_hton->state = SHOW_OPTION_YES; @@ -529,7 +529,7 @@ static int tokudb_done_func(void *p) { tokudb_my_free(toku_global_status_rows); toku_global_status_rows = NULL; my_hash_free(&tokudb_open_tables); - pthread_mutex_destroy(&tokudb_mutex); + tokudb_pthread_mutex_destroy(&tokudb_mutex); #if defined(_WIN64) toku_ydb_destroy(); #endif