mirror of
https://github.com/MariaDB/server.git
synced 2025-01-29 02:05:57 +01:00
MDEV-5363: Make parallel replication waits killable
Add another test case. This one for killing a worker while its transaction is waiting to start until the previous transaction has committed. Fix setting reading_or_writing to 0 in worker threads so SHOW SLAVE STATUS can show something more useful than "Reading from net".
This commit is contained in:
parent
245ab473a7
commit
86a2c03b51
3 changed files with 320 additions and 0 deletions
|
@ -295,6 +295,7 @@ a b
|
|||
SET sql_log_bin=0;
|
||||
CALL mtr.add_suppression("Query execution was interrupted");
|
||||
CALL mtr.add_suppression("Commit failed due to failure of an earlier commit on which this one depends");
|
||||
CALL mtr.add_suppression("Slave: Connection was killed");
|
||||
SET sql_log_bin=1;
|
||||
SET debug_sync='now WAIT_FOR t2_query';
|
||||
SET debug_sync='now SIGNAL t2_cont';
|
||||
|
@ -513,6 +514,120 @@ include/start_slave.inc
|
|||
include/stop_slave.inc
|
||||
SET GLOBAL binlog_format=@old_format;
|
||||
SET GLOBAL slave_parallel_threads=0;
|
||||
SET GLOBAL slave_parallel_threads=3;
|
||||
include/start_slave.inc
|
||||
*** 4. Test killing thread that is waiting to start transaction until previous transaction commits ***
|
||||
SET binlog_format=statement;
|
||||
SET gtid_domain_id=2;
|
||||
INSERT INTO t3 VALUES (60, foo(60,
|
||||
'ha_write_row_end SIGNAL d2_query WAIT_FOR d2_cont2',
|
||||
'rpl_parallel_end_of_group SIGNAL d2_done WAIT_FOR d2_cont'));
|
||||
SET gtid_domain_id=0;
|
||||
SET debug_sync='now WAIT_FOR d2_query';
|
||||
SET gtid_domain_id=1;
|
||||
BEGIN;
|
||||
INSERT INTO t3 VALUES (61, foo(61,
|
||||
'rpl_parallel_start_waiting_for_prior SIGNAL t3_waiting',
|
||||
'rpl_parallel_start_waiting_for_prior_killed SIGNAL t3_killed'));
|
||||
INSERT INTO t3 VALUES (62, foo(62,
|
||||
'ha_write_row_end SIGNAL d1_query WAIT_FOR d1_cont2',
|
||||
'rpl_parallel_end_of_group SIGNAL d1_done WAIT_FOR d1_cont'));
|
||||
COMMIT;
|
||||
SET gtid_domain_id=0;
|
||||
SET debug_sync='now WAIT_FOR d1_query';
|
||||
SET gtid_domain_id=0;
|
||||
INSERT INTO t3 VALUES (63, foo(63,
|
||||
'ha_write_row_end SIGNAL d0_query WAIT_FOR d0_cont2',
|
||||
'rpl_parallel_end_of_group SIGNAL d0_done WAIT_FOR d0_cont'));
|
||||
SET debug_sync='now WAIT_FOR d0_query';
|
||||
SET debug_sync='now SIGNAL d2_cont2';
|
||||
SET debug_sync='now WAIT_FOR d2_done';
|
||||
SET debug_sync='now SIGNAL d1_cont2';
|
||||
SET debug_sync='now WAIT_FOR d1_done';
|
||||
SET debug_sync='now SIGNAL d0_cont2';
|
||||
SET debug_sync='now WAIT_FOR d0_done';
|
||||
SET binlog_format=statement;
|
||||
INSERT INTO t3 VALUES (64, foo(64,
|
||||
'commit_before_prepare_ordered SIGNAL t1_waiting WAIT_FOR t1_cont', ''));
|
||||
SET debug_sync='commit_after_release_LOCK_prepare_ordered SIGNAL master_queued2 WAIT_FOR master_cont2';
|
||||
INSERT INTO t3 VALUES (65, foo(65, '', ''));
|
||||
SET debug_sync='now WAIT_FOR master_queued2';
|
||||
SET debug_sync='commit_after_release_LOCK_prepare_ordered SIGNAL master_queued3';
|
||||
INSERT INTO t3 VALUES (66, foo(66, '', ''));
|
||||
SET debug_sync='now WAIT_FOR master_queued3';
|
||||
SET debug_sync='commit_after_release_LOCK_prepare_ordered SIGNAL master_queued4';
|
||||
INSERT INTO t3 VALUES (67, foo(67, '', ''));
|
||||
SET debug_sync='now WAIT_FOR master_queued4';
|
||||
SET debug_sync='now SIGNAL master_cont2';
|
||||
SELECT * FROM t3 WHERE a >= 60 ORDER BY a;
|
||||
a b
|
||||
60 60
|
||||
61 61
|
||||
62 62
|
||||
63 63
|
||||
64 64
|
||||
65 65
|
||||
66 66
|
||||
67 67
|
||||
SET debug_sync='now SIGNAL d0_cont';
|
||||
SET debug_sync='now WAIT_FOR t1_waiting';
|
||||
SET debug_sync='now SIGNAL d1_cont';
|
||||
SET debug_sync='now WAIT_FOR t3_waiting';
|
||||
SET debug_sync='now SIGNAL d2_cont';
|
||||
KILL THD_ID;
|
||||
SET debug_sync='now WAIT_FOR t3_killed';
|
||||
SET debug_sync='now SIGNAL t1_cont';
|
||||
include/wait_for_slave_sql_error.inc [errno=1317,1927,1963]
|
||||
STOP SLAVE IO_THREAD;
|
||||
SELECT * FROM t3 WHERE a >= 60 ORDER BY a;
|
||||
a b
|
||||
60 60
|
||||
61 61
|
||||
62 62
|
||||
63 63
|
||||
64 64
|
||||
SET GLOBAL slave_parallel_threads=0;
|
||||
SET GLOBAL slave_parallel_threads=10;
|
||||
SET sql_log_bin=0;
|
||||
DROP FUNCTION foo;
|
||||
CREATE FUNCTION foo(x INT, d1 VARCHAR(500), d2 VARCHAR(500))
|
||||
RETURNS INT DETERMINISTIC
|
||||
BEGIN
|
||||
RETURN x;
|
||||
END
|
||||
||
|
||||
SET sql_log_bin=1;
|
||||
INSERT INTO t3 VALUES (69,0);
|
||||
include/start_slave.inc
|
||||
SELECT * FROM t3 WHERE a >= 60 ORDER BY a;
|
||||
a b
|
||||
60 60
|
||||
61 61
|
||||
62 62
|
||||
63 63
|
||||
64 64
|
||||
65 65
|
||||
66 66
|
||||
67 67
|
||||
69 0
|
||||
SET sql_log_bin=0;
|
||||
DROP FUNCTION foo;
|
||||
CREATE FUNCTION foo(x INT, d1 VARCHAR(500), d2 VARCHAR(500))
|
||||
RETURNS INT DETERMINISTIC
|
||||
BEGIN
|
||||
IF d1 != '' THEN
|
||||
SET debug_sync = d1;
|
||||
END IF;
|
||||
IF d2 != '' THEN
|
||||
SET debug_sync = d2;
|
||||
END IF;
|
||||
RETURN x;
|
||||
END
|
||||
||
|
||||
SET sql_log_bin=1;
|
||||
include/stop_slave.inc
|
||||
SET GLOBAL binlog_format=@old_format;
|
||||
SET GLOBAL slave_parallel_threads=0;
|
||||
SET GLOBAL slave_parallel_threads=10;
|
||||
include/start_slave.inc
|
||||
include/stop_slave.inc
|
||||
|
|
|
@ -402,6 +402,7 @@ SELECT * FROM t3 WHERE a >= 30 ORDER BY a;
|
|||
SET sql_log_bin=0;
|
||||
CALL mtr.add_suppression("Query execution was interrupted");
|
||||
CALL mtr.add_suppression("Commit failed due to failure of an earlier commit on which this one depends");
|
||||
CALL mtr.add_suppression("Slave: Connection was killed");
|
||||
SET sql_log_bin=1;
|
||||
# Wait until T2 is inside executing its insert of 32, then find it in SHOW
|
||||
# PROCESSLIST to know its thread id for KILL later.
|
||||
|
@ -745,6 +746,201 @@ SET sql_log_bin=1;
|
|||
--source include/stop_slave.inc
|
||||
CHANGE MASTER TO master_use_gtid=slave_pos;
|
||||
--source include/start_slave.inc
|
||||
|
||||
--connection server_2
|
||||
# Respawn all worker threads to clear any left-over debug_sync or other stuff.
|
||||
--source include/stop_slave.inc
|
||||
SET GLOBAL binlog_format=@old_format;
|
||||
SET GLOBAL slave_parallel_threads=0;
|
||||
SET GLOBAL slave_parallel_threads=3;
|
||||
--source include/start_slave.inc
|
||||
|
||||
|
||||
--echo *** 4. Test killing thread that is waiting to start transaction until previous transaction commits ***
|
||||
|
||||
# We set up four transactions T1, T2, T3, and T4 on the master. T2, T3, and T4
|
||||
# can run in parallel with each other (same group commit and commit id),
|
||||
# but not in parallel with T1.
|
||||
#
|
||||
# We use three worker threads. T1 and T2 will be queued on the first, T3 on
|
||||
# the second, and T4 on the third. We will delay T1 commit, T3 will wait for
|
||||
# T1 to commit before it can start. We will kill T3 during this wait, and
|
||||
# check that everything works correctly.
|
||||
#
|
||||
# It is rather tricky to get the correct thread id of the worker to kill.
|
||||
# We start by injecting three dummy transactions in a debug_sync-controlled
|
||||
# manner to be able to get known thread ids for the workers in a pool with
|
||||
# just 3 worker threads. Then we let in each of the real test transactions
|
||||
# T1-T4 one at a time in a way which allows us to know which transaction
|
||||
# ends up with which thread id.
|
||||
|
||||
--connection server_1
|
||||
SET binlog_format=statement;
|
||||
SET gtid_domain_id=2;
|
||||
INSERT INTO t3 VALUES (60, foo(60,
|
||||
'ha_write_row_end SIGNAL d2_query WAIT_FOR d2_cont2',
|
||||
'rpl_parallel_end_of_group SIGNAL d2_done WAIT_FOR d2_cont'));
|
||||
SET gtid_domain_id=0;
|
||||
|
||||
--connection server_2
|
||||
SET debug_sync='now WAIT_FOR d2_query';
|
||||
--let $d2_thd_id= `SELECT ID FROM INFORMATION_SCHEMA.PROCESSLIST WHERE INFO LIKE '%foo(60%' AND INFO NOT LIKE '%LIKE%'`
|
||||
|
||||
--connection server_1
|
||||
SET gtid_domain_id=1;
|
||||
BEGIN;
|
||||
# These debug_sync's will linger on and be used to control T3 later.
|
||||
INSERT INTO t3 VALUES (61, foo(61,
|
||||
'rpl_parallel_start_waiting_for_prior SIGNAL t3_waiting',
|
||||
'rpl_parallel_start_waiting_for_prior_killed SIGNAL t3_killed'));
|
||||
INSERT INTO t3 VALUES (62, foo(62,
|
||||
'ha_write_row_end SIGNAL d1_query WAIT_FOR d1_cont2',
|
||||
'rpl_parallel_end_of_group SIGNAL d1_done WAIT_FOR d1_cont'));
|
||||
COMMIT;
|
||||
SET gtid_domain_id=0;
|
||||
|
||||
--connection server_2
|
||||
SET debug_sync='now WAIT_FOR d1_query';
|
||||
--let $d1_thd_id= `SELECT ID FROM INFORMATION_SCHEMA.PROCESSLIST WHERE INFO LIKE '%foo(62%' AND INFO NOT LIKE '%LIKE%'`
|
||||
|
||||
--connection server_1
|
||||
SET gtid_domain_id=0;
|
||||
INSERT INTO t3 VALUES (63, foo(63,
|
||||
'ha_write_row_end SIGNAL d0_query WAIT_FOR d0_cont2',
|
||||
'rpl_parallel_end_of_group SIGNAL d0_done WAIT_FOR d0_cont'));
|
||||
|
||||
--connection server_2
|
||||
SET debug_sync='now WAIT_FOR d0_query';
|
||||
--let $d0_thd_id= `SELECT ID FROM INFORMATION_SCHEMA.PROCESSLIST WHERE INFO LIKE '%foo(63%' AND INFO NOT LIKE '%LIKE%'`
|
||||
|
||||
SET debug_sync='now SIGNAL d2_cont2';
|
||||
SET debug_sync='now WAIT_FOR d2_done';
|
||||
SET debug_sync='now SIGNAL d1_cont2';
|
||||
SET debug_sync='now WAIT_FOR d1_done';
|
||||
SET debug_sync='now SIGNAL d0_cont2';
|
||||
SET debug_sync='now WAIT_FOR d0_done';
|
||||
|
||||
# Now prepare the real transactions T1, T2, T3, T4 on the master.
|
||||
|
||||
--connection con_temp3
|
||||
# Create transaction T1.
|
||||
SET binlog_format=statement;
|
||||
INSERT INTO t3 VALUES (64, foo(64,
|
||||
'commit_before_prepare_ordered SIGNAL t1_waiting WAIT_FOR t1_cont', ''));
|
||||
|
||||
# Create transaction T2, as a group commit leader on the master.
|
||||
SET debug_sync='commit_after_release_LOCK_prepare_ordered SIGNAL master_queued2 WAIT_FOR master_cont2';
|
||||
send INSERT INTO t3 VALUES (65, foo(65, '', ''));
|
||||
|
||||
--connection server_1
|
||||
SET debug_sync='now WAIT_FOR master_queued2';
|
||||
|
||||
--connection con_temp4
|
||||
# Create transaction T3, participating in T2's group commit.
|
||||
SET debug_sync='commit_after_release_LOCK_prepare_ordered SIGNAL master_queued3';
|
||||
send INSERT INTO t3 VALUES (66, foo(66, '', ''));
|
||||
|
||||
--connection server_1
|
||||
SET debug_sync='now WAIT_FOR master_queued3';
|
||||
|
||||
--connection con_temp5
|
||||
# Create transaction T4, participating in group commit with T2 and T3.
|
||||
SET debug_sync='commit_after_release_LOCK_prepare_ordered SIGNAL master_queued4';
|
||||
send INSERT INTO t3 VALUES (67, foo(67, '', ''));
|
||||
|
||||
--connection server_1
|
||||
SET debug_sync='now WAIT_FOR master_queued4';
|
||||
SET debug_sync='now SIGNAL master_cont2';
|
||||
|
||||
--connection con_temp3
|
||||
REAP;
|
||||
--connection con_temp4
|
||||
REAP;
|
||||
--connection con_temp5
|
||||
REAP;
|
||||
|
||||
--connection server_1
|
||||
SELECT * FROM t3 WHERE a >= 60 ORDER BY a;
|
||||
|
||||
--connection server_2
|
||||
# Now we have the four transactions pending for replication on the slave.
|
||||
# Let them be queued for our three worker threads in a controlled fashion.
|
||||
# We put them at a stage where T1 is delayed and T3 is waiting for T1 to
|
||||
# commit before T3 can start. Then we kill T3.
|
||||
|
||||
# Make the worker D0 free, and wait for T1 to be queued in it.
|
||||
SET debug_sync='now SIGNAL d0_cont';
|
||||
SET debug_sync='now WAIT_FOR t1_waiting';
|
||||
|
||||
# T2 will be queued on the same worker D0 as T1.
|
||||
# Now release worker D1, and wait for T3 to be queued in it.
|
||||
# T3 will wait for T1 to commit before it can start.
|
||||
SET debug_sync='now SIGNAL d1_cont';
|
||||
SET debug_sync='now WAIT_FOR t3_waiting';
|
||||
|
||||
# Release worker D2. T4 may or may not have time to be queued on it, but
|
||||
# it will not be able to complete due to T3 being killed.
|
||||
SET debug_sync='now SIGNAL d2_cont';
|
||||
|
||||
# Now we kill the waiting transaction T3 in worker D1.
|
||||
--replace_result $d1_thd_id THD_ID
|
||||
eval KILL $d1_thd_id;
|
||||
|
||||
# Wait until T3 has reacted on the kill.
|
||||
SET debug_sync='now WAIT_FOR t3_killed';
|
||||
|
||||
# Now we can allow T1 to proceed.
|
||||
SET debug_sync='now SIGNAL t1_cont';
|
||||
|
||||
--let $slave_sql_errno= 1317,1927,1963
|
||||
--source include/wait_for_slave_sql_error.inc
|
||||
STOP SLAVE IO_THREAD;
|
||||
SELECT * FROM t3 WHERE a >= 60 ORDER BY a;
|
||||
|
||||
# Now we have to disable the debug_sync statements, so they do not trigger
|
||||
# when the events are retried.
|
||||
SET GLOBAL slave_parallel_threads=0;
|
||||
SET GLOBAL slave_parallel_threads=10;
|
||||
SET sql_log_bin=0;
|
||||
DROP FUNCTION foo;
|
||||
--delimiter ||
|
||||
CREATE FUNCTION foo(x INT, d1 VARCHAR(500), d2 VARCHAR(500))
|
||||
RETURNS INT DETERMINISTIC
|
||||
BEGIN
|
||||
RETURN x;
|
||||
END
|
||||
||
|
||||
--delimiter ;
|
||||
SET sql_log_bin=1;
|
||||
|
||||
--connection server_1
|
||||
INSERT INTO t3 VALUES (69,0);
|
||||
--save_master_pos
|
||||
|
||||
--connection server_2
|
||||
--source include/start_slave.inc
|
||||
--sync_with_master
|
||||
SELECT * FROM t3 WHERE a >= 60 ORDER BY a;
|
||||
# Restore the foo() function.
|
||||
SET sql_log_bin=0;
|
||||
DROP FUNCTION foo;
|
||||
--delimiter ||
|
||||
CREATE FUNCTION foo(x INT, d1 VARCHAR(500), d2 VARCHAR(500))
|
||||
RETURNS INT DETERMINISTIC
|
||||
BEGIN
|
||||
IF d1 != '' THEN
|
||||
SET debug_sync = d1;
|
||||
END IF;
|
||||
IF d2 != '' THEN
|
||||
SET debug_sync = d2;
|
||||
END IF;
|
||||
RETURN x;
|
||||
END
|
||||
||
|
||||
--delimiter ;
|
||||
SET sql_log_bin=1;
|
||||
|
||||
|
||||
--connection server_2
|
||||
--source include/stop_slave.inc
|
||||
SET GLOBAL binlog_format=@old_format;
|
||||
|
|
|
@ -2,6 +2,7 @@
|
|||
#include "rpl_parallel.h"
|
||||
#include "slave.h"
|
||||
#include "rpl_mi.h"
|
||||
#include "debug_sync.h"
|
||||
|
||||
|
||||
/*
|
||||
|
@ -219,6 +220,7 @@ handle_rpl_parallel_thread(void *arg)
|
|||
thd->variables.log_slow_filter= global_system_variables.log_slow_filter;
|
||||
set_slave_thread_options(thd);
|
||||
thd->client_capabilities = CLIENT_LOCAL_FILES;
|
||||
thd->net.reading_or_writing= 0;
|
||||
thd_proc_info(thd, "Waiting for work from main SQL threads");
|
||||
thd->set_time();
|
||||
thd->variables.lock_wait_timeout= LONG_TIMEOUT;
|
||||
|
@ -308,6 +310,7 @@ handle_rpl_parallel_thread(void *arg)
|
|||
"Waiting for prior transaction to commit "
|
||||
"before starting next transaction");
|
||||
did_enter_cond= true;
|
||||
DEBUG_SYNC(thd, "rpl_parallel_start_waiting_for_prior");
|
||||
while (wait_start_sub_id > entry->last_committed_sub_id &&
|
||||
!thd->check_killed())
|
||||
mysql_cond_wait(&entry->COND_parallel_entry,
|
||||
|
@ -315,6 +318,7 @@ handle_rpl_parallel_thread(void *arg)
|
|||
if (wait_start_sub_id > entry->last_committed_sub_id)
|
||||
{
|
||||
/* The thread got a kill signal. */
|
||||
DEBUG_SYNC(thd, "rpl_parallel_start_waiting_for_prior_killed");
|
||||
thd->send_kill_message();
|
||||
slave_output_error_info(rgi->rli, thd);
|
||||
signal_error_to_sql_driver_thread(thd, rgi);
|
||||
|
@ -383,6 +387,7 @@ handle_rpl_parallel_thread(void *arg)
|
|||
&rgi->commit_orderer);
|
||||
delete rgi;
|
||||
group_rgi= rgi= NULL;
|
||||
DEBUG_SYNC(thd, "rpl_parallel_end_of_group");
|
||||
}
|
||||
|
||||
events= next;
|
||||
|
@ -843,6 +848,10 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
|
|||
However, the commit of this event must wait for the commit of the prior
|
||||
event, to preserve binlog commit order and visibility across all
|
||||
servers in the replication hierarchy.
|
||||
|
||||
In addition, we must not start executing this event until we have
|
||||
finished the previous collection of event groups that group-committed
|
||||
together; we use rgi->wait_start_sub_id to control this.
|
||||
*/
|
||||
rpl_parallel_thread *rpt= global_rpl_thread_pool.get_thread(e);
|
||||
rgi->wait_commit_sub_id= e->current_sub_id;
|
||||
|
|
Loading…
Add table
Reference in a new issue