MDEV-33751 Assertion `thd' failed in int temp_file_size_cb_func(tmp_file_tracking*, int)

Changes:
- Fixed that MyISAM and Aria parallel repair works with tmp file limit.
  This required to add current_thd to all parallel workers and add
  protection in my_malloc_size_cb_func() and temp_file_size_cb_func() to
  be able to handle shared THD's.  I removed the old code in MyISAM to
  set current_thd() as only worked when using with virtal indexed
  columns and I wanted to keep the Aria and MyISAM code identical.

Other things:
- Improved error messages from Aria parallel repair and
  create_internal_tmp_table_from_heap().
This commit is contained in:
Monty 2024-04-20 14:02:05 +03:00 committed by Sergei Golubchik
parent 865ef0f567
commit d2304554ac
16 changed files with 147 additions and 34 deletions

View file

@ -357,6 +357,7 @@ typedef struct st_mi_sort_param
MEM_ROOT wordroot; MEM_ROOT wordroot;
uchar *record; uchar *record;
MY_TMPDIR *tmpdir; MY_TMPDIR *tmpdir;
HA_CHECK *check_param;
/* /*
The next two are used to collect statistics, see update_key_parts for The next two are used to collect statistics, see update_key_parts for

View file

@ -113,9 +113,9 @@ typedef struct st_handler_check_param
uint progress_counter; /* How often to call _report_progress() */ uint progress_counter; /* How often to call _report_progress() */
ulonglong progress, max_progress; ulonglong progress, max_progress;
void (*init_fix_record)(void *);
int (*fix_record)(struct st_myisam_info *info, uchar *record, int keynum); int (*fix_record)(struct st_myisam_info *info, uchar *record, int keynum);
void (*init_repair_thread)(void *);
void *init_repair_thread_arg;
mysql_mutex_t print_msg_mutex; mysql_mutex_t print_msg_mutex;
my_bool need_print_msg_lock; my_bool need_print_msg_lock;
myf malloc_flags; myf malloc_flags;

View file

@ -10,7 +10,7 @@ create table t2 (a int) engine=aria data directory='MYSQL_TMP_DIR';
insert t2 values (1); insert t2 values (1);
repair table t2; repair table t2;
Table Op Msg_type Msg_text Table Op Msg_type Msg_text
test.t2 repair error 20 for record at pos 256 test.t2 repair error Got error 20 for record at pos 256 when creating index
test.t2 repair Error File 'MYSQL_TMP_DIR/t2.MAD' not found (Errcode: 20 "<errmsg>") test.t2 repair Error File 'MYSQL_TMP_DIR/t2.MAD' not found (Errcode: 20 "<errmsg>")
test.t2 repair status Operation failed test.t2 repair status Operation failed
drop table t2; drop table t2;

View file

@ -157,4 +157,24 @@ DROP TABLE t1;
set max_tmp_session_space_usage = 1024*1024; set max_tmp_session_space_usage = 1024*1024;
select count(distinct concat(seq,repeat('x',1000))) from seq_1_to_1000; select count(distinct concat(seq,repeat('x',1000))) from seq_1_to_1000;
ERROR HY000: Global temporary space limit reached ERROR HY000: Global temporary space limit reached
#
# MDEV-33751 Assertion `thd' failed in int
# temp_file_size_cb_func(tmp_file_tracking*, int)
#
set @@global.max_tmp_total_space_usage=64*1024*1024;
set @@max_tmp_session_space_usage=1179648;
select @@max_tmp_session_space_usage;
@@max_tmp_session_space_usage
1179648
set @save_aria_repair_threads=@@aria_repair_threads;
set @@aria_repair_threads=2;
set @save_max_heap_table_size=@@max_heap_table_size;
set @@max_heap_table_size=16777216;
CREATE TABLE t1 (a CHAR(255),b INT,INDEX (b));
INSERT INTO t1 SELECT SEQ,SEQ FROM seq_1_to_100000;
SELECT * FROM t1 UNION SELECT * FROM t1;
ERROR HY000: Local temporary space limit reached
DROP TABLE t1;
set @@aria_repair_threads=@save_aria_repair_threads;
set @@max_heap_table_size=@save_max_heap_table_size;
# End of 11.5 tests # End of 11.5 tests

