MDEV-30653 : With wsrep_mode=REPLICATE_ARIA only part of mixed-engine transactions is replicated

Replication of non-transactional engines is experimental and
uses TOI. This naturally means that if there is open transaction
with transactional engine it's changes will be rolled back.

Fixed by adding error message if non-transactional engine
is part of multi-engine transaction with warning.

Signed-off-by: Julius Goryavsky <julius.goryavsky@mariadb.com>
This commit is contained in:
Jan Lindström 2024-06-21 13:07:35 +03:00 committed by Julius Goryavsky
parent b38edd09ff
commit b3be3c2157
6 changed files with 207 additions and 13 deletions

View file

@ -0,0 +1,64 @@
connection node_2;
connection node_1;
create table t1 (id serial, val int) engine=innodb;
create table t2 (id serial, val int) engine=aria;
insert into t1 values(1, 23);
insert into t2 values(2, 42);
call mtr.add_suppression("WSREP: Replication of non-transactional engines is experimental. Storage engine Aria for table 'test'.'t2' is not supported in Galera");
begin;
update t1 set val=24 where id=1;
update t2 set val=41 where id=2;
commit;
ERROR HY000: Transactional commit not supported by involved engine(s)
select * from t1;
id val
1 23
select * from t2;
id val
2 41
connection node_2;
select * from t1;
id val
1 23
select * from t2;
id val
connection node_1;
drop table t1, t2;
SET GLOBAL wsrep_mode=REPLICATE_ARIA;
create table t1 (id serial, val int) engine=innodb;
create table t2 (id serial, val int) engine=aria;
insert into t1 values(1, 23);
insert into t2 values(2, 42);
begin;
update t1 set val=24 where id=1;
update t2 set val=41 where id=2;
ERROR HY000: Transactional commit not supported by involved engine(s)
commit;
select * from t1;
id val
1 24
select * from t2;
id val
2 42
connection node_2;
select * from t1;
id val
1 23
select * from t2;
id val
2 42
connection node_1;
drop table t1, t2;
create table t2 (id serial, val int) engine=aria;
INSERT INTO t2 values(1,1);
UPDATE t2 set id=5,val=6 where id = 1;
SELECT * from t2;
id val
5 6
connection node_2;
SELECT * from t2;
id val
5 6
DROP TABLE t2;
connection node_1;
SET GLOBAL wsrep_mode=DEFAULT;

View file

@ -0,0 +1,62 @@
--source include/galera_cluster.inc
--source include/have_aria.inc
create table t1 (id serial, val int) engine=innodb;
create table t2 (id serial, val int) engine=aria;
insert into t1 values(1, 23);
insert into t2 values(2, 42);
call mtr.add_suppression("WSREP: Replication of non-transactional engines is experimental. Storage engine Aria for table 'test'.'t2' is not supported in Galera");
begin;
update t1 set val=24 where id=1;
update t2 set val=41 where id=2;
--error ER_ERROR_DURING_COMMIT
commit;
select * from t1;
select * from t2;
--connection node_2
select * from t1;
select * from t2;
--connection node_1
drop table t1, t2;
# case 2
SET GLOBAL wsrep_mode=REPLICATE_ARIA;
create table t1 (id serial, val int) engine=innodb;
create table t2 (id serial, val int) engine=aria;
insert into t1 values(1, 23);
insert into t2 values(2, 42);
begin;
update t1 set val=24 where id=1;
--error ER_ERROR_DURING_COMMIT
update t2 set val=41 where id=2;
commit;
select * from t1;
select * from t2;
--connection node_2
select * from t1;
select * from t2;
--connection node_1
drop table t1, t2;
# case 3
create table t2 (id serial, val int) engine=aria;
INSERT INTO t2 values(1,1);
UPDATE t2 set id=5,val=6 where id = 1;
SELECT * from t2;
--connection node_2
SELECT * from t2;
DROP TABLE t2;
--connection node_1
SET GLOBAL wsrep_mode=DEFAULT;

View file

@ -7567,6 +7567,11 @@ public:
DDL statement that may be subject to error filtering.
*/
#define CF_WSREP_MAY_IGNORE_ERRORS (1U << 24)
/**
Basic DML statements that create writeset.
*/
#define CF_WSREP_BASIC_DML (1u << 25)
#endif /* WITH_WSREP */

View file

@ -895,6 +895,18 @@ void init_update_queries(void)
sql_command_flags[SQLCOM_DROP_TABLE]|= CF_WSREP_MAY_IGNORE_ERRORS;
sql_command_flags[SQLCOM_DROP_INDEX]|= CF_WSREP_MAY_IGNORE_ERRORS;
sql_command_flags[SQLCOM_ALTER_TABLE]|= CF_WSREP_MAY_IGNORE_ERRORS;
/*
Basic DML-statements that create writeset.
*/
sql_command_flags[SQLCOM_INSERT]|= CF_WSREP_BASIC_DML;
sql_command_flags[SQLCOM_INSERT_SELECT]|= CF_WSREP_BASIC_DML;
sql_command_flags[SQLCOM_REPLACE]|= CF_WSREP_BASIC_DML;
sql_command_flags[SQLCOM_REPLACE_SELECT]|= CF_WSREP_BASIC_DML;
sql_command_flags[SQLCOM_UPDATE]|= CF_WSREP_BASIC_DML;
sql_command_flags[SQLCOM_UPDATE_MULTI]|= CF_WSREP_BASIC_DML;
sql_command_flags[SQLCOM_LOAD]|= CF_WSREP_BASIC_DML;
sql_command_flags[SQLCOM_DELETE]|= CF_WSREP_BASIC_DML;
sql_command_flags[SQLCOM_DELETE_MULTI]|= CF_WSREP_BASIC_DML;
#endif /* WITH_WSREP */
}

