Binlog-in-engine: Initial support for 2pc and XA

At XA PREPARE, spill all events (including COMMIT end event) as OOB, and
call into the engine to binlog a PREPARE record. Store the OOB reference
along with the XID in an engine-binlog internal hash.

At XA COMMIT, fetch the OOB reference from the internal hash and put it into
a COMMIT record for the transaction.

For both user XA and internal two-phase commit between the binlog and
another storage engine, write the XID into an XA complete event in the
same mtr as the commit record. This record will later be used to
consistently recover (commit or roll back) prepared transactions in the
other engines, depending on whether the binlog write became durable
before the crash or not.

At XA ROLLBACK, merely put in an XA complete event.

Maintain reference counts for pending prepared XA transactions, and
for pending two-phase commit records, to make sure binlog files
containing these will not be purged while those transactions are
active.

Implement the necessary "unlog" mechanisms so that the reference
counts can be released only after all other participating engines have
durably committed (respectively XA prepared/rolled back) their part of
the transaction.

This commit does not handle XA/binlog crash recovery; that will come in a
later patch.

Signed-off-by: Kristian Nielsen <knielsen@knielsen-hq.org>
This commit is contained in:
Kristian Nielsen 2025-10-01 12:44:42 +02:00
commit 10218b8d85
22 changed files with 1466 additions and 128 deletions

View file

@ -1,15 +1,4 @@
SET GLOBAL rpl_semi_sync_master_enabled= 1; SET GLOBAL rpl_semi_sync_master_enabled= 1;
ERROR HY000: Semi-synchronous replication is not yet supported with --binlog-storage-engine ERROR HY000: Semi-synchronous replication is not yet supported with --binlog-storage-engine
CREATE TABLE t1 (a INT PRIMARY KEY, b INT) ENGINE=InnoDB;
XA START 'a';
ERROR HY000: Explicit XA transaction is not yet supported with --binlog-storage-engine
INSERT INTO t1 VALUES (0, 0);
XA END 'a';
ERROR XAE07: XAER_RMFAIL: The command cannot be executed when global transaction is in the NON-EXISTING state
XA PREPARE 'a';
ERROR XAE07: XAER_RMFAIL: The command cannot be executed when global transaction is in the NON-EXISTING state
XA COMMIT 'a';
ERROR XAE04: XAER_NOTA: Unknown XID
DROP TABLE t1;
SELECT BINLOG_GTID_POS("binlog-000000.ibb", 4096); SELECT BINLOG_GTID_POS("binlog-000000.ibb", 4096);
ERROR HY000: BINLOG_GTID_POS() is not available when --binlog-storage-engine is enabled ERROR HY000: BINLOG_GTID_POS() is not available when --binlog-storage-engine is enabled

View file

@ -4,18 +4,6 @@
--error ER_NOT_YET_SUPPORTED_ENGINE_BINLOG --error ER_NOT_YET_SUPPORTED_ENGINE_BINLOG
SET GLOBAL rpl_semi_sync_master_enabled= 1; SET GLOBAL rpl_semi_sync_master_enabled= 1;
CREATE TABLE t1 (a INT PRIMARY KEY, b INT) ENGINE=InnoDB;
--error ER_NOT_YET_SUPPORTED_ENGINE_BINLOG
XA START 'a';
INSERT INTO t1 VALUES (0, 0);
--error ER_XAER_RMFAIL
XA END 'a';
--error ER_XAER_RMFAIL
XA PREPARE 'a';
--error ER_XAER_NOTA
XA COMMIT 'a';
DROP TABLE t1;
# The BINLOG_GTID_POS() function is not available and will not be, we # The BINLOG_GTID_POS() function is not available and will not be, we
# want to get away from old problematic filename/offset coordinates. # want to get away from old problematic filename/offset coordinates.
--error ER_NOT_AVAILABLE_WITH_ENGINE_BINLOG --error ER_NOT_AVAILABLE_WITH_ENGINE_BINLOG

View file

@ -0,0 +1,49 @@
include/master-slave.inc
[connection master]
CREATE TABLE t1(a INT PRIMARY KEY, b INT, c LONGBLOB) ENGINE=InnoDB;
CREATE TABLE t2(a INT PRIMARY KEY, b INT, c LONGBLOB) ENGINE=RocksDB;
INSERT INTO t1 SELECT seq, seq*seq, REPEAT('x', 50*seq) FROM seq_1_to_100;
INSERT INTO t2 SELECT seq, 10000 - seq*seq, REPEAT('y', 50*seq) FROM seq_1_to_100;
*** Cross-engine transaction, InnoDB and RocksDB.
connect con1,localhost,root,,;
BEGIN;
UPDATE t1 SET b=b+a WHERE a BETWEEN 10 AND 20;
REPLACE INTO t2 SELECT a, b, c FROM t1 WHERE a BETWEEN 30 and 40;
connect con2,localhost,root,,;
BEGIN;
UPDATE t1, t2
SET t1.b=t1.b + LENGTH(t2.c), t2.c=CONCAT("|", t2.c, "|")
WHERE t1.a = t2.a
AND t1.a BETWEEN 50 AND 60;
connection con1;
UPDATE t1 SET b=-b WHERE a=100;
connection con2;
UPDATE t2 SET c=CONCAT('-', c) WHERE a BETWEEN 50 AND 90;
connection con1;
COMMIT;
connection con2;
COMMIT;
*** RocksDB-only transactions with binlog in InnoDB.
connection master;
UPDATE t2 SET c=CONCAT('<', c, '>') WHERE a BETWEEN 20 AND 80;
UPDATE t2 SET b=b+1 WHERE a=1 OR a=92;
UPDATE t2 SET b=b*2 WHERE a MOD 7 = 0;
connection master;
SELECT COUNT(*), SUM(a), SUM(b), SUM(LENGTH(c)) FROM t1;
COUNT(*) SUM(a) SUM(b) SUM(LENGTH(c))
100 5050 348765 252500
SELECT COUNT(*), SUM(a), SUM(b), SUM(LENGTH(c)) FROM t2;
COUNT(*) SUM(a) SUM(b) SUM(LENGTH(c))
100 5050 661537 252685
include/save_master_gtid.inc
connection slave;
include/sync_with_master_gtid.inc
SELECT COUNT(*), SUM(a), SUM(b), SUM(LENGTH(c)) FROM t1;
COUNT(*) SUM(a) SUM(b) SUM(LENGTH(c))
100 5050 348765 252500
SELECT COUNT(*), SUM(a), SUM(b), SUM(LENGTH(c)) FROM t2;
COUNT(*) SUM(a) SUM(b) SUM(LENGTH(c))
100 5050 661537 252685
connection master;
DROP TABLE t1, t2;
include/rpl_end.inc

View file

@ -0,0 +1,60 @@
# Replication test for transactions that involve a RocksDB table, either
# alone or together with an InnoDB table, on a server that satisfies
# include/have_innodb_binlog.inc. The master and slave table contents are
# compared at the end through aggregate queries.
--source include/have_sequence.inc
--source include/have_rocksdb.inc
--source include/have_binlog_format_row.inc
--source include/master-slave.inc
--source include/have_innodb_binlog.inc
# One table per engine, each filled with 100 rows of increasing size
# (column c is 50*a bytes long).
CREATE TABLE t1(a INT PRIMARY KEY, b INT, c LONGBLOB) ENGINE=InnoDB;
CREATE TABLE t2(a INT PRIMARY KEY, b INT, c LONGBLOB) ENGINE=RocksDB;
INSERT INTO t1 SELECT seq, seq*seq, REPEAT('x', 50*seq) FROM seq_1_to_100;
INSERT INTO t2 SELECT seq, 10000 - seq*seq, REPEAT('y', 50*seq) FROM seq_1_to_100;
--echo *** Cross-engine transaction, InnoDB and RocksDB.
# con1 and con2 each run one explicit transaction that modifies both t1 and
# t2. Their statements are interleaved, and the row ranges they modify do
# not overlap (con1: t1 10..20 and 100, t2 30..40; con2: t1 50..60,
# t2 50..90).
--connect con1,localhost,root,,
BEGIN;
UPDATE t1 SET b=b+a WHERE a BETWEEN 10 AND 20;
REPLACE INTO t2 SELECT a, b, c FROM t1 WHERE a BETWEEN 30 and 40;
--connect con2,localhost,root,,
BEGIN;
UPDATE t1, t2
SET t1.b=t1.b + LENGTH(t2.c), t2.c=CONCAT("|", t2.c, "|")
WHERE t1.a = t2.a
AND t1.a BETWEEN 50 AND 60;
--connection con1
UPDATE t1 SET b=-b WHERE a=100;
--connection con2
UPDATE t2 SET c=CONCAT('-', c) WHERE a BETWEEN 50 AND 90;
# Commit in the same order the transactions were started.
--connection con1
COMMIT;
--connection con2
COMMIT;
--echo *** RocksDB-only transactions with binlog in InnoDB.
# Autocommit statements that touch only the RocksDB table.
--connection master
UPDATE t2 SET c=CONCAT('<', c, '>') WHERE a BETWEEN 20 AND 80;
UPDATE t2 SET b=b+1 WHERE a=1 OR a=92;
UPDATE t2 SET b=b*2 WHERE a MOD 7 = 0;
# Record aggregate values of both tables on the master ...
--connection master
SELECT COUNT(*), SUM(a), SUM(b), SUM(LENGTH(c)) FROM t1;
SELECT COUNT(*), SUM(a), SUM(b), SUM(LENGTH(c)) FROM t2;
# ... then wait for the slave to reach the master's GTID position and
# record the same aggregates there; the two sets of values should agree.
--source include/save_master_gtid.inc
--connection slave
--source include/sync_with_master_gtid.inc
SELECT COUNT(*), SUM(a), SUM(b), SUM(LENGTH(c)) FROM t1;
SELECT COUNT(*), SUM(a), SUM(b), SUM(LENGTH(c)) FROM t2;
# Cleanup.
--connection master
DROP TABLE t1, t2;
--source include/rpl_end.inc

View file

@ -0,0 +1,11 @@
SET spider_same_server_link= on;
create server s foreign data wrapper mysql options (host "127.0.0.1", database "spider_db", user "root", port $MASTER_MYPORT);;
CREATE DATABASE spider_db;
CREATE TABLE spider_db.t (a INT) ENGINE=InnoDB;
CREATE TABLE t (a INT) ENGINE=SPIDER COMMENT = 'wrapper "mysql", srv "s", table "t"';
Warnings:
Warning 138 Spider table params in COMMENT or CONNECTION strings have been deprecated and will be removed in a future release. Please use table options instead.
INSERT INTO t VALUES (2);
DROP TABLE t;
DROP DATABASE spider_db;
DROP SERVER s;

View file

@ -0,0 +1,20 @@
# Insert through a Spider table whose backend is an InnoDB table on this
# same server, on a server that satisfies include/have_innodb_binlog.inc.
#
# Skip the test unless the Spider engine is listed as available.
if (`SELECT COUNT(*) = 0 FROM INFORMATION_SCHEMA.ENGINES WHERE engine = 'spider' AND support IN ('YES', 'DEFAULT', 'ENABLED')`)
{
--skip Test requires Spider engine
}
--source include/have_binlog_format_mixed.inc
--source include/have_innodb_binlog.inc
# The foreign server defined below points back at this server
# (127.0.0.1:$MASTER_MYPORT), so same-server links are enabled first.
SET spider_same_server_link= on;
# NOTE(review): --evalp is presumably used so the port number does not end
# up in the recorded result -- confirm against the mysqltest documentation.
--evalp create server s foreign data wrapper mysql options (host "127.0.0.1", database "spider_db", user "root", port $MASTER_MYPORT);
# Backend InnoDB table, and the Spider table in the current database that
# maps onto it through server "s".
CREATE DATABASE spider_db;
CREATE TABLE spider_db.t (a INT) ENGINE=InnoDB;
CREATE TABLE t (a INT) ENGINE=SPIDER COMMENT = 'wrapper "mysql", srv "s", table "t"';
# The statement under test: a single-row insert through the Spider table.
INSERT INTO t VALUES (2);
# Cleanup.
DROP TABLE t;
DROP DATABASE spider_db;
DROP SERVER s;

View file

@ -0,0 +1,180 @@
include/reset_master.inc
CREATE TABLE t1(a INT PRIMARY KEY, b INT) ENGINE=InnoDB;
*** Test basic XA COMMIT from same or different client connection.
connect con1,localhost,root,,;
XA START 'a';
INSERT INTO t1 VALUES (1, 0);
INSERT INTO t1 VALUES (2, 0);
INSERT INTO t1 VALUES (3, 0);
XA END 'a';
XA PREPARE 'a';
connection default;
disconnect con1;
connect con2,localhost,root,,;
XA COMMIT 'a';
connect con1_roll,localhost,root,,;
XA START 'a_roll';
INSERT INTO t1 VALUES (1001, 0);
INSERT INTO t1 VALUES (1002, 0);
INSERT INTO t1 VALUES (1003, 0);
XA END 'a_roll';
XA PREPARE 'a_roll';
connection default;
disconnect con1_roll;
connection con2;
XA ROLLBACK 'a_roll';
connection default;
XA START 'b';
UPDATE t1 SET b=2 WHERE a=1;
XA END 'b';
XA PREPARE 'b';
XA COMMIT 'b';
XA START 'b_roll';
UPDATE t1 SET b=3 WHERE a=1;
XA END 'b_roll';
XA PREPARE 'b_roll';
XA ROLLBACK 'b_roll';
XA START 'c';
UPDATE t1 SET b=3 WHERE a=3;
XA END 'c';
XA COMMIT 'c' ONE PHASE;
SELECT * FROM t1 ORDER BY a;
a b
1 2
2 0
3 3
CREATE TABLE t2 (a INT) ENGINE=MyISAM;
CREATE TABLE t3 (id INT PRIMARY KEY) ENGINE=InnoDB;
INSERT INTO t2 VALUES (1),(2);
INSERT INTO t3 VALUES (1),(2);
XA START '1';
REPLACE INTO t2 SELECT * FROM t2;
REPLACE INTO t3 SELECT * FROM t3;
XA END '1';
XA PREPARE '1';
XA RECOVER FORMAT='SQL';
formatID gtrid_length bqual_length data
1 1 0 '1'
XA ROLLBACK '1';
Warnings:
Warning 1196 Some non-transactional changed tables couldn't be rolled back
DROP TABLE t2, t3;
CREATE TABLE t2 (a INT PRIMARY KEY) ENGINE=InnoDB;
INSERT INTO t2 VALUES (1);
CREATE SEQUENCE s ENGINE=InnoDB;
XA START '2';
SELECT NEXT VALUE FOR s;
NEXT VALUE FOR s
1
REPLACE INTO t2 SELECT * FROM t2;
XA END '2';
XA PREPARE '2';
XA RECOVER FORMAT='SQL';
formatID gtrid_length bqual_length data
1 1 0 '2'
XA ROLLBACK '2';
DROP SEQUENCE s;
DROP TABLE t2;
include/show_binlog_events.inc
Log_name Pos Event_type Server_id End_log_pos Info
binlog-000000.ibb # Gtid # # GTID #-#-#
binlog-000000.ibb # Query # # use `test`; INSERT INTO t1 VALUES (1, 0)
binlog-000000.ibb # Query # # use `test`; INSERT INTO t1 VALUES (2, 0)
binlog-000000.ibb # Query # # use `test`; INSERT INTO t1 VALUES (3, 0)
binlog-000000.ibb # Query # # COMMIT
binlog-000000.ibb # Gtid # # GTID #-#-#
binlog-000000.ibb # Query # # use `test`; UPDATE t1 SET b=2 WHERE a=1
binlog-000000.ibb # Query # # COMMIT
binlog-000000.ibb # Gtid # # BEGIN GTID #-#-#
binlog-000000.ibb # Query # # use `test`; UPDATE t1 SET b=3 WHERE a=3
binlog-000000.ibb # Xid # # COMMIT /* XID */
binlog-000000.ibb # Gtid # # GTID #-#-#
binlog-000000.ibb # Query # # use `test`; CREATE TABLE t2 (a INT) ENGINE=MyISAM
binlog-000000.ibb # Gtid # # GTID #-#-#
binlog-000000.ibb # Query # # use `test`; CREATE TABLE t3 (id INT PRIMARY KEY) ENGINE=InnoDB
binlog-000000.ibb # Gtid # # BEGIN GTID #-#-#
binlog-000000.ibb # Query # # use `test`; INSERT INTO t2 VALUES (1),(2)
binlog-000000.ibb # Query # # COMMIT
binlog-000000.ibb # Gtid # # BEGIN GTID #-#-#
binlog-000000.ibb # Query # # use `test`; INSERT INTO t3 VALUES (1),(2)
binlog-000000.ibb # Xid # # COMMIT /* XID */
binlog-000000.ibb # Gtid # # BEGIN GTID #-#-#
binlog-000000.ibb # Annotate_rows # # REPLACE INTO t2 SELECT * FROM t2
binlog-000000.ibb # Table_map # # table_id: # (test.t2)
binlog-000000.ibb # Write_rows_v1 # # table_id: # flags: STMT_END_F
binlog-000000.ibb # Query # # COMMIT
binlog-000000.ibb # Gtid # # GTID #-#-#
binlog-000000.ibb # Query # # use `test`; DROP TABLE `t2`,`t3` /* generated by server */
binlog-000000.ibb # Gtid # # GTID #-#-#
binlog-000000.ibb # Query # # use `test`; CREATE TABLE t2 (a INT PRIMARY KEY) ENGINE=InnoDB
binlog-000000.ibb # Gtid # # BEGIN GTID #-#-#
binlog-000000.ibb # Query # # use `test`; INSERT INTO t2 VALUES (1)
binlog-000000.ibb # Xid # # COMMIT /* XID */
binlog-000000.ibb # Gtid # # GTID #-#-#
binlog-000000.ibb # Query # # use `test`; CREATE SEQUENCE s ENGINE=InnoDB
binlog-000000.ibb # Gtid # # BEGIN GTID #-#-#
binlog-000000.ibb # Annotate_rows # # SELECT NEXT VALUE FOR s
binlog-000000.ibb # Table_map # # table_id: # (test.s)
binlog-000000.ibb # Write_rows_v1 # # table_id: # flags: STMT_END_F
binlog-000000.ibb # Query # # COMMIT
binlog-000000.ibb # Gtid # # GTID #-#-#
binlog-000000.ibb # Query # # use `test`; DROP SEQUENCE `s` /* generated by server */
binlog-000000.ibb # Gtid # # GTID #-#-#
binlog-000000.ibb # Query # # use `test`; DROP TABLE `t2` /* generated by server */
*** Test that we will not purge a file that is needed by an active XA transaction.
connection default;
ALTER TABLE t1 ADD COLUMN c LONGBLOB;
UPDATE t1 SET b=10 WHERE a=1;
connection con2;
SET SESSION binlog_format=ROW;
XA START 'd';
INSERT INTO t1 VALUES (10, 2, REPEAT('#', 40000));
connect con3,localhost,root,,;
SET SESSION binlog_format=ROW;
XA START 'e';
INSERT INTO t1 VALUES (110, 2, REPEAT('#', 40000));
connection default;
UPDATE t1 SET b=11 WHERE a=1;
FLUSH BINARY LOGS;
UPDATE t1 SET b=12 WHERE a=1;
connection con2;
INSERT INTO t1 VALUES (11, 2, REPEAT('*', 40000));
connection con3;
INSERT INTO t1 VALUES (111, 2, REPEAT('*', 40000));
connection default;
UPDATE t1 SET b=13 WHERE a=1;
FLUSH BINARY LOGS;
UPDATE t1 SET b=14 WHERE a=1;
UPDATE t1 SET b=15 WHERE a=1;
FLUSH BINARY LOGS;
UPDATE t1 SET b=16 WHERE a=1;
FLUSH BINARY LOGS;
connection default;
SET @old_min_slaves= @@GLOBAL.slave_connections_needed_for_purge;
SET GLOBAL slave_connections_needed_for_purge= 0;
PURGE BINARY LOGS TO 'binlog-000001.ibb';
ERROR HY000: A purgeable log is in use, will not purge
connection con2;
XA END 'd';
XA PREPARE 'd';
connection con3;
XA END 'e';
XA PREPARE 'e';
connection default;
PURGE BINARY LOGS TO 'binlog-000001.ibb';
ERROR HY000: A purgeable log is in use, will not purge
connection con2;
XA COMMIT 'd';
connection con3;
XA ROLLBACK 'e';
connection default;
PURGE BINARY LOGS TO 'binlog-000001.ibb';
FLUSH BINARY LOGS;
UPDATE t1 SET b=17 WHERE a=1;
FLUSH BINARY LOGS;
UPDATE t1 SET b=18 WHERE a=1;
PURGE BINARY LOGS TO 'binlog-000001.ibb';
disconnect con2;
disconnect con3;
SET GLOBAL slave_connections_needed_for_purge= @old_min_slaves;
DROP TABLE t1;

