From 86a2c03b51b2995a0f35ae448e3bf953b90060ad Mon Sep 17 00:00:00 2001 From: unknown Date: Wed, 18 Dec 2013 16:26:22 +0100 Subject: [PATCH] MDEV-5363: Make parallel replication waits killable Add another test case. This one for killing a worker while its transaction is waiting to start until the previous transaction has committed. Fix setting reading_or_writing to 0 in worker threads so SHOW SLAVE STATUS can show something more useful than "Reading from net". --- mysql-test/suite/rpl/r/rpl_parallel.result | 115 ++++++++++++ mysql-test/suite/rpl/t/rpl_parallel.test | 196 +++++++++++++++++++++ sql/rpl_parallel.cc | 9 + 3 files changed, 320 insertions(+) diff --git a/mysql-test/suite/rpl/r/rpl_parallel.result b/mysql-test/suite/rpl/r/rpl_parallel.result index 9dc03789714..ef88ecfef44 100644 --- a/mysql-test/suite/rpl/r/rpl_parallel.result +++ b/mysql-test/suite/rpl/r/rpl_parallel.result @@ -295,6 +295,7 @@ a b SET sql_log_bin=0; CALL mtr.add_suppression("Query execution was interrupted"); CALL mtr.add_suppression("Commit failed due to failure of an earlier commit on which this one depends"); +CALL mtr.add_suppression("Slave: Connection was killed"); SET sql_log_bin=1; SET debug_sync='now WAIT_FOR t2_query'; SET debug_sync='now SIGNAL t2_cont'; @@ -513,6 +514,120 @@ include/start_slave.inc include/stop_slave.inc SET GLOBAL binlog_format=@old_format; SET GLOBAL slave_parallel_threads=0; +SET GLOBAL slave_parallel_threads=3; +include/start_slave.inc +*** 4. Test killing thread that is waiting to start transaction until previous transaction commits *** +SET binlog_format=statement; +SET gtid_domain_id=2; +INSERT INTO t3 VALUES (60, foo(60, +'ha_write_row_end SIGNAL d2_query WAIT_FOR d2_cont2', +'rpl_parallel_end_of_group SIGNAL d2_done WAIT_FOR d2_cont')); +SET gtid_domain_id=0; +SET debug_sync='now WAIT_FOR d2_query'; +SET gtid_domain_id=1; +BEGIN; +INSERT INTO t3 VALUES (61, foo(61, +'rpl_parallel_start_waiting_for_prior SIGNAL t3_waiting', +'rpl_parallel_start_waiting_for_prior_killed SIGNAL t3_killed')); +INSERT INTO t3 VALUES (62, foo(62, +'ha_write_row_end SIGNAL d1_query WAIT_FOR d1_cont2', +'rpl_parallel_end_of_group SIGNAL d1_done WAIT_FOR d1_cont')); +COMMIT; +SET gtid_domain_id=0; +SET debug_sync='now WAIT_FOR d1_query'; +SET gtid_domain_id=0; +INSERT INTO t3 VALUES (63, foo(63, +'ha_write_row_end SIGNAL d0_query WAIT_FOR d0_cont2', +'rpl_parallel_end_of_group SIGNAL d0_done WAIT_FOR d0_cont')); +SET debug_sync='now WAIT_FOR d0_query'; +SET debug_sync='now SIGNAL d2_cont2'; +SET debug_sync='now WAIT_FOR d2_done'; +SET debug_sync='now SIGNAL d1_cont2'; +SET debug_sync='now WAIT_FOR d1_done'; +SET debug_sync='now SIGNAL d0_cont2'; +SET debug_sync='now WAIT_FOR d0_done'; +SET binlog_format=statement; +INSERT INTO t3 VALUES (64, foo(64, +'commit_before_prepare_ordered SIGNAL t1_waiting WAIT_FOR t1_cont', '')); +SET debug_sync='commit_after_release_LOCK_prepare_ordered SIGNAL master_queued2 WAIT_FOR master_cont2'; +INSERT INTO t3 VALUES (65, foo(65, '', '')); +SET debug_sync='now WAIT_FOR master_queued2'; +SET debug_sync='commit_after_release_LOCK_prepare_ordered SIGNAL master_queued3'; +INSERT INTO t3 VALUES (66, foo(66, '', '')); +SET debug_sync='now WAIT_FOR master_queued3'; +SET debug_sync='commit_after_release_LOCK_prepare_ordered SIGNAL master_queued4'; +INSERT INTO t3 VALUES (67, foo(67, '', '')); +SET debug_sync='now WAIT_FOR master_queued4'; +SET debug_sync='now SIGNAL master_cont2'; +SELECT * FROM t3 WHERE a >= 60 ORDER BY a; +a b +60 60 +61 61 +62 62 +63 63 +64 64 +65 65 +66 66 +67 67 +SET debug_sync='now SIGNAL d0_cont'; +SET debug_sync='now WAIT_FOR t1_waiting'; +SET debug_sync='now SIGNAL d1_cont'; +SET debug_sync='now WAIT_FOR t3_waiting'; +SET debug_sync='now SIGNAL d2_cont'; +KILL THD_ID; +SET debug_sync='now WAIT_FOR t3_killed'; +SET debug_sync='now SIGNAL t1_cont'; +include/wait_for_slave_sql_error.inc [errno=1317,1927,1963] +STOP SLAVE IO_THREAD; +SELECT * FROM t3 WHERE a >= 60 ORDER BY a; +a b +60 60 +61 61 +62 62 +63 63 +64 64 +SET GLOBAL slave_parallel_threads=0; +SET GLOBAL slave_parallel_threads=10; +SET sql_log_bin=0; +DROP FUNCTION foo; +CREATE FUNCTION foo(x INT, d1 VARCHAR(500), d2 VARCHAR(500)) +RETURNS INT DETERMINISTIC +BEGIN +RETURN x; +END +|| +SET sql_log_bin=1; +INSERT INTO t3 VALUES (69,0); +include/start_slave.inc +SELECT * FROM t3 WHERE a >= 60 ORDER BY a; +a b +60 60 +61 61 +62 62 +63 63 +64 64 +65 65 +66 66 +67 67 +69 0 +SET sql_log_bin=0; +DROP FUNCTION foo; +CREATE FUNCTION foo(x INT, d1 VARCHAR(500), d2 VARCHAR(500)) +RETURNS INT DETERMINISTIC +BEGIN +IF d1 != '' THEN +SET debug_sync = d1; +END IF; +IF d2 != '' THEN +SET debug_sync = d2; +END IF; +RETURN x; +END +|| +SET sql_log_bin=1; +include/stop_slave.inc +SET GLOBAL binlog_format=@old_format; +SET GLOBAL slave_parallel_threads=0; SET GLOBAL slave_parallel_threads=10; include/start_slave.inc include/stop_slave.inc diff --git a/mysql-test/suite/rpl/t/rpl_parallel.test b/mysql-test/suite/rpl/t/rpl_parallel.test index a1eb47f34f8..94d3b53974a 100644 --- a/mysql-test/suite/rpl/t/rpl_parallel.test +++ b/mysql-test/suite/rpl/t/rpl_parallel.test @@ -402,6 +402,7 @@ SELECT * FROM t3 WHERE a >= 30 ORDER BY a; SET sql_log_bin=0; CALL mtr.add_suppression("Query execution was interrupted"); CALL mtr.add_suppression("Commit failed due to failure of an earlier commit on which this one depends"); +CALL mtr.add_suppression("Slave: Connection was killed"); SET sql_log_bin=1; # Wait until T2 is inside executing its insert of 32, then find it in SHOW # PROCESSLIST to know its thread id for KILL later. @@ -745,6 +746,201 @@ SET sql_log_bin=1; --source include/stop_slave.inc CHANGE MASTER TO master_use_gtid=slave_pos; --source include/start_slave.inc + +--connection server_2 +# Respawn all worker threads to clear any left-over debug_sync or other stuff. +--source include/stop_slave.inc +SET GLOBAL binlog_format=@old_format; +SET GLOBAL slave_parallel_threads=0; +SET GLOBAL slave_parallel_threads=3; +--source include/start_slave.inc + + +--echo *** 4. Test killing thread that is waiting to start transaction until previous transaction commits *** + +# We set up four transactions T1, T2, T3, and T4 on the master. T2, T3, and T4 +# can run in parallel with each other (same group commit and commit id), +# but not in parallel with T1. +# +# We use three worker threads. T1 and T2 will be queued on the first, T3 on +# the second, and T4 on the third. We will delay T1 commit, T3 will wait for +# T1 to commit before it can start. We will kill T3 during this wait, and +# check that everything works correctly. +# +# It is rather tricky to get the correct thread id of the worker to kill. +# We start by injecting three dummy transactions in a debug_sync-controlled +# manner to be able to get known thread ids for the workers in a pool with +# just 3 worker threads. Then we let in each of the real test transactions +# T1-T4 one at a time in a way which allows us to know which transaction +# ends up with which thread id. + +--connection server_1 +SET binlog_format=statement; +SET gtid_domain_id=2; +INSERT INTO t3 VALUES (60, foo(60, + 'ha_write_row_end SIGNAL d2_query WAIT_FOR d2_cont2', + 'rpl_parallel_end_of_group SIGNAL d2_done WAIT_FOR d2_cont')); +SET gtid_domain_id=0; + +--connection server_2 +SET debug_sync='now WAIT_FOR d2_query'; +--let $d2_thd_id= `SELECT ID FROM INFORMATION_SCHEMA.PROCESSLIST WHERE INFO LIKE '%foo(60%' AND INFO NOT LIKE '%LIKE%'` + +--connection server_1 +SET gtid_domain_id=1; +BEGIN; +# These debug_sync's will linger on and be used to control T3 later. +INSERT INTO t3 VALUES (61, foo(61, + 'rpl_parallel_start_waiting_for_prior SIGNAL t3_waiting', + 'rpl_parallel_start_waiting_for_prior_killed SIGNAL t3_killed')); +INSERT INTO t3 VALUES (62, foo(62, + 'ha_write_row_end SIGNAL d1_query WAIT_FOR d1_cont2', + 'rpl_parallel_end_of_group SIGNAL d1_done WAIT_FOR d1_cont')); +COMMIT; +SET gtid_domain_id=0; + +--connection server_2 +SET debug_sync='now WAIT_FOR d1_query'; +--let $d1_thd_id= `SELECT ID FROM INFORMATION_SCHEMA.PROCESSLIST WHERE INFO LIKE '%foo(62%' AND INFO NOT LIKE '%LIKE%'` + +--connection server_1 +SET gtid_domain_id=0; +INSERT INTO t3 VALUES (63, foo(63, + 'ha_write_row_end SIGNAL d0_query WAIT_FOR d0_cont2', + 'rpl_parallel_end_of_group SIGNAL d0_done WAIT_FOR d0_cont')); + +--connection server_2 +SET debug_sync='now WAIT_FOR d0_query'; +--let $d0_thd_id= `SELECT ID FROM INFORMATION_SCHEMA.PROCESSLIST WHERE INFO LIKE '%foo(63%' AND INFO NOT LIKE '%LIKE%'` + +SET debug_sync='now SIGNAL d2_cont2'; +SET debug_sync='now WAIT_FOR d2_done'; +SET debug_sync='now SIGNAL d1_cont2'; +SET debug_sync='now WAIT_FOR d1_done'; +SET debug_sync='now SIGNAL d0_cont2'; +SET debug_sync='now WAIT_FOR d0_done'; + +# Now prepare the real transactions T1, T2, T3, T4 on the master. + +--connection con_temp3 +# Create transaction T1. +SET binlog_format=statement; +INSERT INTO t3 VALUES (64, foo(64, + 'commit_before_prepare_ordered SIGNAL t1_waiting WAIT_FOR t1_cont', '')); + +# Create transaction T2, as a group commit leader on the master. +SET debug_sync='commit_after_release_LOCK_prepare_ordered SIGNAL master_queued2 WAIT_FOR master_cont2'; +send INSERT INTO t3 VALUES (65, foo(65, '', '')); + +--connection server_1 +SET debug_sync='now WAIT_FOR master_queued2'; + +--connection con_temp4 +# Create transaction T3, participating in T2's group commit. +SET debug_sync='commit_after_release_LOCK_prepare_ordered SIGNAL master_queued3'; +send INSERT INTO t3 VALUES (66, foo(66, '', '')); + +--connection server_1 +SET debug_sync='now WAIT_FOR master_queued3'; + +--connection con_temp5 +# Create transaction T4, participating in group commit with T2 and T3. +SET debug_sync='commit_after_release_LOCK_prepare_ordered SIGNAL master_queued4'; +send INSERT INTO t3 VALUES (67, foo(67, '', '')); + +--connection server_1 +SET debug_sync='now WAIT_FOR master_queued4'; +SET debug_sync='now SIGNAL master_cont2'; + +--connection con_temp3 +REAP; +--connection con_temp4 +REAP; +--connection con_temp5 +REAP; + +--connection server_1 +SELECT * FROM t3 WHERE a >= 60 ORDER BY a; + +--connection server_2 +# Now we have the four transactions pending for replication on the slave. +# Let them be queued for our three worker threads in a controlled fashion. +# We put them at a stage where T1 is delayed and T3 is waiting for T1 to +# commit before T3 can start. Then we kill T3. + +# Make the worker D0 free, and wait for T1 to be queued in it. +SET debug_sync='now SIGNAL d0_cont'; +SET debug_sync='now WAIT_FOR t1_waiting'; + +# T2 will be queued on the same worker D0 as T1. +# Now release worker D1, and wait for T3 to be queued in it. +# T3 will wait for T1 to commit before it can start. +SET debug_sync='now SIGNAL d1_cont'; +SET debug_sync='now WAIT_FOR t3_waiting'; + +# Release worker D2. T4 may or may not have time to be queued on it, but +# it will not be able to complete due to T3 being killed. +SET debug_sync='now SIGNAL d2_cont'; + +# Now we kill the waiting transaction T3 in worker D1. +--replace_result $d1_thd_id THD_ID +eval KILL $d1_thd_id; + +# Wait until T3 has reacted on the kill. +SET debug_sync='now WAIT_FOR t3_killed'; + +# Now we can allow T1 to proceed. +SET debug_sync='now SIGNAL t1_cont'; + +--let $slave_sql_errno= 1317,1927,1963 +--source include/wait_for_slave_sql_error.inc +STOP SLAVE IO_THREAD; +SELECT * FROM t3 WHERE a >= 60 ORDER BY a; + +# Now we have to disable the debug_sync statements, so they do not trigger +# when the events are retried. +SET GLOBAL slave_parallel_threads=0; +SET GLOBAL slave_parallel_threads=10; +SET sql_log_bin=0; +DROP FUNCTION foo; +--delimiter || +CREATE FUNCTION foo(x INT, d1 VARCHAR(500), d2 VARCHAR(500)) + RETURNS INT DETERMINISTIC + BEGIN + RETURN x; + END +|| +--delimiter ; +SET sql_log_bin=1; + +--connection server_1 +INSERT INTO t3 VALUES (69,0); +--save_master_pos + +--connection server_2 +--source include/start_slave.inc +--sync_with_master +SELECT * FROM t3 WHERE a >= 60 ORDER BY a; +# Restore the foo() function. +SET sql_log_bin=0; +DROP FUNCTION foo; +--delimiter || +CREATE FUNCTION foo(x INT, d1 VARCHAR(500), d2 VARCHAR(500)) + RETURNS INT DETERMINISTIC + BEGIN + IF d1 != '' THEN + SET debug_sync = d1; + END IF; + IF d2 != '' THEN + SET debug_sync = d2; + END IF; + RETURN x; + END +|| +--delimiter ; +SET sql_log_bin=1; + + --connection server_2 --source include/stop_slave.inc SET GLOBAL binlog_format=@old_format; diff --git a/sql/rpl_parallel.cc b/sql/rpl_parallel.cc index 91aa36abc52..3cdd1f5ec8d 100644 --- a/sql/rpl_parallel.cc +++ b/sql/rpl_parallel.cc @@ -2,6 +2,7 @@ #include "rpl_parallel.h" #include "slave.h" #include "rpl_mi.h" +#include "debug_sync.h" /* @@ -219,6 +220,7 @@ handle_rpl_parallel_thread(void *arg) thd->variables.log_slow_filter= global_system_variables.log_slow_filter; set_slave_thread_options(thd); thd->client_capabilities = CLIENT_LOCAL_FILES; + thd->net.reading_or_writing= 0; thd_proc_info(thd, "Waiting for work from main SQL threads"); thd->set_time(); thd->variables.lock_wait_timeout= LONG_TIMEOUT; @@ -308,6 +310,7 @@ handle_rpl_parallel_thread(void *arg) "Waiting for prior transaction to commit " "before starting next transaction"); did_enter_cond= true; + DEBUG_SYNC(thd, "rpl_parallel_start_waiting_for_prior"); while (wait_start_sub_id > entry->last_committed_sub_id && !thd->check_killed()) mysql_cond_wait(&entry->COND_parallel_entry, @@ -315,6 +318,7 @@ handle_rpl_parallel_thread(void *arg) if (wait_start_sub_id > entry->last_committed_sub_id) { /* The thread got a kill signal. */ + DEBUG_SYNC(thd, "rpl_parallel_start_waiting_for_prior_killed"); thd->send_kill_message(); slave_output_error_info(rgi->rli, thd); signal_error_to_sql_driver_thread(thd, rgi); @@ -383,6 +387,7 @@ handle_rpl_parallel_thread(void *arg) &rgi->commit_orderer); delete rgi; group_rgi= rgi= NULL; + DEBUG_SYNC(thd, "rpl_parallel_end_of_group"); } events= next; @@ -843,6 +848,10 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev, However, the commit of this event must wait for the commit of the prior event, to preserve binlog commit order and visibility across all servers in the replication hierarchy. + + In addition, we must not start executing this event until we have + finished the previous collection of event groups that group-committed + together; we use rgi->wait_start_sub_id to control this. */ rpl_parallel_thread *rpt= global_rpl_thread_pool.get_thread(e); rgi->wait_commit_sub_id= e->current_sub_id;