View file

@ -1217,7 +1217,8 @@ enum wsrep_warning_type {
WSREP_DISABLED = 0,
WSREP_REQUIRE_PRIMARY_KEY= 1,
WSREP_REQUIRE_INNODB= 2,
WSREP_REQUIRE_MAX=3,
WSREP_EXPERIMENTAL= 3,
WSREP_REQUIRE_MAX=4,
};
static ulonglong wsrep_warning_start_time=0;
@ -1254,6 +1255,9 @@ static const char* wsrep_warning_name(const enum wsrep_warning_type type)
return "WSREP_REQUIRE_PRIMARY_KEY"; break;
case WSREP_REQUIRE_INNODB:
return "WSREP_REQUIRE_INNODB"; break;
case WSREP_EXPERIMENTAL:
return "WSREP_EXPERIMENTAL"; break;
default: assert(0); return " "; break; // for compiler
}
}
@ -1387,7 +1391,22 @@ static void wsrep_push_warning(THD *thd,
ha_resolve_storage_engine_name(hton),
tables->db.str, tables->table_name.str);
break;
case WSREP_EXPERIMENTAL:
push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN,
ER_ERROR_DURING_COMMIT,
"WSREP: Replication of non-transactional engines is experimental. "
"Storage engine %s for table '%s'.'%s' is "
"not supported in Galera",
ha_resolve_storage_engine_name(hton),
tables->db.str, tables->table_name.str);
if (global_system_variables.log_warnings > 1 &&
!wsrep_protect_against_warning_flood(type))
WSREP_WARN("Replication of non-transactional engines is experimental. "
"Storage engine %s for table '%s'.'%s' is "
"not supported in Galera",
ha_resolve_storage_engine_name(hton),
tables->db.str, tables->table_name.str);
break;
default: assert(0); break;
}
}
@ -1397,15 +1416,8 @@ bool wsrep_check_mode_after_open_table (THD *thd,
TABLE_LIST *tables)
{
enum_sql_command sql_command= thd->lex->sql_command;
bool is_dml_stmt= thd->get_command() != COM_STMT_PREPARE &&
(sql_command == SQLCOM_INSERT ||
sql_command == SQLCOM_INSERT_SELECT ||
sql_command == SQLCOM_REPLACE ||
sql_command == SQLCOM_REPLACE_SELECT ||
sql_command == SQLCOM_UPDATE ||
sql_command == SQLCOM_UPDATE_MULTI ||
sql_command == SQLCOM_LOAD ||
sql_command == SQLCOM_DELETE);
bool is_dml_stmt= (thd->get_command() != COM_STMT_PREPARE &&
(sql_command_flags[sql_command] & CF_WSREP_BASIC_DML));
if (!is_dml_stmt)
return true;
@ -1432,9 +1444,24 @@ bool wsrep_check_mode_after_open_table (THD *thd,
wsrep_push_warning(thd, WSREP_REQUIRE_PRIMARY_KEY, hton, tables);
}
// Check are we inside a transaction
uint rw_ha_count= ha_check_and_coalesce_trx_read_only(thd, thd->transaction->all.ha_list, true);
bool changes= wsrep_has_changes(thd);
// Roll back current stmt if exists
wsrep_before_rollback(thd, true);
wsrep_after_rollback(thd, true);
wsrep_after_statement(thd);
// If there is updates, they would be lost above rollback
if (rw_ha_count > 0 && changes)
{
my_message(ER_ERROR_DURING_COMMIT, "Transactional commit not supported "
"by involved engine(s)", MYF(0));
wsrep_push_warning(thd, WSREP_EXPERIMENTAL, hton, tables);
return false;
}
WSREP_TO_ISOLATION_BEGIN(NULL, NULL, (tables));
}
} else if (db_type != DB_TYPE_UNKNOWN &&
@ -2720,6 +2747,8 @@ static int wsrep_TOI_begin(THD *thd, const char *db, const char *table,
thd_proc_info(thd, "acquiring total order isolation");
DEBUG_SYNC(thd, "wsrep_before_toi_begin");
wsrep::client_state& cs(thd->wsrep_cs());
int ret= cs.enter_toi_local(key_array,

View file

@ -634,9 +634,10 @@ static bool wsrep_will_BF_abort(const lock_t *lock, const trx_t *trx)
/** check if lock timeout was for priority thread,
as a side effect trigger lock monitor
@param trx transaction owning the lock
@param trx transaction owning the lock
@return false for regular lock timeout */
ATTRIBUTE_NOINLINE static bool wsrep_is_BF_lock_timeout(const trx_t &trx)
ATTRIBUTE_NOINLINE static
bool wsrep_is_BF_lock_timeout(const trx_t &trx)
{
ut_ad(trx.is_wsrep());
@ -645,7 +646,28 @@ ATTRIBUTE_NOINLINE static bool wsrep_is_BF_lock_timeout(const trx_t &trx)
return false;
ib::info() << "WSREP: BF lock wait long for trx:" << ib::hex(trx.id)
<< " error: " << trx.error_state
<< " query: " << wsrep_thd_query(trx.mysql_thd);
if (const lock_t*wait_lock = trx.lock.wait_lock)
{
const my_hrtime_t now= my_hrtime_coarse();
const my_hrtime_t suspend_time= trx.lock.suspend_time;
fprintf(stderr,
"------- TRX HAS BEEN WAITING %llu us"
" FOR THIS LOCK TO BE GRANTED:\n",
now.val - suspend_time.val);
if (!wait_lock->is_table()) {
mtr_t mtr;
lock_rec_print(stderr, wait_lock, mtr);
} else {
lock_table_print(stderr, wait_lock);
}
fprintf(stderr, "------------------\n");
}
return true;
}