View file

@ -0,0 +1,182 @@
# Tests of explicit (user) XA transactions on a server that satisfies
# include/have_innodb_binlog.inc: XA PREPARE followed by XA COMMIT or
# XA ROLLBACK from the same or from another connection, and
# XA COMMIT ... ONE PHASE.
--source include/have_binlog_format_mixed.inc
--source include/have_innodb_binlog.inc
--source include/reset_master.inc
CREATE TABLE t1(a INT PRIMARY KEY, b INT) ENGINE=InnoDB;
--echo *** Test basic XA COMMIT from same or different client connection.
# Remember the current binlog file and position.
# NOTE(review): presumably these are the inputs of the later
# include/show_binlog_events.inc call -- confirm.
--let $binlog_file= query_get_value(SHOW MASTER STATUS, File, 1)
--let $binlog_start= query_get_value(SHOW MASTER STATUS, Position, 1)
# XA 'a': prepared in con1, which then disconnects; committed from con2.
--connect con1,localhost,root,,
--let $con1_id= `SELECT connection_id()`
XA START 'a';
INSERT INTO t1 VALUES (1, 0);
INSERT INTO t1 VALUES (2, 0);
INSERT INTO t1 VALUES (3, 0);
XA END 'a';
XA PREPARE 'a';
--connection default
--disconnect con1
# Must wait; XA transaction is not available until the creating session has disconnected.
--let $wait_condition= SELECT NOT EXISTS (SELECT ID FROM INFORMATION_SCHEMA.PROCESSLIST WHERE ID=$con1_id)
--source include/wait_condition.inc
--connect con2,localhost,root,,
XA COMMIT 'a';
# Also test rollback from separate client session.
# XA 'a_roll': prepared in con1_roll, which then disconnects; rolled back
# from con2 once the preparing session is gone from the processlist.
--connect con1_roll,localhost,root,,
--let $con1_roll_id= `SELECT connection_id()`
XA START 'a_roll';
INSERT INTO t1 VALUES (1001, 0);
INSERT INTO t1 VALUES (1002, 0);
INSERT INTO t1 VALUES (1003, 0);
XA END 'a_roll';
XA PREPARE 'a_roll';
--connection default
--disconnect con1_roll
--let $wait_condition= SELECT NOT EXISTS (SELECT ID FROM INFORMATION_SCHEMA.PROCESSLIST WHERE ID=$con1_roll_id)
--source include/wait_condition.inc
--connection con2
XA ROLLBACK 'a_roll';
# XA 'b' / 'b_roll': prepare and then commit (respectively roll back) in
# the same connection that ran the transaction.
--connection default
XA START 'b';
UPDATE t1 SET b=2 WHERE a=1;
XA END 'b';
XA PREPARE 'b';
XA COMMIT 'b';
XA START 'b_roll';
UPDATE t1 SET b=3 WHERE a=1;
XA END 'b_roll';
XA PREPARE 'b_roll';
XA ROLLBACK 'b_roll';
# XA 'c': committed in one phase, without a separate XA PREPARE.
XA START 'c';
UPDATE t1 SET b=3 WHERE a=3;
XA END 'c';
XA COMMIT 'c' ONE PHASE;
# Only the changes of 'a', 'b' and 'c' were committed; 'a_roll' and 'b_roll'
# were rolled back.
SELECT * FROM t1 ORDER BY a;
# Some queries that create empty XA transactions.
# Case 1: REPLACE of a table's rows by themselves, on a MyISAM table (t2)
# and on an InnoDB table (t3), inside XA '1'.
CREATE TABLE t2 (a INT) ENGINE=MyISAM;
CREATE TABLE t3 (id INT PRIMARY KEY) ENGINE=InnoDB;
INSERT INTO t2 VALUES (1),(2);
INSERT INTO t3 VALUES (1),(2);
XA START '1';
REPLACE INTO t2 SELECT * FROM t2;
REPLACE INTO t3 SELECT * FROM t3;
XA END '1';
XA PREPARE '1';
# The prepared transaction must be listed here.
XA RECOVER FORMAT='SQL';
# Cleanup
XA ROLLBACK '1';
DROP TABLE t2, t3;
# Case 2: a sequence increment followed by a REPLACE of an InnoDB table's
# rows by themselves, inside XA '2'.
CREATE TABLE t2 (a INT PRIMARY KEY) ENGINE=InnoDB;
INSERT INTO t2 VALUES (1);
CREATE SEQUENCE s ENGINE=InnoDB;
XA START '2';
# NOTE(review): ps2 protocol is presumably disabled here so that the
# sequence is advanced exactly once -- confirm.
--disable_ps2_protocol
SELECT NEXT VALUE FOR s;
--enable_ps2_protocol
REPLACE INTO t2 SELECT * FROM t2;
XA END '2';
XA PREPARE '2';
XA RECOVER FORMAT='SQL';
# Cleanup
XA ROLLBACK '2';
DROP SEQUENCE s;
DROP TABLE t2;
# Dump the binlog events written so far.
# NOTE(review): $binlog_limit presumably caps the number of events that
# include/show_binlog_events.inc lists -- confirm.
--let $binlog_limit= 100
--source include/show_binlog_events.inc
--echo *** Test that we will not purge a file that is needed by an active XA transaction.
--connection default
# Add some longer data to force spill to binlog.
ALTER TABLE t1 ADD COLUMN c LONGBLOB;
UPDATE t1 SET b=10 WHERE a=1;
# Start two XA transactions, 'd' in con2 and 'e' in con3, each inserting a
# row with a 40000-byte blob under row-based binlog format.
--connection con2
SET SESSION binlog_format=ROW;
XA START 'd';
INSERT INTO t1 VALUES (10, 2, REPEAT('#', 40000));
--connect con3,localhost,root,,
SET SESSION binlog_format=ROW;
XA START 'e';
INSERT INTO t1 VALUES (110, 2, REPEAT('#', 40000));
--connection default
UPDATE t1 SET b=11 WHERE a=1;
FLUSH BINARY LOGS;
UPDATE t1 SET b=12 WHERE a=1;
# Remember the binlog file that is current at this point; all the
# PURGE BINARY LOGS statements below use it as their target.
--let $binlog_file= query_get_value(SHOW MASTER STATUS, File, 1)
# More large rows in both XA transactions, now that a new binlog file has
# been started.
--connection con2
INSERT INTO t1 VALUES (11, 2, REPEAT('*', 40000));
--connection con3
INSERT INTO t1 VALUES (111, 2, REPEAT('*', 40000));
# Move on through several more binlog files while 'd' and 'e' stay open.
--connection default
UPDATE t1 SET b=13 WHERE a=1;
FLUSH BINARY LOGS;
UPDATE t1 SET b=14 WHERE a=1;
UPDATE t1 SET b=15 WHERE a=1;
FLUSH BINARY LOGS;
UPDATE t1 SET b=16 WHERE a=1;
FLUSH BINARY LOGS;
--connection default
# Set slave_connections_needed_for_purge to 0 for the purge attempts below;
# the old value is restored at the end of the test.
SET @old_min_slaves= @@GLOBAL.slave_connections_needed_for_purge;
SET GLOBAL slave_connections_needed_for_purge= 0;
# 'd' and 'e' are still active (not yet prepared): the purge is refused.
--error ER_LOG_IN_USE
eval PURGE BINARY LOGS TO '$binlog_file';
--connection con2
XA END 'd';
XA PREPARE 'd';
--connection con3
XA END 'e';
XA PREPARE 'e';
--connection default
# 'd' and 'e' are now prepared but not yet completed: still refused.
--error ER_LOG_IN_USE
eval PURGE BINARY LOGS TO '$binlog_file';
--connection con2
XA COMMIT 'd';
--connection con3
XA ROLLBACK 'e';
--connection default
# Both XA transactions are completed: the purge is expected to succeed now.
eval PURGE BINARY LOGS TO '$binlog_file';
FLUSH BINARY LOGS;
UPDATE t1 SET b=17 WHERE a=1;
FLUSH BINARY LOGS;
UPDATE t1 SET b=18 WHERE a=1;
# Repeating the purge after further binlog rotation must also not fail.
eval PURGE BINARY LOGS TO '$binlog_file';
# Cleanup.
--disconnect con2
--disconnect con3
SET GLOBAL slave_connections_needed_for_purge= @old_min_slaves;
DROP TABLE t1;

View file

