mirror of
https://github.com/MariaDB/server.git
synced 2025-01-18 21:12:26 +01:00
0c1f97b3ab
The immediate bug was caused by a failure to recognize a correct position to stop the slave applier run in optimistic parallel mode. There were the following set of issues that the analysis unveil. 1 incorrect estimate for the event binlog position passed to is_until_satisfied 2 wait for workers to complete by the driver thread did not account non-group events that could be left unprocessed and thus to mix up the last executed binlog group's file and position: the file remained old and the position related to the new rotated file 3 incorrect 'slave reached file:pos' by the parallel slave report in the error log 4 relay log UNTIL missed out the parallel slave branch in is_until_satisfied. The patch addresses all of them to simplify logics of log change notification in either the master and relay-log until case. P.1 is addressed with passing the event into is_until_satisfied() for proper analisis by the function. P.2 is fixed by changes in handle_queued_pos_update(). P.4 required removing relay-log change notification by workers. Instead the driver thread updates the notion of the current relay-log fully itself with aid of introduced bool Relay_log_info::until_relay_log_names_defer. An extra print out of the requested until file:pos is arranged with --log-warning=3.
2908 lines
90 KiB
C++
2908 lines
90 KiB
C++
#include "my_global.h"
|
|
#include "rpl_parallel.h"
|
|
#include "slave.h"
|
|
#include "rpl_mi.h"
|
|
#include "sql_parse.h"
|
|
#include "debug_sync.h"
|
|
|
|
/*
|
|
Code for optional parallel execution of replicated events on the slave.
|
|
*/
|
|
|
|
|
|
/*
|
|
Maximum number of queued events to accumulate in a local free list, before
|
|
moving them to the global free list. There is additional a limit of how much
|
|
to accumulate based on opt_slave_parallel_max_queued.
|
|
*/
|
|
#define QEV_BATCH_FREE 200
|
|
|
|
|
|
struct rpl_parallel_thread_pool global_rpl_thread_pool;
|
|
|
|
static void signal_error_to_sql_driver_thread(THD *thd, rpl_group_info *rgi,
|
|
int err);
|
|
|
|
static int
|
|
rpt_handle_event(rpl_parallel_thread::queued_event *qev,
|
|
struct rpl_parallel_thread *rpt)
|
|
{
|
|
int err;
|
|
rpl_group_info *rgi= qev->rgi;
|
|
Relay_log_info *rli= rgi->rli;
|
|
THD *thd= rgi->thd;
|
|
Log_event *ev;
|
|
|
|
DBUG_ASSERT(qev->typ == rpl_parallel_thread::queued_event::QUEUED_EVENT);
|
|
ev= qev->ev;
|
|
|
|
thd->system_thread_info.rpl_sql_info->rpl_filter = rli->mi->rpl_filter;
|
|
ev->thd= thd;
|
|
|
|
strcpy(rgi->event_relay_log_name_buf, qev->event_relay_log_name);
|
|
rgi->event_relay_log_name= rgi->event_relay_log_name_buf;
|
|
rgi->event_relay_log_pos= qev->event_relay_log_pos;
|
|
rgi->future_event_relay_log_pos= qev->future_event_relay_log_pos;
|
|
strcpy(rgi->future_event_master_log_name, qev->future_event_master_log_name);
|
|
if (!(ev->is_artificial_event() || ev->is_relay_log_event() ||
|
|
(ev->when == 0)))
|
|
rgi->last_master_timestamp= ev->when + (time_t)ev->exec_time;
|
|
err= apply_event_and_update_pos_for_parallel(ev, thd, rgi);
|
|
|
|
thread_safe_increment64(&rli->executed_entries);
|
|
/* ToDo: error handling. */
|
|
return err;
|
|
}
|
|
|
|
|
|
static void
|
|
handle_queued_pos_update(THD *thd, rpl_parallel_thread::queued_event *qev)
|
|
{
|
|
int cmp;
|
|
Relay_log_info *rli;
|
|
rpl_parallel_entry *e;
|
|
|
|
/*
|
|
Events that are not part of an event group, such as Format Description,
|
|
Stop, GTID List and such, are executed directly in the driver SQL thread,
|
|
to keep the relay log state up-to-date. But the associated position update
|
|
is done here, in sync with other normal events as they are queued to
|
|
worker threads.
|
|
*/
|
|
if ((thd->variables.option_bits & OPTION_BEGIN) &&
|
|
opt_using_transactions)
|
|
return;
|
|
|
|
/* Do not update position if an earlier event group caused an error abort. */
|
|
DBUG_ASSERT(qev->typ == rpl_parallel_thread::queued_event::QUEUED_POS_UPDATE);
|
|
rli= qev->rgi->rli;
|
|
e= qev->entry_for_queued;
|
|
if (e->stop_on_error_sub_id < (uint64)ULONGLONG_MAX ||
|
|
(e->force_abort && !rli->stop_for_until))
|
|
return;
|
|
|
|
mysql_mutex_lock(&rli->data_lock);
|
|
cmp= strcmp(rli->group_relay_log_name, qev->event_relay_log_name);
|
|
if (cmp < 0)
|
|
{
|
|
rli->group_relay_log_pos= qev->future_event_relay_log_pos;
|
|
strmake_buf(rli->group_relay_log_name, qev->event_relay_log_name);
|
|
} else if (cmp == 0 &&
|
|
rli->group_relay_log_pos < qev->future_event_relay_log_pos)
|
|
rli->group_relay_log_pos= qev->future_event_relay_log_pos;
|
|
|
|
cmp= strcmp(rli->group_master_log_name, qev->future_event_master_log_name);
|
|
if (cmp < 0)
|
|
{
|
|
strcpy(rli->group_master_log_name, qev->future_event_master_log_name);
|
|
rli->group_master_log_pos= qev->future_event_master_log_pos;
|
|
}
|
|
else if (cmp == 0
|
|
&& rli->group_master_log_pos < qev->future_event_master_log_pos)
|
|
rli->group_master_log_pos= qev->future_event_master_log_pos;
|
|
mysql_mutex_unlock(&rli->data_lock);
|
|
mysql_cond_broadcast(&rli->data_cond);
|
|
}
|
|
|
|
|
|
/*
|
|
Wait for any pending deadlock kills. Since deadlock kills happen
|
|
asynchronously, we need to be sure they will be completed before starting a
|
|
new transaction. Otherwise the new transaction might suffer a spurious kill.
|
|
*/
|
|
static void
|
|
wait_for_pending_deadlock_kill(THD *thd, rpl_group_info *rgi)
|
|
{
|
|
PSI_stage_info old_stage;
|
|
|
|
mysql_mutex_lock(&thd->LOCK_wakeup_ready);
|
|
thd->ENTER_COND(&thd->COND_wakeup_ready, &thd->LOCK_wakeup_ready,
|
|
&stage_waiting_for_deadlock_kill, &old_stage);
|
|
while (rgi->killed_for_retry == rpl_group_info::RETRY_KILL_PENDING)
|
|
mysql_cond_wait(&thd->COND_wakeup_ready, &thd->LOCK_wakeup_ready);
|
|
thd->EXIT_COND(&old_stage);
|
|
}
|
|
|
|
|
|
static void
|
|
finish_event_group(rpl_parallel_thread *rpt, uint64 sub_id,
|
|
rpl_parallel_entry *entry, rpl_group_info *rgi)
|
|
{
|
|
THD *thd= rpt->thd;
|
|
wait_for_commit *wfc= &rgi->commit_orderer;
|
|
int err;
|
|
|
|
thd->get_stmt_da()->set_overwrite_status(true);
|
|
/*
|
|
Remove any left-over registration to wait for a prior commit to
|
|
complete. Normally, such wait would already have been removed at
|
|
this point by wait_for_prior_commit() called from within COMMIT
|
|
processing. However, in case of MyISAM and no binlog, we might not
|
|
have any commit processing, and so we need to do the wait here,
|
|
before waking up any subsequent commits, to preserve correct
|
|
order of event execution. Also, in the error case we might have
|
|
skipped waiting and thus need to remove it explicitly.
|
|
|
|
It is important in the non-error case to do a wait, not just an
|
|
unregister. Because we might be last in a group-commit that is
|
|
replicated in parallel, and the following event will then wait
|
|
for us to complete and rely on this also ensuring that any other
|
|
event in the group has completed.
|
|
|
|
And in the error case, correct GCO lifetime relies on the fact that once
|
|
the last event group in the GCO has executed wait_for_prior_commit(),
|
|
all earlier event groups have also committed; this way no more
|
|
mark_start_commit() calls can be made and it is safe to de-allocate
|
|
the GCO.
|
|
*/
|
|
err= wfc->wait_for_prior_commit(thd);
|
|
if (unlikely(err) && !rgi->worker_error)
|
|
signal_error_to_sql_driver_thread(thd, rgi, err);
|
|
thd->wait_for_commit_ptr= NULL;
|
|
|
|
mysql_mutex_lock(&entry->LOCK_parallel_entry);
|
|
/*
|
|
We need to mark that this event group started its commit phase, in case we
|
|
missed it before (otherwise we would deadlock the next event group that is
|
|
waiting for this). In most cases (normal DML), it will be a no-op.
|
|
*/
|
|
rgi->mark_start_commit_no_lock();
|
|
|
|
if (entry->last_committed_sub_id < sub_id)
|
|
{
|
|
/*
|
|
Record that this event group has finished (eg. transaction is
|
|
committed, if transactional), so other event groups will no longer
|
|
attempt to wait for us to commit. Once we have increased
|
|
entry->last_committed_sub_id, no other threads will execute
|
|
register_wait_for_prior_commit() against us. Thus, by doing one
|
|
extra (usually redundant) wakeup_subsequent_commits() we can ensure
|
|
that no register_wait_for_prior_commit() can ever happen without a
|
|
subsequent wakeup_subsequent_commits() to wake it up.
|
|
|
|
We can race here with the next transactions, but that is fine, as
|
|
long as we check that we do not decrease last_committed_sub_id. If
|
|
this commit is done, then any prior commits will also have been
|
|
done and also no longer need waiting for.
|
|
*/
|
|
entry->last_committed_sub_id= sub_id;
|
|
if (entry->need_sub_id_signal)
|
|
mysql_cond_broadcast(&entry->COND_parallel_entry);
|
|
|
|
/* Now free any GCOs in which all transactions have committed. */
|
|
group_commit_orderer *tmp_gco= rgi->gco;
|
|
while (tmp_gco &&
|
|
(!tmp_gco->next_gco || tmp_gco->last_sub_id > sub_id ||
|
|
tmp_gco->next_gco->wait_count > entry->count_committing_event_groups))
|
|
{
|
|
/*
|
|
We must not free a GCO before the wait_count of the following GCO has
|
|
been reached and wakeup has been sent. Otherwise we will lose the
|
|
wakeup and hang (there were several such bugs in the past).
|
|
|
|
The intention is that this is ensured already since we only free when
|
|
the last event group in the GCO has committed
|
|
(tmp_gco->last_sub_id <= sub_id). However, if we have a bug, we have
|
|
extra check on next_gco->wait_count to hopefully avoid hanging; we
|
|
have here an assertion in debug builds that this check does not in
|
|
fact trigger.
|
|
*/
|
|
DBUG_ASSERT(!tmp_gco->next_gco || tmp_gco->last_sub_id > sub_id);
|
|
tmp_gco= tmp_gco->prev_gco;
|
|
}
|
|
while (tmp_gco)
|
|
{
|
|
group_commit_orderer *prev_gco= tmp_gco->prev_gco;
|
|
tmp_gco->next_gco->prev_gco= NULL;
|
|
rpt->loc_free_gco(tmp_gco);
|
|
tmp_gco= prev_gco;
|
|
}
|
|
}
|
|
|
|
/*
|
|
If this event group got error, then any following event groups that have
|
|
not yet started should just skip their group, preparing for stop of the
|
|
SQL driver thread.
|
|
*/
|
|
if (unlikely(rgi->worker_error) &&
|
|
entry->stop_on_error_sub_id == (uint64)ULONGLONG_MAX)
|
|
entry->stop_on_error_sub_id= sub_id;
|
|
mysql_mutex_unlock(&entry->LOCK_parallel_entry);
|
|
DBUG_EXECUTE_IF("hold_worker_on_schedule", {
|
|
if (entry->stop_on_error_sub_id < (uint64)ULONGLONG_MAX)
|
|
{
|
|
debug_sync_set_action(thd, STRING_WITH_LEN("now SIGNAL continue_worker"));
|
|
}
|
|
});
|
|
|
|
DBUG_EXECUTE_IF("rpl_parallel_simulate_wait_at_retry", {
|
|
if (rgi->current_gtid.seq_no == 1000) {
|
|
DBUG_ASSERT(entry->stop_on_error_sub_id == sub_id);
|
|
debug_sync_set_action(thd,
|
|
STRING_WITH_LEN("now WAIT_FOR proceed_by_1000"));
|
|
}
|
|
});
|
|
|
|
if (rgi->killed_for_retry == rpl_group_info::RETRY_KILL_PENDING)
|
|
wait_for_pending_deadlock_kill(thd, rgi);
|
|
thd->clear_error();
|
|
thd->reset_killed();
|
|
/*
|
|
Would do thd->get_stmt_da()->set_overwrite_status(false) here, but
|
|
reset_diagnostics_area() already does that.
|
|
*/
|
|
thd->get_stmt_da()->reset_diagnostics_area();
|
|
wfc->wakeup_subsequent_commits(rgi->worker_error);
|
|
}
|
|
|
|
|
|
static void
|
|
signal_error_to_sql_driver_thread(THD *thd, rpl_group_info *rgi, int err)
|
|
{
|
|
rgi->worker_error= err;
|
|
/*
|
|
In case we get an error during commit, inform following transactions that
|
|
we aborted our commit.
|
|
*/
|
|
rgi->unmark_start_commit();
|
|
rgi->cleanup_context(thd, true);
|
|
rgi->rli->abort_slave= true;
|
|
rgi->rli->stop_for_until= false;
|
|
mysql_mutex_lock(rgi->rli->relay_log.get_log_lock());
|
|
mysql_mutex_unlock(rgi->rli->relay_log.get_log_lock());
|
|
rgi->rli->relay_log.signal_update();
|
|
}
|
|
|
|
|
|
static void
|
|
unlock_or_exit_cond(THD *thd, mysql_mutex_t *lock, bool *did_enter_cond,
|
|
PSI_stage_info *old_stage)
|
|
{
|
|
if (*did_enter_cond)
|
|
{
|
|
thd->EXIT_COND(old_stage);
|
|
*did_enter_cond= false;
|
|
}
|
|
else
|
|
mysql_mutex_unlock(lock);
|
|
}
|
|
|
|
|
|
static void
|
|
register_wait_for_prior_event_group_commit(rpl_group_info *rgi,
|
|
rpl_parallel_entry *entry)
|
|
{
|
|
mysql_mutex_assert_owner(&entry->LOCK_parallel_entry);
|
|
if (rgi->wait_commit_sub_id > entry->last_committed_sub_id)
|
|
{
|
|
/*
|
|
Register that the commit of this event group must wait for the
|
|
commit of the previous event group to complete before it may
|
|
complete itself, so that we preserve commit order.
|
|
*/
|
|
wait_for_commit *waitee=
|
|
&rgi->wait_commit_group_info->commit_orderer;
|
|
rgi->commit_orderer.register_wait_for_prior_commit(waitee);
|
|
}
|
|
}
|
|
|
|
|
|
/*
|
|
Do not start parallel execution of this event group until all prior groups
|
|
have reached the commit phase that are not safe to run in parallel with.
|
|
*/
|
|
static bool
|
|
do_gco_wait(rpl_group_info *rgi, group_commit_orderer *gco,
|
|
bool *did_enter_cond, PSI_stage_info *old_stage)
|
|
{
|
|
THD *thd= rgi->thd;
|
|
rpl_parallel_entry *entry= rgi->parallel_entry;
|
|
uint64 wait_count;
|
|
|
|
mysql_mutex_assert_owner(&entry->LOCK_parallel_entry);
|
|
|
|
if (!gco->installed)
|
|
{
|
|
group_commit_orderer *prev_gco= gco->prev_gco;
|
|
if (prev_gco)
|
|
{
|
|
prev_gco->last_sub_id= gco->prior_sub_id;
|
|
prev_gco->next_gco= gco;
|
|
}
|
|
gco->installed= true;
|
|
}
|
|
wait_count= gco->wait_count;
|
|
if (wait_count > entry->count_committing_event_groups)
|
|
{
|
|
DEBUG_SYNC(thd, "rpl_parallel_start_waiting_for_prior");
|
|
thd->ENTER_COND(&gco->COND_group_commit_orderer,
|
|
&entry->LOCK_parallel_entry,
|
|
&stage_waiting_for_prior_transaction_to_start_commit,
|
|
old_stage);
|
|
*did_enter_cond= true;
|
|
thd->set_time_for_next_stage();
|
|
do
|
|
{
|
|
if (thd->check_killed() && !rgi->worker_error)
|
|
{
|
|
DEBUG_SYNC(thd, "rpl_parallel_start_waiting_for_prior_killed");
|
|
thd->clear_error();
|
|
thd->get_stmt_da()->reset_diagnostics_area();
|
|
thd->send_kill_message();
|
|
slave_output_error_info(rgi, thd);
|
|
signal_error_to_sql_driver_thread(thd, rgi, 1);
|
|
/*
|
|
Even though we were killed, we need to continue waiting for the
|
|
prior event groups to signal that we can continue. Otherwise we
|
|
mess up the accounting for ordering. However, now that we have
|
|
marked the error, events will just be skipped rather than
|
|
executed, and things will progress quickly towards stop.
|
|
*/
|
|
}
|
|
mysql_cond_wait(&gco->COND_group_commit_orderer,
|
|
&entry->LOCK_parallel_entry);
|
|
} while (wait_count > entry->count_committing_event_groups);
|
|
}
|
|
|
|
if (entry->force_abort && wait_count > entry->stop_count)
|
|
{
|
|
/*
|
|
We are stopping (STOP SLAVE), and this event group is beyond the point
|
|
where we can safely stop. So return a flag that will cause us to skip,
|
|
rather than execute, the following events.
|
|
*/
|
|
return true;
|
|
}
|
|
else
|
|
return false;
|
|
}
|
|
|
|
|
|
static void
|
|
do_ftwrl_wait(rpl_group_info *rgi,
|
|
bool *did_enter_cond, PSI_stage_info *old_stage)
|
|
{
|
|
THD *thd= rgi->thd;
|
|
rpl_parallel_entry *entry= rgi->parallel_entry;
|
|
uint64 sub_id= rgi->gtid_sub_id;
|
|
DBUG_ENTER("do_ftwrl_wait");
|
|
|
|
mysql_mutex_assert_owner(&entry->LOCK_parallel_entry);
|
|
|
|
/*
|
|
If a FLUSH TABLES WITH READ LOCK (FTWRL) is pending, check if this
|
|
transaction is later than transactions that have priority to complete
|
|
before FTWRL. If so, wait here so that FTWRL can proceed and complete
|
|
first.
|
|
|
|
(entry->pause_sub_id is ULONGLONG_MAX if no FTWRL is pending, which makes
|
|
this test false as required).
|
|
*/
|
|
if (unlikely(sub_id > entry->pause_sub_id))
|
|
{
|
|
thd->ENTER_COND(&entry->COND_parallel_entry, &entry->LOCK_parallel_entry,
|
|
&stage_waiting_for_ftwrl, old_stage);
|
|
*did_enter_cond= true;
|
|
thd->set_time_for_next_stage();
|
|
do
|
|
{
|
|
if (entry->force_abort || rgi->worker_error)
|
|
break;
|
|
if (thd->check_killed())
|
|
{
|
|
thd->send_kill_message();
|
|
slave_output_error_info(rgi, thd);
|
|
signal_error_to_sql_driver_thread(thd, rgi, 1);
|
|
break;
|
|
}
|
|
mysql_cond_wait(&entry->COND_parallel_entry, &entry->LOCK_parallel_entry);
|
|
} while (sub_id > entry->pause_sub_id);
|
|
|
|
/*
|
|
We do not call EXIT_COND() here, as this will be done later by our
|
|
caller (since we set *did_enter_cond to true).
|
|
*/
|
|
}
|
|
|
|
if (sub_id > entry->largest_started_sub_id)
|
|
entry->largest_started_sub_id= sub_id;
|
|
|
|
DBUG_VOID_RETURN;
|
|
}
|
|
|
|
|
|
static int
|
|
pool_mark_busy(rpl_parallel_thread_pool *pool, THD *thd)
|
|
{
|
|
PSI_stage_info old_stage;
|
|
int res= 0;
|
|
|
|
/*
|
|
Wait here while the queue is busy. This is done to make FLUSH TABLES WITH
|
|
READ LOCK work correctly, without incuring extra locking penalties in
|
|
normal operation. FLUSH TABLES WITH READ LOCK needs to lock threads in the
|
|
thread pool, and for this we need to make sure the pool will not go away
|
|
during the operation. The LOCK_rpl_thread_pool is not suitable for
|
|
this. It is taken by release_thread() while holding LOCK_rpl_thread; so it
|
|
must be released before locking any LOCK_rpl_thread lock, or a deadlock
|
|
can occur.
|
|
|
|
So we protect the infrequent operations of FLUSH TABLES WITH READ LOCK and
|
|
pool size changes with this condition wait.
|
|
*/
|
|
mysql_mutex_lock(&pool->LOCK_rpl_thread_pool);
|
|
if (thd)
|
|
{
|
|
thd->ENTER_COND(&pool->COND_rpl_thread_pool, &pool->LOCK_rpl_thread_pool,
|
|
&stage_waiting_for_rpl_thread_pool, &old_stage);
|
|
thd->set_time_for_next_stage();
|
|
}
|
|
while (pool->busy)
|
|
{
|
|
if (thd && thd->check_killed())
|
|
{
|
|
thd->send_kill_message();
|
|
res= 1;
|
|
break;
|
|
}
|
|
mysql_cond_wait(&pool->COND_rpl_thread_pool, &pool->LOCK_rpl_thread_pool);
|
|
}
|
|
if (!res)
|
|
pool->busy= true;
|
|
if (thd)
|
|
thd->EXIT_COND(&old_stage);
|
|
else
|
|
mysql_mutex_unlock(&pool->LOCK_rpl_thread_pool);
|
|
|
|
return res;
|
|
}
|
|
|
|
|
|
static void
|
|
pool_mark_not_busy(rpl_parallel_thread_pool *pool)
|
|
{
|
|
mysql_mutex_lock(&pool->LOCK_rpl_thread_pool);
|
|
DBUG_ASSERT(pool->busy);
|
|
pool->busy= false;
|
|
mysql_cond_broadcast(&pool->COND_rpl_thread_pool);
|
|
mysql_mutex_unlock(&pool->LOCK_rpl_thread_pool);
|
|
}
|
|
|
|
|
|
void
|
|
rpl_unpause_after_ftwrl(THD *thd)
|
|
{
|
|
uint32 i;
|
|
rpl_parallel_thread_pool *pool= &global_rpl_thread_pool;
|
|
DBUG_ENTER("rpl_unpause_after_ftwrl");
|
|
|
|
DBUG_ASSERT(pool->busy);
|
|
|
|
for (i= 0; i < pool->count; ++i)
|
|
{
|
|
rpl_parallel_entry *e;
|
|
rpl_parallel_thread *rpt= pool->threads[i];
|
|
|
|
mysql_mutex_lock(&rpt->LOCK_rpl_thread);
|
|
if (!rpt->current_owner)
|
|
{
|
|
mysql_mutex_unlock(&rpt->LOCK_rpl_thread);
|
|
continue;
|
|
}
|
|
e= rpt->current_entry;
|
|
mysql_mutex_lock(&e->LOCK_parallel_entry);
|
|
rpt->pause_for_ftwrl = false;
|
|
mysql_mutex_unlock(&rpt->LOCK_rpl_thread);
|
|
e->pause_sub_id= (uint64)ULONGLONG_MAX;
|
|
mysql_cond_broadcast(&e->COND_parallel_entry);
|
|
mysql_mutex_unlock(&e->LOCK_parallel_entry);
|
|
}
|
|
|
|
pool_mark_not_busy(pool);
|
|
DBUG_VOID_RETURN;
|
|
}
|
|
|
|
|
|
/*
|
|
.
|
|
|
|
Note: in case of error return, rpl_unpause_after_ftwrl() must _not_ be called.
|
|
*/
|
|
int
|
|
rpl_pause_for_ftwrl(THD *thd)
|
|
{
|
|
uint32 i;
|
|
rpl_parallel_thread_pool *pool= &global_rpl_thread_pool;
|
|
int err;
|
|
DBUG_ENTER("rpl_pause_for_ftwrl");
|
|
|
|
/*
|
|
While the count_pending_pause_for_ftwrl counter is non-zero, the pool
|
|
cannot be shutdown/resized, so threads are guaranteed to not disappear.
|
|
|
|
This is required to safely be able to access the individual threads below.
|
|
(We cannot lock an individual thread while holding LOCK_rpl_thread_pool,
|
|
as this can deadlock against release_thread()).
|
|
*/
|
|
if ((err= pool_mark_busy(pool, thd)))
|
|
DBUG_RETURN(err);
|
|
|
|
for (i= 0; i < pool->count; ++i)
|
|
{
|
|
PSI_stage_info old_stage;
|
|
rpl_parallel_entry *e;
|
|
rpl_parallel_thread *rpt= pool->threads[i];
|
|
|
|
mysql_mutex_lock(&rpt->LOCK_rpl_thread);
|
|
if (!rpt->current_owner)
|
|
{
|
|
mysql_mutex_unlock(&rpt->LOCK_rpl_thread);
|
|
continue;
|
|
}
|
|
e= rpt->current_entry;
|
|
mysql_mutex_lock(&e->LOCK_parallel_entry);
|
|
/*
|
|
Setting the rpt->pause_for_ftwrl flag makes sure that the thread will not
|
|
de-allocate itself until signalled to do so by rpl_unpause_after_ftwrl().
|
|
*/
|
|
rpt->pause_for_ftwrl = true;
|
|
mysql_mutex_unlock(&rpt->LOCK_rpl_thread);
|
|
++e->need_sub_id_signal;
|
|
if (e->pause_sub_id == (uint64)ULONGLONG_MAX)
|
|
e->pause_sub_id= e->largest_started_sub_id;
|
|
thd->ENTER_COND(&e->COND_parallel_entry, &e->LOCK_parallel_entry,
|
|
&stage_waiting_for_ftwrl_threads_to_pause, &old_stage);
|
|
thd->set_time_for_next_stage();
|
|
while (e->pause_sub_id < (uint64)ULONGLONG_MAX &&
|
|
e->last_committed_sub_id < e->pause_sub_id &&
|
|
!err)
|
|
{
|
|
if (thd->check_killed())
|
|
{
|
|
thd->send_kill_message();
|
|
err= 1;
|
|
break;
|
|
}
|
|
mysql_cond_wait(&e->COND_parallel_entry, &e->LOCK_parallel_entry);
|
|
};
|
|
--e->need_sub_id_signal;
|
|
thd->EXIT_COND(&old_stage);
|
|
if (err)
|
|
break;
|
|
}
|
|
|
|
if (err)
|
|
rpl_unpause_after_ftwrl(thd);
|
|
DBUG_RETURN(err);
|
|
}
|
|
|
|
|
|
#ifndef DBUG_OFF
|
|
static int
|
|
dbug_simulate_tmp_error(rpl_group_info *rgi, THD *thd)
|
|
{
|
|
if (rgi->current_gtid.domain_id == 0 && rgi->current_gtid.seq_no == 100 &&
|
|
rgi->retry_event_count == 4)
|
|
{
|
|
thd->clear_error();
|
|
thd->get_stmt_da()->reset_diagnostics_area();
|
|
my_error(ER_LOCK_DEADLOCK, MYF(0));
|
|
return 1;
|
|
}
|
|
return 0;
|
|
}
|
|
#endif
|
|
|
|
|
|
/*
|
|
If we detect a deadlock due to eg. storage engine locks that conflict with
|
|
the fixed commit order, then the later transaction will be killed
|
|
asynchroneously to allow the former to complete its commit.
|
|
|
|
In this case, we convert the 'killed' error into a deadlock error, and retry
|
|
the later transaction.
|
|
|
|
If we are doing optimistic parallel apply of transactions not known to be
|
|
safe, we convert any error to a deadlock error, but then at retry we will
|
|
wait for prior transactions to commit first, so that the retries can be
|
|
done non-speculative.
|
|
*/
|
|
static void
|
|
convert_kill_to_deadlock_error(rpl_group_info *rgi)
|
|
{
|
|
THD *thd= rgi->thd;
|
|
int err_code;
|
|
|
|
if (!thd->get_stmt_da()->is_error())
|
|
return;
|
|
err_code= thd->get_stmt_da()->sql_errno();
|
|
if ((rgi->speculation == rpl_group_info::SPECULATE_OPTIMISTIC &&
|
|
err_code != ER_PRIOR_COMMIT_FAILED) ||
|
|
((err_code == ER_QUERY_INTERRUPTED || err_code == ER_CONNECTION_KILLED) &&
|
|
rgi->killed_for_retry))
|
|
{
|
|
thd->clear_error();
|
|
my_error(ER_LOCK_DEADLOCK, MYF(0));
|
|
thd->reset_killed();
|
|
}
|
|
}
|
|
|
|
|
|
/*
|
|
Check if an event marks the end of an event group. Returns non-zero if so,
|
|
zero otherwise.
|
|
|
|
In addition, returns 1 if the group is committing, 2 if it is rolling back.
|
|
*/
|
|
static int
|
|
is_group_ending(Log_event *ev, Log_event_type event_type)
|
|
{
|
|
if (event_type == XID_EVENT)
|
|
return 1;
|
|
if (event_type == QUERY_EVENT) // COMMIT/ROLLBACK are never compressed
|
|
{
|
|
Query_log_event *qev = (Query_log_event *)ev;
|
|
if (qev->is_commit())
|
|
return 1;
|
|
if (qev->is_rollback())
|
|
return 2;
|
|
}
|
|
return 0;
|
|
}
|
|
|
|
|
|
static int
|
|
retry_event_group(rpl_group_info *rgi, rpl_parallel_thread *rpt,
|
|
rpl_parallel_thread::queued_event *orig_qev)
|
|
{
|
|
IO_CACHE rlog;
|
|
LOG_INFO linfo;
|
|
File fd= (File)-1;
|
|
const char *errmsg;
|
|
inuse_relaylog *ir= rgi->relay_log;
|
|
uint64 event_count;
|
|
uint64 events_to_execute= rgi->retry_event_count;
|
|
Relay_log_info *rli= rgi->rli;
|
|
int err;
|
|
ulonglong cur_offset, old_offset;
|
|
char log_name[FN_REFLEN];
|
|
THD *thd= rgi->thd;
|
|
rpl_parallel_entry *entry= rgi->parallel_entry;
|
|
ulong retries= 0;
|
|
Format_description_log_event *description_event= NULL;
|
|
|
|
do_retry:
|
|
event_count= 0;
|
|
err= 0;
|
|
errmsg= NULL;
|
|
|
|
/*
|
|
If we already started committing before getting the deadlock (or other
|
|
error) that caused us to need to retry, we have already signalled
|
|
subsequent transactions that we have started committing. This is
|
|
potentially a problem, as now we will rollback, and if subsequent
|
|
transactions would start to execute now, they could see an unexpected
|
|
state of the database and get eg. key not found or duplicate key error.
|
|
|
|
However, to get a deadlock in the first place, there must have been
|
|
another earlier transaction that is waiting for us. Thus that other
|
|
transaction has _not_ yet started to commit, and any subsequent
|
|
transactions will still be waiting at this point.
|
|
|
|
So here, we decrement back the count of transactions that started
|
|
committing (if we already incremented it), undoing the effect of an
|
|
earlier mark_start_commit(). Then later, when the retry succeeds and we
|
|
commit again, we can do a new mark_start_commit() and eventually wake up
|
|
subsequent transactions at the proper time.
|
|
|
|
We need to do the unmark before the rollback, to be sure that the
|
|
transaction we deadlocked with will not signal that it started to commit
|
|
until after the unmark.
|
|
*/
|
|
DBUG_EXECUTE_IF("inject_mdev8302", { my_sleep(20000);});
|
|
rgi->unmark_start_commit();
|
|
DEBUG_SYNC(thd, "rpl_parallel_retry_after_unmark");
|
|
|
|
/*
|
|
We might get the deadlock error that causes the retry during commit, while
|
|
sitting in wait_for_prior_commit(). If this happens, we will have a
|
|
pending error in the wait_for_commit object. So clear this by
|
|
unregistering (and later re-registering) the wait.
|
|
*/
|
|
if(thd->wait_for_commit_ptr)
|
|
thd->wait_for_commit_ptr->unregister_wait_for_prior_commit();
|
|
DBUG_EXECUTE_IF("inject_mdev8031", {
|
|
/* Simulate that we get deadlock killed at this exact point. */
|
|
rgi->killed_for_retry= rpl_group_info::RETRY_KILL_KILLED;
|
|
thd->set_killed(KILL_CONNECTION);
|
|
});
|
|
DBUG_EXECUTE_IF("rpl_parallel_simulate_wait_at_retry", {
|
|
if (rgi->current_gtid.seq_no == 1001) {
|
|
debug_sync_set_action(thd,
|
|
STRING_WITH_LEN("rpl_parallel_simulate_wait_at_retry WAIT_FOR proceed_by_1001"));
|
|
}
|
|
DEBUG_SYNC(thd, "rpl_parallel_simulate_wait_at_retry");
|
|
});
|
|
|
|
rgi->cleanup_context(thd, 1);
|
|
wait_for_pending_deadlock_kill(thd, rgi);
|
|
thd->reset_killed();
|
|
thd->clear_error();
|
|
rgi->killed_for_retry = rpl_group_info::RETRY_KILL_NONE;
|
|
|
|
/*
|
|
If we retry due to a deadlock kill that occurred during the commit step, we
|
|
might have already updated (but not committed) an update of table
|
|
mysql.gtid_slave_pos, and cleared the gtid_pending flag. Now we have
|
|
rolled back any such update, so we must set the gtid_pending flag back to
|
|
true so that we will do a new update when/if we succeed with the retry.
|
|
*/
|
|
rgi->gtid_pending= true;
|
|
|
|
mysql_mutex_lock(&rli->data_lock);
|
|
++rli->retried_trans;
|
|
statistic_increment(slave_retried_transactions, LOCK_status);
|
|
mysql_mutex_unlock(&rli->data_lock);
|
|
|
|
for (;;)
|
|
{
|
|
mysql_mutex_lock(&entry->LOCK_parallel_entry);
|
|
if (entry->stop_on_error_sub_id == (uint64) ULONGLONG_MAX ||
|
|
#ifndef DBUG_OFF
|
|
(DBUG_EVALUATE_IF("simulate_mdev_12746", 1, 0)) ||
|
|
#endif
|
|
rgi->gtid_sub_id < entry->stop_on_error_sub_id)
|
|
{
|
|
register_wait_for_prior_event_group_commit(rgi, entry);
|
|
}
|
|
else
|
|
{
|
|
/*
|
|
A failure of a preceeding "parent" transaction may not be
|
|
seen by the current one through its own worker_error.
|
|
Such induced error gets set by ourselves now.
|
|
*/
|
|
err= rgi->worker_error= 1;
|
|
my_error(ER_PRIOR_COMMIT_FAILED, MYF(0));
|
|
mysql_mutex_unlock(&entry->LOCK_parallel_entry);
|
|
goto err;
|
|
}
|
|
mysql_mutex_unlock(&entry->LOCK_parallel_entry);
|
|
|
|
/*
|
|
Let us wait for all prior transactions to complete before trying again.
|
|
This way, we avoid repeatedly conflicting with and getting deadlock
|
|
killed by the same earlier transaction.
|
|
*/
|
|
if (!(err= thd->wait_for_prior_commit()))
|
|
{
|
|
rgi->speculation = rpl_group_info::SPECULATE_WAIT;
|
|
break;
|
|
}
|
|
|
|
convert_kill_to_deadlock_error(rgi);
|
|
if (!has_temporary_error(thd))
|
|
goto err;
|
|
/*
|
|
If we get a temporary error such as a deadlock kill, we can safely
|
|
ignore it, as we already rolled back.
|
|
|
|
But we still want to retry the wait for the prior transaction to
|
|
complete its commit.
|
|
*/
|
|
thd->clear_error();
|
|
thd->reset_killed();
|
|
if(thd->wait_for_commit_ptr)
|
|
thd->wait_for_commit_ptr->unregister_wait_for_prior_commit();
|
|
DBUG_EXECUTE_IF("inject_mdev8031", {
|
|
/* Inject a small sleep to give prior transaction a chance to commit. */
|
|
my_sleep(100000);
|
|
});
|
|
}
|
|
|
|
/*
|
|
Let us clear any lingering deadlock kill one more time, here after
|
|
wait_for_prior_commit() has completed. This should rule out any
|
|
possibility of an old deadlock kill lingering on beyond this point.
|
|
*/
|
|
thd->reset_killed();
|
|
|
|
strmake_buf(log_name, ir->name);
|
|
if ((fd= open_binlog(&rlog, log_name, &errmsg)) <0)
|
|
{
|
|
err= 1;
|
|
goto err;
|
|
}
|
|
cur_offset= rgi->retry_start_offset;
|
|
delete description_event;
|
|
description_event=
|
|
read_relay_log_description_event(&rlog, cur_offset, &errmsg);
|
|
if (!description_event)
|
|
{
|
|
err= 1;
|
|
goto err;
|
|
}
|
|
DBUG_EXECUTE_IF("inject_mdev8031", {
|
|
/* Simulate pending KILL caught in read_relay_log_description_event(). */
|
|
if (thd->check_killed()) {
|
|
thd->send_kill_message();
|
|
err= 1;
|
|
goto err;
|
|
}
|
|
});
|
|
my_b_seek(&rlog, cur_offset);
|
|
|
|
do
|
|
{
|
|
Log_event_type event_type;
|
|
Log_event *ev;
|
|
rpl_parallel_thread::queued_event *qev;
|
|
|
|
/* The loop is here so we can try again the next relay log file on EOF. */
|
|
for (;;)
|
|
{
|
|
old_offset= cur_offset;
|
|
ev= Log_event::read_log_event(&rlog, 0, description_event,
|
|
opt_slave_sql_verify_checksum);
|
|
cur_offset= my_b_tell(&rlog);
|
|
|
|
if (ev)
|
|
break;
|
|
if (rlog.error < 0)
|
|
{
|
|
errmsg= "slave SQL thread aborted because of I/O error";
|
|
err= 1;
|
|
goto check_retry;
|
|
}
|
|
if (rlog.error > 0)
|
|
{
|
|
sql_print_error("Slave SQL thread: I/O error reading "
|
|
"event(errno: %d cur_log->error: %d)",
|
|
my_errno, rlog.error);
|
|
errmsg= "Aborting slave SQL thread because of partial event read";
|
|
err= 1;
|
|
goto err;
|
|
}
|
|
/* EOF. Move to the next relay log. */
|
|
end_io_cache(&rlog);
|
|
mysql_file_close(fd, MYF(MY_WME));
|
|
fd= (File)-1;
|
|
|
|
/* Find the next relay log file. */
|
|
if((err= rli->relay_log.find_log_pos(&linfo, log_name, 1)) ||
|
|
(err= rli->relay_log.find_next_log(&linfo, 1)))
|
|
{
|
|
char buff[22];
|
|
sql_print_error("next log error: %d offset: %s log: %s",
|
|
err,
|
|
llstr(linfo.index_file_offset, buff),
|
|
log_name);
|
|
goto err;
|
|
}
|
|
strmake_buf(log_name ,linfo.log_file_name);
|
|
|
|
DBUG_EXECUTE_IF("inject_retry_event_group_open_binlog_kill", {
|
|
if (retries < 2)
|
|
{
|
|
/* Simulate that we get deadlock killed during open_binlog(). */
|
|
thd->reset_for_next_command();
|
|
rgi->killed_for_retry= rpl_group_info::RETRY_KILL_KILLED;
|
|
thd->set_killed(KILL_CONNECTION);
|
|
thd->send_kill_message();
|
|
fd= (File)-1;
|
|
err= 1;
|
|
goto check_retry;
|
|
}
|
|
});
|
|
if ((fd= open_binlog(&rlog, log_name, &errmsg)) <0)
|
|
{
|
|
err= 1;
|
|
goto check_retry;
|
|
}
|
|
description_event->reset_crypto();
|
|
/* Loop to try again on the new log file. */
|
|
}
|
|
|
|
event_type= ev->get_type_code();
|
|
if (event_type == FORMAT_DESCRIPTION_EVENT)
|
|
{
|
|
Format_description_log_event *newde= (Format_description_log_event*)ev;
|
|
newde->copy_crypto_data(description_event);
|
|
delete description_event;
|
|
description_event= newde;
|
|
continue;
|
|
}
|
|
else if (event_type == START_ENCRYPTION_EVENT)
|
|
{
|
|
description_event->start_decryption((Start_encryption_log_event*)ev);
|
|
delete ev;
|
|
continue;
|
|
}
|
|
else if (!Log_event::is_group_event(event_type))
|
|
{
|
|
delete ev;
|
|
continue;
|
|
}
|
|
ev->thd= thd;
|
|
|
|
mysql_mutex_lock(&rpt->LOCK_rpl_thread);
|
|
qev= rpt->retry_get_qev(ev, orig_qev, log_name, old_offset,
|
|
cur_offset - old_offset);
|
|
mysql_mutex_unlock(&rpt->LOCK_rpl_thread);
|
|
if (!qev)
|
|
{
|
|
delete ev;
|
|
my_error(ER_OUT_OF_RESOURCES, MYF(0));
|
|
err= 1;
|
|
goto err;
|
|
}
|
|
if (is_group_ending(ev, event_type) == 1)
|
|
rgi->mark_start_commit();
|
|
|
|
err= rpt_handle_event(qev, rpt);
|
|
++event_count;
|
|
mysql_mutex_lock(&rpt->LOCK_rpl_thread);
|
|
rpt->free_qev(qev);
|
|
mysql_mutex_unlock(&rpt->LOCK_rpl_thread);
|
|
|
|
delete_or_keep_event_post_apply(rgi, event_type, ev);
|
|
DBUG_EXECUTE_IF("rpl_parallel_simulate_double_temp_err_gtid_0_x_100",
|
|
if (retries == 0) err= dbug_simulate_tmp_error(rgi, thd););
|
|
DBUG_EXECUTE_IF("rpl_parallel_simulate_infinite_temp_err_gtid_0_x_100",
|
|
err= dbug_simulate_tmp_error(rgi, thd););
|
|
if (!err)
|
|
continue;
|
|
|
|
check_retry:
|
|
convert_kill_to_deadlock_error(rgi);
|
|
if (has_temporary_error(thd))
|
|
{
|
|
++retries;
|
|
if (retries < slave_trans_retries)
|
|
{
|
|
if (fd >= 0)
|
|
{
|
|
end_io_cache(&rlog);
|
|
mysql_file_close(fd, MYF(MY_WME));
|
|
fd= (File)-1;
|
|
}
|
|
goto do_retry;
|
|
}
|
|
sql_print_error("Slave worker thread retried transaction %lu time(s) "
|
|
"in vain, giving up. Consider raising the value of "
|
|
"the slave_transaction_retries variable.",
|
|
slave_trans_retries);
|
|
}
|
|
goto err;
|
|
|
|
} while (event_count < events_to_execute);
|
|
|
|
err:
|
|
|
|
if (description_event)
|
|
delete description_event;
|
|
if (fd >= 0)
|
|
{
|
|
end_io_cache(&rlog);
|
|
mysql_file_close(fd, MYF(MY_WME));
|
|
}
|
|
if (errmsg)
|
|
sql_print_error("Error reading relay log event: %s", errmsg);
|
|
return err;
|
|
}
|
|
|
|
|
|
pthread_handler_t
|
|
handle_rpl_parallel_thread(void *arg)
|
|
{
|
|
THD *thd;
|
|
PSI_stage_info old_stage;
|
|
struct rpl_parallel_thread::queued_event *events;
|
|
bool group_standalone= true;
|
|
bool in_event_group= false;
|
|
bool skip_event_group= false;
|
|
rpl_group_info *group_rgi= NULL;
|
|
group_commit_orderer *gco;
|
|
uint64 event_gtid_sub_id= 0;
|
|
rpl_sql_thread_info sql_info(NULL);
|
|
int err;
|
|
|
|
struct rpl_parallel_thread *rpt= (struct rpl_parallel_thread *)arg;
|
|
|
|
my_thread_init();
|
|
thd = new THD(next_thread_id());
|
|
thd->thread_stack = (char*)&thd;
|
|
add_to_active_threads(thd);
|
|
set_current_thd(thd);
|
|
pthread_detach_this_thread();
|
|
thd->init_for_queries();
|
|
thd->variables.binlog_annotate_row_events= 0;
|
|
init_thr_lock();
|
|
thd->store_globals();
|
|
thd->system_thread= SYSTEM_THREAD_SLAVE_SQL;
|
|
thd->security_ctx->skip_grants();
|
|
thd->variables.max_allowed_packet= slave_max_allowed_packet;
|
|
thd->slave_thread= 1;
|
|
thd->variables.sql_log_slow= opt_log_slow_slave_statements;
|
|
thd->variables.log_slow_filter= global_system_variables.log_slow_filter;
|
|
set_slave_thread_options(thd);
|
|
thd->client_capabilities = CLIENT_LOCAL_FILES;
|
|
thd->net.reading_or_writing= 0;
|
|
thd_proc_info(thd, "Waiting for work from main SQL threads");
|
|
thd->variables.lock_wait_timeout= LONG_TIMEOUT;
|
|
thd->system_thread_info.rpl_sql_info= &sql_info;
|
|
/*
|
|
We need to use (at least) REPEATABLE READ isolation level. Otherwise
|
|
speculative parallel apply can run out-of-order and give wrong results
|
|
for statement-based replication.
|
|
*/
|
|
thd->variables.tx_isolation= ISO_REPEATABLE_READ;
|
|
|
|
mysql_mutex_lock(&rpt->LOCK_rpl_thread);
|
|
rpt->thd= thd;
|
|
|
|
while (rpt->delay_start)
|
|
mysql_cond_wait(&rpt->COND_rpl_thread, &rpt->LOCK_rpl_thread);
|
|
|
|
rpt->running= true;
|
|
mysql_cond_signal(&rpt->COND_rpl_thread);
|
|
|
|
thd->set_command(COM_SLAVE_WORKER);
|
|
while (!rpt->stop)
|
|
{
|
|
uint wait_count= 0;
|
|
rpl_parallel_thread::queued_event *qev, *next_qev;
|
|
|
|
thd->ENTER_COND(&rpt->COND_rpl_thread, &rpt->LOCK_rpl_thread,
|
|
&stage_waiting_for_work_from_sql_thread, &old_stage);
|
|
/*
|
|
There are 4 cases that should cause us to wake up:
|
|
- Events have been queued for us to handle.
|
|
- We have an owner, but no events and not inside event group -> we need
|
|
to release ourself to the thread pool
|
|
- SQL thread is stopping, and we have an owner but no events, and we are
|
|
inside an event group; no more events will be queued to us, so we need
|
|
to abort the group (force_abort==1).
|
|
- Thread pool shutdown (rpt->stop==1).
|
|
*/
|
|
while (!( (events= rpt->event_queue) ||
|
|
(rpt->current_owner && !in_event_group) ||
|
|
(rpt->current_owner && group_rgi->parallel_entry->force_abort) ||
|
|
rpt->stop))
|
|
{
|
|
if (!wait_count++)
|
|
thd->set_time_for_next_stage();
|
|
mysql_cond_wait(&rpt->COND_rpl_thread, &rpt->LOCK_rpl_thread);
|
|
}
|
|
rpt->dequeue1(events);
|
|
thd->EXIT_COND(&old_stage);
|
|
|
|
more_events:
|
|
for (qev= events; qev; qev= next_qev)
|
|
{
|
|
Log_event_type event_type;
|
|
rpl_group_info *rgi= qev->rgi;
|
|
rpl_parallel_entry *entry= rgi->parallel_entry;
|
|
bool end_of_group;
|
|
int group_ending;
|
|
|
|
next_qev= qev->next;
|
|
if (qev->typ == rpl_parallel_thread::queued_event::QUEUED_POS_UPDATE)
|
|
{
|
|
handle_queued_pos_update(thd, qev);
|
|
rpt->loc_free_qev(qev);
|
|
continue;
|
|
}
|
|
else if (qev->typ ==
|
|
rpl_parallel_thread::queued_event::QUEUED_MASTER_RESTART)
|
|
{
|
|
if (in_event_group)
|
|
{
|
|
/*
|
|
Master restarted (crashed) in the middle of an event group.
|
|
So we need to roll back and discard that event group.
|
|
*/
|
|
group_rgi->cleanup_context(thd, 1);
|
|
in_event_group= false;
|
|
finish_event_group(rpt, group_rgi->gtid_sub_id,
|
|
qev->entry_for_queued, group_rgi);
|
|
|
|
rpt->loc_free_rgi(group_rgi);
|
|
thd->rgi_slave= group_rgi= NULL;
|
|
}
|
|
|
|
rpt->loc_free_qev(qev);
|
|
continue;
|
|
}
|
|
DBUG_ASSERT(qev->typ==rpl_parallel_thread::queued_event::QUEUED_EVENT);
|
|
|
|
thd->rgi_slave= rgi;
|
|
gco= rgi->gco;
|
|
/* Handle a new event group, which will be initiated by a GTID event. */
|
|
if ((event_type= qev->ev->get_type_code()) == GTID_EVENT)
|
|
{
|
|
bool did_enter_cond= false;
|
|
PSI_stage_info old_stage;
|
|
|
|
DBUG_EXECUTE_IF("hold_worker_on_schedule", {
|
|
if (rgi->current_gtid.domain_id == 0 &&
|
|
rgi->current_gtid.seq_no == 100) {
|
|
debug_sync_set_action(thd,
|
|
STRING_WITH_LEN("now SIGNAL reached_pause WAIT_FOR continue_worker"));
|
|
}
|
|
});
|
|
DBUG_EXECUTE_IF("rpl_parallel_scheduled_gtid_0_x_100", {
|
|
if (rgi->current_gtid.domain_id == 0 &&
|
|
rgi->current_gtid.seq_no == 100) {
|
|
debug_sync_set_action(thd,
|
|
STRING_WITH_LEN("now SIGNAL scheduled_gtid_0_x_100"));
|
|
}
|
|
});
|
|
|
|
if(unlikely(thd->wait_for_commit_ptr) && group_rgi != NULL)
|
|
{
|
|
/*
|
|
This indicates that we get a new GTID event in the middle of
|
|
a not completed event group. This is corrupt binlog (the master
|
|
will never write such binlog), so it does not happen unless
|
|
someone tries to inject wrong crafted binlog, but let us still
|
|
try to handle it somewhat nicely.
|
|
*/
|
|
group_rgi->cleanup_context(thd, true);
|
|
finish_event_group(rpt, group_rgi->gtid_sub_id,
|
|
group_rgi->parallel_entry, group_rgi);
|
|
rpt->loc_free_rgi(group_rgi);
|
|
}
|
|
|
|
thd->tx_isolation= (enum_tx_isolation)thd->variables.tx_isolation;
|
|
in_event_group= true;
|
|
/*
|
|
If the standalone flag is set, then this event group consists of a
|
|
single statement (possibly preceeded by some Intvar_log_event and
|
|
similar), without any terminating COMMIT/ROLLBACK/XID.
|
|
*/
|
|
group_standalone=
|
|
(0 != (static_cast<Gtid_log_event *>(qev->ev)->flags2 &
|
|
Gtid_log_event::FL_STANDALONE));
|
|
|
|
event_gtid_sub_id= rgi->gtid_sub_id;
|
|
rgi->thd= thd;
|
|
|
|
mysql_mutex_lock(&entry->LOCK_parallel_entry);
|
|
skip_event_group= do_gco_wait(rgi, gco, &did_enter_cond, &old_stage);
|
|
|
|
if (unlikely(entry->stop_on_error_sub_id <= rgi->wait_commit_sub_id))
|
|
{
|
|
skip_event_group= true;
|
|
rgi->worker_error= 1;
|
|
}
|
|
if (likely(!skip_event_group))
|
|
do_ftwrl_wait(rgi, &did_enter_cond, &old_stage);
|
|
|
|
/*
|
|
Register ourself to wait for the previous commit, if we need to do
|
|
such registration _and_ that previous commit has not already
|
|
occurred.
|
|
*/
|
|
register_wait_for_prior_event_group_commit(rgi, entry);
|
|
|
|
unlock_or_exit_cond(thd, &entry->LOCK_parallel_entry,
|
|
&did_enter_cond, &old_stage);
|
|
|
|
thd->wait_for_commit_ptr= &rgi->commit_orderer;
|
|
|
|
if (opt_gtid_ignore_duplicates &&
|
|
rgi->rli->mi->using_gtid != Master_info::USE_GTID_NO)
|
|
{
|
|
int res=
|
|
rpl_global_gtid_slave_state->check_duplicate_gtid(&rgi->current_gtid,
|
|
rgi);
|
|
if (res < 0)
|
|
{
|
|
/* Error. */
|
|
slave_output_error_info(rgi, thd);
|
|
signal_error_to_sql_driver_thread(thd, rgi, 1);
|
|
}
|
|
else if (!res)
|
|
{
|
|
/* GTID already applied by another master connection, skip. */
|
|
skip_event_group= true;
|
|
}
|
|
else
|
|
{
|
|
/* We have to apply the event. */
|
|
}
|
|
}
|
|
/*
|
|
If we are optimistically running transactions in parallel, but this
|
|
particular event group should not run in parallel with what came
|
|
before, then wait now for the prior transaction to complete its
|
|
commit.
|
|
*/
|
|
if (rgi->speculation == rpl_group_info::SPECULATE_WAIT &&
|
|
(err= thd->wait_for_prior_commit()))
|
|
{
|
|
slave_output_error_info(rgi, thd);
|
|
signal_error_to_sql_driver_thread(thd, rgi, 1);
|
|
}
|
|
}
|
|
|
|
group_rgi= rgi;
|
|
group_ending= is_group_ending(qev->ev, event_type);
|
|
/*
|
|
We do not unmark_start_commit() here in case of an explicit ROLLBACK
|
|
statement. Such events should be very rare, there is no real reason
|
|
to try to group commit them - on the contrary, it seems best to avoid
|
|
running them in parallel with following group commits, as with
|
|
ROLLBACK events we are already deep in dangerous corner cases with
|
|
mix of transactional and non-transactional tables or the like. And
|
|
avoiding the mark_start_commit() here allows us to keep an assertion
|
|
in ha_rollback_trans() that we do not rollback after doing
|
|
mark_start_commit().
|
|
*/
|
|
if (group_ending == 1 && likely(!rgi->worker_error))
|
|
{
|
|
/*
|
|
Do an extra check for (deadlock) kill here. This helps prevent a
|
|
lingering deadlock kill that occurred during normal DML processing to
|
|
propagate past the mark_start_commit(). If we detect a deadlock only
|
|
after mark_start_commit(), we have to unmark, which has at least a
|
|
theoretical possibility of leaving a window where it looks like all
|
|
transactions in a GCO have started committing, while in fact one
|
|
will need to rollback and retry. This is not supposed to be possible
|
|
(since there is a deadlock, at least one transaction should be
|
|
blocked from reaching commit), but this seems a fragile ensurance,
|
|
and there were historically a number of subtle bugs in this area.
|
|
*/
|
|
if (!thd->killed)
|
|
{
|
|
DEBUG_SYNC(thd, "rpl_parallel_before_mark_start_commit");
|
|
rgi->mark_start_commit();
|
|
DEBUG_SYNC(thd, "rpl_parallel_after_mark_start_commit");
|
|
}
|
|
}
|
|
|
|
/*
|
|
If the SQL thread is stopping, we just skip execution of all the
|
|
following event groups. We still do all the normal waiting and wakeup
|
|
processing between the event groups as a simple way to ensure that
|
|
everything is stopped and cleaned up correctly.
|
|
*/
|
|
if (likely(!rgi->worker_error) && !skip_event_group)
|
|
{
|
|
++rgi->retry_event_count;
|
|
#ifndef DBUG_OFF
|
|
err= 0;
|
|
DBUG_EXECUTE_IF("rpl_parallel_simulate_temp_err_xid",
|
|
if (event_type == XID_EVENT)
|
|
{
|
|
thd->clear_error();
|
|
thd->get_stmt_da()->reset_diagnostics_area();
|
|
my_error(ER_LOCK_DEADLOCK, MYF(0));
|
|
err= 1;
|
|
DEBUG_SYNC(thd, "rpl_parallel_simulate_temp_err_xid");
|
|
});
|
|
if (!err)
|
|
#endif
|
|
{
|
|
if (thd->check_killed())
|
|
{
|
|
thd->clear_error();
|
|
thd->get_stmt_da()->reset_diagnostics_area();
|
|
thd->send_kill_message();
|
|
err= 1;
|
|
}
|
|
else
|
|
err= rpt_handle_event(qev, rpt);
|
|
}
|
|
delete_or_keep_event_post_apply(rgi, event_type, qev->ev);
|
|
DBUG_EXECUTE_IF("rpl_parallel_simulate_temp_err_gtid_0_x_100",
|
|
err= dbug_simulate_tmp_error(rgi, thd););
|
|
if (err)
|
|
{
|
|
convert_kill_to_deadlock_error(rgi);
|
|
if (has_temporary_error(thd) && slave_trans_retries > 0)
|
|
err= retry_event_group(rgi, rpt, qev);
|
|
}
|
|
}
|
|
else
|
|
{
|
|
delete qev->ev;
|
|
thd->get_stmt_da()->set_overwrite_status(true);
|
|
err= thd->wait_for_prior_commit();
|
|
thd->get_stmt_da()->set_overwrite_status(false);
|
|
}
|
|
|
|
end_of_group=
|
|
in_event_group &&
|
|
((group_standalone && !Log_event::is_part_of_group(event_type)) ||
|
|
group_ending);
|
|
|
|
rpt->loc_free_qev(qev);
|
|
|
|
if (unlikely(err))
|
|
{
|
|
if (!rgi->worker_error)
|
|
{
|
|
slave_output_error_info(rgi, thd);
|
|
signal_error_to_sql_driver_thread(thd, rgi, err);
|
|
}
|
|
thd->reset_killed();
|
|
}
|
|
if (end_of_group)
|
|
{
|
|
in_event_group= false;
|
|
finish_event_group(rpt, event_gtid_sub_id, entry, rgi);
|
|
rpt->loc_free_rgi(rgi);
|
|
thd->rgi_slave= group_rgi= rgi= NULL;
|
|
skip_event_group= false;
|
|
DEBUG_SYNC(thd, "rpl_parallel_end_of_group");
|
|
}
|
|
}
|
|
|
|
mysql_mutex_lock(&rpt->LOCK_rpl_thread);
|
|
/*
|
|
Now that we have the lock, we can move everything from our local free
|
|
lists to the real free lists that are also accessible from the SQL
|
|
driver thread.
|
|
*/
|
|
rpt->batch_free();
|
|
|
|
if ((events= rpt->event_queue) != NULL)
|
|
{
|
|
/*
|
|
Take next group of events from the replication pool.
|
|
This is faster than having to wakeup the pool manager thread to give
|
|
us a new event.
|
|
*/
|
|
rpt->dequeue1(events);
|
|
mysql_mutex_unlock(&rpt->LOCK_rpl_thread);
|
|
goto more_events;
|
|
}
|
|
|
|
rpt->inuse_relaylog_refcount_update();
|
|
|
|
if (in_event_group && group_rgi->parallel_entry->force_abort)
|
|
{
|
|
/*
|
|
We are asked to abort, without getting the remaining events in the
|
|
current event group.
|
|
|
|
We have to rollback the current transaction and update the last
|
|
sub_id value so that SQL thread will know we are done with the
|
|
half-processed event group.
|
|
*/
|
|
mysql_mutex_unlock(&rpt->LOCK_rpl_thread);
|
|
signal_error_to_sql_driver_thread(thd, group_rgi, 1);
|
|
finish_event_group(rpt, group_rgi->gtid_sub_id,
|
|
group_rgi->parallel_entry, group_rgi);
|
|
in_event_group= false;
|
|
mysql_mutex_lock(&rpt->LOCK_rpl_thread);
|
|
rpt->free_rgi(group_rgi);
|
|
thd->rgi_slave= group_rgi= NULL;
|
|
skip_event_group= false;
|
|
}
|
|
if (!in_event_group)
|
|
{
|
|
/* If we are in a FLUSH TABLES FOR READ LOCK, wait for it */
|
|
while (rpt->current_entry && rpt->pause_for_ftwrl)
|
|
{
|
|
/*
|
|
We are currently in the delicate process of pausing parallel
|
|
replication while FLUSH TABLES WITH READ LOCK is starting. We must
|
|
not de-allocate the thread (setting rpt->current_owner= NULL) until
|
|
rpl_unpause_after_ftwrl() has woken us up.
|
|
*/
|
|
rpl_parallel_entry *e= rpt->current_entry;
|
|
/*
|
|
Wait for rpl_unpause_after_ftwrl() to wake us up.
|
|
Note that rpl_pause_for_ftwrl() may wait for 'e->pause_sub_id'
|
|
to change. This should happen eventually in finish_event_group()
|
|
*/
|
|
mysql_mutex_lock(&e->LOCK_parallel_entry);
|
|
mysql_mutex_unlock(&rpt->LOCK_rpl_thread);
|
|
if (rpt->pause_for_ftwrl)
|
|
mysql_cond_wait(&e->COND_parallel_entry, &e->LOCK_parallel_entry);
|
|
mysql_mutex_unlock(&e->LOCK_parallel_entry);
|
|
mysql_mutex_lock(&rpt->LOCK_rpl_thread);
|
|
}
|
|
|
|
rpt->current_owner= NULL;
|
|
/* Tell wait_for_done() that we are done, if it is waiting. */
|
|
if (likely(rpt->current_entry) &&
|
|
unlikely(rpt->current_entry->force_abort))
|
|
mysql_cond_broadcast(&rpt->COND_rpl_thread_stop);
|
|
|
|
rpt->current_entry= NULL;
|
|
if (!rpt->stop)
|
|
rpt->pool->release_thread(rpt);
|
|
}
|
|
}
|
|
|
|
rpt->thd= NULL;
|
|
mysql_mutex_unlock(&rpt->LOCK_rpl_thread);
|
|
|
|
thd->clear_error();
|
|
thd->catalog= 0;
|
|
thd->reset_query();
|
|
thd->reset_db(NULL, 0);
|
|
thd_proc_info(thd, "Slave worker thread exiting");
|
|
thd->temporary_tables= 0;
|
|
|
|
THD_CHECK_SENTRY(thd);
|
|
unlink_not_visible_thd(thd);
|
|
delete thd;
|
|
|
|
mysql_mutex_lock(&rpt->LOCK_rpl_thread);
|
|
rpt->running= false;
|
|
mysql_cond_signal(&rpt->COND_rpl_thread);
|
|
mysql_mutex_unlock(&rpt->LOCK_rpl_thread);
|
|
|
|
my_thread_end();
|
|
|
|
return NULL;
|
|
}
|
|
|
|
|
|
static void
|
|
dealloc_gco(group_commit_orderer *gco)
|
|
{
|
|
mysql_cond_destroy(&gco->COND_group_commit_orderer);
|
|
my_free(gco);
|
|
}
|
|
|
|
/**
|
|
Change thread count for global parallel worker threads
|
|
|
|
@param pool parallel thread pool
|
|
@param new_count Number of threads to be in pool. 0 in shutdown
|
|
@param force Force thread count to new_count even if slave
|
|
threads are running
|
|
|
|
By default we don't resize pool of there are running threads.
|
|
However during shutdown we will always do it.
|
|
This is needed as any_slave_sql_running() returns 1 during shutdown
|
|
as we don't want to access master_info while
|
|
Master_info_index::free_connections are running.
|
|
*/
|
|
|
|
static int
|
|
rpl_parallel_change_thread_count(rpl_parallel_thread_pool *pool,
|
|
uint32 new_count, bool force)
|
|
{
|
|
uint32 i;
|
|
rpl_parallel_thread **old_list= NULL;
|
|
rpl_parallel_thread **new_list= NULL;
|
|
rpl_parallel_thread *new_free_list= NULL;
|
|
rpl_parallel_thread *rpt_array= NULL;
|
|
int res;
|
|
|
|
if ((res= pool_mark_busy(pool, current_thd)))
|
|
return res;
|
|
|
|
/* Protect against parallel pool resizes */
|
|
if (pool->count == new_count)
|
|
{
|
|
pool_mark_not_busy(pool);
|
|
return 0;
|
|
}
|
|
|
|
/*
|
|
If we are about to delete pool, do an extra check that there are no new
|
|
slave threads running since we marked pool busy
|
|
*/
|
|
if (!new_count && !force)
|
|
{
|
|
if (any_slave_sql_running())
|
|
{
|
|
DBUG_PRINT("warning",
|
|
("SQL threads running while trying to reset parallel pool"));
|
|
pool_mark_not_busy(pool);
|
|
return 0; // Ok to not resize pool
|
|
}
|
|
}
|
|
|
|
/*
|
|
Allocate the new list of threads up-front.
|
|
That way, if we fail half-way, we only need to free whatever we managed
|
|
to allocate, and will not be left with a half-functional thread pool.
|
|
*/
|
|
if (new_count &&
|
|
!my_multi_malloc(MYF(MY_WME|MY_ZEROFILL),
|
|
&new_list, new_count*sizeof(*new_list),
|
|
&rpt_array, new_count*sizeof(*rpt_array),
|
|
NULL))
|
|
{
|
|
my_error(ER_OUTOFMEMORY, MYF(0), (int(new_count*sizeof(*new_list) +
|
|
new_count*sizeof(*rpt_array))));
|
|
goto err;
|
|
}
|
|
|
|
for (i= 0; i < new_count; ++i)
|
|
{
|
|
pthread_t th;
|
|
|
|
new_list[i]= &rpt_array[i];
|
|
new_list[i]->delay_start= true;
|
|
mysql_mutex_init(key_LOCK_rpl_thread, &new_list[i]->LOCK_rpl_thread,
|
|
MY_MUTEX_INIT_SLOW);
|
|
mysql_cond_init(key_COND_rpl_thread, &new_list[i]->COND_rpl_thread, NULL);
|
|
mysql_cond_init(key_COND_rpl_thread_queue,
|
|
&new_list[i]->COND_rpl_thread_queue, NULL);
|
|
mysql_cond_init(key_COND_rpl_thread_stop,
|
|
&new_list[i]->COND_rpl_thread_stop, NULL);
|
|
new_list[i]->pool= pool;
|
|
if (mysql_thread_create(key_rpl_parallel_thread, &th, &connection_attrib,
|
|
handle_rpl_parallel_thread, new_list[i]))
|
|
{
|
|
my_error(ER_OUT_OF_RESOURCES, MYF(0));
|
|
goto err;
|
|
}
|
|
new_list[i]->next= new_free_list;
|
|
new_free_list= new_list[i];
|
|
}
|
|
|
|
/*
|
|
Grab each old thread in turn, and signal it to stop.
|
|
|
|
Note that since we require all replication threads to be stopped before
|
|
changing the parallel replication worker thread pool, all the threads will
|
|
be already idle and will terminate immediately.
|
|
*/
|
|
for (i= 0; i < pool->count; ++i)
|
|
{
|
|
rpl_parallel_thread *rpt;
|
|
|
|
mysql_mutex_lock(&pool->LOCK_rpl_thread_pool);
|
|
while ((rpt= pool->free_list) == NULL)
|
|
mysql_cond_wait(&pool->COND_rpl_thread_pool, &pool->LOCK_rpl_thread_pool);
|
|
pool->free_list= rpt->next;
|
|
mysql_mutex_unlock(&pool->LOCK_rpl_thread_pool);
|
|
mysql_mutex_lock(&rpt->LOCK_rpl_thread);
|
|
rpt->stop= true;
|
|
mysql_cond_signal(&rpt->COND_rpl_thread);
|
|
mysql_mutex_unlock(&rpt->LOCK_rpl_thread);
|
|
}
|
|
|
|
for (i= 0; i < pool->count; ++i)
|
|
{
|
|
rpl_parallel_thread *rpt= pool->threads[i];
|
|
mysql_mutex_lock(&rpt->LOCK_rpl_thread);
|
|
while (rpt->running)
|
|
mysql_cond_wait(&rpt->COND_rpl_thread, &rpt->LOCK_rpl_thread);
|
|
mysql_mutex_unlock(&rpt->LOCK_rpl_thread);
|
|
mysql_mutex_destroy(&rpt->LOCK_rpl_thread);
|
|
mysql_cond_destroy(&rpt->COND_rpl_thread);
|
|
while (rpt->qev_free_list)
|
|
{
|
|
rpl_parallel_thread::queued_event *next= rpt->qev_free_list->next;
|
|
my_free(rpt->qev_free_list);
|
|
rpt->qev_free_list= next;
|
|
}
|
|
while (rpt->rgi_free_list)
|
|
{
|
|
rpl_group_info *next= rpt->rgi_free_list->next;
|
|
delete rpt->rgi_free_list;
|
|
rpt->rgi_free_list= next;
|
|
}
|
|
while (rpt->gco_free_list)
|
|
{
|
|
group_commit_orderer *next= rpt->gco_free_list->next_gco;
|
|
dealloc_gco(rpt->gco_free_list);
|
|
rpt->gco_free_list= next;
|
|
}
|
|
}
|
|
|
|
old_list= pool->threads;
|
|
if (new_count < pool->count)
|
|
pool->count= new_count;
|
|
pool->threads= new_list;
|
|
if (new_count > pool->count)
|
|
pool->count= new_count;
|
|
my_free(old_list);
|
|
pool->free_list= new_free_list;
|
|
for (i= 0; i < pool->count; ++i)
|
|
{
|
|
mysql_mutex_lock(&pool->threads[i]->LOCK_rpl_thread);
|
|
pool->threads[i]->delay_start= false;
|
|
mysql_cond_signal(&pool->threads[i]->COND_rpl_thread);
|
|
while (!pool->threads[i]->running)
|
|
mysql_cond_wait(&pool->threads[i]->COND_rpl_thread,
|
|
&pool->threads[i]->LOCK_rpl_thread);
|
|
mysql_mutex_unlock(&pool->threads[i]->LOCK_rpl_thread);
|
|
}
|
|
|
|
pool_mark_not_busy(pool);
|
|
|
|
return 0;
|
|
|
|
err:
|
|
if (new_list)
|
|
{
|
|
while (new_free_list)
|
|
{
|
|
mysql_mutex_lock(&new_free_list->LOCK_rpl_thread);
|
|
new_free_list->delay_start= false;
|
|
new_free_list->stop= true;
|
|
mysql_cond_signal(&new_free_list->COND_rpl_thread);
|
|
while (!new_free_list->running)
|
|
mysql_cond_wait(&new_free_list->COND_rpl_thread,
|
|
&new_free_list->LOCK_rpl_thread);
|
|
while (new_free_list->running)
|
|
mysql_cond_wait(&new_free_list->COND_rpl_thread,
|
|
&new_free_list->LOCK_rpl_thread);
|
|
mysql_mutex_unlock(&new_free_list->LOCK_rpl_thread);
|
|
new_free_list= new_free_list->next;
|
|
}
|
|
my_free(new_list);
|
|
}
|
|
pool_mark_not_busy(pool);
|
|
return 1;
|
|
}
|
|
|
|
/*
|
|
Deactivate the parallel replication thread pool, if there are now no more
|
|
SQL threads running.
|
|
*/
|
|
|
|
int rpl_parallel_resize_pool_if_no_slaves(void)
|
|
{
|
|
/* master_info_index is set to NULL on shutdown */
|
|
if (opt_slave_parallel_threads > 0 && !any_slave_sql_running())
|
|
return rpl_parallel_inactivate_pool(&global_rpl_thread_pool);
|
|
return 0;
|
|
}
|
|
|
|
|
|
/**
|
|
Pool activation is preceeded by taking a "lock" of pool_mark_busy
|
|
which guarantees the number of running slaves drops to zero atomicly
|
|
with the number of pool workers.
|
|
This resolves race between the function caller thread and one
|
|
that may be attempting to deactivate the pool.
|
|
*/
|
|
int
|
|
rpl_parallel_activate_pool(rpl_parallel_thread_pool *pool)
|
|
{
|
|
int rc= 0;
|
|
|
|
if ((rc= pool_mark_busy(pool, current_thd)))
|
|
return rc; // killed
|
|
|
|
if (!pool->count)
|
|
{
|
|
pool_mark_not_busy(pool);
|
|
rc= rpl_parallel_change_thread_count(pool, opt_slave_parallel_threads,
|
|
0);
|
|
}
|
|
else
|
|
{
|
|
pool_mark_not_busy(pool);
|
|
}
|
|
return rc;
|
|
}
|
|
|
|
|
|
int
|
|
rpl_parallel_inactivate_pool(rpl_parallel_thread_pool *pool)
|
|
{
|
|
return rpl_parallel_change_thread_count(pool, 0, 0);
|
|
}
|
|
|
|
|
|
void
|
|
rpl_parallel_thread::batch_free()
|
|
{
|
|
mysql_mutex_assert_owner(&LOCK_rpl_thread);
|
|
if (loc_qev_list)
|
|
{
|
|
*loc_qev_last_ptr_ptr= qev_free_list;
|
|
qev_free_list= loc_qev_list;
|
|
loc_qev_list= NULL;
|
|
dequeue2(loc_qev_size);
|
|
/* Signal that our queue can now accept more events. */
|
|
mysql_cond_signal(&COND_rpl_thread_queue);
|
|
loc_qev_size= 0;
|
|
qev_free_pending= 0;
|
|
}
|
|
if (loc_rgi_list)
|
|
{
|
|
*loc_rgi_last_ptr_ptr= rgi_free_list;
|
|
rgi_free_list= loc_rgi_list;
|
|
loc_rgi_list= NULL;
|
|
}
|
|
if (loc_gco_list)
|
|
{
|
|
*loc_gco_last_ptr_ptr= gco_free_list;
|
|
gco_free_list= loc_gco_list;
|
|
loc_gco_list= NULL;
|
|
}
|
|
}
|
|
|
|
|
|
void
|
|
rpl_parallel_thread::inuse_relaylog_refcount_update()
|
|
{
|
|
inuse_relaylog *ir= accumulated_ir_last;
|
|
if (ir)
|
|
{
|
|
my_atomic_add64(&ir->dequeued_count, accumulated_ir_count);
|
|
accumulated_ir_count= 0;
|
|
accumulated_ir_last= NULL;
|
|
}
|
|
}
|
|
|
|
|
|
rpl_parallel_thread::queued_event *
|
|
rpl_parallel_thread::get_qev_common(Log_event *ev, ulonglong event_size)
|
|
{
|
|
queued_event *qev;
|
|
mysql_mutex_assert_owner(&LOCK_rpl_thread);
|
|
if ((qev= qev_free_list))
|
|
qev_free_list= qev->next;
|
|
else if(!(qev= (queued_event *)my_malloc(sizeof(*qev), MYF(0))))
|
|
{
|
|
my_error(ER_OUTOFMEMORY, MYF(0), (int)sizeof(*qev));
|
|
return NULL;
|
|
}
|
|
qev->typ= rpl_parallel_thread::queued_event::QUEUED_EVENT;
|
|
qev->ev= ev;
|
|
qev->event_size= (size_t)event_size;
|
|
qev->next= NULL;
|
|
return qev;
|
|
}
|
|
|
|
|
|
rpl_parallel_thread::queued_event *
|
|
rpl_parallel_thread::get_qev(Log_event *ev, ulonglong event_size,
|
|
Relay_log_info *rli)
|
|
{
|
|
queued_event *qev= get_qev_common(ev, event_size);
|
|
if (!qev)
|
|
return NULL;
|
|
strcpy(qev->event_relay_log_name, rli->event_relay_log_name);
|
|
qev->event_relay_log_pos= rli->event_relay_log_pos;
|
|
qev->future_event_relay_log_pos= rli->future_event_relay_log_pos;
|
|
strcpy(qev->future_event_master_log_name, rli->future_event_master_log_name);
|
|
return qev;
|
|
}
|
|
|
|
|
|
rpl_parallel_thread::queued_event *
|
|
rpl_parallel_thread::retry_get_qev(Log_event *ev, queued_event *orig_qev,
|
|
const char *relay_log_name,
|
|
ulonglong event_pos, ulonglong event_size)
|
|
{
|
|
queued_event *qev= get_qev_common(ev, event_size);
|
|
if (!qev)
|
|
return NULL;
|
|
qev->rgi= orig_qev->rgi;
|
|
strcpy(qev->event_relay_log_name, relay_log_name);
|
|
qev->event_relay_log_pos= event_pos;
|
|
qev->future_event_relay_log_pos= event_pos+event_size;
|
|
strcpy(qev->future_event_master_log_name,
|
|
orig_qev->future_event_master_log_name);
|
|
return qev;
|
|
}
|
|
|
|
|
|
void
|
|
rpl_parallel_thread::loc_free_qev(rpl_parallel_thread::queued_event *qev)
|
|
{
|
|
inuse_relaylog *ir= qev->ir;
|
|
inuse_relaylog *last_ir= accumulated_ir_last;
|
|
if (ir != last_ir)
|
|
{
|
|
if (last_ir)
|
|
inuse_relaylog_refcount_update();
|
|
accumulated_ir_last= ir;
|
|
}
|
|
++accumulated_ir_count;
|
|
if (!loc_qev_list)
|
|
loc_qev_last_ptr_ptr= &qev->next;
|
|
else
|
|
qev->next= loc_qev_list;
|
|
loc_qev_list= qev;
|
|
loc_qev_size+= qev->event_size;
|
|
/*
|
|
We want to release to the global free list only occasionally, to avoid
|
|
having to take the LOCK_rpl_thread muted too many times.
|
|
|
|
However, we do need to release regularly. If we let the unreleased part
|
|
grow too large, then the SQL driver thread may go to sleep waiting for
|
|
the queue to drop below opt_slave_parallel_max_queued, and this in turn
|
|
can stall all other worker threads for more stuff to do.
|
|
*/
|
|
if (++qev_free_pending >= QEV_BATCH_FREE ||
|
|
loc_qev_size >= opt_slave_parallel_max_queued/3)
|
|
{
|
|
mysql_mutex_lock(&LOCK_rpl_thread);
|
|
batch_free();
|
|
mysql_mutex_unlock(&LOCK_rpl_thread);
|
|
}
|
|
}
|
|
|
|
|
|
void
|
|
rpl_parallel_thread::free_qev(rpl_parallel_thread::queued_event *qev)
|
|
{
|
|
mysql_mutex_assert_owner(&LOCK_rpl_thread);
|
|
qev->next= qev_free_list;
|
|
qev_free_list= qev;
|
|
}
|
|
|
|
|
|
rpl_group_info*
|
|
rpl_parallel_thread::get_rgi(Relay_log_info *rli, Gtid_log_event *gtid_ev,
|
|
rpl_parallel_entry *e, ulonglong event_size)
|
|
{
|
|
rpl_group_info *rgi;
|
|
mysql_mutex_assert_owner(&LOCK_rpl_thread);
|
|
if ((rgi= rgi_free_list))
|
|
{
|
|
rgi_free_list= rgi->next;
|
|
rgi->reinit(rli);
|
|
}
|
|
else
|
|
{
|
|
if(!(rgi= new rpl_group_info(rli)))
|
|
{
|
|
my_error(ER_OUTOFMEMORY, MYF(0), (int)sizeof(*rgi));
|
|
return NULL;
|
|
}
|
|
rgi->is_parallel_exec = true;
|
|
}
|
|
if ((rgi->deferred_events_collecting= rli->mi->rpl_filter->is_on()) &&
|
|
!rgi->deferred_events)
|
|
rgi->deferred_events= new Deferred_log_events(rli);
|
|
if (event_group_new_gtid(rgi, gtid_ev))
|
|
{
|
|
free_rgi(rgi);
|
|
my_error(ER_OUT_OF_RESOURCES, MYF(MY_WME));
|
|
return NULL;
|
|
}
|
|
rgi->parallel_entry= e;
|
|
rgi->relay_log= rli->last_inuse_relaylog;
|
|
rgi->retry_start_offset= rli->future_event_relay_log_pos-event_size;
|
|
rgi->retry_event_count= 0;
|
|
rgi->killed_for_retry= rpl_group_info::RETRY_KILL_NONE;
|
|
|
|
return rgi;
|
|
}
|
|
|
|
|
|
void
|
|
rpl_parallel_thread::loc_free_rgi(rpl_group_info *rgi)
|
|
{
|
|
DBUG_ASSERT(rgi->commit_orderer.waitee == NULL);
|
|
rgi->free_annotate_event();
|
|
if (!loc_rgi_list)
|
|
loc_rgi_last_ptr_ptr= &rgi->next;
|
|
else
|
|
rgi->next= loc_rgi_list;
|
|
loc_rgi_list= rgi;
|
|
}
|
|
|
|
|
|
void
|
|
rpl_parallel_thread::free_rgi(rpl_group_info *rgi)
|
|
{
|
|
mysql_mutex_assert_owner(&LOCK_rpl_thread);
|
|
DBUG_ASSERT(rgi->commit_orderer.waitee == NULL);
|
|
rgi->free_annotate_event();
|
|
rgi->next= rgi_free_list;
|
|
rgi_free_list= rgi;
|
|
}
|
|
|
|
|
|
group_commit_orderer *
|
|
rpl_parallel_thread::get_gco(uint64 wait_count, group_commit_orderer *prev,
|
|
uint64 prior_sub_id)
|
|
{
|
|
group_commit_orderer *gco;
|
|
mysql_mutex_assert_owner(&LOCK_rpl_thread);
|
|
if ((gco= gco_free_list))
|
|
gco_free_list= gco->next_gco;
|
|
else if(!(gco= (group_commit_orderer *)my_malloc(sizeof(*gco), MYF(0))))
|
|
{
|
|
my_error(ER_OUTOFMEMORY, MYF(0), (int)sizeof(*gco));
|
|
return NULL;
|
|
}
|
|
mysql_cond_init(key_COND_group_commit_orderer,
|
|
&gco->COND_group_commit_orderer, NULL);
|
|
gco->wait_count= wait_count;
|
|
gco->prev_gco= prev;
|
|
gco->next_gco= NULL;
|
|
gco->prior_sub_id= prior_sub_id;
|
|
gco->installed= false;
|
|
gco->flags= 0;
|
|
return gco;
|
|
}
|
|
|
|
|
|
void
|
|
rpl_parallel_thread::loc_free_gco(group_commit_orderer *gco)
|
|
{
|
|
if (!loc_gco_list)
|
|
loc_gco_last_ptr_ptr= &gco->next_gco;
|
|
else
|
|
gco->next_gco= loc_gco_list;
|
|
loc_gco_list= gco;
|
|
}
|
|
|
|
|
|
rpl_parallel_thread_pool::rpl_parallel_thread_pool()
|
|
: threads(0), free_list(0), count(0), inited(false), busy(false)
|
|
{
|
|
}
|
|
|
|
|
|
int
|
|
rpl_parallel_thread_pool::init(uint32 size)
|
|
{
|
|
threads= NULL;
|
|
free_list= NULL;
|
|
count= 0;
|
|
busy= false;
|
|
|
|
mysql_mutex_init(key_LOCK_rpl_thread_pool, &LOCK_rpl_thread_pool,
|
|
MY_MUTEX_INIT_SLOW);
|
|
mysql_cond_init(key_COND_rpl_thread_pool, &COND_rpl_thread_pool, NULL);
|
|
inited= true;
|
|
|
|
/*
|
|
The pool is initially empty. Threads will be spawned when a slave SQL
|
|
thread is started.
|
|
*/
|
|
|
|
return 0;
|
|
}
|
|
|
|
|
|
void
|
|
rpl_parallel_thread_pool::destroy()
|
|
{
|
|
if (!inited)
|
|
return;
|
|
rpl_parallel_change_thread_count(this, 0, 1);
|
|
mysql_mutex_destroy(&LOCK_rpl_thread_pool);
|
|
mysql_cond_destroy(&COND_rpl_thread_pool);
|
|
inited= false;
|
|
}
|
|
|
|
|
|
/*
|
|
Wait for a worker thread to become idle. When one does, grab the thread for
|
|
our use and return it.
|
|
|
|
Note that we return with the worker threads's LOCK_rpl_thread mutex locked.
|
|
*/
|
|
struct rpl_parallel_thread *
|
|
rpl_parallel_thread_pool::get_thread(rpl_parallel_thread **owner,
|
|
rpl_parallel_entry *entry)
|
|
{
|
|
rpl_parallel_thread *rpt;
|
|
|
|
DBUG_ASSERT(count > 0);
|
|
mysql_mutex_lock(&LOCK_rpl_thread_pool);
|
|
while (unlikely(busy) || !(rpt= free_list))
|
|
mysql_cond_wait(&COND_rpl_thread_pool, &LOCK_rpl_thread_pool);
|
|
free_list= rpt->next;
|
|
mysql_mutex_unlock(&LOCK_rpl_thread_pool);
|
|
mysql_mutex_lock(&rpt->LOCK_rpl_thread);
|
|
rpt->current_owner= owner;
|
|
rpt->current_entry= entry;
|
|
|
|
return rpt;
|
|
}
|
|
|
|
|
|
/*
|
|
Release a thread to the thread pool.
|
|
The thread should be locked, and should not have any work queued for it.
|
|
*/
|
|
void
|
|
rpl_parallel_thread_pool::release_thread(rpl_parallel_thread *rpt)
|
|
{
|
|
rpl_parallel_thread *list;
|
|
|
|
mysql_mutex_assert_owner(&rpt->LOCK_rpl_thread);
|
|
DBUG_ASSERT(rpt->current_owner == NULL);
|
|
mysql_mutex_lock(&LOCK_rpl_thread_pool);
|
|
list= free_list;
|
|
rpt->next= list;
|
|
free_list= rpt;
|
|
if (!list)
|
|
mysql_cond_broadcast(&COND_rpl_thread_pool);
|
|
mysql_mutex_unlock(&LOCK_rpl_thread_pool);
|
|
}
|
|
|
|
|
|
/*
|
|
Obtain a worker thread that we can queue an event to.
|
|
|
|
Each invocation allocates a new worker thread, to maximise
|
|
parallelism. However, only up to a maximum of
|
|
--slave-domain-parallel-threads workers can be occupied by a single
|
|
replication domain; after that point, we start re-using worker threads that
|
|
are still executing events that were queued earlier for this thread.
|
|
|
|
We never queue more than --rpl-parallel-wait-queue_max amount of events
|
|
for one worker, to avoid the SQL driver thread using up all memory with
|
|
queued events while worker threads are stalling.
|
|
|
|
Note that this function returns with rpl_parallel_thread::LOCK_rpl_thread
|
|
locked. Exception is if we were killed, in which case NULL is returned.
|
|
|
|
The *did_enter_cond flag is set true if we had to wait for a worker thread
|
|
to become free (with mysql_cond_wait()). If so, old_stage will also be set,
|
|
and the LOCK_rpl_thread must be released with THD::EXIT_COND() instead
|
|
of mysql_mutex_unlock.
|
|
|
|
If the flag `reuse' is set, the last worker thread will be returned again,
|
|
if it is still available. Otherwise a new worker thread is allocated.
|
|
*/
|
|
rpl_parallel_thread *
|
|
rpl_parallel_entry::choose_thread(rpl_group_info *rgi, bool *did_enter_cond,
|
|
PSI_stage_info *old_stage, bool reuse)
|
|
{
|
|
uint32 idx;
|
|
Relay_log_info *rli= rgi->rli;
|
|
rpl_parallel_thread *thr;
|
|
|
|
idx= rpl_thread_idx;
|
|
if (!reuse)
|
|
{
|
|
++idx;
|
|
if (idx >= rpl_thread_max)
|
|
idx= 0;
|
|
rpl_thread_idx= idx;
|
|
}
|
|
thr= rpl_threads[idx];
|
|
if (thr)
|
|
{
|
|
*did_enter_cond= false;
|
|
mysql_mutex_lock(&thr->LOCK_rpl_thread);
|
|
for (;;)
|
|
{
|
|
if (thr->current_owner != &rpl_threads[idx])
|
|
{
|
|
/*
|
|
The worker thread became idle, and returned to the free list and
|
|
possibly was allocated to a different request. So we should allocate
|
|
a new worker thread.
|
|
*/
|
|
unlock_or_exit_cond(rli->sql_driver_thd, &thr->LOCK_rpl_thread,
|
|
did_enter_cond, old_stage);
|
|
thr= NULL;
|
|
break;
|
|
}
|
|
else if (thr->queued_size <= opt_slave_parallel_max_queued)
|
|
{
|
|
/* The thread is ready to queue into. */
|
|
break;
|
|
}
|
|
else if (rli->sql_driver_thd->check_killed())
|
|
{
|
|
unlock_or_exit_cond(rli->sql_driver_thd, &thr->LOCK_rpl_thread,
|
|
did_enter_cond, old_stage);
|
|
my_error(ER_CONNECTION_KILLED, MYF(0));
|
|
DBUG_EXECUTE_IF("rpl_parallel_wait_queue_max",
|
|
{
|
|
debug_sync_set_action(rli->sql_driver_thd,
|
|
STRING_WITH_LEN("now SIGNAL wait_queue_killed"));
|
|
};);
|
|
slave_output_error_info(rgi, rli->sql_driver_thd);
|
|
return NULL;
|
|
}
|
|
else
|
|
{
|
|
/*
|
|
We have reached the limit of how much memory we are allowed to use
|
|
for queuing events, so wait for the thread to consume some of its
|
|
queue.
|
|
*/
|
|
if (!*did_enter_cond)
|
|
{
|
|
/*
|
|
We need to do the debug_sync before ENTER_COND().
|
|
Because debug_sync changes the thd->mysys_var->current_mutex,
|
|
and this can cause THD::awake to use the wrong mutex.
|
|
*/
|
|
DBUG_EXECUTE_IF("rpl_parallel_wait_queue_max",
|
|
{
|
|
debug_sync_set_action(rli->sql_driver_thd,
|
|
STRING_WITH_LEN("now SIGNAL wait_queue_ready"));
|
|
};);
|
|
rli->sql_driver_thd->ENTER_COND(&thr->COND_rpl_thread_queue,
|
|
&thr->LOCK_rpl_thread,
|
|
&stage_waiting_for_room_in_worker_thread,
|
|
old_stage);
|
|
*did_enter_cond= true;
|
|
}
|
|
mysql_cond_wait(&thr->COND_rpl_thread_queue, &thr->LOCK_rpl_thread);
|
|
}
|
|
}
|
|
}
|
|
if (!thr)
|
|
rpl_threads[idx]= thr= global_rpl_thread_pool.get_thread(&rpl_threads[idx],
|
|
this);
|
|
|
|
return thr;
|
|
}
|
|
|
|
static void
|
|
free_rpl_parallel_entry(void *element)
|
|
{
|
|
rpl_parallel_entry *e= (rpl_parallel_entry *)element;
|
|
while (e->current_gco)
|
|
{
|
|
group_commit_orderer *prev_gco= e->current_gco->prev_gco;
|
|
dealloc_gco(e->current_gco);
|
|
e->current_gco= prev_gco;
|
|
}
|
|
mysql_cond_destroy(&e->COND_parallel_entry);
|
|
mysql_mutex_destroy(&e->LOCK_parallel_entry);
|
|
my_free(e);
|
|
}
|
|
|
|
|
|
rpl_parallel::rpl_parallel() :
|
|
current(NULL), sql_thread_stopping(false)
|
|
{
|
|
my_hash_init(&domain_hash, &my_charset_bin, 32,
|
|
offsetof(rpl_parallel_entry, domain_id), sizeof(uint32),
|
|
NULL, free_rpl_parallel_entry, HASH_UNIQUE);
|
|
}
|
|
|
|
|
|
void
|
|
rpl_parallel::reset()
|
|
{
|
|
my_hash_reset(&domain_hash);
|
|
current= NULL;
|
|
sql_thread_stopping= false;
|
|
}
|
|
|
|
|
|
rpl_parallel::~rpl_parallel()
|
|
{
|
|
my_hash_free(&domain_hash);
|
|
}
|
|
|
|
|
|
rpl_parallel_entry *
|
|
rpl_parallel::find(uint32 domain_id)
|
|
{
|
|
struct rpl_parallel_entry *e;
|
|
|
|
if (!(e= (rpl_parallel_entry *)my_hash_search(&domain_hash,
|
|
(const uchar *)&domain_id, 0)))
|
|
{
|
|
/* Allocate a new, empty one. */
|
|
ulong count= opt_slave_domain_parallel_threads;
|
|
if (count == 0 || count > opt_slave_parallel_threads)
|
|
count= opt_slave_parallel_threads;
|
|
rpl_parallel_thread **p;
|
|
if (!my_multi_malloc(MYF(MY_WME|MY_ZEROFILL),
|
|
&e, sizeof(*e),
|
|
&p, count*sizeof(*p),
|
|
NULL))
|
|
{
|
|
my_error(ER_OUTOFMEMORY, MYF(0), (int)(sizeof(*e)+count*sizeof(*p)));
|
|
return NULL;
|
|
}
|
|
e->rpl_threads= p;
|
|
e->rpl_thread_max= count;
|
|
e->domain_id= domain_id;
|
|
e->stop_on_error_sub_id= (uint64)ULONGLONG_MAX;
|
|
e->pause_sub_id= (uint64)ULONGLONG_MAX;
|
|
if (my_hash_insert(&domain_hash, (uchar *)e))
|
|
{
|
|
my_free(e);
|
|
return NULL;
|
|
}
|
|
mysql_mutex_init(key_LOCK_parallel_entry, &e->LOCK_parallel_entry,
|
|
MY_MUTEX_INIT_FAST);
|
|
mysql_cond_init(key_COND_parallel_entry, &e->COND_parallel_entry, NULL);
|
|
}
|
|
else
|
|
e->force_abort= false;
|
|
|
|
return e;
|
|
}
|
|
|
|
/**
|
|
Wait until all sql worker threads has stopped processing
|
|
|
|
This is called when sql thread has been killed/stopped
|
|
*/
|
|
|
|
void
|
|
rpl_parallel::wait_for_done(THD *thd, Relay_log_info *rli)
|
|
{
|
|
struct rpl_parallel_entry *e;
|
|
rpl_parallel_thread *rpt;
|
|
uint32 i, j;
|
|
|
|
/*
|
|
First signal all workers that they must force quit; no more events will
|
|
be queued to complete any partial event groups executed.
|
|
*/
|
|
for (i= 0; i < domain_hash.records; ++i)
|
|
{
|
|
e= (struct rpl_parallel_entry *)my_hash_element(&domain_hash, i);
|
|
mysql_mutex_lock(&e->LOCK_parallel_entry);
|
|
/*
|
|
We want the worker threads to stop as quickly as is safe. If the slave
|
|
SQL threads are behind, we could have significant amount of events
|
|
queued for the workers, and we want to stop without waiting for them
|
|
all to be applied first. But if any event group has already started
|
|
executing in a worker, we want to be sure that all prior event groups
|
|
are also executed, so that we stop at a consistent point in the binlog
|
|
stream (per replication domain).
|
|
|
|
All event groups wait for e->count_committing_event_groups to reach
|
|
the value of group_commit_orderer::wait_count before starting to
|
|
execute. Thus, at this point we know that any event group with a
|
|
strictly larger wait_count are safe to skip, none of them can have
|
|
started executing yet. So we set e->stop_count here and use it to
|
|
decide in the worker threads whether to continue executing an event
|
|
group or whether to skip it, when force_abort is set.
|
|
|
|
If we stop due to reaching the START SLAVE UNTIL condition, then we
|
|
need to continue executing any queued events up to that point.
|
|
*/
|
|
e->force_abort= true;
|
|
e->stop_count= rli->stop_for_until ?
|
|
e->count_queued_event_groups : e->count_committing_event_groups;
|
|
mysql_mutex_unlock(&e->LOCK_parallel_entry);
|
|
for (j= 0; j < e->rpl_thread_max; ++j)
|
|
{
|
|
if ((rpt= e->rpl_threads[j]))
|
|
{
|
|
mysql_mutex_lock(&rpt->LOCK_rpl_thread);
|
|
if (rpt->current_owner == &e->rpl_threads[j])
|
|
mysql_cond_signal(&rpt->COND_rpl_thread);
|
|
mysql_mutex_unlock(&rpt->LOCK_rpl_thread);
|
|
}
|
|
}
|
|
}
|
|
DBUG_EXECUTE_IF("rpl_parallel_wait_for_done_trigger",
|
|
{
|
|
debug_sync_set_action(thd,
|
|
STRING_WITH_LEN("now SIGNAL wait_for_done_waiting"));
|
|
};);
|
|
|
|
for (i= 0; i < domain_hash.records; ++i)
|
|
{
|
|
e= (struct rpl_parallel_entry *)my_hash_element(&domain_hash, i);
|
|
for (j= 0; j < e->rpl_thread_max; ++j)
|
|
{
|
|
if ((rpt= e->rpl_threads[j]))
|
|
{
|
|
mysql_mutex_lock(&rpt->LOCK_rpl_thread);
|
|
while (rpt->current_owner == &e->rpl_threads[j])
|
|
mysql_cond_wait(&rpt->COND_rpl_thread_stop, &rpt->LOCK_rpl_thread);
|
|
mysql_mutex_unlock(&rpt->LOCK_rpl_thread);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
/*
|
|
This function handles the case where the SQL driver thread reached the
|
|
START SLAVE UNTIL position; we stop queueing more events but continue
|
|
processing remaining, already queued events; then use executes manual
|
|
STOP SLAVE; then this function signals to worker threads that they
|
|
should stop the processing of any remaining queued events.
|
|
*/
|
|
void
|
|
rpl_parallel::stop_during_until()
|
|
{
|
|
struct rpl_parallel_entry *e;
|
|
uint32 i;
|
|
|
|
for (i= 0; i < domain_hash.records; ++i)
|
|
{
|
|
e= (struct rpl_parallel_entry *)my_hash_element(&domain_hash, i);
|
|
mysql_mutex_lock(&e->LOCK_parallel_entry);
|
|
if (e->force_abort)
|
|
e->stop_count= e->count_committing_event_groups;
|
|
mysql_mutex_unlock(&e->LOCK_parallel_entry);
|
|
}
|
|
}
|
|
|
|
|
|
bool
|
|
rpl_parallel::workers_idle()
|
|
{
|
|
struct rpl_parallel_entry *e;
|
|
uint32 i, max_i;
|
|
|
|
max_i= domain_hash.records;
|
|
for (i= 0; i < max_i; ++i)
|
|
{
|
|
bool active;
|
|
e= (struct rpl_parallel_entry *)my_hash_element(&domain_hash, i);
|
|
mysql_mutex_lock(&e->LOCK_parallel_entry);
|
|
active= e->current_sub_id > e->last_committed_sub_id;
|
|
mysql_mutex_unlock(&e->LOCK_parallel_entry);
|
|
if (active)
|
|
break;
|
|
}
|
|
return (i == max_i);
|
|
}
|
|
|
|
|
|
int
|
|
rpl_parallel_entry::queue_master_restart(rpl_group_info *rgi,
|
|
Format_description_log_event *fdev)
|
|
{
|
|
uint32 idx;
|
|
rpl_parallel_thread *thr;
|
|
rpl_parallel_thread::queued_event *qev;
|
|
Relay_log_info *rli= rgi->rli;
|
|
|
|
/*
|
|
We only need to queue the server restart if we still have a thread working
|
|
on a (potentially partial) event group.
|
|
|
|
If the last thread we queued for has finished, then it cannot have any
|
|
partial event group that needs aborting.
|
|
|
|
Thus there is no need for the full complexity of choose_thread(). We only
|
|
need to check if we have a current worker thread, and queue for it if so.
|
|
*/
|
|
idx= rpl_thread_idx;
|
|
thr= rpl_threads[idx];
|
|
if (!thr)
|
|
return 0;
|
|
mysql_mutex_lock(&thr->LOCK_rpl_thread);
|
|
if (thr->current_owner != &rpl_threads[idx])
|
|
{
|
|
/* No active worker thread, so no need to queue the master restart. */
|
|
mysql_mutex_unlock(&thr->LOCK_rpl_thread);
|
|
return 0;
|
|
}
|
|
|
|
if (!(qev= thr->get_qev(fdev, 0, rli)))
|
|
{
|
|
mysql_mutex_unlock(&thr->LOCK_rpl_thread);
|
|
return 1;
|
|
}
|
|
|
|
qev->rgi= rgi;
|
|
qev->typ= rpl_parallel_thread::queued_event::QUEUED_MASTER_RESTART;
|
|
qev->entry_for_queued= this;
|
|
qev->ir= rli->last_inuse_relaylog;
|
|
++qev->ir->queued_count;
|
|
thr->enqueue(qev);
|
|
mysql_cond_signal(&thr->COND_rpl_thread);
|
|
mysql_mutex_unlock(&thr->LOCK_rpl_thread);
|
|
return 0;
|
|
}
|
|
|
|
|
|
int
|
|
rpl_parallel::wait_for_workers_idle(THD *thd)
|
|
{
|
|
uint32 i, max_i;
|
|
|
|
/*
|
|
The domain_hash is only accessed by the SQL driver thread, so it is safe
|
|
to iterate over without a lock.
|
|
*/
|
|
max_i= domain_hash.records;
|
|
for (i= 0; i < max_i; ++i)
|
|
{
|
|
PSI_stage_info old_stage;
|
|
struct rpl_parallel_entry *e;
|
|
int err= 0;
|
|
|
|
e= (struct rpl_parallel_entry *)my_hash_element(&domain_hash, i);
|
|
mysql_mutex_lock(&e->LOCK_parallel_entry);
|
|
++e->need_sub_id_signal;
|
|
thd->ENTER_COND(&e->COND_parallel_entry, &e->LOCK_parallel_entry,
|
|
&stage_waiting_for_workers_idle, &old_stage);
|
|
while (e->current_sub_id > e->last_committed_sub_id)
|
|
{
|
|
if (thd->check_killed())
|
|
{
|
|
thd->send_kill_message();
|
|
err= 1;
|
|
break;
|
|
}
|
|
mysql_cond_wait(&e->COND_parallel_entry, &e->LOCK_parallel_entry);
|
|
}
|
|
--e->need_sub_id_signal;
|
|
thd->EXIT_COND(&old_stage);
|
|
if (err)
|
|
return err;
|
|
}
|
|
return 0;
|
|
}
|
|
|
|
|
|
/*
|
|
Handle seeing a GTID during slave restart in GTID mode. If we stopped with
|
|
different replication domains having reached different positions in the relay
|
|
log, we need to skip event groups in domains that are further progressed.
|
|
|
|
Updates the state with the seen GTID, and returns true if this GTID should
|
|
be skipped, false otherwise.
|
|
*/
|
|
bool
|
|
process_gtid_for_restart_pos(Relay_log_info *rli, rpl_gtid *gtid)
|
|
{
|
|
slave_connection_state::entry *gtid_entry;
|
|
slave_connection_state *state= &rli->restart_gtid_pos;
|
|
|
|
if (likely(state->count() == 0) ||
|
|
!(gtid_entry= state->find_entry(gtid->domain_id)))
|
|
return false;
|
|
if (gtid->server_id == gtid_entry->gtid.server_id)
|
|
{
|
|
uint64 seq_no= gtid_entry->gtid.seq_no;
|
|
if (gtid->seq_no >= seq_no)
|
|
{
|
|
/*
|
|
This domain has reached its start position. So remove it, so that
|
|
further events will be processed normally.
|
|
*/
|
|
state->remove(>id_entry->gtid);
|
|
}
|
|
return gtid->seq_no <= seq_no;
|
|
}
|
|
else
|
|
return true;
|
|
}
|
|
|
|
|
|
/*
|
|
This is used when we get an error during processing in do_event();
|
|
We will not queue any event to the thread, but we still need to wake it up
|
|
to be sure that it will be returned to the pool.
|
|
*/
|
|
static void
|
|
abandon_worker_thread(THD *thd, rpl_parallel_thread *cur_thread,
|
|
bool *did_enter_cond, PSI_stage_info *old_stage)
|
|
{
|
|
unlock_or_exit_cond(thd, &cur_thread->LOCK_rpl_thread,
|
|
did_enter_cond, old_stage);
|
|
mysql_cond_signal(&cur_thread->COND_rpl_thread);
|
|
}
|
|
|
|
|
|
/*
|
|
do_event() is executed by the sql_driver_thd thread.
|
|
It's main purpose is to find a thread that can execute the query.
|
|
|
|
@retval 0 ok, event was accepted
|
|
@retval 1 error
|
|
@retval -1 event should be executed serially, in the sql driver thread
|
|
*/
|
|
|
|
int
|
|
rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
|
|
ulonglong event_size)
|
|
{
|
|
rpl_parallel_entry *e;
|
|
rpl_parallel_thread *cur_thread;
|
|
rpl_parallel_thread::queued_event *qev;
|
|
rpl_group_info *rgi= NULL;
|
|
Relay_log_info *rli= serial_rgi->rli;
|
|
enum Log_event_type typ;
|
|
bool is_group_event;
|
|
bool did_enter_cond= false;
|
|
PSI_stage_info old_stage;
|
|
|
|
DBUG_EXECUTE_IF("slave_crash_if_parallel_apply", DBUG_SUICIDE(););
|
|
/* Handle master log name change, seen in Rotate_log_event. */
|
|
typ= ev->get_type_code();
|
|
if (unlikely(typ == ROTATE_EVENT))
|
|
{
|
|
Rotate_log_event *rev= static_cast<Rotate_log_event *>(ev);
|
|
if ((rev->server_id != global_system_variables.server_id ||
|
|
rli->replicate_same_server_id) &&
|
|
!rev->is_relay_log_event() &&
|
|
!rli->is_in_group())
|
|
{
|
|
memcpy(rli->future_event_master_log_name,
|
|
rev->new_log_ident, rev->ident_len+1);
|
|
rli->notify_group_master_log_name_update();
|
|
}
|
|
}
|
|
|
|
/*
|
|
Execute queries non-parallel if slave_skip_counter is set, as it's is
|
|
easier to skip queries in single threaded mode.
|
|
*/
|
|
if (rli->slave_skip_counter)
|
|
return -1;
|
|
|
|
/* Execute pre-10.0 event, which have no GTID, in single-threaded mode. */
|
|
is_group_event= Log_event::is_group_event(typ);
|
|
if (unlikely(!current) && typ != GTID_EVENT &&
|
|
!(unlikely(rli->gtid_skip_flag != GTID_SKIP_NOT) && is_group_event))
|
|
return -1;
|
|
|
|
/* Note: rli->data_lock is released by sql_delay_event(). */
|
|
if (sql_delay_event(ev, rli->sql_driver_thd, serial_rgi))
|
|
{
|
|
/*
|
|
If sql_delay_event() returns non-zero, it means that the wait timed out
|
|
due to slave stop. We should not queue the event in this case, it must
|
|
not be applied yet.
|
|
*/
|
|
delete ev;
|
|
return 1;
|
|
}
|
|
|
|
if (unlikely(typ == FORMAT_DESCRIPTION_EVENT))
|
|
{
|
|
Format_description_log_event *fdev=
|
|
static_cast<Format_description_log_event *>(ev);
|
|
if (fdev->created)
|
|
{
|
|
/*
|
|
This format description event marks a new binlog after a master server
|
|
restart. We are going to close all temporary tables to clean up any
|
|
possible left-overs after a prior master crash.
|
|
|
|
Thus we need to wait for all prior events to execute to completion,
|
|
in case they need access to any of the temporary tables.
|
|
|
|
We also need to notify the worker thread running the prior incomplete
|
|
event group (if any), as such event group signifies an incompletely
|
|
written group cut short by a master crash, and must be rolled back.
|
|
*/
|
|
if (current->queue_master_restart(serial_rgi, fdev) ||
|
|
wait_for_workers_idle(rli->sql_driver_thd))
|
|
{
|
|
delete ev;
|
|
return 1;
|
|
}
|
|
}
|
|
}
|
|
else if (unlikely(typ == GTID_LIST_EVENT))
|
|
{
|
|
Gtid_list_log_event *glev= static_cast<Gtid_list_log_event *>(ev);
|
|
rpl_gtid *list= glev->list;
|
|
uint32 count= glev->count;
|
|
rli->update_relay_log_state(list, count);
|
|
while (count)
|
|
{
|
|
process_gtid_for_restart_pos(rli, list);
|
|
++list;
|
|
--count;
|
|
}
|
|
}
|
|
|
|
/*
|
|
Stop queueing additional event groups once the SQL thread is requested to
|
|
stop.
|
|
|
|
We have to queue any remaining events of any event group that has already
|
|
been partially queued, but after that we will just ignore any further
|
|
events the SQL driver thread may try to queue, and eventually it will stop.
|
|
*/
|
|
if ((typ == GTID_EVENT || !is_group_event) && rli->abort_slave)
|
|
sql_thread_stopping= true;
|
|
if (sql_thread_stopping)
|
|
{
|
|
delete ev;
|
|
/*
|
|
Return "no error"; normal stop is not an error, and otherwise the error
|
|
has already been recorded.
|
|
*/
|
|
return 0;
|
|
}
|
|
|
|
if (unlikely(rli->gtid_skip_flag != GTID_SKIP_NOT) && is_group_event)
|
|
{
|
|
if (typ == GTID_EVENT)
|
|
rli->gtid_skip_flag= GTID_SKIP_NOT;
|
|
else
|
|
{
|
|
if (rli->gtid_skip_flag == GTID_SKIP_STANDALONE)
|
|
{
|
|
if (!Log_event::is_part_of_group(typ))
|
|
rli->gtid_skip_flag= GTID_SKIP_NOT;
|
|
}
|
|
else
|
|
{
|
|
DBUG_ASSERT(rli->gtid_skip_flag == GTID_SKIP_TRANSACTION);
|
|
if (typ == XID_EVENT ||
|
|
(typ == QUERY_EVENT && // COMMIT/ROLLBACK are never compressed
|
|
(((Query_log_event *)ev)->is_commit() ||
|
|
((Query_log_event *)ev)->is_rollback())))
|
|
rli->gtid_skip_flag= GTID_SKIP_NOT;
|
|
}
|
|
delete_or_keep_event_post_apply(serial_rgi, typ, ev);
|
|
return 0;
|
|
}
|
|
}
|
|
|
|
if (typ == GTID_EVENT)
|
|
{
|
|
rpl_gtid gtid;
|
|
Gtid_log_event *gtid_ev= static_cast<Gtid_log_event *>(ev);
|
|
uint32 domain_id= (rli->mi->using_gtid == Master_info::USE_GTID_NO ||
|
|
rli->mi->parallel_mode <= SLAVE_PARALLEL_MINIMAL ?
|
|
0 : gtid_ev->domain_id);
|
|
if (!(e= find(domain_id)))
|
|
{
|
|
my_error(ER_OUT_OF_RESOURCES, MYF(MY_WME));
|
|
delete ev;
|
|
return 1;
|
|
}
|
|
current= e;
|
|
|
|
gtid.domain_id= gtid_ev->domain_id;
|
|
gtid.server_id= gtid_ev->server_id;
|
|
gtid.seq_no= gtid_ev->seq_no;
|
|
rli->update_relay_log_state(>id, 1);
|
|
if (process_gtid_for_restart_pos(rli, >id))
|
|
{
|
|
/*
|
|
This domain has progressed further into the relay log before the last
|
|
SQL thread restart. So we need to skip this event group to not doubly
|
|
apply it.
|
|
*/
|
|
rli->gtid_skip_flag= ((gtid_ev->flags2 & Gtid_log_event::FL_STANDALONE) ?
|
|
GTID_SKIP_STANDALONE : GTID_SKIP_TRANSACTION);
|
|
delete_or_keep_event_post_apply(serial_rgi, typ, ev);
|
|
return 0;
|
|
}
|
|
}
|
|
else
|
|
e= current;
|
|
|
|
/*
|
|
Find a worker thread to queue the event for.
|
|
Prefer a new thread, so we maximise parallelism (at least for the group
|
|
commit). But do not exceed a limit of --slave-domain-parallel-threads;
|
|
instead re-use a thread that we queued for previously.
|
|
*/
|
|
cur_thread=
|
|
e->choose_thread(serial_rgi, &did_enter_cond, &old_stage,
|
|
typ != GTID_EVENT);
|
|
if (!cur_thread)
|
|
{
|
|
/* This means we were killed. The error is already signalled. */
|
|
delete ev;
|
|
return 1;
|
|
}
|
|
|
|
if (!(qev= cur_thread->get_qev(ev, event_size, rli)))
|
|
{
|
|
abandon_worker_thread(rli->sql_driver_thd, cur_thread,
|
|
&did_enter_cond, &old_stage);
|
|
delete ev;
|
|
return 1;
|
|
}
|
|
|
|
if (typ == GTID_EVENT)
|
|
{
|
|
Gtid_log_event *gtid_ev= static_cast<Gtid_log_event *>(ev);
|
|
bool new_gco;
|
|
enum_slave_parallel_mode mode= rli->mi->parallel_mode;
|
|
uchar gtid_flags= gtid_ev->flags2;
|
|
group_commit_orderer *gco;
|
|
uint8 force_switch_flag;
|
|
enum rpl_group_info::enum_speculation speculation;
|
|
|
|
if (!(rgi= cur_thread->get_rgi(rli, gtid_ev, e, event_size)))
|
|
{
|
|
cur_thread->free_qev(qev);
|
|
abandon_worker_thread(rli->sql_driver_thd, cur_thread,
|
|
&did_enter_cond, &old_stage);
|
|
delete ev;
|
|
return 1;
|
|
}
|
|
|
|
/*
|
|
We queue the event group in a new worker thread, to run in parallel
|
|
with previous groups.
|
|
|
|
To preserve commit order within the replication domain, we set up
|
|
rgi->wait_commit_sub_id to make the new group commit only after the
|
|
previous group has committed.
|
|
|
|
Event groups that group-committed together on the master can be run
|
|
in parallel with each other without restrictions. But one batch of
|
|
group-commits may not start before all groups in the previous batch
|
|
have initiated their commit phase; we set up rgi->gco to ensure that.
|
|
*/
|
|
rgi->wait_commit_sub_id= e->current_sub_id;
|
|
rgi->wait_commit_group_info= e->current_group_info;
|
|
|
|
speculation= rpl_group_info::SPECULATE_NO;
|
|
new_gco= true;
|
|
force_switch_flag= 0;
|
|
gco= e->current_gco;
|
|
if (likely(gco))
|
|
{
|
|
uint8 flags= gco->flags;
|
|
|
|
if (mode <= SLAVE_PARALLEL_MINIMAL ||
|
|
!(gtid_flags & Gtid_log_event::FL_GROUP_COMMIT_ID) ||
|
|
e->last_commit_id != gtid_ev->commit_id)
|
|
flags|= group_commit_orderer::MULTI_BATCH;
|
|
/* Make sure we do not attempt to run DDL in parallel speculatively. */
|
|
if (gtid_flags & Gtid_log_event::FL_DDL)
|
|
flags|= (force_switch_flag= group_commit_orderer::FORCE_SWITCH);
|
|
|
|
if (!(flags & group_commit_orderer::MULTI_BATCH))
|
|
{
|
|
/*
|
|
Still the same batch of event groups that group-committed together
|
|
on the master, so we can run in parallel.
|
|
*/
|
|
new_gco= false;
|
|
}
|
|
else if ((mode >= SLAVE_PARALLEL_OPTIMISTIC) &&
|
|
!(flags & group_commit_orderer::FORCE_SWITCH))
|
|
{
|
|
/*
|
|
In transactional parallel mode, we optimistically attempt to run
|
|
non-DDL in parallel. In case of conflicts, we catch the conflict as
|
|
a deadlock or other error, roll back and retry serially.
|
|
|
|
The assumption is that only a few event groups will be
|
|
non-transactional or otherwise unsuitable for parallel apply. Those
|
|
transactions are still scheduled in parallel, but we set a flag that
|
|
will make the worker thread wait for everything before to complete
|
|
before starting.
|
|
*/
|
|
new_gco= false;
|
|
if (!(gtid_flags & Gtid_log_event::FL_TRANSACTIONAL) ||
|
|
( (!(gtid_flags & Gtid_log_event::FL_ALLOW_PARALLEL) ||
|
|
(gtid_flags & Gtid_log_event::FL_WAITED)) &&
|
|
(mode < SLAVE_PARALLEL_AGGRESSIVE)))
|
|
{
|
|
/*
|
|
This transaction should not be speculatively run in parallel with
|
|
what came before, either because it cannot safely be rolled back in
|
|
case of a conflict, or because it was marked as likely to conflict
|
|
and require expensive rollback and retry.
|
|
|
|
Here we mark it as such, and then the worker thread will do a
|
|
wait_for_prior_commit() before starting it. We do not introduce a
|
|
new group_commit_orderer, since we still want following transactions
|
|
to run in parallel with transactions prior to this one.
|
|
*/
|
|
speculation= rpl_group_info::SPECULATE_WAIT;
|
|
}
|
|
else
|
|
speculation= rpl_group_info::SPECULATE_OPTIMISTIC;
|
|
}
|
|
gco->flags= flags;
|
|
}
|
|
else
|
|
{
|
|
if (gtid_flags & Gtid_log_event::FL_DDL)
|
|
force_switch_flag= group_commit_orderer::FORCE_SWITCH;
|
|
}
|
|
rgi->speculation= speculation;
|
|
|
|
if (gtid_flags & Gtid_log_event::FL_GROUP_COMMIT_ID)
|
|
e->last_commit_id= gtid_ev->commit_id;
|
|
else
|
|
e->last_commit_id= 0;
|
|
|
|
if (new_gco)
|
|
{
|
|
/*
|
|
Do not run this event group in parallel with what came before; instead
|
|
wait for everything prior to at least have started its commit phase, to
|
|
avoid any risk of performing any conflicting action too early.
|
|
|
|
Remember the count that marks the end of the previous batch of event
|
|
groups that run in parallel, and allocate a new gco.
|
|
*/
|
|
uint64 count= e->count_queued_event_groups;
|
|
|
|
if (!(gco= cur_thread->get_gco(count, gco, e->current_sub_id)))
|
|
{
|
|
cur_thread->free_rgi(rgi);
|
|
cur_thread->free_qev(qev);
|
|
abandon_worker_thread(rli->sql_driver_thd, cur_thread,
|
|
&did_enter_cond, &old_stage);
|
|
delete ev;
|
|
return 1;
|
|
}
|
|
gco->flags|= force_switch_flag;
|
|
e->current_gco= gco;
|
|
}
|
|
rgi->gco= gco;
|
|
|
|
qev->rgi= e->current_group_info= rgi;
|
|
e->current_sub_id= rgi->gtid_sub_id;
|
|
++e->count_queued_event_groups;
|
|
}
|
|
else if (!is_group_event)
|
|
{
|
|
int err;
|
|
bool tmp;
|
|
/*
|
|
Events like ROTATE and FORMAT_DESCRIPTION. Do not run in worker thread.
|
|
Same for events not preceeded by GTID (we should not see those normally,
|
|
but they might be from an old master).
|
|
*/
|
|
qev->rgi= serial_rgi;
|
|
|
|
tmp= serial_rgi->is_parallel_exec;
|
|
serial_rgi->is_parallel_exec= true;
|
|
err= rpt_handle_event(qev, NULL);
|
|
serial_rgi->is_parallel_exec= tmp;
|
|
if (ev->is_relay_log_event())
|
|
qev->future_event_master_log_pos= 0;
|
|
else if (typ == ROTATE_EVENT)
|
|
qev->future_event_master_log_pos=
|
|
(static_cast<Rotate_log_event *>(ev))->pos;
|
|
else
|
|
qev->future_event_master_log_pos= ev->log_pos;
|
|
delete_or_keep_event_post_apply(serial_rgi, typ, ev);
|
|
|
|
if (err)
|
|
{
|
|
cur_thread->free_qev(qev);
|
|
abandon_worker_thread(rli->sql_driver_thd, cur_thread,
|
|
&did_enter_cond, &old_stage);
|
|
return 1;
|
|
}
|
|
/*
|
|
Queue a position update, so that the position will be updated in a
|
|
reasonable way relative to other events:
|
|
|
|
- If the currently executing events are queued serially for a single
|
|
thread, the position will only be updated when everything before has
|
|
completed.
|
|
|
|
- If we are executing multiple independent events in parallel, then at
|
|
least the position will not be updated until one of them has reached
|
|
the current point.
|
|
*/
|
|
qev->typ= rpl_parallel_thread::queued_event::QUEUED_POS_UPDATE;
|
|
qev->entry_for_queued= e;
|
|
}
|
|
else
|
|
{
|
|
qev->rgi= e->current_group_info;
|
|
}
|
|
|
|
/*
|
|
Queue the event for processing.
|
|
*/
|
|
qev->ir= rli->last_inuse_relaylog;
|
|
++qev->ir->queued_count;
|
|
cur_thread->enqueue(qev);
|
|
unlock_or_exit_cond(rli->sql_driver_thd, &cur_thread->LOCK_rpl_thread,
|
|
&did_enter_cond, &old_stage);
|
|
mysql_cond_signal(&cur_thread->COND_rpl_thread);
|
|
|
|
return 0;
|
|
}
|