MDEV-35465 Async replication stops working on Galera async replica node when parallel replication is enabled

The parallel slave failed to retry in retry_event_group(), reporting the error:

    WSREP: Parallel slave worker failed at wsrep_before_command() hook

Fix wsrep transaction cleanup/restart in retry_event_group() so that the
previous transaction is properly cleaned up by calling
wsrep_after_statement(). Also move the call that resets the wsrep error
state to after the wsrep_after_statement() call, so that the reset
remains effective.
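
In outline, the retried worker now cleans up its wsrep state like this
(a condensed sketch of the retry_event_group() hunk shown in the diff
below, not the complete code):

    #ifdef WITH_WSREP
      if (WSREP(thd))
      {
        /* Run the after-statement hook so that the failed transaction
           is cleaned up before the retry starts a new one. */
        if (wsrep_after_statement(thd))
        {
          WSREP_WARN("Parallel slave worker failed at wsrep_after_statement() hook");
          err= 1;
          goto err;
        }
        thd->wsrep_cs().reset_error();       /* reset only after cleanup */
        wsrep_start_trx_if_not_started(thd); /* restart the applier trx  */
      }
    #endif /* WITH_WSREP */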

Add an MTR test galera_as_slave_parallel_retry which reproduces the error
when the fix is not present.

Other issues which were detected when testing with sysbench:

Check whether the parallel slave has been killed for retry before waiting
for prior commits in THD::wsrep_parallel_slave_wait_for_prior_commit().
This is required with slave-parallel-mode=optimistic to avoid a deadlock
when a slave worker later in commit order manages to reach the prepare
phase before a lock conflict is detected.
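
Condensed from the THD::wsrep_parallel_slave_wait_for_prior_commit()
hunk shown below, the check now looks roughly like this:

    bool THD::wsrep_parallel_slave_wait_for_prior_commit()
    {
      if (rgi_slave && rgi_slave->is_parallel_exec)
      {
        /* Complete any pending asynchronous deadlock kill first. */
        wait_for_pending_deadlock_kill(this, rgi_slave);
        if (rgi_slave->killed_for_retry)
        {
          /* Already chosen as a deadlock victim: fail with
             ER_LOCK_DEADLOCK instead of waiting, so the event group
             can be retried. */
          my_error(ER_LOCK_DEADLOCK, MYF(0));
          return true;
        }
        return wait_for_prior_commit();
      }
      return false;
    }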

Suppress the wsrep-applier-specific warning for regular slave threads.
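
The corresponding guard in Rows_log_event::do_apply_event() (see the
hunk below) becomes, in sketch form:

    if (WSREP(thd) && !thd->slave_thread)  /* warn only for wsrep applier threads */
    {
      WSREP_WARN("BF applier thread=%lu failed to open_and_lock_tables for "
                 "%s, fatal: %d ...", ...);
    }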

Signed-off-by: Julius Goryavsky <julius.goryavsky@mariadb.com>
Author:     Teemu Ollakka, 2024-11-21 16:54:51 +02:00
Committer:  Julius Goryavsky
Commit:     a2575a0703 (parent c772344510)
7 changed files with 113 additions and 14 deletions

@@ -0,0 +1,22 @@
connection node_2;
connection node_1;
connect master, 127.0.0.1, root, , test, $NODE_MYPORT_3;
connect node_1_ctrl, 127.0.0.1, root, , test, $NODE_MYPORT_1;
connection node_1;
START SLAVE;
connection master;
CREATE TABLE t1 (f1 INT PRIMARY KEY) ENGINE=InnoDB;
connection node_1;
SET GLOBAL debug_dbug = '+d,rpl_parallel_simulate_temp_err_xid,sync.wsrep_retry_event_group';
connection master;
INSERT INTO t1 VALUES (1);
connection node_1_ctrl;
SET debug_sync = 'now WAIT_FOR sync.wsrep_retry_event_group_reached';
SET GLOBAL debug_dbug = '';
SET debug_sync = 'now SIGNAL signal.wsrep_retry_event_group';
connection node_1;
SET debug_sync = 'RESET';
connection master;
DROP TABLE t1;
connection node_1;
STOP SLAVE;

@@ -0,0 +1,10 @@
!include ../galera_2nodes_as_slave.cnf
[mysqld]
log-bin=mysqld-bin
log-slave-updates
binlog-format=ROW
[mysqld.1]
slave-parallel-threads=2
slave-parallel-mode=optimistic

@@ -0,0 +1,52 @@
# MDEV-35465 Async replication stops working on Galera async replica node
# when parallel replication is enabled
--source include/have_innodb.inc
--source include/have_log_bin.inc
--source include/galera_cluster.inc
--source include/have_debug_sync.inc
# Node 3 is not a Galera node, use it as a master
--connect master, 127.0.0.1, root, , test, $NODE_MYPORT_3
--connect node_1_ctrl, 127.0.0.1, root, , test, $NODE_MYPORT_1
--connection node_1
--disable_query_log
--eval CHANGE MASTER TO MASTER_HOST='127.0.0.1', MASTER_USER='root', MASTER_PORT=$NODE_MYPORT_3;
--enable_query_log
START SLAVE;
--connection master
CREATE TABLE t1 (f1 INT PRIMARY KEY) ENGINE=InnoDB;
--connection node_1
--let $wait_condition = SELECT COUNT(*) = 1 FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_NAME = 't1'
--source include/wait_condition.inc
#
--let debug_dbug_orig = `SELECT @@GLOBAL.debug_dbug`
SET GLOBAL debug_dbug = '+d,rpl_parallel_simulate_temp_err_xid,sync.wsrep_retry_event_group';
--connection master
INSERT INTO t1 VALUES (1);
--connection node_1_ctrl
SET debug_sync = 'now WAIT_FOR sync.wsrep_retry_event_group_reached';
--eval SET GLOBAL debug_dbug = '$debug_dbug_orig'
SET debug_sync = 'now SIGNAL signal.wsrep_retry_event_group';
--let $wait_condition = SELECT COUNT(*) = 1 FROM t1;
--source include/wait_condition.inc
--connection node_1
SET debug_sync = 'RESET';
--connection master
DROP TABLE t1;
--connection node_1
--let $wait_condition = SELECT COUNT(*) = 0 FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_NAME = 't1'
--source include/wait_condition.inc
STOP SLAVE;

@@ -5511,7 +5511,7 @@ int Rows_log_event::do_apply_event(rpl_group_info *rgi)
if (unlikely(open_and_lock_tables(thd, rgi->tables_to_lock, FALSE, 0)))
{
#ifdef WITH_WSREP
if (WSREP(thd))
if (WSREP(thd) && !thd->slave_thread)
{
WSREP_WARN("BF applier thread=%lu failed to open_and_lock_tables for "
"%s, fatal: %d "

@@ -134,7 +134,7 @@ handle_queued_pos_update(THD *thd, rpl_parallel_thread::queued_event *qev)
asynchronously, we need to be sure they will be completed before starting a
new transaction. Otherwise the new transaction might suffer a spurious kill.
*/
static void
void
wait_for_pending_deadlock_kill(THD *thd, rpl_group_info *rgi)
{
PSI_stage_info old_stage;
@@ -831,8 +831,12 @@ do_retry:
err= 0;
errmsg= NULL;
#ifdef WITH_WSREP
thd->wsrep_cs().reset_error();
WSREP_DEBUG("retrying async replication event");
DBUG_EXECUTE_IF("sync.wsrep_retry_event_group", {
const char act[]= "now "
"SIGNAL sync.wsrep_retry_event_group_reached "
"WAIT_FOR signal.wsrep_retry_event_group";
debug_sync_set_action(thd, STRING_WITH_LEN(act));
};);
#endif /* WITH_WSREP */
/*
@@ -981,15 +985,20 @@ do_retry:
*/
thd->reset_killed();
#ifdef WITH_WSREP
if (wsrep_before_command(thd))
if (WSREP(thd))
{
WSREP_WARN("Parallel slave worker failed at wsrep_before_command() hook");
err= 1;
goto err;
/* Exec after statement hook to make sure that the failed transaction
* gets cleared and reset error state. */
if (wsrep_after_statement(thd))
{
WSREP_WARN("Parallel slave worker failed at wsrep_after_statement() hook");
err= 1;
goto err;
}
thd->wsrep_cs().reset_error();
wsrep_start_trx_if_not_started(thd);
WSREP_DEBUG("parallel slave retry, after trx start");
}
wsrep_start_trx_if_not_started(thd);
WSREP_DEBUG("parallel slave retry, after trx start");
#endif /* WITH_WSREP */
strmake_buf(log_name, ir->name);
if ((fd= open_binlog(&rlog, log_name, &errmsg)) <0)

@@ -380,6 +380,7 @@ struct rpl_parallel {
extern struct rpl_parallel_thread_pool global_rpl_thread_pool;
extern void wait_for_pending_deadlock_kill(THD *thd, rpl_group_info *rgi);
extern int rpl_parallel_resize_pool_if_no_slaves(void);
extern int rpl_parallel_activate_pool(rpl_parallel_thread_pool *pool);
extern int rpl_parallel_inactivate_pool(rpl_parallel_thread_pool *pool);

@@ -3426,11 +3426,16 @@ enum wsrep::streaming_context::fragment_unit wsrep_fragment_unit(ulong unit)
bool THD::wsrep_parallel_slave_wait_for_prior_commit()
{
if (rgi_slave && rgi_slave->is_parallel_exec && wait_for_prior_commit())
if (rgi_slave && rgi_slave->is_parallel_exec)
{
return 1;
wait_for_pending_deadlock_kill(this, rgi_slave);
if (rgi_slave->killed_for_retry) {
my_error(ER_LOCK_DEADLOCK, MYF(0));
return true;
}
return wait_for_prior_commit();
}
return 0;
return false;
}
/***** callbacks for wsrep service ************/