@ -1527,6 +1527,12 @@ int ha_prepare(THD *thd)
if (ha_info) if (ha_info)
{ {
if (unlikely(tc_log->log_xa_prepare(thd, all)))
{
ha_rollback_trans(thd, all);
error= 1;
goto binlog_error;
}
for (; ha_info; ha_info= ha_info->next()) for (; ha_info; ha_info= ha_info->next())
{ {
handlerton *ht= ha_info->ht(); handlerton *ht= ha_info->ht();
@ -1549,6 +1555,7 @@ int ha_prepare(THD *thd)
} }
} }
binlog_error:
DEBUG_SYNC(thd, "at_unlog_xa_prepare"); DEBUG_SYNC(thd, "at_unlog_xa_prepare");
if (tc_log->unlog_xa_prepare(thd, all)) if (tc_log->unlog_xa_prepare(thd, all))
@ -2056,14 +2063,14 @@ int ha_commit_trans(THD *thd, bool all)
if (wsrep_must_abort(thd)) if (wsrep_must_abort(thd))
{ {
mysql_mutex_unlock(&thd->LOCK_thd_data); mysql_mutex_unlock(&thd->LOCK_thd_data);
(void)tc_log->unlog(cookie, xid); (void)tc_log->unlog(thd, cookie, xid);
goto wsrep_err; goto wsrep_err;
} }
mysql_mutex_unlock(&thd->LOCK_thd_data); mysql_mutex_unlock(&thd->LOCK_thd_data);
} }
#endif /* WITH_WSREP */ #endif /* WITH_WSREP */
DBUG_EXECUTE_IF("crash_commit_before_unlog", DBUG_SUICIDE();); DBUG_EXECUTE_IF("crash_commit_before_unlog", DBUG_SUICIDE(););
if (tc_log->unlog(cookie, xid)) if (tc_log->unlog(thd, cookie, xid))
error= 2; /* Error during commit */ error= 2; /* Error during commit */
done: done:
@ -2208,29 +2215,6 @@ inline Ha_trx_info* get_binlog_hton(Ha_trx_info *ha_info)
return ha_info; return ha_info;
} }
/*
  Run the binlog part of a commit or rollback ahead of the other
  participants registered in trans->ha_list.

  The binlog entry is located with get_binlog_hton(). If there is none,
  nothing is done. Otherwise binlog_commit() (when is_commit is true) or
  binlog_rollback() (when false) is invoked for it.

  @param thd            Current session.
  @param all            Passed through to binlog_commit()/binlog_rollback().
  @param trans          Transaction whose ha_list is searched.
  @param is_real_trans  Passed through to is_ro_1pc_trans().
  @param is_commit      true to commit, false to roll back.

  @retval 0  No binlog participant, or the binlog call succeeded.
  @retval 1  The binlog call returned non-zero; ER_ERROR_DURING_COMMIT or
             ER_ERROR_DURING_ROLLBACK has been raised with that code.
*/
static int run_binlog_first(THD *thd, bool all, THD_TRANS *trans,
                            bool is_real_trans, bool is_commit)
{
  Ha_trx_info *binlog_info= get_binlog_hton(trans->ha_list);
  if (!binlog_info)
    return 0;                                   /* Binlog not involved. */

  int binlog_err;
  if (is_commit)
    binlog_err= binlog_commit(thd, all,
                              is_ro_1pc_trans(thd, binlog_info, all,
                                              is_real_trans));
  else
    binlog_err= binlog_rollback(binlog_info->ht(), thd, all);

  if (!binlog_err)
    return 0;

  my_error(is_commit ? ER_ERROR_DURING_COMMIT : ER_ERROR_DURING_ROLLBACK,
           MYF(0), binlog_err);
  return 1;
}
static int static int
commit_one_phase_2(THD *thd, bool all, THD_TRANS *trans, bool is_real_trans) commit_one_phase_2(THD *thd, bool all, THD_TRANS *trans, bool is_real_trans)
{ {
@ -2244,19 +2228,19 @@ commit_one_phase_2(THD *thd, bool all, THD_TRANS *trans, bool is_real_trans)
if (ha_info) if (ha_info)
{ {
int err= 0; int err= 0;
/* Ha_trx_info *binlog_ha_info= get_binlog_hton(ha_info);
Binlog hton must be called first regardless of its position if (binlog_ha_info &&
in trans->ha_list at least to prevent from commiting any engine (err= binlog_commit(thd, all,
branches when afterward a duplicate GTID error out of binlog_commit() is_ro_1pc_trans(thd, ha_info, all,
is generated. is_real_trans))))
*/
for (int binlog_err= error=
run_binlog_first(thd, all, trans, is_real_trans, true);
ha_info; ha_info= ha_info_next)
{ {
if (binlog_err) my_error(ER_ERROR_DURING_COMMIT, MYF(0), err);
goto err; error= 1;
goto err;
}
for (; ha_info; ha_info= ha_info_next)
{
handlerton *ht= ha_info->ht(); handlerton *ht= ha_info->ht();
if ((err= ht->commit(ht, thd, all))) if ((err= ht->commit(ht, thd, all)))
{ {
@ -2270,6 +2254,8 @@ commit_one_phase_2(THD *thd, bool all, THD_TRANS *trans, bool is_real_trans)
ha_info_next= ha_info->next(); ha_info_next= ha_info->next();
ha_info->reset(); /* keep it conveniently zero-filled */ ha_info->reset(); /* keep it conveniently zero-filled */
} }
if (binlog_ha_info && is_real_trans)
binlog_post_commit(thd, all);
trans->ha_list= 0; trans->ha_list= 0;
trans->no_2pc=0; trans->no_2pc=0;
if (all) if (all)
@ -2378,6 +2364,8 @@ int ha_rollback_trans(THD *thd, bool all)
if (ha_info) if (ha_info)
{ {
int err;
/* Close all cursors that can not survive ROLLBACK */ /* Close all cursors that can not survive ROLLBACK */
if (is_real_trans) /* not a statement commit */ if (is_real_trans) /* not a statement commit */
thd->stmt_map.close_transient_cursors(); thd->stmt_map.close_transient_cursors();
@ -2388,12 +2376,17 @@ int ha_rollback_trans(THD *thd, bool all)
rollbacker and any transaction that depends on it. This guarantees rollbacker and any transaction that depends on it. This guarantees
the execution time dependency identifies binlog ordering. the execution time dependency identifies binlog ordering.
*/ */
for (error= run_binlog_first(thd, all, trans, is_real_trans, false); Ha_trx_info *binlog_ha_info= get_binlog_hton(ha_info);
ha_info; ha_info= ha_info_next) if (binlog_ha_info && (err= binlog_rollback(binlog_hton, thd, all)))
{
my_error(ER_ERROR_DURING_ROLLBACK, MYF(0), err);
error= 1;
}
for (; ha_info; ha_info= ha_info_next)
{ {
int err;
handlerton *ht= ha_info->ht(); handlerton *ht= ha_info->ht();
if (ht != binlog_hton && (err= ht->rollback(ht, thd, all))) if (ha_info != binlog_ha_info && (err= ht->rollback(ht, thd, all)))
{ {
// cannot happen // cannot happen
my_error(ER_ERROR_DURING_ROLLBACK, MYF(0), err); my_error(ER_ERROR_DURING_ROLLBACK, MYF(0), err);
@ -2415,6 +2408,9 @@ int ha_rollback_trans(THD *thd, bool all)
} }
trans->ha_list= 0; trans->ha_list= 0;
trans->no_2pc=0; trans->no_2pc=0;
if (binlog_ha_info)
binlog_post_rollback(thd, all);
} }
#ifdef WITH_WSREP #ifdef WITH_WSREP
@ -2528,6 +2524,10 @@ int ha_commit_or_rollback_by_xid(XID *xid, bool commit)
plugin_foreach(NULL, commit ? xacommit_handlerton : xarollback_handlerton, plugin_foreach(NULL, commit ? xacommit_handlerton : xarollback_handlerton,
MYSQL_STORAGE_ENGINE_PLUGIN, &xaop); MYSQL_STORAGE_ENGINE_PLUGIN, &xaop);
if (commit)
binlog_post_commit_by_xid(binlog_hton, xid);
else
binlog_post_rollback_by_xid(binlog_hton, xid);
return xaop.result; return xaop.result;
} }

View file

@ -917,7 +917,7 @@ struct xid_t {
{ return !xid->is_null() && eq(xid->gtrid_length, xid->bqual_length, xid->data); } { return !xid->is_null() && eq(xid->gtrid_length, xid->bqual_length, xid->data); }
bool eq(long g, long b, const char *d) const bool eq(long g, long b, const char *d) const
{ return !is_null() && g == gtrid_length && b == bqual_length && !memcmp(d, data, g+b); } { return !is_null() && g == gtrid_length && b == bqual_length && !memcmp(d, data, g+b); }
void set(struct xid_t *xid) void set(const struct xid_t *xid)
{ memcpy(this, xid, xid->length()); } { memcpy(this, xid, xid->length()); }
void set(long f, const char *g, long gl, const char *b, long bl) void set(long f, const char *g, long gl, const char *b, long bl)
{ {
@ -965,7 +965,7 @@ struct xid_t {
memcpy(&trx_server_id, data+MYSQL_XID_PREFIX_LEN, sizeof(trx_server_id)); memcpy(&trx_server_id, data+MYSQL_XID_PREFIX_LEN, sizeof(trx_server_id));
return trx_server_id; return trx_server_id;
} }
uint length() uint length() const
{ {
return static_cast<uint>(sizeof(formatID)) + key_length(); return static_cast<uint>(sizeof(formatID)) + key_length();
} }
@ -1604,6 +1604,43 @@ struct handlerton
void (*binlog_oob_reset)(void **engine_data); void (*binlog_oob_reset)(void **engine_data);
/* Call to allow engine to release the engine_data from binlog_oob_data(). */ /* Call to allow engine to release the engine_data from binlog_oob_data(). */
void (*binlog_oob_free)(void *engine_data); void (*binlog_oob_free)(void *engine_data);
/*
Durably persist the event data for the current user-XA transaction,
identified by XID.
This way, a later XA COMMIT can then be binlogged correctly with the
persisted event data, even across server restart.
The ENGINE_COUNT is the number of storage engines that participate in the
XA transaction. This is used to correctly handle crash recovery if the
server crashed in the middle of XA PREPARE. If during crash recovery,
we find the XID present in less than ENGINE_COUNT engines, then the
XA PREPARE did not complete before the crash, and should be rolled back
during crash recovery.
*/
/* Binlog an event group that doesn't go through commit_ordered. */
bool (*binlog_write_xa_prepare_ordered)(THD *thd,
handler_binlog_event_group_info *binlog_info, uchar engine_count);
bool (*binlog_write_xa_prepare)(THD *thd,
handler_binlog_event_group_info *binlog_info, uchar engine_count);
/*
Binlog rollback a transaction that was previously made durably prepared
with binlog_write_xa_prepare.
*/
bool (*binlog_xa_rollback_ordered)(THD *thd, const XID *xid,
void **engine_data);
bool (*binlog_xa_rollback)(THD *thd, const XID *xid, void **engine_data);
/*
The "unlog" method is used after a commit with an XID - either internal
2-phase commit with a separate storage engine, or explicit user
XA COMMIT. For user XA, it is also used after XA ROLLBACK.
The binlog first writes the commit durably, then the engines commit
durably, and finally "unlog" is done. The binlog engine must ensure it
can recover the committed XID until unlog has been called, after which
point resources can be freed, binlog files purged, etc.
*/
void (*binlog_unlog)(const XID *xid, void **engine_data);
/* /*
Obtain an object to allow reading from the binlog. Obtain an object to allow reading from the binlog.
The boolean argument wait_durable is set to true to require that The boolean argument wait_durable is set to true to require that
@ -5930,6 +5967,13 @@ String dbug_format_row(TABLE *table, const uchar *rec, bool print_names= true);
/* Struct with info about an event group to be binlogged by a storage engine. */ /* Struct with info about an event group to be binlogged by a storage engine. */
struct handler_binlog_event_group_info { struct handler_binlog_event_group_info {
/*
These are set by the binlog_write_direct_ordered hton method to the
approximate (best-effort) position of the start of where the event group
was written.
*/
uint64_t out_file_no;
uint64_t out_offset;
/* Opaque pointer for the engine's use. */ /* Opaque pointer for the engine's use. */
void *engine_ptr; void *engine_ptr;
/* /*
@ -5941,6 +5985,14 @@ struct handler_binlog_event_group_info {
transaction cache part in the commit record. transaction cache part in the commit record.
*/ */
void *engine_ptr2; void *engine_ptr2;
/*
The XID for XA PREPARE/XA COMMIT; else NULL.
When this is set, the IO_CACHE only contains the GTID. All other event data
was spilled as OOB and persisted with the binlog_write_xa_prepare hton
call; the engine binlog implementation must use the XID to look up or
otherwise refer to that OOB data.
*/
const XID *xa_xid;
/* End of data that has already been binlogged out-of-band. */ /* End of data that has already been binlogged out-of-band. */
my_off_t out_of_band_offset; my_off_t out_of_band_offset;
/* /*
@ -5948,6 +6000,22 @@ struct handler_binlog_event_group_info {
at the end of the IO_CACHE containing the data to be binlogged. at the end of the IO_CACHE containing the data to be binlogged.
*/ */
my_off_t gtid_offset; my_off_t gtid_offset;
/*
If xa_xid is non-NULL, this is set for an internal 2-phase commit between
the engine binlog and one or more additional storage engines participating
in the transaction. In this case, there is no call to the
binlog_write_xa_prepare() method. The binlog engine must record durably
that the xa_xid was committed, and in case of recovery it must pass the
xa_xid to the server layer for it to commit in all participating engines.
If not set, any XID is user external XA, and the xa_xid was previously
passed to binlog_write_xa_prepare(). The binlog engine must again record
durably that the xa_xid was committed and recover it in case of crash.
The ability to recover the xa_xid must remain until the binlog_unlog()
method is called.
*/
bool internal_xa;
}; };

View file

@ -451,6 +451,10 @@ public:
} }
engine_binlogged= FALSE; engine_binlogged= FALSE;
need_write_direct= FALSE; need_write_direct= FALSE;
/*
need_engine_2pc is not reset here, as we still need it at the end of
MYSQL_BIN_LOG::log_and_order(), where it will be reset.
*/
} }
binlog_cache_data* get_binlog_cache_data(bool is_transactional) binlog_cache_data* get_binlog_cache_data(bool is_transactional)
@ -469,6 +473,9 @@ public:
binlog_cache_data trx_cache; binlog_cache_data trx_cache;
/* Buffer used to pass internal my_xid into engine as struct xid_t. */
XID xid_buf;
/* /*
Binlog position for current transaction. Binlog position for current transaction.
For START TRANSACTION WITH CONSISTENT SNAPSHOT, this is the binlog For START TRANSACTION WITH CONSISTENT SNAPSHOT, this is the binlog
@ -500,6 +507,12 @@ public:
*/ */
bool using_xa; bool using_xa;
my_xid xa_xid; my_xid xa_xid;
/*
Set true when not using --binlog-storage-engine and we need to decrement
the xid_list reference count for the transaction at unlog time. (The
xid_list refcounting is used to keep binlog files for recovery while
transactions may still be in the prepared state).
*/
bool need_unlog; bool need_unlog;
/* /*
Set true when binlog engine fetches the cache data with binlog_get_cache() Set true when binlog engine fetches the cache data with binlog_get_cache()
@ -516,6 +529,12 @@ public:
Id of binlog that transaction was written to; only needed if need_unlog is Id of binlog that transaction was written to; only needed if need_unlog is
true. true.
*/ */
/*
Set when using --binlog-storage-engine, but there is another XA-capable
engine involved in the transaction, so that we need to do 2-phase commit
to ensure consistency in case of crash.
*/
bool need_engine_2pc;
ulong binlog_id; ulong binlog_id;
/* Set if we get an error during commit that must be returned from unlog(). */ /* Set if we get an error during commit that must be returned from unlog(). */
bool delayed_error; bool delayed_error;
@ -1993,11 +2012,16 @@ binlog_flush_cache(THD *thd, binlog_cache_mngr *cache_mngr,
} }
#endif /* WITH_WSREP */ #endif /* WITH_WSREP */
if (opt_binlog_engine_hton) if (opt_binlog_engine_hton &&
likely(!(thd->lex->sql_command == SQLCOM_XA_COMMIT &&
thd->lex->xa_opt != XA_ONE_PHASE)))
{ {
/* /*
Write the end_event into the cache, in preparation for sending the Write the end_event into the cache, in preparation for sending the
cache to the engine to be binlogged as a whole. cache to the engine to be binlogged as a whole.
Except for user XA COMMIT, where we already wrote the end event into
the OOB data that was persisted in the binlog.
*/ */
binlog_cache_data *cache_data; binlog_cache_data *cache_data;
if (doing_trx || !doing_stmt) if (doing_trx || !doing_stmt)
@ -2065,6 +2089,26 @@ binlog_flush_cache(THD *thd, binlog_cache_mngr *cache_mngr,
} }
/*
  Prepare the engine binlog context for writing a commit record.

  If the context already carries an XID in xa_xid, only flag the cache
  manager as needing a later engine unlog (need_engine_2pc).

  Otherwise, if the cache manager was already flagged with need_engine_2pc
  (internal 2-phase commit with another XA-capable engine), copy the internal
  my_xid into cache_mngr->xid_buf, point the context at that buffer and mark
  it as an internal XA.

  In all other cases nothing is changed.
*/
static void
binlog_setup_engine_commit_data(handler_binlog_event_group_info *context,
                                binlog_cache_mngr *cache_mngr)
{
  if (unlikely(context->xa_xid != nullptr))
  {
    /* An XID is already attached: remember that we must unlog afterwards. */
    cache_mngr->need_engine_2pc= true;
    return;
  }

  if (likely(!cache_mngr->need_engine_2pc))
    return;

  /* Internal 2-phase with multiple xa-capable engines. */
  DBUG_ASSERT(cache_mngr->xa_xid != 0);
  XID *internal_xid= &cache_mngr->xid_buf;
  internal_xid->set(cache_mngr->xa_xid);
  context->xa_xid= internal_xid;
  context->internal_xa= true;
}
extern "C" extern "C"
void void
binlog_get_cache(THD *thd, uint64_t file_no, uint64_t offset, binlog_get_cache(THD *thd, uint64_t file_no, uint64_t offset,
@ -2077,7 +2121,8 @@ binlog_get_cache(THD *thd, uint64_t file_no, uint64_t offset,
binlog_cache_mngr *cache_mngr; binlog_cache_mngr *cache_mngr;
const rpl_gtid *gtid= nullptr; const rpl_gtid *gtid= nullptr;
/* opt_binlog_engine_hton can be unset during bootstrap. */ /* opt_binlog_engine_hton can be unset during bootstrap. */
if (opt_binlog_engine_hton && (cache_mngr= thd->binlog_get_cache_mngr())) if (likely(opt_binlog_engine_hton) &&
(cache_mngr= thd->binlog_get_cache_mngr()))
{ {
cache_mngr->engine_binlogged= TRUE; cache_mngr->engine_binlogged= TRUE;
cache_mngr->last_commit_pos_file.engine_file_no= file_no; cache_mngr->last_commit_pos_file.engine_file_no= file_no;
@ -2099,6 +2144,7 @@ binlog_get_cache(THD *thd, uint64_t file_no, uint64_t offset,
else else
context->engine_ptr2= nullptr; context->engine_ptr2= nullptr;
} }
binlog_setup_engine_commit_data(context, cache_mngr);
gtid= thd->get_last_commit_gtid(); gtid= thd->get_last_commit_gtid();
} }
*out_cache= cache; *out_cache= cache;
@ -2174,8 +2220,17 @@ binlog_commit_flush_trx_cache(THD *thd, bool all, binlog_cache_mngr *cache_mngr,
DBUG_ASSERT(thd->transaction->xid_state.get_state_code() == DBUG_ASSERT(thd->transaction->xid_state.get_state_code() ==
XA_PREPARED); XA_PREPARED);
buflen= serialize_with_xid(thd->transaction->xid_state.get_xid(), if (opt_binlog_engine_hton)
buf, query, q_len); {
cache_mngr->trx_cache.engine_binlog_info.xa_xid=
thd->transaction->xid_state.get_xid();
cache_mngr->trx_cache.engine_binlog_info.internal_xa= false;
}
else
{
buflen= serialize_with_xid(thd->transaction->xid_state.get_xid(),
buf, query, q_len);
}
} }
Query_log_event end_evt(thd, buf, buflen, TRUE, TRUE, TRUE, 0); Query_log_event end_evt(thd, buf, buflen, TRUE, TRUE, TRUE, 0);
@ -2201,8 +2256,31 @@ binlog_rollback_flush_trx_cache(THD *thd, bool all,
char buf[q_len + ser_buf_size]= "ROLLBACK"; char buf[q_len + ser_buf_size]= "ROLLBACK";
size_t buflen= sizeof("ROLLBACK") - 1; size_t buflen= sizeof("ROLLBACK") - 1;
if (thd->transaction->xid_state.is_explicit_XA()) if (unlikely(thd->transaction->xid_state.is_explicit_XA()))
{ {
if (opt_binlog_engine_hton)
{
int err= 0;
binlog_cache_mngr *cache_mngr= thd->binlog_setup_trx_data();
if (unlikely(!cache_mngr))
return 1;
binlog_cache_data *cache_data= cache_mngr->get_binlog_cache_data(true);
handler_binlog_event_group_info *engine_context=
&cache_data->engine_binlog_info;
const XID *xid= thd->transaction->xid_state.get_xid();
mysql_mutex_lock(&LOCK_commit_ordered);
err= (*opt_binlog_engine_hton->binlog_xa_rollback_ordered)
(thd, xid, &engine_context->engine_ptr);
mysql_mutex_unlock(&LOCK_commit_ordered);
if (likely(!err))
err= (*opt_binlog_engine_hton->binlog_xa_rollback)
(thd, xid, &engine_context->engine_ptr);
cache_mngr->reset(false, true);
cache_mngr->need_engine_2pc= true;
return err;
}
/* for not prepared use plain ROLLBACK */ /* for not prepared use plain ROLLBACK */
if (thd->transaction->xid_state.get_state_code() == XA_PREPARED) if (thd->transaction->xid_state.get_state_code() == XA_PREPARED)
buflen= serialize_with_xid(thd->transaction->xid_state.get_xid(), buflen= serialize_with_xid(thd->transaction->xid_state.get_xid(),
@ -2329,10 +2407,70 @@ inline bool is_preparing_xa(THD *thd)
} }
static int binlog_prepare(handlerton *hton, THD *thd, bool all) int
MYSQL_BIN_LOG::log_xa_prepare(THD *thd, bool all)
{ {
/* Do nothing unless the transaction is a user XA. */ /* Do nothing unless the transaction is a user XA. */
return is_preparing_xa(thd) ? binlog_commit(thd, all, FALSE) : 0; if (is_preparing_xa(thd) &&
thd->ha_data[binlog_hton->slot].ha_info[1].is_started())
{
if (opt_binlog_engine_hton)
{
/*
Tell the binlog engine to persist the event data for the current
transaction, identified by the user-supplied XID.
This way, a later XA COMMIT can then be binlogged correctly with the
persisted event data, even across server restart.
*/
binlog_cache_mngr *cache_mngr= thd->binlog_setup_trx_data();
if (unlikely(!cache_mngr))
return 1;
binlog_cache_data *cache_data= cache_mngr->get_binlog_cache_data(true);
/* Put in the end event. */
{
Query_log_event end_ev(thd, STRING_WITH_LEN("COMMIT"),
TRUE, TRUE, TRUE, 0);
end_ev.cache_type= Log_event::EVENT_TRANSACTIONAL_CACHE;
if (write_event(&end_ev, BINLOG_CHECKSUM_ALG_OFF, 0,
&cache_data->cache_log))
return 1;
}
/* Make sure all event data is flushed as OOB. */
if (unlikely(my_b_flush_io_cache(&cache_data->cache_log, 0)))
return 1;
handler_binlog_event_group_info *engine_context=
&cache_data->engine_binlog_info;
engine_context->xa_xid= thd->transaction->xid_state.get_xid();
uchar engine_count= (uchar)ha_count_rw_2pc(thd, true);
mysql_mutex_lock(&LOCK_commit_ordered);
bool err= (*opt_binlog_engine_hton->binlog_write_xa_prepare_ordered)
(thd, engine_context, engine_count);
mysql_mutex_unlock(&LOCK_commit_ordered);
if (likely(!err))
err= (*opt_binlog_engine_hton->binlog_write_xa_prepare)
(thd, engine_context, engine_count);
cache_mngr->reset(false, true);
return err;
}
else
return binlog_commit(thd, all, FALSE);
}
return 0;
}
static int binlog_prepare(handlerton *hton, THD *thd, bool all)
{
/*
ToDo: We do not really need a prepare() hton method in the binlog, we are
the transaction coordinator, should do our work in log_xa_prepare().
There is currently code that looks at registered htons if they have the
"prepare" method and use that to decide how the transaction should be
handled; until this is refactored, we need to have a prepare method in the
binlog which just does nothing.
*/
return 0;
} }
@ -2568,7 +2706,8 @@ int binlog_commit(THD *thd, bool all, bool ro_1pc)
if (cache_mngr->need_unlog && !is_xa_prepare) if (cache_mngr->need_unlog && !is_xa_prepare)
{ {
error= error=
mysql_bin_log.unlog(BINLOG_COOKIE_MAKE(cache_mngr->binlog_id, mysql_bin_log.unlog(thd,
BINLOG_COOKIE_MAKE(cache_mngr->binlog_id,
cache_mngr->delayed_error), 1); cache_mngr->delayed_error), 1);
cache_mngr->need_unlog= false; cache_mngr->need_unlog= false;
} }
@ -2583,6 +2722,80 @@ int binlog_commit(THD *thd, bool all, bool ro_1pc)
DBUG_RETURN(error); DBUG_RETURN(error);
} }
/*
  Post-commit step for the binlog when --binlog-storage-engine is in use.

  If the connection's cache manager has need_engine_2pc set, clear the flag
  and call the binlog engine's binlog_unlog() hook with the XID of the
  current XA transaction and the trx cache's engine_ptr.

  Does nothing when no binlog engine is configured, when the THD has no
  binlog cache manager, or when need_engine_2pc is not set.
  The `all` argument is not used here.
*/
void
binlog_post_commit(THD *thd, bool all)
{
  if (!opt_binlog_engine_hton)
    return;

  binlog_cache_mngr *mngr= thd->binlog_get_cache_mngr();
  if (unlikely(mngr == nullptr))
    return;
  if (likely(!mngr->need_engine_2pc))
    return;

  /* The flag is only expected to be still set here for two-phase XA COMMIT. */
  DBUG_ASSERT(thd->lex->sql_command == SQLCOM_XA_COMMIT &&
              thd->lex->xa_opt != XA_ONE_PHASE);
  mngr->need_engine_2pc= false;

  handler_binlog_event_group_info *trx_info=
    &mngr->trx_cache.engine_binlog_info;
  (*opt_binlog_engine_hton->binlog_unlog)
    (thd->transaction->xid_state.get_xid(), &trx_info->engine_ptr);
}
/*
  Variant of the binlog post-commit step that takes the XID explicitly.

  Uses current_thd to find the binlog cache manager. If need_engine_2pc is
  set there, the flag is cleared and the binlog engine's binlog_unlog() hook
  is called with the supplied xid and the trx cache's engine_ptr.

  Does nothing without a configured binlog engine, without a cache manager,
  or when need_engine_2pc is not set. The `hton` argument is not used here.
*/
void
binlog_post_commit_by_xid(handlerton *hton, XID *xid)
{
  if (!opt_binlog_engine_hton)
    return;

  THD *cur_thd= current_thd;
  binlog_cache_mngr *mngr= cur_thd->binlog_get_cache_mngr();
  if (unlikely(mngr == nullptr))
    return;
  if (likely(!mngr->need_engine_2pc))
    return;

  /* The flag is only expected to be still set here for two-phase XA COMMIT. */
  DBUG_ASSERT(cur_thd->lex->sql_command == SQLCOM_XA_COMMIT &&
              cur_thd->lex->xa_opt != XA_ONE_PHASE);
  mngr->need_engine_2pc= false;

  handler_binlog_event_group_info *trx_info=
    &mngr->trx_cache.engine_binlog_info;
  (*opt_binlog_engine_hton->binlog_unlog)(xid, &trx_info->engine_ptr);
}
/*
  Post-rollback step for the binlog when --binlog-storage-engine is in use.

  If the connection's cache manager has need_engine_2pc set, clear the flag
  and call the binlog engine's binlog_unlog() hook with the XID of the
  current XA transaction and the trx cache's engine_ptr.

  Does nothing when no binlog engine is configured, when the THD has no
  binlog cache manager, or when need_engine_2pc is not set.
  The `all` argument is not used here.
*/
void
binlog_post_rollback(THD *thd, bool all)
{
  if (!opt_binlog_engine_hton)
    return;

  binlog_cache_mngr *mngr= thd->binlog_get_cache_mngr();
  const bool must_unlog= likely(mngr != nullptr) &&
                         unlikely(mngr->need_engine_2pc);
  if (!must_unlog)
    return;

  /* The flag is only expected to be set here for XA ROLLBACK. */
  DBUG_ASSERT(thd->lex->sql_command == SQLCOM_XA_ROLLBACK);
  mngr->need_engine_2pc= false;

  handler_binlog_event_group_info *trx_info=
    &mngr->trx_cache.engine_binlog_info;
  (*opt_binlog_engine_hton->binlog_unlog)
    (thd->transaction->xid_state.get_xid(), &trx_info->engine_ptr);
}
/*
  Variant of the binlog post-rollback step that takes the XID explicitly.

  Uses current_thd to find the binlog cache manager. If need_engine_2pc is
  set there, the flag is cleared and the binlog engine's binlog_unlog() hook
  is called with the supplied xid and the trx cache's engine_ptr.

  Does nothing without a configured binlog engine, without a cache manager,
  or when need_engine_2pc is not set. The `hton` argument is not used here.
*/
void
binlog_post_rollback_by_xid(handlerton *hton, XID *xid)
{
  if (!opt_binlog_engine_hton)
    return;

  THD *cur_thd= current_thd;
  binlog_cache_mngr *mngr= cur_thd->binlog_get_cache_mngr();
  const bool must_unlog= likely(mngr != nullptr) &&
                         unlikely(mngr->need_engine_2pc);
  if (!must_unlog)
    return;

  /* The flag is only expected to be set here for XA ROLLBACK. */
  DBUG_ASSERT(cur_thd->lex->sql_command == SQLCOM_XA_ROLLBACK);
  mngr->need_engine_2pc= false;

  handler_binlog_event_group_info *trx_info=
    &mngr->trx_cache.engine_binlog_info;
  (*opt_binlog_engine_hton->binlog_unlog)(xid, &trx_info->engine_ptr);
}
/** /**
This function is called when a transaction or a statement is rolled back. This function is called when a transaction or a statement is rolled back.
@ -4550,7 +4763,8 @@ MYSQL_BIN_LOG::open_engine(handlerton *hton, ulong max_size, const char *dir)
IO_CACHE cache; IO_CACHE cache;
init_io_cache(&cache, (File)-1, binlog_cache_size, WRITE_CACHE, 0, false, init_io_cache(&cache, (File)-1, binlog_cache_size, WRITE_CACHE, 0, false,
MYF(MY_DONT_CHECK_FILESIZE)); MYF(MY_DONT_CHECK_FILESIZE));
handler_binlog_event_group_info engine_context= { nullptr, nullptr, 0, 0 }; handler_binlog_event_group_info engine_context=
{ 0, 0, nullptr, nullptr, nullptr, 0, 0, 0 };
write_event(&s, BINLOG_CHECKSUM_ALG_OFF, 0, &cache); write_event(&s, BINLOG_CHECKSUM_ALG_OFF, 0, &cache);
mysql_mutex_lock(&LOCK_commit_ordered); mysql_mutex_lock(&LOCK_commit_ordered);
(*opt_binlog_engine_hton->binlog_write_direct_ordered) (&cache, (*opt_binlog_engine_hton->binlog_write_direct_ordered) (&cache,
@ -8385,6 +8599,9 @@ err:
goto engine_fail; goto engine_fail;
} }
mysql_mutex_unlock(&LOCK_commit_ordered); mysql_mutex_unlock(&LOCK_commit_ordered);
cache_mngr->last_commit_pos_file.engine_file_no=
engine_context->out_file_no;
cache_mngr->last_commit_pos_offset= engine_context->out_file_no;
if (unlikely((*opt_binlog_engine_hton->binlog_write_direct) if (unlikely((*opt_binlog_engine_hton->binlog_write_direct)
(file, engine_context, commit_gtid))) (file, engine_context, commit_gtid)))
@ -9452,6 +9669,7 @@ MYSQL_BIN_LOG::write_transaction_to_binlog(THD *thd,
) )
{ {
cache_mngr->need_unlog= false; cache_mngr->need_unlog= false;
cache_mngr->need_engine_2pc= false;
DBUG_RETURN(0); DBUG_RETURN(0);
} }
@ -9461,21 +9679,32 @@ MYSQL_BIN_LOG::write_transaction_to_binlog(THD *thd,
entry.all= all; entry.all= all;
entry.using_stmt_cache= using_stmt_cache; entry.using_stmt_cache= using_stmt_cache;
entry.using_trx_cache= using_trx_cache; entry.using_trx_cache= using_trx_cache;
entry.need_unlog= is_preparing_xa(thd); entry.need_unlog= unlikely(is_preparing_xa(thd)) && !opt_binlog_engine_hton;
ha_info= all ? thd->transaction->all.ha_list : thd->transaction->stmt.ha_list; ha_info= all ? thd->transaction->all.ha_list : thd->transaction->stmt.ha_list;
entry.ro_1pc= is_ro_1pc; entry.ro_1pc= is_ro_1pc;
entry.do_binlog_group_commit_ordered= false; entry.do_binlog_group_commit_ordered= false;
entry.end_event= end_ev; entry.end_event= end_ev;
cache_mngr->need_engine_2pc= false;
auto has_xid= entry.end_event->get_type_code() == XID_EVENT; auto has_xid= entry.end_event->get_type_code() == XID_EVENT;
for (; ha_info; ha_info= ha_info->next()) for (; ha_info; ha_info= ha_info->next())
{ {
if (likely(ha_info->is_started())) if (likely(has_xid) && likely(ha_info->is_started()))
{ {
if (has_xid && ha_info->ht() != binlog_hton && if (opt_binlog_engine_hton)
ha_info->is_trx_read_write() && {
!ha_info->ht()->commit_checkpoint_request) if (ha_info->ht() != binlog_hton &&
entry.need_unlog= true; ha_info->ht() != opt_binlog_engine_hton &&
ha_info->is_trx_read_write())
cache_mngr->need_engine_2pc= true;
}
else
{
if (ha_info->ht() != binlog_hton &&
ha_info->is_trx_read_write() &&
!ha_info->ht()->commit_checkpoint_request)
entry.need_unlog= true;
}
} }
else else
break; break;
@ -9930,12 +10159,17 @@ MYSQL_BIN_LOG::write_transaction_to_binlog_events(group_commit_entry *entry)
IO_CACHE *file= &cache_data->cache_log; IO_CACHE *file= &cache_data->cache_log;
handler_binlog_event_group_info *engine_context= handler_binlog_event_group_info *engine_context=
&cache_data->engine_binlog_info; &cache_data->engine_binlog_info;
binlog_setup_engine_commit_data(engine_context, cache_mngr);
if (likely(!entry->error)) if (likely(!entry->error))
{ {
entry->error= (*opt_binlog_engine_hton->binlog_write_direct_ordered) entry->error= (*opt_binlog_engine_hton->binlog_write_direct_ordered)
(file, engine_context, entry->thd->get_last_commit_gtid()); (file, engine_context, entry->thd->get_last_commit_gtid());
if (likely(!entry->error)) if (likely(!entry->error))
{ {
cache_mngr->last_commit_pos_file.engine_file_no=
engine_context->out_file_no;
cache_mngr->last_commit_pos_offset= engine_context->out_file_no;
/* Mark to call binlog_write_direct() later. */ /* Mark to call binlog_write_direct() later. */
cache_mngr->need_write_direct= TRUE; cache_mngr->need_write_direct= TRUE;
} }
@ -10420,12 +10654,17 @@ MYSQL_BIN_LOG::trx_group_commit_leader(group_commit_entry *leader)
IO_CACHE *file= &cache_data->cache_log; IO_CACHE *file= &cache_data->cache_log;
handler_binlog_event_group_info *engine_context= handler_binlog_event_group_info *engine_context=
&cache_data->engine_binlog_info; &cache_data->engine_binlog_info;
binlog_setup_engine_commit_data(engine_context, cache_mngr);
if (likely(!current->error)) if (likely(!current->error))
{ {
current->error= (*opt_binlog_engine_hton->binlog_write_direct_ordered) current->error= (*opt_binlog_engine_hton->binlog_write_direct_ordered)
(file, engine_context, current->thd->get_last_commit_gtid()); (file, engine_context, current->thd->get_last_commit_gtid());
if (likely(!current->error)) if (likely(!current->error))
{ {
cache_mngr->last_commit_pos_file.engine_file_no=
engine_context->out_file_no;
cache_mngr->last_commit_pos_offset= engine_context->out_file_no;
/* Mark to call binlog_write_direct later. */ /* Mark to call binlog_write_direct later. */
cache_mngr->need_write_direct= TRUE; cache_mngr->need_write_direct= TRUE;
} }
@ -10565,8 +10804,15 @@ MYSQL_BIN_LOG::write_transaction_or_stmt(group_commit_entry *entry,
else else
{ {
DBUG_ASSERT((entry->using_stmt_cache && !mngr->stmt_cache.empty()) || DBUG_ASSERT((entry->using_stmt_cache && !mngr->stmt_cache.empty()) ||
(entry->using_trx_cache && !mngr->trx_cache.empty()) (entry->using_trx_cache && !mngr->trx_cache.empty()) ||
/* Assert that empty transaction is handled elsewhere. */); (entry->using_trx_cache &&
mngr->trx_cache.engine_binlog_info.xa_xid != nullptr)
/*
Assert that empty transaction is handled elsewhere.
Except in XA COMMIT, all events are OOB-spilled with the
prepare record, the caches are empty.
*/
);
if (unlikely((entry->using_stmt_cache && !mngr->stmt_cache.empty()) && if (unlikely((entry->using_stmt_cache && !mngr->stmt_cache.empty()) &&
(entry->using_trx_cache && !mngr->trx_cache.empty()))) (entry->using_trx_cache && !mngr->trx_cache.empty())))
{ {
@ -10590,9 +10836,16 @@ MYSQL_BIN_LOG::write_transaction_or_stmt(group_commit_entry *entry,
DBUG_RETURN(ER_ERROR_ON_WRITE); DBUG_RETURN(ER_ERROR_ON_WRITE);
} }
binlog_cache_data *cache_data= binlog_cache_data *cache_data= entry->using_trx_cache ?
(entry->using_trx_cache && !mngr->trx_cache.empty()) ?
&mngr->trx_cache : &mngr->stmt_cache; &mngr->trx_cache : &mngr->stmt_cache;
DBUG_ASSERT(!(entry->using_trx_cache &&
mngr->trx_cache.empty() &&
!mngr->stmt_cache.empty())
/*
Assert that we do not put the GTID in the trx cache
when the only event data is in the stmt cache.
*/
);
/* /*
The GTID event cannot go first since we only allocate the GTID at binlog The GTID event cannot go first since we only allocate the GTID at binlog
time. So write the GTID at the very end, and record its offset so that the time. So write the GTID at the very end, and record its offset so that the
@ -11878,7 +12131,7 @@ mmap_do_checkpoint_callback(void *data)
++pending->pending_count; ++pending->pending_count;
} }
int TC_LOG_MMAP::unlog(ulong cookie, my_xid xid) int TC_LOG_MMAP::unlog(THD *thd, ulong cookie, my_xid xid)
{ {
pending_cookies *full_buffer= NULL; pending_cookies *full_buffer= NULL;
uint32 ncookies= tc_log_page_size / sizeof(my_xid); uint32 ncookies= tc_log_page_size / sizeof(my_xid);
@ -12327,6 +12580,13 @@ TC_LOG_BINLOG::log_and_order(THD *thd, my_xid xid, bool all,
relocated to the current or later point. relocated to the current or later point.
*/ */
cache_mngr->need_unlog= false; cache_mngr->need_unlog= false;
if (unlikely(cache_mngr->need_engine_2pc))
{
DBUG_ASSERT(!need_unlog);
cache_mngr->need_engine_2pc= false;
DBUG_RETURN(BINLOG_COOKIE_ENGINE_UNLOG(cache_mngr->delayed_error));
}
/* /*
If using explicit user XA, we will not have XID. We must still return a If using explicit user XA, we will not have XID. We must still return a
non-zero cookie (as zero cookie signals error). non-zero cookie (as zero cookie signals error).
@ -12487,13 +12747,24 @@ TC_LOG_BINLOG::mark_xid_done(ulong binlog_id, bool write_checkpoint)
DBUG_VOID_RETURN; DBUG_VOID_RETURN;
} }
int TC_LOG_BINLOG::unlog(ulong cookie, my_xid xid) int TC_LOG_BINLOG::unlog(THD *thd, ulong cookie, my_xid xid)
{ {
DBUG_ENTER("TC_LOG_BINLOG::unlog"); DBUG_ENTER("TC_LOG_BINLOG::unlog");
if (!xid) if (!xid)
DBUG_RETURN(0); DBUG_RETURN(0);
if (!BINLOG_COOKIE_IS_DUMMY(cookie)) if (BINLOG_COOKIE_IS_ENGINE_UNLOG(cookie))
{
DBUG_ASSERT(opt_binlog_engine_hton);
XID xid_buf;
xid_buf.set(xid);
binlog_cache_mngr *cache_mngr= thd->binlog_get_cache_mngr();
DBUG_ASSERT(cache_mngr != nullptr);
if (likely(cache_mngr != nullptr))
(*opt_binlog_engine_hton->binlog_unlog)
(&xid_buf, &cache_mngr->trx_cache.engine_binlog_info.engine_ptr);
}
else if (!BINLOG_COOKIE_IS_DUMMY(cookie))
mark_xid_done(BINLOG_COOKIE_GET_ID(cookie), true); mark_xid_done(BINLOG_COOKIE_GET_ID(cookie), true);
/* /*
See comment in trx_group_commit_leader() - if rotate() gave a failure, See comment in trx_group_commit_leader() - if rotate() gave a failure,
@ -12511,6 +12782,9 @@ int TC_LOG_BINLOG::unlog_xa_prepare(THD *thd, bool all)
{ {
DBUG_ASSERT(is_preparing_xa(thd)); DBUG_ASSERT(is_preparing_xa(thd));
if (opt_binlog_engine_hton)
return 0;
binlog_cache_mngr *cache_mngr= thd->binlog_setup_trx_data(); binlog_cache_mngr *cache_mngr= thd->binlog_setup_trx_data();
int cookie= 0; int cookie= 0;
@ -12541,7 +12815,7 @@ int TC_LOG_BINLOG::unlog_xa_prepare(THD *thd, bool all)
cookie= BINLOG_COOKIE_MAKE(cache_mngr->binlog_id, cache_mngr->delayed_error); cookie= BINLOG_COOKIE_MAKE(cache_mngr->binlog_id, cache_mngr->delayed_error);
cache_mngr->need_unlog= false; cache_mngr->need_unlog= false;
return unlog(cookie, 1); return unlog(thd, cookie, 1);
} }

View file

@ -63,7 +63,8 @@ class TC_LOG
virtual int log_and_order(THD *thd, my_xid xid, bool all, virtual int log_and_order(THD *thd, my_xid xid, bool all,
bool need_prepare_ordered, bool need_prepare_ordered,
bool need_commit_ordered) = 0; bool need_commit_ordered) = 0;
virtual int unlog(ulong cookie, my_xid xid)=0; virtual int unlog(THD *thd, ulong cookie, my_xid xid)=0;
virtual int log_xa_prepare(THD *thd, bool all)= 0;
virtual int unlog_xa_prepare(THD *thd, bool all)= 0; virtual int unlog_xa_prepare(THD *thd, bool all)= 0;
virtual void commit_checkpoint_notify(void *cookie)= 0; virtual void commit_checkpoint_notify(void *cookie)= 0;
@ -118,7 +119,11 @@ public:
DBUG_ASSERT(0); DBUG_ASSERT(0);
return 1; return 1;
} }
int unlog(ulong cookie, my_xid xid) override { return 0; } int unlog(THD *thd, ulong cookie, my_xid xid) override { return 0; }
/* This implementation does no work at XA PREPARE; it always returns 0. */
int log_xa_prepare(THD *thd, bool all) override { return 0; }
int unlog_xa_prepare(THD *thd, bool all) override int unlog_xa_prepare(THD *thd, bool all) override
{ {
return 0; return 0;
@ -205,7 +210,11 @@ class TC_LOG_MMAP: public TC_LOG
void close() override; void close() override;
int log_and_order(THD *thd, my_xid xid, bool all, int log_and_order(THD *thd, my_xid xid, bool all,
bool need_prepare_ordered, bool need_commit_ordered) override; bool need_prepare_ordered, bool need_commit_ordered) override;
int unlog(ulong cookie, my_xid xid) override; int unlog(THD *thd, ulong cookie, my_xid xid) override;
/* This implementation does no work at XA PREPARE; it always returns 0. */
int log_xa_prepare(THD *thd, bool all) override { return 0; }
int unlog_xa_prepare(THD *thd, bool all) override int unlog_xa_prepare(THD *thd, bool all) override
{ {
return 0; return 0;
@ -581,6 +590,10 @@ private:
case where there is no corresponding binlog id (since nothing was logged). case where there is no corresponding binlog id (since nothing was logged).
And we need an error flag to mark that unlog() must return failure. And we need an error flag to mark that unlog() must return failure.
For --binlog-storage-engine, we need unlog if another engine than the
binlog engine participated in the transaction, or if we did a user XA
commit.
We use the following macros to pack all of this information into the single We use the following macros to pack all of this information into the single
ulong available with log_and_order() / unlog(). ulong available with log_and_order() / unlog().
@ -589,15 +602,20 @@ private:
*/ */
#define BINLOG_COOKIE_ERROR_RETURN 0 #define BINLOG_COOKIE_ERROR_RETURN 0
#define BINLOG_COOKIE_DUMMY_ID 1 #define BINLOG_COOKIE_DUMMY_ID 1
#define BINLOG_COOKIE_BASE 2 #define BINLOG_COOKIE_ENGINE_UNLOG_ID 2
#define BINLOG_COOKIE_BASE 3
#define BINLOG_COOKIE_DUMMY(error_flag) \ #define BINLOG_COOKIE_DUMMY(error_flag) \
( (BINLOG_COOKIE_DUMMY_ID<<1) | ((error_flag)&1) ) ( (BINLOG_COOKIE_DUMMY_ID<<1) | ((error_flag)&1) )
#define BINLOG_COOKIE_ENGINE_UNLOG(error_flag) \
( (BINLOG_COOKIE_ENGINE_UNLOG_ID<<1) | ((error_flag)&1) )
#define BINLOG_COOKIE_MAKE(id, error_flag) \ #define BINLOG_COOKIE_MAKE(id, error_flag) \
( (((id)+BINLOG_COOKIE_BASE)<<1) | ((error_flag)&1) ) ( (((id)+BINLOG_COOKIE_BASE)<<1) | ((error_flag)&1) )
#define BINLOG_COOKIE_GET_ERROR_FLAG(c) ((c) & 1) #define BINLOG_COOKIE_GET_ERROR_FLAG(c) ((c) & 1)
#define BINLOG_COOKIE_GET_ID(c) ( ((ulong)(c)>>1) - BINLOG_COOKIE_BASE ) #define BINLOG_COOKIE_GET_ID(c) ( ((ulong)(c)>>1) - BINLOG_COOKIE_BASE )
#define BINLOG_COOKIE_IS_DUMMY(c) \ #define BINLOG_COOKIE_IS_DUMMY(c) \
( ((ulong)(c)>>1) == BINLOG_COOKIE_DUMMY_ID ) ( ((ulong)(c)>>1) == BINLOG_COOKIE_DUMMY_ID )
#define BINLOG_COOKIE_IS_ENGINE_UNLOG(c) \
( ((ulong)(c)>>1) == BINLOG_COOKIE_ENGINE_UNLOG_ID )
class binlog_cache_mngr; class binlog_cache_mngr;
@ -934,7 +952,8 @@ public:
ulong next_log_number) override; ulong next_log_number) override;
int log_and_order(THD *thd, my_xid xid, bool all, int log_and_order(THD *thd, my_xid xid, bool all,
bool need_prepare_ordered, bool need_commit_ordered) override; bool need_prepare_ordered, bool need_commit_ordered) override;
int unlog(ulong cookie, my_xid xid) override; int unlog(THD *thd, ulong cookie, my_xid xid) override;
int log_xa_prepare(THD *thd, bool all) override;
int unlog_xa_prepare(THD *thd, bool all) override; int unlog_xa_prepare(THD *thd, bool all) override;
void commit_checkpoint_notify(void *cookie) override; void commit_checkpoint_notify(void *cookie) override;
int recover(LOG_INFO *linfo, const char *last_log_name, IO_CACHE *first_log, int recover(LOG_INFO *linfo, const char *last_log_name, IO_CACHE *first_log,
@ -1532,8 +1551,12 @@ get_gtid_list_event(IO_CACHE *cache, Gtid_list_log_event **out_gtid_list);
int binlog_commit(THD *thd, bool all, bool is_ro_1pc= false); int binlog_commit(THD *thd, bool all, bool is_ro_1pc= false);
int binlog_rollback(handlerton *hton, THD *thd, bool all); int binlog_rollback(handlerton *hton, THD *thd, bool all);
void binlog_post_commit(THD *thd, bool all);
void binlog_post_rollback(THD *thd, bool all);
int binlog_commit_by_xid(handlerton *hton, XID *xid); int binlog_commit_by_xid(handlerton *hton, XID *xid);
int binlog_rollback_by_xid(handlerton *hton, XID *xid); int binlog_rollback_by_xid(handlerton *hton, XID *xid);
void binlog_post_commit_by_xid(handlerton *hton, XID *xid);
void binlog_post_rollback_by_xid(handlerton *hton, XID *xid);
bool write_bin_log_start_alter(THD *thd, bool& partial_alter, bool write_bin_log_start_alter(THD *thd, bool& partial_alter,
uint64 start_alter_id, bool log_if_exists); uint64 start_alter_id, bool log_if_exists);
#endif /* LOG_H */ #endif /* LOG_H */

View file

@ -27,7 +27,7 @@ class binlog_cache_data
{ {
public: public:
binlog_cache_data(bool precompute_checksums): binlog_cache_data(bool precompute_checksums):
engine_binlog_info {0, 0, 0, 0}, engine_binlog_info {0, 0, 0, 0, 0, 0, 0, 0},
before_stmt_pos(MY_OFF_T_UNDEF), m_pending(0), status(0), before_stmt_pos(MY_OFF_T_UNDEF), m_pending(0), status(0),
incident(FALSE), precompute_checksums(precompute_checksums), incident(FALSE), precompute_checksums(precompute_checksums),
saved_max_binlog_cache_size(0), ptr_binlog_cache_use(0), saved_max_binlog_cache_size(0), ptr_binlog_cache_use(0),
@ -122,8 +122,10 @@ public:
(*opt_binlog_engine_hton->binlog_oob_reset) (*opt_binlog_engine_hton->binlog_oob_reset)
(&engine_binlog_info.engine_ptr); (&engine_binlog_info.engine_ptr);
engine_binlog_info.engine_ptr2= nullptr; engine_binlog_info.engine_ptr2= nullptr;
engine_binlog_info.xa_xid= nullptr;
engine_binlog_info.out_of_band_offset= 0; engine_binlog_info.out_of_band_offset= 0;
engine_binlog_info.gtid_offset= 0; engine_binlog_info.gtid_offset= 0;
engine_binlog_info.internal_xa= false;
/* Preserve the engine_ptr for the engine to re-use, was reset above. */ /* Preserve the engine_ptr for the engine to re-use, was reset above. */
truncate(cache_log.pos_in_file); truncate(cache_log.pos_in_file);

View file

@ -68,7 +68,7 @@ void set_thd_stage_info(void *thd,
(thd)->enter_stage(&stage, __func__, __FILE__, __LINE__) (thd)->enter_stage(&stage, __func__, __FILE__, __LINE__)
#include "my_apc.h" #include "my_apc.h"
#include "rpl_gtid.h" #include "rpl_gtid_base.h"
#include "wsrep.h" #include "wsrep.h"
#include "wsrep_on.h" #include "wsrep_on.h"

View file

@ -442,13 +442,6 @@ bool trans_xa_start(THD *thd)
{ {
DBUG_ENTER("trans_xa_start"); DBUG_ENTER("trans_xa_start");
if (opt_binlog_engine_hton)
{
my_error(ER_NOT_YET_SUPPORTED_ENGINE_BINLOG, MYF(0),
"Explicit XA transaction");
DBUG_RETURN(TRUE);
}
if (thd->transaction->xid_state.is_explicit_XA() && if (thd->transaction->xid_state.is_explicit_XA() &&
thd->transaction->xid_state.xid_cache_element->xa_state == XA_IDLE && thd->transaction->xid_state.xid_cache_element->xa_state == XA_IDLE &&
thd->lex->xa_opt == XA_RESUME) thd->lex->xa_opt == XA_RESUME)

View file

@ -296,13 +296,20 @@ fsp_binlog_page_fifo::release_page_mtr(fsp_binlog_page_entry *page, mtr_t *mtr)
if (!page->last_page) if (!page->last_page)
return release_page(page); return release_page(page);
/*
Check against having two pending last-in-binlog-file pages to release.
But allow to have the same page released twice in a single mtr (this can
happen when 2-phase commit puts an XID/XA complete record just in front
of the commit record).
*/
fsp_binlog_page_entry *old_page= mtr->get_binlog_page(); fsp_binlog_page_entry *old_page= mtr->get_binlog_page();
ut_ad(!old_page); ut_ad(!(old_page != nullptr && old_page != page));
if (UNIV_UNLIKELY(old_page != nullptr)) if (UNIV_UNLIKELY(old_page != nullptr))
{ {
sql_print_error("InnoDB: Internal inconsistency with mini-transaction that " if (UNIV_UNLIKELY(old_page != page))
"spans more than two binlog files. Recovery may be " sql_print_error("InnoDB: Internal inconsistency with mini-transaction "
"affected until the next checkpoint."); "that spans more than two binlog files. Recovery may "
"be affected until the next checkpoint.");
release_page(old_page); release_page(old_page);
} }
mtr->set_binlog_page(page); mtr->set_binlog_page(page);
@ -968,25 +975,29 @@ ibb_file_oob_refs::remove_up_to(uint64_t file_no, LF_PINS *pins)
bool bool
ibb_file_oob_refs::oob_ref_inc(uint64_t file_no, LF_PINS *pins) ibb_file_oob_refs::oob_ref_inc(uint64_t file_no, LF_PINS *pins, bool do_xa)
{
ibb_tblspc_entry *e= static_cast<ibb_tblspc_entry *>
(lf_hash_search(&hash, pins, &file_no, sizeof(file_no)));
if (!e)
return false;
e->oob_refs.fetch_add(1, std::memory_order_acquire);
lf_hash_search_unpin(pins);
return true;
}
bool
ibb_file_oob_refs::oob_ref_dec(uint64_t file_no, LF_PINS *pins)
{ {
ibb_tblspc_entry *e= static_cast<ibb_tblspc_entry *> ibb_tblspc_entry *e= static_cast<ibb_tblspc_entry *>
(lf_hash_search(&hash, pins, &file_no, sizeof(file_no))); (lf_hash_search(&hash, pins, &file_no, sizeof(file_no)));
if (!e) if (!e)
return true; return true;
if (UNIV_UNLIKELY(do_xa))
e->xa_refs.fetch_add(1, std::memory_order_acquire);
e->oob_refs.fetch_add(1, std::memory_order_acquire);
lf_hash_search_unpin(pins);
return false;
}
bool
ibb_file_oob_refs::oob_ref_dec(uint64_t file_no, LF_PINS *pins, bool do_xa)
{
ibb_tblspc_entry *e= static_cast<ibb_tblspc_entry *>
(lf_hash_search(&hash, pins, &file_no, sizeof(file_no)));
if (!e)
return true;
if (UNIV_UNLIKELY(do_xa))
e->xa_refs.fetch_sub(1, std::memory_order_acquire);
uint64_t refcnt= e->oob_refs.fetch_sub(1, std::memory_order_acquire) - 1; uint64_t refcnt= e->oob_refs.fetch_sub(1, std::memory_order_acquire) - 1;
lf_hash_search_unpin(pins); lf_hash_search_unpin(pins);
ut_ad(refcnt != (uint64_t)0 - 1); ut_ad(refcnt != (uint64_t)0 - 1);
@ -1076,6 +1087,25 @@ ibb_file_oob_refs::get_oob_ref_file_no(uint64_t file_no, LF_PINS *pins,
} }
/*
Check if a file_no contains oob data that is needed by an active
(ie. not committed) transaction. This is seen simply as having refcount
greater than 0.
*/
bool
ibb_file_oob_refs::get_oob_ref_in_use(uint64_t file_no, LF_PINS *pins)
{
ibb_tblspc_entry *e= static_cast<ibb_tblspc_entry *>
(lf_hash_search(&hash, pins, &file_no, sizeof(file_no)));
if (!e)
return false;
uint64_t refcnt= e->oob_refs.load(std::memory_order_relaxed);
lf_hash_search_unpin(pins);
return refcnt > 0;
}
bool bool
ibb_record_in_file_hash(uint64_t file_no, uint64_t oob_ref, uint64_t xa_ref, ibb_record_in_file_hash(uint64_t file_no, uint64_t oob_ref, uint64_t xa_ref,
LF_PINS *in_pins) LF_PINS *in_pins)

View file

@ -527,6 +527,7 @@ mysql_pfs_key_t fsp_binlog_durable_mutex_key;
mysql_pfs_key_t fsp_binlog_durable_cond_key; mysql_pfs_key_t fsp_binlog_durable_cond_key;
mysql_pfs_key_t fsp_purge_binlog_mutex_key; mysql_pfs_key_t fsp_purge_binlog_mutex_key;
mysql_pfs_key_t fsp_page_fifo_mutex_key; mysql_pfs_key_t fsp_page_fifo_mutex_key;
mysql_pfs_key_t ibb_xid_hash_mutex_key;
/* all_innodb_mutexes array contains mutexes that are /* all_innodb_mutexes array contains mutexes that are
performance schema instrumented if "UNIV_PFS_MUTEX" performance schema instrumented if "UNIV_PFS_MUTEX"
@ -4074,6 +4075,13 @@ static int innodb_init(void* p)
innobase_hton->binlog_savepoint_rollback= ibb_savepoint_rollback; innobase_hton->binlog_savepoint_rollback= ibb_savepoint_rollback;
innobase_hton->binlog_oob_reset= innodb_reset_oob; innobase_hton->binlog_oob_reset= innodb_reset_oob;
innobase_hton->binlog_oob_free= innodb_free_oob; innobase_hton->binlog_oob_free= innodb_free_oob;
innobase_hton->binlog_write_xa_prepare_ordered=
ibb_write_xa_prepare_ordered;
innobase_hton->binlog_write_xa_prepare= ibb_write_xa_prepare;
innobase_hton->binlog_xa_rollback_ordered=
ibb_xa_rollback_ordered;
innobase_hton->binlog_xa_rollback= ibb_xa_rollback;
innobase_hton->binlog_unlog= ibb_binlog_unlog;
innobase_hton->get_binlog_reader= innodb_get_binlog_reader; innobase_hton->get_binlog_reader= innodb_get_binlog_reader;
innobase_hton->get_binlog_file_list= innodb_get_binlog_file_list; innobase_hton->get_binlog_file_list= innodb_get_binlog_file_list;
innobase_hton->get_filename= ibb_get_filename; innobase_hton->get_filename= ibb_get_filename;
@ -4383,8 +4391,8 @@ innobase_commit_ordered(
DBUG_ASSERT(all || DBUG_ASSERT(all ||
(!thd_test_options(thd, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))); (!thd_test_options(thd, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN)));
innobase_commit_ordered_2(trx, thd);
trx->active_commit_ordered = true; trx->active_commit_ordered = true;
innobase_commit_ordered_2(trx, thd);
DBUG_VOID_RETURN; DBUG_VOID_RETURN;
} }

View file

@ -43,6 +43,9 @@ InnoDB implementation of binlog.
#include "log.h" #include "log.h"
class ibb_xid_hash;
static int innodb_binlog_inited= 0; static int innodb_binlog_inited= 0;
pending_lsn_fifo ibb_pending_lsn_fifo; pending_lsn_fifo ibb_pending_lsn_fifo;
@ -89,6 +92,8 @@ size_t total_binlog_used_size;
static bool purge_warning_given= false; static bool purge_warning_given= false;
/** References to pending XA PREPARED transactions in the binlog. */
static ibb_xid_hash *ibb_xa_xid_hash;
#ifdef UNIV_PFS_THREAD #ifdef UNIV_PFS_THREAD
mysql_pfs_key_t binlog_prealloc_thread_key; mysql_pfs_key_t binlog_prealloc_thread_key;
@ -161,6 +166,8 @@ struct binlog_oob_context {
decrement again at commit record write or reset/rollback. decrement again at commit record write or reset/rollback.
*/ */
bool pending_refcount; bool pending_refcount;
/* Set when the transaction is sealed after writing an XA PREPARE record. */
bool is_xa_prepared;
/* /*
The node_list contains the root of each tree in the forest of perfect The node_list contains the root of each tree in the forest of perfect
binary trees. binary trees.
@ -304,6 +311,32 @@ public:
}; };
/**
Class that keeps track of the oob references etc. for each
XA PREPAREd XID.

Entries are added with add_xid() (at XA PREPARE, and also for the internal
two-phase commit XID when a commit record is written), inspected under the
mutex with run_on_xid(), and removed with grab_xid().
*/
class ibb_xid_hash {
public:
/* One heap-allocated entry per XID currently held in the hash. */
struct xid_elem {
/*
Must remain the first member: get_xid_hash_key() is called both with a
pointer to an xid_elem and with a pointer to a bare XID.
*/
XID xid;
/*
The file_no on which add_xid() took a reference; the same file_no is
passed to oob_ref_dec() in ibb_binlog_unlog().
*/
uint64_t refcnt_file_no;
/*
Reference to the OOB data of the transaction, copied from the
binlog_oob_context in add_xid(). All zero when the transaction had no
OOB data.
*/
uint64_t oob_num_nodes;
uint64_t oob_first_file_no;
uint64_t oob_first_offset;
uint64_t oob_last_file_no;
uint64_t oob_last_offset;
};
/* Hash of xid_elem, keyed on the XID key; unique keys only. */
HASH xid_hash;
/* Held around every lookup, insert and delete on xid_hash. */
mysql_mutex_t xid_mutex;
ibb_xid_hash();
~ibb_xid_hash();
/* Insert a new entry for xid; returns true on error. */
bool add_xid(const XID *xid, const binlog_oob_context *c);
/* Remove and return the entry for xid, or nullptr if not present. */
xid_elem *grab_xid(const XID *xid);
/*
Run callback on the entry for xid while holding xid_mutex. Returns the
callback's return value, or true if the xid is not present.
*/
template <typename F> bool run_on_xid(const XID *xid, F callback);
};
struct chunk_data_cache : public chunk_data_base { struct chunk_data_cache : public chunk_data_base {
IO_CACHE *cache; IO_CACHE *cache;
binlog_oob_context *oob_ctx; binlog_oob_context *oob_ctx;
@ -333,7 +366,40 @@ struct chunk_data_cache : public chunk_data_base {
unsigned char *p= header_buf; unsigned char *p= header_buf;
ut_ad(c); ut_ad(c);
oob_ctx= c; oob_ctx= c;
if (c && c->node_list_len) if (UNIV_UNLIKELY(!c))
;
else if (UNIV_UNLIKELY(binlog_info->xa_xid != nullptr) &&
!binlog_info->internal_xa)
{
/*
For explicit user XA COMMIT, the commit record must point to the
OOB data previously saved in XA PREPARE.
*/
bool err= ibb_xa_xid_hash->run_on_xid(binlog_info->xa_xid,
[&p](const ibb_xid_hash::xid_elem *elem) -> bool {
if (UNIV_LIKELY(elem->oob_num_nodes > 0))
{
p= compr_int_write(p, elem->oob_num_nodes);
p= compr_int_write(p, elem->oob_first_file_no);
p= compr_int_write(p, elem->oob_first_offset);
p= compr_int_write(p, elem->oob_last_file_no);
p= compr_int_write(p, elem->oob_last_offset);
p= compr_int_write(p, 0);
}
else
p= compr_int_write(p, 0);
return false;
});
/*
The XID must always be found, else we have a serious
inconsistency between the server layer and binlog state.
In case of inconsistency, better crash than leave a corrupt
binlog.
*/
ut_a(!err);
ut_ad(binlog_info->engine_ptr2 == nullptr);
}
else if (c->node_list_len)
{ {
/* /*
Link to the out-of-band data. First store the number of nodes; then Link to the out-of-band data. First store the number of nodes; then
@ -380,6 +446,18 @@ struct chunk_data_cache : public chunk_data_base {
ut_ad (cache->pos_in_file <= binlog_info->out_of_band_offset); ut_ad (cache->pos_in_file <= binlog_info->out_of_band_offset);
if (UNIV_UNLIKELY(binlog_info->internal_xa))
{
/*
Insert the XID for the internal 2-phase commit in the xid_hash,
incrementing the reference count. This will ensure we hold on to
the commit record until ibb_binlog_unlog() is called, at which point
the other participating storage engine(s) have durably committed.
*/
bool err= ibb_xa_xid_hash->add_xid(binlog_info->xa_xid, c);
ut_a(!err);
}
/* Start with the GTID event, which is put at the end of the IO_CACHE. */ /* Start with the GTID event, which is put at the end of the IO_CACHE. */
my_bool res= reinit_io_cache(cache, READ_CACHE, binlog_info->gtid_offset, 0, 0); my_bool res= reinit_io_cache(cache, READ_CACHE, binlog_info->gtid_offset, 0, 0);
ut_a(!res /* ToDo: Error handling. */); ut_a(!res /* ToDo: Error handling. */);
@ -455,6 +533,94 @@ struct chunk_data_cache : public chunk_data_base {
}; };
/**
  Chunk data source that hands out the contents of a small fixed-size
  in-object buffer.

  The derived class fills `buffer` and sets `data_remain` to the number of
  valid bytes in its constructor; copy_data() then delivers those bytes in
  order, possibly split over several calls.
*/
template<uint32_t bufsize_>
struct chunk_data_from_buf : public chunk_data_base {
  static constexpr uint32_t bufsize= bufsize_;

  /* Number of bytes in buffer not yet handed out. */
  uint32_t data_remain;
  /* Number of bytes already handed out (read position in buffer). */
  uint32_t data_sofar;
  byte buffer[bufsize];

  chunk_data_from_buf() : data_sofar(0)
  {
    /* data_remain must be initialized in derived class constructor. */
  }

  /*
    Copy up to max_len bytes of the remaining data to p.
    Returns the number of bytes copied, and true when nothing remains.
  */
  virtual std::pair<uint32_t, bool> copy_data(byte *p, uint32_t max_len) final
  {
    if (UNIV_UNLIKELY(data_remain == 0))
      return {0, true};

    uint32_t chunk_len= max_len;
    if (data_remain <= max_len)
      chunk_len= data_remain;

    memcpy(p, buffer + data_sofar, chunk_len);
    data_sofar+= chunk_len;
    data_remain-= chunk_len;
    return {chunk_len, data_remain == 0};
  }

  ~chunk_data_from_buf() { }
};
/**
  Record data for the XA prepare record.

  Layout of the record data:
    buffer[0]      1 byte type/flag.
    buffer[1]      1 byte engine count.
    buffer[2..5]   4 bytes formatID.
    buffer[6]      1 byte gtrid length.
    buffer[7]      1 byte bqual length.
    buffer[8..]    128 bytes (max) gtrid and bqual strings.
*/
struct chunk_data_xa_prepare :
  public chunk_data_from_buf<1 + 1 + 4 + 1 + 1 + 128> {
  chunk_data_xa_prepare(const XID *xid, uchar engine_count)
  {
    /* Each length must fit in one byte and the data in the 128-byte area. */
    ut_a(xid->gtrid_length >= 0 && xid->gtrid_length <= 64);
    ut_a(xid->bqual_length >= 0 && xid->bqual_length <= 64);
    const uint32_t xid_data_len=
      static_cast<uint32_t>(xid->gtrid_length + xid->bqual_length);

    /*
      ToDo: Need the correct data here, like the oob references. To be done
      when we start doing the XA crash recovery.
    */
    buffer[0]= 42 /* ToDo */;
    buffer[1]= engine_count;
    int4store(&buffer[2], xid->formatID);
    buffer[6]= (uchar)xid->gtrid_length;
    buffer[7]= (uchar)xid->bqual_length;
    memcpy(&buffer[8], &xid->data[0], xid_data_len);

    /* 8 bytes of fixed header followed by the XID data. */
    data_remain= 8 + xid_data_len;
  }
  ~chunk_data_xa_prepare() { }
};
/**
  Record data for the XA COMMIT or XA ROLLBACK record.

  Layout of the record data:
    buffer[0]      1 byte type/flag (IBB_FL_XA_TYPE_COMMIT or
                   IBB_FL_XA_TYPE_ROLLBACK).
    buffer[1..4]   4 bytes formatID.
    buffer[5]      1 byte gtrid length.
    buffer[6]      1 byte bqual length.
    buffer[7..]    128 bytes (max) gtrid and bqual strings.
*/
struct chunk_data_xa_complete :
  public chunk_data_from_buf<1 + 4 + 1 + 1 + 128> {
  chunk_data_xa_complete(const XID *xid, bool is_commit)
  {
    /* Each length must fit in one byte and the data in the 128-byte area. */
    ut_a(xid->gtrid_length >= 0 && xid->gtrid_length <= 64);
    ut_a(xid->bqual_length >= 0 && xid->bqual_length <= 64);
    const uint32_t xid_data_len=
      static_cast<uint32_t>(xid->gtrid_length + xid->bqual_length);

    if (is_commit)
      buffer[0]= IBB_FL_XA_TYPE_COMMIT;
    else
      buffer[0]= IBB_FL_XA_TYPE_ROLLBACK;
    int4store(&buffer[1], xid->formatID);
    buffer[5]= (uchar)xid->gtrid_length;
    buffer[6]= (uchar)xid->bqual_length;
    memcpy(&buffer[7], &xid->data[0], xid_data_len);

    /* 7 bytes of fixed header followed by the XID data. */
    data_remain= 7 + xid_data_len;
  }
  ~chunk_data_xa_complete() { }
};
class gtid_search { class gtid_search {
public: public:
gtid_search(); gtid_search();
@ -1232,6 +1398,8 @@ innodb_binlog_startup_init()
fsp_binlog_init(); fsp_binlog_init();
mysql_mutex_init(fsp_purge_binlog_mutex_key, &purge_binlog_mutex, nullptr); mysql_mutex_init(fsp_purge_binlog_mutex_key, &purge_binlog_mutex, nullptr);
binlog_diff_state.init(); binlog_diff_state.init();
ibb_xa_xid_hash= new ibb_xid_hash();
innodb_binlog_inited= 1; innodb_binlog_inited= 1;
} }
@ -1753,6 +1921,7 @@ void innodb_binlog_close(bool shutdown)
if (shutdown && innodb_binlog_inited >= 1) if (shutdown && innodb_binlog_inited >= 1)
{ {
delete ibb_xa_xid_hash;
binlog_diff_state.free(); binlog_diff_state.free();
fsp_binlog_shutdown(); fsp_binlog_shutdown();
mysql_mutex_destroy(&purge_binlog_mutex); mysql_mutex_destroy(&purge_binlog_mutex);
@ -1914,10 +2083,15 @@ serialize_gtid_state(rpl_binlog_state_base *state, byte *buf, size_t buf_size)
unsigned char *p= (unsigned char *)buf; unsigned char *p= (unsigned char *)buf;
/* /*
1 uint64_t for the number of entries in the state stored. 1 uint64_t for the number of entries in the state stored.
1 uint64_t for the XA references file_no.
2 uint32_t + 1 uint64_t for at least one GTID. 2 uint32_t + 1 uint64_t for at least one GTID.
*/ */
ut_ad(buf_size >= 2*COMPR_INT_MAX32 + 2*COMPR_INT_MAX64); ut_ad(buf_size >= 2*COMPR_INT_MAX32 + 3*COMPR_INT_MAX64);
p= compr_int_write(p, state->count_nolock()); p= compr_int_write(p, state->count_nolock());
uint64_t xa_ref_file_no=
ibb_file_hash.earliest_xa_ref.load(std::memory_order_relaxed);
/* Write 1 +file_no, so that 0 (1 + ~0) means "no reference". */
p= compr_int_write(p, xa_ref_file_no + 1);
unsigned char * const pmax= unsigned char * const pmax=
p + (buf_size - (2*COMPR_INT_MAX32 + COMPR_INT_MAX64)); p + (buf_size - (2*COMPR_INT_MAX32 + COMPR_INT_MAX64));
@ -1958,7 +2132,7 @@ binlog_gtid_state(rpl_binlog_state_base *state, mtr_t *mtr,
} }
else else
{ {
size_t buf_size= size_t buf_size= 2*COMPR_INT_MAX64 +
state->count_nolock() * (2*COMPR_INT_MAX32 + COMPR_INT_MAX64); state->count_nolock() * (2*COMPR_INT_MAX32 + COMPR_INT_MAX64);
alloced_buf= static_cast<byte *>(ut_malloc(buf_size, mem_key_binlog)); alloced_buf= static_cast<byte *>(ut_malloc(buf_size, mem_key_binlog));
if (UNIV_UNLIKELY(!alloced_buf)) if (UNIV_UNLIKELY(!alloced_buf))
@ -2052,10 +2226,11 @@ binlog_gtid_state(rpl_binlog_state_base *state, mtr_t *mtr,
*/ */
static int static int
read_gtid_state(binlog_chunk_reader *chunk_reader, read_gtid_state(binlog_chunk_reader *chunk_reader,
rpl_binlog_state_base *state) noexcept rpl_binlog_state_base *state,
uint64_t *out_xa_ref_file_no) noexcept
{ {
byte buf[256]; byte buf[256];
static_assert(sizeof(buf) >= 6*COMPR_INT_MAX64, static_assert(sizeof(buf) >= 2*COMPR_INT_MAX64 + 6*COMPR_INT_MAX64,
"buf must hold at least 2 GTIDs"); "buf must hold at least 2 GTIDs");
int res= chunk_reader->read_data(buf, sizeof(buf), true); int res= chunk_reader->read_data(buf, sizeof(buf), true);
if (UNIV_UNLIKELY(res < 0)) if (UNIV_UNLIKELY(res < 0))
@ -2070,9 +2245,19 @@ read_gtid_state(binlog_chunk_reader *chunk_reader,
p= v_and_p.second; p= v_and_p.second;
if (UNIV_UNLIKELY(p > p_end)) if (UNIV_UNLIKELY(p > p_end))
return -1; return -1;
uint64_t num_gtid= v_and_p.first;
/*
Read the earliest file_no containing pending XA if any.
Note that unsigned underflow means 0 - 1 becomes ~0, as required.
*/
v_and_p= compr_int_read(p);
p= v_and_p.second;
if (UNIV_UNLIKELY(p > p_end))
return -1;
*out_xa_ref_file_no= v_and_p.first - 1;
/* Read each GTID one by one and add into the state. */ /* Read each GTID one by one and add into the state. */
for (uint64_t count= v_and_p.first; count > 0; --count) for (uint64_t count= num_gtid; count > 0; --count)
{ {
ptrdiff_t remain= p_end - p; ptrdiff_t remain= p_end - p;
/* Read more data as needed to ensure we have read a full GTID. */ /* Read more data as needed to ensure we have read a full GTID. */
@ -2140,6 +2325,7 @@ binlog_state_recover()
uint64_t active= active_binlog_file_no.load(std::memory_order_relaxed); uint64_t active= active_binlog_file_no.load(std::memory_order_relaxed);
uint64_t diff_state_interval= current_binlog_state_interval; uint64_t diff_state_interval= current_binlog_state_interval;
uint32_t page_no= 1; uint32_t page_no= 1;
uint64_t xa_ref_file_no;
binlog_chunk_reader chunk_reader(binlog_cur_end_offset); binlog_chunk_reader chunk_reader(binlog_cur_end_offset);
byte *page_buf= byte *page_buf=
@ -2148,7 +2334,7 @@ binlog_state_recover()
return true; return true;
chunk_reader.set_page_buf(page_buf); chunk_reader.set_page_buf(page_buf);
chunk_reader.seek(active, page_no << ibb_page_size_shift); chunk_reader.seek(active, page_no << ibb_page_size_shift);
int res= read_gtid_state(&chunk_reader, &state); int res= read_gtid_state(&chunk_reader, &state, &xa_ref_file_no);
if (res < 0) if (res < 0)
{ {
ut_free(page_buf); ut_free(page_buf);
@ -2166,7 +2352,7 @@ binlog_state_recover()
while (page_no > 1) while (page_no > 1)
{ {
chunk_reader.seek(active, page_no << ibb_page_size_shift); chunk_reader.seek(active, page_no << ibb_page_size_shift);
res= read_gtid_state(&chunk_reader, &state); res= read_gtid_state(&chunk_reader, &state, &xa_ref_file_no);
if (res > 0) if (res > 0)
break; break;
page_no-= (uint32_t)diff_state_interval; page_no-= (uint32_t)diff_state_interval;
@ -2203,6 +2389,7 @@ alloc_oob_context(uint32 list_length= 10)
c->node_list_len= 0; c->node_list_len= 0;
c->secondary_ctx= nullptr; c->secondary_ctx= nullptr;
c->pending_refcount= false; c->pending_refcount= false;
c->is_xa_prepared= false;
} }
else else
my_error(ER_OUTOFMEMORY, MYF(0), needed); my_error(ER_OUTOFMEMORY, MYF(0), needed);
@ -2220,6 +2407,21 @@ innodb_binlog_write_cache(IO_CACHE *cache,
if (!c) if (!c)
binlog_info->engine_ptr= c= alloc_oob_context(); binlog_info->engine_ptr= c= alloc_oob_context();
ut_a(c); ut_a(c);
if (unlikely(binlog_info->xa_xid))
{
/*
Write an XID commit record just before the main commit record.
The XID commit record just contains the XID, and is used by binlog XA
crash recovery to ensure that the other storage engine(s) that are part
of the transaction commit or rollback consistently with the binlog
engine.
*/
chunk_data_xa_complete chunk_data2(binlog_info->xa_xid, true);
fsp_binlog_write_rec(&chunk_data2, mtr, FSP_BINLOG_TYPE_XA_COMPLETE,
c->lf_pins);
}
chunk_data_cache chunk_data(cache, binlog_info); chunk_data_cache chunk_data(cache, binlog_info);
fsp_binlog_write_rec(&chunk_data, mtr, FSP_BINLOG_TYPE_COMMIT, c->lf_pins); fsp_binlog_write_rec(&chunk_data, mtr, FSP_BINLOG_TYPE_COMMIT, c->lf_pins);
@ -2249,6 +2451,7 @@ reset_oob_context(binlog_oob_context *c)
} }
c->node_list_len= 0; c->node_list_len= 0;
c->secondary_ctx= nullptr; c->secondary_ctx= nullptr;
c->is_xa_prepared= false;
} }
@ -2353,6 +2556,11 @@ innodb_binlog_oob_ordered(THD *thd, const unsigned char *data, size_t data_len,
*engine_data= c= alloc_oob_context(); *engine_data= c= alloc_oob_context();
if (UNIV_UNLIKELY(!c)) if (UNIV_UNLIKELY(!c))
return true; return true;
if (UNIV_UNLIKELY(c->is_xa_prepared))
{
my_error(ER_XAER_RMFAIL, MYF(0), "IDLE");
return true;
}
if (stm_start_data) if (stm_start_data)
{ {
@ -2415,7 +2623,7 @@ innodb_binlog_oob_ordered(THD *thd, const unsigned char *data, size_t data_len,
c->first_node_offset= c->node_list[i].offset; c->first_node_offset= c->node_list[i].offset;
c->node_list_len= 1; c->node_list_len= 1;
c->pending_refcount= c->pending_refcount=
ibb_file_hash.oob_ref_inc(c->first_node_file_no, c->lf_pins); !ibb_file_hash.oob_ref_inc(c->first_node_file_no, c->lf_pins);
} }
uint64_t file_no= active_binlog_file_no.load(std::memory_order_relaxed); uint64_t file_no= active_binlog_file_no.load(std::memory_order_relaxed);
@ -3191,6 +3399,7 @@ gtid_search::find_gtid_pos(slave_connection_state *pos,
rpl_binlog_state_base *out_state, rpl_binlog_state_base *out_state,
uint64_t *out_file_no, uint64_t *out_offset) uint64_t *out_file_no, uint64_t *out_offset)
{ {
uint64_t dummy_xa_ref;
/* /*
Dirty read, but getting a slightly stale value is no problem, we will just Dirty read, but getting a slightly stale value is no problem, we will just
be starting to scan the binlog file at a slightly earlier position than be starting to scan the binlog file at a slightly earlier position than
@ -3223,7 +3432,7 @@ gtid_search::find_gtid_pos(slave_connection_state *pos,
diff_state_page_interval= header.diff_state_interval; diff_state_page_interval= header.diff_state_interval;
chunk_reader.seek(file_no, ibb_page_size); chunk_reader.seek(file_no, ibb_page_size);
int res= read_gtid_state(&chunk_reader, &base_state); int res= read_gtid_state(&chunk_reader, &base_state, &dummy_xa_ref);
if (UNIV_UNLIKELY(res < 0)) if (UNIV_UNLIKELY(res < 0))
return -1; return -1;
if (res == 0) if (res == 0)
@ -3272,7 +3481,7 @@ gtid_search::find_gtid_pos(slave_connection_state *pos,
tmp_diff_state.reset_nolock(); tmp_diff_state.reset_nolock();
tmp_diff_state.load_nolock(&base_state); tmp_diff_state.load_nolock(&base_state);
chunk_reader.seek(file_no, page1 << ibb_page_size_shift); chunk_reader.seek(file_no, page1 << ibb_page_size_shift);
int res= read_gtid_state(&chunk_reader, &tmp_diff_state); int res= read_gtid_state(&chunk_reader, &tmp_diff_state, &dummy_xa_ref);
if (UNIV_UNLIKELY(res < 0)) if (UNIV_UNLIKELY(res < 0))
return -1; return -1;
if (res == 0) if (res == 0)
@ -3572,6 +3781,122 @@ pending_lsn_fifo::add_to_fifo(uint64_t lsn, uint64_t file_no, uint64_t offset)
} }
/*
  Key extraction callback for ibb_xid_hash::xid_hash.

  Note that this is also called directly with a pointer to a bare XID (in
  run_on_xid() and grab_xid()); this relies on `xid` being the first member
  of xid_elem.
*/
static const uchar *get_xid_hash_key(const void *p, size_t *out_len, my_bool)
{
  const ibb_xid_hash::xid_elem *elem=
    reinterpret_cast<const ibb_xid_hash::xid_elem *>(p);
  const XID &xid= elem->xid;
  *out_len= xid.key_length();
  return xid.key();
}
/* Set up the mutex and the (initially empty) hash of XIDs. */
ibb_xid_hash::ibb_xid_hash()
{
  mysql_mutex_init(ibb_xid_hash_mutex_key, &xid_mutex, nullptr);

  /*
    The key is obtained through get_xid_hash_key(). No free function is
    registered; elements are freed explicitly by whoever removes them, and
    by the destructor.
  */
  my_hash_init(mem_key_binlog, &xid_hash, &my_charset_bin,
               32,                /* Initial size */
               0,                 /* Key offset */
               sizeof(XID),       /* Key length */
               get_xid_hash_key,
               nullptr,           /* Free function */
               MYF(HASH_UNIQUE));
}
/* Free any elements still in the hash, then the hash and the mutex. */
ibb_xid_hash::~ibb_xid_hash()
{
  /* The hash has no free function, so the elements must be freed by hand. */
  uint32 idx= 0;
  while (idx < xid_hash.records)
  {
    my_free(my_hash_element(&xid_hash, idx));
    ++idx;
  }
  my_hash_free(&xid_hash);
  mysql_mutex_destroy(&xid_mutex);
}
bool
ibb_xid_hash::add_xid(const XID *xid, const binlog_oob_context *c)
{
xid_elem *e=
(xid_elem *)my_malloc(mem_key_binlog, sizeof(xid_elem), MYF(MY_WME));
if (!e)
{
my_error(ER_OUTOFMEMORY, MYF(0), (int)sizeof(xid_elem));
return true;
}
e->xid.set(xid);
uint64_t refcnt_file_no;
if (UNIV_LIKELY(c->node_list_len > 0))
{
uint32_t last= c->node_list_len-1;
e->oob_num_nodes= c->node_list[last].node_index + 1;
e->oob_first_file_no= c->first_node_file_no;
e->oob_first_offset= c->first_node_offset;
e->oob_last_file_no= c->node_list[last].file_no;
e->oob_last_offset= c->node_list[last].offset;
refcnt_file_no= e->oob_first_file_no;
}
else
{
e->oob_num_nodes= 0;
e->oob_first_file_no= 0;
e->oob_first_offset= 0;
e->oob_last_file_no= 0;
e->oob_last_offset= 0;
/*
Empty XA transaction, but we still need to ensure the prepare record
is kept until the (empty) transactions gets XA COMMMIT'ted.
*/
refcnt_file_no= active_binlog_file_no.load(std::memory_order_acquire);
}
e->refcnt_file_no= refcnt_file_no;
mysql_mutex_lock(&xid_mutex);
if (my_hash_insert(&xid_hash, (uchar *)e))
{
mysql_mutex_unlock(&xid_mutex);
my_free(e);
return true;
}
mysql_mutex_unlock(&xid_mutex);
ibb_file_hash.oob_ref_inc(refcnt_file_no, c->lf_pins, true);
return false;
}
template <typename F> bool
ibb_xid_hash::run_on_xid(const XID *xid, F callback)
{
size_t key_len= 0;
const uchar *key_ptr= get_xid_hash_key(xid, &key_len, 1);
bool err;
mysql_mutex_lock(&xid_mutex);
uchar *rec= my_hash_search(&xid_hash, key_ptr, key_len);
if (UNIV_LIKELY(rec != nullptr))
{
err= callback(reinterpret_cast<xid_elem *>(rec));
}
else
err= true;
mysql_mutex_unlock(&xid_mutex);
return err;
}
/*
  Look up an XID in the internal XID hash.
  Remove the entry found (if any) and return it; the returned entry is no
  longer in the hash. Returns nullptr if the XID is not present.
*/
ibb_xid_hash::xid_elem *
ibb_xid_hash::grab_xid(const XID *xid)
{
  size_t key_len= 0;
  const uchar *key_ptr= get_xid_hash_key(xid, &key_len, 1);

  mysql_mutex_lock(&xid_mutex);
  uchar *found= my_hash_search(&xid_hash, key_ptr, key_len);
  if (UNIV_LIKELY(found != nullptr))
    my_hash_delete(&xid_hash, found);
  mysql_mutex_unlock(&xid_mutex);

  return reinterpret_cast<xid_elem *>(found);
}
void void
ibb_get_filename(char name[FN_REFLEN], uint64_t file_no) ibb_get_filename(char name[FN_REFLEN], uint64_t file_no)
{ {
@ -3627,6 +3952,7 @@ innobase_binlog_write_direct_ordered(IO_CACHE *cache,
ut_ad(binlog_info->engine_ptr2 == nullptr); ut_ad(binlog_info->engine_ptr2 == nullptr);
if (gtid) if (gtid)
binlog_diff_state.update_nolock(gtid); binlog_diff_state.update_nolock(gtid);
innodb_binlog_status(&binlog_info->out_file_no, &binlog_info->out_offset);
mtr.start(); mtr.start();
innodb_binlog_write_cache(cache, binlog_info, &mtr); innodb_binlog_write_cache(cache, binlog_info, &mtr);
mtr.commit(); mtr.commit();
@ -3674,6 +4000,113 @@ ibb_group_commit(THD *thd, handler_binlog_event_group_info *binlog_info)
} }
/*
  Write the XA PREPARE record for a transaction to the binlog, in its own
  mini-transaction.

  The commit LSN of the mini-transaction is saved in the oob context, so that
  the following ibb_write_xa_prepare() can make the redo log durable up to
  and including the prepare record.

  Returns false (no error conditions are currently detected here).
*/
bool
ibb_write_xa_prepare_ordered(THD *thd,
                             handler_binlog_event_group_info *binlog_info,
                             uchar engine_count)
{
  mtr_t mtr(nullptr);
  binlog_oob_context *c=
    static_cast<binlog_oob_context *>(binlog_info->engine_ptr);
  // ToDo: Here need also the oob ref.
  chunk_data_xa_prepare chunk_data(binlog_info->xa_xid, engine_count);
  mtr.start();
  fsp_binlog_write_rec(&chunk_data, &mtr, FSP_BINLOG_TYPE_XA_PREPARE,
                       c->lf_pins);
  mtr.commit();
  /*
    Remember how far the redo log must be written for the prepare record to
    be durable; ibb_write_xa_prepare() passes this to log_write_up_to().
  */
  c->pending_lsn= mtr.commit_lsn();
  return false;
}
/*
  Second (non-ordered) part of XA PREPARE: register the XID together with
  the OOB reference of the transaction in the XID hash, and make the redo
  log durable according to innodb_flush_log_at_trx_commit.

  Returns false if ok, true if the XID could not be added to the hash.
*/
bool
ibb_write_xa_prepare(THD *thd,
                     handler_binlog_event_group_info *binlog_info,
                     uchar engine_count)
{
  ut_ad(binlog_info->xa_xid != nullptr);
  binlog_oob_context *ctx=
    static_cast<binlog_oob_context *>(binlog_info->engine_ptr);

  const bool add_failed= ibb_xa_xid_hash->add_xid(binlog_info->xa_xid, ctx);
  if (add_failed)
    return true;

  /*
    Sync the redo log to ensure that the prepare record is durably written to
    disk. This is necessary before returning OK to the client, to be sure we
    can recover the binlog part of the XA transaction in case of crash.
  */
  const bool want_flush= srv_flush_log_at_trx_commit > 0;
  if (want_flush)
    log_write_up_to(ctx->pending_lsn, (srv_flush_log_at_trx_commit & 1));

  return false;
}
/*
  Write an XA ROLLBACK record for the XID to the binlog, in its own
  mini-transaction, and save the commit LSN in the oob context for the
  following ibb_xa_rollback().

  An oob context is allocated and stored in *engine_data if there is none
  yet.

  Returns false if ok, true if the oob context could not be allocated.
*/
bool
ibb_xa_rollback_ordered(THD *thd, const XID *xid, void **engine_data)
{
  binlog_oob_context *c=
    static_cast<binlog_oob_context *>(*engine_data);
  if (UNIV_UNLIKELY(c == nullptr))
  {
    *engine_data= c= alloc_oob_context();
    /*
      alloc_oob_context() returns nullptr (after reporting ER_OUTOFMEMORY)
      when out of memory; do not dereference it below.
    */
    if (UNIV_UNLIKELY(c == nullptr))
      return true;
  }

  /*
    Write ROLLBACK record to the binlog.
    This will be used during recovery to know that the XID is no longer active,
    allowing purge of the associated binlogs.
  */
  chunk_data_xa_complete chunk_data(xid, false);
  mtr_t mtr(nullptr);
  mtr.start();
  fsp_binlog_write_rec(&chunk_data, &mtr, FSP_BINLOG_TYPE_XA_COMPLETE,
                       c->lf_pins);
  mtr.commit();
  c->pending_lsn= mtr.commit_lsn();

  return false;
}
/*
  Second (non-ordered) part of XA ROLLBACK: make the rollback record, whose
  LSN was saved in the oob context's pending_lsn, durable according to
  innodb_flush_log_at_trx_commit.

  Always returns false.
*/
bool
ibb_xa_rollback(THD *thd, const XID *xid, void **engine_data)
{
  binlog_oob_context *ctx= static_cast<binlog_oob_context *>(*engine_data);

  /*
    Keep the reference count here, as we need the rollback record to be
    available for recovery until all engines have durably rolled back.
    Decrement will happen after that, in ibb_binlog_unlog().
  */

  ut_ad(ctx->pending_lsn > 0);

  /*
    Durably write the rollback record to disk. This way, when we return the
    "ok" packet to the client, we are sure that crash recovery will make the
    XID rollback in engines if needed.
  */
  const bool want_flush= srv_flush_log_at_trx_commit > 0;
  if (want_flush)
    log_write_up_to(ctx->pending_lsn, (srv_flush_log_at_trx_commit & 1));

  /* The pending LSN has been consumed. */
  ctx->pending_lsn= 0;
  return false;
}
/*
  Release the binlog file reference held on behalf of an XID.

  The entry for the XID (if any) is removed from the XID hash, the reference
  count on the file_no recorded in it is decremented, and the entry is freed.

  An oob context is allocated and stored in *engine_data if there is none
  yet, as its lf_pins are needed for the reference count update.
*/
void
ibb_binlog_unlog(const XID *xid, void **engine_data)
{
  binlog_oob_context *c=
    static_cast<binlog_oob_context *>(*engine_data);
  if (UNIV_UNLIKELY(c == nullptr))
  {
    *engine_data= c= alloc_oob_context();
    /*
      alloc_oob_context() returns nullptr (after reporting ER_OUTOFMEMORY)
      when out of memory. Without a context there are no pins for the
      reference count update, so leave the XID entry and its file reference
      in place rather than dereferencing a null pointer.
    */
    if (UNIV_UNLIKELY(c == nullptr))
      return;
  }

  ibb_xid_hash::xid_elem *elem= ibb_xa_xid_hash->grab_xid(xid);
  if (elem)
  {
    ibb_file_hash.oob_ref_dec(elem->refcnt_file_no, c->lf_pins, true);
    my_free(elem);
  }
}
bool bool
innodb_find_binlogs(uint64_t *out_first, uint64_t *out_last) innodb_find_binlogs(uint64_t *out_first, uint64_t *out_last)
{ {
@ -3710,6 +4143,7 @@ innodb_binlog_get_init_state(rpl_binlog_state_base *out_state)
{ {
binlog_chunk_reader chunk_reader(binlog_cur_end_offset); binlog_chunk_reader chunk_reader(binlog_cur_end_offset);
bool err= false; bool err= false;
uint64_t dummy_xa_ref;
byte *page_buf= static_cast<byte *>(ut_malloc(ibb_page_size, mem_key_binlog)); byte *page_buf= static_cast<byte *>(ut_malloc(ibb_page_size, mem_key_binlog));
if (!page_buf) if (!page_buf)
@ -3721,7 +4155,7 @@ innodb_binlog_get_init_state(rpl_binlog_state_base *out_state)
mysql_mutex_lock(&purge_binlog_mutex); mysql_mutex_lock(&purge_binlog_mutex);
chunk_reader.seek(earliest_binlog_file_no, ibb_page_size); chunk_reader.seek(earliest_binlog_file_no, ibb_page_size);
int res= read_gtid_state(&chunk_reader, out_state); int res= read_gtid_state(&chunk_reader, out_state, &dummy_xa_ref);
mysql_mutex_unlock(&purge_binlog_mutex); mysql_mutex_unlock(&purge_binlog_mutex);
if (res != 1) if (res != 1)
err= true; err= true;
@ -3888,7 +4322,8 @@ purge_adjust_limit_file_no(handler_binlog_purge_info *purge_info, LF_PINS *pins)
1b. first_open_binlog_file_no 1b. first_open_binlog_file_no
1c. Any file_no in use by an active dump thread 1c. Any file_no in use by an active dump thread
1d. Any file_no containing oob data referenced by file_no from (1c) 1d. Any file_no containing oob data referenced by file_no from (1c)
1e. User specified file_no (from PURGE BINARY LOGS TO, if any). 1e. Any file_no containing oob data referenced by an active transaction.
1f. User specified file_no (from PURGE BINARY LOGS TO, if any).
2. Unix timestamp specifying the minimal value that should not be purged, 2. Unix timestamp specifying the minimal value that should not be purged,
optional (used by PURGE BINARY LOGS BEFORE and --binlog-expire-log-seconds). optional (used by PURGE BINARY LOGS BEFORE and --binlog-expire-log-seconds).
@ -3947,8 +4382,11 @@ innodb_binlog_purge_low(handler_binlog_purge_info *purge_info,
want_purge= true; want_purge= true;
if (by_name && file_no < limit_name_file_no) if (by_name && file_no < limit_name_file_no)
want_purge= true; want_purge= true;
if (file_no >= limit_file_no || !want_purge) if (!want_purge ||
file_no >= limit_file_no ||
ibb_file_hash.get_oob_ref_in_use(file_no, lf_pins))
break; break;
earliest_binlog_file_no= file_no + 1; earliest_binlog_file_no= file_no + 1;
if (loc_total_size < (size_t)stat_buf.st_size) if (loc_total_size < (size_t)stat_buf.st_size)
{ {

View file

@ -74,6 +74,10 @@ enum fsp_binlog_chunk_types {
FSP_BINLOG_TYPE_OOB_DATA= 3, FSP_BINLOG_TYPE_OOB_DATA= 3,
/* Dummy record, use to fill remainder of page (eg. FLUSH BINARY LOGS). */ /* Dummy record, use to fill remainder of page (eg. FLUSH BINARY LOGS). */
FSP_BINLOG_TYPE_DUMMY= 4, FSP_BINLOG_TYPE_DUMMY= 4,
/* User XA record containing XID and OOB reference for XA PREPARE. */
FSP_BINLOG_TYPE_XA_PREPARE= 5,
/* User XA record containing XID for XA COMMIT/ROLLBACK. */
FSP_BINLOG_TYPE_XA_COMPLETE= 6,
/* Must be one more than the last type. */ /* Must be one more than the last type. */
FSP_BINLOG_TYPE_END, FSP_BINLOG_TYPE_END,
@ -96,6 +100,11 @@ static constexpr uint32_t FSP_BINLOG_FLAG_LAST= (1 << FSP_BINLOG_FLAG_BIT_LAST);
static constexpr uint32_t FSP_BINLOG_TYPE_MASK= static constexpr uint32_t FSP_BINLOG_TYPE_MASK=
~(FSP_BINLOG_FLAG_CONT | FSP_BINLOG_FLAG_LAST); ~(FSP_BINLOG_FLAG_CONT | FSP_BINLOG_FLAG_LAST);
/* Flag bits for FSP_BINLOG_TYPE_XA_COMPLETE. */
static constexpr uint32_t IBB_FL_XA_TYPE_MASK= 0x1;
static constexpr uint32_t IBB_FL_XA_TYPE_COMMIT= 0x0;
static constexpr uint32_t IBB_FL_XA_TYPE_ROLLBACK= 0x1;
/** /**
These are the chunk types that are allowed to occur in the middle of These are the chunk types that are allowed to occur in the middle of
another record. another record.
@ -337,8 +346,8 @@ public:
/* Delete all (consecutive) entries from file_no down. */ /* Delete all (consecutive) entries from file_no down. */
void remove_up_to(uint64_t file_no, LF_PINS *pins); void remove_up_to(uint64_t file_no, LF_PINS *pins);
/* Update an entry when an OOB record is started/completed. */ /* Update an entry when an OOB record is started/completed. */
bool oob_ref_inc(uint64_t file_no, LF_PINS *pins); bool oob_ref_inc(uint64_t file_no, LF_PINS *pins, bool do_xa= false);
bool oob_ref_dec(uint64_t file_no, LF_PINS *pins); bool oob_ref_dec(uint64_t file_no, LF_PINS *pins, bool do_xa= false);
/* Update earliest_oob_ref when refcount drops to zero. */ /* Update earliest_oob_ref when refcount drops to zero. */
void do_zero_refcnt_action(uint64_t file_no, LF_PINS *pins, void do_zero_refcnt_action(uint64_t file_no, LF_PINS *pins,
bool active_moving); bool active_moving);
@ -348,6 +357,8 @@ public:
/* Lookup the oob-referenced file_no from a file_no. */ /* Lookup the oob-referenced file_no from a file_no. */
bool get_oob_ref_file_no(uint64_t file_no, LF_PINS *pins, bool get_oob_ref_file_no(uint64_t file_no, LF_PINS *pins,
uint64_t *out_oob_ref_file_no); uint64_t *out_oob_ref_file_no);
/* Check if file_no needed by active, not committed transaction. */
bool get_oob_ref_in_use(uint64_t file_no, LF_PINS *pins);
}; };

View file

@ -263,6 +263,16 @@ extern bool innobase_binlog_write_direct
const rpl_gtid *gtid); const rpl_gtid *gtid);
extern void ibb_group_commit(THD *thd, extern void ibb_group_commit(THD *thd,
handler_binlog_event_group_info *binlog_info); handler_binlog_event_group_info *binlog_info);
extern bool ibb_write_xa_prepare_ordered(THD *thd,
handler_binlog_event_group_info *binlog_info,
uchar engine_count);
extern bool ibb_write_xa_prepare(THD *thd,
handler_binlog_event_group_info *binlog_info,
uchar engine_count);
extern bool ibb_xa_rollback_ordered(THD *thd, const XID *xid,
void **engine_data);
extern bool ibb_xa_rollback(THD *thd, const XID *xid, void **engine_data);
extern void ibb_binlog_unlog(const XID *xid, void **engine_data);
extern bool innodb_find_binlogs(uint64_t *out_first, uint64_t *out_last); extern bool innodb_find_binlogs(uint64_t *out_first, uint64_t *out_last);
extern void innodb_binlog_status(uint64_t *out_file_no, uint64_t *out_pos); extern void innodb_binlog_status(uint64_t *out_file_no, uint64_t *out_pos);
extern bool innodb_binlog_get_init_state(rpl_binlog_state_base *out_state); extern bool innodb_binlog_get_init_state(rpl_binlog_state_base *out_state);

View file

@ -486,6 +486,7 @@ extern mysql_pfs_key_t fsp_binlog_durable_mutex_key;
extern mysql_pfs_key_t fsp_binlog_durable_cond_key; extern mysql_pfs_key_t fsp_binlog_durable_cond_key;
extern mysql_pfs_key_t fsp_purge_binlog_mutex_key; extern mysql_pfs_key_t fsp_purge_binlog_mutex_key;
extern mysql_pfs_key_t fsp_page_fifo_mutex_key; extern mysql_pfs_key_t fsp_page_fifo_mutex_key;
extern mysql_pfs_key_t ibb_xid_hash_mutex_key;
# endif /* UNIV_PFS_MUTEX */ # endif /* UNIV_PFS_MUTEX */
# ifdef UNIV_PFS_RWLOCK # ifdef UNIV_PFS_RWLOCK

View file

@ -1176,7 +1176,8 @@ inline void trx_t::write_serialisation_history(mtr_t *mtr)
trx_sys.assign_new_trx_no(this); trx_sys.assign_new_trx_no(this);
/* Include binlog data in the commit record, if any. */ /* Include binlog data in the commit record, if any. */
binlog_ctx= innodb_binlog_trx(this, mtr); if (active_commit_ordered)
binlog_ctx= innodb_binlog_trx(this, mtr);
UT_LIST_REMOVE(rseg->undo_list, undo); UT_LIST_REMOVE(rseg->undo_list, undo);
/* Change the undo log segment state from TRX_UNDO_ACTIVE, to /* Change the undo log segment state from TRX_UNDO_ACTIVE, to