View file

@ -209,4 +209,26 @@ set max_tmp_session_space_usage = 1024*1024;
--error 201 --error 201
select count(distinct concat(seq,repeat('x',1000))) from seq_1_to_1000; select count(distinct concat(seq,repeat('x',1000))) from seq_1_to_1000;
--echo #
--echo # MDEV-33751 Assertion `thd' failed in int
--echo # temp_file_size_cb_func(tmp_file_tracking*, int)
--echo #
set @@global.max_tmp_session_space_usage=64*1024*1024;
set @@max_tmp_session_space_usage=1179648;
select @@max_tmp_session_space_usage;
set @save_aria_repair_threads=@@aria_repair_threads;
set @@aria_repair_threads=2;
set @save_max_heap_table_size=@@max_heap_table_size;
set @@max_heap_table_size=16777216;
CREATE TABLE t1 (a CHAR(255),b INT,INDEX (b));
INSERT INTO t1 SELECT SEQ,SEQ FROM seq_1_to_100000;
--error 200
SELECT * FROM t1 UNION SELECT * FROM t1;
DROP TABLE t1;
set @@aria_repair_threads=@save_aria_repair_threads;
set @@max_heap_table_size=@save_max_heap_table_size;
--echo # End of 11.5 tests --echo # End of 11.5 tests

View file

