From 0c1f97b3ab7befdae522aaa738be00f6700cee8e Mon Sep 17 00:00:00 2001 From: Andrei Elkin Date: Tue, 5 May 2020 20:32:32 +0300 Subject: [PATCH] MDEV-15152 Optimistic parallel slave doesnt cope well with START SLAVE UNTIL The immediate bug was caused by a failure to recognize a correct position to stop the slave applier run in optimistic parallel mode. There were the following set of issues that the analysis unveil. 1 incorrect estimate for the event binlog position passed to is_until_satisfied 2 wait for workers to complete by the driver thread did not account non-group events that could be left unprocessed and thus to mix up the last executed binlog group's file and position: the file remained old and the position related to the new rotated file 3 incorrect 'slave reached file:pos' by the parallel slave report in the error log 4 relay log UNTIL missed out the parallel slave branch in is_until_satisfied. The patch addresses all of them to simplify logics of log change notification in either the master and relay-log until case. P.1 is addressed with passing the event into is_until_satisfied() for proper analisis by the function. P.2 is fixed by changes in handle_queued_pos_update(). P.4 required removing relay-log change notification by workers. Instead the driver thread updates the notion of the current relay-log fully itself with aid of introduced bool Relay_log_info::until_relay_log_names_defer. An extra print out of the requested until file:pos is arranged with --log-warning=3. --- .../r/rpl_parallel_optimistic_until.result | 291 +++++++++++ .../rpl/t/rpl_parallel_optimistic_until.test | 465 ++++++++++++++++++ sql/rpl_parallel.cc | 6 +- sql/rpl_rli.cc | 65 ++- sql/rpl_rli.h | 21 +- sql/slave.cc | 31 +- 6 files changed, 859 insertions(+), 20 deletions(-) create mode 100644 mysql-test/suite/rpl/r/rpl_parallel_optimistic_until.result create mode 100644 mysql-test/suite/rpl/t/rpl_parallel_optimistic_until.test diff --git a/mysql-test/suite/rpl/r/rpl_parallel_optimistic_until.result b/mysql-test/suite/rpl/r/rpl_parallel_optimistic_until.result new file mode 100644 index 00000000000..a83a9b61b9f --- /dev/null +++ b/mysql-test/suite/rpl/r/rpl_parallel_optimistic_until.result @@ -0,0 +1,291 @@ +include/master-slave.inc +[connection master] +connection slave; +include/stop_slave.inc +RESET MASTER; +RESET SLAVE; +connection master; +RESET MASTER; +CREATE TABLE t1 (a int primary key, b text) ENGINE=InnoDB; +INSERT INTO t1 SET a=25, b='trx0'; +connection slave; +include/start_slave.inc +connection master; +connection slave; +include/stop_slave.inc +ALTER TABLE mysql.gtid_slave_pos ENGINE=InnoDB; +SET @old_parallel_threads=@@GLOBAL.slave_parallel_threads; +SET GLOBAL slave_parallel_threads=2; +SET @old_parallel_mode=@@GLOBAL.slave_parallel_mode; +SET GLOBAL slave_parallel_mode='optimistic'; +connection slave; +SET @old_max_relay_log_size = @@global.max_relay_log_size; +SET @@global.max_relay_log_size=4096; +connection master; +BEGIN; +INSERT INTO t1 SET a=1, b='trx1'; +INSERT INTO t1 SET a=2, b='trx1'; +INSERT INTO t1 SET a=3, b='trx1'; +INSERT INTO t1 SET a=4, b='trx1'; +INSERT INTO t1 SET a=5, b='trx1'; +INSERT INTO t1 SET a=6, b='trx1'; +INSERT INTO t1 SET a=7, b='trx1'; +INSERT INTO t1 SET a=8, b='trx1'; +INSERT INTO t1 SET a=9, b='trx1'; +INSERT INTO t1 SET a=10, b='trx1'; +INSERT INTO t1 SET a=11, b='trx1'; +INSERT INTO t1 SET a=12, b='trx1'; +INSERT INTO t1 SET a=13, b='trx1'; +INSERT INTO t1 SET a=14, b='trx1'; +INSERT INTO t1 SET a=15, b='trx1'; +INSERT INTO t1 SET a=16, b='trx1'; +INSERT INTO t1 SET a=17, b='trx1'; +INSERT INTO t1 SET a=18, b='trx1'; +INSERT INTO t1 SET a=19, b='trx1'; +INSERT INTO t1 SET a=20, b='trx1'; +INSERT INTO t1 SET a=21, b='trx1'; +INSERT INTO t1 SET a=22, b='trx1'; +INSERT INTO t1 SET a=23, b='trx1'; +INSERT INTO t1 SET a=24, b='trx1'; +COMMIT; +FLUSH LOGS; +BEGIN; +UPDATE t1 SET b='trx2_0' WHERE a = 25; +UPDATE t1 SET b='trx2' WHERE a = 25; +COMMIT; +INSERT INTO t1 SET a=26,b='trx3'; +*** case 1 UNTIL inside trx2 +connection slave1; +BEGIN; +INSERT INTO t1 SET a= 1; +connection slave; +SELECT <= AND < as "pos_until < trx0 and is within trx2"; +pos_until < trx0 and is within trx2 +1 +CHANGE MASTER TO MASTER_USE_GTID=no; +START SLAVE UNTIL MASTER_LOG_FILE = 'file_2', MASTER_LOG_POS = ; +connection slave1; +ROLLBACK; +Proof 1: Correct stop +connection slave; +include/wait_for_slave_sql_to_stop.inc +SELECT count(*) = 1 as 'trx2 is committed' FROM t1 WHERE b = 'trx2'; +trx2 is committed +1 +SELECT count(*) = 0 as 'trx3 is not committed' FROM t1 WHERE b = 'trx3'; +trx3 is not committed +1 +Proof 2: Resume works out +include/start_slave.inc +connection master; +connection slave; +*** case 2 UNTIL inside trx2 +connection slave; +DELETE FROM t1 WHERE a <> 25; +UPDATE t1 SET b='trx0' WHERE a = 25; +connection slave1; +BEGIN; +INSERT INTO t1 SET a= 1; +connection slave; +include/stop_slave.inc +SELECT <= AND < as "pos_until >= trx0 and is within trx2"; +pos_until >= trx0 and is within trx2 +1 +CHANGE MASTER TO MASTER_LOG_FILE = 'file_1', MASTER_LOG_POS = , MASTER_USE_GTID=no; +START SLAVE UNTIL MASTER_LOG_FILE = 'file_2', MASTER_LOG_POS = ; +connection slave1; +ROLLBACK; +Proof 1: Correct stop +connection slave; +include/wait_for_slave_sql_to_stop.inc +SELECT count(*) = 1 as 'trx2 is committed' FROM t1 WHERE b = 'trx2'; +trx2 is committed +1 +SELECT count(*) = 0 as 'trx3 is not committed' FROM t1 WHERE b = 'trx3'; +trx3 is not committed +1 +Proof 2: Resume works out +include/start_slave.inc +connection master; +connection slave; +*** case 3 UNTIL inside trx1 +connection slave; +DELETE FROM t1 WHERE a <> 25; +UPDATE t1 SET b='trx0' WHERE a = 25; +connection slave1; +BEGIN; +INSERT INTO t1 SET a= 1; # block trx1; +connection slave; +include/stop_slave.inc +SELECT < as "pos_until before trx2 start position"; +pos_until before trx2 start position +1 +CHANGE MASTER TO MASTER_LOG_FILE = 'file_1', MASTER_LOG_POS = , MASTER_USE_GTID=no; +START SLAVE UNTIL MASTER_LOG_FILE = 'file_2', MASTER_LOG_POS = ; +connection slave1; +ROLLBACK; +Proof 1: Correct stop +connection slave; +include/wait_for_slave_sql_to_stop.inc +SELECT count(*) = 25-1 as 'trx1 is committed' FROM t1 WHERE b = 'trx1'; +trx1 is committed +1 +SELECT count(*) = 0 as 'trx2 is not committed' FROM t1 WHERE b = 'trx2'; +trx2 is not committed +1 +Proof 2: Resume works out +include/start_slave.inc +connection master; +connection slave; +*** case 4 Relay-log UNTIL inside trx1 +connection slave; +DELETE FROM t1 WHERE a <> 25; +UPDATE t1 SET b='trx0' WHERE a = 25; +connection slave1; +BEGIN; +INSERT INTO t1 SET a= 1; # block trx1; +connection slave; +include/stop_slave.inc +CHANGE MASTER TO MASTER_LOG_FILE = 'file_1', MASTER_LOG_POS = , MASTER_USE_GTID=no; +START SLAVE IO_THREAD; +include/wait_for_slave_io_to_start.inc +START SLAVE UNTIL RELAY_LOG_FILE = 'file_2', RELAY_LOG_POS = ; +connection slave1; +ROLLBACK; +Proof 1: Correct stop +connection slave; +include/wait_for_slave_sql_to_stop.inc +SELECT count(*) = 25-1 as 'trx1 is committed' FROM t1 WHERE b = 'trx1'; +trx1 is committed +1 +SELECT count(*) = 0 as 'trx2 is not committed' FROM t1 WHERE b = 'trx2'; +trx2 is not committed +1 +Proof 2: Resume works out +include/start_slave.inc +connection master; +connection slave; +*** case 5 Relay-log UNTIL inside a "big" trx that spawns few relay logs +connection master; +CREATE TABLE t2 (a TEXT) ENGINE=InnoDB; +FLUSH LOGS; +connection slave; +connection slave; +include/stop_slave.inc +connection master; +BEGIN; +INSERT INTO t2 SET a=repeat('a',1024); +INSERT INTO t2 SET a=repeat('a',1024); +INSERT INTO t2 SET a=repeat('a',1024); +INSERT INTO t2 SET a=repeat('a',1024); +INSERT INTO t2 SET a=repeat('a',1024); +INSERT INTO t2 SET a=repeat('a',1024); +INSERT INTO t2 SET a=repeat('a',1024); +INSERT INTO t2 SET a=repeat('a',1024); +INSERT INTO t2 SET a=repeat('a',1024); +INSERT INTO t2 SET a=repeat('a',1024); +INSERT INTO t2 SET a=repeat('a',1024); +INSERT INTO t2 SET a=repeat('a',1024); +INSERT INTO t2 SET a=repeat('a',1024); +INSERT INTO t2 SET a=repeat('a',1024); +INSERT INTO t2 SET a=repeat('a',1024); +INSERT INTO t2 SET a=repeat('a',1024); +INSERT INTO t2 SET a=repeat('a',1024); +COMMIT; +INSERT INTO t2 SET a='a'; +connection slave; +START SLAVE IO_THREAD; +include/wait_for_slave_io_to_start.inc +START SLAVE UNTIL RELAY_LOG_FILE = 'file_2', RELAY_LOG_POS = ; +Proof 1: Correct stop +connection slave; +include/wait_for_slave_sql_to_stop.inc +Proof 2: Resume works out +include/start_slave.inc +connection master; +connection slave; +include/diff_tables.inc [master:t2,slave:t2] +*** case 6 Relay-log UNTIL inside a small trx inside a sequence of relay logs +connection slave; +include/stop_slave.inc +connection master; +BEGIN; +DELETE FROM t2 LIMIT 1; +COMMIT; +BEGIN; +DELETE FROM t2 LIMIT 1; +COMMIT; +BEGIN; +DELETE FROM t2 LIMIT 1; +COMMIT; +BEGIN; +DELETE FROM t2 LIMIT 1; +COMMIT; +BEGIN; +DELETE FROM t2 LIMIT 1; +COMMIT; +BEGIN; +DELETE FROM t2 LIMIT 1; +COMMIT; +BEGIN; +DELETE FROM t2 LIMIT 1; +COMMIT; +BEGIN; +DELETE FROM t2 LIMIT 1; +COMMIT; +BEGIN; +DELETE FROM t2 LIMIT 1; +COMMIT; +BEGIN; +DELETE FROM t2 LIMIT 1; +COMMIT; +BEGIN; +DELETE FROM t2 LIMIT 1; +COMMIT; +BEGIN; +DELETE FROM t2 LIMIT 1; +COMMIT; +BEGIN; +DELETE FROM t2 LIMIT 1; +COMMIT; +BEGIN; +DELETE FROM t2 LIMIT 1; +COMMIT; +BEGIN; +DELETE FROM t2 LIMIT 1; +COMMIT; +BEGIN; +DELETE FROM t2 LIMIT 1; +COMMIT; +BEGIN; +DELETE FROM t2 LIMIT 1; +COMMIT; +BEGIN; +DELETE FROM t2 LIMIT 1; +COMMIT; +COMMIT; +connection slave; +START SLAVE IO_THREAD; +include/wait_for_slave_io_to_start.inc +connection master; +include/sync_slave_io_with_master.inc +connection slave; +START SLAVE UNTIL RELAY_LOG_FILE = 'file_2', RELAY_LOG_POS = ; +Proof 1: Correct stop +connection slave; +include/wait_for_slave_sql_to_stop.inc +Proof 2: Resume works out +include/start_slave.inc +connection master; +connection slave; +include/diff_tables.inc [master:t2,slave:t2] +connection slave; +include/stop_slave.inc +SET GLOBAL max_relay_log_size=@old_max_relay_log_size; +SET GLOBAL slave_parallel_mode=@old_parallel_mode; +SET GLOBAL slave_parallel_threads=@old_parallel_threads; +include/start_slave.inc +connection master; +DROP TABLE t1, t2; +connection slave; +include/rpl_end.inc diff --git a/mysql-test/suite/rpl/t/rpl_parallel_optimistic_until.test b/mysql-test/suite/rpl/t/rpl_parallel_optimistic_until.test new file mode 100644 index 00000000000..508213c9075 --- /dev/null +++ b/mysql-test/suite/rpl/t/rpl_parallel_optimistic_until.test @@ -0,0 +1,465 @@ +--source include/have_innodb.inc +--source include/have_debug.inc +--source include/have_debug_sync.inc +--source include/master-slave.inc +# Format is restricted because the test expects a specific result of +# relay-logging that splits a transaction into two different files. +--source include/have_binlog_format_row.inc + +# +# MDEV-15152 Optimistic parallel slave doesn't cope well with START SLAVE UNTIL +# +--connection slave +--source include/stop_slave.inc +RESET MASTER; +RESET SLAVE; + +--connection master +RESET MASTER; +CREATE TABLE t1 (a int primary key, b text) ENGINE=InnoDB; +--let $a0 = 25 +--eval INSERT INTO t1 SET a=$a0, b='trx0' +# Memorize the position for replication restart from it +--let $pos_trx0 = query_get_value(SHOW MASTER STATUS, Position, 1) + +--connection slave +--source include/start_slave.inc + +--connection master +# --connection slave +--sync_slave_with_master +--source include/stop_slave.inc +ALTER TABLE mysql.gtid_slave_pos ENGINE=InnoDB; +SET @old_parallel_threads=@@GLOBAL.slave_parallel_threads; +SET GLOBAL slave_parallel_threads=2; +SET @old_parallel_mode=@@GLOBAL.slave_parallel_mode; +SET GLOBAL slave_parallel_mode='optimistic'; + +# Run the slave in the following modes one by one. +# +# 1. the until position is set in the middle of trx2 +# below $pos_trx0 of the last exec position in the first file +# 2. and above $pos_trx0 +# In either case trx2 must commit before slave stops. +# 3. the until postion is inside trx1 +# 4. RELAY log until inside trx1 +# 5. RELAY log until inside a "big" trx +# 6. RELAY log until inside a trx within a sequence of relay logs +# +# Execution flaw for Until_Master_Pos cases follows as: +# create the transaction trx1, trx2 +# logged at the beginning of two subsequent binlog files. +# Set the until position to at the middle of the 2rd transaction. +# Engage the optimistic scheduler while having trx1 execution blocked. +# Lift the block after trx2 has reached waiting its order to commit. +# *Proof 1* +# Observe that the slave applier stops at a correct position. +# In the bug condition it would stop prematurely having the stop position +# in the first file, therefore trx2 not committed. +# Specifically, an internal transaction position until makes the applier to run +# beyond it to commit commit the current transaction. +# *Proof 2* +# Observe the following START SLAVE resumes OK. +# +# Auxiliary third trx3 on master is just for triggering the actual stop +# (whihc is a legacy UNTIL's property). +# trx0 is to produce a specific value of the last executed binlog file:pos +# to emulate the bug condition. +# +# Intermediate checks via SELECT are supposed to succeed +# with putting out value 1. +# +# NOTE: Relay log until tests have to use explicit log names and position +# which may require to adjust with future changes to event formats etc. +# + +--connection slave +SET @old_max_relay_log_size = @@global.max_relay_log_size; +SET @@global.max_relay_log_size=4096; + +--connection master +# trx1 +--let $a=1 +BEGIN; +while (`SELECT $a < $a0`) +{ + --eval INSERT INTO t1 SET a=$a, b='trx1' +--inc $a +} +COMMIT; +--let $fil_1 = query_get_value(SHOW MASTER STATUS, File, 1) +--let $pos_trx1 = query_get_value(SHOW MASTER STATUS, Position, 1) + +FLUSH LOGS; + +# $pos_0 the offset of the first event of trx2 in new file +--let $pos_0=query_get_value(SHOW MASTER STATUS, Position, 1) +# trx2 +--let $a=$a0 +BEGIN; +--eval UPDATE t1 SET b='trx2_0' WHERE a = $a +--eval UPDATE t1 SET b='trx2' WHERE a = $a +COMMIT; +--let $fil_2=query_get_value(SHOW MASTER STATUS, File, 1) +--let $pos_trx2=query_get_value(SHOW MASTER STATUS, Position, 1) + +# trx3 +--let $a=$a0 +--inc $a +--eval INSERT INTO t1 SET a=$a,b='trx3' +--let $pos_trx3=query_get_value(SHOW MASTER STATUS, Position, 1) +--let $a= + + +--echo *** case 1 UNTIL inside trx2 + +--connection slave1 +# Blocker to hold off EXEC_MASTER_LOG_POS advance +BEGIN; + --eval INSERT INTO t1 SET a= 1 +--connection slave +--let $pos_until=`SELECT $pos_trx0 - 1` +--replace_result $pos_0 $pos_until $pos_trx2 +--eval SELECT $pos_0 <= $pos_until AND $pos_until < $pos_trx2 as "pos_until < trx0 and is within trx2" +CHANGE MASTER TO MASTER_USE_GTID=no; +--replace_result $fil_2 file_2 $pos_until +--eval START SLAVE UNTIL MASTER_LOG_FILE = '$fil_2', MASTER_LOG_POS = $pos_until + +--let $wait_condition= SELECT COUNT(*) > 0 FROM information_schema.processlist WHERE state = "Waiting for prior transaction to commit" +--source include/wait_condition.inc + +--connection slave1 +# unblock to see the slave applier stops at $until +ROLLBACK; + +--echo Proof 1: Correct stop +--connection slave +--source include/wait_for_slave_sql_to_stop.inc +--let $file_stop= query_get_value(SHOW SLAVE STATUS, Relay_Master_Log_File, 1) +--let $pos_stop= query_get_value(SHOW SLAVE STATUS, Exec_Master_Log_Pos, 1) +if (`SELECT "$file_stop" != "$fil_2" OR $pos_stop < $pos_until`) +{ + --echo *** ERROR: Slave stopped at $file_stop:$pos_stop which is not $fil_2:$pos_until. + --die +} +--eval SELECT count(*) = 1 as 'trx2 is committed' FROM t1 WHERE b = 'trx2' +--eval SELECT count(*) = 0 as 'trx3 is not committed' FROM t1 WHERE b = 'trx3' + +--echo Proof 2: Resume works out +--source include/start_slave.inc +--connection master +--sync_slave_with_master + + +--echo *** case 2 UNTIL inside trx2 + +--connection slave +--eval DELETE FROM t1 WHERE a <> $a0 +--eval UPDATE t1 SET b='trx0' WHERE a = $a0 + +--connection slave1 +# Blocker to hold off EXEC_MASTER_LOG_POS advance +BEGIN; + --eval INSERT INTO t1 SET a= 1 + +--connection slave +--source include/stop_slave.inc + +--let $pos_until=`SELECT $pos_trx2 - 1` +--replace_result $pos_trx0 $pos_until $pos_trx2 +--eval SELECT $pos_trx0 <= $pos_until AND $pos_until < $pos_trx2 as "pos_until >= trx0 and is within trx2" +--replace_result $fil_1 file_1 $pos_trx0 +--eval CHANGE MASTER TO MASTER_LOG_FILE = '$fil_1', MASTER_LOG_POS = $pos_trx0, MASTER_USE_GTID=no +--replace_result $fil_2 file_2 $pos_until +--eval START SLAVE UNTIL MASTER_LOG_FILE = '$fil_2', MASTER_LOG_POS = $pos_until + +--let $wait_condition= SELECT COUNT(*) > 0 FROM information_schema.processlist WHERE state = "Waiting for prior transaction to commit" +--source include/wait_condition.inc + +--connection slave1 +# unblock to see the slave applier stops at $until +ROLLBACK; + +--echo Proof 1: Correct stop +--connection slave +--source include/wait_for_slave_sql_to_stop.inc +--let $file_stop= query_get_value(SHOW SLAVE STATUS, Relay_Master_Log_File, 1) +--let $pos_stop= query_get_value(SHOW SLAVE STATUS, Exec_Master_Log_Pos, 1) +if (`SELECT "$file_stop" != "$fil_2" OR $pos_stop < $pos_until`) +{ + --echo *** ERROR: Slave stopped at $file_stop:$pos_stop which is not $fil_2:$pos_until. + --die +} +--eval SELECT count(*) = 1 as 'trx2 is committed' FROM t1 WHERE b = 'trx2' +--eval SELECT count(*) = 0 as 'trx3 is not committed' FROM t1 WHERE b = 'trx3' + +--echo Proof 2: Resume works out +--source include/start_slave.inc +--connection master +--sync_slave_with_master + + +--echo *** case 3 UNTIL inside trx1 + +--connection slave +--eval DELETE FROM t1 WHERE a <> $a0 +--eval UPDATE t1 SET b='trx0' WHERE a = $a0 + + +--connection slave1 +# Blocker to hold off EXEC_MASTER_LOG_POS advance +BEGIN; + --eval INSERT INTO t1 SET a= 1; # block trx1 + +--connection slave +--source include/stop_slave.inc + +--let $pos_until=`SELECT $pos_0 - 1` +--replace_result $pos_0 $pos_until $pos_trx2 +--eval SELECT $pos_until < $pos_0 as "pos_until before trx2 start position" +--replace_result $fil_1 file_1 $pos_trx0 +--eval CHANGE MASTER TO MASTER_LOG_FILE = '$fil_1', MASTER_LOG_POS = $pos_trx0, MASTER_USE_GTID=no +--replace_result $fil_2 file_2 $pos_until +--eval START SLAVE UNTIL MASTER_LOG_FILE = '$fil_2', MASTER_LOG_POS = $pos_until + +--connection slave1 +# unblock to see the slave applier stops at $until +ROLLBACK; + +--echo Proof 1: Correct stop +--connection slave +--source include/wait_for_slave_sql_to_stop.inc +--let $file_stop= query_get_value(SHOW SLAVE STATUS, Relay_Master_Log_File, 1) +--let $pos_stop= query_get_value(SHOW SLAVE STATUS, Exec_Master_Log_Pos, 1) +if (`SELECT "$file_stop" != "$fil_2" OR $pos_stop < $pos_until`) +{ + --echo *** ERROR: Slave stopped at $file_stop:$pos_stop which is not $fil_2:$pos_until. + --die +} +--eval SELECT count(*) = $a0-1 as 'trx1 is committed' FROM t1 WHERE b = 'trx1' +--eval SELECT count(*) = 0 as 'trx2 is not committed' FROM t1 WHERE b = 'trx2' + +--echo Proof 2: Resume works out +--source include/start_slave.inc +--connection master +--sync_slave_with_master + + +--echo *** case 4 Relay-log UNTIL inside trx1 + +--connection slave +--eval DELETE FROM t1 WHERE a <> $a0 +--eval UPDATE t1 SET b='trx0' WHERE a = $a0 + +--connection slave1 +# Blocker to hold off EXEC_MASTER_LOG_POS advance +BEGIN; + --eval INSERT INTO t1 SET a= 1; # block trx1 + +--connection slave +--source include/stop_slave.inc +--replace_result $fil_1 file_1 $pos_trx0 +--eval CHANGE MASTER TO MASTER_LOG_FILE = '$fil_1', MASTER_LOG_POS = $pos_trx0, MASTER_USE_GTID=no +START SLAVE IO_THREAD; +--source include/wait_for_slave_io_to_start.inc + +# The following test sets the stop coordinate is set to inside the first event +# of a relay log that holds events of a transaction started in an earlier log. +# Peek the stop position in the middle of trx1, not even on a event boundary. +--let $pos_until=255 +--let $file_rl=slave-relay-bin.000003 +--let $binlog_file=$file_rl + +--let $pos_xid=508 +--let $info= query_get_value(SHOW RELAYLOG EVENTS IN '$file_rl' FROM $pos_xid LIMIT 1, Info, 1) + +if (`SELECT "$info" NOT LIKE "COMMIT /* xid=% */" OR $pos_xid < $pos_until`) +{ + --echo *** Unexpected offset. Refine it to point to the correct XID event! + --die +} + +--replace_result $file_rl file_2 $pos_until +--eval START SLAVE UNTIL RELAY_LOG_FILE = '$file_rl', RELAY_LOG_POS = $pos_until + +--connection slave1 +# unblock to see the slave applier stops at $until +ROLLBACK; + +--echo Proof 1: Correct stop +--connection slave +--source include/wait_for_slave_sql_to_stop.inc +--let $file_stop= query_get_value(SHOW SLAVE STATUS, Relay_Log_File, 1) +--let $pos_stop= query_get_value(SHOW SLAVE STATUS, Relay_Log_Pos, 1) +if (`SELECT strcmp("$file_rl","$file_stop") > -1`) +{ + --echo *** ERROR: Slave stopped at $file_stop:$pos_stop which is not $file_rl:$pos_until. + --die +} + +--eval SELECT count(*) = $a0-1 as 'trx1 is committed' FROM t1 WHERE b = 'trx1' +--eval SELECT count(*) = 0 as 'trx2 is not committed' FROM t1 WHERE b = 'trx2' + +--echo Proof 2: Resume works out +--source include/start_slave.inc +--connection master +--sync_slave_with_master + + + +--echo *** case 5 Relay-log UNTIL inside a "big" trx that spawns few relay logs + +--connection master +CREATE TABLE t2 (a TEXT) ENGINE=InnoDB; +FLUSH LOGS; + +--sync_slave_with_master +--let $file_stop= query_get_value(SHOW SLAVE STATUS, Relay_Log_File, 1) +--let $pos_stop= query_get_value(SHOW SLAVE STATUS, Relay_Log_Pos, 1) +--let $records=`SELECT floor(4*@@global.max_relay_log_size / 1024) + 1` + +--connection slave +--source include/stop_slave.inc + +--connection master +# trx4 +BEGIN; +--let $i=$records +while ($i) +{ + INSERT INTO t2 SET a=repeat('a',1024); + +--dec $i +} +COMMIT; + +# slave will stop there: +--let $file_trx4 = query_get_value(SHOW MASTER STATUS, File, 1) +--let $pos_trx4 = query_get_value(SHOW MASTER STATUS, Position, 1) + +# trx5 +INSERT INTO t2 SET a='a'; +--let $pos_trx5 = query_get_value(SHOW MASTER STATUS, Position, 1) + +--connection slave +START SLAVE IO_THREAD; +--source include/wait_for_slave_io_to_start.inc + +# Set position inside the transaction though the value +# specified is beyond that relay log file. +# The coordianate may point to in a different event in future changes +# but should not move away from inside this big group of events. +# So we don't test which event in the transaction it points to. +--let $pos_until= 4500 +--let $file_rl= slave-relay-bin.000010 + +--replace_result $file_rl file_2 $pos_until +--eval START SLAVE UNTIL RELAY_LOG_FILE = '$file_rl', RELAY_LOG_POS = $pos_until + +--echo Proof 1: Correct stop +--connection slave +--source include/wait_for_slave_sql_to_stop.inc +--let $file_stop= query_get_value(SHOW SLAVE STATUS, Relay_Master_Log_File, 1) +--let $pos_stop= query_get_value(SHOW SLAVE STATUS, Exec_Master_Log_Pos, 1) +# It's showed the actual stop occurred before trx5 +if (`SELECT strcmp("$file_trx4", "$file_stop") <> 0 OR $pos_stop >= $pos_trx5 OR count(*) <> $records FROM t2`) +{ + --echo *** ERROR: Slave stopped at *binlog* $file_stop:$pos_stop which is not $file_trx4:$pos_trx4. + --die +} + +--echo Proof 2: Resume works out +--source include/start_slave.inc +--connection master +--sync_slave_with_master + +--let $diff_tables=master:t2,slave:t2 +--source include/diff_tables.inc + + + +--echo *** case 6 Relay-log UNTIL inside a small trx inside a sequence of relay logs + +--connection slave +--source include/stop_slave.inc + +--connection master +# trx6 +--let $records=`SELECT count(*) FROM t2` +while ($records) +{ + BEGIN; + DELETE FROM t2 LIMIT 1; + COMMIT; +--dec $records +} +COMMIT; + +--connection slave +START SLAVE IO_THREAD; +--source include/wait_for_slave_io_to_start.inc + +--connection master +--source include/sync_slave_io_with_master.inc + +--connection slave +# The relay-log coordinate is not at an event boundary and +# also may change across the server version. +# The test makes its best to check its coherance. +--let $pos_until= 3130 +--let $file_rl= slave-relay-bin.000018 + +--let $pos_gtid = 2987 +--let $info= query_get_value(SHOW RELAYLOG EVENTS IN '$file_rl' FROM $pos_gtid LIMIT 1, Info, 1) + +if (`SELECT "$info" != "BEGIN GTID 0-1-23"`) +{ + --echo *** Unexpected offset. Refine it to point to the correct GTID! + --die +} +--let $pos_event = 3120 +--let $type= query_get_value(SHOW RELAYLOG EVENTS IN '$file_rl' FROM $pos_event LIMIT 1, Event_type, 1) +if (`SELECT "$type" != "Delete_rows_v1"`) +{ + --echo *** Unexpected offset. Refine it to point to the expected event! + --die +} + +--replace_result $file_rl file_2 $pos_until +--eval START SLAVE UNTIL RELAY_LOG_FILE = '$file_rl', RELAY_LOG_POS = $pos_until + +--echo Proof 1: Correct stop +--connection slave +--source include/wait_for_slave_sql_to_stop.inc +--let $file_stop= query_get_value(SHOW SLAVE STATUS, Relay_Log_File, 1) +--let $pos_stop= query_get_value(SHOW SLAVE STATUS, Relay_Log_Pos, 1) +if (`SELECT strcmp("$file_stop", "$file_rl") = -1 OR + strcmp("$file_stop", "$file_rl") = 0 AND $pos_stop < $pos_until`) +{ + --echo *** ERROR: Slave stopped at *relay* $file_stop:$pos_stop which is not $file_rl:$pos_until. + --die +} + +--echo Proof 2: Resume works out +--source include/start_slave.inc +--connection master +--sync_slave_with_master + +--let $diff_tables=master:t2,slave:t2 +--source include/diff_tables.inc + +# +# Clean up. +# +--connection slave +--source include/stop_slave.inc +SET GLOBAL max_relay_log_size=@old_max_relay_log_size; +SET GLOBAL slave_parallel_mode=@old_parallel_mode; +SET GLOBAL slave_parallel_threads=@old_parallel_threads; +--source include/start_slave.inc + +--connection master +DROP TABLE t1, t2; + +--sync_slave_with_master +--source include/rpl_end.inc diff --git a/sql/rpl_parallel.cc b/sql/rpl_parallel.cc index 027aa5f628c..3871ddcf8ef 100644 --- a/sql/rpl_parallel.cc +++ b/sql/rpl_parallel.cc @@ -75,18 +75,18 @@ handle_queued_pos_update(THD *thd, rpl_parallel_thread::queued_event *qev) /* Do not update position if an earlier event group caused an error abort. */ DBUG_ASSERT(qev->typ == rpl_parallel_thread::queued_event::QUEUED_POS_UPDATE); + rli= qev->rgi->rli; e= qev->entry_for_queued; - if (e->stop_on_error_sub_id < (uint64)ULONGLONG_MAX || e->force_abort) + if (e->stop_on_error_sub_id < (uint64)ULONGLONG_MAX || + (e->force_abort && !rli->stop_for_until)) return; - rli= qev->rgi->rli; mysql_mutex_lock(&rli->data_lock); cmp= strcmp(rli->group_relay_log_name, qev->event_relay_log_name); if (cmp < 0) { rli->group_relay_log_pos= qev->future_event_relay_log_pos; strmake_buf(rli->group_relay_log_name, qev->event_relay_log_name); - rli->notify_group_relay_log_name_update(); } else if (cmp == 0 && rli->group_relay_log_pos < qev->future_event_relay_log_pos) rli->group_relay_log_pos= qev->future_event_relay_log_pos; diff --git a/sql/rpl_rli.cc b/sql/rpl_rli.cc index 3e1bb28f701..7215cdd4b96 100644 --- a/sql/rpl_rli.cc +++ b/sql/rpl_rli.cc @@ -60,6 +60,7 @@ Relay_log_info::Relay_log_info(bool is_slave_recovery) slave_running(MYSQL_SLAVE_NOT_RUN), until_condition(UNTIL_NONE), until_log_pos(0), retried_trans(0), executed_entries(0), sql_delay(0), sql_delay_end(0), + until_relay_log_names_defer(false), m_flags(0) { DBUG_ENTER("Relay_log_info::Relay_log_info"); @@ -499,6 +500,8 @@ void Relay_log_info::clear_until_condition() until_condition= Relay_log_info::UNTIL_NONE; until_log_name[0]= 0; until_log_pos= 0; + until_relay_log_names_defer= false; + DBUG_VOID_RETURN; } @@ -989,7 +992,6 @@ void Relay_log_info::inc_group_relay_log_pos(ulonglong log_pos, { group_relay_log_pos= rgi->future_event_relay_log_pos; strmake_buf(group_relay_log_name, rgi->event_relay_log_name); - notify_group_relay_log_name_update(); } else if (cmp == 0 && group_relay_log_pos < rgi->future_event_relay_log_pos) group_relay_log_pos= rgi->future_event_relay_log_pos; @@ -1279,29 +1281,78 @@ err: autoincrement or if we have transactions). Should be called ONLY if until_condition != UNTIL_NONE ! + + In the parallel execution mode and UNTIL_MASTER_POS the file name is + presented by future_event_master_log_name which may be ahead of + group_master_log_name. Log_event::log_pos does relate to it nevertheless + so the pair comprises a correct binlog coordinate. + Internal group events and events that have zero log_pos also + produce the zero for the local log_pos which may not lead to the + function falsely return true. + In UNTIL_RELAY_POS the original caching and notification are simplified + to straightforward files comparison when the current event can't be + a part of an event group. + RETURN VALUE true - condition met or error happened (condition seems to have bad log file name) false - condition not met */ -bool Relay_log_info::is_until_satisfied(my_off_t master_beg_pos) +bool Relay_log_info::is_until_satisfied(Log_event *ev) { const char *log_name; ulonglong log_pos; + /* Prevents stopping within transaction; needed solely for Relay UNTIL. */ + bool in_trans= false; + DBUG_ENTER("Relay_log_info::is_until_satisfied"); if (until_condition == UNTIL_MASTER_POS) { log_name= (mi->using_parallel() ? future_event_master_log_name : group_master_log_name); - log_pos= master_beg_pos; + log_pos= (get_flag(Relay_log_info::IN_TRANSACTION) || !ev || !ev->log_pos) ? + (mi->using_parallel() ? 0 : group_master_log_pos) : + ev->log_pos - ev->data_written; } else { DBUG_ASSERT(until_condition == UNTIL_RELAY_POS); - log_name= group_relay_log_name; - log_pos= group_relay_log_pos; + if (!mi->using_parallel()) + { + log_name= group_relay_log_name; + log_pos= group_relay_log_pos; + } + else + { + log_name= event_relay_log_name; + log_pos= event_relay_log_pos; + in_trans= get_flag(Relay_log_info::IN_TRANSACTION); + /* + until_log_names_cmp_result is set to UNKNOWN either + - by a non-group event *and* only when it is in the middle of a group + - or by a group event when the preceding group made the above + non-group event to defer the resetting. + */ + if ((ev && !Log_event::is_group_event(ev->get_type_code()))) + { + if (in_trans) + { + until_relay_log_names_defer= true; + } + else + { + until_log_names_cmp_result= UNTIL_LOG_NAMES_CMP_UNKNOWN; + until_relay_log_names_defer= false; + } + } + else if (!in_trans && until_relay_log_names_defer) + { + until_log_names_cmp_result= UNTIL_LOG_NAMES_CMP_UNKNOWN; + until_relay_log_names_defer= false; + } + } } DBUG_PRINT("info", ("group_master_log_name='%s', group_master_log_pos=%llu", @@ -1355,8 +1406,8 @@ bool Relay_log_info::is_until_satisfied(my_off_t master_beg_pos) } DBUG_RETURN(((until_log_names_cmp_result == UNTIL_LOG_NAMES_CMP_EQUAL && - log_pos >= until_log_pos) || - until_log_names_cmp_result == UNTIL_LOG_NAMES_CMP_GREATER)); + (log_pos >= until_log_pos && !in_trans)) || + until_log_names_cmp_result == UNTIL_LOG_NAMES_CMP_GREATER)); } diff --git a/sql/rpl_rli.h b/sql/rpl_rli.h index 4ec4821b67d..2bc0a80268a 100644 --- a/sql/rpl_rli.h +++ b/sql/rpl_rli.h @@ -219,7 +219,7 @@ public: */ char future_event_master_log_name[FN_REFLEN]; - /* + /* Original log name and position of the group we're currently executing (whose coordinates are group_relay_log_name/pos in the relay log) in the master's binlog. These concern the *group*, because in the master's @@ -419,7 +419,7 @@ public: void close_temporary_tables(); /* Check if UNTIL condition is satisfied. See slave.cc for more. */ - bool is_until_satisfied(my_off_t); + bool is_until_satisfied(Log_event *ev); inline ulonglong until_pos() { DBUG_ASSERT(until_condition == UNTIL_MASTER_POS || @@ -427,7 +427,13 @@ public: return ((until_condition == UNTIL_MASTER_POS) ? group_master_log_pos : group_relay_log_pos); } - + inline char *until_name() + { + DBUG_ASSERT(until_condition == UNTIL_MASTER_POS || + until_condition == UNTIL_RELAY_POS); + return ((until_condition == UNTIL_MASTER_POS) ? group_master_log_name : + group_relay_log_name); + } /** Helper function to do after statement completion. @@ -564,6 +570,15 @@ private: relay_log.info had 4 lines. Now it has 5 lines. */ static const int LINES_IN_RELAY_LOG_INFO_WITH_DELAY= 5; + /* + Hint for when to stop event distribution by sql driver thread. + The flag is set ON by a non-group event when this event is in the middle + of a group (e.g a transaction group) so it's too early + to refresh the current-relay-log vs until-log cached comparison result. + And it is checked and to decide whether it's a right time to do so + when the being processed group has been fully scheduled. + */ + bool until_relay_log_names_defer; /* Holds the state of the data in the relay log. diff --git a/sql/slave.cc b/sql/slave.cc index eb95afd140b..1bf83aa9652 100644 --- a/sql/slave.cc +++ b/sql/slave.cc @@ -3921,12 +3921,8 @@ static int exec_relay_log_event(THD* thd, Relay_log_info* rli, rli->until_condition == Relay_log_info::UNTIL_RELAY_POS) && (ev->server_id != global_system_variables.server_id || rli->replicate_same_server_id) && - rli->is_until_satisfied((rli->get_flag(Relay_log_info::IN_TRANSACTION) || !ev->log_pos) - ? rli->group_master_log_pos - : ev->log_pos - ev->data_written)) + rli->is_until_satisfied(ev)) { - sql_print_information("Slave SQL thread stopped because it reached its" - " UNTIL position %llu", rli->until_pos()); /* Setting abort_slave flag because we do not want additional message about error in query execution to be printed. @@ -5136,10 +5132,14 @@ pthread_handler_t handle_slave_sql(void *arg) } if ((rli->until_condition == Relay_log_info::UNTIL_MASTER_POS || rli->until_condition == Relay_log_info::UNTIL_RELAY_POS) && - rli->is_until_satisfied(rli->group_master_log_pos)) + rli->is_until_satisfied(NULL)) { sql_print_information("Slave SQL thread stopped because it reached its" - " UNTIL position %llu", rli->until_pos()); + " UNTIL position %llu in %s %s file", + rli->until_pos(), rli->until_name(), + rli->until_condition == + Relay_log_info::UNTIL_MASTER_POS ? + "binlog" : "relaylog"); mysql_mutex_unlock(&rli->data_lock); goto err; } @@ -5205,7 +5205,24 @@ pthread_handler_t handle_slave_sql(void *arg) err: if (mi->using_parallel()) rli->parallel.wait_for_done(thd, rli); + /* Gtid_list_log_event::do_apply_event has already reported the GTID until */ + if (rli->stop_for_until && rli->until_condition != Relay_log_info::UNTIL_GTID) + { + if (global_system_variables.log_warnings > 2) + sql_print_information("Slave SQL thread UNTIL stop was requested at position " + "%llu in %s %s file", + rli->until_log_pos, rli->until_log_name, + rli->until_condition == + Relay_log_info::UNTIL_MASTER_POS ? + "binlog" : "relaylog"); + sql_print_information("Slave SQL thread stopped because it reached its" + " UNTIL position %llu in %s %s file", + rli->until_pos(), rli->until_name(), + rli->until_condition == + Relay_log_info::UNTIL_MASTER_POS ? + "binlog" : "relaylog"); + }; /* Thread stopped. Print the current replication position to the log */ { StringBuffer<100> tmp;