MDEV-30436 Spider: deduplicate some sts/crd code.

We spare spider_table_remove_share_from_crd/sts_thread and
spider_table_add_share_to_crd/sts_thread because the function body has
too many fields with crd/sts in the name and macro affects
debuggability.
This commit is contained in:
Yuchen Pei 2025-06-05 17:38:17 +10:00
commit 3e9aa07cce
No known key found for this signature in database
GPG key ID: 3DD1B35105743563
2 changed files with 112 additions and 275 deletions

View file

@ -6044,12 +6044,12 @@ int spider_db_done(
for (roop_count = spider_param_table_crd_thread_count() - 1;
roop_count >= 0; roop_count--)
{
spider_free_crd_threads(&spider_table_crd_threads[roop_count]);
spider_free_sts_crd_threads(&spider_table_crd_threads[roop_count]);
}
for (roop_count = spider_param_table_sts_thread_count() - 1;
roop_count >= 0; roop_count--)
{
spider_free_sts_threads(&spider_table_sts_threads[roop_count]);
spider_free_sts_crd_threads(&spider_table_sts_threads[roop_count]);
}
spider_free(NULL, spider_table_sts_threads, MYF(0));
#endif
@ -6532,7 +6532,8 @@ int spider_db_init(
roop_count < (int) spider_param_table_sts_thread_count();
roop_count++)
{
if ((error_num = spider_create_sts_threads(&spider_table_sts_threads[roop_count])))
if ((error_num = spider_create_sts_crd_threads(&spider_table_sts_threads[roop_count],
true)))
{
goto error_init_table_sts_threads;
}
@ -6541,7 +6542,8 @@ int spider_db_init(
roop_count < (int) spider_param_table_crd_thread_count();
roop_count++)
{
if ((error_num = spider_create_crd_threads(&spider_table_crd_threads[roop_count])))
if ((error_num = spider_create_sts_crd_threads(&spider_table_crd_threads[roop_count],
false)))
{
goto error_init_table_crd_threads;
}
@ -6581,13 +6583,13 @@ error_init_dbton:
error_init_table_crd_threads:
for (; roop_count >= 0; roop_count--)
{
spider_free_crd_threads(&spider_table_crd_threads[roop_count]);
spider_free_sts_crd_threads(&spider_table_crd_threads[roop_count]);
}
roop_count = spider_param_table_sts_thread_count() - 1;
error_init_table_sts_threads:
for (; roop_count >= 0; roop_count--)
{
spider_free_sts_threads(&spider_table_sts_threads[roop_count]);
spider_free_sts_crd_threads(&spider_table_sts_threads[roop_count]);
}
error_alloc_table_sts_crd_threads:
spider_free(NULL, spider_table_sts_threads, MYF(0));
@ -8621,32 +8623,44 @@ void spider_free_spider_object_for_share(
DBUG_VOID_RETURN;
}
int spider_create_sts_threads(
SPIDER_THREAD *spider_thread
int spider_create_sts_crd_threads(
SPIDER_THREAD *spider_thread,
bool is_sts
) {
int error_num;
DBUG_ENTER("spider_create_sts_threads");
if (mysql_mutex_init(spd_key_mutex_bg_stss,
DBUG_ENTER("spider_create_sts_crd_threads");
PSI_mutex_key mutex_bg= is_sts ? spd_key_mutex_bg_stss :
spd_key_mutex_bg_crds;
PSI_cond_key cond_bg= is_sts ? spd_key_cond_bg_stss :
spd_key_cond_bg_crds;
PSI_cond_key cond_bg_syncs= is_sts ? spd_key_cond_bg_sts_syncs :
spd_key_cond_bg_crd_syncs;
if (mysql_mutex_init(mutex_bg,
&spider_thread->mutex, MY_MUTEX_INIT_FAST))
{
error_num = HA_ERR_OUT_OF_MEM;
goto error_mutex_init;
}
if (mysql_cond_init(spd_key_cond_bg_stss,
if (mysql_cond_init(cond_bg,
&spider_thread->cond, NULL))
{
error_num = HA_ERR_OUT_OF_MEM;
goto error_cond_init;
}
if (mysql_cond_init(spd_key_cond_bg_sts_syncs,
if (mysql_cond_init(cond_bg_syncs,
&spider_thread->sync_cond, NULL))
{
error_num = HA_ERR_OUT_OF_MEM;
goto error_sync_cond_init;
}
if (mysql_thread_create(spd_key_thd_bg_stss, &spider_thread->thread,
&spider_pt_attr, spider_table_bg_sts_action, (void *) spider_thread)
)
error_num = is_sts ?
mysql_thread_create(spd_key_thd_bg_stss, &spider_thread->thread,
&spider_pt_attr, spider_table_bg_sts_action,
(void *) spider_thread) :
mysql_thread_create(spd_key_thd_bg_crds, &spider_thread->thread,
&spider_pt_attr, spider_table_bg_crd_action,
(void *) spider_thread);
if (error_num)
{
error_num = HA_ERR_OUT_OF_MEM;
goto error_thread_create;
@ -8663,11 +8677,11 @@ error_mutex_init:
DBUG_RETURN(error_num);
}
void spider_free_sts_threads(
void spider_free_sts_crd_threads(
SPIDER_THREAD *spider_thread
) {
bool thread_killed;
DBUG_ENTER("spider_free_sts_threads");
DBUG_ENTER("spider_free_sts_crd_threads");
pthread_mutex_lock(&spider_thread->mutex);
thread_killed = spider_thread->killed;
spider_thread->killed = TRUE;
@ -8689,86 +8703,20 @@ void spider_free_sts_threads(
DBUG_VOID_RETURN;
}
int spider_create_crd_threads(
SPIDER_THREAD *spider_thread
) {
int error_num;
DBUG_ENTER("spider_create_crd_threads");
if (mysql_mutex_init(spd_key_mutex_bg_crds,
&spider_thread->mutex, MY_MUTEX_INIT_FAST))
{
error_num = HA_ERR_OUT_OF_MEM;
goto error_mutex_init;
}
if (mysql_cond_init(spd_key_cond_bg_crds,
&spider_thread->cond, NULL))
{
error_num = HA_ERR_OUT_OF_MEM;
goto error_cond_init;
}
if (mysql_cond_init(spd_key_cond_bg_crd_syncs,
&spider_thread->sync_cond, NULL))
{
error_num = HA_ERR_OUT_OF_MEM;
goto error_sync_cond_init;
}
if (mysql_thread_create(spd_key_thd_bg_crds, &spider_thread->thread,
&spider_pt_attr, spider_table_bg_crd_action, (void *) spider_thread)
)
{
error_num = HA_ERR_OUT_OF_MEM;
goto error_thread_create;
}
DBUG_RETURN(0);
error_thread_create:
pthread_cond_destroy(&spider_thread->sync_cond);
error_sync_cond_init:
pthread_cond_destroy(&spider_thread->cond);
error_cond_init:
pthread_mutex_destroy(&spider_thread->mutex);
error_mutex_init:
DBUG_RETURN(error_num);
}
void spider_free_crd_threads(
SPIDER_THREAD *spider_thread
) {
bool thread_killed;
DBUG_ENTER("spider_free_crd_threads");
pthread_mutex_lock(&spider_thread->mutex);
thread_killed = spider_thread->killed;
spider_thread->killed = TRUE;
if (!thread_killed)
{
if (spider_thread->thd_wait)
{
pthread_cond_signal(&spider_thread->cond);
}
pthread_cond_wait(&spider_thread->sync_cond, &spider_thread->mutex);
}
pthread_mutex_unlock(&spider_thread->mutex);
pthread_join(spider_thread->thread, NULL);
pthread_cond_destroy(&spider_thread->sync_cond);
pthread_cond_destroy(&spider_thread->cond);
pthread_mutex_destroy(&spider_thread->mutex);
spider_thread->thd_wait = FALSE;
spider_thread->killed = FALSE;
DBUG_VOID_RETURN;
}
void *spider_table_bg_sts_action(
void *arg
static void *spider_table_bg_sts_crd_action(
void *arg,
bool is_sts
) {
SPIDER_THREAD *thread = (SPIDER_THREAD *) arg;
SPIDER_SHARE *share;
SPIDER_TRX *trx;
int error_num;
ha_spider *spider;
TABLE *table; /* only needed for crd */
SPIDER_CONN **conns;
THD *thd;
my_thread_init();
DBUG_ENTER("spider_table_bg_sts_action");
DBUG_ENTER("spider_table_bg_sts_crd_action");
/* init start */
pthread_mutex_lock(&thread->mutex);
if (!(thd = spider_create_sys_thd(thread)))
@ -8783,7 +8731,8 @@ void *spider_table_bg_sts_action(
#ifdef HAVE_PSI_INTERFACE
mysql_thread_set_psi_id(thd->thread_id);
#endif
thd_proc_info(thd, "Spider table background statistics action handler");
thd_proc_info(thd, "Spider table background statistics/cardinality"
" action handler");
if (!(trx = spider_get_trx(NULL, FALSE, &error_num)))
{
spider_destroy_sys_thd(thd);
@ -8799,10 +8748,6 @@ void *spider_table_bg_sts_action(
trx->thd = thd;
/* init end */
if (thd->killed)
{
thread->killed = TRUE;
}
if (thd->killed)
{
thread->killed = TRUE;
@ -8810,10 +8755,10 @@ void *spider_table_bg_sts_action(
while (TRUE)
{
DBUG_PRINT("info",("spider bg sts loop start"));
DBUG_PRINT("info",("spider bg sts/crd loop start"));
if (thread->killed)
{
DBUG_PRINT("info",("spider bg sts kill start"));
DBUG_PRINT("info",("spider bg sts/crd kill start"));
trx->thd = NULL;
spider_free_trx(trx, TRUE);
spider_destroy_sys_thd(thd);
@ -8827,7 +8772,7 @@ void *spider_table_bg_sts_action(
}
if (!thread->queue_first)
{
DBUG_PRINT("info",("spider bg sts has no job"));
DBUG_PRINT("info",("spider bg sts/crd has no job"));
thread->thd_wait = TRUE;
pthread_cond_wait(&thread->cond, &thread->mutex);
thread->thd_wait = FALSE;
@ -8836,155 +8781,16 @@ void *spider_table_bg_sts_action(
continue;
}
share = (SPIDER_SHARE *) thread->queue_first;
share->sts_working = TRUE;
if (is_sts)
share->sts_working = TRUE;
else
share->crd_working = TRUE;
pthread_mutex_unlock(&thread->mutex);
spider = share->sts_spider;
conns = spider->conns;
if (spider->search_link_idx < 0)
{
spider->wide_handler->trx = trx;
spider_trx_set_link_idx_for_all(spider);
spider->search_link_idx = spider_conn_first_link_idx(thd,
share->link_statuses, share->access_balances, spider->conn_link_idx,
share->link_count, SPIDER_LINK_STATUS_OK);
}
if (spider->search_link_idx >= 0)
{
DBUG_PRINT("info",
("spider difftime=%f",
difftime(share->bg_sts_try_time, share->sts_get_time)));
DBUG_PRINT("info",
("spider bg_sts_interval=%f", share->bg_sts_interval));
if (difftime(share->bg_sts_try_time, share->sts_get_time) >=
share->bg_sts_interval)
{
if (!conns[spider->search_link_idx])
{
spider_get_conn(share, spider->search_link_idx,
share->conn_keys[spider->search_link_idx], trx,
spider, FALSE, FALSE, &error_num);
if (conns[spider->search_link_idx])
{
conns[spider->search_link_idx]->error_mode = 0;
} else {
spider->search_link_idx = -1;
}
}
DBUG_PRINT("info",
("spider search_link_idx=%d", spider->search_link_idx));
if (spider->search_link_idx >= 0 && conns[spider->search_link_idx])
{
if (spider_get_sts(share, spider->search_link_idx,
share->bg_sts_try_time, spider,
share->bg_sts_interval, share->bg_sts_mode,
share->bg_sts_sync,
2, HA_STATUS_CONST | HA_STATUS_VARIABLE))
{
spider->search_link_idx = -1;
}
}
}
}
memset(spider->need_mons, 0, sizeof(int) * share->link_count);
pthread_mutex_lock(&thread->mutex);
if (thread->queue_first == thread->queue_last)
{
thread->queue_first = NULL;
thread->queue_last = NULL;
} else {
thread->queue_first = share->sts_next;
share->sts_next->sts_prev = NULL;
share->sts_next = NULL;
}
share->sts_working = FALSE;
share->sts_wait = FALSE;
if (thread->first_free_wait)
{
pthread_cond_signal(&thread->sync_cond);
pthread_cond_wait(&thread->cond, &thread->mutex);
if (thd->killed)
thread->killed = TRUE;
}
}
}
void *spider_table_bg_crd_action(
void *arg
) {
SPIDER_THREAD *thread = (SPIDER_THREAD *) arg;
SPIDER_SHARE *share;
SPIDER_TRX *trx;
int error_num;
ha_spider *spider;
TABLE *table;
SPIDER_CONN **conns;
THD *thd;
my_thread_init();
DBUG_ENTER("spider_table_bg_crd_action");
/* init start */
pthread_mutex_lock(&thread->mutex);
if (!(thd = spider_create_sys_thd(thread)))
{
thread->thd_wait = FALSE;
thread->killed = FALSE;
pthread_mutex_unlock(&thread->mutex);
my_thread_end();
DBUG_RETURN(NULL);
}
SPIDER_set_next_thread_id(thd);
#ifdef HAVE_PSI_INTERFACE
mysql_thread_set_psi_id(thd->thread_id);
#endif
thd_proc_info(thd, "Spider table background cardinality action handler");
if (!(trx = spider_get_trx(NULL, FALSE, &error_num)))
{
spider_destroy_sys_thd(thd);
thread->thd_wait = FALSE;
thread->killed = FALSE;
pthread_mutex_unlock(&thread->mutex);
#if !defined(MYSQL_DYNAMIC_PLUGIN) || !defined(_WIN32)
set_current_thd(nullptr);
#endif
my_thread_end();
DBUG_RETURN(NULL);
}
trx->thd = thd;
/* init end */
while (TRUE)
{
DBUG_PRINT("info",("spider bg crd loop start"));
if (thread->killed)
{
DBUG_PRINT("info",("spider bg crd kill start"));
trx->thd = NULL;
spider_free_trx(trx, TRUE);
spider_destroy_sys_thd(thd);
pthread_cond_signal(&thread->sync_cond);
pthread_mutex_unlock(&thread->mutex);
#if !defined(MYSQL_DYNAMIC_PLUGIN) || !defined(_WIN32)
set_current_thd(nullptr);
#endif
my_thread_end();
DBUG_RETURN(NULL);
}
if (!thread->queue_first)
{
DBUG_PRINT("info",("spider bg crd has no job"));
thread->thd_wait = TRUE;
pthread_cond_wait(&thread->cond, &thread->mutex);
thread->thd_wait = FALSE;
if (thd->killed)
thread->killed = TRUE;
continue;
}
share = (SPIDER_SHARE *) thread->queue_first;
share->crd_working = TRUE;
pthread_mutex_unlock(&thread->mutex);
table = &share->table;
spider = share->crd_spider;
if (is_sts)
spider = share->sts_spider;
else
spider = share->crd_spider;
conns = spider->conns;
if (spider->search_link_idx < 0)
{
@ -8996,13 +8802,13 @@ void *spider_table_bg_crd_action(
}
if (spider->search_link_idx >= 0)
{
DBUG_PRINT("info",
("spider difftime=%f",
difftime(share->bg_crd_try_time, share->crd_get_time)));
DBUG_PRINT("info",
("spider bg_crd_interval=%f", share->bg_crd_interval));
if (difftime(share->bg_crd_try_time, share->crd_get_time) >=
share->bg_crd_interval)
double diff_time= is_sts ?
difftime(share->bg_sts_try_time, share->sts_get_time) :
difftime(share->bg_crd_try_time, share->crd_get_time);
double interval= is_sts? share->bg_sts_interval : share->bg_crd_interval;
DBUG_PRINT("info", ("spider difftime=%f", diff_time));
DBUG_PRINT("info", ("spider bg_sts_interval=%f", interval));
if (diff_time >= interval)
{
if (!conns[spider->search_link_idx])
{
@ -9020,18 +8826,27 @@ void *spider_table_bg_crd_action(
("spider search_link_idx=%d", spider->search_link_idx));
if (spider->search_link_idx >= 0 && conns[spider->search_link_idx])
{
int result = is_sts ?
#ifdef WITH_PARTITION_STORAGE_ENGINE
if (spider_get_crd(share, spider->search_link_idx,
share->bg_crd_try_time, spider, table,
share->bg_crd_interval, share->bg_crd_mode,
share->bg_crd_sync,
2))
spider_get_sts(share, spider->search_link_idx,
share->bg_sts_try_time, spider,
share->bg_sts_interval, share->bg_sts_mode,
share->bg_sts_sync,
2, HA_STATUS_CONST | HA_STATUS_VARIABLE) :
spider_get_crd(share, spider->search_link_idx,
share->bg_crd_try_time, spider, table,
share->bg_crd_interval, share->bg_crd_mode,
share->bg_crd_sync, 2);
#else
if (spider_get_crd(share, spider->search_link_idx,
share->bg_crd_try_time, spider, table,
share->bg_crd_interval, share->bg_crd_mode,
2))
spider_get_sts(share, spider->search_link_idx,
share->bg_sts_try_time, spider,
share->bg_sts_interval, share->bg_sts_mode,
2, HA_STATUS_CONST | HA_STATUS_VARIABLE) :
spider_get_crd(share, spider->search_link_idx,
share->bg_crd_try_time, spider, table,
share->bg_crd_interval, share->bg_crd_mode, 2);
#endif
if (result)
{
spider->search_link_idx = -1;
}
@ -9045,12 +8860,29 @@ void *spider_table_bg_crd_action(
thread->queue_first = NULL;
thread->queue_last = NULL;
} else {
thread->queue_first = share->crd_next;
share->crd_next->crd_prev = NULL;
share->crd_next = NULL;
if (is_sts)
{
thread->queue_first = share->sts_next;
share->sts_next->sts_prev = NULL;
share->sts_next = NULL;
}
else
{
thread->queue_first = share->crd_next;
share->crd_next->crd_prev = NULL;
share->crd_next = NULL;
}
}
if (is_sts)
{
share->sts_working= FALSE;
share->sts_wait= FALSE;
}
else
{
share->crd_working= FALSE;
share->crd_wait= FALSE;
}
share->crd_working = FALSE;
share->crd_wait = FALSE;
if (thread->first_free_wait)
{
pthread_cond_signal(&thread->sync_cond);
@ -9061,6 +8893,18 @@ void *spider_table_bg_crd_action(
}
}
void *spider_table_bg_sts_action(void *arg)
{
DBUG_ENTER("spider_table_bg_sts_action");
DBUG_RETURN(spider_table_bg_sts_crd_action(arg, true));
}
void *spider_table_bg_crd_action(void *arg)
{
DBUG_ENTER("spider_table_bg_crd_action");
DBUG_RETURN(spider_table_bg_sts_crd_action(arg, false));
}
void spider_table_add_share_to_sts_thread(
SPIDER_SHARE *share
) {

View file

@ -472,19 +472,12 @@ void spider_free_spider_object_for_share(
ha_spider **spider
);
int spider_create_sts_threads(
SPIDER_THREAD *spider_thread
int spider_create_sts_crd_threads(
SPIDER_THREAD *spider_thread,
bool is_sts
);
void spider_free_sts_threads(
SPIDER_THREAD *spider_thread
);
int spider_create_crd_threads(
SPIDER_THREAD *spider_thread
);
void spider_free_crd_threads(
void spider_free_sts_crd_threads(
SPIDER_THREAD *spider_thread
);