Mirror of https://github.com/MariaDB/server.git
Merge branch 'mdev7818-4' into bb-10.0-knielsen
Commit 6bf88cdd9d
11 changed files with 642 additions and 88 deletions
@@ -6,6 +6,7 @@ user1 statement/sql/flush flush tables with read lock
username event_name nesting_event_type
username event_name nesting_event_type
user1 stage/sql/init STATEMENT
user1 stage/sql/init STATEMENT
user1 stage/sql/query end STATEMENT
user1 stage/sql/closing tables STATEMENT
user1 stage/sql/freeing items STATEMENT

@@ -29,8 +29,98 @@ include/start_slave.inc
SELECT * FROM t1 WHERE a >= 10 ORDER BY a;
a b
10 0
*** MDEV-7818: Deadlock occurring with parallel replication and FTWRL ***
CREATE TABLE t2 (a INT PRIMARY KEY, b INT) ENGINE=InnoDB;
INSERT INTO t2 VALUES (1,0), (2,0), (3,0);
include/stop_slave.inc
SET @old_dbug= @@SESSION.debug_dbug;
SET @commit_id= 4242;
SET SESSION debug_dbug="+d,binlog_force_commit_id";
BEGIN;
UPDATE t2 SET b=b+1 WHERE a=2;
COMMIT;
BEGIN;
INSERT INTO t2 VALUES (4,10);
COMMIT;
SET SESSION debug_dbug= @old_dbug;
INSERT INTO t2 VALUES (5,0);
INSERT INTO t2 VALUES (6,0);
INSERT INTO t2 VALUES (7,0);
INSERT INTO t2 VALUES (8,0);
INSERT INTO t2 VALUES (9,0);
INSERT INTO t2 VALUES (10,0);
INSERT INTO t2 VALUES (11,0);
INSERT INTO t2 VALUES (12,0);
INSERT INTO t2 VALUES (13,0);
INSERT INTO t2 VALUES (14,0);
INSERT INTO t2 VALUES (15,0);
INSERT INTO t2 VALUES (16,0);
INSERT INTO t2 VALUES (17,0);
INSERT INTO t2 VALUES (18,0);
INSERT INTO t2 VALUES (19,0);
BEGIN;
SELECT * FROM t2 WHERE a=2 FOR UPDATE;
a b
2 0
include/start_slave.inc
FLUSH TABLES WITH READ LOCK;
COMMIT;
STOP SLAVE;
SELECT * FROM t2 ORDER BY a;
a b
1 0
2 1
3 0
4 10
5 0
6 0
7 0
8 0
9 0
10 0
11 0
12 0
13 0
14 0
15 0
16 0
17 0
18 0
19 0
UNLOCK TABLES;
include/wait_for_slave_to_stop.inc
include/start_slave.inc
SELECT * FROM t2 ORDER BY a;
a b
1 0
2 1
3 0
4 10
5 0
6 0
7 0
8 0
9 0
10 0
11 0
12 0
13 0
14 0
15 0
16 0
17 0
18 0
19 0
*** MDEV-8318: Assertion `!pool->busy' failed in pool_mark_busy(rpl_parallel_thread_pool*) on concurrent FTWRL ***
LOCK TABLE t2 WRITE;
FLUSH TABLES WITH READ LOCK;
FLUSH TABLES WITH READ LOCK;
KILL QUERY CID;
ERROR 70100: Query execution was interrupted
UNLOCK TABLES;
UNLOCK TABLES;
include/stop_slave.inc
SET GLOBAL slave_parallel_threads=@old_parallel_threads;
include/start_slave.inc
DROP TABLE t1;
DROP TABLE t1, t2;
include/rpl_end.inc

@@ -1,3 +1,5 @@
--source include/have_debug.inc
--source include/have_innodb.inc
--source include/have_binlog_format_statement.inc
--let $rpl_topology=1->2
--source include/rpl_init.inc

@@ -78,13 +80,144 @@ SET GLOBAL sql_slave_skip_counter= 1;
SELECT * FROM t1 WHERE a >= 10 ORDER BY a;

# Clean up
--echo *** MDEV-7818: Deadlock occurring with parallel replication and FTWRL ***

--connection server_1
CREATE TABLE t2 (a INT PRIMARY KEY, b INT) ENGINE=InnoDB;
INSERT INTO t2 VALUES (1,0), (2,0), (3,0);
--save_master_pos

--connection server_2
--sync_with_master
--source include/stop_slave.inc

--connection server_1
# Create a group commit with two transactions, will be used to provoke the
# problematic thread interaction with FTWRL on the slave.
SET @old_dbug= @@SESSION.debug_dbug;
SET @commit_id= 4242;
SET SESSION debug_dbug="+d,binlog_force_commit_id";

BEGIN;
UPDATE t2 SET b=b+1 WHERE a=2;
COMMIT;

BEGIN;
INSERT INTO t2 VALUES (4,10);
COMMIT;

SET SESSION debug_dbug= @old_dbug;

INSERT INTO t2 VALUES (5,0);
INSERT INTO t2 VALUES (6,0);
INSERT INTO t2 VALUES (7,0);
INSERT INTO t2 VALUES (8,0);
INSERT INTO t2 VALUES (9,0);
INSERT INTO t2 VALUES (10,0);
INSERT INTO t2 VALUES (11,0);
INSERT INTO t2 VALUES (12,0);
INSERT INTO t2 VALUES (13,0);
INSERT INTO t2 VALUES (14,0);
INSERT INTO t2 VALUES (15,0);
INSERT INTO t2 VALUES (16,0);
INSERT INTO t2 VALUES (17,0);
INSERT INTO t2 VALUES (18,0);
INSERT INTO t2 VALUES (19,0);
--save_master_pos

--connection server_2

--connect (s1, 127.0.0.1, root,, test, $SLAVE_MYPORT,)
# Block one transaction on a row lock.
BEGIN;
SELECT * FROM t2 WHERE a=2 FOR UPDATE;

--connection server_2

# Wait for slave thread of the other transaction to have the commit lock.
--source include/start_slave.inc
--let $wait_condition= SELECT COUNT(*) > 0 FROM information_schema.processlist WHERE state = "Waiting for prior transaction to commit"
--source include/wait_condition.inc

--connect (s2, 127.0.0.1, root,, test, $SLAVE_MYPORT,)
send FLUSH TABLES WITH READ LOCK;
# The bug was that at this point we were deadlocked.
# The FTWRL command would wait forever for T2 to commit.
# T2 would wait for T1 to commit first, but T1 is waiting for
# the global read lock to be released.

--connection s1
# Release the lock that blocks T1 from replicating.
COMMIT;

--connection s1
send STOP SLAVE;

--connection s2
reap;

--connection server_1
SELECT * FROM t2 ORDER BY a;

--connection s2
UNLOCK TABLES;

--connection s1
reap;

--connection server_2
--source include/wait_for_slave_to_stop.inc
--source include/start_slave.inc
--sync_with_master

SELECT * FROM t2 ORDER BY a;


--echo *** MDEV-8318: Assertion `!pool->busy' failed in pool_mark_busy(rpl_parallel_thread_pool*) on concurrent FTWRL ***

--connection server_1
LOCK TABLE t2 WRITE;

--connect (m1,localhost,root,,test)
--connection m1
--let $cid=`SELECT CONNECTION_ID()`
send FLUSH TABLES WITH READ LOCK;

--connect (m2,localhost,root,,test)
# We cannot force the race with DEBUG_SYNC, because the race does not
# exist after fixing the bug. At best we could force a debug sync to
# time out, which is effectively just a sleep.
# So just put a small sleep here; it is enough to trigger the bug in
# most runs before the bug fix, and the code should work correctly
# however the thread scheduling happens.
--sleep 0.1
send FLUSH TABLES WITH READ LOCK;

--connection server_1
--replace_result $cid CID
eval KILL QUERY $cid;

--connection m1
--error ER_QUERY_INTERRUPTED
reap;

--connection server_1
UNLOCK TABLES;

--connection m2
reap;
UNLOCK TABLES;

# Clean up.
--connection server_2
--source include/stop_slave.inc
SET GLOBAL slave_parallel_threads=@old_parallel_threads;
--source include/start_slave.inc

--connection server_1
DROP TABLE t1;
DROP TABLE t1, t2;

--source include/rpl_end.inc

@@ -9541,6 +9541,9 @@ PSI_stage_info stage_waiting_for_prior_transaction_to_commit= { 0, "Waiting for
PSI_stage_info stage_waiting_for_prior_transaction_to_start_commit= { 0, "Waiting for prior transaction to start commit before starting next transaction", 0};
PSI_stage_info stage_waiting_for_room_in_worker_thread= { 0, "Waiting for room in worker thread event queue", 0};
PSI_stage_info stage_waiting_for_workers_idle= { 0, "Waiting for worker threads to be idle", 0};
PSI_stage_info stage_waiting_for_ftwrl= { 0, "Waiting due to global read lock", 0};
PSI_stage_info stage_waiting_for_ftwrl_threads_to_pause= { 0, "Waiting for worker threads to pause for global read lock", 0};
PSI_stage_info stage_waiting_for_rpl_thread_pool= { 0, "Waiting while replication worker thread pool is busy", 0};
PSI_stage_info stage_master_gtid_wait_primary= { 0, "Waiting in MASTER_GTID_WAIT() (primary waiter)", 0};
PSI_stage_info stage_master_gtid_wait= { 0, "Waiting in MASTER_GTID_WAIT()", 0};
PSI_stage_info stage_gtid_wait_other_connection= { 0, "Waiting for other master connection to process GTID received on multiple master connections", 0};

@@ -455,6 +455,9 @@ extern PSI_stage_info stage_waiting_for_prior_transaction_to_commit;
extern PSI_stage_info stage_waiting_for_prior_transaction_to_start_commit;
extern PSI_stage_info stage_waiting_for_room_in_worker_thread;
extern PSI_stage_info stage_waiting_for_workers_idle;
extern PSI_stage_info stage_waiting_for_ftwrl;
extern PSI_stage_info stage_waiting_for_ftwrl_threads_to_pause;
extern PSI_stage_info stage_waiting_for_rpl_thread_pool;
extern PSI_stage_info stage_master_gtid_wait_primary;
extern PSI_stage_info stage_master_gtid_wait;
extern PSI_stage_info stage_gtid_wait_other_connection;

@@ -44,6 +44,9 @@ rpt_handle_event(rpl_parallel_thread::queued_event *qev,
  rgi->event_relay_log_pos= qev->event_relay_log_pos;
  rgi->future_event_relay_log_pos= qev->future_event_relay_log_pos;
  strcpy(rgi->future_event_master_log_name, qev->future_event_master_log_name);
  if (!(ev->is_artificial_event() || ev->is_relay_log_event() ||
        (ev->when == 0)))
    rgi->last_master_timestamp= ev->when + (time_t)ev->exec_time;
  mysql_mutex_lock(&rli->data_lock);
  /* Mutex will be released in apply_event_and_update_pos(). */
  err= apply_event_and_update_pos(ev, thd, rgi, rpt);

@@ -272,6 +275,284 @@ register_wait_for_prior_event_group_commit(rpl_group_info *rgi,
}


/*
  Do not start parallel execution of this event group until all prior groups
  have reached the commit phase that are not safe to run in parallel with.
*/
static bool
do_gco_wait(rpl_group_info *rgi, group_commit_orderer *gco,
            bool *did_enter_cond, PSI_stage_info *old_stage)
{
  THD *thd= rgi->thd;
  rpl_parallel_entry *entry= rgi->parallel_entry;
  uint64 wait_count;

  mysql_mutex_assert_owner(&entry->LOCK_parallel_entry);

  if (!gco->installed)
  {
    group_commit_orderer *prev_gco= gco->prev_gco;
    if (prev_gco)
    {
      prev_gco->last_sub_id= gco->prior_sub_id;
      prev_gco->next_gco= gco;
    }
    gco->installed= true;
  }
  wait_count= gco->wait_count;
  if (wait_count > entry->count_committing_event_groups)
  {
    DEBUG_SYNC(thd, "rpl_parallel_start_waiting_for_prior");
    thd->ENTER_COND(&gco->COND_group_commit_orderer,
                    &entry->LOCK_parallel_entry,
                    &stage_waiting_for_prior_transaction_to_start_commit,
                    old_stage);
    *did_enter_cond= true;
    do
    {
      if (thd->check_killed() && !rgi->worker_error)
      {
        DEBUG_SYNC(thd, "rpl_parallel_start_waiting_for_prior_killed");
        thd->clear_error();
        thd->get_stmt_da()->reset_diagnostics_area();
        thd->send_kill_message();
        slave_output_error_info(rgi, thd);
        signal_error_to_sql_driver_thread(thd, rgi, 1);
        /*
          Even though we were killed, we need to continue waiting for the
          prior event groups to signal that we can continue. Otherwise we
          mess up the accounting for ordering. However, now that we have
          marked the error, events will just be skipped rather than
          executed, and things will progress quickly towards stop.
        */
      }
      mysql_cond_wait(&gco->COND_group_commit_orderer,
                      &entry->LOCK_parallel_entry);
    } while (wait_count > entry->count_committing_event_groups);
  }

  if (entry->force_abort && wait_count > entry->stop_count)
  {
    /*
      We are stopping (STOP SLAVE), and this event group is beyond the point
      where we can safely stop. So return a flag that will cause us to skip,
      rather than execute, the following events.
    */
    return true;
  }
  else
    return false;
}


static void
do_ftwrl_wait(rpl_group_info *rgi,
              bool *did_enter_cond, PSI_stage_info *old_stage)
{
  THD *thd= rgi->thd;
  rpl_parallel_entry *entry= rgi->parallel_entry;
  uint64 sub_id= rgi->gtid_sub_id;

  mysql_mutex_assert_owner(&entry->LOCK_parallel_entry);

  /*
    If a FLUSH TABLES WITH READ LOCK (FTWRL) is pending, check if this
    transaction is later than transactions that have priority to complete
    before FTWRL. If so, wait here so that FTWRL can proceed and complete
    first.

    (entry->pause_sub_id is ULONGLONG_MAX if no FTWRL is pending, which makes
    this test false as required).
  */
  if (unlikely(sub_id > entry->pause_sub_id))
  {
    thd->ENTER_COND(&entry->COND_parallel_entry, &entry->LOCK_parallel_entry,
                    &stage_waiting_for_ftwrl, old_stage);
    *did_enter_cond= true;
    do
    {
      if (entry->force_abort || rgi->worker_error)
        break;
      if (thd->check_killed())
      {
        thd->send_kill_message();
        slave_output_error_info(rgi, thd);
        signal_error_to_sql_driver_thread(thd, rgi, 1);
        break;
      }
      mysql_cond_wait(&entry->COND_parallel_entry, &entry->LOCK_parallel_entry);
    } while (sub_id > entry->pause_sub_id);

    /*
      We do not call EXIT_COND() here, as this will be done later by our
      caller (since we set *did_enter_cond to true).
    */
  }

  if (sub_id > entry->largest_started_sub_id)
    entry->largest_started_sub_id= sub_id;
}


static int
pool_mark_busy(rpl_parallel_thread_pool *pool, THD *thd)
{
  PSI_stage_info old_stage;
  int res= 0;

  /*
    Wait here while the queue is busy. This is done to make FLUSH TABLES WITH
    READ LOCK work correctly, without incurring extra locking penalties in
    normal operation. FLUSH TABLES WITH READ LOCK needs to lock threads in the
    thread pool, and for this we need to make sure the pool will not go away
    during the operation. The LOCK_rpl_thread_pool is not suitable for
    this. It is taken by release_thread() while holding LOCK_rpl_thread; so it
    must be released before locking any LOCK_rpl_thread lock, or a deadlock
    can occur.

    So we protect the infrequent operations of FLUSH TABLES WITH READ LOCK and
    pool size changes with this condition wait.
  */
  mysql_mutex_lock(&pool->LOCK_rpl_thread_pool);
  if (thd)
    thd->ENTER_COND(&pool->COND_rpl_thread_pool, &pool->LOCK_rpl_thread_pool,
                    &stage_waiting_for_rpl_thread_pool, &old_stage);
  while (pool->busy)
  {
    if (thd && thd->check_killed())
    {
      thd->send_kill_message();
      res= 1;
      break;
    }
    mysql_cond_wait(&pool->COND_rpl_thread_pool, &pool->LOCK_rpl_thread_pool);
  }
  if (!res)
    pool->busy= true;
  if (thd)
    thd->EXIT_COND(&old_stage);
  else
    mysql_mutex_unlock(&pool->LOCK_rpl_thread_pool);

  return res;
}


static void
pool_mark_not_busy(rpl_parallel_thread_pool *pool)
{
  mysql_mutex_lock(&pool->LOCK_rpl_thread_pool);
  DBUG_ASSERT(pool->busy);
  pool->busy= false;
  mysql_cond_broadcast(&pool->COND_rpl_thread_pool);
  mysql_mutex_unlock(&pool->LOCK_rpl_thread_pool);
}


void
rpl_unpause_after_ftwrl(THD *thd)
{
  uint32 i;
  rpl_parallel_thread_pool *pool= &global_rpl_thread_pool;

  DBUG_ASSERT(pool->busy);

  for (i= 0; i < pool->count; ++i)
  {
    rpl_parallel_entry *e;
    rpl_parallel_thread *rpt= pool->threads[i];

    mysql_mutex_lock(&rpt->LOCK_rpl_thread);
    if (!rpt->current_owner)
    {
      mysql_mutex_unlock(&rpt->LOCK_rpl_thread);
      continue;
    }
    e= rpt->current_entry;
    mysql_mutex_lock(&e->LOCK_parallel_entry);
    rpt->pause_for_ftwrl = false;
    mysql_mutex_unlock(&rpt->LOCK_rpl_thread);
    e->pause_sub_id= (uint64)ULONGLONG_MAX;
    mysql_cond_broadcast(&e->COND_parallel_entry);
    mysql_mutex_unlock(&e->LOCK_parallel_entry);
  }

  pool_mark_not_busy(pool);
}


/*
  .

  Note: in case of error return, rpl_unpause_after_ftwrl() must _not_ be called.
*/
int
rpl_pause_for_ftwrl(THD *thd)
{
  uint32 i;
  rpl_parallel_thread_pool *pool= &global_rpl_thread_pool;
  int err;

  /*
    While the count_pending_pause_for_ftwrl counter is non-zero, the pool
    cannot be shutdown/resized, so threads are guaranteed to not disappear.

    This is required to safely be able to access the individual threads below.
    (We cannot lock an individual thread while holding LOCK_rpl_thread_pool,
    as this can deadlock against release_thread()).
  */
  if ((err= pool_mark_busy(pool, thd)))
    return err;

  for (i= 0; i < pool->count; ++i)
  {
    PSI_stage_info old_stage;
    rpl_parallel_entry *e;
    rpl_parallel_thread *rpt= pool->threads[i];

    mysql_mutex_lock(&rpt->LOCK_rpl_thread);
    if (!rpt->current_owner)
    {
      mysql_mutex_unlock(&rpt->LOCK_rpl_thread);
      continue;
    }
    e= rpt->current_entry;
    mysql_mutex_lock(&e->LOCK_parallel_entry);
    /*
      Setting the rpt->pause_for_ftwrl flag makes sure that the thread will not
      de-allocate itself until signalled to do so by rpl_unpause_after_ftwrl().
    */
    rpt->pause_for_ftwrl = true;
    mysql_mutex_unlock(&rpt->LOCK_rpl_thread);
    ++e->need_sub_id_signal;
    if (e->pause_sub_id == (uint64)ULONGLONG_MAX)
      e->pause_sub_id= e->largest_started_sub_id;
    thd->ENTER_COND(&e->COND_parallel_entry, &e->LOCK_parallel_entry,
                    &stage_waiting_for_ftwrl_threads_to_pause, &old_stage);
    while (e->pause_sub_id < (uint64)ULONGLONG_MAX &&
           e->last_committed_sub_id < e->pause_sub_id &&
           !err)
    {
      if (thd->check_killed())
      {
        thd->send_kill_message();
        err= 1;
        break;
      }
      mysql_cond_wait(&e->COND_parallel_entry, &e->LOCK_parallel_entry);
    };
    --e->need_sub_id_signal;
    thd->EXIT_COND(&old_stage);
    if (err)
      break;
  }

  if (err)
    rpl_unpause_after_ftwrl(thd);
  return err;
}


#ifndef DBUG_OFF
static int
dbug_simulate_tmp_error(rpl_group_info *rgi, THD *thd)
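
The worker-side/FTWRL-side handshake added above boils down to a watermark (pause_sub_id) plus two condition-variable waits. The following is a minimal standalone sketch of that pattern, not MariaDB code: it uses standard C++ threading primitives instead of mysql_mutex/mysql_cond, the names Entry, worker_start, worker_commit, pause_workers and unpause_workers are invented for the example, and the kill/error/STOP SLAVE handling and the pool busy flag of the real functions are left out.

// Illustrative sketch only: pause workers past a watermark, wait until every
// transaction at or below the watermark has committed, then unpause.
#include <condition_variable>
#include <cstdint>
#include <iostream>
#include <limits>
#include <mutex>
#include <thread>

struct Entry {
  std::mutex m;
  std::condition_variable cv;
  uint64_t pause_sub_id = std::numeric_limits<uint64_t>::max(); // MAX = no pause pending
  uint64_t largest_started_sub_id = 0;
  uint64_t last_committed_sub_id = 0;
};

// Worker side: roughly what do_ftwrl_wait() does before starting an event group.
void worker_start(Entry &e, uint64_t sub_id) {
  std::unique_lock<std::mutex> lk(e.m);
  e.cv.wait(lk, [&] { return sub_id <= e.pause_sub_id; });
  if (sub_id > e.largest_started_sub_id)
    e.largest_started_sub_id = sub_id;
}

// Worker side: record the commit and wake anyone waiting on the watermark.
void worker_commit(Entry &e, uint64_t sub_id) {
  std::lock_guard<std::mutex> lk(e.m);
  e.last_committed_sub_id = sub_id;
  e.cv.notify_all();
}

// FTWRL side: roughly rpl_pause_for_ftwrl() followed by rpl_unpause_after_ftwrl().
void pause_workers(Entry &e) {
  std::unique_lock<std::mutex> lk(e.m);
  e.pause_sub_id = e.largest_started_sub_id;  // freeze the watermark
  e.cv.wait(lk, [&] { return e.last_committed_sub_id >= e.pause_sub_id; });
}

void unpause_workers(Entry &e) {
  std::lock_guard<std::mutex> lk(e.m);
  e.pause_sub_id = std::numeric_limits<uint64_t>::max();
  e.cv.notify_all();
}

int main() {
  Entry e;
  std::thread w([&] {
    for (uint64_t id = 1; id <= 4; ++id) {
      worker_start(e, id);
      worker_commit(e, id);
    }
  });
  pause_workers(e);    // returns once everything already started has committed
  std::cout << "paused; later transactions are held back\n";
  unpause_workers(e);  // transactions above the watermark may now proceed
  w.join();
}
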
@@ -765,7 +1046,6 @@ handle_rpl_parallel_thread(void *arg)
      {
        bool did_enter_cond= false;
        PSI_stage_info old_stage;
        uint64 wait_count;

        DBUG_EXECUTE_IF("rpl_parallel_scheduled_gtid_0_x_100", {
            if (rgi->current_gtid.domain_id == 0 &&

@@ -803,72 +1083,19 @@ handle_rpl_parallel_thread(void *arg)
        event_gtid_sub_id= rgi->gtid_sub_id;
        rgi->thd= thd;

        mysql_mutex_lock(&entry->LOCK_parallel_entry);
        skip_event_group= do_gco_wait(rgi, gco, &did_enter_cond, &old_stage);

        if (unlikely(entry->stop_on_error_sub_id <= rgi->wait_commit_sub_id))
          skip_event_group= true;
        if (likely(!skip_event_group))
          do_ftwrl_wait(rgi, &did_enter_cond, &old_stage);

        /*
          Register ourself to wait for the previous commit, if we need to do
          such registration _and_ that previous commit has not already
          occurred.

          Also do not start parallel execution of this event group until all
          prior groups have reached the commit phase that are not safe to run
          in parallel with.
        */
        mysql_mutex_lock(&entry->LOCK_parallel_entry);
        if (!gco->installed)
        {
          group_commit_orderer *prev_gco= gco->prev_gco;
          if (prev_gco)
          {
            prev_gco->last_sub_id= gco->prior_sub_id;
            prev_gco->next_gco= gco;
          }
          gco->installed= true;
        }
        wait_count= gco->wait_count;
        if (wait_count > entry->count_committing_event_groups)
        {
          DEBUG_SYNC(thd, "rpl_parallel_start_waiting_for_prior");
          thd->ENTER_COND(&gco->COND_group_commit_orderer,
                          &entry->LOCK_parallel_entry,
                          &stage_waiting_for_prior_transaction_to_start_commit,
                          &old_stage);
          did_enter_cond= true;
          do
          {
            if (thd->check_killed() && !rgi->worker_error)
            {
              DEBUG_SYNC(thd, "rpl_parallel_start_waiting_for_prior_killed");
              thd->clear_error();
              thd->get_stmt_da()->reset_diagnostics_area();
              thd->send_kill_message();
              slave_output_error_info(rgi, thd);
              signal_error_to_sql_driver_thread(thd, rgi, 1);
              /*
                Even though we were killed, we need to continue waiting for the
                prior event groups to signal that we can continue. Otherwise we
                mess up the accounting for ordering. However, now that we have
                marked the error, events will just be skipped rather than
                executed, and things will progress quickly towards stop.
              */
            }
            mysql_cond_wait(&gco->COND_group_commit_orderer,
                            &entry->LOCK_parallel_entry);
          } while (wait_count > entry->count_committing_event_groups);
        }

        if (entry->force_abort && wait_count > entry->stop_count)
        {
          /*
            We are stopping (STOP SLAVE), and this event group is beyond the
            point where we can safely stop. So set a flag that will cause us
            to skip, rather than execute, the following events.
          */
          skip_event_group= true;
        }
        else
          skip_event_group= false;

        if (unlikely(entry->stop_on_error_sub_id <= rgi->wait_commit_sub_id))
          skip_event_group= true;
        register_wait_for_prior_event_group_commit(rgi, entry);

        unlock_or_exit_cond(thd, &entry->LOCK_parallel_entry,

@@ -1020,17 +1247,40 @@ handle_rpl_parallel_thread(void *arg)
    */
    rpt->batch_free();

    if ((events= rpt->event_queue) != NULL)
    for (;;)
    {
      if ((events= rpt->event_queue) != NULL)
      {
        /*
          Take next group of events from the replication pool.
          This is faster than having to wakeup the pool manager thread to give
          us a new event.
        */
        rpt->dequeue1(events);
        mysql_mutex_unlock(&rpt->LOCK_rpl_thread);
        goto more_events;
      }
      if (!rpt->pause_for_ftwrl ||
          (in_event_group && !group_rgi->parallel_entry->force_abort))
        break;
      /*
        Take next group of events from the replication pool.
        This is faster than having to wakeup the pool manager thread to give us
        a new event.
        We are currently in the delicate process of pausing parallel
        replication while FLUSH TABLES WITH READ LOCK is starting. We must
        not de-allocate the thread (setting rpt->current_owner= NULL) until
        rpl_unpause_after_ftwrl() has woken us up.
      */
      rpt->dequeue1(events);
      mysql_mutex_lock(&rpt->current_entry->LOCK_parallel_entry);
      mysql_mutex_unlock(&rpt->LOCK_rpl_thread);
      goto more_events;
      mysql_cond_wait(&rpt->current_entry->COND_parallel_entry,
                      &rpt->current_entry->LOCK_parallel_entry);
      mysql_mutex_unlock(&rpt->current_entry->LOCK_parallel_entry);
      mysql_mutex_lock(&rpt->LOCK_rpl_thread);
      /*
        Now loop to check again for more events available, since we released
        and re-acquired the LOCK_rpl_thread mutex.
      */
    }

    rpt->inuse_relaylog_refcount_update();

    if (in_event_group && group_rgi->parallel_entry->force_abort)

@@ -1107,6 +1357,10 @@ rpl_parallel_change_thread_count(rpl_parallel_thread_pool *pool,
  rpl_parallel_thread **new_list= NULL;
  rpl_parallel_thread *new_free_list= NULL;
  rpl_parallel_thread *rpt_array= NULL;
  int res;

  if ((res= pool_mark_busy(pool, current_thd)))
    return res;

  /*
    Allocate the new list of threads up-front.

@@ -1155,7 +1409,14 @@ rpl_parallel_change_thread_count(rpl_parallel_thread_pool *pool,
  */
  for (i= 0; i < pool->count; ++i)
  {
    rpl_parallel_thread *rpt= pool->get_thread(NULL, NULL);
    rpl_parallel_thread *rpt;

    mysql_mutex_lock(&pool->LOCK_rpl_thread_pool);
    while ((rpt= pool->free_list) == NULL)
      mysql_cond_wait(&pool->COND_rpl_thread_pool, &pool->LOCK_rpl_thread_pool);
    pool->free_list= rpt->next;
    mysql_mutex_unlock(&pool->LOCK_rpl_thread_pool);
    mysql_mutex_lock(&rpt->LOCK_rpl_thread);
    rpt->stop= true;
    mysql_cond_signal(&rpt->COND_rpl_thread);
    mysql_mutex_unlock(&rpt->LOCK_rpl_thread);

@@ -1205,9 +1466,7 @@ rpl_parallel_change_thread_count(rpl_parallel_thread_pool *pool,
    mysql_mutex_unlock(&pool->threads[i]->LOCK_rpl_thread);
  }

  mysql_mutex_lock(&pool->LOCK_rpl_thread_pool);
  mysql_cond_broadcast(&pool->COND_rpl_thread_pool);
  mysql_mutex_unlock(&pool->LOCK_rpl_thread_pool);
  pool_mark_not_busy(pool);

  return 0;

@@ -1231,6 +1490,7 @@ err:
    }
    my_free(new_list);
  }
  pool_mark_not_busy(pool);
  return 1;
}

@@ -1494,7 +1754,7 @@ rpl_parallel_thread::loc_free_gco(group_commit_orderer *gco)


rpl_parallel_thread_pool::rpl_parallel_thread_pool()
  : count(0), threads(0), free_list(0), inited(false)
  : threads(0), free_list(0), count(0), inited(false), busy(false)
{
}

@@ -1502,9 +1762,10 @@ rpl_parallel_thread_pool::rpl_parallel_thread_pool()
int
rpl_parallel_thread_pool::init(uint32 size)
{
  count= 0;
  threads= NULL;
  free_list= NULL;
  count= 0;
  busy= false;

  mysql_mutex_init(key_LOCK_rpl_thread_pool, &LOCK_rpl_thread_pool,
                   MY_MUTEX_INIT_SLOW);

@@ -1545,7 +1806,7 @@ rpl_parallel_thread_pool::get_thread(rpl_parallel_thread **owner,
  rpl_parallel_thread *rpt;

  mysql_mutex_lock(&LOCK_rpl_thread_pool);
  while ((rpt= free_list) == NULL)
  while (unlikely(busy) || !(rpt= free_list))
    mysql_cond_wait(&COND_rpl_thread_pool, &LOCK_rpl_thread_pool);
  free_list= rpt->next;
  mysql_mutex_unlock(&LOCK_rpl_thread_pool);

@@ -1756,6 +2017,7 @@ rpl_parallel::find(uint32 domain_id)
    e->rpl_thread_max= count;
    e->domain_id= domain_id;
    e->stop_on_error_sub_id= (uint64)ULONGLONG_MAX;
    e->pause_sub_id= (uint64)ULONGLONG_MAX;
    if (my_hash_insert(&domain_hash, (uchar *)e))
    {
      my_free(e);

@@ -1957,7 +2219,7 @@ rpl_parallel::wait_for_workers_idle(THD *thd)

    e= (struct rpl_parallel_entry *)my_hash_element(&domain_hash, i);
    mysql_mutex_lock(&e->LOCK_parallel_entry);
    e->need_sub_id_signal= true;
    ++e->need_sub_id_signal;
    thd->ENTER_COND(&e->COND_parallel_entry, &e->LOCK_parallel_entry,
                    &stage_waiting_for_workers_idle, &old_stage);
    while (e->current_sub_id > e->last_committed_sub_id)

@@ -1970,7 +2232,7 @@ rpl_parallel::wait_for_workers_idle(THD *thd)
      }
      mysql_cond_wait(&e->COND_parallel_entry, &e->LOCK_parallel_entry);
    }
    e->need_sub_id_signal= false;
    --e->need_sub_id_signal;
    thd->EXIT_COND(&old_stage);
    if (err)
      return err;

@@ -70,6 +70,7 @@ struct rpl_parallel_thread {
  bool delay_start;
  bool running;
  bool stop;
  bool pause_for_ftwrl;
  mysql_mutex_t LOCK_rpl_thread;
  mysql_cond_t COND_rpl_thread;
  mysql_cond_t COND_rpl_thread_queue;

@@ -199,12 +200,18 @@ struct rpl_parallel_thread {


struct rpl_parallel_thread_pool {
  uint32 count;
  struct rpl_parallel_thread **threads;
  struct rpl_parallel_thread *free_list;
  mysql_mutex_t LOCK_rpl_thread_pool;
  mysql_cond_t COND_rpl_thread_pool;
  uint32 count;
  bool inited;
  /*
    While FTWRL runs, this counter is incremented to make SQL thread or
    STOP/START slave not try to start new activity while that operation
    is in progress.
  */
  bool busy;

  rpl_parallel_thread_pool();
  int init(uint32 size);

@@ -219,6 +226,12 @@ struct rpl_parallel_entry {
  mysql_mutex_t LOCK_parallel_entry;
  mysql_cond_t COND_parallel_entry;
  uint32 domain_id;
  /*
    Incremented by wait_for_workers_idle() and rpl_pause_for_ftwrl() to show
    that they are waiting, so that finish_event_group knows to signal them
    when last_committed_sub_id is increased.
  */
  uint32 need_sub_id_signal;
  uint64 last_commit_id;
  bool active;
  /*

@@ -227,12 +240,6 @@ struct rpl_parallel_entry {
    waiting for event groups to complete.
  */
  bool force_abort;
  /*
    Set in wait_for_workers_idle() to show that it is waiting, so that
    finish_event_group knows to signal it when last_committed_sub_id is
    increased.
  */
  bool need_sub_id_signal;
  /*
    At STOP SLAVE (force_abort=true), we do not want to process all events in
    the queue (which could unnecessarily delay stop, if a lot of events happen

@@ -273,6 +280,15 @@ struct rpl_parallel_entry {
    queued for execution by a worker thread.
  */
  uint64 current_sub_id;
  /*
    The largest sub_id that has started its transaction. Protected by
    LOCK_parallel_entry.

    (Transactions can start out-of-order, so this value signifies that no
    transactions with larger sub_id have started, but not necessarily that all
    transactions with smaller sub_id have started).
  */
  uint64 largest_started_sub_id;
  rpl_group_info *current_group_info;
  /*
    If we get an error in some event group, we set the sub_id of that event

@@ -282,6 +298,12 @@ struct rpl_parallel_entry {
    The value is ULONGLONG_MAX when no error occurred.
  */
  uint64 stop_on_error_sub_id;
  /*
    During FLUSH TABLES WITH READ LOCK, transactions with sub_id larger than
    this value must not start, but wait until the global read lock is released.
    The value is set to ULONGLONG_MAX when no FTWRL is pending.
  */
  uint64 pause_sub_id;
  /* Total count of event groups queued so far. */
  uint64 count_queued_event_groups;
  /*

@@ -322,5 +344,7 @@ extern struct rpl_parallel_thread_pool global_rpl_thread_pool;
extern int rpl_parallel_activate_pool(rpl_parallel_thread_pool *pool);
extern int rpl_parallel_inactivate_pool(rpl_parallel_thread_pool *pool);
extern bool process_gtid_for_restart_pos(Relay_log_info *rli, rpl_gtid *gtid);
extern int rpl_pause_for_ftwrl(THD *thd);
extern void rpl_unpause_after_ftwrl(THD *thd);

#endif /* RPL_PARALLEL_H */

@@ -1001,6 +1001,18 @@ void Relay_log_info::inc_group_relay_log_pos(ulonglong log_pos,
    else if (group_master_log_pos < log_pos)
      group_master_log_pos= log_pos;
    }

    /*
      In the parallel case, we only update the Seconds_Behind_Master at the
      end of a transaction. In the non-parallel case, the value is updated as
      soon as an event is read from the relay log; however this would be too
      confusing for the user, seeing the slave reported as up-to-date when
      potentially thousands of events are still queued up for worker threads
      waiting for execution.
    */
    if (rgi->last_master_timestamp &&
        rgi->last_master_timestamp > last_master_timestamp)
      last_master_timestamp= rgi->last_master_timestamp;
  }
  else
  {

@@ -1630,6 +1642,7 @@ rpl_group_info::reinit(Relay_log_info *rli)
  row_stmt_start_timestamp= 0;
  long_find_row_note_printed= false;
  did_mark_start_commit= false;
  last_master_timestamp = 0;
  gtid_ignore_duplicate_state= GTID_DUPLICATE_NULL;
  commit_orderer.reinit();
}

@@ -668,6 +668,13 @@ struct rpl_group_info
  /* Needs room for "Gtid D-S-N\x00". */
  char gtid_info_buf[5+10+1+10+1+20+1];

  /*
    The timestamp, from the master, of the commit event.
    Used to do delayed update of rli->last_master_timestamp, for getting
    reasonable values out of Seconds_Behind_Master in SHOW SLAVE STATUS.
  */
  time_t last_master_timestamp;

  /*
    Information to be able to re-try an event group in case of a deadlock or
    other temporary error.

@@ -3506,8 +3506,13 @@ static int exec_relay_log_event(THD* thd, Relay_log_info* rli,
    If it is an artificial event, or a relay log event (IO thread generated
    event) or ev->when is set to 0, we don't update the
    last_master_timestamp.

    In parallel replication, we might queue a large number of events, and
    the user might be surprised to see a claim that the slave is up to date
    long before those queued events are actually executed.
  */
  if (!(ev->is_artificial_event() || ev->is_relay_log_event() || (ev->when == 0)))
  if (opt_slave_parallel_threads == 0 &&
      !(ev->is_artificial_event() || ev->is_relay_log_event() || (ev->when == 0)))
  {
    rli->last_master_timestamp= ev->when + (time_t) ev->exec_time;
    DBUG_ASSERT(rli->last_master_timestamp >= 0);
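
A rough illustration of why this hunk, together with the rpl_rli.cc change above, delays the timestamp update in the parallel case. This is not server code: FakeRli and seconds_behind are invented for the example, and seconds_behind is a deliberate simplification of the real Seconds_Behind_Master computation, which also handles master/slave clock difference and the idle state.

// Illustrative sketch only: stamping last_master_timestamp when an event is
// merely queued makes the slave look up to date while workers still lag.
#include <algorithm>
#include <ctime>
#include <iostream>

struct FakeRli { time_t last_master_timestamp = 0; };  // stand-in for Relay_log_info

// Assumed, simplified lag formula for illustration.
long seconds_behind(const FakeRli &rli, time_t now) {
  return rli.last_master_timestamp ? long(now - rli.last_master_timestamp) : 0;
}

int main() {
  time_t now = std::time(nullptr);
  FakeRli stamp_at_read, stamp_at_commit;

  // The driver thread has queued events with master timestamps up to "now",
  // but workers have only committed transactions up to 300 seconds ago.
  time_t newest_queued = now;
  time_t newest_committed = now - 300;

  // Old (non-parallel) behaviour: stamp as soon as the event is read.
  stamp_at_read.last_master_timestamp = newest_queued;

  // Parallel behaviour after this patch: stamp only when a worker finishes a
  // transaction, keeping the maximum value seen so far.
  stamp_at_commit.last_master_timestamp =
      std::max(stamp_at_commit.last_master_timestamp, newest_committed);

  std::cout << "stamp at read time:   " << seconds_behind(stamp_at_read, now)
            << "s behind (misleadingly current)\n";
  std::cout << "stamp at commit time: " << seconds_behind(stamp_at_commit, now)
            << "s behind\n";
}
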
@@ -4283,6 +4283,17 @@ end_with_restore_list:
      break;
    }

    if (lex->type & REFRESH_READ_LOCK)
    {
      /*
        We need to pause any parallel replication slave workers during FLUSH
        TABLES WITH READ LOCK. Otherwise we might cause a deadlock, as
        worker threads can run in arbitrary order but need to commit in a
        specific given order.
      */
      if (rpl_pause_for_ftwrl(thd))
        goto error;
    }
    /*
      reload_acl_and_cache() will tell us if we are allowed to write to the
      binlog or not.

@@ -4313,6 +4324,8 @@ end_with_restore_list:
      if (!res)
        my_ok(thd);
    }
    if (lex->type & REFRESH_READ_LOCK)
      rpl_unpause_after_ftwrl(thd);

    break;
  }
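
Taken together, the two sql_parse.cc hunks bracket the existing FLUSH TABLES WITH READ LOCK handling with a pause/unpause of the parallel-replication workers. Below is a small sketch of that calling contract, not MariaDB code: the *_stub functions are invented stand-ins for the real ones, and the one asymmetry to note is that a failed pause must not be followed by an unpause, per the note on rpl_pause_for_ftwrl() earlier in this commit.

// Illustrative sketch only: pause workers, run the flush, always unpause,
// except when the pause itself failed.
#include <iostream>

int rpl_pause_for_ftwrl_stub()      { std::cout << "pause workers\n";   return 0; }
void rpl_unpause_after_ftwrl_stub() { std::cout << "unpause workers\n"; }
int do_flush_with_read_lock_stub()  { std::cout << "FTWRL body\n";      return 0; }

int flush_statement(bool with_read_lock) {
  if (with_read_lock && rpl_pause_for_ftwrl_stub())
    return 1;                        // error path: workers were never paused, so no unpause
  int res = do_flush_with_read_lock_stub();
  if (with_read_lock)
    rpl_unpause_after_ftwrl_stub();  // always undo the pause, even if the flush failed
  return res;
}

int main() { return flush_statement(true); }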