LOCK_thread_count and COND_thread_count removed from wsrep modules (#1197)

Refactored wsrep patch to not use LOCK_thread_count and COND_thread_count anymore.
This has partially been replaced by using old LOCK_wsrep_slave_threads mutex.
For slave thread count change waiting, new COND_wsrep_slave_threads signal has been added

Added LOCK_wsrep_cluster_config mutex to control that cluster address change cannot happen in parallel

Protected wsrep_slave_threads variable changes with LOCK_cluster_config mutex
This is for avoiding concurrent slave thread count and cluster joining operations to happen

Fixes according to Teemu's review
This commit is contained in:
seppo 2019-02-26 20:39:05 +02:00 committed by Sergey Vojtovich
parent bb970dda77
commit 785092ee23
7 changed files with 60 additions and 104 deletions

View file

@ -1536,11 +1536,6 @@ static my_bool kill_all_threads(THD *thd, void *)
if (DBUG_EVALUATE_IF("only_kill_system_threads", !thd->system_thread, 0))
return 0;
#ifdef WITH_WSREP
/* skip wsrep system threads as well */
if (WSREP(thd) && (wsrep_thd_is_applying(thd) || thd->wsrep_applier))
return 0;
#endif
thd->set_killed(KILL_SERVER_HARD);
MYSQL_CALLBACK(thread_scheduler, post_kill_notification, (thd));
if (WSREP(thd)) mysql_mutex_lock(&thd->LOCK_thd_data);
@ -1593,34 +1588,6 @@ static my_bool kill_all_threads_once_again(THD *thd, void *)
}
#endif
#ifdef WITH_WSREP
/*
* WSREP_TODO:
* this code block may turn out redundant. wsrep->disconnect()
* should terminate slave threads gracefully, and we don't need
* to signal them here.
* The code here makes sure mysqld will not hang during shutdown
* even if wsrep provider has problems in shutting down.
*/
if (WSREP(thd) && wsrep_thd_is_applying(thd))
{
sql_print_information("closing wsrep system thread");
thd->set_killed(KILL_CONNECTION);
MYSQL_CALLBACK(thread_scheduler, post_kill_notification, (thd));
if (thd->mysys_var)
{
thd->mysys_var->abort=1;
mysql_mutex_lock(&thd->mysys_var->mutex);
if (thd->mysys_var->current_cond)
{
mysql_mutex_lock(thd->mysys_var->current_mutex);
mysql_cond_broadcast(thd->mysys_var->current_cond);
mysql_mutex_unlock(thd->mysys_var->current_mutex);
}
mysql_mutex_unlock(&thd->mysys_var->mutex);
}
}
#endif
return 0;
}

View file

@ -5347,13 +5347,13 @@ static Sys_var_charptr Sys_wsrep_cluster_name(
ON_CHECK(wsrep_cluster_name_check),
ON_UPDATE(wsrep_cluster_name_update));
static PolyLock_mutex PLock_wsrep_slave_threads(&LOCK_wsrep_slave_threads);
static PolyLock_mutex PLock_wsrep_cluster_config(&LOCK_wsrep_cluster_config);
static Sys_var_charptr Sys_wsrep_cluster_address (
"wsrep_cluster_address", "Address to initially connect to cluster",
PREALLOCATED GLOBAL_VAR(wsrep_cluster_address),
CMD_LINE(REQUIRED_ARG),
IN_SYSTEM_CHARSET, DEFAULT(""),
&PLock_wsrep_slave_threads, NOT_IN_BINLOG,
&PLock_wsrep_cluster_config, NOT_IN_BINLOG,
ON_CHECK(wsrep_cluster_address_check),
ON_UPDATE(wsrep_cluster_address_update));
@ -5384,7 +5384,7 @@ static Sys_var_ulong Sys_wsrep_slave_threads(
"wsrep_slave_threads", "Number of slave appliers to launch",
GLOBAL_VAR(wsrep_slave_threads), CMD_LINE(REQUIRED_ARG),
VALID_RANGE(1, 512), DEFAULT(1), BLOCK_SIZE(1),
&PLock_wsrep_slave_threads, NOT_IN_BINLOG,
&PLock_wsrep_cluster_config, NOT_IN_BINLOG,
ON_CHECK(0),
ON_UPDATE(wsrep_slave_threads_update));

