MDEV-5262, MDEV-5914, MDEV-5941, MDEV-6020: Deadlocks during parallel

replication causing replication to fail.

In parallel replication, we run transactions from the master in parallel, but
force them to commit in the same order they did on the master. If we force T1
to commit before T2, but T2 holds eg. a row lock that is needed by T1, we get
a deadlock when T2 waits until T1 has committed.

Usually, we do not run T1 and T2 in parallel if there is a chance that they
can have conflicting locks like this, but there are certain edge cases where
it can occasionally happen (eg. MDEV-5914, MDEV-5941, MDEV-6020). The bug was
that this would cause replication to hang, eventually getting a lock timeout
and causing the slave to stop with error.

With this patch, InnoDB will report back to the upper layer whenever a
transactions T1 is about to do a lock wait on T2. If T1 and T2 are parallel
replication transactions, and T2 needs to commit later than T1, we can thus
detect the deadlock; we then kill T2, setting a flag that causes it to catch
the kill and convert it to a deadlock error; this error will then cause T2 to
roll back and release its locks (so that T1 can commit), and later T2 will be
re-tried and eventually also committed.

The kill happens asynchroneously in a slave background thread; this is
necessary, as the reporting from InnoDB about lock waits happen deep inside
the locking code, at a point where it is not possible to directly call
THD::awake() due to mutexes held.

Deadlock is assumed to be (very) rarely occuring, so this patch tries to
minimise the performance impact on the normal case where no deadlocks occur,
rather than optimise the handling of the occasional deadlock.

Also fix transaction retry due to deadlock when it happens after a transaction
already signalled to later transactions that it started to commit. In this
case we need to undo this signalling (and later redo it when we commit again
during retry), so following transactions will not start too early.

