From 9bb989a9d196755fe411551f27e6198ef6819159 Mon Sep 17 00:00:00 2001 From: unknown Date: Thu, 21 Mar 2013 11:03:31 +0100 Subject: [PATCH] MDEV-26: Global transaction ID. Fix MDEV-4275 - I/O thread restart duplicates events in the relay log. The first time we connect to master after CHANGE MASTER or restart, we connect from the GTID position. But then subsequent reconnects or IO thread restarts reconnect with the old-style file/offset binlog pos from where it left off at last disconnect. This is necessary to avoid duplicate events in the relay logs, as there is nothing that synchronises the SQL thread update of GTID state (multiple threads in case of multi-source) with IO thread reconnects. Test cases. Some small cleanups and fixes. --- mysql-test/suite/rpl/r/rpl_gtid_crash.result | 16 +++++ .../suite/rpl/r/rpl_gtid_startpos.result | 21 ++++++ .../suite/rpl/t/rpl_gtid_crash-master.opt | 1 + mysql-test/suite/rpl/t/rpl_gtid_crash.test | 71 +++++++++++++++++++ mysql-test/suite/rpl/t/rpl_gtid_startpos.test | 47 +++++++++++- sql/log.cc | 7 +- sql/log_event.h | 29 ++++++++ sql/rpl_gtid.h | 5 ++ sql/rpl_mi.cc | 10 +-- sql/rpl_mi.h | 7 +- sql/slave.cc | 61 ++++++++-------- sql/sql_repl.cc | 30 ++++---- 12 files changed, 249 insertions(+), 56 deletions(-) create mode 100644 mysql-test/suite/rpl/r/rpl_gtid_crash.result create mode 100644 mysql-test/suite/rpl/t/rpl_gtid_crash-master.opt create mode 100644 mysql-test/suite/rpl/t/rpl_gtid_crash.test diff --git a/mysql-test/suite/rpl/r/rpl_gtid_crash.result b/mysql-test/suite/rpl/r/rpl_gtid_crash.result new file mode 100644 index 00000000000..89e340d0de4 --- /dev/null +++ b/mysql-test/suite/rpl/r/rpl_gtid_crash.result @@ -0,0 +1,16 @@ +include/rpl_init.inc [topology=1->2] +*** Test crashing master, causing slave IO thread to reconnect while SQL thread is running *** +CREATE TABLE t1 (a INT PRIMARY KEY, b INT) ENGINE=InnoDB; +INSERT INTO t1 VALUES (1, 0); +include/stop_slave.inc +CHANGE MASTER TO master_host = '127.0.0.1', 
master_port = MASTER_PORT, +MASTER_GTID_POS=AUTO; +INSERT INTO t1 VALUES (2,1); +INSERT INTO t1 VALUES (3,1); +include/start_slave.inc +SET SESSION debug_dbug="+d,crash_dispatch_command_before"; +SELECT 1; +Got one of the listed errors +INSERT INTO t1 VALUES (1000, 3); +DROP TABLE t1; +include/rpl_end.inc diff --git a/mysql-test/suite/rpl/r/rpl_gtid_startpos.result b/mysql-test/suite/rpl/r/rpl_gtid_startpos.result index 3e9f91c9c09..13712c910f7 100644 --- a/mysql-test/suite/rpl/r/rpl_gtid_startpos.result +++ b/mysql-test/suite/rpl/r/rpl_gtid_startpos.result @@ -67,4 +67,25 @@ a 3 4 DROP TABLE t1; +*** MDEV-4275: I/O thread restart duplicates events in relay log *** +include/stop_slave.inc +RESET SLAVE ALL; +RESET MASTER; +RESET MASTER; +CHANGE MASTER TO master_host='127.0.0.1', master_port=MASTER_PORT, master_user='root', master_gtid_pos=''; +include/start_slave.inc +CREATE TABLE t1 (a INT PRIMARY KEY); +INSERT INTO t1 VALUES (1); +SELECT * FROM t1; +a +1 +include/stop_slave_io.inc +START SLAVE IO_THREAD; +include/wait_for_slave_io_to_start.inc +INSERT INTO t1 VALUES (2); +SELECT * FROM t1 ORDER BY a; +a +1 +2 +DROP TABLE t1; include/rpl_end.inc diff --git a/mysql-test/suite/rpl/t/rpl_gtid_crash-master.opt b/mysql-test/suite/rpl/t/rpl_gtid_crash-master.opt new file mode 100644 index 00000000000..425fda95086 --- /dev/null +++ b/mysql-test/suite/rpl/t/rpl_gtid_crash-master.opt @@ -0,0 +1 @@ +--skip-stack-trace --skip-core-file diff --git a/mysql-test/suite/rpl/t/rpl_gtid_crash.test b/mysql-test/suite/rpl/t/rpl_gtid_crash.test new file mode 100644 index 00000000000..7f0413bfb6c --- /dev/null +++ b/mysql-test/suite/rpl/t/rpl_gtid_crash.test @@ -0,0 +1,71 @@ +--source include/have_innodb.inc +--let $rpl_topology=1->2 +--source include/rpl_init.inc + +--echo *** Test crashing master, causing slave IO thread to reconnect while SQL thread is running *** + +--connection server_1 +CREATE TABLE t1 (a INT PRIMARY KEY, b INT) ENGINE=InnoDB; +INSERT INTO t1 VALUES (1, 0); 
+--save_master_pos + +--connection server_2 +--sync_with_master +--source include/stop_slave.inc +--replace_result $MASTER_MYPORT MASTER_PORT +eval CHANGE MASTER TO master_host = '127.0.0.1', master_port = $MASTER_MYPORT, + MASTER_GTID_POS=AUTO; + +--connection server_1 +INSERT INTO t1 VALUES (2,1); +INSERT INTO t1 VALUES (3,1); + +--connection server_2 +--source include/start_slave.inc + +--connection server_1 + +--write_file $MYSQLTEST_VARDIR/tmp/mysqld.1.expect +wait-rpl_gtid_crash.test +EOF + +let $1=200; +--disable_query_log +while ($1) +{ + eval INSERT INTO t1 VALUES ($1 + 10, 2); + dec $1; +} +--enable_query_log + +SET SESSION debug_dbug="+d,crash_dispatch_command_before"; +--error 2006,2013 +SELECT 1; + +--remove_file $MYSQLTEST_VARDIR/tmp/mysqld.1.expect +--write_file $MYSQLTEST_VARDIR/tmp/mysqld.1.expect +restart-rpl_gtid_crash.test +EOF + +--enable_reconnect +--source include/wait_until_connected_again.inc + +--connection server_2 +--let $wait_condition= SELECT COUNT(*) = 200 FROM t1 WHERE b=2 +--source include/wait_condition.inc + +--connection server_1 +INSERT INTO t1 VALUES (1000, 3); + +--connection server_2 +--let $wait_condition= SELECT COUNT(*) = 1 FROM t1 WHERE b=3 +--source include/wait_condition.inc + +--connection server_1 +DROP TABLE t1; + +--connection default +--enable_reconnect +--source include/wait_until_connected_again.inc + +--source include/rpl_end.inc diff --git a/mysql-test/suite/rpl/t/rpl_gtid_startpos.test b/mysql-test/suite/rpl/t/rpl_gtid_startpos.test index 58f93ba6c54..32fb608080c 100644 --- a/mysql-test/suite/rpl/t/rpl_gtid_startpos.test +++ b/mysql-test/suite/rpl/t/rpl_gtid_startpos.test @@ -98,9 +98,54 @@ START SLAVE; --source include/wait_condition.inc SELECT * FROM t1 ORDER by a; -# Clean up. 
+--connection server_1 +DROP TABLE t1; +--save_master_pos +--connection server_2 +--sync_with_master + + +--echo *** MDEV-4275: I/O thread restart duplicates events in relay log *** + +--connection server_2 +--source include/stop_slave.inc +RESET SLAVE ALL; +RESET MASTER; + +--connection server_1 +RESET MASTER; + +--connection server_2 +--replace_result $MASTER_MYPORT MASTER_PORT +eval CHANGE MASTER TO master_host='127.0.0.1', master_port=$MASTER_MYPORT, master_user='root', master_gtid_pos=''; +--source include/start_slave.inc + +--connection server_1 +CREATE TABLE t1 (a INT PRIMARY KEY); +INSERT INTO t1 VALUES (1); +--save_master_pos + +--connection server_2 +--sync_with_master +SELECT * FROM t1; + +--source include/stop_slave_io.inc +START SLAVE IO_THREAD; +--source include/wait_for_slave_io_to_start.inc + +--connection server_1 +INSERT INTO t1 VALUES (2); +--save_master_pos + +--connection server_2 +--sync_with_master + +SELECT * FROM t1 ORDER BY a; + +# Clean up. --connection server_1 DROP TABLE t1; + --source include/rpl_end.inc diff --git a/sql/log.cc b/sql/log.cc index ab7c0fd0d96..f17fb0ff619 100644 --- a/sql/log.cc +++ b/sql/log.cc @@ -3702,7 +3702,8 @@ bool MYSQL_BIN_LOG::reset_logs(THD* thd, bool create_new_log) const char* save_name; DBUG_ENTER("reset_logs"); - ha_reset_logs(thd); + if (thd) + ha_reset_logs(thd); /* We need to get both locks to be sure that no one is trying to write to the index log file. 
@@ -8486,7 +8487,9 @@ binlog_background_thread(void *arg __attribute__((unused))) #ifdef HAVE_REPLICATION if (rpl_load_gtid_slave_state(thd)) sql_print_warning("Failed to load slave replication state from table " - "%s.%s", "mysql", rpl_gtid_slave_state_table_name.str); + "%s.%s: %u: %s", "mysql", + rpl_gtid_slave_state_table_name.str, + thd->stmt_da->sql_errno(), thd->stmt_da->message()); #endif mysql_mutex_lock(&mysql_bin_log.LOCK_binlog_background_thread); diff --git a/sql/log_event.h b/sql/log_event.h index 6b91756cc8a..82369b35140 100644 --- a/sql/log_event.h +++ b/sql/log_event.h @@ -1319,6 +1319,35 @@ public: return do_shall_skip(rli); } + + /* + Check if an event is non-final part of a stand-alone event group, + such as Intvar_log_event (such events should be processed as part + of the following event group, not individually). + */ + static bool is_part_of_group(enum Log_event_type ev_type) + { + switch (ev_type) + { + case GTID_EVENT: + case INTVAR_EVENT: + case RAND_EVENT: + case USER_VAR_EVENT: + case TABLE_MAP_EVENT: + case ANNOTATE_ROWS_EVENT: + return true; + case DELETE_ROWS_EVENT: + case UPDATE_ROWS_EVENT: + case WRITE_ROWS_EVENT: + /* + ToDo: also check for non-final Rows_log_event (though such events + are usually in a BEGIN-COMMIT group). + */ + default: + return false; + } + } + protected: /** diff --git a/sql/rpl_gtid.h b/sql/rpl_gtid.h index bdc88b5c2b5..e63d8439803 100644 --- a/sql/rpl_gtid.h +++ b/sql/rpl_gtid.h @@ -31,6 +31,11 @@ struct rpl_gtid }; +enum enum_gtid_skip_type { + GTID_SKIP_NOT, GTID_SKIP_STANDALONE, GTID_SKIP_TRANSACTION +}; + + /* Replication slave state. 
diff --git a/sql/rpl_mi.cc b/sql/rpl_mi.cc index 608a30448aa..7ed8b9794ec 100644 --- a/sql/rpl_mi.cc +++ b/sql/rpl_mi.cc @@ -37,7 +37,7 @@ Master_info::Master_info(LEX_STRING *connection_name_arg, checksum_alg_before_fd(BINLOG_CHECKSUM_ALG_UNDEF), connect_retry(DEFAULT_CONNECT_RETRY), inited(0), abort_slave(0), slave_running(0), slave_run_id(0), sync_counter(0), - heartbeat_period(0), received_heartbeats(0), master_id(0), gtid_pos_auto(0) + heartbeat_period(0), received_heartbeats(0), master_id(0), using_gtid(0) { host[0] = 0; user[0] = 0; password[0] = 0; ssl_ca[0]= 0; ssl_capath[0]= 0; ssl_cert[0]= 0; @@ -436,8 +436,8 @@ file '%s')", fname); */ while (!init_strvar_from_file(buf, sizeof(buf), &mi->file, 0)) { - if (0 == strncmp(buf, STRING_WITH_LEN("gtid_pos_auto="))) - mi->gtid_pos_auto= (0 != atoi(buf + sizeof("gtid_pos_auto"))); + if (0 == strncmp(buf, STRING_WITH_LEN("using_gtid="))) + mi->using_gtid= (0 != atoi(buf + sizeof("using_gtid"))); } } } @@ -581,14 +581,14 @@ int flush_master_info(Master_info* mi, my_b_printf(file, "%u\n%s\n%s\n%s\n%s\n%s\n%d\n%d\n%d\n%s\n%s\n%s\n%s\n%s\n%d\n%s\n%s\n%s\n" "\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n" - "gtid_pos_auto=%d\n", + "using_gtid=%d\n", LINES_IN_MASTER_INFO, mi->master_log_name, llstr(mi->master_log_pos, lbuf), mi->host, mi->user, mi->password, mi->port, mi->connect_retry, (int)(mi->ssl), mi->ssl_ca, mi->ssl_capath, mi->ssl_cert, mi->ssl_cipher, mi->ssl_key, mi->ssl_verify_server_cert, - heartbeat_buf, "", ignore_server_ids_buf, mi->gtid_pos_auto); + heartbeat_buf, "", ignore_server_ids_buf, mi->using_gtid); my_free(ignore_server_ids_buf); err= flush_io_cache(file); if (sync_masterinfo_period && !err && diff --git a/sql/rpl_mi.h b/sql/rpl_mi.h index 61ee5227693..b6a3e7d91b9 100644 --- a/sql/rpl_mi.h +++ b/sql/rpl_mi.h @@ -126,8 +126,11 @@ class Master_info : public Slave_reporting_capability ulonglong received_heartbeats; // counter of received heartbeat events DYNAMIC_ARRAY ignore_server_ids; ulong master_id; - /* If 
last CHANGE MASTER was MASTER_GTID_POS=AUTO. */ - bool gtid_pos_auto; + /* + True if slave position is set using GTID state rather than old-style + file/offset binlog position. + */ + bool using_gtid; }; int init_master_info(Master_info* mi, const char* master_info_fname, const char* slave_info_fname, diff --git a/sql/slave.cc b/sql/slave.cc index 9ceeee13480..efbd12e04d5 100644 --- a/sql/slave.cc +++ b/sql/slave.cc @@ -398,32 +398,6 @@ int init_recovery(Master_info* mi, const char** errmsg) DBUG_RETURN(0); } - -/* - When connecting a slave to a master with GTID, we reset the relay log - coordinates of the SQL thread and clear the master coordinates of SQL and IO - threads. - - This way we ensure that we start from the correct place even after a change - to new master or a crash where relay log coordinates may be wrong (GTID - state is crash safe but master.info is not). And we get the correct master - coordinates set upon reading the initial fake rotate event sent from master. -*/ -static void -reset_coordinates_for_gtid(Master_info *mi, Relay_log_info *rli) -{ - mi->master_log_pos= 0; - mi->master_log_name[0]= 0; - rli->group_master_log_pos= 0; - rli->group_master_log_name[0]= 0; - rli->group_relay_log_pos= BIN_LOG_HEADER_SIZE; - strmake(rli->group_relay_log_name, rli->relay_log.get_log_fname(), - sizeof(rli->group_relay_log_name)-1); - rli->event_relay_log_pos= BIN_LOG_HEADER_SIZE; - strmake(rli->event_relay_log_name, rli->relay_log.get_log_fname(), - sizeof(mi->rli.event_relay_log_name)-1); -} - /** Convert slave skip errors bitmap into a printable string. 
@@ -811,6 +785,7 @@ int start_slave_threads(bool need_slave_mutex, bool wait_for_start, mysql_mutex_t *lock_io=0, *lock_sql=0, *lock_cond_io=0, *lock_cond_sql=0; mysql_cond_t* cond_io=0, *cond_sql=0; int error=0; + const char *errmsg; DBUG_ENTER("start_slave_threads"); if (need_slave_mutex) @@ -826,6 +801,22 @@ int start_slave_threads(bool need_slave_mutex, bool wait_for_start, lock_cond_sql = &mi->rli.run_lock; } + /* + If we are using GTID and both SQL and IO threads are stopped, then get + rid of all relay logs. + + Relay logs are not very useful when using GTID, except as a buffer + between the fetch in the IO thread and the apply in SQL thread. However + while one of the threads is running, they are in use and cannot be + removed. + */ + if (mi->using_gtid && !mi->slave_running && !mi->rli.slave_running) + { + purge_relay_logs(&mi->rli, NULL, 0, &errmsg); + mi->master_log_name[0]= 0; + mi->master_log_pos= 0; + } + if (thread_mask & SLAVE_IO) error= start_slave_thread( #ifdef HAVE_PSI_INTERFACE @@ -1813,9 +1804,17 @@ past_checksum: after_set_capability: #endif - /* Request dump start from slave replication GTID state. */ + /* + Request dump start from slave replication GTID state. - if (mi->gtid_pos_auto) + Only request GTID position the first time we connect after CHANGE MASTER + or after starting both IO and SQL threads. + + Otherwise, if the IO thread was ahead of the SQL thread before the + restart or reconnect, we might end up re-fetching and hence re-applying + the same event(s) again. 
+ */ + if (mi->using_gtid && !mi->master_log_name[0]) { int rc; char str_buf[256]; @@ -1866,7 +1865,7 @@ after_set_capability: } } } - else + if (!mi->using_gtid) { /* If we are not using GTID to connect this time, then instead request @@ -2435,7 +2434,7 @@ static bool send_show_master_info_data(THD *thd, Master_info *mi, bool full, } // Master_Server_id protocol->store((uint32) mi->master_id); - protocol->store((uint32) (mi->gtid_pos_auto != 0)); + protocol->store((uint32) (mi->using_gtid != 0)); if (full) { protocol->store((uint32) mi->rli.retried_trans); @@ -3412,8 +3411,6 @@ connected: if (ret == 1) /* Fatal error */ goto err; - if (mi->gtid_pos_auto) - reset_coordinates_for_gtid(mi, rli); if (ret == 2) { diff --git a/sql/sql_repl.cc b/sql/sql_repl.cc index d369dbc4dc2..572b0e67ae3 100644 --- a/sql/sql_repl.cc +++ b/sql/sql_repl.cc @@ -806,8 +806,9 @@ check_slave_start_position(THD *thd, slave_connection_state *st, requested by the slave, then we still give error (below, after the loop). */ - if (!(missing_domains++)) - missing_domain_gtid= domain_gtid; + if (!missing_domains) + missing_domain_gtid= *slave_gtid; + ++missing_domains; continue; } *errormsg= "Requested slave GTID state not found in binlog"; @@ -1176,10 +1177,6 @@ gtid_state_from_binlog_pos(const char *in_name, uint32 pos, String *out_str) } -enum enum_gtid_skip_type { - GTID_SKIP_NOT, GTID_SKIP_STANDALONE, GTID_SKIP_TRANSACTION -}; - /* Helper function for mysql_binlog_send() to write an event down the slave connection. 
@@ -1234,12 +1231,7 @@ send_event_to_slave(THD *thd, NET *net, String* const packet, ushort flags, switch (*gtid_skip_group) { case GTID_SKIP_STANDALONE: - if (event_type != GTID_EVENT && - event_type != INTVAR_EVENT && - event_type != RAND_EVENT && - event_type != USER_VAR_EVENT && - event_type != TABLE_MAP_EVENT && - event_type != ANNOTATE_ROWS_EVENT) + if (!Log_event::is_part_of_group(event_type)) *gtid_skip_group= GTID_SKIP_NOT; return NULL; case GTID_SKIP_TRANSACTION: @@ -2713,11 +2705,11 @@ bool change_master(THD* thd, Master_info* mi, bool *master_info_added) } if (lex_mi->gtid_pos_auto || lex_mi->gtid_pos_str.str) - mi->gtid_pos_auto= true; + mi->using_gtid= true; else if (lex_mi->gtid_pos_str.str || lex_mi->log_file_name || lex_mi->pos || lex_mi->relay_log_name || lex_mi->relay_log_pos) - mi->gtid_pos_auto= false; + mi->using_gtid= false; /* If user did specify neither host nor port nor any log name nor any log @@ -2783,6 +2775,16 @@ bool change_master(THD* thd, Master_info* mi, bool *master_info_added) ret= TRUE; goto err; } + + if (mi->using_gtid) + { + /* + Clear the position in the master binlogs, so that we request the + correct GTID position. + */ + mi->master_log_name[0]= 0; + mi->master_log_pos= 0; + } } else {