MDEV-5262, MDEV-5914, MDEV-5941, MDEV-6020: Deadlocks during parallel replication causing replication to fail.

Merge the patches into MariaDB 10.0 main.

With this patch, parallel replication now automatically retries a
transaction that fails due to a deadlock or other temporary error, just as
single-threaded replication does.

We detect deadlocks between InnoDB transactions that are caused by the
enforced commit order: if T1 must commit before T2 in parallel replication,
but T1 ends up waiting for T2 inside InnoDB, we kill T2 and retry it later,
resolving the deadlock automatically.
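
As an illustration only (not part of the patch), the kind of conflict involved and the knobs that bound the new retry behaviour are sketched below. The table and values are hypothetical; the variable and status names are existing ones referenced by the patch and its test cases:

# T1 was binlogged before T2 on the master, so the slave must also commit T1
# first. With parallel apply the two workers can still deadlock inside InnoDB,
# e.g. via gap locks (MDEV-5914):
#   T1:  INSERT INTO t VALUES (5);                (blocks on T2's gap lock)
#   T2:  DELETE FROM t WHERE a BETWEEN 1 AND 10;  (holds the gap lock, waits for T1 to commit)
# The slave kills T2, lets T1 commit, and automatically retries T2.
#
# Retries are bounded and observable through existing options and status
# variables (change slave_parallel_threads with the slave threads stopped):
SET GLOBAL slave_parallel_threads = 10;         # enable the parallel applier
SET GLOBAL slave_transaction_retries = 10;      # maximum automatic retries before the worker gives up
SHOW GLOBAL STATUS LIKE 'Slave_retried_transactions';  # incremented on each automatic retry
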
Kristian Nielsen 2014-07-11 12:06:47 +02:00
commit 501c56ef1e
33 changed files with 1918 additions and 273 deletions


@ -86,7 +86,10 @@ SET DEBUG_SYNC= 'now SIGNAL killed';
# Reaping: OPTIMIZE TABLE t1
Table Op Msg_type Msg_text
test.t1 optimize note Table does not support optimize, doing recreate + analyze instead
test.t1 optimize error Query execution was interrupted
test.t1 optimize status Operation failed
Warnings:
Error 1317 Query execution was interrupted
# Connection default
DROP TABLE t1;
SET DEBUG_SYNC= 'RESET';

Binary file not shown.


@ -0,0 +1,49 @@
include/master-slave.inc
[connection master]
include/stop_slave.inc
include/rpl_stop_server.inc [server_number=1]
include/rpl_start_server.inc [server_number=1]
SET SQL_LOG_BIN=0;
ALTER TABLE mysql.gtid_slave_pos ENGINE = InnoDB;
SET SQL_LOG_BIN=1;
SET @old_engine= @@GLOBAL.default_storage_engine;
SET GLOBAL default_storage_engine=InnoDB;
SET @old_parallel= @@GLOBAL.slave_parallel_threads;
SET GLOBAL slave_parallel_threads=12;
CHANGE MASTER TO master_host='127.0.0.1', master_port=SERVER_MYPORT_1, master_user='root', master_log_file='master-bin.000001', master_log_pos=4;
include/start_slave.inc
SET SQL_LOG_BIN=0;
ALTER TABLE mysql.gtid_slave_pos ENGINE = InnoDB;
SET SQL_LOG_BIN=1;
SELECT @@gtid_slave_pos;
@@gtid_slave_pos
0-1-1381
CHECKSUM TABLE table0_int_autoinc, table0_key_pk_parts_2_int_autoinc, table100_int_autoinc, table100_key_pk_parts_2_int_autoinc, table10_int_autoinc, table10_key_pk_parts_2_int_autoinc, table1_int_autoinc, table1_key_pk_parts_2_int_autoinc, table2_int_autoinc, table2_key_pk_parts_2_int_autoinc;
Table Checksum
test.table0_int_autoinc 3623174395
test.table0_key_pk_parts_2_int_autoinc 2888328157
test.table100_int_autoinc 3624823809
test.table100_key_pk_parts_2_int_autoinc 3316583308
test.table10_int_autoinc 1615053718
test.table10_key_pk_parts_2_int_autoinc 4147461080
test.table1_int_autoinc 478809705
test.table1_key_pk_parts_2_int_autoinc 3032208641
test.table2_int_autoinc 854763867
test.table2_key_pk_parts_2_int_autoinc 4231615291
include/stop_slave.inc
SET GLOBAL default_storage_engine= @old_engine;
SET GLOBAL slave_parallel_threads=@old_parallel;
SET sql_log_bin=0;
DROP TABLE table0_int_autoinc;
DROP TABLE table0_key_pk_parts_2_int_autoinc;
DROP TABLE table100_int_autoinc;
DROP TABLE table100_key_pk_parts_2_int_autoinc;
DROP TABLE table10_int_autoinc;
DROP TABLE table10_key_pk_parts_2_int_autoinc;
DROP TABLE table1_int_autoinc;
DROP TABLE table1_key_pk_parts_2_int_autoinc;
DROP TABLE table2_int_autoinc;
DROP TABLE table2_key_pk_parts_2_int_autoinc;
SET sql_log_bin=1;
include/start_slave.inc
include/rpl_end.inc


@ -314,7 +314,7 @@ SET debug_sync='now WAIT_FOR t1_ready';
KILL THD_ID;
SET debug_sync='now WAIT_FOR t2_killed';
SET debug_sync='now SIGNAL t1_cont';
include/wait_for_slave_sql_error.inc [errno=1317,1964]
include/wait_for_slave_sql_error.inc [errno=1317,1927,1964]
STOP SLAVE IO_THREAD;
SELECT * FROM t3 WHERE a >= 30 ORDER BY a;
a b
@ -398,7 +398,7 @@ SET debug_sync='now WAIT_FOR t1_ready';
KILL THD_ID;
SET debug_sync='now WAIT_FOR t2_killed';
SET debug_sync='now SIGNAL t1_cont';
include/wait_for_slave_sql_error.inc [errno=1317,1964]
include/wait_for_slave_sql_error.inc [errno=1317,1927,1964]
SET debug_sync='RESET';
SET GLOBAL slave_parallel_threads=0;
SET GLOBAL slave_parallel_threads=10;
@ -481,7 +481,7 @@ SET debug_sync='now WAIT_FOR t1_ready';
KILL THD_ID;
SET debug_sync='now WAIT_FOR t2_killed';
SET debug_sync='now SIGNAL t1_cont';
include/wait_for_slave_sql_error.inc [errno=1317,1964]
include/wait_for_slave_sql_error.inc [errno=1317,1927,1964]
SELECT * FROM t3 WHERE a >= 50 ORDER BY a;
a b
51 51
@ -819,11 +819,37 @@ test_check
OK
test_check
OK
*** MDEV_6435: Incorrect error handling when query binlogged partially on master with "killed" error ***
CREATE TABLE t6 (a INT) ENGINE=MyISAM;
CREATE TRIGGER tr AFTER INSERT ON t6 FOR EACH ROW SET @a = 1;
SET @old_format= @@binlog_format;
SET binlog_format= statement;
SET debug_sync='sp_head_execute_before_loop SIGNAL ready WAIT_FOR cont';
INSERT INTO t6 VALUES (1), (2), (3);
SET debug_sync='now WAIT_FOR ready';
KILL QUERY CONID;
SET debug_sync='now SIGNAL cont';
ERROR 70100: Query execution was interrupted
SET binlog_format= @old_format;
SET debug_sync='RESET';
SET debug_sync='RESET';
include/wait_for_slave_sql_error.inc [errno=1317]
STOP SLAVE IO_THREAD;
SET GLOBAL gtid_slave_pos= 'AFTER_ERROR_GTID_POS';
include/start_slave.inc
INSERT INTO t6 VALUES (4);
SELECT * FROM t6 ORDER BY a;
a
1
4
SELECT * FROM t6 ORDER BY a;
a
4
include/stop_slave.inc
SET GLOBAL slave_parallel_threads=@old_parallel_threads;
include/start_slave.inc
SET DEBUG_SYNC= 'RESET';
DROP function foo;
DROP TABLE t1,t2,t3,t4,t5;
DROP TABLE t1,t2,t3,t4,t5,t6;
SET DEBUG_SYNC= 'RESET';
include/rpl_end.inc


@ -0,0 +1,196 @@
include/rpl_init.inc [topology=1->2]
*** Test retry of transactions that fail to replicate due to deadlock or similar temporary error. ***
ALTER TABLE mysql.gtid_slave_pos ENGINE=InnoDB;
CREATE TABLE t1 (a int PRIMARY KEY, b INT) ENGINE=InnoDB;
INSERT INTO t1 VALUES (1,1);
SET sql_log_bin=0;
CREATE FUNCTION foo(x INT, d1 VARCHAR(500), d2 VARCHAR(500))
RETURNS INT DETERMINISTIC
BEGIN
RETURN x;
END
||
SET sql_log_bin=1;
SET @old_parallel_threads=@@GLOBAL.slave_parallel_threads;
include/stop_slave.inc
SET GLOBAL slave_parallel_threads=5;
include/start_slave.inc
SET sql_log_bin=0;
CREATE FUNCTION foo(x INT, d1 VARCHAR(500), d2 VARCHAR(500))
RETURNS INT DETERMINISTIC
BEGIN
IF d1 != '' THEN
SET debug_sync = d1;
END IF;
IF d2 != '' THEN
SET debug_sync = d2;
END IF;
RETURN x;
END
||
SET sql_log_bin=1;
include/stop_slave.inc
SET gtid_seq_no = 100;
BEGIN;
INSERT INTO t1 VALUES (2,1);
UPDATE t1 SET b=b+1 WHERE a=1;
INSERT INTO t1 VALUES (3,1);
COMMIT;
SELECT * FROM t1 ORDER BY a;
a b
1 2
2 1
3 1
SET @old_dbug= @@GLOBAL.debug_dbug;
SET GLOBAL debug_dbug="+d,rpl_parallel_simulate_temp_err_gtid_0_x_100";
include/start_slave.inc
SET GLOBAL debug_dbug=@old_dbug;
retries
1
SELECT * FROM t1 ORDER BY a;
a b
1 2
2 1
3 1
*** Test that double retry works when the first retry also fails with temp error ***
include/stop_slave.inc
SET gtid_seq_no = 100;
SET @old_server_id = @@server_id;
SET server_id = 10;
BEGIN;
INSERT INTO t1 VALUES (4,1);
UPDATE t1 SET b=b+1 WHERE a=1;
INSERT INTO t1 VALUES (5,1);
INSERT INTO t1 VALUES (6,1);
COMMIT;
SET server_id = @old_server_id;
SELECT * FROM t1 ORDER BY a;
a b
1 3
2 1
3 1
4 1
5 1
6 1
SET @old_dbug= @@GLOBAL.debug_dbug;
SET GLOBAL debug_dbug="+d,rpl_parallel_simulate_temp_err_gtid_0_x_100,rpl_parallel_simulate_double_temp_err_gtid_0_x_100";
include/start_slave.inc
SET GLOBAL debug_dbug=@old_dbug;
retries
2
SELECT * FROM t1 ORDER BY a;
a b
1 3
2 1
3 1
4 1
5 1
6 1
*** Test too many retries, eventually causing failure. ***
include/stop_slave.inc
SET gtid_seq_no = 100;
SET @old_server_id = @@server_id;
SET server_id = 11;
BEGIN;
INSERT INTO t1 VALUES (7,1);
UPDATE t1 SET b=b+1 WHERE a=1;
INSERT INTO t1 VALUES (8,1);
INSERT INTO t1 VALUES (9,1);
COMMIT;
SET server_id = @old_server_id;
SELECT * FROM t1 ORDER BY a;
a b
1 4
2 1
3 1
4 1
5 1
6 1
7 1
8 1
9 1
SET sql_log_bin=0;
CALL mtr.add_suppression("Slave worker thread retried transaction 10 time\\(s\\) in vain, giving up");
CALL mtr.add_suppression("Slave: Deadlock found when trying to get lock; try restarting transaction");
SET sql_log_bin=1;
SET @old_dbug= @@GLOBAL.debug_dbug;
SET GLOBAL debug_dbug="+d,rpl_parallel_simulate_temp_err_gtid_0_x_100,rpl_parallel_simulate_infinite_temp_err_gtid_0_x_100";
START SLAVE;
include/wait_for_slave_sql_error.inc [errno=1213]
SET GLOBAL debug_dbug=@old_dbug;
retries
10
SELECT * FROM t1 ORDER BY a;
a b
1 3
2 1
3 1
4 1
5 1
6 1
STOP SLAVE IO_THREAD;
include/start_slave.inc
SELECT * FROM t1 ORDER BY a;
a b
1 4
2 1
3 1
4 1
5 1
6 1
7 1
8 1
9 1
*** Test retry of event group that spans multiple relay log files. ***
CREATE TABLE t2 (a int PRIMARY KEY, b BLOB) ENGINE=InnoDB;
INSERT INTO t2 VALUES (1,"Hulubullu");
include/stop_slave.inc
SET @old_max= @@GLOBAL.max_relay_log_size;
SET GLOBAL max_relay_log_size=4096;
SET gtid_seq_no = 100;
SET @old_server_id = @@server_id;
SET server_id = 12;
BEGIN;
INSERT INTO t1 VALUES (10, 4);
COMMIT;
SET server_id = @old_server_id;
SELECT * FROM t1 WHERE a >= 10 ORDER BY a;
a b
10 4
SELECT a, LENGTH(b) FROM t2 ORDER BY a;
a LENGTH(b)
1 9
2 5006
3 5012
SET @old_dbug= @@GLOBAL.debug_dbug;
SET GLOBAL debug_dbug="+d,rpl_parallel_simulate_temp_err_gtid_0_x_100";
include/start_slave.inc
SET GLOBAL debug_dbug=@old_dbug;
retries
1
SELECT * FROM t1 WHERE a >= 10 ORDER BY a;
a b
10 4
SELECT a, LENGTH(b) FROM t2 ORDER BY a;
a LENGTH(b)
1 9
2 5006
3 5012
INSERT INTO t1 VALUES (11,11);
SELECT * FROM t1 WHERE a >= 10 ORDER BY a;
a b
10 4
11 11
SELECT a, LENGTH(b) FROM t2 ORDER BY a;
a LENGTH(b)
1 9
2 5006
3 5012
4 5000
SET GLOBAL max_relay_log_size=@old_max;
include/stop_slave.inc
SET GLOBAL slave_parallel_threads=@old_parallel_threads;
include/start_slave.inc
DROP TABLE t1, t2;
DROP function foo;
include/rpl_end.inc


@ -0,0 +1,70 @@
--source include/have_innodb.inc
--source include/have_partition.inc
--source include/have_binlog_format_mixed_or_row.inc
--source include/master-slave.inc
--connection slave
--source include/stop_slave.inc
--connection master
--let $datadir= `SELECT @@datadir`
--let $rpl_server_number= 1
--source include/rpl_stop_server.inc
--remove_file $datadir/master-bin.000001
--remove_file $datadir/master-bin.state
--copy_file $MYSQL_TEST_DIR/std_data/mdev6020-mysql-bin.000001 $datadir/master-bin.000001
--let $rpl_server_number= 1
--source include/rpl_start_server.inc
--source include/wait_until_connected_again.inc
--connection slave
SET SQL_LOG_BIN=0;
ALTER TABLE mysql.gtid_slave_pos ENGINE = InnoDB;
SET SQL_LOG_BIN=1;
SET @old_engine= @@GLOBAL.default_storage_engine;
SET GLOBAL default_storage_engine=InnoDB;
SET @old_parallel= @@GLOBAL.slave_parallel_threads;
SET GLOBAL slave_parallel_threads=12;
--replace_result $SERVER_MYPORT_1 SERVER_MYPORT_1
eval CHANGE MASTER TO master_host='127.0.0.1', master_port=$SERVER_MYPORT_1, master_user='root', master_log_file='master-bin.000001', master_log_pos=4;
--source include/start_slave.inc
--connection master
SET SQL_LOG_BIN=0;
ALTER TABLE mysql.gtid_slave_pos ENGINE = InnoDB;
SET SQL_LOG_BIN=1;
--save_master_pos
--connection slave
--sync_with_master
SELECT @@gtid_slave_pos;
CHECKSUM TABLE table0_int_autoinc, table0_key_pk_parts_2_int_autoinc, table100_int_autoinc, table100_key_pk_parts_2_int_autoinc, table10_int_autoinc, table10_key_pk_parts_2_int_autoinc, table1_int_autoinc, table1_key_pk_parts_2_int_autoinc, table2_int_autoinc, table2_key_pk_parts_2_int_autoinc;
--source include/stop_slave.inc
SET GLOBAL default_storage_engine= @old_engine;
SET GLOBAL slave_parallel_threads=@old_parallel;
SET sql_log_bin=0;
DROP TABLE table0_int_autoinc;
DROP TABLE table0_key_pk_parts_2_int_autoinc;
DROP TABLE table100_int_autoinc;
DROP TABLE table100_key_pk_parts_2_int_autoinc;
DROP TABLE table10_int_autoinc;
DROP TABLE table10_key_pk_parts_2_int_autoinc;
DROP TABLE table1_int_autoinc;
DROP TABLE table1_key_pk_parts_2_int_autoinc;
DROP TABLE table2_int_autoinc;
DROP TABLE table2_key_pk_parts_2_int_autoinc;
SET sql_log_bin=1;
--source include/start_slave.inc
--connection master
--source include/rpl_end.inc


@ -438,7 +438,7 @@ SET debug_sync='now WAIT_FOR t2_killed';
# Now we can allow T1 to proceed.
SET debug_sync='now SIGNAL t1_cont';
--let $slave_sql_errno= 1317,1964
--let $slave_sql_errno= 1317,1927,1964
--source include/wait_for_slave_sql_error.inc
STOP SLAVE IO_THREAD;
SELECT * FROM t3 WHERE a >= 30 ORDER BY a;
@ -573,7 +573,7 @@ SET debug_sync='now WAIT_FOR t2_killed';
# Now we can allow T1 to proceed.
SET debug_sync='now SIGNAL t1_cont';
--let $slave_sql_errno= 1317,1964
--let $slave_sql_errno= 1317,1927,1964
--source include/wait_for_slave_sql_error.inc
# Now we have to disable the debug_sync statements, so they do not trigger
@ -712,7 +712,7 @@ SET debug_sync='now WAIT_FOR t2_killed';
# Now we can allow T1 to proceed.
SET debug_sync='now SIGNAL t1_cont';
--let $slave_sql_errno= 1317,1964
--let $slave_sql_errno= 1317,1927,1964
--source include/wait_for_slave_sql_error.inc
SELECT * FROM t3 WHERE a >= 50 ORDER BY a;
@ -1277,6 +1277,7 @@ eval SELECT IF('$io_pos' = '$sql_pos', "OK", "Not ok, $io_pos <> $sql_pos") AS t
--connection server_1
FLUSH LOGS;
--source include/wait_for_binlog_checkpoint.inc
--save_master_pos
--connection server_2
@ -1291,6 +1292,54 @@ eval SELECT IF('$io_pos' = '$sql_pos', "OK", "Not ok, $io_pos <> $sql_pos") AS t
--enable_query_log
--echo *** MDEV_6435: Incorrect error handling when query binlogged partially on master with "killed" error ***
--connection server_1
CREATE TABLE t6 (a INT) ENGINE=MyISAM;
CREATE TRIGGER tr AFTER INSERT ON t6 FOR EACH ROW SET @a = 1;
--connection con1
SET @old_format= @@binlog_format;
SET binlog_format= statement;
--let $conid = `SELECT CONNECTION_ID()`
SET debug_sync='sp_head_execute_before_loop SIGNAL ready WAIT_FOR cont';
send INSERT INTO t6 VALUES (1), (2), (3);
--connection server_1
SET debug_sync='now WAIT_FOR ready';
--replace_result $conid CONID
eval KILL QUERY $conid;
SET debug_sync='now SIGNAL cont';
--connection con1
--error ER_QUERY_INTERRUPTED
--reap
SET binlog_format= @old_format;
SET debug_sync='RESET';
--let $after_error_gtid_pos= `SELECT @@gtid_binlog_pos`
--connection server_1
SET debug_sync='RESET';
--connection server_2
--let $slave_sql_errno= 1317
--source include/wait_for_slave_sql_error.inc
STOP SLAVE IO_THREAD;
--replace_result $after_error_gtid_pos AFTER_ERROR_GTID_POS
eval SET GLOBAL gtid_slave_pos= '$after_error_gtid_pos';
--source include/start_slave.inc
--connection server_1
INSERT INTO t6 VALUES (4);
SELECT * FROM t6 ORDER BY a;
--save_master_pos
--connection server_2
--sync_with_master
SELECT * FROM t6 ORDER BY a;
--connection server_2
--source include/stop_slave.inc
SET GLOBAL slave_parallel_threads=@old_parallel_threads;
@ -1299,7 +1348,7 @@ SET DEBUG_SYNC= 'RESET';
--connection server_1
DROP function foo;
DROP TABLE t1,t2,t3,t4,t5;
DROP TABLE t1,t2,t3,t4,t5,t6;
SET DEBUG_SYNC= 'RESET';
--source include/rpl_end.inc


@ -0,0 +1,220 @@
--source include/have_innodb.inc
--source include/have_debug.inc
--source include/have_debug_sync.inc
--let $rpl_topology=1->2
--source include/rpl_init.inc
--echo *** Test retry of transactions that fail to replicate due to deadlock or similar temporary error. ***
--connection server_1
ALTER TABLE mysql.gtid_slave_pos ENGINE=InnoDB;
CREATE TABLE t1 (a int PRIMARY KEY, b INT) ENGINE=InnoDB;
INSERT INTO t1 VALUES (1,1);
--save_master_pos
# Use a stored function to inject a debug_sync into the appropriate THD.
# The function does nothing on the master, and on the slave it injects the
# desired debug_sync action(s).
SET sql_log_bin=0;
--delimiter ||
CREATE FUNCTION foo(x INT, d1 VARCHAR(500), d2 VARCHAR(500))
RETURNS INT DETERMINISTIC
BEGIN
RETURN x;
END
||
--delimiter ;
SET sql_log_bin=1;
--connection server_2
SET @old_parallel_threads=@@GLOBAL.slave_parallel_threads;
--source include/stop_slave.inc
SET GLOBAL slave_parallel_threads=5;
--source include/start_slave.inc
--sync_with_master
SET sql_log_bin=0;
--delimiter ||
CREATE FUNCTION foo(x INT, d1 VARCHAR(500), d2 VARCHAR(500))
RETURNS INT DETERMINISTIC
BEGIN
IF d1 != '' THEN
SET debug_sync = d1;
END IF;
IF d2 != '' THEN
SET debug_sync = d2;
END IF;
RETURN x;
END
||
--delimiter ;
SET sql_log_bin=1;
--source include/stop_slave.inc
--connection server_1
SET gtid_seq_no = 100;
BEGIN;
INSERT INTO t1 VALUES (2,1);
UPDATE t1 SET b=b+1 WHERE a=1;
INSERT INTO t1 VALUES (3,1);
COMMIT;
SELECT * FROM t1 ORDER BY a;
--save_master_pos
--connection server_2
SET @old_dbug= @@GLOBAL.debug_dbug;
SET GLOBAL debug_dbug="+d,rpl_parallel_simulate_temp_err_gtid_0_x_100";
let $old_retry= query_get_value(SHOW STATUS LIKE 'Slave_retried_transactions', Value, 1);
--source include/start_slave.inc
--sync_with_master
SET GLOBAL debug_dbug=@old_dbug;
let $new_retry= query_get_value(SHOW STATUS LIKE 'Slave_retried_transactions', Value, 1);
--disable_query_log
eval SELECT $new_retry - $old_retry AS retries;
--enable_query_log
SELECT * FROM t1 ORDER BY a;
--echo *** Test that double retry works when the first retry also fails with temp error ***
--source include/stop_slave.inc
--connection server_1
SET gtid_seq_no = 100;
SET @old_server_id = @@server_id;
SET server_id = 10;
BEGIN;
INSERT INTO t1 VALUES (4,1);
UPDATE t1 SET b=b+1 WHERE a=1;
INSERT INTO t1 VALUES (5,1);
INSERT INTO t1 VALUES (6,1);
COMMIT;
SET server_id = @old_server_id;
SELECT * FROM t1 ORDER BY a;
--save_master_pos
--connection server_2
SET @old_dbug= @@GLOBAL.debug_dbug;
SET GLOBAL debug_dbug="+d,rpl_parallel_simulate_temp_err_gtid_0_x_100,rpl_parallel_simulate_double_temp_err_gtid_0_x_100";
let $old_retry= query_get_value(SHOW STATUS LIKE 'Slave_retried_transactions', Value, 1);
--source include/start_slave.inc
--sync_with_master
SET GLOBAL debug_dbug=@old_dbug;
let $new_retry= query_get_value(SHOW STATUS LIKE 'Slave_retried_transactions', Value, 1);
--disable_query_log
eval SELECT $new_retry - $old_retry AS retries;
--enable_query_log
SELECT * FROM t1 ORDER BY a;
--echo *** Test too many retries, eventually causing failure. ***
--source include/stop_slave.inc
--connection server_1
SET gtid_seq_no = 100;
SET @old_server_id = @@server_id;
SET server_id = 11;
BEGIN;
INSERT INTO t1 VALUES (7,1);
UPDATE t1 SET b=b+1 WHERE a=1;
INSERT INTO t1 VALUES (8,1);
INSERT INTO t1 VALUES (9,1);
COMMIT;
SET server_id = @old_server_id;
SELECT * FROM t1 ORDER BY a;
--save_master_pos
--connection server_2
SET sql_log_bin=0;
CALL mtr.add_suppression("Slave worker thread retried transaction 10 time\\(s\\) in vain, giving up");
CALL mtr.add_suppression("Slave: Deadlock found when trying to get lock; try restarting transaction");
SET sql_log_bin=1;
SET @old_dbug= @@GLOBAL.debug_dbug;
SET GLOBAL debug_dbug="+d,rpl_parallel_simulate_temp_err_gtid_0_x_100,rpl_parallel_simulate_infinite_temp_err_gtid_0_x_100";
let $old_retry= query_get_value(SHOW STATUS LIKE 'Slave_retried_transactions', Value, 1);
START SLAVE;
--let $slave_sql_errno= 1213
--let $slave_timeout= 10
--source include/wait_for_slave_sql_error.inc
SET GLOBAL debug_dbug=@old_dbug;
let $new_retry= query_get_value(SHOW STATUS LIKE 'Slave_retried_transactions', Value, 1);
--disable_query_log
eval SELECT $new_retry - $old_retry AS retries;
--enable_query_log
SELECT * FROM t1 ORDER BY a;
STOP SLAVE IO_THREAD;
--source include/start_slave.inc
--sync_with_master
SELECT * FROM t1 ORDER BY a;
--echo *** Test retry of event group that spans multiple relay log files. ***
--connection server_1
CREATE TABLE t2 (a int PRIMARY KEY, b BLOB) ENGINE=InnoDB;
INSERT INTO t2 VALUES (1,"Hulubullu");
--save_master_pos
--connection server_2
--sync_with_master
--source include/stop_slave.inc
SET @old_max= @@GLOBAL.max_relay_log_size;
SET GLOBAL max_relay_log_size=4096;
--connection server_1
--let $big= `SELECT REPEAT("*", 5000)`
SET gtid_seq_no = 100;
SET @old_server_id = @@server_id;
SET server_id = 12;
BEGIN;
--disable_query_log
eval INSERT INTO t2 VALUES (2, CONCAT("Hello ", "$big"));
eval INSERT INTO t2 VALUES (3, CONCAT("Long data: ", "$big"));
--enable_query_log
INSERT INTO t1 VALUES (10, 4);
COMMIT;
SET server_id = @old_server_id;
SELECT * FROM t1 WHERE a >= 10 ORDER BY a;
SELECT a, LENGTH(b) FROM t2 ORDER BY a;
--save_master_pos
--connection server_2
SET @old_dbug= @@GLOBAL.debug_dbug;
SET GLOBAL debug_dbug="+d,rpl_parallel_simulate_temp_err_gtid_0_x_100";
let $old_retry= query_get_value(SHOW STATUS LIKE 'Slave_retried_transactions', Value, 1);
--source include/start_slave.inc
--sync_with_master
SET GLOBAL debug_dbug=@old_dbug;
let $new_retry= query_get_value(SHOW STATUS LIKE 'Slave_retried_transactions', Value, 1);
--disable_query_log
eval SELECT $new_retry - $old_retry AS retries;
--enable_query_log
SELECT * FROM t1 WHERE a >= 10 ORDER BY a;
SELECT a, LENGTH(b) FROM t2 ORDER BY a;
--connection server_1
INSERT INTO t1 VALUES (11,11);
--disable_query_log
eval INSERT INTO t2 VALUES (4, "$big");
--enable_query_log
--save_master_pos
--connection server_2
--sync_with_master
SELECT * FROM t1 WHERE a >= 10 ORDER BY a;
SELECT a, LENGTH(b) FROM t2 ORDER BY a;
SET GLOBAL max_relay_log_size=@old_max;
--connection server_2
--source include/stop_slave.inc
SET GLOBAL slave_parallel_threads=@old_parallel_threads;
--source include/start_slave.inc
--connection server_1
DROP TABLE t1, t2;
DROP function foo;
--source include/rpl_end.inc


@ -1137,6 +1137,17 @@
}
{
OpenSSL still reachable.
Memcheck:Leak
fun:*alloc
fun:CRYPTO_malloc
obj:*libssl*
fun:SSL_COMP_get_compression_methods
fun:SSL_library_init
}
{
Problem with udf and libresolve
Memcheck:Cond


@ -4101,6 +4101,7 @@ int MYSQL_BIN_LOG::purge_first_log(Relay_log_info* rli, bool included)
{
int error;
char *to_purge_if_included= NULL;
inuse_relaylog *ir;
DBUG_ENTER("purge_first_log");
DBUG_ASSERT(is_open());
@ -4108,7 +4109,30 @@ int MYSQL_BIN_LOG::purge_first_log(Relay_log_info* rli, bool included)
DBUG_ASSERT(!strcmp(rli->linfo.log_file_name,rli->event_relay_log_name));
mysql_mutex_lock(&LOCK_index);
to_purge_if_included= my_strdup(rli->group_relay_log_name, MYF(0));
ir= rli->inuse_relaylog_list;
while (ir)
{
inuse_relaylog *next= ir->next;
if (!ir->completed || ir->dequeued_count < ir->queued_count)
{
included= false;
break;
}
if (!included && !strcmp(ir->name, rli->group_relay_log_name))
break;
if (!next)
{
rli->last_inuse_relaylog= NULL;
included= 1;
to_purge_if_included= my_strdup(ir->name, MYF(0));
}
my_free(ir);
ir= next;
}
rli->inuse_relaylog_list= ir;
if (ir)
to_purge_if_included= my_strdup(ir->name, MYF(0));
/*
Read the next log file name from the index file and pass it back to
@ -6816,7 +6840,7 @@ MYSQL_BIN_LOG::queue_for_group_commit(group_commit_entry *orig_entry)
/* Interrupted by kill. */
DEBUG_SYNC(orig_entry->thd, "group_commit_waiting_for_prior_killed");
wfc->wakeup_error= orig_entry->thd->killed_errno();
if (wfc->wakeup_error)
if (!wfc->wakeup_error)
wfc->wakeup_error= ER_QUERY_INTERRUPTED;
my_message(wfc->wakeup_error, ER(wfc->wakeup_error), MYF(0));
DBUG_RETURN(-1);
@ -6827,12 +6851,6 @@ MYSQL_BIN_LOG::queue_for_group_commit(group_commit_entry *orig_entry)
else
mysql_mutex_unlock(&wfc->LOCK_wait_commit);
}
if (wfc && wfc->wakeup_error)
{
my_error(ER_PRIOR_COMMIT_FAILED, MYF(0));
DBUG_RETURN(-1);
}
/*
If the transaction we were waiting for has already put us into the group
commit queue (and possibly already done the entire binlog commit for us),
@ -6841,6 +6859,12 @@ MYSQL_BIN_LOG::queue_for_group_commit(group_commit_entry *orig_entry)
if (orig_entry->queued_by_other)
DBUG_RETURN(0);
if (wfc && wfc->wakeup_error)
{
my_error(ER_PRIOR_COMMIT_FAILED, MYF(0));
DBUG_RETURN(-1);
}
/* Now enqueue ourselves in the group commit queue. */
DEBUG_SYNC(orig_entry->thd, "commit_before_enqueue");
orig_entry->thd->clear_wakeup_ready();
@ -9044,6 +9068,8 @@ binlog_background_thread(void *arg __attribute__((unused)))
thd->thread_id= thread_id++;
mysql_mutex_unlock(&LOCK_thread_count);
thd->store_globals();
thd->security_ctx->skip_grants();
thd->set_command(COM_DAEMON);
/*
Load the slave replication GTID state from the mysql.gtid_slave_pos
@ -9347,7 +9373,7 @@ int TC_LOG_BINLOG::recover(LOG_INFO *linfo, const char *last_log_name,
file= -1;
}
if (0 == strcmp(linfo->log_file_name, last_log_name))
if (!strcmp(linfo->log_file_name, last_log_name))
break; // No more files to do
if ((file= open_binlog(&log, linfo->log_file_name, &errmsg)) < 0)
{


@ -190,6 +190,28 @@ static const char *HA_ERR(int i)
return "No Error!";
}
/*
Return true if an error caught during event execution is a temporary error
that will cause automatic retry of the event group during parallel
replication, false otherwise.
In parallel replication, conflicting transactions can occasionally cause
deadlocks; such errors are handled automatically by rolling back and re-trying
the transactions, so should not pollute the error log.
*/
static bool
is_parallel_retry_error(rpl_group_info *rgi, int err)
{
if (!rgi->is_parallel_exec)
return false;
if (rgi->killed_for_retry &&
(err == ER_QUERY_INTERRUPTED || err == ER_CONNECTION_KILLED))
return true;
return has_temporary_error(rgi->thd);
}
/**
Error reporting facility for Rows_log_event::do_apply_event
@ -217,6 +239,16 @@ static void inline slave_rows_error_report(enum loglevel level, int ha_error,
Relay_log_info const *rli= rgi->rli;
const Sql_condition *err;
buff[0]= 0;
int errcode= thd->is_error() ? thd->get_stmt_da()->sql_errno() : 0;
/*
In parallel replication, deadlocks or other temporary errors can happen
occasionally in normal operation, they will be handled correctly and
automatically by re-trying the transactions. So do not pollute the error
log with messages about them.
*/
if (is_parallel_retry_error(rgi, errcode))
return;
for (err= it++, slider= buff; err && slider < buff_end - 1;
slider += len, err= it++)
@ -227,8 +259,7 @@ static void inline slave_rows_error_report(enum loglevel level, int ha_error,
}
if (ha_error != 0)
rli->report(level, thd->is_error() ? thd->get_stmt_da()->sql_errno() : 0,
rgi->gtid_info(),
rli->report(level, errcode, rgi->gtid_info(),
"Could not execute %s event on table %s.%s;"
"%s handler error %s; "
"the event's master log %s, end_log_pos %lu",
@ -236,8 +267,7 @@ static void inline slave_rows_error_report(enum loglevel level, int ha_error,
buff, handler_error == NULL ? "<unknown>" : handler_error,
log_name, pos);
else
rli->report(level, thd->is_error() ? thd->get_stmt_da()->sql_errno() : 0,
rgi->gtid_info(),
rli->report(level, errcode, rgi->gtid_info(),
"Could not execute %s event on table %s.%s;"
"%s the event's master log %s, end_log_pos %lu",
type, table->s->db.str, table->s->table_name.str,
@ -4087,7 +4117,8 @@ int Query_log_event::do_apply_event(rpl_group_info *rgi,
*/
int error;
char llbuff[22];
if ((error= rows_event_stmt_cleanup(rgi, thd)))
if ((error= rows_event_stmt_cleanup(rgi, thd)) &&
!is_parallel_retry_error(rgi, error))
{
rli->report(ERROR_LEVEL, error, rgi->gtid_info(),
"Error in cleaning up after an event preceding the commit; "
@ -4234,22 +4265,24 @@ int Query_log_event::do_apply_event(rpl_group_info *rgi,
Record any GTID in the same transaction, so slave state is
transactionally consistent.
*/
if (current_stmt_is_commit && (sub_id= rgi->gtid_sub_id))
if (current_stmt_is_commit && rgi->gtid_pending)
{
/* Clear the GTID from the RLI so we don't accidentally reuse it. */
rgi->gtid_sub_id= 0;
sub_id= rgi->gtid_sub_id;
rgi->gtid_pending= false;
gtid= rgi->current_gtid;
thd->variables.option_bits&= ~OPTION_GTID_BEGIN;
if (rpl_global_gtid_slave_state.record_gtid(thd, &gtid, sub_id, true, false))
{
rli->report(ERROR_LEVEL, ER_CANNOT_UPDATE_GTID_STATE,
rgi->gtid_info(),
"Error during COMMIT: failed to update GTID state in "
"%s.%s: %d: %s",
"mysql", rpl_gtid_slave_state_table_name.str,
thd->get_stmt_da()->sql_errno(),
thd->get_stmt_da()->message());
int errcode= thd->get_stmt_da()->sql_errno();
if (!is_parallel_retry_error(rgi, errcode))
rli->report(ERROR_LEVEL, ER_CANNOT_UPDATE_GTID_STATE,
rgi->gtid_info(),
"Error during COMMIT: failed to update GTID state in "
"%s.%s: %d: %s",
"mysql", rpl_gtid_slave_state_table_name.str,
errcode,
thd->get_stmt_da()->message());
trans_rollback(thd);
sub_id= 0;
thd->is_slave_error= 1;
@ -4396,18 +4429,21 @@ Default database: '%s'. Query: '%s'",
{
DBUG_PRINT("info",("error ignored"));
clear_all_errors(thd, const_cast<Relay_log_info*>(rli));
thd->reset_killed();
if (actual_error == ER_QUERY_INTERRUPTED ||
actual_error == ER_CONNECTION_KILLED)
thd->reset_killed();
}
/*
Other cases: mostly we expected no error and get one.
*/
else if (thd->is_slave_error || thd->is_fatal_error)
{
rli->report(ERROR_LEVEL, actual_error, rgi->gtid_info(),
"Error '%s' on query. Default database: '%s'. Query: '%s'",
(actual_error ? thd->get_stmt_da()->message() :
"unexpected success or fatal error"),
print_slave_db_safe(thd->db), query_arg);
if (!is_parallel_retry_error(rgi, actual_error))
rli->report(ERROR_LEVEL, actual_error, rgi->gtid_info(),
"Error '%s' on query. Default database: '%s'. Query: '%s'",
(actual_error ? thd->get_stmt_da()->message() :
"unexpected success or fatal error"),
print_slave_db_safe(thd->db), query_arg);
thd->is_slave_error= 1;
}
@ -6507,12 +6543,10 @@ Gtid_log_event::do_apply_event(rpl_group_info *rgi)
thd->variables.server_id= this->server_id;
thd->variables.gtid_domain_id= this->domain_id;
thd->variables.gtid_seq_no= this->seq_no;
mysql_reset_thd_for_next_command(thd);
if (opt_gtid_strict_mode && opt_bin_log && opt_log_slave_updates)
{
/* Need to reset prior "ok" status to give an error. */
thd->clear_error();
thd->get_stmt_da()->reset_diagnostics_area();
if (mysql_bin_log.check_strict_gtid_sequence(this->domain_id,
this->server_id, this->seq_no))
return 1;
@ -7290,28 +7324,41 @@ int Xid_log_event::do_apply_event(rpl_group_info *rgi)
bool res;
int err;
rpl_gtid gtid;
uint64 sub_id;
uint64 sub_id= 0;
Relay_log_info const *rli= rgi->rli;
/*
XID_EVENT works like a COMMIT statement. And it also updates the
mysql.gtid_slave_pos table with the GTID of the current transaction.
Therefore, it acts much like a normal SQL statement, so we need to do
mysql_reset_thd_for_next_command() as if starting a new statement.
*/
mysql_reset_thd_for_next_command(thd);
/*
Record any GTID in the same transaction, so slave state is transactionally
consistent.
*/
if ((sub_id= rgi->gtid_sub_id))
if (rgi->gtid_pending)
{
/* Clear the GTID from the RLI so we don't accidentally reuse it. */
rgi->gtid_sub_id= 0;
sub_id= rgi->gtid_sub_id;
rgi->gtid_pending= false;
gtid= rgi->current_gtid;
err= rpl_global_gtid_slave_state.record_gtid(thd, &gtid, sub_id, true, false);
if (err)
{
rli->report(ERROR_LEVEL, ER_CANNOT_UPDATE_GTID_STATE, rgi->gtid_info(),
"Error during XID COMMIT: failed to update GTID state in "
"%s.%s: %d: %s",
"mysql", rpl_gtid_slave_state_table_name.str,
thd->get_stmt_da()->sql_errno(),
thd->get_stmt_da()->message());
int ec= thd->get_stmt_da()->sql_errno();
/*
Do not report an error if this is really a kill due to a deadlock.
In this case, the transaction will be re-tried instead.
*/
if (!is_parallel_retry_error(rgi, ec))
rli->report(ERROR_LEVEL, ER_CANNOT_UPDATE_GTID_STATE, rgi->gtid_info(),
"Error during XID COMMIT: failed to update GTID state in "
"%s.%s: %d: %s",
"mysql", rpl_gtid_slave_state_table_name.str, ec,
thd->get_stmt_da()->message());
trans_rollback(thd);
thd->is_slave_error= 1;
return err;
@ -9631,7 +9678,8 @@ int Rows_log_event::do_apply_event(rpl_group_info *rgi)
if (open_and_lock_tables(thd, rgi->tables_to_lock, FALSE, 0))
{
uint actual_error= thd->get_stmt_da()->sql_errno();
if (thd->is_slave_error || thd->is_fatal_error)
if ((thd->is_slave_error || thd->is_fatal_error) &&
!is_parallel_retry_error(rgi, actual_error))
{
/*
Error reporting borrowed from Query_log_event with many excessive


@ -368,6 +368,7 @@ static I_List<THD> thread_cache;
static bool binlog_format_used= false;
LEX_STRING opt_init_connect, opt_init_slave;
static mysql_cond_t COND_thread_cache, COND_flush_thread_cache;
mysql_cond_t COND_slave_init;
static DYNAMIC_ARRAY all_options;
/* Global variables */
@ -706,7 +707,7 @@ mysql_mutex_t
LOCK_crypt,
LOCK_global_system_variables,
LOCK_user_conn, LOCK_slave_list, LOCK_active_mi,
LOCK_connection_count, LOCK_error_messages;
LOCK_connection_count, LOCK_error_messages, LOCK_slave_init;
mysql_mutex_t LOCK_stats, LOCK_global_user_client_stats,
LOCK_global_table_stats, LOCK_global_index_stats;
@ -880,7 +881,8 @@ PSI_mutex_key key_LOCK_stats,
key_LOCK_wakeup_ready, key_LOCK_wait_commit;
PSI_mutex_key key_LOCK_gtid_waiting;
PSI_mutex_key key_LOCK_prepare_ordered, key_LOCK_commit_ordered;
PSI_mutex_key key_LOCK_prepare_ordered, key_LOCK_commit_ordered,
key_LOCK_slave_init;
PSI_mutex_key key_TABLE_SHARE_LOCK_share;
static PSI_mutex_info all_server_mutexes[]=
@ -943,6 +945,7 @@ static PSI_mutex_info all_server_mutexes[]=
{ &key_LOCK_error_messages, "LOCK_error_messages", PSI_FLAG_GLOBAL},
{ &key_LOCK_prepare_ordered, "LOCK_prepare_ordered", PSI_FLAG_GLOBAL},
{ &key_LOCK_commit_ordered, "LOCK_commit_ordered", PSI_FLAG_GLOBAL},
{ &key_LOCK_slave_init, "LOCK_slave_init", PSI_FLAG_GLOBAL},
{ &key_LOG_INFO_lock, "LOG_INFO::lock", 0},
{ &key_LOCK_thread_count, "LOCK_thread_count", PSI_FLAG_GLOBAL},
{ &key_LOCK_thread_cache, "LOCK_thread_cache", PSI_FLAG_GLOBAL},
@ -997,7 +1000,7 @@ PSI_cond_key key_TC_LOG_MMAP_COND_queue_busy;
PSI_cond_key key_COND_rpl_thread_queue, key_COND_rpl_thread,
key_COND_rpl_thread_pool,
key_COND_parallel_entry, key_COND_group_commit_orderer,
key_COND_prepare_ordered;
key_COND_prepare_ordered, key_COND_slave_init;
PSI_cond_key key_COND_wait_gtid, key_COND_gtid_ignore_duplicates;
static PSI_cond_info all_server_conds[]=
@ -1046,6 +1049,7 @@ static PSI_cond_info all_server_conds[]=
{ &key_COND_parallel_entry, "COND_parallel_entry", 0},
{ &key_COND_group_commit_orderer, "COND_group_commit_orderer", 0},
{ &key_COND_prepare_ordered, "COND_prepare_ordered", 0},
{ &key_COND_slave_init, "COND_slave_init", 0},
{ &key_COND_wait_gtid, "COND_wait_gtid", 0},
{ &key_COND_gtid_ignore_duplicates, "COND_gtid_ignore_duplicates", 0}
};
@ -2173,6 +2177,8 @@ static void clean_up_mutexes()
mysql_mutex_destroy(&LOCK_prepare_ordered);
mysql_cond_destroy(&COND_prepare_ordered);
mysql_mutex_destroy(&LOCK_commit_ordered);
mysql_mutex_destroy(&LOCK_slave_init);
mysql_cond_destroy(&COND_slave_init);
DBUG_VOID_RETURN;
}
@ -4387,6 +4393,9 @@ static int init_thread_environment()
mysql_cond_init(key_COND_prepare_ordered, &COND_prepare_ordered, NULL);
mysql_mutex_init(key_LOCK_commit_ordered, &LOCK_commit_ordered,
MY_MUTEX_INIT_SLOW);
mysql_mutex_init(key_LOCK_slave_init, &LOCK_slave_init,
MY_MUTEX_INIT_SLOW);
mysql_cond_init(key_COND_slave_init, &COND_slave_init, NULL);
#ifdef HAVE_OPENSSL
mysql_mutex_init(key_LOCK_des_key_file,


@ -518,7 +518,8 @@ extern mysql_mutex_t
LOCK_delayed_status, LOCK_delayed_create, LOCK_crypt, LOCK_timezone,
LOCK_slave_list, LOCK_active_mi, LOCK_manager,
LOCK_global_system_variables, LOCK_user_conn,
LOCK_prepared_stmt_count, LOCK_error_messages, LOCK_connection_count;
LOCK_prepared_stmt_count, LOCK_error_messages, LOCK_connection_count,
LOCK_slave_init;
extern MYSQL_PLUGIN_IMPORT mysql_mutex_t LOCK_thread_count;
#ifdef HAVE_OPENSSL
extern mysql_mutex_t LOCK_des_key_file;
@ -529,6 +530,7 @@ extern mysql_rwlock_t LOCK_grant, LOCK_sys_init_connect, LOCK_sys_init_slave;
extern mysql_rwlock_t LOCK_system_variables_hash;
extern mysql_cond_t COND_thread_count;
extern mysql_cond_t COND_manager;
extern mysql_cond_t COND_slave_init;
extern int32 thread_running;
extern int32 thread_count;
extern my_atomic_rwlock_t thread_running_lock, thread_count_lock;


@ -65,16 +65,16 @@ rpl_slave_state::update_state_hash(uint64 sub_id, rpl_gtid *gtid,
int
rpl_slave_state::record_and_update_gtid(THD *thd, rpl_group_info *rgi)
{
uint64 sub_id;
DBUG_ENTER("rpl_slave_state::record_and_update_gtid");
/*
Update the GTID position, if we have it and did not already update
it in a GTID transaction.
*/
if ((sub_id= rgi->gtid_sub_id))
if (rgi->gtid_pending)
{
rgi->gtid_sub_id= 0;
uint64 sub_id= rgi->gtid_sub_id;
rgi->gtid_pending= false;
if (rgi->gtid_ignore_duplicate_state!=rpl_group_info::GTID_DUPLICATE_IGNORE)
{
if (record_gtid(thd, &rgi->current_gtid, sub_id, false, false))


@ -7,15 +7,6 @@
/*
Code for optional parallel execution of replicated events on the slave.
ToDo list:
- Retry of failed transactions is not yet implemented for the parallel case.
- All the waits (eg. in struct wait_for_commit and in
rpl_parallel_thread_pool::get_thread()) need to be killable. And on kill,
everything needs to be correctly rolled back and stopped in all threads,
to ensure a consistent slave replication state.
*/
struct rpl_parallel_thread_pool global_rpl_thread_pool;
@ -32,7 +23,6 @@ rpt_handle_event(rpl_parallel_thread::queued_event *qev,
Relay_log_info *rli= rgi->rli;
THD *thd= rgi->thd;
thd->rgi_slave= rgi;
thd->system_thread_info.rpl_sql_info->rpl_filter = rli->mi->rpl_filter;
/* ToDo: Access to thd, and what about rli, split out a parallel part? */
@ -44,7 +34,6 @@ rpt_handle_event(rpl_parallel_thread::queued_event *qev,
rgi->future_event_relay_log_pos= qev->future_event_relay_log_pos;
strcpy(rgi->future_event_master_log_name, qev->future_event_master_log_name);
err= apply_event_and_update_pos(qev->ev, thd, rgi, rpt);
thd->rgi_slave= NULL;
thread_safe_increment64(&rli->executed_entries,
&slave_executed_entries_lock);
@ -165,6 +154,7 @@ finish_event_group(THD *thd, uint64 sub_id, rpl_parallel_entry *entry,
mysql_mutex_unlock(&entry->LOCK_parallel_entry);
thd->clear_error();
thd->reset_killed();
thd->get_stmt_da()->reset_diagnostics_area();
wfc->wakeup_subsequent_commits(rgi->worker_error);
}
@ -197,6 +187,281 @@ unlock_or_exit_cond(THD *thd, mysql_mutex_t *lock, bool *did_enter_cond,
}
static void
register_wait_for_prior_event_group_commit(rpl_group_info *rgi,
rpl_parallel_entry *entry)
{
mysql_mutex_assert_owner(&entry->LOCK_parallel_entry);
if (rgi->wait_commit_sub_id > entry->last_committed_sub_id)
{
/*
Register that the commit of this event group must wait for the
commit of the previous event group to complete before it may
complete itself, so that we preserve commit order.
*/
wait_for_commit *waitee=
&rgi->wait_commit_group_info->commit_orderer;
rgi->commit_orderer.register_wait_for_prior_commit(waitee);
}
}
#ifndef DBUG_OFF
static int
dbug_simulate_tmp_error(rpl_group_info *rgi, THD *thd)
{
if (rgi->current_gtid.domain_id == 0 && rgi->current_gtid.seq_no == 100 &&
rgi->retry_event_count == 4)
{
thd->clear_error();
thd->get_stmt_da()->reset_diagnostics_area();
my_error(ER_LOCK_DEADLOCK, MYF(0));
return 1;
}
return 0;
}
#endif
/*
If we detect a deadlock due to eg. storage engine locks that conflict with
the fixed commit order, then the later transaction will be killed
asynchronously to allow the former to complete its commit.
In this case, we convert the 'killed' error into a deadlock error, and retry
the later transaction. */
static void
convert_kill_to_deadlock_error(rpl_group_info *rgi)
{
THD *thd= rgi->thd;
int err_code;
if (!thd->get_stmt_da()->is_error())
return;
err_code= thd->get_stmt_da()->sql_errno();
if ((err_code == ER_QUERY_INTERRUPTED || err_code == ER_CONNECTION_KILLED) &&
rgi->killed_for_retry)
{
thd->clear_error();
my_error(ER_LOCK_DEADLOCK, MYF(0));
rgi->killed_for_retry= false;
thd->reset_killed();
}
}
static bool
is_group_ending(Log_event *ev, Log_event_type event_type)
{
return event_type == XID_EVENT ||
(event_type == QUERY_EVENT &&
(((Query_log_event *)ev)->is_commit() ||
((Query_log_event *)ev)->is_rollback()));
}
static int
retry_event_group(rpl_group_info *rgi, rpl_parallel_thread *rpt,
rpl_parallel_thread::queued_event *orig_qev)
{
IO_CACHE rlog;
LOG_INFO linfo;
File fd= (File)-1;
const char *errmsg= NULL;
inuse_relaylog *ir= rgi->relay_log;
uint64 event_count;
uint64 events_to_execute= rgi->retry_event_count;
Relay_log_info *rli= rgi->rli;
int err;
ulonglong cur_offset, old_offset;
char log_name[FN_REFLEN];
THD *thd= rgi->thd;
rpl_parallel_entry *entry= rgi->parallel_entry;
ulong retries= 0;
do_retry:
event_count= 0;
err= 0;
/*
If we already started committing before getting the deadlock (or other
error) that caused us to need to retry, we have already signalled
subsequent transactions that we have started committing. This is
potentially a problem, as now we will rollback, and if subsequent
transactions would start to execute now, they could see an unexpected
state of the database and get eg. key not found or duplicate key error.
However, to get a deadlock in the first place, there must have been
another earlier transaction that is waiting for us. Thus that other
transaction has _not_ yet started to commit, and any subsequent
transactions will still be waiting at this point.
So here, we decrement back the count of transactions that started
committing (if we already incremented it), undoing the effect of an
earlier mark_start_commit(). Then later, when the retry succeeds and we
commit again, we can do a new mark_start_commit() and eventually wake up
subsequent transactions at the proper time.
We need to do the unmark before the rollback, to be sure that the
transaction we deadlocked with will not signal that it started to commit
until after the unmark.
*/
rgi->unmark_start_commit();
/*
We might get the deadlock error that causes the retry during commit, while
sitting in wait_for_prior_commit(). If this happens, we will have a
pending error in the wait_for_commit object. So clear this by
unregistering (and later re-registering) the wait.
*/
if(thd->wait_for_commit_ptr)
thd->wait_for_commit_ptr->unregister_wait_for_prior_commit();
rgi->cleanup_context(thd, 1);
mysql_mutex_lock(&rli->data_lock);
++rli->retried_trans;
statistic_increment(slave_retried_transactions, LOCK_status);
mysql_mutex_unlock(&rli->data_lock);
mysql_mutex_lock(&entry->LOCK_parallel_entry);
register_wait_for_prior_event_group_commit(rgi, entry);
mysql_mutex_unlock(&entry->LOCK_parallel_entry);
strmake_buf(log_name, ir->name);
if ((fd= open_binlog(&rlog, log_name, &errmsg)) <0)
{
err= 1;
goto err;
}
cur_offset= rgi->retry_start_offset;
my_b_seek(&rlog, cur_offset);
do
{
Log_event_type event_type;
Log_event *ev;
rpl_parallel_thread::queued_event *qev;
/* The loop is here so we can try again with the next relay log file on EOF. */
for (;;)
{
old_offset= cur_offset;
ev= Log_event::read_log_event(&rlog, 0,
rli->relay_log.description_event_for_exec /* ToDo: this needs fixing */,
opt_slave_sql_verify_checksum);
cur_offset= my_b_tell(&rlog);
if (ev)
break;
if (rlog.error < 0)
{
errmsg= "slave SQL thread aborted because of I/O error";
err= 1;
goto err;
}
if (rlog.error > 0)
{
sql_print_error("Slave SQL thread: I/O error reading "
"event(errno: %d cur_log->error: %d)",
my_errno, rlog.error);
errmsg= "Aborting slave SQL thread because of partial event read";
err= 1;
goto err;
}
/* EOF. Move to the next relay log. */
end_io_cache(&rlog);
mysql_file_close(fd, MYF(MY_WME));
fd= (File)-1;
/* Find the next relay log file. */
if((err= rli->relay_log.find_log_pos(&linfo, log_name, 1)) ||
(err= rli->relay_log.find_next_log(&linfo, 1)))
{
char buff[22];
sql_print_error("next log error: %d offset: %s log: %s",
err,
llstr(linfo.index_file_offset, buff),
log_name);
goto err;
}
strmake_buf(log_name ,linfo.log_file_name);
if ((fd= open_binlog(&rlog, log_name, &errmsg)) <0)
{
err= 1;
goto err;
}
/* Loop to try again on the new log file. */
}
event_type= ev->get_type_code();
if (!Log_event::is_group_event(event_type))
{
delete ev;
continue;
}
ev->thd= thd;
mysql_mutex_lock(&rpt->LOCK_rpl_thread);
qev= rpt->retry_get_qev(ev, orig_qev, log_name, cur_offset,
cur_offset - old_offset);
mysql_mutex_unlock(&rpt->LOCK_rpl_thread);
if (!qev)
{
delete ev;
my_error(ER_OUT_OF_RESOURCES, MYF(0));
err= 1;
goto err;
}
if (is_group_ending(ev, event_type))
rgi->mark_start_commit();
err= rpt_handle_event(qev, rpt);
++event_count;
mysql_mutex_lock(&rpt->LOCK_rpl_thread);
rpt->free_qev(qev);
mysql_mutex_unlock(&rpt->LOCK_rpl_thread);
delete_or_keep_event_post_apply(rgi, event_type, ev);
DBUG_EXECUTE_IF("rpl_parallel_simulate_double_temp_err_gtid_0_x_100",
if (retries == 0) err= dbug_simulate_tmp_error(rgi, thd););
DBUG_EXECUTE_IF("rpl_parallel_simulate_infinite_temp_err_gtid_0_x_100",
err= dbug_simulate_tmp_error(rgi, thd););
if (err)
{
convert_kill_to_deadlock_error(rgi);
if (has_temporary_error(thd))
{
++retries;
if (retries < slave_trans_retries)
{
end_io_cache(&rlog);
mysql_file_close(fd, MYF(MY_WME));
fd= (File)-1;
goto do_retry;
}
sql_print_error("Slave worker thread retried transaction %lu time(s) "
"in vain, giving up. Consider raising the value of "
"the slave_transaction_retries variable.",
slave_trans_retries);
}
goto err;
}
} while (event_count < events_to_execute);
err:
if (fd >= 0)
{
end_io_cache(&rlog);
mysql_file_close(fd, MYF(MY_WME));
}
if (errmsg)
sql_print_error("Error reading relay log event: %s", errmsg);
return err;
}
pthread_handler_t
handle_rpl_parallel_thread(void *arg)
{
@ -215,6 +480,8 @@ handle_rpl_parallel_thread(void *arg)
rpl_sql_thread_info sql_info(NULL);
size_t total_event_size;
int err;
inuse_relaylog *last_ir;
uint64 accumulated_ir_count;
struct rpl_parallel_thread *rpt= (struct rpl_parallel_thread *)arg;
@ -244,39 +511,6 @@ handle_rpl_parallel_thread(void *arg)
thd->set_time();
thd->variables.lock_wait_timeout= LONG_TIMEOUT;
thd->system_thread_info.rpl_sql_info= &sql_info;
/*
For now, we need to run the replication parallel worker threads in
READ COMMITTED. This is needed because gap locks are not symmetric.
For example, a gap lock from a DELETE blocks an insert intention lock,
but not vice versa. So an INSERT followed by DELETE can group commit
on the master, but if we are unlucky with thread scheduling we can
then deadlock on the slave because the INSERT ends up waiting for a
gap lock from the DELETE (and the DELETE in turn waits for the INSERT
in wait_for_prior_commit()). See also MDEV-5914.
It should be mostly safe to run in READ COMMITTED in the slave anyway.
The commit order is already fixed on the master, so we do not
risk logging into the binlog in an incorrect order between worker
threads (one that would cause different results if executed on a
lower-level slave that uses this slave as a master). The only
potential problem is with transactions run in a different master
connection (using multi-source replication), or run directly on the
slave by an application; when using READ COMMITTED we are not
guaranteed serialisability of binlogged statements.
In practice, this is unlikely to be an issue. In GTID mode, such
parallel transactions from multi-source or application must in any
case use a different replication domain, in which case binlog order
by definition must be independent between the different domains. Even
in non-GTID mode, normally one will assume that the external
transactions are not conflicting with those applied by the slave, so
that isolation level should make no difference. It would be rather
strange if the result of applying query events from one master would
depend on the timing and nature of other queries executed from
different multi-source connections or done directly on the slave by
an application. Still, something to be aware of.
*/
thd->variables.tx_isolation= ISO_READ_COMMITTED;
mysql_mutex_lock(&rpt->LOCK_rpl_thread);
rpt->thd= thd;
@ -332,7 +566,7 @@ handle_rpl_parallel_thread(void *arg)
continue;
}
group_rgi= rgi;
thd->rgi_slave= group_rgi= rgi;
gco= rgi->gco;
/* Handle a new event group, which will be initiated by a GTID event. */
if ((event_type= events->ev->get_type_code()) == GTID_EVENT)
@ -341,7 +575,6 @@ handle_rpl_parallel_thread(void *arg)
PSI_stage_info old_stage;
uint64 wait_count;
thd->tx_isolation= (enum_tx_isolation)thd->variables.tx_isolation;
in_event_group= true;
/*
If the standalone flag is set, then this event group consists of a
@ -352,9 +585,7 @@ handle_rpl_parallel_thread(void *arg)
(0 != (static_cast<Gtid_log_event *>(events->ev)->flags2 &
Gtid_log_event::FL_STANDALONE));
/* Save this, as it gets cleared when the event group commits. */
event_gtid_sub_id= rgi->gtid_sub_id;
rgi->thd= thd;
/*
@ -430,17 +661,9 @@ handle_rpl_parallel_thread(void *arg)
if (unlikely(entry->stop_on_error_sub_id <= rgi->wait_commit_sub_id))
skip_event_group= true;
else if (rgi->wait_commit_sub_id > entry->last_committed_sub_id)
{
/*
Register that the commit of this event group must wait for the
commit of the previous event group to complete before it may
complete itself, so that we preserve commit order.
*/
wait_for_commit *waitee=
&rgi->wait_commit_group_info->commit_orderer;
rgi->commit_orderer.register_wait_for_prior_commit(waitee);
}
else
register_wait_for_prior_event_group_commit(rgi, entry);
unlock_or_exit_cond(thd, &entry->LOCK_parallel_entry,
&did_enter_cond, &old_stage);
@ -482,10 +705,7 @@ handle_rpl_parallel_thread(void *arg)
}
}
group_ending= event_type == XID_EVENT ||
(event_type == QUERY_EVENT &&
(((Query_log_event *)events->ev)->is_commit() ||
((Query_log_event *)events->ev)->is_rollback()));
group_ending= is_group_ending(events->ev, event_type);
if (group_ending && likely(!rgi->worker_error))
{
DEBUG_SYNC(thd, "rpl_parallel_before_mark_start_commit");
@ -499,23 +719,41 @@ handle_rpl_parallel_thread(void *arg)
everything is stopped and cleaned up correctly.
*/
if (likely(!rgi->worker_error) && !skip_event_group)
{
++rgi->retry_event_count;
err= rpt_handle_event(events, rpt);
delete_or_keep_event_post_apply(rgi, event_type, events->ev);
DBUG_EXECUTE_IF("rpl_parallel_simulate_temp_err_gtid_0_x_100",
err= dbug_simulate_tmp_error(rgi, thd););
if (err)
{
convert_kill_to_deadlock_error(rgi);
if (has_temporary_error(thd) && slave_trans_retries > 0)
err= retry_event_group(rgi, rpt, events);
}
}
else
{
delete events->ev;
err= thd->wait_for_prior_commit();
}
end_of_group=
in_event_group &&
((group_standalone && !Log_event::is_part_of_group(event_type)) ||
group_ending);
delete_or_keep_event_post_apply(rgi, event_type, events->ev);
events->next= qevs_to_free;
qevs_to_free= events;
if (unlikely(err) && !rgi->worker_error)
if (unlikely(err))
{
slave_output_error_info(rgi, thd);
signal_error_to_sql_driver_thread(thd, rgi, err);
if (!rgi->worker_error)
{
slave_output_error_info(rgi, thd);
signal_error_to_sql_driver_thread(thd, rgi, err);
}
thd->reset_killed();
}
if (end_of_group)
{
@ -523,7 +761,7 @@ handle_rpl_parallel_thread(void *arg)
finish_event_group(thd, event_gtid_sub_id, entry, rgi);
rgi->next= rgis_to_free;
rgis_to_free= rgi;
group_rgi= rgi= NULL;
thd->rgi_slave= group_rgi= rgi= NULL;
skip_event_group= false;
DEBUG_SYNC(thd, "rpl_parallel_end_of_group");
}
@ -548,12 +786,34 @@ handle_rpl_parallel_thread(void *arg)
rpt->free_rgi(rgis_to_free);
rgis_to_free= next;
}
last_ir= NULL;
accumulated_ir_count= 0;
while (qevs_to_free)
{
rpl_parallel_thread::queued_event *next= qevs_to_free->next;
inuse_relaylog *ir= qevs_to_free->ir;
/* Batch up refcount update to reduce use of synchronised operations. */
if (last_ir != ir)
{
if (last_ir)
{
my_atomic_rwlock_wrlock(&rli->inuse_relaylog_atomic_lock);
my_atomic_add64(&last_ir->dequeued_count, accumulated_ir_count);
my_atomic_rwlock_wrunlock(&rli->inuse_relaylog_atomic_lock);
accumulated_ir_count= 0;
}
last_ir= ir;
}
++accumulated_ir_count;
rpt->free_qev(qevs_to_free);
qevs_to_free= next;
}
if (last_ir)
{
my_atomic_rwlock_wrlock(&rli->inuse_relaylog_atomic_lock);
my_atomic_add64(&last_ir->dequeued_count, accumulated_ir_count);
my_atomic_rwlock_wrunlock(&rli->inuse_relaylog_atomic_lock);
}
if ((events= rpt->event_queue) != NULL)
{
@ -584,7 +844,7 @@ handle_rpl_parallel_thread(void *arg)
in_event_group= false;
mysql_mutex_lock(&rpt->LOCK_rpl_thread);
rpt->free_rgi(group_rgi);
group_rgi= NULL;
thd->rgi_slave= group_rgi= NULL;
skip_event_group= false;
}
if (!in_event_group)
@ -802,8 +1062,7 @@ err:
rpl_parallel_thread::queued_event *
rpl_parallel_thread::get_qev(Log_event *ev, ulonglong event_size,
Relay_log_info *rli)
rpl_parallel_thread::get_qev_common(Log_event *ev, ulonglong event_size)
{
queued_event *qev;
mysql_mutex_assert_owner(&LOCK_rpl_thread);
@ -817,6 +1076,17 @@ rpl_parallel_thread::get_qev(Log_event *ev, ulonglong event_size,
qev->ev= ev;
qev->event_size= event_size;
qev->next= NULL;
return qev;
}
rpl_parallel_thread::queued_event *
rpl_parallel_thread::get_qev(Log_event *ev, ulonglong event_size,
Relay_log_info *rli)
{
queued_event *qev= get_qev_common(ev, event_size);
if (!qev)
return NULL;
strcpy(qev->event_relay_log_name, rli->event_relay_log_name);
qev->event_relay_log_pos= rli->event_relay_log_pos;
qev->future_event_relay_log_pos= rli->future_event_relay_log_pos;
@ -825,6 +1095,24 @@ rpl_parallel_thread::get_qev(Log_event *ev, ulonglong event_size,
}
rpl_parallel_thread::queued_event *
rpl_parallel_thread::retry_get_qev(Log_event *ev, queued_event *orig_qev,
const char *relay_log_name,
ulonglong event_pos, ulonglong event_size)
{
queued_event *qev= get_qev_common(ev, event_size);
if (!qev)
return NULL;
qev->rgi= orig_qev->rgi;
strcpy(qev->event_relay_log_name, relay_log_name);
qev->event_relay_log_pos= event_pos;
qev->future_event_relay_log_pos= event_pos+event_size;
strcpy(qev->future_event_master_log_name,
orig_qev->future_event_master_log_name);
return qev;
}
void
rpl_parallel_thread::free_qev(rpl_parallel_thread::queued_event *qev)
{
@ -836,7 +1124,7 @@ rpl_parallel_thread::free_qev(rpl_parallel_thread::queued_event *qev)
rpl_group_info*
rpl_parallel_thread::get_rgi(Relay_log_info *rli, Gtid_log_event *gtid_ev,
rpl_parallel_entry *e)
rpl_parallel_entry *e, ulonglong event_size)
{
rpl_group_info *rgi;
mysql_mutex_assert_owner(&LOCK_rpl_thread);
@ -864,6 +1152,10 @@ rpl_parallel_thread::get_rgi(Relay_log_info *rli, Gtid_log_event *gtid_ev,
return NULL;
}
rgi->parallel_entry= e;
rgi->relay_log= rli->last_inuse_relaylog;
rgi->retry_start_offset= rli->future_event_relay_log_pos-event_size;
rgi->retry_event_count= 0;
rgi->killed_for_retry= false;
return rgi;
}
@ -1391,15 +1683,9 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
if (typ == GTID_EVENT)
{
uint32 domain_id;
if (likely(typ == GTID_EVENT))
{
Gtid_log_event *gtid_ev= static_cast<Gtid_log_event *>(ev);
domain_id= (rli->mi->using_gtid == Master_info::USE_GTID_NO ?
0 : gtid_ev->domain_id);
}
else
domain_id= 0;
Gtid_log_event *gtid_ev= static_cast<Gtid_log_event *>(ev);
uint32 domain_id= (rli->mi->using_gtid == Master_info::USE_GTID_NO ?
0 : gtid_ev->domain_id);
if (!(e= find(domain_id)))
{
my_error(ER_OUT_OF_RESOURCES, MYF(MY_WME));
@ -1439,7 +1725,7 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
{
Gtid_log_event *gtid_ev= static_cast<Gtid_log_event *>(ev);
if (!(rgi= cur_thread->get_rgi(rli, gtid_ev, e)))
if (!(rgi= cur_thread->get_rgi(rli, gtid_ev, e, event_size)))
{
cur_thread->free_qev(qev);
abandon_worker_thread(rli->sql_driver_thd, cur_thread,
@ -1551,6 +1837,8 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
Queue the event for processing.
*/
rli->event_relay_log_pos= rli->future_event_relay_log_pos;
qev->ir= rli->last_inuse_relaylog;
++qev->ir->queued_count;
cur_thread->enqueue(qev);
unlock_or_exit_cond(rli->sql_driver_thd, &cur_thread->LOCK_rpl_thread,
&did_enter_cond, &old_stage);


@ -9,6 +9,7 @@ struct rpl_parallel_entry;
struct rpl_parallel_thread_pool;
class Relay_log_info;
struct inuse_relaylog;
/*
@ -73,6 +74,7 @@ struct rpl_parallel_thread {
queued_event *next;
Log_event *ev;
rpl_group_info *rgi;
inuse_relaylog *ir;
ulonglong future_event_relay_log_pos;
char event_relay_log_name[FN_REFLEN];
char future_event_master_log_name[FN_REFLEN];
@ -106,11 +108,15 @@ struct rpl_parallel_thread {
queued_size-= dequeue_size;
}
queued_event *get_qev_common(Log_event *ev, ulonglong event_size);
queued_event *get_qev(Log_event *ev, ulonglong event_size,
Relay_log_info *rli);
queued_event *retry_get_qev(Log_event *ev, queued_event *orig_qev,
const char *relay_log_name,
ulonglong event_pos, ulonglong event_size);
void free_qev(queued_event *qev);
rpl_group_info *get_rgi(Relay_log_info *rli, Gtid_log_event *gtid_ev,
rpl_parallel_entry *e);
rpl_parallel_entry *e, ulonglong event_size);
void free_rgi(rpl_group_info *rgi);
group_commit_orderer *get_gco(uint64 wait_count, group_commit_orderer *prev);
void free_gco(group_commit_orderer *gco);
@ -176,7 +182,7 @@ struct rpl_parallel_entry {
Event groups commit in order, so the rpl_group_info for an event group
will be alive (at least) as long as
rpl_grou_info::gtid_sub_id > last_committed_sub_id. This can be used to
rpl_group_info::gtid_sub_id > last_committed_sub_id. This can be used to
safely refer back to previous event groups if they are still executing,
and ignore them if they completed, without requiring explicit
synchronisation between the threads.


@ -52,6 +52,7 @@ Relay_log_info::Relay_log_info(bool is_slave_recovery)
info_fd(-1), cur_log_fd(-1), relay_log(&sync_relaylog_period),
sync_counter(0), is_relay_log_recovery(is_slave_recovery),
save_temporary_tables(0), mi(0),
inuse_relaylog_list(0), last_inuse_relaylog(0),
cur_log_old_open_count(0), group_relay_log_pos(0),
event_relay_log_pos(0),
#if HAVE_valgrind
@ -91,6 +92,7 @@ Relay_log_info::Relay_log_info(bool is_slave_recovery)
mysql_cond_init(key_relay_log_info_start_cond, &start_cond, NULL);
mysql_cond_init(key_relay_log_info_stop_cond, &stop_cond, NULL);
mysql_cond_init(key_relay_log_info_log_space_cond, &log_space_cond, NULL);
my_atomic_rwlock_init(&inuse_relaylog_atomic_lock);
relay_log.init_pthread_objects();
DBUG_VOID_RETURN;
}
@ -98,8 +100,17 @@ Relay_log_info::Relay_log_info(bool is_slave_recovery)
Relay_log_info::~Relay_log_info()
{
inuse_relaylog *cur;
DBUG_ENTER("Relay_log_info::~Relay_log_info");
cur= inuse_relaylog_list;
while (cur)
{
DBUG_ASSERT(cur->queued_count == cur->dequeued_count);
inuse_relaylog *next= cur->next;
my_free(cur);
cur= next;
}
mysql_mutex_destroy(&run_lock);
mysql_mutex_destroy(&data_lock);
mysql_mutex_destroy(&log_space_lock);
@ -107,6 +118,7 @@ Relay_log_info::~Relay_log_info()
mysql_cond_destroy(&start_cond);
mysql_cond_destroy(&stop_cond);
mysql_cond_destroy(&log_space_cond);
my_atomic_rwlock_destroy(&inuse_relaylog_atomic_lock);
relay_log.cleanup();
DBUG_VOID_RETURN;
}
@ -1338,6 +1350,32 @@ void Relay_log_info::stmt_done(my_off_t event_master_log_pos,
DBUG_VOID_RETURN;
}
int
Relay_log_info::alloc_inuse_relaylog(const char *name)
{
inuse_relaylog *ir;
if (!(ir= (inuse_relaylog *)my_malloc(sizeof(*ir), MYF(MY_WME|MY_ZEROFILL))))
{
my_error(ER_OUTOFMEMORY, MYF(0), (int)sizeof(*ir));
return 1;
}
strmake_buf(ir->name, name);
if (!inuse_relaylog_list)
inuse_relaylog_list= ir;
else
{
last_inuse_relaylog->completed= true;
last_inuse_relaylog->next= ir;
}
last_inuse_relaylog= ir;
return 0;
}
#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
int
rpl_load_gtid_slave_state(THD *thd)
@ -1524,6 +1562,9 @@ rpl_group_info::reinit(Relay_log_info *rli)
tables_to_lock_count= 0;
trans_retries= 0;
last_event_start_time= 0;
gtid_sub_id= 0;
commit_id= 0;
gtid_pending= false;
worker_error= 0;
row_stmt_start_timestamp= 0;
long_find_row_note_printed= false;
@ -1533,7 +1574,7 @@ rpl_group_info::reinit(Relay_log_info *rli)
}
rpl_group_info::rpl_group_info(Relay_log_info *rli)
: thd(0), gtid_sub_id(0), wait_commit_sub_id(0),
: thd(0), wait_commit_sub_id(0),
wait_commit_group_info(0), parallel_entry(0),
deferred_events(NULL), m_annotate_event(0), is_parallel_exec(false)
{
@ -1567,6 +1608,8 @@ event_group_new_gtid(rpl_group_info *rgi, Gtid_log_event *gev)
rgi->current_gtid.domain_id= gev->domain_id;
rgi->current_gtid.server_id= gev->server_id;
rgi->current_gtid.seq_no= gev->seq_no;
rgi->commit_id= gev->commit_id;
rgi->gtid_pending= true;
return 0;
}
@ -1622,7 +1665,7 @@ delete_or_keep_event_post_apply(rpl_group_info *rgi,
void rpl_group_info::cleanup_context(THD *thd, bool error)
{
DBUG_ENTER("Relay_log_info::cleanup_context");
DBUG_ENTER("rpl_group_info::cleanup_context");
DBUG_PRINT("enter", ("error: %d", (int) error));
DBUG_ASSERT(this->thd == thd);
@ -1688,7 +1731,7 @@ void rpl_group_info::cleanup_context(THD *thd, bool error)
void rpl_group_info::clear_tables_to_lock()
{
DBUG_ENTER("Relay_log_info::clear_tables_to_lock()");
DBUG_ENTER("rpl_group_info::clear_tables_to_lock()");
#ifndef DBUG_OFF
/**
When replicating in RBR and MyISAM Merge tables are involved
@ -1735,7 +1778,7 @@ void rpl_group_info::clear_tables_to_lock()
void rpl_group_info::slave_close_thread_tables(THD *thd)
{
DBUG_ENTER("Relay_log_info::slave_close_thread_tables(THD *thd)");
DBUG_ENTER("rpl_group_info::slave_close_thread_tables(THD *thd)");
thd->get_stmt_da()->set_overwrite_status(true);
thd->is_error() ? trans_rollback_stmt(thd) : trans_commit_stmt(thd);
thd->get_stmt_da()->set_overwrite_status(false);
@ -1824,6 +1867,34 @@ rpl_group_info::gtid_info()
}
/*
Undo the effect of a prior mark_start_commit().
This is only used for retrying a transaction in parallel replication, after
we have encountered a deadlock or other temporary error.
When we get such a deadlock, it means that the current group of transactions
did not yet all start committing (else they would not have deadlocked). So
we will not yet have woken up anything in the next group, our rgi->gco is
still live, and we can simply decrement the counter (to be incremented again
later, when the retry succeeds and reaches the commit step).
*/
void
rpl_group_info::unmark_start_commit()
{
rpl_parallel_entry *e;
if (!did_mark_start_commit)
return;
e= this->parallel_entry;
mysql_mutex_lock(&e->LOCK_parallel_entry);
--e->count_committing_event_groups;
mysql_mutex_unlock(&e->LOCK_parallel_entry);
did_mark_start_commit= false;
}
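The retry machinery that pairs with unmark_start_commit() (re-reading the event group from rgi->relay_log at retry_start_offset and executing it again) is not shown in this hunk. As a rough, non-authoritative sketch of the decision it makes, assuming simplified names and a hard-coded subset of the temporary error codes that has_temporary_error() recognises:
#include <cstdio>
enum retry_decision { RETRY_GROUP, FAIL_GROUP };
/* Assumed subset of temporary errors: 1213 = ER_LOCK_DEADLOCK,
   1205 = ER_LOCK_WAIT_TIMEOUT. The real has_temporary_error() covers
   more cases. */
static bool is_temporary_error(int errcode)
{
  return errcode == 1213 || errcode == 1205;
}
/* Decide whether a failed event group is rolled back and retried
   (after unmark_start_commit()) or reported as a fatal slave error. */
static retry_decision decide_retry(int errcode, unsigned long trans_retries,
                                   unsigned long slave_trans_retries)
{
  if (is_temporary_error(errcode) && trans_retries < slave_trans_retries)
    return RETRY_GROUP;
  return FAIL_GROUP;
}
int main()
{
  printf("%s\n", decide_retry(1213, 0, 10) == RETRY_GROUP ? "retry" : "fail");
  return 0;
}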
rpl_sql_thread_info::rpl_sql_thread_info(Rpl_filter *filter)
: rpl_filter(filter)
{

View file

@ -61,6 +61,7 @@ enum {
*****************************************************************************/
struct rpl_group_info;
struct inuse_relaylog;
class Relay_log_info : public Slave_reporting_capability
{
@ -163,6 +164,15 @@ public:
/* parent Master_info structure */
Master_info *mi;
/*
List of active relay log files.
(This can be more than one in case of parallel replication).
*/
inuse_relaylog *inuse_relaylog_list;
inuse_relaylog *last_inuse_relaylog;
/* Lock used to protect inuse_relaylog::dequeued_count */
my_atomic_rwlock_t inuse_relaylog_atomic_lock;
/*
Needed to deal properly with cur_log getting closed and re-opened with
a different log under our feet
@ -398,6 +408,7 @@ public:
void stmt_done(my_off_t event_log_pos,
time_t event_creation_time, THD *thd,
rpl_group_info *rgi);
int alloc_inuse_relaylog(const char *name);
/**
Is the replication inside a group?
@ -463,6 +474,39 @@ private:
};
/*
In parallel replication, if we need to re-try a transaction due to a
deadlock or other temporary error, we may need to go back and re-read events
out of an earlier relay log.
This structure keeps track of the relaylogs that are potentially in use.
Each rpl_group_info has a pointer to one of those, corresponding to the
first GTID event.
A pair of reference counts keeps track of how long a relay log is potentially
in use. When the `completed' flag is set, all events have been read out of
the relay log, but the log might still be needed for retry in worker
threads. As worker threads complete an event group, they atomically
increment `dequeued_count' by the number of events in that group. Thus, when
completed is set and dequeued_count equals queued_count, the relay log file
is finally done with and can be purged.
By separating the queued and dequeued count, only the dequeued_count needs
multi-thread synchronisation; the completed flag and queued_count fields
are only accessed by the SQL driver thread and need no synchronisation.
*/
struct inuse_relaylog {
inuse_relaylog *next;
/* Number of events in this relay log queued for worker threads. */
int64 queued_count;
/* Number of events completed by worker threads. */
volatile int64 dequeued_count;
/* Set when all events have been read from a relaylog. */
bool completed;
char name[FN_REFLEN];
};
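To make the reference-counting scheme above concrete, here is a minimal, self-contained model of the bookkeeping. It is an illustration only: std::atomic stands in for the my_atomic_* calls, and the member and function names are invented for the example.
#include <atomic>
#include <cstdint>
struct inuse_relaylog_model {
  int64_t queued_count= 0;                 /* written only by the SQL driver thread */
  std::atomic<int64_t> dequeued_count{0};  /* incremented by worker threads */
  bool completed= false;                   /* set once the log is fully read */
  /* Driver thread: one more event from this relay log handed to a worker. */
  void on_event_queued() { ++queued_count; }
  /* Worker thread: an event group of `events' events no longer needs this
     relay log (it committed, or its retry finished). */
  void on_group_done(int64_t events)
  { dequeued_count.fetch_add(events, std::memory_order_relaxed); }
  /* Driver thread: the file can be purged once it is fully read and all
     queued events have been accounted for by the workers. */
  bool can_purge() const
  {
    return completed &&
           dequeued_count.load(std::memory_order_relaxed) == queued_count;
  }
};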
/*
This is data for various state needed to be kept for the processing of
one event group (transaction) during replication.
@ -489,6 +533,7 @@ struct rpl_group_info
*/
uint64 gtid_sub_id;
rpl_gtid current_gtid;
uint64 commit_id;
/*
This is used to keep transaction commit order.
We will signal this when we commit, and can register it to wait for the
@ -566,6 +611,8 @@ struct rpl_group_info
*/
char future_event_master_log_name[FN_REFLEN];
bool is_parallel_exec;
/* When gtid_pending is true, we have not yet done record_gtid(). */
bool gtid_pending;
int worker_error;
/*
Set true when we signalled that we reach the commit phase. Used to avoid
@ -596,6 +643,15 @@ struct rpl_group_info
/* Needs room for "Gtid D-S-N\x00". */
char gtid_info_buf[5+10+1+10+1+20+1];
/*
Information to be able to re-try an event group in case of a deadlock or
other temporary error.
*/
inuse_relaylog *relay_log;
uint64 retry_start_offset;
uint64 retry_event_count;
bool killed_for_retry;
rpl_group_info(Relay_log_info *rli_);
~rpl_group_info();
void reinit(Relay_log_info *rli);
@ -684,6 +740,7 @@ struct rpl_group_info
void mark_start_commit_no_lock();
void mark_start_commit();
char *gtid_info();
void unmark_start_commit();
time_t get_row_stmt_start_timestamp()
{

View file

@ -301,7 +301,10 @@ handle_slave_init(void *arg __attribute__((unused)))
mysql_mutex_lock(&LOCK_thread_count);
thd->thread_id= thread_id++;
mysql_mutex_unlock(&LOCK_thread_count);
thd->system_thread = SYSTEM_THREAD_SLAVE_INIT;
thd->store_globals();
thd->security_ctx->skip_grants();
thd->set_command(COM_DAEMON);
thd_proc_info(thd, "Loading slave GTID position from table");
if (rpl_load_gtid_slave_state(thd))
@ -316,15 +319,22 @@ handle_slave_init(void *arg __attribute__((unused)))
mysql_mutex_unlock(&LOCK_thread_count);
my_thread_end();
mysql_mutex_lock(&LOCK_thread_count);
mysql_mutex_lock(&LOCK_slave_init);
slave_init_thread_running= false;
mysql_cond_broadcast(&COND_thread_count);
mysql_mutex_unlock(&LOCK_thread_count);
mysql_cond_broadcast(&COND_slave_init);
mysql_mutex_unlock(&LOCK_slave_init);
return 0;
}
/*
Start the slave init thread.
This thread is used to load the GTID state from mysql.gtid_slave_pos at
server start; reading from a table requires a valid THD, which is otherwise not
available during server init.
*/
static int
run_slave_init_thread()
{
@ -338,10 +348,10 @@ run_slave_init_thread()
return 1;
}
mysql_mutex_lock(&LOCK_thread_count);
mysql_mutex_lock(&LOCK_slave_init);
while (slave_init_thread_running)
mysql_cond_wait(&COND_thread_count, &LOCK_thread_count);
mysql_mutex_unlock(&LOCK_thread_count);
mysql_cond_wait(&COND_slave_init, &LOCK_slave_init);
mysql_mutex_unlock(&LOCK_slave_init);
return 0;
}
@ -3094,7 +3104,8 @@ static ulong read_event(MYSQL* mysql, Master_info *mi, bool* suppress_warnings)
that the error is temporary by pushing a warning with the error code
ER_GET_TEMPORARY_ERRMSG, if the originating error is temporary.
*/
static int has_temporary_error(THD *thd)
int
has_temporary_error(THD *thd)
{
DBUG_ENTER("has_temporary_error");
@ -3310,7 +3321,7 @@ int apply_event_and_update_pos(Log_event* ev, THD* thd,
Make sure we do not erroneously update gtid_slave_pos with a lingering
GTID from this failed event group (MDEV-4906).
*/
rgi->gtid_sub_id= 0;
rgi->gtid_pending= false;
}
DBUG_RETURN(exec_res ? 1 : 0);
@ -4466,6 +4477,7 @@ pthread_handler_t handle_slave_sql(void *arg)
mysql_mutex_unlock(&rli->log_space_lock);
serial_rgi->gtid_sub_id= 0;
serial_rgi->gtid_pending= false;
if (init_relay_log_pos(rli,
rli->group_relay_log_name,
rli->group_relay_log_pos,
@ -4476,6 +4488,9 @@ pthread_handler_t handle_slave_sql(void *arg)
"Error initializing relay log position: %s", errmsg);
goto err;
}
if (rli->alloc_inuse_relaylog(rli->group_relay_log_name))
goto err;
strcpy(rli->future_event_master_log_name, rli->group_master_log_name);
THD_CHECK_SENTRY(thd);
#ifndef DBUG_OFF
@ -6408,6 +6423,7 @@ static Log_event* next_event(rpl_group_info *rgi, ulonglong *event_size)
DBUG_ASSERT(rli->cur_log_fd >= 0);
mysql_file_close(rli->cur_log_fd, MYF(MY_WME));
rli->cur_log_fd = -1;
rli->last_inuse_relaylog->completed= true;
if (relay_log_purge)
{
@ -6536,6 +6552,12 @@ static Log_event* next_event(rpl_group_info *rgi, ulonglong *event_size)
mysql_mutex_unlock(log_lock);
goto err;
}
if (rli->alloc_inuse_relaylog(rli->linfo.log_file_name))
{
if (!hot_log)
mysql_mutex_unlock(log_lock);
goto err;
}
if (!hot_log)
mysql_mutex_unlock(log_lock);
continue;
@ -6551,6 +6573,8 @@ static Log_event* next_event(rpl_group_info *rgi, ulonglong *event_size)
if ((rli->cur_log_fd=open_binlog(cur_log,rli->linfo.log_file_name,
&errmsg)) <0)
goto err;
if (rli->alloc_inuse_relaylog(rli->linfo.log_file_name))
goto err;
}
else
{

View file

@ -229,6 +229,7 @@ int purge_relay_logs(Relay_log_info* rli, THD *thd, bool just_reset,
void set_slave_thread_options(THD* thd);
void set_slave_thread_default_charset(THD *thd, rpl_group_info *rgi);
int rotate_relay_log(Master_info* mi);
int has_temporary_error(THD *thd);
int apply_event_and_update_pos(Log_event* ev, THD* thd,
struct rpl_group_info *rgi,
rpl_parallel_thread *rpt);

View file

@ -43,6 +43,7 @@
#include "sql_base.h" // close_thread_tables
#include "transaction.h" // trans_commit_stmt
#include "sql_audit.h"
#include "debug_sync.h"
/*
Sufficient max length of printed destinations and frame offsets (all uints).
@ -1309,6 +1310,7 @@ sp_head::execute(THD *thd, bool merge_da_on_success)
/* Discard the initial part of executing routines. */
thd->profiling.discard_current_query();
#endif
DEBUG_SYNC(thd, "sp_head_execute_before_loop");
do
{
sp_instr *i;

View file

@ -914,7 +914,7 @@ send_result_message:
protocol->store(operator_name, system_charset_info);
if (result_code) // either mysql_recreate_table or analyze failed
{
DBUG_ASSERT(thd->is_error() || thd->killed);
DBUG_ASSERT(thd->is_error());
if (thd->is_error())
{
const char *err_msg= thd->get_stmt_da()->message();

View file

@ -2084,7 +2084,10 @@ bool open_table(THD *thd, TABLE_LIST *table_list, MEM_ROOT *mem_root,
DBUG_RETURN(TRUE);
if (!(flags & MYSQL_OPEN_IGNORE_KILLED) && thd->killed)
{
thd->send_kill_message();
DBUG_RETURN(TRUE);
}
/*
Check if we're trying to take a write lock in a read only transaction.

View file

@ -4217,6 +4217,219 @@ extern "C" int thd_rpl_is_parallel(const MYSQL_THD thd)
return thd->rgi_slave && thd->rgi_slave->is_parallel_exec;
}
/*
This function can optionally be called to check if thd_report_wait_for()
needs to be called for waits done by a given transaction.
If this function returns false for a given thd, there is no need to do any
calls to thd_report_wait_for() on that thd.
This call is optional; it is safe to call thd_report_wait_for() in any case.
This call can be used to save some redundant calls to thd_report_wait_for()
if desired. (This is unlikely to matter much unless there are _lots_ of
waits to report, as the overhead of thd_report_wait_for() is small).
*/
extern "C" int
thd_need_wait_for(const MYSQL_THD thd)
{
rpl_group_info *rgi;
if (!thd)
return false;
rgi= thd->rgi_slave;
if (!rgi)
return false;
return rgi->is_parallel_exec;
}
/*
Used by InnoDB/XtraDB to report that one transaction THD is about to go to
wait for a transactional lock held by another transactions OTHER_THD.
This is used for parallel replication, where transactions are required to
commit in the same order on the slave as they did on the master. If the
transactions encounter lock conflicts on the slave that did not exist on the
master, this can cause deadlocks.
Normally, such conflicts will not occur, because the same conflict would
have prevented the two transactions from committing in parallel on the
master, thus preventing them from running in parallel on the slave in the
first place. However, it can happen when the optimizer chooses a different
plan on the slave than on the master (e.g. a table scan instead of an index
scan).
InnoDB/XtraDB reports lock waits using this call. If a lock wait causes a
deadlock with the pre-determined commit order, we kill the later transaction,
and later re-try it, to resolve the deadlock.
This call need only receive reports about waits for locks that will remain
until the holding transaction commits. InnoDB/XtraDB auto-increment locks
are released earlier, and so need not be reported. (Such false positives are
not harmful, but they could lead to an unnecessary kill and retry, so they are
best avoided.)
*/
extern "C" void
thd_report_wait_for(const MYSQL_THD thd, MYSQL_THD other_thd)
{
rpl_group_info *rgi;
rpl_group_info *other_rgi;
if (!thd || !other_thd)
return;
rgi= thd->rgi_slave;
other_rgi= other_thd->rgi_slave;
if (!rgi || !other_rgi)
return;
if (!rgi->is_parallel_exec)
return;
if (rgi->rli != other_rgi->rli)
return;
if (!rgi->gtid_sub_id || !other_rgi->gtid_sub_id)
return;
if (rgi->current_gtid.domain_id != other_rgi->current_gtid.domain_id)
return;
if (rgi->gtid_sub_id > other_rgi->gtid_sub_id)
return;
/*
This transaction is about to wait for another transaction that, by the
binlog commit order, is required to commit after it. That would cause a
deadlock. So send a kill with a temporary error to the other transaction;
this makes replication roll back (and later retry) the other transaction,
releasing the lock so that this transaction and replication can proceed.
*/
other_rgi->killed_for_retry= true;
mysql_mutex_lock(&other_thd->LOCK_thd_data);
other_thd->awake(KILL_CONNECTION);
mysql_mutex_unlock(&other_thd->LOCK_thd_data);
}
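As a toy, self-contained illustration of the rule implemented above (not the server API; the types and names below are made up for the example), the kill decision boils down to comparing gtid_sub_id within one replication domain:
#include <cstdint>
#include <cstdio>
struct toy_trx {
  uint32_t domain_id;
  uint64_t gtid_sub_id;          /* lower value == must commit earlier */
  bool killed_for_retry;
};
/* Returns true if `other' must be killed (and later retried) so that
   `waiter' can proceed without violating the fixed commit order. */
static bool report_wait_for(const toy_trx &waiter, toy_trx &other)
{
  if (waiter.domain_id != other.domain_id)
    return false;                /* independent domains, no fixed order */
  if (waiter.gtid_sub_id > other.gtid_sub_id)
    return false;                /* waiting on an earlier trx is harmless */
  other.killed_for_retry= true;  /* a later trx blocks an earlier one: kill it */
  return true;
}
int main()
{
  toy_trx t1= {0, 100, false};
  toy_trx t2= {0, 101, false};
  /* T1 (commits first) ends up waiting on T2 (commits later): T2 is killed. */
  printf("kill T2: %s\n", report_wait_for(t1, t2) ? "yes" : "no");
  return 0;
}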
/*
This function is called from InnoDB/XtraDB to check if the commit order of
two transactions has already been decided by the upper layer. This happens
in parallel replication, where the commit order is forced to be the same on
the slave as it was originally on the master.
If this function returns false, it means that such commit order will be
enforced. This allows the storage engine to optionally omit gap lock waits
or similar measures that would otherwise be needed to serialise the
transactions into a commit order that is correct for statement-based
binlogging.
Since transactions are only run in parallel on the slave if they ran without
lock conflicts on the master, normally no lock conflicts on the slave happen
during parallel replication. However, there are a couple of corner cases
where it can happen, like these secondary-index operations:
T1: INSERT INTO t1 VALUES (7, NULL);
T2: DELETE FROM t1 WHERE b <= 3;
or:
T1: UPDATE t1 SET secondary=NULL WHERE primary=1;
T2: DELETE FROM t1 WHERE secondary <= 3;
The DELETE takes a gap lock that can block the INSERT/UPDATE, but the row
locks set by INSERT/UPDATE do not block the DELETE. Thus, the execution
order of the transactions determines whether a lock conflict occurs or not,
and a lock conflict can occur on the slave where it did not on the master.
If this function returns true, normal locking should be done as required by
the binlogging and transaction isolation level in effect. But if it returns
false, the correct order will be enforced anyway, and InnoDB/XtraDB can
avoid taking the gap lock, preventing the lock conflict.
Calling this function is just an optimisation to avoid unnecessary
deadlocks. If it was not used, a gap lock would be set that could eventually
cause a deadlock; the deadlock would be caught by thd_report_wait_for() and
the transaction T2 killed and rolled back (and later re-tried).
*/
extern "C" int
thd_need_ordering_with(const MYSQL_THD thd, const MYSQL_THD other_thd)
{
rpl_group_info *rgi, *other_rgi;
if (!thd || !other_thd)
return 1;
rgi= thd->rgi_slave;
other_rgi= other_thd->rgi_slave;
if (!rgi || !other_rgi)
return 1;
if (!rgi->is_parallel_exec)
return 1;
if (rgi->rli != other_rgi->rli)
return 1;
if (rgi->current_gtid.domain_id != other_rgi->current_gtid.domain_id)
return 1;
if (!rgi->commit_id || rgi->commit_id != other_rgi->commit_id)
return 1;
/*
These two threads are doing parallel replication within the same
replication domain. Their commit order is already fixed, so we do not need
gap locks or similar to otherwise enforce ordering (and in fact such locks
could lead to unnecessary deadlocks and transaction retry).
*/
return 0;
}
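A similarly hedged toy version of the check above (again with made-up types): gap ordering can only be skipped when both transactions replicate in the same domain and shared a group commit on the master, i.e. carry the same non-zero commit_id.
#include <cstdint>
struct toy_rgi {
  uint32_t domain_id;
  uint64_t commit_id;            /* 0 means not part of a known group commit */
};
/* Returns true when normal gap locking is still required. */
static bool need_ordering_with(const toy_rgi &a, const toy_rgi &b)
{
  if (a.domain_id != b.domain_id)
    return true;                 /* different domains: order not enforced */
  if (a.commit_id == 0 || a.commit_id != b.commit_id)
    return true;                 /* not group-committed together on the master */
  return false;                  /* commit order already fixed: gap lock skippable */
}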
/*
If the storage engine detects a deadlock, and needs to choose a victim
transaction to roll back, it can call this function to ask the upper
server layer which of the two candidate transactions it prefers to abort
and roll back.
In parallel replication, if two transactions are running in parallel and
one is fixed to commit before the other, then the one that commits later
will be preferred as the victim; choosing the earlier transaction as the
victim would not resolve the deadlock anyway, as the later transaction still
needs to wait for the earlier one to commit.
Otherwise, a transaction that uses only transactional tables, and can thus
be safely rolled back, will be preferred as a deadlock victim over a
transaction that has also modified non-transactional (e.g. MyISAM) tables.
The return value is -1 if the first transaction is preferred as a deadlock
victim, 1 if the second transaction is preferred, or 0 for no preference (in
which case the storage engine can make the choice as it prefers).
*/
extern "C" int
thd_deadlock_victim_preference(const MYSQL_THD thd1, const MYSQL_THD thd2)
{
rpl_group_info *rgi1, *rgi2;
bool nontrans1, nontrans2;
if (!thd1 || !thd2)
return 0;
/*
If the transactions are participating in the same replication domain in
parallel replication, then request to select the one that will commit
later (in the fixed commit order from the master) as the deadlock victim.
*/
rgi1= thd1->rgi_slave;
rgi2= thd2->rgi_slave;
if (rgi1 && rgi2 &&
rgi1->is_parallel_exec &&
rgi1->rli == rgi2->rli &&
rgi1->current_gtid.domain_id == rgi2->current_gtid.domain_id)
return rgi1->gtid_sub_id < rgi2->gtid_sub_id ? 1 : -1;
/*
If one transaction has modified non-transactional tables (so that it
cannot be safely rolled back), and the other has not, then prefer to
select the purely transactional one as the victim.
*/
nontrans1= thd1->transaction.all.modified_non_trans_table;
nontrans2= thd2->transaction.all.modified_non_trans_table;
if (nontrans1 && !nontrans2)
return 1;
else if (!nontrans1 && nontrans2)
return -1;
/* No preferences, let the storage engine decide. */
return 0;
}
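For completeness, a small sketch (assumed, simplified signature, not the engine's real code) of how an engine can fold the -1/0/+1 preference above into its own weight comparison; the trx_weight_ge() change further down in this patch does essentially this:
/* Returns true if the first transaction is "heavier", i.e. less eligible
   as a deadlock victim. rows_a/rows_b stand in for the engine's own
   altered/locked row counts. */
static bool first_is_heavier(int upper_layer_pref,
                             unsigned long rows_a, unsigned long rows_b)
{
  if (upper_layer_pref < 0)
    return false;                /* upper layer picks the first trx as victim */
  if (upper_layer_pref > 0)
    return true;                 /* upper layer picks the second trx as victim */
  return rows_a >= rows_b;       /* no preference: engine's own tie-breaker */
}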
extern "C" int thd_non_transactional_update(const MYSQL_THD thd)
{
return(thd->transaction.all.modified_non_trans_table);
@ -6393,6 +6606,7 @@ wait_for_commit::unregister_wait_for_prior_commit2()
this->waitee= NULL;
}
}
wakeup_error= 0;
mysql_mutex_unlock(&LOCK_wait_commit);
}

View file

@ -1358,7 +1358,8 @@ enum enum_thread_type
SYSTEM_THREAD_NDBCLUSTER_BINLOG= 8,
SYSTEM_THREAD_EVENT_SCHEDULER= 16,
SYSTEM_THREAD_EVENT_WORKER= 32,
SYSTEM_THREAD_BINLOG_BACKGROUND= 64
SYSTEM_THREAD_BINLOG_BACKGROUND= 64,
SYSTEM_THREAD_SLAVE_INIT= 128,
};
inline char const *
@ -1741,6 +1742,8 @@ struct wait_for_commit
{
if (waitee)
unregister_wait_for_prior_commit2();
else
wakeup_error= 0;
}
/*
Remove a waiter from the list in the waitee. Used to unregister a wait.

View file

@ -4219,16 +4219,23 @@ innobase_kill_query(
trx = thd_to_trx(thd);
if (trx)
{
/* Cancel a pending lock request. */
lock_mutex_enter();
trx_mutex_enter(trx);
if (trx->lock.wait_lock)
lock_cancel_waiting_and_release(trx->lock.wait_lock);
trx_mutex_exit(trx);
lock_mutex_exit();
}
if (trx) {
THD *cur = current_thd;
THD *owner = trx->current_lock_mutex_owner;
/* Cancel a pending lock request. */
if (owner != cur) {
lock_mutex_enter();
}
trx_mutex_enter(trx);
if (trx->lock.wait_lock) {
lock_cancel_waiting_and_release(trx->lock.wait_lock);
}
trx_mutex_exit(trx);
if (owner != cur) {
lock_mutex_exit();
}
}
DBUG_VOID_RETURN;
}
@ -4275,14 +4282,11 @@ handler::Table_flags
ha_innobase::table_flags() const
/*============================*/
{
THD *thd = ha_thd();
/* Need to use tx_isolation here since table flags is (also)
called before prebuilt is inited. */
ulong const tx_isolation = thd_tx_isolation(thd);
ulong const tx_isolation = thd_tx_isolation(ha_thd());
if (tx_isolation <= ISO_READ_COMMITTED &&
!(tx_isolation == ISO_READ_COMMITTED &&
thd_rpl_is_parallel(thd))) {
if (tx_isolation <= ISO_READ_COMMITTED) {
return(int_table_flags);
}

View file

@ -992,6 +992,11 @@ struct trx_t{
count of tables being flushed. */
/*------------------------------*/
THD* current_lock_mutex_owner;
/*!< If this is equal to current_thd,
then in innobase_kill_query() we know we
already hold the lock_sys->mutex. */
/*------------------------------*/
#ifdef UNIV_DEBUG
ulint start_line; /*!< Track where it was started from */
const char* start_file; /*!< Filename where it was started */

View file

@ -49,6 +49,7 @@ Created 5/7/1996 Heikki Tuuri
#include "btr0btr.h"
#include "dict0boot.h"
#include <set>
#include "mysql/plugin.h"
/* Restricts the length of search we will do in the waits-for
graph of transactions */
@ -373,6 +374,11 @@ struct lock_stack_t {
ulint heap_no; /*!< heap number if rec lock */
};
extern "C" void thd_report_wait_for(const MYSQL_THD thd, MYSQL_THD other_thd);
extern "C" int thd_need_wait_for(const MYSQL_THD thd);
extern "C"
int thd_need_ordering_with(const MYSQL_THD thd, const MYSQL_THD other_thd);
/** Stack to use during DFS search. Currently only a single stack is required
because there is no parallel deadlock check. This stack is protected by
the lock_sys_t::mutex. */
@ -388,6 +394,14 @@ UNIV_INTERN mysql_pfs_key_t lock_sys_mutex_key;
UNIV_INTERN mysql_pfs_key_t lock_sys_wait_mutex_key;
#endif /* UNIV_PFS_MUTEX */
/* Buffer to collect THDs to report waits for. */
struct thd_wait_reports {
struct thd_wait_reports *next; /*!< List link */
ulint used; /*!< How many elements in waitees[] */
trx_t *waitees[64]; /*!< Trxs for thd_report_wait_for() */
};
#ifdef UNIV_DEBUG
UNIV_INTERN ibool lock_print_waits = FALSE;
@ -1015,6 +1029,32 @@ lock_rec_has_to_wait(
return(FALSE);
}
if ((type_mode & LOCK_GAP || lock_rec_get_gap(lock2)) &&
!thd_need_ordering_with(trx->mysql_thd,
lock2->trx->mysql_thd)) {
/* If the upper server layer has already decided on the
commit order between the transaction requesting the
lock and the transaction owning the lock, we do not
need to wait for gap locks. Such ordering by the upper
server layer happens in parallel replication, where the
commit order is fixed to match the original order on the
master.
Such gap locks are mainly needed to get serialisability
between transactions, so that they are binlogged in an
order that gives correct results with statement-based
replication. Since the right order was already
determined on the master, we do not need to enforce it
again here.
Skipping the locks is not essential for correctness,
since in case of deadlock we will just kill the later
transaction and retry it. But it can save some
unnecessary rollbacks and retries. */
return (FALSE);
}
return(TRUE);
}
@ -3798,7 +3838,8 @@ static
trx_id_t
lock_deadlock_search(
/*=================*/
lock_deadlock_ctx_t* ctx) /*!< in/out: deadlock context */
lock_deadlock_ctx_t* ctx, /*!< in/out: deadlock context */
struct thd_wait_reports*waitee_ptr) /*!< in/out: list of waitees */
{
const lock_t* lock;
ulint heap_no;
@ -3873,33 +3914,59 @@ lock_deadlock_search(
/* Select the joining transaction as the victim. */
return(ctx->start->id);
} else if (lock->trx->lock.que_state == TRX_QUE_LOCK_WAIT) {
/* Another trx ahead has requested a lock in an
incompatible mode, and is itself waiting for a lock. */
++ctx->cost;
/* Save current search state. */
if (!lock_deadlock_push(ctx, lock, heap_no)) {
/* Unable to save current search state, stack
size not big enough. */
ctx->too_deep = TRUE;
return(ctx->start->id);
} else {
/* We do not need to report autoinc locks to the upper
layer. These locks are released before commit, so they
can not cause deadlocks with binlog-fixed commit
order. */
if (waitee_ptr &&
(lock_get_type_low(lock) != LOCK_TABLE ||
lock_get_mode(lock) != LOCK_AUTO_INC)) {
if (waitee_ptr->used ==
sizeof(waitee_ptr->waitees) /
sizeof(waitee_ptr->waitees[0])) {
waitee_ptr->next =
(struct thd_wait_reports *)
mem_alloc(sizeof(*waitee_ptr));
waitee_ptr = waitee_ptr->next;
if (!waitee_ptr) {
ctx->too_deep = TRUE;
return(ctx->start->id);
}
waitee_ptr->next = NULL;
waitee_ptr->used = 0;
}
waitee_ptr->waitees[waitee_ptr->used++] = lock->trx;
}
ctx->wait_lock = lock->trx->lock.wait_lock;
lock = lock_get_first_lock(ctx, &heap_no);
if (lock->trx->lock.que_state == TRX_QUE_LOCK_WAIT) {
if (lock->trx->lock.deadlock_mark > ctx->mark_start) {
/* Another trx ahead has requested a lock in an
incompatible mode, and is itself waiting for a lock. */
++ctx->cost;
/* Save current search state. */
if (!lock_deadlock_push(ctx, lock, heap_no)) {
/* Unable to save current search state, stack
size not big enough. */
ctx->too_deep = TRUE;
return(ctx->start->id);
}
ctx->wait_lock = lock->trx->lock.wait_lock;
lock = lock_get_first_lock(ctx, &heap_no);
if (lock->trx->lock.deadlock_mark > ctx->mark_start) {
lock = lock_get_next_lock(ctx, lock, heap_no);
}
} else {
lock = lock_get_next_lock(ctx, lock, heap_no);
}
} else {
lock = lock_get_next_lock(ctx, lock, heap_no);
}
}
@ -3964,6 +4031,48 @@ lock_deadlock_trx_rollback(
trx_mutex_exit(trx);
}
static
void
lock_report_waiters_to_mysql(
/*=======================*/
struct thd_wait_reports* waitee_buf_ptr, /*!< in: set of trxs */
THD* mysql_thd, /*!< in: THD */
trx_id_t victim_trx_id) /*!< in: Trx selected
as deadlock victim, if
any */
{
struct thd_wait_reports* p;
struct thd_wait_reports* q;
ulint i;
p = waitee_buf_ptr;
while (p) {
i = 0;
while (i < p->used) {
trx_t *w_trx = p->waitees[i];
/* There is no need to report waits to a trx already
selected as a victim. */
if (w_trx->id != victim_trx_id) {
/* If thd_report_wait_for() decides to kill the
transaction, then we will get a call back into
innobase_kill_query. We mark this by setting
current_lock_mutex_owner, so we can avoid trying
to recursively take lock_sys->mutex. */
w_trx->current_lock_mutex_owner = mysql_thd;
thd_report_wait_for(mysql_thd, w_trx->mysql_thd);
w_trx->current_lock_mutex_owner = NULL;
}
++i;
}
q = p->next;
if (p != waitee_buf_ptr) {
mem_free(p);
}
p = q;
}
}
/********************************************************************//**
Checks if a joining lock request results in a deadlock. If a deadlock is
found, this function will resolve the deadlock by choosing a victim transaction
@ -3979,13 +4088,23 @@ lock_deadlock_check_and_resolve(
const lock_t* lock, /*!< in: lock the transaction is requesting */
const trx_t* trx) /*!< in: transaction */
{
trx_id_t victim_trx_id;
trx_id_t victim_trx_id;
struct thd_wait_reports waitee_buf;
struct thd_wait_reports*waitee_buf_ptr;
THD* start_mysql_thd;
ut_ad(trx != NULL);
ut_ad(lock != NULL);
ut_ad(lock_mutex_own());
assert_trx_in_list(trx);
start_mysql_thd = trx->mysql_thd;
if (start_mysql_thd && thd_need_wait_for(start_mysql_thd)) {
waitee_buf_ptr = &waitee_buf;
} else {
waitee_buf_ptr = NULL;
}
/* Try and resolve as many deadlocks as possible. */
do {
lock_deadlock_ctx_t ctx;
@ -3998,7 +4117,19 @@ lock_deadlock_check_and_resolve(
ctx.wait_lock = lock;
ctx.mark_start = lock_mark_counter;
victim_trx_id = lock_deadlock_search(&ctx);
if (waitee_buf_ptr) {
waitee_buf_ptr->next = NULL;
waitee_buf_ptr->used = 0;
}
victim_trx_id = lock_deadlock_search(&ctx, waitee_buf_ptr);
/* Report waits to upper layer, as needed. */
if (waitee_buf_ptr) {
lock_report_waiters_to_mysql(waitee_buf_ptr,
start_mysql_thd,
victim_trx_id);
}
/* Search too deep, we rollback the joining transaction. */
if (ctx.too_deep) {

View file

@ -50,6 +50,9 @@ Created 3/26/1996 Heikki Tuuri
#include<set>
extern "C"
int thd_deadlock_victim_preference(const MYSQL_THD thd1, const MYSQL_THD thd2);
/** Set of table_id */
typedef std::set<table_id_t> table_id_set;
@ -1903,9 +1906,8 @@ trx_assert_started(
#endif /* UNIV_DEBUG */
/*******************************************************************//**
Compares the "weight" (or size) of two transactions. Transactions that
have edited non-transactional tables are considered heavier than ones
that have not.
Compares the "weight" (or size) of two transactions. The heavier the weight,
the more reluctant we will be to choose the transaction as a deadlock victim.
@return TRUE if weight(a) >= weight(b) */
UNIV_INTERN
ibool
@ -1914,26 +1916,19 @@ trx_weight_ge(
const trx_t* a, /*!< in: the first transaction to be compared */
const trx_t* b) /*!< in: the second transaction to be compared */
{
ibool a_notrans_edit;
ibool b_notrans_edit;
int pref;
/* If mysql_thd is NULL for a transaction we assume that it has
not edited non-transactional tables. */
a_notrans_edit = a->mysql_thd != NULL
&& thd_has_edited_nontrans_tables(a->mysql_thd);
b_notrans_edit = b->mysql_thd != NULL
&& thd_has_edited_nontrans_tables(b->mysql_thd);
if (a_notrans_edit != b_notrans_edit) {
return(a_notrans_edit);
/* First ask the upper server layer if it has a preference for which
transaction to choose as the deadlock victim. */
pref= thd_deadlock_victim_preference(a->mysql_thd, b->mysql_thd);
if (pref < 0) {
return FALSE;
} else if (pref > 0) {
return TRUE;
}
/* Either both had edited non-transactional tables or both had
not, we fall back to comparing the number of altered/locked
rows. */
/* Upper server layer had no preference; fall back to comparing the
number of altered/locked rows. */
#if 0
fprintf(stderr,

View file

@ -4704,12 +4704,15 @@ innobase_kill_connection(
DBUG_ENTER("innobase_kill_connection");
DBUG_ASSERT(hton == innodb_hton_ptr);
lock_mutex_enter();
trx = thd_to_trx(thd);
if (trx)
{
if (trx) {
THD *cur = current_thd;
THD *owner = trx->current_lock_mutex_owner;
if (owner != cur) {
lock_mutex_enter();
}
trx_mutex_enter(trx);
/* Cancel a pending lock request. */
@ -4717,10 +4720,11 @@ innobase_kill_connection(
lock_cancel_waiting_and_release(trx->lock.wait_lock);
trx_mutex_exit(trx);
if (owner != cur) {
lock_mutex_exit();
}
}
lock_mutex_exit();
DBUG_VOID_RETURN;
}
@ -4734,14 +4738,11 @@ handler::Table_flags
ha_innobase::table_flags() const
/*============================*/
{
THD *thd = ha_thd();
/* Need to use tx_isolation here since table flags is (also)
called before prebuilt is inited. */
ulong const tx_isolation = thd_tx_isolation(thd);
ulong const tx_isolation = thd_tx_isolation(ha_thd());
if (tx_isolation <= ISO_READ_COMMITTED &&
!(tx_isolation == ISO_READ_COMMITTED &&
thd_rpl_is_parallel(thd))) {
if (tx_isolation <= ISO_READ_COMMITTED) {
return(int_table_flags);
}

View file

@ -1019,6 +1019,11 @@ struct trx_t{
count of tables being flushed. */
/*------------------------------*/
THD* current_lock_mutex_owner;
/*!< If this is equal to current_thd,
then in innobase_kill_query() we know we
already hold the lock_sys->mutex. */
/*------------------------------*/
#ifdef UNIV_DEBUG
ulint start_line; /*!< Track where it was started from */
const char* start_file; /*!< Filename where it was started */

View file

@ -49,6 +49,7 @@ Created 5/7/1996 Heikki Tuuri
#include "btr0btr.h"
#include "dict0boot.h"
#include <set>
#include "mysql/plugin.h"
/* Restricts the length of search we will do in the waits-for
graph of transactions */
@ -373,6 +374,11 @@ struct lock_stack_t {
ulint heap_no; /*!< heap number if rec lock */
};
extern "C" void thd_report_wait_for(const MYSQL_THD thd, MYSQL_THD other_thd);
extern "C" int thd_need_wait_for(const MYSQL_THD thd);
extern "C"
int thd_need_ordering_with(const MYSQL_THD thd, const MYSQL_THD other_thd);
/** Stack to use during DFS search. Currently only a single stack is required
because there is no parallel deadlock check. This stack is protected by
the lock_sys_t::mutex. */
@ -388,6 +394,14 @@ UNIV_INTERN mysql_pfs_key_t lock_sys_mutex_key;
UNIV_INTERN mysql_pfs_key_t lock_sys_wait_mutex_key;
#endif /* UNIV_PFS_MUTEX */
/* Buffer to collect THDs to report waits for. */
struct thd_wait_reports {
struct thd_wait_reports *next; /*!< List link */
ulint used; /*!< How many elements in waitees[] */
trx_t *waitees[64]; /*!< Trxs for thd_report_wait_for() */
};
#ifdef UNIV_DEBUG
UNIV_INTERN ibool lock_print_waits = FALSE;
@ -1016,6 +1030,32 @@ lock_rec_has_to_wait(
return(FALSE);
}
if ((type_mode & LOCK_GAP || lock_rec_get_gap(lock2)) &&
!thd_need_ordering_with(trx->mysql_thd,
lock2->trx->mysql_thd)) {
/* If the upper server layer has already decided on the
commit order between the transaction requesting the
lock and the transaction owning the lock, we do not
need to wait for gap locks. Such ordering by the upper
server layer happens in parallel replication, where the
commit order is fixed to match the original order on the
master.
Such gap locks are mainly needed to get serialisability
between transactions, so that they are binlogged in an
order that gives correct results with statement-based
replication. Since the right order was already
determined on the master, we do not need to enforce it
again here.
Skipping the locks is not essential for correctness,
since in case of deadlock we will just kill the later
transaction and retry it. But it can save some
unnecessary rollbacks and retries. */
return (FALSE);
}
return(TRUE);
}
@ -3821,7 +3861,8 @@ static
trx_id_t
lock_deadlock_search(
/*=================*/
lock_deadlock_ctx_t* ctx) /*!< in/out: deadlock context */
lock_deadlock_ctx_t* ctx, /*!< in/out: deadlock context */
struct thd_wait_reports*waitee_ptr) /*!< in/out: list of waitees */
{
const lock_t* lock;
ulint heap_no;
@ -3896,33 +3937,59 @@ lock_deadlock_search(
/* Select the joining transaction as the victim. */
return(ctx->start->id);
} else if (lock->trx->lock.que_state == TRX_QUE_LOCK_WAIT) {
/* Another trx ahead has requested a lock in an
incompatible mode, and is itself waiting for a lock. */
++ctx->cost;
/* Save current search state. */
if (!lock_deadlock_push(ctx, lock, heap_no)) {
/* Unable to save current search state, stack
size not big enough. */
ctx->too_deep = TRUE;
return(ctx->start->id);
} else {
/* We do not need to report autoinc locks to the upper
layer. These locks are released before commit, so they
can not cause deadlocks with binlog-fixed commit
order. */
if (waitee_ptr &&
(lock_get_type_low(lock) != LOCK_TABLE ||
lock_get_mode(lock) != LOCK_AUTO_INC)) {
if (waitee_ptr->used ==
sizeof(waitee_ptr->waitees) /
sizeof(waitee_ptr->waitees[0])) {
waitee_ptr->next =
(struct thd_wait_reports *)
mem_alloc(sizeof(*waitee_ptr));
waitee_ptr = waitee_ptr->next;
if (!waitee_ptr) {
ctx->too_deep = TRUE;
return(ctx->start->id);
}
waitee_ptr->next = NULL;
waitee_ptr->used = 0;
}
waitee_ptr->waitees[waitee_ptr->used++] = lock->trx;
}
ctx->wait_lock = lock->trx->lock.wait_lock;
lock = lock_get_first_lock(ctx, &heap_no);
if (lock->trx->lock.que_state == TRX_QUE_LOCK_WAIT) {
if (lock->trx->lock.deadlock_mark > ctx->mark_start) {
/* Another trx ahead has requested a lock in an
incompatible mode, and is itself waiting for a lock. */
++ctx->cost;
/* Save current search state. */
if (!lock_deadlock_push(ctx, lock, heap_no)) {
/* Unable to save current search state, stack
size not big enough. */
ctx->too_deep = TRUE;
return(ctx->start->id);
}
ctx->wait_lock = lock->trx->lock.wait_lock;
lock = lock_get_first_lock(ctx, &heap_no);
if (lock->trx->lock.deadlock_mark > ctx->mark_start) {
lock = lock_get_next_lock(ctx, lock, heap_no);
}
} else {
lock = lock_get_next_lock(ctx, lock, heap_no);
}
} else {
lock = lock_get_next_lock(ctx, lock, heap_no);
}
}
@ -3987,6 +4054,48 @@ lock_deadlock_trx_rollback(
trx_mutex_exit(trx);
}
static
void
lock_report_waiters_to_mysql(
/*=======================*/
struct thd_wait_reports* waitee_buf_ptr, /*!< in: set of trxs */
THD* mysql_thd, /*!< in: THD */
trx_id_t victim_trx_id) /*!< in: Trx selected
as deadlock victim, if
any */
{
struct thd_wait_reports* p;
struct thd_wait_reports* q;
ulint i;
p = waitee_buf_ptr;
while (p) {
i = 0;
while (i < p->used) {
trx_t *w_trx = p->waitees[i];
/* There is no need to report waits to a trx already
selected as a victim. */
if (w_trx->id != victim_trx_id) {
/* If thd_report_wait_for() decides to kill the
transaction, then we will get a call back into
innobase_kill_query. We mark this by setting
current_lock_mutex_owner, so we can avoid trying
to recursively take lock_sys->mutex. */
w_trx->current_lock_mutex_owner = mysql_thd;
thd_report_wait_for(mysql_thd, w_trx->mysql_thd);
w_trx->current_lock_mutex_owner = NULL;
}
++i;
}
q = p->next;
if (p != waitee_buf_ptr) {
mem_free(p);
}
p = q;
}
}
/********************************************************************//**
Checks if a joining lock request results in a deadlock. If a deadlock is
found, this function will resolve the deadlock by choosing a victim transaction
@ -4002,13 +4111,23 @@ lock_deadlock_check_and_resolve(
const lock_t* lock, /*!< in: lock the transaction is requesting */
const trx_t* trx) /*!< in: transaction */
{
trx_id_t victim_trx_id;
trx_id_t victim_trx_id;
struct thd_wait_reports waitee_buf;
struct thd_wait_reports*waitee_buf_ptr;
THD* start_mysql_thd;
ut_ad(trx != NULL);
ut_ad(lock != NULL);
ut_ad(lock_mutex_own());
assert_trx_in_list(trx);
start_mysql_thd = trx->mysql_thd;
if (start_mysql_thd && thd_need_wait_for(start_mysql_thd)) {
waitee_buf_ptr = &waitee_buf;
} else {
waitee_buf_ptr = NULL;
}
/* Try and resolve as many deadlocks as possible. */
do {
lock_deadlock_ctx_t ctx;
@ -4021,7 +4140,19 @@ lock_deadlock_check_and_resolve(
ctx.wait_lock = lock;
ctx.mark_start = lock_mark_counter;
victim_trx_id = lock_deadlock_search(&ctx);
if (waitee_buf_ptr) {
waitee_buf_ptr->next = NULL;
waitee_buf_ptr->used = 0;
}
victim_trx_id = lock_deadlock_search(&ctx, waitee_buf_ptr);
/* Report waits to upper layer, as needed. */
if (waitee_buf_ptr) {
lock_report_waiters_to_mysql(waitee_buf_ptr,
start_mysql_thd,
victim_trx_id);
}
/* Search too deep, we rollback the joining transaction. */
if (ctx.too_deep) {

View file

@ -51,6 +51,9 @@ Created 3/26/1996 Heikki Tuuri
#include<set>
extern "C"
int thd_deadlock_victim_preference(const MYSQL_THD thd1, const MYSQL_THD thd2);
/** Set of table_id */
typedef std::set<table_id_t> table_id_set;
@ -2139,9 +2142,8 @@ trx_assert_started(
#endif /* UNIV_DEBUG */
/*******************************************************************//**
Compares the "weight" (or size) of two transactions. Transactions that
have edited non-transactional tables are considered heavier than ones
that have not.
Compares the "weight" (or size) of two transactions. The heavier the weight,
the more reluctant we will be to choose the transaction as a deadlock victim.
@return TRUE if weight(a) >= weight(b) */
UNIV_INTERN
ibool
@ -2150,26 +2152,19 @@ trx_weight_ge(
const trx_t* a, /*!< in: the first transaction to be compared */
const trx_t* b) /*!< in: the second transaction to be compared */
{
ibool a_notrans_edit;
ibool b_notrans_edit;
int pref;
/* If mysql_thd is NULL for a transaction we assume that it has
not edited non-transactional tables. */
a_notrans_edit = a->mysql_thd != NULL
&& thd_has_edited_nontrans_tables(a->mysql_thd);
b_notrans_edit = b->mysql_thd != NULL
&& thd_has_edited_nontrans_tables(b->mysql_thd);
if (a_notrans_edit != b_notrans_edit) {
return(a_notrans_edit);
/* First ask the upper server layer if it has a preference for which
transaction to choose as the deadlock victim. */
pref= thd_deadlock_victim_preference(a->mysql_thd, b->mysql_thd);
if (pref < 0) {
return FALSE;
} else if (pref > 0) {
return TRUE;
}
/* Either both had edited non-transactional tables or both had
not, we fall back to comparing the number of altered/locked
rows. */
/* Upper server layer had no preference; fall back to comparing the
number of altered/locked rows. */
#if 0
fprintf(stderr,