Also add a missing thd->send_kill_message() that got triggered during testing
(this corrects an incorrect fix for MySQL Bug#58933).
This commit is contained in:
unknown 2014-06-03 10:31:11 +02:00 committed by Kristian Nielsen
parent 787c470cef
commit 629b822913
25 changed files with 589 additions and 52 deletions

View file

@ -730,6 +730,58 @@ void thd_set_ha_data(MYSQL_THD thd, const struct handlerton *hton,
*/
void thd_wakeup_subsequent_commits(MYSQL_THD thd, int wakeup_error);
/*
Used by a storage engine to report that one transaction THD is about to
go to wait for a transactional lock held by another transactions OTHER_THD.
This is used for parallel replication, where transactions are required to
commit in the same order on the slave as they did on the master. If the
transactions on the slave can encounter lock conflicts on the slave that did
not exist on the master, this can cause deadlocks.
The storage engine can report such conflicting locks using this call. This
will allow parallel replication to detect such conflicts and resolve the
deadlock (by killing the second transaction to release the locks that the
first is waiting for, and then later re-try the second killed transaction).
The storage engine should not report false positives. That is, it should not
report any lock waits that do not actually require one transaction to wait
for the other. Nor should it report waits for locks that will be released
before the commit of the other transactions.
*/
void thd_report_wait_for(const MYSQL_THD thd, MYSQL_THD other_thd);
/*
This function can optionally be called to check if thd_report_wait_for()
needs to be called for waits done by a given transaction.
If this function returns false for a given thd, there is no need to do any
calls to thd_report_wait_for() on that thd.
This call is optional; it is safe to call thd_report_wait_for() in any case.
This call can be used to save some redundant calls to thd_report_wait_for()
if desired. (This is unlikely to matter much unless there are _lots_ of
waits to report, as the overhead of thd_report_wait_for() is small).
*/
int thd_need_wait_for(const MYSQL_THD thd);
/*
This function can be called by storage engines to check if the commit order
of two transactions has already been decided by the upper layer. This
happens in parallel replication, where the commit order is forced to be the
same on the slave as it was originally on the master.
If this function returns false, it means that such commit order will be
enforced. This allows the storage engine to optionally omit gap lock waitss
or similar measures that would otherwise be needed to ensure that
transactions would be serialised in a way that would cause a commit order
that is correct for binlogging for statement-based replication.
If this function returns true, normal locking should be done as required by
the binlogging and transaction isolation level in effect.
*/
int thd_need_ordering_with(const MYSQL_THD thd, const MYSQL_THD other_thd);
#ifdef __cplusplus
}
#endif

View file

@ -314,6 +314,9 @@ void *thd_get_ha_data(const void* thd, const struct handlerton *hton);
void thd_set_ha_data(void* thd, const struct handlerton *hton,
const void *ha_data);
void thd_wakeup_subsequent_commits(void* thd, int wakeup_error);
void thd_report_wait_for(const void* thd, void *other_thd);
int thd_need_wait_for(const void* thd);
int thd_need_ordering_with(const void* thd, const void* other_thd);
struct mysql_event_general
{
unsigned int event_subclass;

View file

@ -314,6 +314,9 @@ void *thd_get_ha_data(const void* thd, const struct handlerton *hton);
void thd_set_ha_data(void* thd, const struct handlerton *hton,
const void *ha_data);
void thd_wakeup_subsequent_commits(void* thd, int wakeup_error);
void thd_report_wait_for(const void* thd, void *other_thd);
int thd_need_wait_for(const void* thd);
int thd_need_ordering_with(const void* thd, const void* other_thd);
#include <mysql/plugin_auth_common.h>
typedef struct st_plugin_vio_info
{

View file

@ -267,6 +267,9 @@ void *thd_get_ha_data(const void* thd, const struct handlerton *hton);
void thd_set_ha_data(void* thd, const struct handlerton *hton,
const void *ha_data);
void thd_wakeup_subsequent_commits(void* thd, int wakeup_error);
void thd_report_wait_for(const void* thd, void *other_thd);
int thd_need_wait_for(const void* thd);
int thd_need_ordering_with(const void* thd, const void* other_thd);
enum enum_ftparser_mode
{
MYSQL_FTPARSER_SIMPLE_MODE= 0,

View file

@ -86,7 +86,10 @@ SET DEBUG_SYNC= 'now SIGNAL killed';
# Reaping: OPTIMIZE TABLE t1
Table Op Msg_type Msg_text
test.t1 optimize note Table does not support optimize, doing recreate + analyze instead
test.t1 optimize error Query execution was interrupted
test.t1 optimize status Operation failed
Warnings:
Error 1317 Query execution was interrupted
# Connection default
DROP TABLE t1;
SET DEBUG_SYNC= 'RESET';

Binary file not shown.

View file

@ -44,6 +44,16 @@ processlist_info NULL
unified_parent_thread_id unified parent_thread_id
role NULL
instrumented YES
name thread/sql/slave_background
type BACKGROUND
processlist_user NULL
processlist_host NULL
processlist_db NULL
processlist_command NULL
processlist_info NULL
unified_parent_thread_id unified parent_thread_id
role NULL
instrumented YES
CREATE TEMPORARY TABLE t1 AS
SELECT thread_id FROM performance_schema.threads
WHERE name LIKE 'thread/sql%';
@ -105,4 +115,5 @@ parent_thread_name child_thread_name
thread/sql/event_scheduler thread/sql/event_worker
thread/sql/main thread/sql/one_connection
thread/sql/main thread/sql/signal_handler
thread/sql/main thread/sql/slave_background
thread/sql/one_connection thread/sql/event_scheduler

View file

@ -0,0 +1,49 @@
include/master-slave.inc
[connection master]
include/stop_slave.inc
include/rpl_stop_server.inc [server_number=1]
include/rpl_start_server.inc [server_number=1]
SET SQL_LOG_BIN=0;
ALTER TABLE mysql.gtid_slave_pos ENGINE = InnoDB;
SET SQL_LOG_BIN=1;
SET @old_engine= @@GLOBAL.default_storage_engine;
SET GLOBAL default_storage_engine=InnoDB;
SET @old_parallel= @@GLOBAL.slave_parallel_threads;
SET GLOBAL slave_parallel_threads=12;
CHANGE MASTER TO master_host='127.0.0.1', master_port=SERVER_MYPORT_1, master_user='root', master_log_file='master-bin.000001', master_log_pos=4;
include/start_slave.inc
SET SQL_LOG_BIN=0;
ALTER TABLE mysql.gtid_slave_pos ENGINE = InnoDB;
SET SQL_LOG_BIN=1;
SELECT @@gtid_slave_pos;
@@gtid_slave_pos
0-1-1381
CHECKSUM TABLE table0_int_autoinc, table0_key_pk_parts_2_int_autoinc, table100_int_autoinc, table100_key_pk_parts_2_int_autoinc, table10_int_autoinc, table10_key_pk_parts_2_int_autoinc, table1_int_autoinc, table1_key_pk_parts_2_int_autoinc, table2_int_autoinc, table2_key_pk_parts_2_int_autoinc;
Table Checksum
test.table0_int_autoinc 3623174395
test.table0_key_pk_parts_2_int_autoinc 2888328157
test.table100_int_autoinc 3624823809
test.table100_key_pk_parts_2_int_autoinc 3316583308
test.table10_int_autoinc 1615053718
test.table10_key_pk_parts_2_int_autoinc 4147461080
test.table1_int_autoinc 478809705
test.table1_key_pk_parts_2_int_autoinc 3032208641
test.table2_int_autoinc 854763867
test.table2_key_pk_parts_2_int_autoinc 4231615291
include/stop_slave.inc
SET GLOBAL default_storage_engine= @old_engine;
SET GLOBAL slave_parallel_threads=@old_parallel;
SET sql_log_bin=0;
DROP TABLE table0_int_autoinc;
DROP TABLE table0_key_pk_parts_2_int_autoinc;
DROP TABLE table100_int_autoinc;
DROP TABLE table100_key_pk_parts_2_int_autoinc;
DROP TABLE table10_int_autoinc;
DROP TABLE table10_key_pk_parts_2_int_autoinc;
DROP TABLE table1_int_autoinc;
DROP TABLE table1_key_pk_parts_2_int_autoinc;
DROP TABLE table2_int_autoinc;
DROP TABLE table2_key_pk_parts_2_int_autoinc;
SET sql_log_bin=1;
include/start_slave.inc
include/rpl_end.inc

View file

@ -0,0 +1,70 @@
--source include/have_innodb.inc
--source include/have_partition.inc
--source include/have_binlog_format_mixed_or_row.inc
--source include/master-slave.inc
--connection slave
--source include/stop_slave.inc
--connection master
--let $datadir= `SELECT @@datadir`
--let $rpl_server_number= 1
--source include/rpl_stop_server.inc
--remove_file $datadir/master-bin.000001
--remove_file $datadir/master-bin.state
--copy_file $MYSQL_TEST_DIR/std_data/mdev6020-mysql-bin.000001 $datadir/master-bin.000001
--let $rpl_server_number= 1
--source include/rpl_start_server.inc
--source include/wait_until_connected_again.inc
--connection slave
SET SQL_LOG_BIN=0;
ALTER TABLE mysql.gtid_slave_pos ENGINE = InnoDB;
SET SQL_LOG_BIN=1;
SET @old_engine= @@GLOBAL.default_storage_engine;
SET GLOBAL default_storage_engine=InnoDB;
SET @old_parallel= @@GLOBAL.slave_parallel_threads;
SET GLOBAL slave_parallel_threads=12;
--replace_result $SERVER_MYPORT_1 SERVER_MYPORT_1
eval CHANGE MASTER TO master_host='127.0.0.1', master_port=$SERVER_MYPORT_1, master_user='root', master_log_file='master-bin.000001', master_log_pos=4;
--source include/start_slave.inc
--connection master
SET SQL_LOG_BIN=0;
ALTER TABLE mysql.gtid_slave_pos ENGINE = InnoDB;
SET SQL_LOG_BIN=1;
--save_master_pos
--connection slave
--sync_with_master
SELECT @@gtid_slave_pos;
CHECKSUM TABLE table0_int_autoinc, table0_key_pk_parts_2_int_autoinc, table100_int_autoinc, table100_key_pk_parts_2_int_autoinc, table10_int_autoinc, table10_key_pk_parts_2_int_autoinc, table1_int_autoinc, table1_key_pk_parts_2_int_autoinc, table2_int_autoinc, table2_key_pk_parts_2_int_autoinc;
--source include/stop_slave.inc
SET GLOBAL default_storage_engine= @old_engine;
SET GLOBAL slave_parallel_threads=@old_parallel;
SET sql_log_bin=0;
DROP TABLE table0_int_autoinc;
DROP TABLE table0_key_pk_parts_2_int_autoinc;
DROP TABLE table100_int_autoinc;
DROP TABLE table100_key_pk_parts_2_int_autoinc;
DROP TABLE table10_int_autoinc;
DROP TABLE table10_key_pk_parts_2_int_autoinc;
DROP TABLE table1_int_autoinc;
DROP TABLE table1_key_pk_parts_2_int_autoinc;
DROP TABLE table2_int_autoinc;
DROP TABLE table2_key_pk_parts_2_int_autoinc;
SET sql_log_bin=1;
--source include/start_slave.inc
--connection master
--source include/rpl_end.inc

View file

@ -6847,12 +6847,6 @@ MYSQL_BIN_LOG::queue_for_group_commit(group_commit_entry *orig_entry)
else
mysql_mutex_unlock(&wfc->LOCK_wait_commit);
}
if (wfc && wfc->wakeup_error)
{
my_error(ER_PRIOR_COMMIT_FAILED, MYF(0));
DBUG_RETURN(-1);
}
/*
If the transaction we were waiting for has already put us into the group
commit queue (and possibly already done the entire binlog commit for us),
@ -6861,6 +6855,12 @@ MYSQL_BIN_LOG::queue_for_group_commit(group_commit_entry *orig_entry)
if (orig_entry->queued_by_other)
DBUG_RETURN(0);
if (wfc && wfc->wakeup_error)
{
my_error(ER_PRIOR_COMMIT_FAILED, MYF(0));
DBUG_RETURN(-1);
}
/* Now enqueue ourselves in the group commit queue. */
DEBUG_SYNC(orig_entry->thd, "commit_before_enqueue");
orig_entry->thd->clear_wakeup_ready();
@ -9064,6 +9064,8 @@ binlog_background_thread(void *arg __attribute__((unused)))
thd->thread_id= thread_id++;
mysql_mutex_unlock(&LOCK_thread_count);
thd->store_globals();
thd->security_ctx->skip_grants();
thd->set_command(COM_DAEMON);
/*
Load the slave replication GTID state from the mysql.gtid_slave_pos

View file

@ -216,8 +216,19 @@ static void inline slave_rows_error_report(enum loglevel level, int ha_error,
thd->get_stmt_da()->sql_conditions();
Relay_log_info const *rli= rgi->rli;
const Sql_condition *err;
Relay_log_info const *rli= rgi->rli;
buff[0]= 0;
/*
In parallel replication, deadlocks or other temporary errors can happen
occasionally in normal operation, they will be handled correctly and
automatically by re-trying the transactions. So do not pollute the error
log with messages about them.
*/
if (rgi->is_parallel_exec &&
(rgi->killed_for_retry || has_temporary_error(thd)))
return;
for (err= it++, slider= buff; err && slider < buff_end - 1;
slider += len, err= it++)
{
@ -7306,6 +7317,13 @@ int Xid_log_event::do_apply_event(rpl_group_info *rgi)
err= rpl_global_gtid_slave_state.record_gtid(thd, &gtid, sub_id, true, false);
if (err)
{
/*
Do not report an error if this is really a kill due to a deadlock.
In this case, the transaction will be re-tried instead.
*/
if (rgi->killed_for_retry &&
thd->get_stmt_da()->sql_errno() == ER_QUERY_INTERRUPTED)
return err;
rli->report(ERROR_LEVEL, ER_CANNOT_UPDATE_GTID_STATE, rgi->gtid_info(),
"Error during XID COMMIT: failed to update GTID state in "
"%s.%s: %d: %s",
@ -9631,7 +9649,8 @@ int Rows_log_event::do_apply_event(rpl_group_info *rgi)
if (open_and_lock_tables(thd, rgi->tables_to_lock, FALSE, 0))
{
uint actual_error= thd->get_stmt_da()->sql_errno();
if (thd->is_slave_error || thd->is_fatal_error)
if ((thd->is_slave_error || thd->is_fatal_error) &&
!(rgi->killed_for_retry && actual_error == ER_QUERY_INTERRUPTED))
{
/*
Error reporting borrowed from Query_log_event with many excessive

View file

@ -368,6 +368,7 @@ static I_List<THD> thread_cache;
static bool binlog_format_used= false;
LEX_STRING opt_init_connect, opt_init_slave;
static mysql_cond_t COND_thread_cache, COND_flush_thread_cache;
mysql_cond_t COND_slave_background;
static DYNAMIC_ARRAY all_options;
/* Global variables */
@ -706,7 +707,7 @@ mysql_mutex_t
LOCK_crypt,
LOCK_global_system_variables,
LOCK_user_conn, LOCK_slave_list, LOCK_active_mi,
LOCK_connection_count, LOCK_error_messages;
LOCK_connection_count, LOCK_error_messages, LOCK_slave_background;
mysql_mutex_t LOCK_stats, LOCK_global_user_client_stats,
LOCK_global_table_stats, LOCK_global_index_stats;
@ -880,7 +881,8 @@ PSI_mutex_key key_LOCK_stats,
key_LOCK_wakeup_ready, key_LOCK_wait_commit;
PSI_mutex_key key_LOCK_gtid_waiting;
PSI_mutex_key key_LOCK_prepare_ordered, key_LOCK_commit_ordered;
PSI_mutex_key key_LOCK_prepare_ordered, key_LOCK_commit_ordered,
key_LOCK_slave_background;
PSI_mutex_key key_TABLE_SHARE_LOCK_share;
static PSI_mutex_info all_server_mutexes[]=
@ -943,6 +945,7 @@ static PSI_mutex_info all_server_mutexes[]=
{ &key_LOCK_error_messages, "LOCK_error_messages", PSI_FLAG_GLOBAL},
{ &key_LOCK_prepare_ordered, "LOCK_prepare_ordered", PSI_FLAG_GLOBAL},
{ &key_LOCK_commit_ordered, "LOCK_commit_ordered", PSI_FLAG_GLOBAL},
{ &key_LOCK_slave_background, "LOCK_slave_background", PSI_FLAG_GLOBAL},
{ &key_LOG_INFO_lock, "LOG_INFO::lock", 0},
{ &key_LOCK_thread_count, "LOCK_thread_count", PSI_FLAG_GLOBAL},
{ &key_LOCK_thread_cache, "LOCK_thread_cache", PSI_FLAG_GLOBAL},
@ -997,7 +1000,7 @@ PSI_cond_key key_TC_LOG_MMAP_COND_queue_busy;
PSI_cond_key key_COND_rpl_thread_queue, key_COND_rpl_thread,
key_COND_rpl_thread_pool,
key_COND_parallel_entry, key_COND_group_commit_orderer,
key_COND_prepare_ordered;
key_COND_prepare_ordered, key_COND_slave_background;
PSI_cond_key key_COND_wait_gtid, key_COND_gtid_ignore_duplicates;
static PSI_cond_info all_server_conds[]=
@ -1046,6 +1049,7 @@ static PSI_cond_info all_server_conds[]=
{ &key_COND_parallel_entry, "COND_parallel_entry", 0},
{ &key_COND_group_commit_orderer, "COND_group_commit_orderer", 0},
{ &key_COND_prepare_ordered, "COND_prepare_ordered", 0},
{ &key_COND_slave_background, "COND_slave_background", 0},
{ &key_COND_wait_gtid, "COND_wait_gtid", 0},
{ &key_COND_gtid_ignore_duplicates, "COND_gtid_ignore_duplicates", 0}
};
@ -1053,7 +1057,7 @@ static PSI_cond_info all_server_conds[]=
PSI_thread_key key_thread_bootstrap, key_thread_delayed_insert,
key_thread_handle_manager, key_thread_main,
key_thread_one_connection, key_thread_signal_hand,
key_thread_slave_init, key_rpl_parallel_thread;
key_thread_slave_background, key_rpl_parallel_thread;
static PSI_thread_info all_server_threads[]=
{
@ -1079,7 +1083,7 @@ static PSI_thread_info all_server_threads[]=
{ &key_thread_main, "main", PSI_FLAG_GLOBAL},
{ &key_thread_one_connection, "one_connection", 0},
{ &key_thread_signal_hand, "signal_handler", PSI_FLAG_GLOBAL},
{ &key_thread_slave_init, "slave_init", PSI_FLAG_GLOBAL},
{ &key_thread_slave_background, "slave_background", PSI_FLAG_GLOBAL},
{ &key_rpl_parallel_thread, "rpl_parallel_thread", 0}
};
@ -2173,6 +2177,8 @@ static void clean_up_mutexes()
mysql_mutex_destroy(&LOCK_prepare_ordered);
mysql_cond_destroy(&COND_prepare_ordered);
mysql_mutex_destroy(&LOCK_commit_ordered);
mysql_mutex_destroy(&LOCK_slave_background);
mysql_cond_destroy(&COND_slave_background);
DBUG_VOID_RETURN;
}
@ -4387,6 +4393,9 @@ static int init_thread_environment()
mysql_cond_init(key_COND_prepare_ordered, &COND_prepare_ordered, NULL);
mysql_mutex_init(key_LOCK_commit_ordered, &LOCK_commit_ordered,
MY_MUTEX_INIT_SLOW);
mysql_mutex_init(key_LOCK_slave_background, &LOCK_slave_background,
MY_MUTEX_INIT_SLOW);
mysql_cond_init(key_COND_slave_background, &COND_slave_background, NULL);
#ifdef HAVE_OPENSSL
mysql_mutex_init(key_LOCK_des_key_file,
@ -9468,6 +9477,8 @@ PSI_stage_info stage_waiting_for_room_in_worker_thread= { 0, "Waiting for room i
PSI_stage_info stage_master_gtid_wait_primary= { 0, "Waiting in MASTER_GTID_WAIT() (primary waiter)", 0};
PSI_stage_info stage_master_gtid_wait= { 0, "Waiting in MASTER_GTID_WAIT()", 0};
PSI_stage_info stage_gtid_wait_other_connection= { 0, "Waiting for other master connection to process GTID received on multiple master connections", 0};
PSI_stage_info stage_slave_background_process_request= { 0, "Processing requests", 0};
PSI_stage_info stage_slave_background_wait_request= { 0, "Waiting for requests", 0};
#ifdef HAVE_PSI_INTERFACE
@ -9591,7 +9602,9 @@ PSI_stage_info *all_server_stages[]=
& stage_waiting_to_get_readlock,
& stage_master_gtid_wait_primary,
& stage_master_gtid_wait,
& stage_gtid_wait_other_connection
& stage_gtid_wait_other_connection,
& stage_slave_background_process_request,
& stage_slave_background_wait_request
};
PSI_socket_key key_socket_tcpip, key_socket_unix, key_socket_client_connection;

View file

@ -309,8 +309,8 @@ extern PSI_cond_key key_COND_wait_gtid, key_COND_gtid_ignore_duplicates;
extern PSI_thread_key key_thread_bootstrap, key_thread_delayed_insert,
key_thread_handle_manager, key_thread_kill_server, key_thread_main,
key_thread_one_connection, key_thread_signal_hand, key_thread_slave_init,
key_rpl_parallel_thread;
key_thread_one_connection, key_thread_signal_hand,
key_thread_slave_background, key_rpl_parallel_thread;
extern PSI_file_key key_file_binlog, key_file_binlog_index, key_file_casetest,
key_file_dbopt, key_file_des_key_file, key_file_ERRMSG, key_select_to_file,
@ -451,6 +451,8 @@ extern PSI_stage_info stage_waiting_for_room_in_worker_thread;
extern PSI_stage_info stage_master_gtid_wait_primary;
extern PSI_stage_info stage_master_gtid_wait;
extern PSI_stage_info stage_gtid_wait_other_connection;
extern PSI_stage_info stage_slave_background_process_request;
extern PSI_stage_info stage_slave_background_wait_request;
#ifdef HAVE_PSI_STATEMENT_INTERFACE
/**
@ -518,7 +520,8 @@ extern mysql_mutex_t
LOCK_delayed_status, LOCK_delayed_create, LOCK_crypt, LOCK_timezone,
LOCK_slave_list, LOCK_active_mi, LOCK_manager,
LOCK_global_system_variables, LOCK_user_conn,
LOCK_prepared_stmt_count, LOCK_error_messages, LOCK_connection_count;
LOCK_prepared_stmt_count, LOCK_error_messages, LOCK_connection_count,
LOCK_slave_background;
extern MYSQL_PLUGIN_IMPORT mysql_mutex_t LOCK_thread_count;
#ifdef HAVE_OPENSSL
extern mysql_mutex_t LOCK_des_key_file;
@ -529,6 +532,7 @@ extern mysql_rwlock_t LOCK_grant, LOCK_sys_init_connect, LOCK_sys_init_slave;
extern mysql_rwlock_t LOCK_system_variables_hash;
extern mysql_cond_t COND_thread_count;
extern mysql_cond_t COND_manager;
extern mysql_cond_t COND_slave_background;
extern int32 thread_running;
extern int32 thread_count;
extern my_atomic_rwlock_t thread_running_lock, thread_count_lock;

View file

@ -156,6 +156,7 @@ finish_event_group(THD *thd, uint64 sub_id, rpl_parallel_entry *entry,
mysql_mutex_unlock(&entry->LOCK_parallel_entry);
thd->clear_error();
thd->reset_killed();
thd->get_stmt_da()->reset_diagnostics_area();
wfc->wakeup_subsequent_commits(rgi->worker_error);
}
@ -188,6 +189,25 @@ unlock_or_exit_cond(THD *thd, mysql_mutex_t *lock, bool *did_enter_cond,
}
static void
register_wait_for_prior_event_group_commit(rpl_group_info *rgi,
rpl_parallel_entry *entry)
{
mysql_mutex_assert_owner(&entry->LOCK_parallel_entry);
if (rgi->wait_commit_sub_id > entry->last_committed_sub_id)
{
/*
Register that the commit of this event group must wait for the
commit of the previous event group to complete before it may
complete itself, so that we preserve commit order.
*/
wait_for_commit *waitee=
&rgi->wait_commit_group_info->commit_orderer;
rgi->commit_orderer.register_wait_for_prior_commit(waitee);
}
}
#ifndef DBUG_OFF
static int
dbug_simulate_tmp_error(rpl_group_info *rgi, THD *thd)
@ -205,6 +225,40 @@ dbug_simulate_tmp_error(rpl_group_info *rgi, THD *thd)
#endif
/*
If we detect a deadlock due to eg. storage engine locks that conflict with
the fixed commit order, then the later transaction will be killed
asynchroneously to allow the former to complete its commit.
In this case, we convert the 'killed' error into a deadlock error, and retry
the later transaction. */
static void
convert_kill_to_deadlock_error(rpl_group_info *rgi)
{
THD *thd= rgi->thd;
if (thd->get_stmt_da()->sql_errno() == ER_QUERY_INTERRUPTED &&
rgi->killed_for_retry)
{
thd->clear_error();
thd->get_stmt_da()->reset_diagnostics_area();
my_error(ER_LOCK_DEADLOCK, MYF(0));
rgi->killed_for_retry= false;
thd->reset_killed();
}
}
static bool
is_group_ending(Log_event *ev, Log_event_type event_type)
{
return event_type == XID_EVENT ||
(event_type == QUERY_EVENT &&
(((Query_log_event *)ev)->is_commit() ||
((Query_log_event *)ev)->is_rollback()));
}
static int
retry_event_group(rpl_group_info *rgi, rpl_parallel_thread *rpt,
rpl_parallel_thread::queued_event *orig_qev)
@ -221,11 +275,46 @@ retry_event_group(rpl_group_info *rgi, rpl_parallel_thread *rpt,
ulonglong cur_offset, old_offset;
char log_name[FN_REFLEN];
THD *thd= rgi->thd;
rpl_parallel_entry *entry= rgi->parallel_entry;
ulong retries= 0;
do_retry:
event_count= 0;
err= 0;
/*
If we already started committing before getting the deadlock (or other
error) that caused us to need to retry, we have already signalled
subsequent transactions that we have started committing. This is
potentially a problem, as now we will rollback, and if subsequent
transactions would start to execute now, they could see an unexpected
state of the database and get eg. key not found or duplicate key error.
However, to get a deadlock in the first place, there must have been
another earlier transaction that is waiting for us. Thus that other
transaction has _not_ yet started to commit, and any subsequent
transactions will still be waiting at this point.
So here, we decrement back the count of transactions that started
committing (if we already incremented it), undoing the effect of an
earlier mark_start_commit(). Then later, when the retry succeeds and we
commit again, we can do a new mark_start_commit() and eventually wake up
subsequent transactions at the proper time.
We need to do the unmark before the rollback, to be sure that the
transaction we deadlocked with will not signal that it started to commit
until after the unmark.
*/
rgi->unmark_start_commit();
/*
We might get the deadlock error that causes the retry during commit, while
sitting in wait_for_prior_commit(). If this happens, we will have a
pending error in the wait_for_commit object. So clear this by
unregistering (and later re-registering) the wait.
*/
if(thd->wait_for_commit_ptr)
thd->wait_for_commit_ptr->unregister_wait_for_prior_commit();
rgi->cleanup_context(thd, 1);
mysql_mutex_lock(&rli->data_lock);
@ -233,6 +322,10 @@ do_retry:
statistic_increment(slave_retried_transactions, LOCK_status);
mysql_mutex_unlock(&rli->data_lock);
mysql_mutex_lock(&entry->LOCK_parallel_entry);
register_wait_for_prior_event_group_commit(rgi, entry);
mysql_mutex_unlock(&entry->LOCK_parallel_entry);
strcpy(log_name, ir->name);
if ((fd= open_binlog(&rlog, log_name, &errmsg)) <0)
{
@ -319,6 +412,9 @@ do_retry:
err= 1;
goto err;
}
if (is_group_ending(ev, event_type))
rgi->mark_start_commit();
err= rpt_handle_event(qev, rpt);
++event_count;
mysql_mutex_lock(&rpt->LOCK_rpl_thread);
@ -332,6 +428,7 @@ do_retry:
err= dbug_simulate_tmp_error(rgi, thd););
if (err)
{
convert_kill_to_deadlock_error(rgi);
if (has_temporary_error(thd))
{
++retries;
@ -599,17 +696,9 @@ handle_rpl_parallel_thread(void *arg)
if (unlikely(entry->stop_on_error_sub_id <= rgi->wait_commit_sub_id))
skip_event_group= true;
else if (rgi->wait_commit_sub_id > entry->last_committed_sub_id)
{
/*
Register that the commit of this event group must wait for the
commit of the previous event group to complete before it may
complete itself, so that we preserve commit order.
*/
wait_for_commit *waitee=
&rgi->wait_commit_group_info->commit_orderer;
rgi->commit_orderer.register_wait_for_prior_commit(waitee);
}
else
register_wait_for_prior_event_group_commit(rgi, entry);
unlock_or_exit_cond(thd, &entry->LOCK_parallel_entry,
&did_enter_cond, &old_stage);
@ -651,10 +740,7 @@ handle_rpl_parallel_thread(void *arg)
}
}
group_ending= event_type == XID_EVENT ||
(event_type == QUERY_EVENT &&
(((Query_log_event *)events->ev)->is_commit() ||
((Query_log_event *)events->ev)->is_rollback()));
group_ending= is_group_ending(events->ev, event_type);
if (group_ending && likely(!rgi->worker_error))
{
DEBUG_SYNC(thd, "rpl_parallel_before_mark_start_commit");
@ -674,8 +760,12 @@ handle_rpl_parallel_thread(void *arg)
delete_or_keep_event_post_apply(rgi, event_type, events->ev);
DBUG_EXECUTE_IF("rpl_parallel_simulate_temp_err_gtid_0_x_100",
err= dbug_simulate_tmp_error(rgi, thd););
if (err && has_temporary_error(thd))
err= retry_event_group(rgi, rpt, events);
if (err)
{
convert_kill_to_deadlock_error(rgi);
if (has_temporary_error(thd) && slave_trans_retries > 0)
err= retry_event_group(rgi, rpt, events);
}
}
else
{
@ -691,10 +781,14 @@ handle_rpl_parallel_thread(void *arg)
events->next= qevs_to_free;
qevs_to_free= events;
if (unlikely(err) && !rgi->worker_error)
if (unlikely(err))
{
slave_output_error_info(rgi, thd);
signal_error_to_sql_driver_thread(thd, rgi, err);
if (!rgi->worker_error)
{
slave_output_error_info(rgi, thd);
signal_error_to_sql_driver_thread(thd, rgi, err);
}
thd->reset_killed();
}
if (end_of_group)
{
@ -1096,6 +1190,7 @@ rpl_parallel_thread::get_rgi(Relay_log_info *rli, Gtid_log_event *gtid_ev,
rgi->relay_log= rli->last_inuse_relaylog;
rgi->retry_start_offset= rli->future_event_relay_log_pos-event_size;
rgi->retry_event_count= 0;
rgi->killed_for_retry= false;
return rgi;
}

View file

@ -1843,6 +1843,7 @@ rpl_group_info::mark_start_commit()
}
<<<<<<< TREE
/*
Format the current GTID as a string suitable for printing in error messages.
@ -1863,6 +1864,36 @@ rpl_group_info::gtid_info()
}
=======
/*
Undo the effect of a prior mark_start_commit().
This is only used for retrying a transaction in parallel replication, after
we have encountered a deadlock or other temporary error.
When we get such a deadlock, it means that the current group of transactions
did not yet all start committing (else they would not have deadlocked). So
we will not yet have woken up anything in the next group, our rgi->gco is
still live, and we can simply decrement the counter (to be incremented again
later, when the retry succeeds and reaches the commit step).
*/
void
rpl_group_info::unmark_start_commit()
{
rpl_parallel_entry *e;
if (!did_mark_start_commit)
return;
e= this->parallel_entry;
mysql_mutex_lock(&e->LOCK_parallel_entry);
--e->count_committing_event_groups;
mysql_mutex_unlock(&e->LOCK_parallel_entry);
did_mark_start_commit= false;
}
>>>>>>> MERGE-SOURCE
rpl_sql_thread_info::rpl_sql_thread_info(Rpl_filter *filter)
: rpl_filter(filter)
{

View file

@ -646,6 +646,7 @@ struct rpl_group_info
inuse_relaylog *relay_log;
uint64 retry_start_offset;
uint64 retry_event_count;
bool killed_for_retry;
rpl_group_info(Relay_log_info *rli_);
~rpl_group_info();
@ -735,6 +736,7 @@ struct rpl_group_info
void mark_start_commit_no_lock();
void mark_start_commit();
char *gtid_info();
void unmark_start_commit();
time_t get_row_stmt_start_timestamp()
{

View file

@ -287,13 +287,22 @@ static void init_slave_psi_keys(void)
#endif /* HAVE_PSI_INTERFACE */
static bool slave_init_thread_running;
static bool slave_background_thread_running;
static bool slave_background_thread_gtid_loaded;
struct slave_background_kill_t {
slave_background_kill_t *next;
THD *to_kill;
int errcode;
} *slave_background_kill_list;
pthread_handler_t
handle_slave_init(void *arg __attribute__((unused)))
handle_slave_background(void *arg __attribute__((unused)))
{
THD *thd;
PSI_stage_info old_stage;
bool stop;
my_thread_init();
thd= new THD;
@ -301,7 +310,10 @@ handle_slave_init(void *arg __attribute__((unused)))
mysql_mutex_lock(&LOCK_thread_count);
thd->thread_id= thread_id++;
mysql_mutex_unlock(&LOCK_thread_count);
thd->system_thread = SYSTEM_THREAD_SLAVE_BACKGROUND;
thd->store_globals();
thd->security_ctx->skip_grants();
thd->set_command(COM_DAEMON);
thd_proc_info(thd, "Loading slave GTID position from table");
if (rpl_load_gtid_slave_state(thd))
@ -311,13 +323,53 @@ handle_slave_init(void *arg __attribute__((unused)))
thd->get_stmt_da()->sql_errno(),
thd->get_stmt_da()->message());
mysql_mutex_lock(&LOCK_thread_count);
threads.append(thd);
slave_background_thread_gtid_loaded= true;
mysql_cond_broadcast(&COND_thread_count);
mysql_mutex_unlock(&LOCK_thread_count);
THD_STAGE_INFO(thd, stage_slave_background_process_request);
do
{
slave_background_kill_t *kill_list;
mysql_mutex_lock(&LOCK_slave_background);
thd->ENTER_COND(&COND_slave_background, &LOCK_slave_background,
&stage_slave_background_wait_request,
&old_stage);
for (;;)
{
stop= abort_loop || thd->killed;
kill_list= slave_background_kill_list;
if (stop || kill_list)
break;
mysql_cond_wait(&COND_slave_background, &LOCK_slave_background);
}
slave_background_kill_list= NULL;
thd->EXIT_COND(&old_stage);
while (kill_list)
{
slave_background_kill_t *p = kill_list;
kill_list= p->next;
mysql_mutex_lock(&p->to_kill->LOCK_thd_data);
/* ToDo: mark the p->errcode error code somehow ... ? */
p->to_kill->awake(KILL_QUERY);
mysql_mutex_unlock(&p->to_kill->LOCK_thd_data);
my_free(p);
}
} while (!stop);
mysql_mutex_lock(&LOCK_thread_count);
delete thd;
mysql_mutex_unlock(&LOCK_thread_count);
my_thread_end();
mysql_mutex_lock(&LOCK_thread_count);
slave_init_thread_running= false;
slave_background_thread_running= false;
mysql_cond_broadcast(&COND_thread_count);
mysql_mutex_unlock(&LOCK_thread_count);
@ -325,21 +377,57 @@ handle_slave_init(void *arg __attribute__((unused)))
}
void
slave_background_kill_request(THD *to_kill, int errcode)
{
slave_background_kill_t *p=
(slave_background_kill_t *)my_malloc(sizeof(*p), MYF(MY_WME));
if (p)
{
p->to_kill= to_kill;
p->errcode= errcode;
to_kill->rgi_slave->killed_for_retry= true;
mysql_mutex_lock(&LOCK_slave_background);
p->next= slave_background_kill_list;
slave_background_kill_list= p;
mysql_mutex_unlock(&LOCK_slave_background);
mysql_cond_signal(&COND_slave_background);
}
}
/*
Start the slave background thread.
This thread is currently used for two purposes:
1. To load the GTID state from mysql.gtid_slave_pos at server start; reading
from table requires valid THD, which is otherwise not available during
server init.
2. To kill worker thread transactions during parallel replication, when a
storage engine attempts to take an errorneous conflicting lock that would
cause a deadlock. Killing is done asynchroneously, as the kill may not
be safe within the context of a callback from inside storage engine
locking code.
*/
static int
run_slave_init_thread()
start_slave_background_thread()
{
pthread_t th;
slave_init_thread_running= true;
if (mysql_thread_create(key_thread_slave_init, &th, &connection_attrib,
handle_slave_init, NULL))
slave_background_thread_running= true;
slave_background_thread_gtid_loaded= false;
if (mysql_thread_create(key_thread_slave_background,
&th, &connection_attrib, handle_slave_background,
NULL))
{
sql_print_error("Failed to create thread while initialising slave");
return 1;
}
mysql_mutex_lock(&LOCK_thread_count);
while (slave_init_thread_running)
while (!slave_background_thread_gtid_loaded)
mysql_cond_wait(&COND_thread_count, &LOCK_thread_count);
mysql_mutex_unlock(&LOCK_thread_count);
@ -358,7 +446,7 @@ int init_slave()
init_slave_psi_keys();
#endif
if (run_slave_init_thread())
if (start_slave_background_thread())
return 1;
/*

View file

@ -238,6 +238,7 @@ pthread_handler_t handle_slave_io(void *arg);
void slave_output_error_info(rpl_group_info *rgi, THD *thd);
pthread_handler_t handle_slave_sql(void *arg);
bool net_request_file(NET* net, const char* fname);
void slave_background_kill_request(THD *to_kill, int errcode);
extern bool volatile abort_loop;
extern Master_info main_mi, *active_mi; /* active_mi for multi-master */

View file

@ -914,7 +914,7 @@ send_result_message:
protocol->store(operator_name, system_charset_info);
if (result_code) // either mysql_recreate_table or analyze failed
{
DBUG_ASSERT(thd->is_error() || thd->killed);
DBUG_ASSERT(thd->is_error());
if (thd->is_error())
{
const char *err_msg= thd->get_stmt_da()->message();

View file

@ -2084,7 +2084,10 @@ bool open_table(THD *thd, TABLE_LIST *table_list, MEM_ROOT *mem_root,
DBUG_RETURN(TRUE);
if (!(flags & MYSQL_OPEN_IGNORE_KILLED) && thd->killed)
{
thd->send_kill_message();
DBUG_RETURN(TRUE);
}
/*
Check if we're trying to take a write lock in a read only transaction.

View file

@ -4217,6 +4217,70 @@ extern "C" int thd_rpl_is_parallel(const MYSQL_THD thd)
return thd->rgi_slave && thd->rgi_slave->is_parallel_exec;
}
extern "C" int
thd_need_wait_for(const MYSQL_THD thd)
{
return thd && thd->rgi_slave && thd->rgi_slave->is_parallel_exec;
}
extern "C" void
thd_report_wait_for(const MYSQL_THD thd, MYSQL_THD other_thd)
{
rpl_group_info *rgi;
rpl_group_info *other_rgi;
if (!thd || !other_thd)
return;
rgi= thd->rgi_slave;
other_rgi= other_thd->rgi_slave;
if (!rgi || !other_rgi)
return;
if (!rgi->is_parallel_exec)
return;
if (rgi->rli != other_rgi->rli)
return;
if (!rgi->gtid_sub_id)
return;
if (rgi->current_gtid.domain_id != other_rgi->current_gtid.domain_id)
return;
if (rgi->gtid_sub_id > other_rgi->gtid_sub_id)
return;
/*
This transaction is about to wait for another transaction that is required
by replication binlog order to commit after. This would cause a deadlock.
So send a kill to the other transaction, with a temporary error; this will
cause replication to rollback (and later re-try) the other transaction,
releasing the lock for this transaction so replication can proceed.
*/
#ifdef HAVE_REPLICATION
slave_background_kill_request(other_thd, ER_LOCK_DEADLOCK);
#endif
}
extern "C" int
thd_need_ordering_with(const MYSQL_THD thd, const MYSQL_THD other_thd)
{
rpl_group_info *rgi= thd->rgi_slave;
rpl_group_info *other_rgi= other_thd->rgi_slave;
if (!rgi || !other_rgi)
return 1;
if (!rgi->is_parallel_exec)
return 1;
if (rgi->rli != other_rgi->rli)
return 1;
if (rgi->current_gtid.domain_id != other_rgi->current_gtid.domain_id)
return 1;
/*
These two threads are doing parallel replication within the same
replication domain. Their commit order is already fixed, so we do not need
gap locks or similar to otherwise enforce ordering (and in fact such locks
could lead to unnecessary deadlocks and transaction retry).
*/
return 0;
}
extern "C" int thd_non_transactional_update(const MYSQL_THD thd)
{
return(thd->transaction.all.modified_non_trans_table);

View file

@ -1357,7 +1357,8 @@ enum enum_thread_type
SYSTEM_THREAD_NDBCLUSTER_BINLOG= 8,
SYSTEM_THREAD_EVENT_SCHEDULER= 16,
SYSTEM_THREAD_EVENT_WORKER= 32,
SYSTEM_THREAD_BINLOG_BACKGROUND= 64
SYSTEM_THREAD_BINLOG_BACKGROUND= 64,
SYSTEM_THREAD_SLAVE_BACKGROUND= 128,
};
inline char const *

View file

@ -156,7 +156,7 @@ static uchar *next_free_record_pos(HP_SHARE *info)
("record file full. records: %lu max_records: %lu "
"data_length: %llu index_length: %llu "
"max_table_size: %llu",
info->records, info->max_records,
(unsigned long)info->records, info->max_records,
info->data_length, info->index_length,
info->max_table_size));
my_errno=HA_ERR_RECORD_FILE_FULL;

View file

@ -49,6 +49,7 @@ Created 5/7/1996 Heikki Tuuri
#include "btr0btr.h"
#include "dict0boot.h"
#include <set>
#include "mysql/plugin.h"
/* Restricts the length of search we will do in the waits-for
graph of transactions */
@ -3873,7 +3874,15 @@ lock_deadlock_search(
/* Select the joining transaction as the victim. */
return(ctx->start->id);
} else if (lock->trx->lock.que_state == TRX_QUE_LOCK_WAIT) {
} else {
/* We do not need to report autoinc locks to the upper
layer. These locks are released before commit, so they can
not cause deadlocks with binlog-fixed commit order. */
if (lock_get_type_low(lock) != LOCK_TABLE ||
lock_get_mode(lock) != LOCK_AUTO_INC)
thd_report_wait_for(ctx->start->mysql_thd,
lock->trx->mysql_thd);
if (lock->trx->lock.que_state == TRX_QUE_LOCK_WAIT) {
/* Another trx ahead has requested a lock in an
incompatible mode, and is itself waiting for a lock. */
@ -3898,8 +3907,9 @@ lock_deadlock_search(
lock = lock_get_next_lock(ctx, lock, heap_no);
}
} else {
} else {
lock = lock_get_next_lock(ctx, lock, heap_no);
}
}
}

View file

@ -49,6 +49,7 @@ Created 5/7/1996 Heikki Tuuri
#include "btr0btr.h"
#include "dict0boot.h"
#include <set>
#include "mysql/plugin.h"
/* Restricts the length of search we will do in the waits-for
graph of transactions */
@ -3896,7 +3897,15 @@ lock_deadlock_search(
/* Select the joining transaction as the victim. */
return(ctx->start->id);
} else if (lock->trx->lock.que_state == TRX_QUE_LOCK_WAIT) {
} else {
/* We do not need to report autoinc locks to the upper
layer. These locks are released before commit, so they can
not cause deadlocks with binlog-fixed commit order. */
if (lock_get_type_low(lock) != LOCK_TABLE ||
lock_get_mode(lock) != LOCK_AUTO_INC)
thd_report_wait_for(ctx->start->mysql_thd,
lock->trx->mysql_thd);
if (lock->trx->lock.que_state == TRX_QUE_LOCK_WAIT) {
/* Another trx ahead has requested a lock in an
incompatible mode, and is itself waiting for a lock. */
@ -3921,8 +3930,9 @@ lock_deadlock_search(
lock = lock_get_next_lock(ctx, lock, heap_no);
}
} else {
} else {
lock = lock_get_next_lock(ctx, lock, heap_no);
}
}
}