Binlog-in-engine: Fix incorrect handling of internal 2pc rollback

The error handling for internal 2pc transactions (e.g. RocksDB/Spider) would
incorrectly try to handle the engine binlog_unlog() during rollback, in
binlog_post_rollback(); this should instead be handled solely in
log_and_order() and unlog(). This could trigger for example in parallel
replication error handling, causing assertions when wrongly entering XA code
paths.

Also fix a couple bugs found during debug:

 - Don't send format description event to the slave from before the starting
   GTID position, as that can cause the slave to wrongly drop temporary
   tables.

 - When looking up the initial GTID position for a new dump thread, wait for
   the necessary part of the binlog to become durable before reading it.

 - Don't error when searching the initial GTID position if reaching EOF of
   the durable portion, instead search back to an earlier GTID state record.

 - Fix a rare race in the test framework that could fail to kill off
   lingering dump threads before RESET MASTER.

Signed-off-by: Kristian Nielsen <knielsen@knielsen-hq.org>
This commit is contained in:
Kristian Nielsen 2025-10-22 18:28:15 +02:00
commit 5335c85a47
9 changed files with 228 additions and 42 deletions

View file

@ -315,9 +315,10 @@ public:
virtual int read_binlog_data(uchar *buf, uint32_t len) final;
virtual bool data_available() final;
virtual bool wait_available(THD *thd, const struct timespec *abstime) final;
virtual int init_gtid_pos(slave_connection_state *pos,
virtual int init_gtid_pos(THD *thd, slave_connection_state *pos,
rpl_binlog_state_base *state) final;
virtual int init_legacy_pos(const char *filename, ulonglong offset) final;
virtual int init_legacy_pos(THD *thd, const char *filename,
ulonglong offset) final;
virtual void enable_single_file() final;
bool is_valid() { return page_buf != nullptr; }
bool init_from_fd_pos(File fd, ulonglong start_position);
@ -789,7 +790,7 @@ binlog_reader_innodb::wait_available(THD *thd, const struct timespec *abstime)
int
binlog_reader_innodb::init_gtid_pos(slave_connection_state *pos,
binlog_reader_innodb::init_gtid_pos(THD *thd, slave_connection_state *pos,
rpl_binlog_state_base *state)
{
DBUG_ASSERT(0 /* Should not be used in mysqlbinlog. */);
@ -798,7 +799,8 @@ binlog_reader_innodb::init_gtid_pos(slave_connection_state *pos,
int
binlog_reader_innodb::init_legacy_pos(const char *filename, ulonglong offset)
binlog_reader_innodb::init_legacy_pos(THD *thd, const char *filename,
ulonglong offset)
{
DBUG_ASSERT(0 /* Should not be used in mysqlbinlog. */);
return 1;

View file

@ -77,13 +77,14 @@ public:
0 The requested GTID position not found, needed binlogs have been purged
1 Ok, position found and returned.
*/
virtual int init_gtid_pos(slave_connection_state *pos,
virtual int init_gtid_pos(THD *thd, slave_connection_state *pos,
rpl_binlog_state_base *state) = 0;
/*
Initialize to a legacy-type position (filename, offset). This mostly to
support legacy SHOW BINLOG EVENTS.
*/
virtual int init_legacy_pos(const char *filename, ulonglong offset) = 0;
virtual int init_legacy_pos(THD *thd, const char *filename,
ulonglong offset) = 0;
/*
Can be called after init_gtid_pos() or init_legacy_pos() to make the reader
stop (return EOF) at the end of the binlog file. Used for SHOW BINLOG

View file

@ -35,11 +35,17 @@ let $success= 0;
while ($wait_counter)
{
dec $wait_counter;
let $_tid= `SELECT id FROM information_schema.processlist WHERE command = 'Binlog Dump' LIMIT 1`;
# Tricky here. The binlog dump thread will normally be identified by the
# command name "Binlog Dump". But if it was just killed and hasn't had time
# to react to the kill yet, it will be 'Killed'. It can also be 'Busy'
# if the code fails to obtain the LOCK_thd_data mutex.
let $_tid= `SELECT IF(command='Binlog Dump', id, -1) FROM information_schema.processlist WHERE command IN ('Binlog Dump', 'Killed', 'Busy') LIMIT 1`;
if ($_tid)
{
--error 0,ER_NO_SUCH_THREAD
eval KILL CONNECTION $_tid;
if ($_tid > 0) {
--error 0,ER_NO_SUCH_THREAD
eval KILL CONNECTION $_tid;
}
}
if (!$_tid)
{

View file

@ -4,6 +4,13 @@ CREATE TABLE t1(a INT PRIMARY KEY, b INT, c LONGBLOB) ENGINE=InnoDB;
CREATE TABLE t2(a INT PRIMARY KEY, b INT, c LONGBLOB) ENGINE=RocksDB;
INSERT INTO t1 SELECT seq, seq*seq, REPEAT('x', 50*seq) FROM seq_1_to_100;
INSERT INTO t2 SELECT seq, 10000 - seq*seq, REPEAT('y', 50*seq) FROM seq_1_to_100;
include/save_master_gtid.inc
connection slave;
include/sync_with_master_gtid.inc
include/stop_slave.inc
SET @old_threads= @@GLOBAL.slave_parallel_threads;
SET GLOBAL slave_parallel_threads= 8;
include/start_slave.inc
*** Cross-engine transaction, InnoDB and RocksDB.
connect con1,localhost,root,,;
BEGIN;
@ -28,6 +35,18 @@ connection master;
UPDATE t2 SET c=CONCAT('<', c, '>') WHERE a BETWEEN 20 AND 80;
UPDATE t2 SET b=b+1 WHERE a=1 OR a=92;
UPDATE t2 SET b=b*2 WHERE a MOD 7 = 0;
*** RocksDB transaction that rolls back.
BEGIN;
UPDATE t2 SET b=b+1 WHERE a=3;
UPDATE t2 SET b=b+1 WHERE a=5;
UPDATE t2 SET b=b+1 WHERE a=8;
ROLLBACK;
connection con2;
BEGIN;
UPDATE t2 SET b=b+1 WHERE a=4;
UPDATE t2 SET b=b+1 WHERE a=9;
UPDATE t2 SET b=b+1 WHERE a=13;
disconnect con2;
connection master;
SELECT COUNT(*), SUM(a), SUM(b), SUM(LENGTH(c)) FROM t1;
COUNT(*) SUM(a) SUM(b) SUM(LENGTH(c))
@ -44,6 +63,36 @@ COUNT(*) SUM(a) SUM(b) SUM(LENGTH(c))
SELECT COUNT(*), SUM(a), SUM(b), SUM(LENGTH(c)) FROM t2;
COUNT(*) SUM(a) SUM(b) SUM(LENGTH(c))
100 5050 661537 252685
*** Test a RocksDB transaction that needs to roll back after having binlogged an internal 2pc xid
connection slave1;
SET STATEMENT sql_log_bin= 0
FOR UPDATE t1 SET a= a+1000000 WHERE a=5;
BEGIN;
SELECT a FROM t1 WHERE a=2 FOR UPDATE;
a
2
connection master;
BEGIN;
UPDATE t1 SET b=b+1 WHERE a=2;
UPDATE t1 SET b=b+1 WHERE a=5;
COMMIT;
UPDATE t2 SET b=b+2 WHERE a=10;
include/save_master_gtid.inc
connection slave;
connection slave1;
ROLLBACK;
connection slave;
include/wait_for_slave_sql_error.inc [errno=1032]
SET STATEMENT sql_log_bin= 0
FOR UPDATE t1 SET a= a-1000000 WHERE a=1000000 + 5;
START SLAVE SQL_THREAD;
include/sync_with_master_gtid.inc
connection slave;
include/stop_slave.inc
SET GLOBAL slave_parallel_threads= @old_threads;
include/start_slave.inc
connection master;
DROP TABLE t1, t2;
CALL mtr.add_suppression("Can't find record in 't1'");
CALL mtr.add_suppression("Commit failed due to failure of an earlier commit on which this one depends");
include/rpl_end.inc

View file

@ -10,6 +10,14 @@ CREATE TABLE t2(a INT PRIMARY KEY, b INT, c LONGBLOB) ENGINE=RocksDB;
INSERT INTO t1 SELECT seq, seq*seq, REPEAT('x', 50*seq) FROM seq_1_to_100;
INSERT INTO t2 SELECT seq, 10000 - seq*seq, REPEAT('y', 50*seq) FROM seq_1_to_100;
--source include/save_master_gtid.inc
--connection slave
--source include/sync_with_master_gtid.inc
--source include/stop_slave.inc
SET @old_threads= @@GLOBAL.slave_parallel_threads;
SET GLOBAL slave_parallel_threads= 8;
--source include/start_slave.inc
--echo *** Cross-engine transaction, InnoDB and RocksDB.
--connect con1,localhost,root,,
BEGIN;
@ -42,6 +50,20 @@ UPDATE t2 SET c=CONCAT('<', c, '>') WHERE a BETWEEN 20 AND 80;
UPDATE t2 SET b=b+1 WHERE a=1 OR a=92;
UPDATE t2 SET b=b*2 WHERE a MOD 7 = 0;
--echo *** RocksDB transaction that rolls back.
BEGIN;
UPDATE t2 SET b=b+1 WHERE a=3;
UPDATE t2 SET b=b+1 WHERE a=5;
UPDATE t2 SET b=b+1 WHERE a=8;
ROLLBACK;
--connection con2
BEGIN;
UPDATE t2 SET b=b+1 WHERE a=4;
UPDATE t2 SET b=b+1 WHERE a=9;
UPDATE t2 SET b=b+1 WHERE a=13;
--disconnect con2
--connection master
SELECT COUNT(*), SUM(a), SUM(b), SUM(LENGTH(c)) FROM t1;
SELECT COUNT(*), SUM(a), SUM(b), SUM(LENGTH(c)) FROM t2;
@ -53,8 +75,58 @@ SELECT COUNT(*), SUM(a), SUM(b), SUM(LENGTH(c)) FROM t2;
SELECT COUNT(*), SUM(a), SUM(b), SUM(LENGTH(c)) FROM t1;
SELECT COUNT(*), SUM(a), SUM(b), SUM(LENGTH(c)) FROM t2;
--echo *** Test a RocksDB transaction that needs to roll back after having binlogged an internal 2pc xid
--connection slave1
# Cause a "row not found" error by removing a row.
SET STATEMENT sql_log_bin= 0
FOR UPDATE t1 SET a= a+1000000 WHERE a=5;
# Block a row temporarily to control parallel replication thread scheduling
BEGIN;
SELECT a FROM t1 WHERE a=2 FOR UPDATE;
--connection master
# Create a transaction T1 that will first wait, and then error.
BEGIN;
UPDATE t1 SET b=b+1 WHERE a=2;
UPDATE t1 SET b=b+1 WHERE a=5;
COMMIT;
# Create a transaction T2 that will queue for group commit and wait for T1
# to commit (or fail, as it were).
UPDATE t2 SET b=b+2 WHERE a=10;
--source include/save_master_gtid.inc
--connection slave
--let $wait_condition= SELECT COUNT(*)=1 FROM INFORMATION_SCHEMA.PROCESSLIST WHERE Command='Slave_worker' AND State='Waiting for prior transaction to commit'
--source include/wait_condition.inc
# Release T1 so that it can fail after T2 has queued for group commit.
--connection slave1
ROLLBACK;
--connection slave
--let $slave_sql_errno= 1032
--source include/wait_for_slave_sql_error.inc
# Now move back the row so the replication can continue and succeed.
SET STATEMENT sql_log_bin= 0
FOR UPDATE t1 SET a= a-1000000 WHERE a=1000000 + 5;
START SLAVE SQL_THREAD;
--source include/sync_with_master_gtid.inc
# Clean up.
--connection slave
--source include/stop_slave.inc
SET GLOBAL slave_parallel_threads= @old_threads;
--source include/start_slave.inc
--connection master
DROP TABLE t1, t2;
CALL mtr.add_suppression("Can't find record in 't1'");
CALL mtr.add_suppression("Commit failed due to failure of an earlier commit on which this one depends");
--source include/rpl_end.inc

View file

@ -2130,6 +2130,7 @@ binlog_setup_engine_commit_data(handler_binlog_event_group_info *context,
{
/* Mark that we are doing XA and need to unlog. */
cache_mngr->need_engine_2pc= true;
context->internal_xa= false;
}
else if (unlikely(cache_mngr->need_engine_2pc))
{
@ -2302,6 +2303,8 @@ binlog_rollback_flush_trx_cache(THD *thd, bool all,
handler_binlog_event_group_info *engine_context=
&cache_data->engine_binlog_info;
const XID *xid= thd->transaction->xid_state.get_xid();
engine_context->xa_xid= xid;
engine_context->internal_xa= false;
mysql_mutex_lock(&LOCK_commit_ordered);
err= (*opt_binlog_engine_hton->binlog_xa_rollback_ordered)
(thd, xid, &engine_context->engine_ptr);
@ -2765,6 +2768,7 @@ binlog_post_commit(THD *thd, bool all)
binlog_cache_mngr *cache_mngr= thd->binlog_get_cache_mngr();
if (likely(cache_mngr != nullptr) && unlikely(cache_mngr->need_engine_2pc))
{
DBUG_ASSERT(!cache_mngr->trx_cache.engine_binlog_info.internal_xa);
DBUG_ASSERT(thd->lex->sql_command == SQLCOM_XA_COMMIT &&
thd->lex->xa_opt != XA_ONE_PHASE);
cache_mngr->need_engine_2pc= false;
@ -2785,6 +2789,7 @@ binlog_post_commit_by_xid(handlerton *hton, XID *xid)
binlog_cache_mngr *cache_mngr= thd->binlog_get_cache_mngr();
if (likely(cache_mngr != nullptr) && unlikely(cache_mngr->need_engine_2pc))
{
DBUG_ASSERT(!cache_mngr->trx_cache.engine_binlog_info.internal_xa);
DBUG_ASSERT(thd->lex->sql_command == SQLCOM_XA_COMMIT &&
thd->lex->xa_opt != XA_ONE_PHASE);
cache_mngr->need_engine_2pc= false;
@ -2802,10 +2807,15 @@ binlog_post_rollback(THD *thd, bool all)
binlog_cache_mngr *cache_mngr= thd->binlog_get_cache_mngr();
if (likely(cache_mngr != nullptr) && unlikely(cache_mngr->need_engine_2pc))
{
handler_binlog_event_group_info *context=
&cache_mngr->trx_cache.engine_binlog_info;
DBUG_ASSERT(!context->internal_xa);
if (!context->internal_xa)
{
const XID *xid= thd->transaction->xid_state.get_xid();
(*opt_binlog_engine_hton->binlog_unlog)(xid, &context->engine_ptr);
}
cache_mngr->need_engine_2pc= false;
(*opt_binlog_engine_hton->binlog_unlog)
(thd->transaction->xid_state.get_xid(),
&cache_mngr->trx_cache.engine_binlog_info.engine_ptr);
}
}
@ -2820,6 +2830,7 @@ binlog_post_rollback_by_xid(handlerton *hton, XID *xid)
binlog_cache_mngr *cache_mngr= thd->binlog_get_cache_mngr();
if (likely(cache_mngr != nullptr) && unlikely(cache_mngr->need_engine_2pc))
{
DBUG_ASSERT(!cache_mngr->trx_cache.engine_binlog_info.internal_xa);
DBUG_ASSERT(thd->lex->sql_command == SQLCOM_XA_ROLLBACK);
cache_mngr->need_engine_2pc= false;
(*opt_binlog_engine_hton->binlog_unlog)
@ -9740,7 +9751,10 @@ MYSQL_BIN_LOG::write_transaction_to_binlog(THD *thd,
if (ha_info->ht() != binlog_hton &&
ha_info->ht() != opt_binlog_engine_hton &&
ha_info->is_trx_read_write())
{
cache_mngr->need_engine_2pc= true;
cache_mngr->trx_cache.engine_binlog_info.internal_xa= true;
}
}
else
{
@ -12600,21 +12614,22 @@ TC_LOG_BINLOG::log_and_order(THD *thd, my_xid xid, bool all,
DEBUG_SYNC(thd, "binlog_after_log_and_order");
if (err)
DBUG_RETURN(0);
bool need_unlog= cache_mngr->need_unlog;
bool need_engine_2pc= cache_mngr->need_engine_2pc;
/*
The transaction won't need the flag anymore.
Todo/fixme: consider to move the statement into cache_mngr->reset()
relocated to the current or later point.
*/
cache_mngr->need_unlog= false;
cache_mngr->need_engine_2pc= false;
if (unlikely(cache_mngr->need_engine_2pc))
if (err)
DBUG_RETURN(0);
if (unlikely(need_engine_2pc))
{
DBUG_ASSERT(!need_unlog);
cache_mngr->need_engine_2pc= false;
DBUG_RETURN(BINLOG_COOKIE_ENGINE_UNLOG(cache_mngr->delayed_error));
}
/*
@ -12786,13 +12801,15 @@ int TC_LOG_BINLOG::unlog(THD *thd, ulong cookie, my_xid xid)
if (BINLOG_COOKIE_IS_ENGINE_UNLOG(cookie))
{
DBUG_ASSERT(opt_binlog_engine_hton);
XID xid_buf;
xid_buf.set(xid);
binlog_cache_mngr *cache_mngr= thd->binlog_get_cache_mngr();
DBUG_ASSERT(cache_mngr != nullptr);
if (likely(cache_mngr != nullptr))
{
cache_mngr->xid_buf.set(xid);
(*opt_binlog_engine_hton->binlog_unlog)
(&xid_buf, &cache_mngr->trx_cache.engine_binlog_info.engine_ptr);
(&cache_mngr->xid_buf,
&cache_mngr->trx_cache.engine_binlog_info.engine_ptr);
}
}
else if (!BINLOG_COOKIE_IS_DUMMY(cookie))
mark_xid_done(BINLOG_COOKIE_GET_ID(cookie), true);

View file

@ -1793,12 +1793,14 @@ end:
static const char *
gtid_find_engine_pos(handler_binlog_reader *binlog_reader,
slave_connection_state *pos,
slave_connection_state *until_gtid_pos,
rpl_binlog_state *until_binlog_state)
gtid_find_engine_pos(binlog_send_info *info)
{
int res= binlog_reader->init_gtid_pos(pos, until_binlog_state);
handler_binlog_reader *binlog_reader= info->engine_binlog_reader;
slave_connection_state *pos= &info->gtid_state;
slave_connection_state *until_gtid_pos= info->until_gtid_state;
rpl_binlog_state *until_binlog_state= &info->until_binlog_state;
int res= binlog_reader->init_gtid_pos(info->thd, pos, until_binlog_state);
if (res < 0)
return "Error while looking up GTID position in engine binlog";
if (res == 0)
@ -2691,10 +2693,7 @@ static int init_binlog_sender(binlog_send_info *info,
if (opt_binlog_engine_hton)
{
if ((info->errmsg= gtid_find_engine_pos(info->engine_binlog_reader,
&info->gtid_state,
info->until_gtid_state,
&info->until_binlog_state)))
if ((info->errmsg= gtid_find_engine_pos(info)))
{
info->error= ER_MASTER_FATAL_ERROR_READING_BINLOG;
return 1;
@ -3452,6 +3451,19 @@ static int send_engine_events(binlog_send_info *info, LOG_INFO* linfo)
if (err)
return 1;
}
else if (unlikely(event_type == FORMAT_DESCRIPTION_EVENT) &&
info->gtid_state.count() > 0)
{
/*
In the engine-implemented binlog, format description event is (only)
written to mark a master server restart; this is used by the slave to
know that the master discarded temporary tables at this point. So don't
send such event until we have reached our GTID starting position, so
that the slave will not mistakenly discard such temporary tables too
early.
*/
continue;
}
if (((info->errmsg= send_event_to_slave(info, event_type, nullptr,
ev_offset, &info->error_gtid))))
return 1;
@ -4939,7 +4951,7 @@ show_engine_binlog_events(THD* thd, Protocol *protocol, LEX_MASTER_INFO *lex_mi)
*/
if (pos == 4)
pos= 0;
if (reader->init_legacy_pos(lex_mi->log_file_name, pos))
if (reader->init_legacy_pos(thd, lex_mi->log_file_name, pos))
{
err= true;
goto end;

View file

@ -2149,16 +2149,24 @@ binlog_chunk_reader::find_offset_in_page(uint32_t off)
}
/**
Read the header page of the current binlog file_no.
Returns:
1 Header page found and returned.
0 EOF, no header page found (ie. file is empty / nothing is durable yet).
-1 Error.
*/
int
binlog_chunk_reader::get_file_header(binlog_header_data *out_header)
{
seek(current_file_no(), 0);
if (fetch_current_page() != CHUNK_READER_FOUND)
return -1;
enum chunk_reader_status res= fetch_current_page();
if (UNIV_UNLIKELY(res != CHUNK_READER_FOUND))
return res == CHUNK_READER_EOF ? 0 : -1;
fsp_binlog_extract_header_page(page_ptr, out_header);
if (out_header->is_invalid || out_header->is_empty)
return -1;
return 0;
return 1;
}

View file

@ -303,9 +303,10 @@ public:
virtual int read_binlog_data(uchar *buf, uint32_t len) final;
virtual bool data_available() final;
virtual bool wait_available(THD *thd, const struct timespec *abstime) final;
virtual int init_gtid_pos(slave_connection_state *pos,
virtual int init_gtid_pos(THD *thd, slave_connection_state *pos,
rpl_binlog_state_base *state) final;
virtual int init_legacy_pos(const char *filename, ulonglong offset) final;
virtual int init_legacy_pos(THD *thd, const char *filename,
ulonglong offset) final;
virtual void enable_single_file() final;
void seek_internal(uint64_t file_no, uint64_t offset);
};
@ -3427,16 +3428,20 @@ gtid_search::find_gtid_pos(slave_connection_state *pos,
/* Read the header page, needed to get the binlog diff state interval. */
binlog_header_data header;
chunk_reader.seek(file_no, 0);
if (chunk_reader.get_file_header(&header))
int res= chunk_reader.get_file_header(&header);
if (UNIV_UNLIKELY(res < 0))
return -1;
if (UNIV_UNLIKELY(res == 0))
goto not_found_in_file;
diff_state_page_interval= header.diff_state_interval;
chunk_reader.seek(file_no, ibb_page_size);
int res= read_gtid_state(&chunk_reader, &base_state, &dummy_xa_ref);
res= read_gtid_state(&chunk_reader, &base_state, &dummy_xa_ref);
if (UNIV_UNLIKELY(res < 0))
return -1;
if (res == 0)
{
not_found_in_file:
if (file_no == 0)
{
/* Handle the special case of a completely empty binlog file. */
@ -3445,10 +3450,9 @@ gtid_search::find_gtid_pos(slave_connection_state *pos,
*out_offset= ibb_page_size;
return 1;
}
ut_ad(0 /* Not expected to find no state, should always be written. */);
return -1;
/* If GTID state is not (durably) available, try the previous file. */
}
if (base_state.is_before_pos(pos))
else if (base_state.is_before_pos(pos))
break;
base_state.reset_nolock();
if (file_no <= earliest_binlog_file_no)
@ -3518,12 +3522,26 @@ gtid_search::find_gtid_pos(slave_connection_state *pos,
int
ha_innodb_binlog_reader::init_gtid_pos(slave_connection_state *pos,
ha_innodb_binlog_reader::init_gtid_pos(THD *thd, slave_connection_state *pos,
rpl_binlog_state_base *state)
{
gtid_search search_obj;
uint64_t file_no;
uint64_t offset;
/*
Wait for at least the initial GTID state record to become durable before
looking for the starting GTID position.
This is unlikely to need to wait, as it would imply that _no_ part of the
binlog is durable at this point. But it might theoretically occur perhaps
after a PURGE of all binlog files but the active; and failing to do the
wait if needed might wrongly return an error that the GTID position is
too old.
*/
chunk_rd.seek(earliest_binlog_file_no, ibb_page_size);
if (UNIV_UNLIKELY(wait_available(thd, nullptr)))
return -1;
int res= search_obj.find_gtid_pos(pos, state, &file_no, &offset);
if (res < 0)
return -1;
@ -3540,7 +3558,8 @@ ha_innodb_binlog_reader::init_gtid_pos(slave_connection_state *pos,
int
ha_innodb_binlog_reader::init_legacy_pos(const char *filename, ulonglong offset)
ha_innodb_binlog_reader::init_legacy_pos(THD *thd, const char *filename,
ulonglong offset)
{
uint64_t file_no;
if (!filename)