diff --git a/storage/tokudb/ha_tokudb.cc b/storage/tokudb/ha_tokudb.cc
index 90ea0478972..9350b586375 100644
--- a/storage/tokudb/ha_tokudb.cc
+++ b/storage/tokudb/ha_tokudb.cc
@@ -1005,9 +1005,6 @@ ha_tokudb::ha_tokudb(handlerton * hton, TABLE_SHARE * table_arg):handler(hton, t
     using_ignore = 0;
     last_cursor_error = 0;
     range_lock_grabbed = false;
-    num_added_rows_in_stmt = 0;
-    num_deleted_rows_in_stmt = 0;
-    num_updated_rows_in_stmt = 0;
     blob_buff = NULL;
     num_blob_bytes = 0;
     delay_updating_ai_metadata = false;
@@ -2635,7 +2632,7 @@ int ha_tokudb::write_row(uchar * record) {
     TOKUDB_DBUG_ENTER("ha_tokudb::write_row");
     DBT row, prim_key, key;
     int error;
-    THD *thd = NULL;
+    THD *thd = ha_thd();
     u_int32_t put_flags;
     bool has_null;
     DB_TXN* sub_trans = NULL;
@@ -2643,10 +2640,9 @@ int ha_tokudb::write_row(uchar * record) {
     bool is_replace_into;
     tokudb_trx_data *trx = NULL;
     uint curr_num_DBs = table->s->keys + test(hidden_primary_key);
+    declare_lockretry;
 
-
-    thd = ha_thd();
     is_replace_into = (thd_sql_command(thd) == SQLCOM_REPLACE) ||
         (thd_sql_command(thd) == SQLCOM_REPLACE_SELECT);
 
@@ -2807,11 +2803,8 @@ int ha_tokudb::write_row(uchar * record) {
 
     if (!error) {
         added_rows++;
-        num_added_rows_in_stmt++;
-        if ((num_added_rows_in_stmt % 1000) == 0) {
-            sprintf(write_status_msg, "Inserted about %llu rows", num_added_rows_in_stmt);
-            thd_proc_info(thd, write_status_msg);
-        }
+        trx->stmt_progress.inserted++;
+        track_progress(thd);
     }
 cleanup:
     if (error == DB_KEYEXIST) {
@@ -2915,6 +2908,7 @@ int ha_tokudb::update_row(const uchar * old_row, uchar * new_row) {
     THD* thd = ha_thd();
     DB_TXN* sub_trans = NULL;
     DB_TXN* txn = NULL;
+    tokudb_trx_data* trx = (tokudb_trx_data *) thd_data_get(thd, tokudb_hton->slot);
 
     //
     // this can only fail if we have not opened the environment
@@ -3050,11 +3044,8 @@ int ha_tokudb::update_row(const uchar * old_row, uchar * new_row) {
     }
 
     if (!error) {
-        num_updated_rows_in_stmt++;
-        if ((num_updated_rows_in_stmt % 1000) == 0) {
-            sprintf(write_status_msg, "Updated about %llu rows", num_updated_rows_in_stmt);
-            thd_proc_info(thd, write_status_msg);
-        }
+        trx->stmt_progress.updated++;
+        track_progress(thd);
     }
 
@@ -3167,6 +3158,7 @@ int ha_tokudb::delete_row(const uchar * record) {
     key_map keys = table_share->keys_in_use;
     bool has_null;
     THD* thd = ha_thd();
+    tokudb_trx_data* trx = (tokudb_trx_data *) thd_data_get(thd, tokudb_hton->slot);
 
     //
     // this can only fail if we have not opened the environment
@@ -3190,11 +3182,8 @@ int ha_tokudb::delete_row(const uchar * record) {
     }
     else {
         deleted_rows++;
-        num_deleted_rows_in_stmt++;
-        if ((num_deleted_rows_in_stmt % 1000) == 0) {
-            sprintf(write_status_msg, "Deleted about %llu rows", num_deleted_rows_in_stmt);
-            thd_proc_info(thd, write_status_msg);
-        }
+        trx->stmt_progress.deleted++;
+        track_progress(thd);
     }
     {
         int r = db_env->checkpointing_end_atomic_operation(db_env);
@@ -3585,6 +3574,8 @@ int ha_tokudb::index_next_same(uchar * buf, const uchar * key, uint keylen) {
     bool has_null;
     int cmp;
     u_int32_t flags;
+    THD* thd = ha_thd();
+    tokudb_trx_data* trx = (tokudb_trx_data *) thd_data_get(thd, tokudb_hton->slot);
     HANDLE_INVALID_CURSOR();
 
     statistic_increment(table->in_use->status_var.ha_read_next_count, &LOCK_status);
@@ -3613,7 +3604,8 @@ int ha_tokudb::index_next_same(uchar * buf, const uchar * key, uint keylen) {
         if (cmp) {
             error = HA_ERR_END_OF_FILE;
         }
-
+    trx->stmt_progress.queried++;
+    track_progress(thd);
 cleanup:
     TOKUDB_DBUG_RETURN(error);
 }
@@ -3643,6 +3635,8 @@ int ha_tokudb::index_read(uchar * buf, const uchar * key, uint key_len, enum ha_
     DBT lookup_key;
     int error;
     u_int32_t flags = 0;
+    THD* thd = ha_thd();
+    tokudb_trx_data* trx = (tokudb_trx_data *) thd_data_get(thd, tokudb_hton->slot);
     struct smart_dbt_info info;
     struct index_read_info ir_info;
 
@@ -3718,6 +3712,9 @@ int ha_tokudb::index_read(uchar * buf, const uchar * key, uint key_len, enum ha_
     if (error && (tokudb_debug & TOKUDB_DEBUG_ERROR)) {
         TOKUDB_TRACE("error:%d:%d\n", error, find_flag);
     }
+    trx->stmt_progress.queried++;
+    track_progress(thd);
+
 cleanup:
     TOKUDB_DBUG_RETURN(error);
 }
@@ -3738,6 +3735,8 @@ int ha_tokudb::index_next(uchar * buf) {
     int error;
     struct smart_dbt_info info;
     u_int32_t flags = SET_READ_FLAG(0);
+    THD* thd = ha_thd();
+    tokudb_trx_data* trx = (tokudb_trx_data *) thd_data_get(thd, tokudb_hton->slot);
     HANDLE_INVALID_CURSOR();
 
     statistic_increment(table->in_use->status_var.ha_read_next_count, &LOCK_status);
@@ -3753,6 +3752,8 @@ int ha_tokudb::index_next(uchar * buf) {
     if (!error && !key_read && (active_index != primary_key) && !(table->key_info[active_index].flags & HA_CLUSTERING) ) {
         error = read_full_row(buf);
     }
+    trx->stmt_progress.queried++;
+    track_progress(thd);
 cleanup:
     TOKUDB_DBUG_RETURN(error);
 }
@@ -3777,6 +3778,8 @@ int ha_tokudb::index_prev(uchar * buf) {
     int error;
     struct smart_dbt_info info;
     u_int32_t flags = SET_READ_FLAG(0);
+    THD* thd = ha_thd();
+    tokudb_trx_data* trx = (tokudb_trx_data *) thd_data_get(thd, tokudb_hton->slot);
     HANDLE_INVALID_CURSOR();
 
     statistic_increment(table->in_use->status_var.ha_read_next_count, &LOCK_status);
@@ -3792,7 +3795,9 @@ int ha_tokudb::index_prev(uchar * buf) {
     if (!error && !key_read && (active_index != primary_key) && !(table->key_info[active_index].flags & HA_CLUSTERING) ) {
         error = read_full_row(buf);
     }
-
+    trx->stmt_progress.queried++;
+    track_progress(thd);
+
 cleanup:
     TOKUDB_DBUG_RETURN(error);
 }
@@ -3811,6 +3816,8 @@ int ha_tokudb::index_first(uchar * buf) {
     int error;
     struct smart_dbt_info info;
     u_int32_t flags = SET_READ_FLAG(0);
+    THD* thd = ha_thd();
+    tokudb_trx_data* trx = (tokudb_trx_data *) thd_data_get(thd, tokudb_hton->slot);
     HANDLE_INVALID_CURSOR();
 
     statistic_increment(table->in_use->status_var.ha_read_first_count, &LOCK_status);
@@ -3827,7 +3834,9 @@ int ha_tokudb::index_first(uchar * buf) {
     if (!error && !key_read && (active_index != primary_key) && !(table->key_info[active_index].flags & HA_CLUSTERING) ) {
         error = read_full_row(buf);
     }
-
+    trx->stmt_progress.queried++;
+    track_progress(thd);
+
 cleanup:
     TOKUDB_DBUG_RETURN(error);
 }
@@ -3846,6 +3855,8 @@ int ha_tokudb::index_last(uchar * buf) {
     int error;
     struct smart_dbt_info info;
     u_int32_t flags = SET_READ_FLAG(0);
+    THD* thd = ha_thd();
+    tokudb_trx_data* trx = (tokudb_trx_data *) thd_data_get(thd, tokudb_hton->slot);
     HANDLE_INVALID_CURSOR();
 
     statistic_increment(table->in_use->status_var.ha_read_last_count, &LOCK_status);
@@ -3862,6 +3873,11 @@ int ha_tokudb::index_last(uchar * buf) {
     if (!error && !key_read && (active_index != primary_key) && !(table->key_info[active_index].flags & HA_CLUSTERING) ) {
         error = read_full_row(buf);
     }
+
+    if (trx) {
+        trx->stmt_progress.queried++;
+    }
+    track_progress(thd);
 cleanup:
     TOKUDB_DBUG_RETURN(error);
 }
@@ -3921,6 +3937,9 @@ int ha_tokudb::rnd_next(uchar * buf) {
     TOKUDB_DBUG_ENTER("ha_tokudb::ha_tokudb::rnd_next");
     int error;
     u_int32_t flags = SET_READ_FLAG(0);
+    THD* thd = ha_thd();
+    tokudb_trx_data* trx = (tokudb_trx_data *) thd_data_get(thd, tokudb_hton->slot);
+
     struct smart_dbt_info info;
 
     HANDLE_INVALID_CURSOR();
@@ -3935,11 +3954,36 @@ int ha_tokudb::rnd_next(uchar * buf) {
     info.keynr = primary_key;
 
     error = handle_cursor_error(cursor->c_getf_next(cursor, flags, SMART_DBT_CALLBACK, &info),HA_ERR_END_OF_FILE,primary_key);
+
+    trx->stmt_progress.queried++;
+    track_progress(thd);
 cleanup:
     TOKUDB_DBUG_RETURN(error);
 }
 
 
+void ha_tokudb::track_progress(THD* thd) {
+    tokudb_trx_data* trx = (tokudb_trx_data *) thd_data_get(thd, tokudb_hton->slot);
+    if (trx) {
+        bool update_status = (trx->stmt_progress.queried % 1000) == 1 ||
+            (trx->stmt_progress.inserted % 1000) == 1 ||
+            (trx->stmt_progress.updated % 1000) == 1 ||
+            (trx->stmt_progress.deleted % 1000) == 1;
+        if (update_status) {
+            sprintf(
+                write_status_msg,
+                "Queried about %llu rows, inserted about %llu rows, updated about %llu rows, deleted about %llu rows",
+                trx->stmt_progress.queried,
+                trx->stmt_progress.inserted,
+                trx->stmt_progress.updated,
+                trx->stmt_progress.deleted
+                );
+            thd_proc_info(thd, write_status_msg);
+        }
+    }
+}
+
+
 DBT *ha_tokudb::get_pos(DBT * to, uchar * pos) {
     TOKUDB_DBUG_ENTER("ha_tokudb::get_pos");
     /* We don't need to set app_data here */
@@ -4359,6 +4403,7 @@ int ha_tokudb::create_txn(THD* thd, tokudb_trx_data* trx) {
     if (tokudb_debug & TOKUDB_DEBUG_TXN) {
         TOKUDB_TRACE("stmt:%p:%p\n", trx->sp_level, trx->stmt);
     }
+    reset_stmt_progress(&trx->stmt_progress);
     trans_register_ha(thd, FALSE, tokudb_hton);
 cleanup:
     return error;
@@ -4392,12 +4437,6 @@ int ha_tokudb::external_lock(THD * thd, int lock_type) {
     declare_lockretry;
 #endif
 
-    //
-    // reset per-stmt variables
-    //
-    num_added_rows_in_stmt = 0;
-    num_deleted_rows_in_stmt = 0;
-    num_updated_rows_in_stmt = 0;
 
     trx = (tokudb_trx_data *) thd_data_get(thd, tokudb_hton->slot);
     if (!trx) {
@@ -4447,6 +4486,7 @@ int ha_tokudb::external_lock(THD * thd, int lock_type) {
                 */
                 DBUG_PRINT("trans", ("commiting non-updating transaction"));
                 error = trx->stmt->commit(trx->stmt, 0);
+                reset_stmt_progress(&trx->stmt_progress);
                 if (tokudb_debug & TOKUDB_DEBUG_TXN)
                     TOKUDB_TRACE("commit:%p:%d\n", trx->stmt, error);
                 trx->stmt = NULL;
@@ -4471,12 +4511,6 @@ int ha_tokudb::start_stmt(THD * thd, thr_lock_type lock_type) {
     TOKUDB_DBUG_ENTER("ha_tokudb::start_stmt");
     int error = 0;
 
-    //
-    // reset per-stmt variables
-    //
-    num_added_rows_in_stmt = 0;
-    num_deleted_rows_in_stmt = 0;
-    num_updated_rows_in_stmt = 0;
 
     tokudb_trx_data *trx = (tokudb_trx_data *) thd_data_get(thd, tokudb_hton->slot);
     DBUG_ASSERT(trx);
diff --git a/storage/tokudb/ha_tokudb.h b/storage/tokudb/ha_tokudb.h
index 442a4b7bdc9..c15a5c707dd 100644
--- a/storage/tokudb/ha_tokudb.h
+++ b/storage/tokudb/ha_tokudb.h
@@ -196,19 +196,6 @@ private:
 
     ulonglong deleted_rows;
 
-    //
-    // count on number of rows inserted by statement
-    // this is to help give user progress on what is happening
-    // the reason that the variables added_rows and deleted_rows
-    // are not used is that those variables are also used to help
-    // estimate the number of rows in the DB. There are tricky things that
-    // can happen with "lock tables", so I do not want to couple these
-    // two features together. There is a little duplicate work, but I think it is fine
-    //
-    ulonglong num_added_rows_in_stmt;
-    ulonglong num_deleted_rows_in_stmt;
-    ulonglong num_updated_rows_in_stmt;
-
     uint last_dup_key;
     //
     // if set to 0, then the primary key is not hidden
@@ -445,6 +432,8 @@ public:
         return tokudb_prefix_cmp_dbt_key(share->key_file[keynr], first_key, second_key);
     }
 
+    void track_progress(THD* thd);
+
     int heavi_ret_val;
 
     //
diff --git a/storage/tokudb/hatoku_defines.h b/storage/tokudb/hatoku_defines.h
index 373df5930ab..71670929422 100644
--- a/storage/tokudb/hatoku_defines.h
+++ b/storage/tokudb/hatoku_defines.h
@@ -83,17 +83,33 @@ typedef enum {
 
+typedef struct st_tokudb_stmt_progress {
+    ulonglong inserted;
+    ulonglong updated;
+    ulonglong deleted;
+    ulonglong queried;
+} tokudb_stmt_progress;
+
+
 typedef struct st_tokudb_trx_data {
     DB_TXN *all;
     DB_TXN *stmt;
     DB_TXN *sp_level;
     uint tokudb_lock_count;
     HA_TOKU_ISO_LEVEL iso_level;
+    tokudb_stmt_progress stmt_progress;
 } tokudb_trx_data;
 
 extern char *tokudb_data_dir;
 extern const char *ha_tokudb_ext;
 
+static void reset_stmt_progress (tokudb_stmt_progress* val) {
+    val->deleted = 0;
+    val->inserted = 0;
+    val->updated = 0;
+    val->queried = 0;
+}
+
 static int get_name_length(const char *name) {
     int n = 0;
     const char *newname = name;
diff --git a/storage/tokudb/hatoku_hton.cc b/storage/tokudb/hatoku_hton.cc
index 810acbb2265..2727ac5bb96 100644
--- a/storage/tokudb/hatoku_hton.cc
+++ b/storage/tokudb/hatoku_hton.cc
@@ -442,6 +442,7 @@ static int tokudb_commit(handlerton * hton, THD * thd, bool all) {
     if (all) {
         trx->iso_level = hatoku_iso_not_set;
     }
+    reset_stmt_progress(&trx->stmt_progress);
     TOKUDB_DBUG_RETURN(error);
 }
 
@@ -467,6 +468,7 @@ static int tokudb_rollback(handlerton * hton, THD * thd, bool all) {
     if (all) {
         trx->iso_level = hatoku_iso_not_set;
     }
+    reset_stmt_progress(&trx->stmt_progress);
     TOKUDB_DBUG_RETURN(error);
 }
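Note on the change: the patch replaces the per-handler counters num_added_rows_in_stmt, num_deleted_rows_in_stmt and num_updated_rows_in_stmt (each of which formatted its own processlist message) with a single per-transaction tokudb_stmt_progress struct that also counts queried rows, and routes all reporting through ha_tokudb::track_progress(). The sketch below is an illustrative, self-contained rendering of that throttled-reporting pattern, not code from the patch: the demo_* names and the main() driver are invented for the example, and printf stands in for thd_proc_info(), which is what the handler actually calls with write_status_msg.

#include <cstdio>

// Illustrative stand-ins (not from the patch): the real counters live in
// tokudb_trx_data::stmt_progress and the real message buffer is
// write_status_msg, reported through thd_proc_info().
struct demo_stmt_progress {
    unsigned long long inserted;
    unsigned long long updated;
    unsigned long long deleted;
    unsigned long long queried;
};

static char demo_status_msg[256];

// Mirrors the throttling in ha_tokudb::track_progress(): rebuild the status
// string only when a counter has just passed a multiple of 1000 (the
// "% 1000 == 1" test), i.e. at most about once per thousand row operations.
static void demo_track_progress(demo_stmt_progress *p) {
    bool update_status =
        (p->queried  % 1000) == 1 ||
        (p->inserted % 1000) == 1 ||
        (p->updated  % 1000) == 1 ||
        (p->deleted  % 1000) == 1;
    if (update_status) {
        std::snprintf(demo_status_msg, sizeof(demo_status_msg),
                      "Queried about %llu rows, inserted about %llu rows, "
                      "updated about %llu rows, deleted about %llu rows",
                      p->queried, p->inserted, p->updated, p->deleted);
        // The handler hands this string to thd_proc_info(thd, write_status_msg),
        // which is what SHOW PROCESSLIST displays; printf stands in here.
        std::printf("%s\n", demo_status_msg);
    }
}

int main() {
    demo_stmt_progress p = {0, 0, 0, 0};
    for (int i = 0; i < 2500; i++) {    // simulate a 2500-row bulk insert
        p.inserted++;
        demo_track_progress(&p);        // prints at rows 1, 1001 and 2001
    }
    return 0;
}

The "% 1000 == 1" test means the status line is refreshed on the first row operation and then roughly once per thousand, so the sprintf/thd_proc_info cost stays negligible; calling reset_stmt_progress() in create_txn(), tokudb_commit(), tokudb_rollback() and the non-updating-commit path of external_lock() keeps the reported numbers scoped to the current statement rather than to the lifetime of the handler object.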