View file

@ -144,6 +144,8 @@ mysql_cond_t COND_wsrep_sst_init;
mysql_mutex_t LOCK_wsrep_replaying;
mysql_cond_t COND_wsrep_replaying;
mysql_mutex_t LOCK_wsrep_slave_threads;
mysql_cond_t COND_wsrep_slave_threads;
mysql_mutex_t LOCK_wsrep_cluster_config;
mysql_mutex_t LOCK_wsrep_desync;
mysql_mutex_t LOCK_wsrep_config_state;
mysql_mutex_t LOCK_wsrep_SR_pool;
@ -158,7 +160,7 @@ PSI_mutex_key
key_LOCK_wsrep_replaying, key_LOCK_wsrep_ready, key_LOCK_wsrep_sst,
key_LOCK_wsrep_sst_thread, key_LOCK_wsrep_sst_init,
key_LOCK_wsrep_slave_threads, key_LOCK_wsrep_desync,
key_LOCK_wsrep_config_state,
key_LOCK_wsrep_config_state, key_LOCK_wsrep_cluster_config,
key_LOCK_wsrep_SR_pool,
key_LOCK_wsrep_SR_store,
key_LOCK_wsrep_thd_queue;
@ -166,7 +168,7 @@ PSI_mutex_key
PSI_cond_key key_COND_wsrep_thd,
key_COND_wsrep_replaying, key_COND_wsrep_ready, key_COND_wsrep_sst,
key_COND_wsrep_sst_init, key_COND_wsrep_sst_thread,
key_COND_wsrep_thd_queue;
key_COND_wsrep_thd_queue, key_COND_wsrep_slave_threads;
PSI_file_key key_file_wsrep_gra_log;
@ -180,6 +182,7 @@ static PSI_mutex_info wsrep_mutexes[]=
{ &key_LOCK_wsrep_sst, "LOCK_wsrep_sst", PSI_FLAG_GLOBAL},
{ &key_LOCK_wsrep_replaying, "LOCK_wsrep_replaying", PSI_FLAG_GLOBAL},
{ &key_LOCK_wsrep_slave_threads, "LOCK_wsrep_slave_threads", PSI_FLAG_GLOBAL},
{ &key_LOCK_wsrep_cluster_config, "LOCK_wsrep_cluster_config", PSI_FLAG_GLOBAL},
{ &key_LOCK_wsrep_desync, "LOCK_wsrep_desync", PSI_FLAG_GLOBAL},
{ &key_LOCK_wsrep_config_state, "LOCK_wsrep_config_state", PSI_FLAG_GLOBAL},
{ &key_LOCK_wsrep_SR_pool, "LOCK_wsrep_SR_pool", PSI_FLAG_GLOBAL},
@ -193,7 +196,8 @@ static PSI_cond_info wsrep_conds[]=
{ &key_COND_wsrep_sst_init, "COND_wsrep_sst_init", PSI_FLAG_GLOBAL},
{ &key_COND_wsrep_sst_thread, "wsrep_sst_thread", 0},
{ &key_COND_wsrep_thd, "THD::COND_wsrep_thd", 0},
{ &key_COND_wsrep_replaying, "COND_wsrep_replaying", PSI_FLAG_GLOBAL}
{ &key_COND_wsrep_replaying, "COND_wsrep_replaying", PSI_FLAG_GLOBAL},
{ &key_COND_wsrep_slave_threads, "COND_wsrep_wsrep_slave_threads", PSI_FLAG_GLOBAL}
};
static PSI_file_info wsrep_files[]=
@ -758,6 +762,8 @@ void wsrep_thr_init()
mysql_mutex_init(key_LOCK_wsrep_replaying, &LOCK_wsrep_replaying, MY_MUTEX_INIT_FAST);
mysql_cond_init(key_COND_wsrep_replaying, &COND_wsrep_replaying, NULL);
mysql_mutex_init(key_LOCK_wsrep_slave_threads, &LOCK_wsrep_slave_threads, MY_MUTEX_INIT_FAST);
mysql_cond_init(key_COND_wsrep_slave_threads, &COND_wsrep_slave_threads, NULL);
mysql_mutex_init(key_LOCK_wsrep_cluster_config, &LOCK_wsrep_cluster_config, MY_MUTEX_INIT_FAST);
mysql_mutex_init(key_LOCK_wsrep_desync, &LOCK_wsrep_desync, MY_MUTEX_INIT_FAST);
mysql_mutex_init(key_LOCK_wsrep_config_state, &LOCK_wsrep_config_state, MY_MUTEX_INIT_FAST);
mysql_mutex_init(key_LOCK_wsrep_SR_pool,
@ -860,6 +866,8 @@ void wsrep_thr_deinit()
mysql_mutex_destroy(&LOCK_wsrep_replaying);
mysql_cond_destroy(&COND_wsrep_replaying);
mysql_mutex_destroy(&LOCK_wsrep_slave_threads);
mysql_cond_destroy(&COND_wsrep_slave_threads);
mysql_mutex_destroy(&LOCK_wsrep_cluster_config);
mysql_mutex_destroy(&LOCK_wsrep_desync);
mysql_mutex_destroy(&LOCK_wsrep_config_state);
mysql_mutex_destroy(&LOCK_wsrep_SR_pool);
@ -897,7 +905,7 @@ void wsrep_recover()
void wsrep_stop_replication(THD *thd)
{
WSREP_INFO("Stop replication");
WSREP_INFO("Stop replication by %llu", (thd) ? thd->thread_id : 0);
if (Wsrep_server_state::instance().state() !=
Wsrep_server_state::s_disconnected)
{
@ -956,6 +964,7 @@ bool wsrep_start_replication()
if (!wsrep_cluster_address || wsrep_cluster_address[0]== 0)
{
// if provider is non-trivial, but no address is specified, wait for address
WSREP_DEBUG("wsrep_start_replication exit due to empty address");
return true;
}
@ -2280,23 +2289,15 @@ static my_bool kill_remaining_threads(THD *thd, THD *caller_thd)
void wsrep_close_client_connections(my_bool wait_to_end, THD* except_caller_thd)
{
/* Clear thread cache */
kill_cached_threads++;
flush_thread_cache();
/*
First signal all threads that it's time to die
*/
mysql_mutex_lock(&LOCK_thread_count); // For unlink from list
bool kill_cached_threads_saved= kill_cached_threads;
kill_cached_threads= true; // prevent future threads caching
mysql_cond_broadcast(&COND_thread_cache); // tell cached threads to die
server_threads.iterate(kill_all_threads, except_caller_thd);
mysql_mutex_unlock(&LOCK_thread_count);
if (thread_count)
sleep(2); // Give threads time to die
mysql_mutex_lock(&LOCK_thread_count);
/*
Force remaining threads to die by closing the connection to the client
*/
@ -2309,14 +2310,10 @@ void wsrep_close_client_connections(my_bool wait_to_end, THD* except_caller_thd)
while (wait_to_end && server_threads.iterate(have_client_connections))
{
mysql_cond_wait(&COND_thread_count, &LOCK_thread_count);
sleep(1);
DBUG_PRINT("quit",("One thread died (count=%u)", uint32_t(thread_count)));
}
kill_cached_threads= kill_cached_threads_saved;
mysql_mutex_unlock(&LOCK_thread_count);
/* All client connection threads have now been aborted */
}
@ -2348,7 +2345,7 @@ void wsrep_close_threads(THD *thd)
void wsrep_wait_appliers_close(THD *thd)
{
/* Wait for wsrep appliers to gracefully exit */
mysql_mutex_lock(&LOCK_thread_count);
mysql_mutex_lock(&LOCK_wsrep_slave_threads);
while (wsrep_running_threads > 2)
/*
2 is for rollbacker thread which needs to be killed explicitly.
@ -2356,34 +2353,22 @@ void wsrep_wait_appliers_close(THD *thd)
number of non-applier wsrep threads.
*/
{
if (thread_handling > SCHEDULER_ONE_THREAD_PER_CONNECTION)
{
mysql_mutex_unlock(&LOCK_thread_count);
my_sleep(100);
mysql_mutex_lock(&LOCK_thread_count);
}
else
mysql_cond_wait(&COND_thread_count,&LOCK_thread_count);
DBUG_PRINT("quit",("One applier died (count=%u)", uint32_t(thread_count)));
mysql_cond_wait(&COND_wsrep_slave_threads, &LOCK_wsrep_slave_threads);
}
mysql_mutex_unlock(&LOCK_thread_count);
mysql_mutex_unlock(&LOCK_wsrep_slave_threads);
DBUG_PRINT("quit",("applier threads have died (count=%u)",
uint32_t(wsrep_running_threads)));
/* Now kill remaining wsrep threads: rollbacker */
wsrep_close_threads (thd);
/* and wait for them to die */
mysql_mutex_lock(&LOCK_thread_count);
mysql_mutex_lock(&LOCK_wsrep_slave_threads);
while (wsrep_running_threads > 0)
{
if (thread_handling > SCHEDULER_ONE_THREAD_PER_CONNECTION)
{
mysql_mutex_unlock(&LOCK_thread_count);
my_sleep(100);
mysql_mutex_lock(&LOCK_thread_count);
}
else
mysql_cond_wait(&COND_thread_count,&LOCK_thread_count);
DBUG_PRINT("quit",("One thread died (count=%u)", uint32_t(thread_count)));
mysql_cond_wait(&COND_wsrep_slave_threads, &LOCK_wsrep_slave_threads);
}
mysql_mutex_unlock(&LOCK_thread_count);
mysql_mutex_unlock(&LOCK_wsrep_slave_threads);
DBUG_PRINT("quit",("all wsrep system threads have died"));
/* All wsrep applier threads have now been aborted. However, if this thread
is also applier, we are still running...
@ -2698,10 +2683,10 @@ void* start_wsrep_THD(void *arg)
thd->proc_info= 0;
thd->set_command(COM_SLEEP);
thd->init_for_queries();
mysql_mutex_lock(&LOCK_thread_count);
mysql_mutex_lock(&LOCK_wsrep_slave_threads);
wsrep_running_threads++;
mysql_cond_broadcast(&COND_thread_count);
mysql_mutex_unlock(&LOCK_thread_count);
mysql_cond_broadcast(&COND_wsrep_slave_threads);
mysql_mutex_unlock(&LOCK_wsrep_slave_threads);
WSREP_DEBUG("wsrep system thread %llu, %p starting",
thd->thread_id, thd);
@ -2718,11 +2703,11 @@ void* start_wsrep_THD(void *arg)
delete thd_args;
mysql_mutex_lock(&LOCK_thread_count);
mysql_mutex_lock(&LOCK_wsrep_slave_threads);
wsrep_running_threads--;
WSREP_DEBUG("wsrep running threads now: %lu", wsrep_running_threads);
mysql_cond_broadcast(&COND_thread_count);
mysql_mutex_unlock(&LOCK_thread_count);
mysql_cond_broadcast(&COND_wsrep_slave_threads);
mysql_mutex_unlock(&LOCK_wsrep_slave_threads);
/*
Note: We can't call THD destructor without crashing
if plugins have not been initialized. However, in most of the

View file

@ -301,6 +301,8 @@ extern int wsrep_replaying;
extern mysql_mutex_t LOCK_wsrep_replaying;
extern mysql_cond_t COND_wsrep_replaying;
extern mysql_mutex_t LOCK_wsrep_slave_threads;
extern mysql_cond_t COND_wsrep_slave_threads;
extern mysql_mutex_t LOCK_wsrep_cluster_config;
extern mysql_mutex_t LOCK_wsrep_desync;
extern mysql_mutex_t LOCK_wsrep_SR_pool;
extern mysql_mutex_t LOCK_wsrep_SR_store;
@ -327,6 +329,8 @@ extern PSI_cond_key key_COND_wsrep_sst_thread;
extern PSI_mutex_key key_LOCK_wsrep_replaying;
extern PSI_cond_key key_COND_wsrep_replaying;
extern PSI_mutex_key key_LOCK_wsrep_slave_threads;
extern PSI_cond_key key_COND_wsrep_slave_threads;
extern PSI_mutex_key key_LOCK_wsrep_cluster_config;
extern PSI_mutex_key key_LOCK_wsrep_desync;
extern PSI_mutex_key key_LOCK_wsrep_SR_pool;
extern PSI_mutex_key key_LOCK_wsrep_SR_store;

View file

@ -580,12 +580,9 @@ static void wsrep_init_thd_for_schema(THD *thd)
thd->security_ctx->skip_grants();
thd->system_thread= SYSTEM_THREAD_GENERIC;
mysql_mutex_lock(&LOCK_thread_count);
thd->real_id=pthread_self(); // Keep purify happy
thd->prior_thr_create_utime= thd->start_utime= thd->thr_create_utime;
(void) mysql_mutex_unlock(&LOCK_thread_count);
/* */
thd->variables.wsrep_on = 0;
@ -1337,9 +1334,7 @@ int Wsrep_schema::recover_sr_transactions(THD *orig_thd)
THD* thd= new THD(next_thread_id(), true);
thd->thread_stack= (char*)&storage_thd;
mysql_mutex_lock(&LOCK_thread_count);
thd->real_id= pthread_self();
mysql_mutex_unlock(&LOCK_thread_count);
applier= new Wsrep_applier_service(thd);
server_state.start_streaming_applier(server_id, transaction_id,

View file

@ -60,11 +60,11 @@ static void wsrep_replication_process(THD *thd,
enum wsrep::provider::status
ret= Wsrep_server_state::get_provider().run_applier(&applier_service);
WSREP_INFO("Applier thread exiting %d", ret);
mysql_mutex_lock(&LOCK_thread_count);
WSREP_INFO("Applier thread exiting ret: %d thd: %llu", ret, thd->thread_id);
mysql_mutex_lock(&LOCK_wsrep_slave_threads);
wsrep_close_applier(thd);
mysql_cond_broadcast(&COND_thread_count);
mysql_mutex_unlock(&LOCK_thread_count);
mysql_cond_broadcast(&COND_wsrep_slave_threads);
mysql_mutex_unlock(&LOCK_wsrep_slave_threads);
delete thd->system_thread_info.rpl_sql_info;
delete thd->wsrep_rgi->rli->mi;
@ -87,7 +87,6 @@ static bool create_wsrep_THD(Wsrep_thd_args* args)
{
ulong old_wsrep_running_threads= wsrep_running_threads;
pthread_t unused;
mysql_mutex_lock(&LOCK_thread_count);
bool res= pthread_create(&unused, &connection_attrib, start_wsrep_THD,
args);
@ -96,10 +95,11 @@ static bool create_wsrep_THD(Wsrep_thd_args* args)
is fully initialized (otherwise a THD initialization code might
try to access a partially initialized server data structure - MDEV-8208).
*/
mysql_mutex_lock(&LOCK_wsrep_slave_threads);
if (!mysqld_server_initialized)
while (old_wsrep_running_threads == wsrep_running_threads)
mysql_cond_wait(&COND_thread_count, &LOCK_thread_count);
mysql_mutex_unlock(&LOCK_thread_count);
mysql_cond_wait(&COND_wsrep_slave_threads, &LOCK_wsrep_slave_threads);
mysql_mutex_unlock(&LOCK_wsrep_slave_threads);
return res;
}
@ -115,6 +115,7 @@ void wsrep_create_appliers(long threads)
if (!wsrep_cluster_address || wsrep_cluster_address[0]== 0)
{
WSREP_DEBUG("wsrep_create_appliers exit due to empty address");
return;
}
@ -138,6 +139,7 @@ static void wsrep_rollback_process(THD *rollbacker,
THD* thd= NULL;
DBUG_ASSERT(!wsrep_rollback_queue);
wsrep_rollback_queue= new Wsrep_thd_queue(rollbacker);
WSREP_INFO("Starting rollbacker thread %llu", rollbacker->thread_id);
thd_proc_info(rollbacker, "wsrep aborter idle");
while ((thd= wsrep_rollback_queue->pop_front()) != NULL)
@ -266,7 +268,7 @@ static void wsrep_rollback_process(THD *rollbacker,
delete wsrep_rollback_queue;
wsrep_rollback_queue= NULL;
sql_print_information("WSREP: rollbacker thread exiting");
WSREP_INFO("rollbacker thread exiting %llu", rollbacker->thread_id);
DBUG_ASSERT(rollbacker->killed != NOT_KILLED);
DBUG_PRINT("wsrep",("wsrep rollbacker thread exiting"));
@ -279,6 +281,7 @@ static void wsrep_post_rollback_process(THD *post_rollbacker,
DBUG_ENTER("wsrep_post_rollback_process");
THD* thd= NULL;
WSREP_INFO("Starting post rollbacker thread %llu", post_rollbacker->thread_id);
DBUG_ASSERT(!wsrep_post_rollback_queue);
wsrep_post_rollback_queue= new Wsrep_thd_queue(post_rollbacker);
@ -301,12 +304,13 @@ static void wsrep_post_rollback_process(THD *post_rollbacker,
DBUG_ASSERT(post_rollbacker->killed != NOT_KILLED);
DBUG_PRINT("wsrep",("wsrep post rollbacker thread exiting"));
WSREP_INFO("post rollbacker thread exiting %llu", post_rollbacker->thread_id);
DBUG_VOID_RETURN;
}
void wsrep_create_rollbacker()
{
if (wsrep_provider && strcasecmp(wsrep_provider, "none"))
if (wsrep_cluster_address && wsrep_cluster_address[0] != 0)
{
Wsrep_thd_args* args= new Wsrep_thd_args(wsrep_rollback_process, 0);

View file

@ -478,23 +478,24 @@ bool wsrep_cluster_address_update (sys_var *self, THD* thd, enum_var_type type)
Note: releasing LOCK_global_system_variables may cause race condition, if
there can be several concurrent clients changing wsrep_provider
*/
WSREP_DEBUG("wsrep_cluster_address_update: %s", wsrep_cluster_address);
mysql_mutex_unlock(&LOCK_global_system_variables);
wsrep_stop_replication(thd);
if (wsrep_start_replication())
{
wsrep_create_rollbacker();
wsrep_create_appliers(wsrep_slave_threads);
}
/* locking order to be enforced is:
1. LOCK_global_system_variables
2. LOCK_wsrep_slave_threads
2. LOCK_wsrep_cluster_config
=> have to juggle mutexes to comply with this
*/
mysql_mutex_unlock(&LOCK_wsrep_slave_threads);
mysql_mutex_unlock(&LOCK_wsrep_cluster_config);
mysql_mutex_lock(&LOCK_global_system_variables);
mysql_mutex_lock(&LOCK_wsrep_slave_threads);
mysql_mutex_lock(&LOCK_wsrep_cluster_config);
return false;
}