@ -3760,7 +3760,7 @@ static void my_malloc_size_cb_func(long long size, my_bool is_thread_specific)
DBUG_ASSERT((longlong) thd->status_var.local_memory_used >= 0 || DBUG_ASSERT((longlong) thd->status_var.local_memory_used >= 0 ||
!debug_assert_on_not_freed_memory); !debug_assert_on_not_freed_memory);
} }
else if (likely(thd)) else if (likely(thd) && likely(!thd->shared_thd))
{ {
DBUG_PRINT("info", ("global thd memory_used: %lld size: %lld", DBUG_PRINT("info", ("global thd memory_used: %lld size: %lld",
(longlong) thd->status_var.global_memory_used, size)); (longlong) thd->status_var.global_memory_used, size));
@ -3777,6 +3777,7 @@ static int temp_file_size_cb_func(struct tmp_file_tracking *track,
int no_error) int no_error)
{ {
THD *thd= current_thd; THD *thd= current_thd;
int error= 0;
longlong size_change= (longlong) (track->file_size - longlong size_change= (longlong) (track->file_size -
track->previous_file_size); track->previous_file_size);
DBUG_ENTER("temp_file_size_cb_func"); DBUG_ENTER("temp_file_size_cb_func");
@ -3794,6 +3795,8 @@ static int temp_file_size_cb_func(struct tmp_file_tracking *track,
track->previous_file_size. track->previous_file_size.
*/ */
DBUG_ASSERT(thd->status_var.tmp_space_used >= track->previous_file_size); DBUG_ASSERT(thd->status_var.tmp_space_used >= track->previous_file_size);
if (unlikely(thd->shared_thd))
mysql_mutex_lock(&thd->LOCK_thd_data);
global_tmp_space_used+= size_change; global_tmp_space_used+= size_change;
if (size_change > 0) if (size_change > 0)
@ -3805,14 +3808,16 @@ static int temp_file_size_cb_func(struct tmp_file_tracking *track,
global_max_tmp_space_usage) global_max_tmp_space_usage)
{ {
global_tmp_space_used-= size_change; global_tmp_space_used-= size_change;
DBUG_RETURN(my_errno= EE_GLOBAL_TMP_SPACE_FULL); error= my_errno= EE_GLOBAL_TMP_SPACE_FULL;
goto exit;
} }
if (thd->status_var.tmp_space_used + size_change > if (thd->status_var.tmp_space_used + size_change >
thd->variables.max_tmp_space_usage && !no_error && thd->variables.max_tmp_space_usage && !no_error &&
thd->variables.max_tmp_space_usage) thd->variables.max_tmp_space_usage)
{ {
global_tmp_space_used-= size_change; global_tmp_space_used-= size_change;
DBUG_RETURN(my_errno= EE_LOCAL_TMP_SPACE_FULL); error= my_errno= EE_LOCAL_TMP_SPACE_FULL;
goto exit;
} }
set_if_bigger(global_status_var.max_tmp_space_used, cached_space); set_if_bigger(global_status_var.max_tmp_space_used, cached_space);
} }
@ -3828,8 +3833,12 @@ static int temp_file_size_cb_func(struct tmp_file_tracking *track,
/* Record that we have registered the change */ /* Record that we have registered the change */
track->previous_file_size= track->file_size; track->previous_file_size= track->file_size;
exit:
if (unlikely(thd->shared_thd))
mysql_mutex_unlock(&thd->LOCK_thd_data);
} }
DBUG_RETURN(0); DBUG_RETURN(error);
} }
int json_escape_string(const char *str,const char *str_end, int json_escape_string(const char *str,const char *str_end,

View file

@ -813,6 +813,7 @@ THD::THD(my_thread_id id, bool is_wsrep_applier)
net.reading_or_writing= 0; net.reading_or_writing= 0;
client_capabilities= 0; // minimalistic client client_capabilities= 0; // minimalistic client
system_thread= NON_SYSTEM_THREAD; system_thread= NON_SYSTEM_THREAD;
shared_thd= 0;
cleanup_done= free_connection_done= abort_on_warning= got_warning= 0; cleanup_done= free_connection_done= abort_on_warning= got_warning= 0;
peer_port= 0; // For SHOW PROCESSLIST peer_port= 0; // For SHOW PROCESSLIST
transaction= &default_transaction; transaction= &default_transaction;

View file

@ -3127,6 +3127,8 @@ public:
Trans_binlog_info *semisync_info; Trans_binlog_info *semisync_info;
/* If this is a semisync slave connection. */ /* If this is a semisync slave connection. */
bool semi_sync_slave; bool semi_sync_slave;
/* Several threads may share this thd. Used with parallel repair */
bool shared_thd;
ulonglong client_capabilities; /* What the client supports */ ulonglong client_capabilities; /* What the client supports */
ulong max_client_packet_length; ulong max_client_packet_length;

View file

@ -22797,7 +22797,7 @@ create_internal_tmp_table_from_heap(THD *thd, TABLE *table,
if (unlikely(thd->check_killed())) if (unlikely(thd->check_killed()))
goto err_killed; goto err_killed;
} }
if (!new_table.no_rows && new_table.file->ha_end_bulk_insert()) if (!new_table.no_rows && (write_err= new_table.file->ha_end_bulk_insert()))
goto err; goto err;
/* copy row that filled HEAP table */ /* copy row that filled HEAP table */
if (unlikely((write_err=new_table.file->ha_write_tmp_row(table->record[0])))) if (unlikely((write_err=new_table.file->ha_write_tmp_row(table->record[0]))))

View file

@ -1635,6 +1635,19 @@ int ha_maria::optimize(THD * thd, HA_CHECK_OPT *check_opt)
return error; return error;
} }
/*
Set current_thd() for parallel worker thread
*/
C_MODE_START
void maria_setup_thd_for_repair_thread(void *arg)
{
THD *thd= (THD*) arg;
DBUG_ASSERT(thd->shared_thd);
set_current_thd(thd);
}
C_MODE_END
int ha_maria::repair(THD *thd, HA_CHECK *param, bool do_optimize) int ha_maria::repair(THD *thd, HA_CHECK *param, bool do_optimize)
{ {
@ -1726,8 +1739,17 @@ int ha_maria::repair(THD *thd, HA_CHECK *param, bool do_optimize)
my_snprintf(buf, 40, "Repair with %d threads", my_count_bits(key_map)); my_snprintf(buf, 40, "Repair with %d threads", my_count_bits(key_map));
thd_proc_info(thd, buf); thd_proc_info(thd, buf);
param->testflag|= T_REP_PARALLEL; param->testflag|= T_REP_PARALLEL;
/*
Ensure that all threads are using the same THD. This is needed
to get limit of tmp files to work
*/
param->init_repair_thread= maria_setup_thd_for_repair_thread;
param->init_repair_thread_arg= table->in_use;
/* Mark that multiple threads are using the thd */
table->in_use->shared_thd= 1;
error= maria_repair_parallel(param, file, fixed_name, error= maria_repair_parallel(param, file, fixed_name,
MY_TEST(param->testflag & T_QUICK)); MY_TEST(param->testflag & T_QUICK));
table->in_use->shared_thd= 0;
/* to reset proc_info, as it was pointing to local buffer */ /* to reset proc_info, as it was pointing to local buffer */
thd_proc_info(thd, "Repair done"); thd_proc_info(thd, "Repair done");
} }
@ -2100,12 +2122,13 @@ int ha_maria::enable_indexes(key_map map, bool persist)
This can be set when doing an ALTER TABLE and enabling unique keys This can be set when doing an ALTER TABLE and enabling unique keys
*/ */
if ((error= (repair(thd, param, 0) != HA_ADMIN_OK)) && param->retry_repair && if ((error= (repair(thd, param, 0) != HA_ADMIN_OK)) && param->retry_repair &&
!file->s->internal_table &&
(my_errno != HA_ERR_FOUND_DUPP_KEY || (my_errno != HA_ERR_FOUND_DUPP_KEY ||
!file->create_unique_index_by_sort)) !file->create_unique_index_by_sort))
{ {
sql_print_warning("Warning: Enabling keys got errno %d on %s.%s, " sql_print_warning("Warning: Enabling keys got errno %d on %s, "
"retrying", "retrying",
my_errno, param->db_name, param->table_name); my_errno, file->s->open_file_name);
/* Repairing by sort failed. Now try standard repair method. */ /* Repairing by sort failed. Now try standard repair method. */
param->testflag &= ~T_REP_BY_SORT; param->testflag &= ~T_REP_BY_SORT;
file->state->records= start_rows; file->state->records= start_rows;
@ -2398,7 +2421,7 @@ int ha_maria::end_bulk_insert()
can_enable_indexes= 0; can_enable_indexes= 0;
if (first_error) if (first_error)
my_errno= first_errno; my_errno= first_errno;
DBUG_RETURN(first_error); DBUG_RETURN(first_errno);
} }

View file

@ -2978,8 +2978,9 @@ err:
if (got_error) if (got_error)
{ {
if (! param->error_printed) if (! param->error_printed)
_ma_check_print_error(param,"%d for record at pos %s",my_errno, _ma_check_print_error(param,"Got error %d for record at pos %s when creating index",
llstr(sort_param.start_recpos,llbuff)); my_errno,
llstr(sort_param.start_recpos,llbuff));
(void)_ma_flush_table_files_before_swap(param, info); (void)_ma_flush_table_files_before_swap(param, info);
if (sort_info.new_info && sort_info.new_info != sort_info.info) if (sort_info.new_info && sort_info.new_info != sort_info.info)
{ {
@ -4218,7 +4219,7 @@ err:
if (got_error) if (got_error)
{ {
if (! param->error_printed) if (! param->error_printed)
_ma_check_print_error(param,"%d when fixing table",my_errno); _ma_check_print_error(param,"Got error %d when trying to repair table",my_errno);
(void)_ma_flush_table_files_before_swap(param, info); (void)_ma_flush_table_files_before_swap(param, info);
if (sort_info.new_info && sort_info.new_info != sort_info.info) if (sort_info.new_info && sort_info.new_info != sort_info.info)
{ {
@ -4488,6 +4489,7 @@ int maria_repair_parallel(HA_CHECK *param, register MARIA_HA *info,
for (i=key=0, istep=1 ; key < share->base.keys ; for (i=key=0, istep=1 ; key < share->base.keys ;
rec_per_key_part+=sort_param[i].keyinfo->keysegs, i+=istep, key++) rec_per_key_part+=sort_param[i].keyinfo->keysegs, i+=istep, key++)
{ {
sort_param[i].check_param= param;
sort_param[i].key=key; sort_param[i].key=key;
sort_param[i].keyinfo=share->keyinfo+key; sort_param[i].keyinfo=share->keyinfo+key;
sort_param[i].seg=sort_param[i].keyinfo->seg; sort_param[i].seg=sort_param[i].keyinfo->seg;
@ -4793,7 +4795,8 @@ err:
if (got_error) if (got_error)
{ {
if (! param->error_printed) if (! param->error_printed)
_ma_check_print_error(param,"%d when fixing table",my_errno); _ma_check_print_error(param,"Got error %d when repairing table with parallel repair",
my_errno);
(void)_ma_flush_table_files_before_swap(param, info); (void)_ma_flush_table_files_before_swap(param, info);
if (new_file >= 0) if (new_file >= 0)
{ {

View file

@ -543,8 +543,16 @@ pthread_handler_t _ma_thr_find_all_keys(void *arg)
MARIA_SORT_PARAM *sort_param= (MARIA_SORT_PARAM*) arg; MARIA_SORT_PARAM *sort_param= (MARIA_SORT_PARAM*) arg;
my_bool error= FALSE; my_bool error= FALSE;
/* If my_thread_init fails */ /* If my_thread_init fails */
if (my_thread_init() || _ma_thr_find_all_keys_exec(sort_param)) if (my_thread_init())
error= TRUE; error= TRUE;
else
{
HA_CHECK *check= sort_param->check_param;
if (check->init_repair_thread)
check->init_repair_thread(check->init_repair_thread_arg);
if (_ma_thr_find_all_keys_exec(sort_param))
error= TRUE;
}
/* /*
Thread must clean up after itself. Thread must clean up after itself.

View file

@ -396,6 +396,7 @@ typedef struct st_maria_sort_param
MEM_ROOT wordroot; MEM_ROOT wordroot;
uchar *record; uchar *record;
MY_TMPDIR *tmpdir; MY_TMPDIR *tmpdir;
HA_CHECK *check_param;
/* /*
The next two are used to collect statistics, see maria_update_key_parts for The next two are used to collect statistics, see maria_update_key_parts for

View file

@ -710,16 +710,6 @@ my_bool mi_killed_in_mariadb(MI_INFO *info)
return (((TABLE*) (info->external_ref))->in_use->killed != 0); return (((TABLE*) (info->external_ref))->in_use->killed != 0);
} }
static void init_compute_vcols(void *table)
{
/*
To evaluate vcols we must have current_thd set.
This will set current_thd in all threads to the same THD, but it's
safe, because vcols are always evaluated under info->s->intern_lock.
*/
set_current_thd(static_cast<TABLE *>(table)->in_use);
}
static int compute_vcols(MI_INFO *info, uchar *record, int keynum) static int compute_vcols(MI_INFO *info, uchar *record, int keynum)
{ {
/* This mutex is needed for parallel repair */ /* This mutex is needed for parallel repair */
@ -1032,7 +1022,6 @@ void ha_myisam::setup_vcols_for_repair(HA_CHECK *param)
} }
DBUG_ASSERT(file->s->base.reclength < file->s->vreclength || DBUG_ASSERT(file->s->base.reclength < file->s->vreclength ||
!table->s->stored_fields); !table->s->stored_fields);
param->init_fix_record= init_compute_vcols;
param->fix_record= compute_vcols; param->fix_record= compute_vcols;
table->use_all_columns(); table->use_all_columns();
} }
@ -1286,6 +1275,25 @@ int ha_myisam::optimize(THD* thd, HA_CHECK_OPT *check_opt)
} }
/*
Set current_thd() for parallel worker thread
This is needed to evaluate vcols as we must have current_thd set.
This will set current_thd in all threads to the same THD, but it's
safe, because vcols are always evaluated under info->s->intern_lock.
This is also used temp_file_size_cb_func() to tmp_space_usage by THD.
*/
C_MODE_START
void myisam_setup_thd_for_repair_thread(void *arg)
{
THD *thd= (THD*) arg;
DBUG_ASSERT(thd->shared_thd);
set_current_thd(thd);
}
C_MODE_END
int ha_myisam::repair(THD *thd, HA_CHECK &param, bool do_optimize) int ha_myisam::repair(THD *thd, HA_CHECK &param, bool do_optimize)
{ {
int error=0; int error=0;
@ -1357,8 +1365,18 @@ int ha_myisam::repair(THD *thd, HA_CHECK &param, bool do_optimize)
{ {
/* TODO: respect myisam_repair_threads variable */ /* TODO: respect myisam_repair_threads variable */
thd_proc_info(thd, "Parallel repair"); thd_proc_info(thd, "Parallel repair");
error = mi_repair_parallel(&param, file, fixed_name, param.testflag|= T_REP_PARALLEL;
MY_TEST(param.testflag & T_QUICK)); /*
Ensure that all threads are using the same THD. This is needed
to get limit of tmp files to work
*/
param.init_repair_thread= myisam_setup_thd_for_repair_thread;
param.init_repair_thread_arg= table->in_use;
/* Mark that multiple threads are using the thd */
table->in_use->shared_thd= 1;
error= mi_repair_parallel(&param, file, fixed_name,
MY_TEST(param.testflag & T_QUICK));
table->in_use->shared_thd= 0;
} }
else else
{ {

View file

@ -2813,6 +2813,7 @@ int mi_repair_parallel(HA_CHECK *param, register MI_INFO *info,
for (i=key=0, istep=1 ; key < share->base.keys ; for (i=key=0, istep=1 ; key < share->base.keys ;
rec_per_key_part+=sort_param[i].keyinfo->keysegs, i+=istep, key++) rec_per_key_part+=sort_param[i].keyinfo->keysegs, i+=istep, key++)
{ {
sort_param[i].check_param= param;
sort_param[i].key=key; sort_param[i].key=key;
sort_param[i].keyinfo=share->keyinfo+key; sort_param[i].keyinfo=share->keyinfo+key;
sort_param[i].seg=sort_param[i].keyinfo->seg; sort_param[i].seg=sort_param[i].keyinfo->seg;

View file

@ -530,13 +530,17 @@ pthread_handler_t thr_find_all_keys(void *arg)
MI_SORT_PARAM *sort_param= (MI_SORT_PARAM*) arg; MI_SORT_PARAM *sort_param= (MI_SORT_PARAM*) arg;
my_bool error= FALSE; my_bool error= FALSE;
MI_SORT_INFO *si= sort_param->sort_info;
if (si->param->init_fix_record)
si->param->init_fix_record(si->info->external_ref);
/* If my_thread_init fails */ /* If my_thread_init fails */
if (my_thread_init() || thr_find_all_keys_exec(sort_param)) if (my_thread_init())
error= TRUE; error= TRUE;
else
{
HA_CHECK *check= sort_param->check_param;
if (check->init_repair_thread)
check->init_repair_thread(check->init_repair_thread_arg);
if (thr_find_all_keys_exec(sort_param))
error= TRUE;
}
/* /*
Thread must clean up after itself. Thread must clean up after itself.