mirror of
https://github.com/MariaDB/server.git
synced 2025-01-16 12:02:42 +01:00
2629 lines
84 KiB
C++
2629 lines
84 KiB
C++
/* Copyright (c) 2006, 2017, Oracle and/or its affiliates.
|
|
Copyright (c) 2010, 2020, MariaDB Corporation.
|
|
|
|
This program is free software; you can redistribute it and/or modify
|
|
it under the terms of the GNU General Public License as published by
|
|
the Free Software Foundation; version 2 of the License.
|
|
|
|
This program is distributed in the hope that it will be useful,
|
|
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
GNU General Public License for more details.
|
|
|
|
You should have received a copy of the GNU General Public License
|
|
along with this program; if not, write to the Free Software Foundation,
|
|
51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA */
|
|
|
|
#include "mariadb.h"
|
|
#include "sql_priv.h"
|
|
#include "unireg.h" // HAVE_*
|
|
#include "rpl_mi.h"
|
|
#include "rpl_rli.h"
|
|
#include "sql_base.h" // close_thread_tables
|
|
#include <my_dir.h> // For MY_STAT
|
|
#include "sql_repl.h" // For check_binlog_magic
|
|
#include "log_event.h" // Format_description_log_event, Log_event,
|
|
// FORMAT_DESCRIPTION_LOG_EVENT, ROTATE_EVENT,
|
|
// PREFIX_SQL_LOAD
|
|
#include "rpl_utility.h"
|
|
#include "transaction.h"
|
|
#include "sql_parse.h" // end_trans, ROLLBACK
|
|
#include "slave.h"
|
|
#include <mysql/plugin.h>
|
|
#include <mysql/service_thd_wait.h>
|
|
#include "lock.h"
|
|
#include "sql_table.h"
|
|
|
|
static int count_relay_log_space(Relay_log_info* rli);
|
|
bool xa_trans_force_rollback(THD *thd);
|
|
/**
|
|
Current replication state (hash of last GTID executed, per replication
|
|
domain).
|
|
*/
|
|
rpl_slave_state *rpl_global_gtid_slave_state;
|
|
/* Object used for MASTER_GTID_WAIT(). */
|
|
gtid_waiting rpl_global_gtid_waiting;
|
|
|
|
Relay_log_info::Relay_log_info(bool is_slave_recovery, const char* thread_name)
|
|
:Slave_reporting_capability(thread_name),
|
|
replicate_same_server_id(::replicate_same_server_id),
|
|
info_fd(-1), cur_log_fd(-1), relay_log(&sync_relaylog_period),
|
|
sync_counter(0), is_relay_log_recovery(is_slave_recovery),
|
|
save_temporary_tables(0),
|
|
mi(0), inuse_relaylog_list(0), last_inuse_relaylog(0),
|
|
cur_log_old_open_count(0), error_on_rli_init_info(false),
|
|
group_relay_log_pos(0), event_relay_log_pos(0),
|
|
group_master_log_pos(0), log_space_total(0), ignore_log_space_limit(0),
|
|
last_master_timestamp(0), sql_thread_caught_up(true), slave_skip_counter(0),
|
|
abort_pos_wait(0), slave_run_id(0), sql_driver_thd(),
|
|
gtid_skip_flag(GTID_SKIP_NOT), inited(0), abort_slave(0), stop_for_until(0),
|
|
slave_running(MYSQL_SLAVE_NOT_RUN), until_condition(UNTIL_NONE),
|
|
until_log_pos(0), is_until_before_gtids(false),
|
|
retried_trans(0), executed_entries(0),
|
|
last_trans_retry_count(0), sql_delay(0), sql_delay_end(0),
|
|
until_relay_log_names_defer(false),
|
|
m_flags(0)
|
|
{
|
|
DBUG_ENTER("Relay_log_info::Relay_log_info");
|
|
|
|
relay_log.is_relay_log= TRUE;
|
|
relay_log_state.init();
|
|
#ifdef HAVE_PSI_INTERFACE
|
|
relay_log.set_psi_keys(key_RELAYLOG_LOCK_index,
|
|
key_RELAYLOG_COND_relay_log_updated,
|
|
key_RELAYLOG_COND_bin_log_updated,
|
|
key_file_relaylog,
|
|
key_file_relaylog_cache,
|
|
key_file_relaylog_index,
|
|
key_file_relaylog_index_cache,
|
|
key_RELAYLOG_COND_queue_busy,
|
|
key_LOCK_relaylog_end_pos);
|
|
#endif
|
|
|
|
group_relay_log_name[0]= event_relay_log_name[0]=
|
|
group_master_log_name[0]= 0;
|
|
until_log_name[0]= ign_master_log_name_end[0]= 0;
|
|
max_relay_log_size= global_system_variables.max_relay_log_size;
|
|
bzero((char*) &info_file, sizeof(info_file));
|
|
bzero((char*) &cache_buf, sizeof(cache_buf));
|
|
bzero(&last_seen_gtid, sizeof(last_seen_gtid));
|
|
mysql_mutex_init(key_relay_log_info_run_lock, &run_lock, MY_MUTEX_INIT_FAST);
|
|
mysql_mutex_init(key_relay_log_info_data_lock,
|
|
&data_lock, MY_MUTEX_INIT_FAST);
|
|
mysql_mutex_init(key_relay_log_info_log_space_lock,
|
|
&log_space_lock, MY_MUTEX_INIT_FAST);
|
|
mysql_cond_init(key_relay_log_info_data_cond, &data_cond, NULL);
|
|
mysql_cond_init(key_relay_log_info_start_cond, &start_cond, NULL);
|
|
mysql_cond_init(key_relay_log_info_stop_cond, &stop_cond, NULL);
|
|
mysql_cond_init(key_relay_log_info_log_space_cond, &log_space_cond, NULL);
|
|
relay_log.init_pthread_objects();
|
|
DBUG_VOID_RETURN;
|
|
}
|
|
|
|
|
|
Relay_log_info::~Relay_log_info()
|
|
{
|
|
DBUG_ENTER("Relay_log_info::~Relay_log_info");
|
|
|
|
reset_inuse_relaylog();
|
|
mysql_mutex_destroy(&run_lock);
|
|
mysql_mutex_destroy(&data_lock);
|
|
mysql_mutex_destroy(&log_space_lock);
|
|
mysql_cond_destroy(&data_cond);
|
|
mysql_cond_destroy(&start_cond);
|
|
mysql_cond_destroy(&stop_cond);
|
|
mysql_cond_destroy(&log_space_cond);
|
|
relay_log.cleanup();
|
|
DBUG_VOID_RETURN;
|
|
}
|
|
|
|
|
|
/**
|
|
Read the relay_log.info file.
|
|
|
|
@param info_fname The name of the file to read from.
|
|
@retval 0 success
|
|
@retval 1 failure
|
|
*/
|
|
int Relay_log_info::init(const char* info_fname)
|
|
{
|
|
char fname[FN_REFLEN+128];
|
|
const char* msg = 0;
|
|
int error = 0;
|
|
mysql_mutex_t *log_lock;
|
|
DBUG_ENTER("Relay_log_info::init");
|
|
|
|
if (inited) // Set if this function called
|
|
DBUG_RETURN(0);
|
|
|
|
log_lock= relay_log.get_log_lock();
|
|
fn_format(fname, info_fname, mysql_data_home, "", 4+32);
|
|
mysql_mutex_lock(&data_lock);
|
|
cur_log_fd = -1;
|
|
slave_skip_counter=0;
|
|
abort_pos_wait=0;
|
|
log_space_limit= relay_log_space_limit;
|
|
log_space_total= 0;
|
|
|
|
if (unlikely(error_on_rli_init_info))
|
|
goto err;
|
|
|
|
char pattern[FN_REFLEN];
|
|
(void) my_realpath(pattern, slave_load_tmpdir, 0);
|
|
if (fn_format(pattern, PREFIX_SQL_LOAD, pattern, "",
|
|
MY_SAFE_PATH | MY_RETURN_REAL_PATH) == NullS)
|
|
{
|
|
mysql_mutex_unlock(&data_lock);
|
|
sql_print_error("Unable to use slave's temporary directory %s",
|
|
slave_load_tmpdir);
|
|
DBUG_RETURN(1);
|
|
}
|
|
unpack_filename(slave_patternload_file, pattern);
|
|
slave_patternload_file_size= strlen(slave_patternload_file);
|
|
|
|
/*
|
|
The relay log will now be opened, as a SEQ_READ_APPEND IO_CACHE.
|
|
Note that the I/O thread flushes it to disk after writing every
|
|
event, in flush_master_info(mi, 1, ?).
|
|
*/
|
|
|
|
{
|
|
/* Reports an error and returns, if the --relay-log's path
|
|
is a directory.*/
|
|
if (opt_relay_logname &&
|
|
opt_relay_logname[strlen(opt_relay_logname) - 1] == FN_LIBCHAR)
|
|
{
|
|
mysql_mutex_unlock(&data_lock);
|
|
sql_print_error("Path '%s' is a directory name, please specify \
|
|
a file name for --relay-log option", opt_relay_logname);
|
|
DBUG_RETURN(1);
|
|
}
|
|
|
|
/* Reports an error and returns, if the --relay-log-index's path
|
|
is a directory.*/
|
|
if (opt_relaylog_index_name &&
|
|
opt_relaylog_index_name[strlen(opt_relaylog_index_name) - 1]
|
|
== FN_LIBCHAR)
|
|
{
|
|
mysql_mutex_unlock(&data_lock);
|
|
sql_print_error("Path '%s' is a directory name, please specify \
|
|
a file name for --relay-log-index option", opt_relaylog_index_name);
|
|
DBUG_RETURN(1);
|
|
}
|
|
|
|
char buf[FN_REFLEN];
|
|
const char *ln;
|
|
static bool name_warning_sent= 0;
|
|
ln= relay_log.generate_name(opt_relay_logname, "-relay-bin",
|
|
1, buf);
|
|
/* We send the warning only at startup, not after every RESET SLAVE */
|
|
if (!opt_relay_logname && !opt_relaylog_index_name && !name_warning_sent &&
|
|
!opt_bootstrap)
|
|
{
|
|
/*
|
|
User didn't give us info to name the relay log index file.
|
|
Picking `hostname`-relay-bin.index like we do, causes replication to
|
|
fail if this slave's hostname is changed later. So, we would like to
|
|
instead require a name. But as we don't want to break many existing
|
|
setups, we only give warning, not error.
|
|
*/
|
|
sql_print_warning("Neither --relay-log nor --relay-log-index were used;"
|
|
" so replication "
|
|
"may break when this MariaDB server acts as a "
|
|
"replica and has its hostname changed. Please "
|
|
"use '--log-basename=#' or '--relay-log=%s' to avoid "
|
|
"this problem.", ln);
|
|
name_warning_sent= 1;
|
|
}
|
|
|
|
/* For multimaster, add connection name to relay log filenames */
|
|
char buf_relay_logname[FN_REFLEN], buf_relaylog_index_name_buff[FN_REFLEN];
|
|
char *buf_relaylog_index_name= opt_relaylog_index_name;
|
|
|
|
create_logfile_name_with_suffix(buf_relay_logname,
|
|
sizeof(buf_relay_logname),
|
|
ln, 1, &mi->cmp_connection_name);
|
|
ln= buf_relay_logname;
|
|
|
|
if (opt_relaylog_index_name)
|
|
{
|
|
buf_relaylog_index_name= buf_relaylog_index_name_buff;
|
|
create_logfile_name_with_suffix(buf_relaylog_index_name_buff,
|
|
sizeof(buf_relaylog_index_name_buff),
|
|
opt_relaylog_index_name, 0,
|
|
&mi->cmp_connection_name);
|
|
}
|
|
|
|
/*
|
|
note, that if open() fails, we'll still have index file open
|
|
but a destructor will take care of that
|
|
*/
|
|
mysql_mutex_lock(log_lock);
|
|
if (relay_log.open_index_file(buf_relaylog_index_name, ln, TRUE) ||
|
|
relay_log.open(ln, 0, 0, SEQ_READ_APPEND,
|
|
(ulong)max_relay_log_size, 1, TRUE))
|
|
{
|
|
mysql_mutex_unlock(log_lock);
|
|
mysql_mutex_unlock(&data_lock);
|
|
sql_print_error("Failed when trying to open logs for '%s' in Relay_log_info::init(). Error: %M", ln, my_errno);
|
|
DBUG_RETURN(1);
|
|
}
|
|
mysql_mutex_unlock(log_lock);
|
|
}
|
|
|
|
/* if file does not exist */
|
|
if (access(fname,F_OK))
|
|
{
|
|
/*
|
|
If someone removed the file from underneath our feet, just close
|
|
the old descriptor and re-create the old file
|
|
*/
|
|
if (info_fd >= 0)
|
|
mysql_file_close(info_fd, MYF(MY_WME));
|
|
if ((info_fd= mysql_file_open(key_file_relay_log_info,
|
|
fname, O_CREAT|O_RDWR|O_BINARY, MYF(MY_WME))) < 0)
|
|
{
|
|
sql_print_error("Failed to create a new relay log info file ("
|
|
"file '%s', errno %d)", fname, my_errno);
|
|
msg= current_thd->get_stmt_da()->message();
|
|
goto err;
|
|
}
|
|
if (init_io_cache(&info_file, info_fd, LOG_BIN_IO_SIZE, READ_CACHE, 0L,0,
|
|
MYF(MY_WME)))
|
|
{
|
|
sql_print_error("Failed to create a cache on relay log info file '%s'",
|
|
fname);
|
|
msg= current_thd->get_stmt_da()->message();
|
|
goto err;
|
|
}
|
|
|
|
/* Init relay log with first entry in the relay index file */
|
|
if (init_relay_log_pos(this,NullS,BIN_LOG_HEADER_SIZE,0 /* no data lock */,
|
|
&msg, 0))
|
|
{
|
|
sql_print_error("Failed to open the relay log 'FIRST' (relay_log_pos 4)");
|
|
goto err;
|
|
}
|
|
group_master_log_name[0]= 0;
|
|
group_master_log_pos= 0;
|
|
}
|
|
else // file exists
|
|
{
|
|
if (info_fd >= 0)
|
|
reinit_io_cache(&info_file, READ_CACHE, 0L,0,0);
|
|
else
|
|
{
|
|
int error=0;
|
|
if ((info_fd= mysql_file_open(key_file_relay_log_info,
|
|
fname, O_RDWR|O_BINARY, MYF(MY_WME))) < 0)
|
|
{
|
|
sql_print_error("\
|
|
Failed to open the existing relay log info file '%s' (errno %d)",
|
|
fname, my_errno);
|
|
error= 1;
|
|
}
|
|
else if (init_io_cache(&info_file, info_fd,
|
|
LOG_BIN_IO_SIZE, READ_CACHE, 0L, 0, MYF(MY_WME)))
|
|
{
|
|
sql_print_error("Failed to create a cache on relay log info file '%s'",
|
|
fname);
|
|
error= 1;
|
|
}
|
|
if (unlikely(error))
|
|
{
|
|
if (info_fd >= 0)
|
|
mysql_file_close(info_fd, MYF(0));
|
|
info_fd= -1;
|
|
mysql_mutex_lock(log_lock);
|
|
relay_log.close(LOG_CLOSE_INDEX | LOG_CLOSE_STOP_EVENT);
|
|
mysql_mutex_unlock(log_lock);
|
|
mysql_mutex_unlock(&data_lock);
|
|
DBUG_RETURN(1);
|
|
}
|
|
}
|
|
|
|
int relay_log_pos, master_log_pos, lines;
|
|
char *first_non_digit;
|
|
|
|
/*
|
|
Starting from MySQL 5.6.x, relay-log.info has a new format.
|
|
Now, its first line contains the number of lines in the file.
|
|
By reading this number we can determine which version our master.info
|
|
comes from. We can't simply count the lines in the file, since
|
|
versions before 5.6.x could generate files with more lines than
|
|
needed. If first line doesn't contain a number, or if it
|
|
contains a number less than LINES_IN_RELAY_LOG_INFO_WITH_DELAY,
|
|
then the file is treated like a file from pre-5.6.x version.
|
|
There is no ambiguity when reading an old master.info: before
|
|
5.6.x, the first line contained the binlog's name, which is
|
|
either empty or has an extension (contains a '.'), so can't be
|
|
confused with an integer.
|
|
|
|
So we're just reading first line and trying to figure which
|
|
version is this.
|
|
*/
|
|
|
|
/*
|
|
The first row is temporarily stored in mi->master_log_name, if
|
|
it is line count and not binlog name (new format) it will be
|
|
overwritten by the second row later.
|
|
*/
|
|
if (init_strvar_from_file(group_relay_log_name,
|
|
sizeof(group_relay_log_name),
|
|
&info_file, ""))
|
|
{
|
|
msg="Error reading slave log configuration";
|
|
goto err;
|
|
}
|
|
|
|
lines= strtoul(group_relay_log_name, &first_non_digit, 10);
|
|
|
|
if (group_relay_log_name[0] != '\0' &&
|
|
*first_non_digit == '\0' &&
|
|
lines >= LINES_IN_RELAY_LOG_INFO_WITH_DELAY)
|
|
{
|
|
DBUG_PRINT("info", ("relay_log_info file is in new format."));
|
|
/* Seems to be new format => read relay log name from next line */
|
|
if (init_strvar_from_file(group_relay_log_name,
|
|
sizeof(group_relay_log_name),
|
|
&info_file, ""))
|
|
{
|
|
msg="Error reading slave log configuration";
|
|
goto err;
|
|
}
|
|
}
|
|
else
|
|
DBUG_PRINT("info", ("relay_log_info file is in old format."));
|
|
|
|
if (init_intvar_from_file(&relay_log_pos,
|
|
&info_file, BIN_LOG_HEADER_SIZE) ||
|
|
init_strvar_from_file(group_master_log_name,
|
|
sizeof(group_master_log_name),
|
|
&info_file, "") ||
|
|
init_intvar_from_file(&master_log_pos, &info_file, 0) ||
|
|
(lines >= LINES_IN_RELAY_LOG_INFO_WITH_DELAY &&
|
|
init_intvar_from_file(&sql_delay, &info_file, 0)))
|
|
{
|
|
msg="Error reading slave log configuration";
|
|
goto err;
|
|
}
|
|
|
|
strmake_buf(event_relay_log_name,group_relay_log_name);
|
|
group_relay_log_pos= event_relay_log_pos= relay_log_pos;
|
|
group_master_log_pos= master_log_pos;
|
|
|
|
if (is_relay_log_recovery && init_recovery(mi, &msg))
|
|
goto err;
|
|
|
|
relay_log_state.load(rpl_global_gtid_slave_state);
|
|
if (init_relay_log_pos(this,
|
|
group_relay_log_name,
|
|
group_relay_log_pos,
|
|
0 /* no data lock*/,
|
|
&msg, 0))
|
|
{
|
|
sql_print_error("Failed to open the relay log '%s' (relay_log_pos %llu)",
|
|
group_relay_log_name, group_relay_log_pos);
|
|
goto err;
|
|
}
|
|
}
|
|
|
|
DBUG_PRINT("info", ("my_b_tell(cur_log)=%llu event_relay_log_pos=%llu",
|
|
my_b_tell(cur_log), event_relay_log_pos));
|
|
DBUG_ASSERT(event_relay_log_pos >= BIN_LOG_HEADER_SIZE);
|
|
DBUG_ASSERT(my_b_tell(cur_log) == event_relay_log_pos);
|
|
|
|
/*
|
|
Now change the cache from READ to WRITE - must do this
|
|
before Relay_log_info::flush()
|
|
*/
|
|
reinit_io_cache(&info_file, WRITE_CACHE,0L,0,1);
|
|
if (unlikely((error= flush())))
|
|
{
|
|
msg= "Failed to flush relay log info file";
|
|
goto err;
|
|
}
|
|
if (count_relay_log_space(this))
|
|
{
|
|
msg="Error counting relay log space";
|
|
goto err;
|
|
}
|
|
inited= 1;
|
|
error_on_rli_init_info= false;
|
|
mysql_mutex_unlock(&data_lock);
|
|
DBUG_RETURN(0);
|
|
|
|
err:
|
|
error_on_rli_init_info= true;
|
|
if (msg)
|
|
sql_print_error("%s", msg);
|
|
end_io_cache(&info_file);
|
|
if (info_fd >= 0)
|
|
mysql_file_close(info_fd, MYF(0));
|
|
info_fd= -1;
|
|
mysql_mutex_lock(log_lock);
|
|
relay_log.close(LOG_CLOSE_INDEX | LOG_CLOSE_STOP_EVENT);
|
|
mysql_mutex_unlock(log_lock);
|
|
mysql_mutex_unlock(&data_lock);
|
|
DBUG_RETURN(1);
|
|
}
|
|
|
|
|
|
static inline int add_relay_log(Relay_log_info* rli,LOG_INFO* linfo)
|
|
{
|
|
MY_STAT s;
|
|
DBUG_ENTER("add_relay_log");
|
|
if (!mysql_file_stat(key_file_relaylog,
|
|
linfo->log_file_name, &s, MYF(0)))
|
|
{
|
|
sql_print_error("log %s listed in the index, but failed to stat",
|
|
linfo->log_file_name);
|
|
DBUG_RETURN(1);
|
|
}
|
|
rli->log_space_total += s.st_size;
|
|
DBUG_PRINT("info",("log_space_total: %llu", uint64(rli->log_space_total)));
|
|
DBUG_RETURN(0);
|
|
}
|
|
|
|
|
|
static int count_relay_log_space(Relay_log_info* rli)
|
|
{
|
|
LOG_INFO linfo;
|
|
DBUG_ENTER("count_relay_log_space");
|
|
rli->log_space_total= 0;
|
|
if (rli->relay_log.find_log_pos(&linfo, NullS, 1))
|
|
{
|
|
sql_print_error("Could not find first log while counting relay log space");
|
|
DBUG_RETURN(1);
|
|
}
|
|
do
|
|
{
|
|
if (add_relay_log(rli,&linfo))
|
|
DBUG_RETURN(1);
|
|
} while (!rli->relay_log.find_next_log(&linfo, 1));
|
|
/*
|
|
As we have counted everything, including what may have written in a
|
|
preceding write, we must reset bytes_written, or we may count some space
|
|
twice.
|
|
*/
|
|
rli->relay_log.reset_bytes_written();
|
|
DBUG_RETURN(0);
|
|
}
|
|
|
|
|
|
/*
|
|
Reset UNTIL condition for Relay_log_info
|
|
|
|
SYNOPSYS
|
|
clear_until_condition()
|
|
rli - Relay_log_info structure where UNTIL condition should be reset
|
|
*/
|
|
|
|
void Relay_log_info::clear_until_condition()
|
|
{
|
|
DBUG_ENTER("clear_until_condition");
|
|
|
|
until_condition= Relay_log_info::UNTIL_NONE;
|
|
until_log_name[0]= 0;
|
|
until_log_pos= 0;
|
|
until_relay_log_names_defer= false;
|
|
|
|
DBUG_VOID_RETURN;
|
|
}
|
|
|
|
|
|
/*
|
|
Read the correct format description event for starting to replicate from
|
|
a given position in a relay log file.
|
|
*/
|
|
Format_description_log_event *
|
|
read_relay_log_description_event(IO_CACHE *cur_log, ulonglong start_pos,
|
|
const char **errmsg)
|
|
{
|
|
Log_event *ev;
|
|
Format_description_log_event *fdev;
|
|
bool found= false;
|
|
|
|
fdev= new Format_description_log_event(4);
|
|
|
|
while (!found)
|
|
{
|
|
Log_event_type typ;
|
|
|
|
/*
|
|
Read the possible Format_description_log_event; if position
|
|
was 4, no need, it will be read naturally.
|
|
*/
|
|
DBUG_PRINT("info",("looking for a Format_description_log_event"));
|
|
|
|
if (my_b_tell(cur_log) >= start_pos)
|
|
break;
|
|
|
|
if (!(ev= Log_event::read_log_event(cur_log, fdev,
|
|
opt_slave_sql_verify_checksum)))
|
|
{
|
|
DBUG_PRINT("info",("could not read event, cur_log->error=%d",
|
|
cur_log->error));
|
|
if (cur_log->error) /* not EOF */
|
|
{
|
|
*errmsg= "I/O error reading event at position 4";
|
|
delete fdev;
|
|
return NULL;
|
|
}
|
|
break;
|
|
}
|
|
typ= ev->get_type_code();
|
|
if (typ == FORMAT_DESCRIPTION_EVENT)
|
|
{
|
|
Format_description_log_event *old= fdev;
|
|
DBUG_PRINT("info",("found Format_description_log_event"));
|
|
fdev= (Format_description_log_event*) ev;
|
|
fdev->copy_crypto_data(old);
|
|
delete old;
|
|
|
|
/*
|
|
As ev was returned by read_log_event, it has passed is_valid(), so
|
|
my_malloc() in ctor worked, no need to check again.
|
|
*/
|
|
/*
|
|
Ok, we found a Format_description event. But it is not sure that this
|
|
describes the whole relay log; indeed, one can have this sequence
|
|
(starting from position 4):
|
|
Format_desc (of slave)
|
|
Rotate (of master)
|
|
Format_desc (of master)
|
|
So the Format_desc which really describes the rest of the relay log
|
|
is the 3rd event (it can't be further than that, because we rotate
|
|
the relay log when we queue a Rotate event from the master).
|
|
But what describes the Rotate is the first Format_desc.
|
|
So what we do is:
|
|
go on searching for Format_description events, until you exceed the
|
|
position (argument 'pos') or until you find another event than Rotate
|
|
or Format_desc.
|
|
*/
|
|
}
|
|
else if (typ == START_ENCRYPTION_EVENT)
|
|
{
|
|
if (fdev->start_decryption((Start_encryption_log_event*) ev))
|
|
{
|
|
*errmsg= "Unable to set up decryption of binlog.";
|
|
delete ev;
|
|
delete fdev;
|
|
return NULL;
|
|
}
|
|
delete ev;
|
|
}
|
|
else
|
|
{
|
|
DBUG_PRINT("info",("found event of another type=%d", typ));
|
|
found= (typ != ROTATE_EVENT);
|
|
delete ev;
|
|
}
|
|
}
|
|
return fdev;
|
|
}
|
|
|
|
|
|
/*
|
|
Open the given relay log
|
|
|
|
SYNOPSIS
|
|
init_relay_log_pos()
|
|
rli Relay information (will be initialized)
|
|
log Name of relay log file to read from. NULL = First log
|
|
pos Position in relay log file
|
|
need_data_lock Set to 1 if this functions should do mutex locks
|
|
errmsg Store pointer to error message here
|
|
look_for_description_event
|
|
1 if we should look for such an event. We only need
|
|
this when the SQL thread starts and opens an existing
|
|
relay log and has to execute it (possibly from an
|
|
offset >4); then we need to read the first event of
|
|
the relay log to be able to parse the events we have
|
|
to execute.
|
|
|
|
DESCRIPTION
|
|
- Close old open relay log files.
|
|
- If we are using the same relay log as the running IO-thread, then set
|
|
rli->cur_log to point to the same IO_CACHE entry.
|
|
- If not, open the 'log' binary file.
|
|
|
|
TODO
|
|
- check proper initialization of group_master_log_name/group_master_log_pos
|
|
|
|
RETURN VALUES
|
|
0 ok
|
|
1 error. errmsg is set to point to the error message
|
|
*/
|
|
|
|
int init_relay_log_pos(Relay_log_info* rli,const char* log,
|
|
ulonglong pos, bool need_data_lock,
|
|
const char** errmsg,
|
|
bool look_for_description_event)
|
|
{
|
|
DBUG_ENTER("init_relay_log_pos");
|
|
DBUG_PRINT("info", ("pos: %lu", (ulong) pos));
|
|
|
|
*errmsg=0;
|
|
mysql_mutex_t *log_lock= rli->relay_log.get_log_lock();
|
|
|
|
if (need_data_lock)
|
|
mysql_mutex_lock(&rli->data_lock);
|
|
|
|
/*
|
|
Slave threads are not the only users of init_relay_log_pos(). CHANGE MASTER
|
|
is, too, and init_slave() too; these 2 functions allocate a description
|
|
event in init_relay_log_pos, which is not freed by the terminating SQL slave
|
|
thread as that thread is not started by these functions. So we have to free
|
|
the description_event here, in case, so that there is no memory leak in
|
|
running, say, CHANGE MASTER.
|
|
*/
|
|
delete rli->relay_log.description_event_for_exec;
|
|
rli->relay_log.description_event_for_exec= new Format_description_log_event(4);
|
|
|
|
mysql_mutex_lock(log_lock);
|
|
|
|
/* Close log file and free buffers if it's already open */
|
|
if (rli->cur_log_fd >= 0)
|
|
{
|
|
end_io_cache(&rli->cache_buf);
|
|
mysql_file_close(rli->cur_log_fd, MYF(MY_WME));
|
|
rli->cur_log_fd = -1;
|
|
}
|
|
|
|
rli->group_relay_log_pos = rli->event_relay_log_pos = pos;
|
|
rli->clear_flag(Relay_log_info::IN_STMT);
|
|
rli->clear_flag(Relay_log_info::IN_TRANSACTION);
|
|
|
|
/*
|
|
Test to see if the previous run was with the skip of purging
|
|
If yes, we do not purge when we restart
|
|
*/
|
|
if (rli->relay_log.find_log_pos(&rli->linfo, NullS, 1))
|
|
{
|
|
*errmsg="Could not find first log during relay log initialization";
|
|
goto err;
|
|
}
|
|
|
|
if (log && rli->relay_log.find_log_pos(&rli->linfo, log, 1))
|
|
{
|
|
*errmsg="Could not find target log during relay log initialization";
|
|
goto err;
|
|
}
|
|
strmake_buf(rli->group_relay_log_name,rli->linfo.log_file_name);
|
|
strmake_buf(rli->event_relay_log_name,rli->linfo.log_file_name);
|
|
if (rli->relay_log.is_active(rli->linfo.log_file_name))
|
|
{
|
|
/*
|
|
The IO thread is using this log file.
|
|
In this case, we will use the same IO_CACHE pointer to
|
|
read data as the IO thread is using to write data.
|
|
*/
|
|
my_b_seek((rli->cur_log=rli->relay_log.get_log_file()), (off_t)0);
|
|
if (check_binlog_magic(rli->cur_log,errmsg))
|
|
goto err;
|
|
rli->cur_log_old_open_count=rli->relay_log.get_open_count();
|
|
}
|
|
else
|
|
{
|
|
/*
|
|
Open the relay log and set rli->cur_log to point at this one
|
|
*/
|
|
if ((rli->cur_log_fd=open_binlog(&rli->cache_buf,
|
|
rli->linfo.log_file_name,errmsg)) < 0)
|
|
goto err;
|
|
rli->cur_log = &rli->cache_buf;
|
|
}
|
|
/*
|
|
In all cases, check_binlog_magic() has been called so we're at offset 4 for
|
|
sure.
|
|
*/
|
|
if (pos > BIN_LOG_HEADER_SIZE) /* If pos<=4, we stay at 4 */
|
|
{
|
|
if (look_for_description_event)
|
|
{
|
|
Format_description_log_event *fdev;
|
|
if (!(fdev= read_relay_log_description_event(rli->cur_log, pos, errmsg)))
|
|
goto err;
|
|
delete rli->relay_log.description_event_for_exec;
|
|
rli->relay_log.description_event_for_exec= fdev;
|
|
}
|
|
my_b_seek(rli->cur_log,(off_t)pos);
|
|
DBUG_PRINT("info", ("my_b_tell(rli->cur_log)=%llu rli->event_relay_log_pos=%llu",
|
|
my_b_tell(rli->cur_log), rli->event_relay_log_pos));
|
|
|
|
}
|
|
|
|
err:
|
|
/*
|
|
If we don't purge, we can't honour relay_log_space_limit ;
|
|
silently discard it
|
|
*/
|
|
if (!relay_log_purge)
|
|
rli->log_space_limit= 0;
|
|
mysql_cond_broadcast(&rli->data_cond);
|
|
|
|
mysql_mutex_unlock(log_lock);
|
|
|
|
if (need_data_lock)
|
|
mysql_mutex_unlock(&rli->data_lock);
|
|
if (!rli->relay_log.description_event_for_exec->is_valid() && !*errmsg)
|
|
*errmsg= "Invalid Format_description log event; could be out of memory";
|
|
|
|
DBUG_PRINT("info", ("Returning %d from init_relay_log_pos", (*errmsg)?1:0));
|
|
|
|
DBUG_RETURN ((*errmsg) ? 1 : 0);
|
|
}
|
|
|
|
|
|
/*
|
|
Waits until the SQL thread reaches (has executed up to) the
|
|
log/position or timed out.
|
|
|
|
SYNOPSIS
|
|
wait_for_pos()
|
|
thd client thread that sent SELECT MASTER_POS_WAIT
|
|
log_name log name to wait for
|
|
log_pos position to wait for
|
|
timeout timeout in seconds before giving up waiting
|
|
|
|
NOTES
|
|
timeout is longlong whereas it should be ulong ; but this is
|
|
to catch if the user submitted a negative timeout.
|
|
|
|
RETURN VALUES
|
|
-2 improper arguments (log_pos<0)
|
|
or slave not running, or master info changed
|
|
during the function's execution,
|
|
or client thread killed. -2 is translated to NULL by caller
|
|
-1 timed out
|
|
>=0 number of log events the function had to wait
|
|
before reaching the desired log/position
|
|
*/
|
|
|
|
int Relay_log_info::wait_for_pos(THD* thd, String* log_name,
|
|
longlong log_pos,
|
|
longlong timeout)
|
|
{
|
|
int event_count = 0;
|
|
ulong init_abort_pos_wait;
|
|
int error=0;
|
|
struct timespec abstime; // for timeout checking
|
|
PSI_stage_info old_stage;
|
|
DBUG_ENTER("Relay_log_info::wait_for_pos");
|
|
|
|
if (!inited)
|
|
DBUG_RETURN(-2);
|
|
|
|
DBUG_PRINT("enter",("log_name: '%s' log_pos: %lu timeout: %lu",
|
|
log_name->c_ptr(), (ulong) log_pos, (ulong) timeout));
|
|
|
|
set_timespec(abstime,timeout);
|
|
mysql_mutex_lock(&data_lock);
|
|
thd->ENTER_COND(&data_cond, &data_lock,
|
|
&stage_waiting_for_the_slave_thread_to_advance_position,
|
|
&old_stage);
|
|
/*
|
|
This function will abort when it notices that some CHANGE MASTER or
|
|
RESET MASTER has changed the master info.
|
|
To catch this, these commands modify abort_pos_wait ; We just monitor
|
|
abort_pos_wait and see if it has changed.
|
|
Why do we have this mechanism instead of simply monitoring slave_running
|
|
in the loop (we do this too), as CHANGE MASTER/RESET SLAVE require that
|
|
the SQL thread be stopped?
|
|
This is becasue if someones does:
|
|
STOP SLAVE;CHANGE MASTER/RESET SLAVE; START SLAVE;
|
|
the change may happen very quickly and we may not notice that
|
|
slave_running briefly switches between 1/0/1.
|
|
*/
|
|
init_abort_pos_wait= abort_pos_wait;
|
|
|
|
/*
|
|
We'll need to
|
|
handle all possible log names comparisons (e.g. 999 vs 1000).
|
|
We use ulong for string->number conversion ; this is no
|
|
stronger limitation than in find_uniq_filename in sql/log.cc
|
|
*/
|
|
ulong log_name_extension;
|
|
char log_name_tmp[FN_REFLEN]; //make a char[] from String
|
|
|
|
strmake(log_name_tmp, log_name->ptr(), MY_MIN(log_name->length(), FN_REFLEN-1));
|
|
|
|
char *p= fn_ext(log_name_tmp);
|
|
char *p_end;
|
|
if (!*p || log_pos<0)
|
|
{
|
|
error= -2; //means improper arguments
|
|
goto err;
|
|
}
|
|
// Convert 0-3 to 4
|
|
log_pos= MY_MAX(log_pos, BIN_LOG_HEADER_SIZE);
|
|
/* p points to '.' */
|
|
log_name_extension= strtoul(++p, &p_end, 10);
|
|
/*
|
|
p_end points to the first invalid character.
|
|
If it equals to p, no digits were found, error.
|
|
If it contains '\0' it means conversion went ok.
|
|
*/
|
|
if (p_end==p || *p_end)
|
|
{
|
|
error= -2;
|
|
goto err;
|
|
}
|
|
|
|
/* The "compare and wait" main loop */
|
|
while (!thd->killed &&
|
|
init_abort_pos_wait == abort_pos_wait &&
|
|
slave_running)
|
|
{
|
|
bool pos_reached;
|
|
int cmp_result= 0;
|
|
|
|
DBUG_PRINT("info",
|
|
("init_abort_pos_wait: %ld abort_pos_wait: %ld",
|
|
init_abort_pos_wait, abort_pos_wait));
|
|
DBUG_PRINT("info",("group_master_log_name: '%s' pos: %lu",
|
|
group_master_log_name, (ulong) group_master_log_pos));
|
|
|
|
/*
|
|
group_master_log_name can be "", if we are just after a fresh
|
|
replication start or after a CHANGE MASTER TO MASTER_HOST/PORT
|
|
(before we have executed one Rotate event from the master) or
|
|
(rare) if the user is doing a weird slave setup (see next
|
|
paragraph). If group_master_log_name is "", we assume we don't
|
|
have enough info to do the comparison yet, so we just wait until
|
|
more data. In this case master_log_pos is always 0 except if
|
|
somebody (wrongly) sets this slave to be a slave of itself
|
|
without using --replicate-same-server-id (an unsupported
|
|
configuration which does nothing), then group_master_log_pos
|
|
will grow and group_master_log_name will stay "".
|
|
*/
|
|
if (*group_master_log_name)
|
|
{
|
|
char *basename= (group_master_log_name +
|
|
dirname_length(group_master_log_name));
|
|
/*
|
|
First compare the parts before the extension.
|
|
Find the dot in the master's log basename,
|
|
and protect against user's input error :
|
|
if the names do not match up to '.' included, return error
|
|
*/
|
|
char *q= (char*)(fn_ext(basename)+1);
|
|
if (strncmp(basename, log_name_tmp, (int)(q-basename)))
|
|
{
|
|
error= -2;
|
|
break;
|
|
}
|
|
// Now compare extensions.
|
|
char *q_end;
|
|
ulong group_master_log_name_extension= strtoul(q, &q_end, 10);
|
|
if (group_master_log_name_extension < log_name_extension)
|
|
cmp_result= -1 ;
|
|
else
|
|
cmp_result= (group_master_log_name_extension > log_name_extension) ? 1 : 0 ;
|
|
|
|
pos_reached= ((!cmp_result && group_master_log_pos >= (ulonglong)log_pos) ||
|
|
cmp_result > 0);
|
|
if (pos_reached || thd->killed)
|
|
break;
|
|
}
|
|
|
|
//wait for master update, with optional timeout.
|
|
|
|
DBUG_PRINT("info",("Waiting for master update"));
|
|
/*
|
|
We are going to mysql_cond_(timed)wait(); if the SQL thread stops it
|
|
will wake us up.
|
|
*/
|
|
thd_wait_begin(thd, THD_WAIT_BINLOG);
|
|
if (timeout > 0)
|
|
{
|
|
/*
|
|
Note that mysql_cond_timedwait checks for the timeout
|
|
before for the condition ; i.e. it returns ETIMEDOUT
|
|
if the system time equals or exceeds the time specified by abstime
|
|
before the condition variable is signaled or broadcast, _or_ if
|
|
the absolute time specified by abstime has already passed at the time
|
|
of the call.
|
|
For that reason, mysql_cond_timedwait will do the "timeoutting" job
|
|
even if its condition is always immediately signaled (case of a loaded
|
|
master).
|
|
*/
|
|
error= mysql_cond_timedwait(&data_cond, &data_lock, &abstime);
|
|
}
|
|
else
|
|
mysql_cond_wait(&data_cond, &data_lock);
|
|
thd_wait_end(thd);
|
|
DBUG_PRINT("info",("Got signal of master update or timed out"));
|
|
if (error == ETIMEDOUT || error == ETIME)
|
|
{
|
|
error= -1;
|
|
break;
|
|
}
|
|
error=0;
|
|
event_count++;
|
|
DBUG_PRINT("info",("Testing if killed or SQL thread not running"));
|
|
}
|
|
|
|
err:
|
|
thd->EXIT_COND(&old_stage);
|
|
DBUG_PRINT("exit",("killed: %d abort: %d slave_running: %d \
|
|
improper_arguments: %d timed_out: %d",
|
|
thd->killed_errno(),
|
|
(int) (init_abort_pos_wait != abort_pos_wait),
|
|
(int) slave_running,
|
|
(int) (error == -2),
|
|
(int) (error == -1)));
|
|
if (thd->killed || init_abort_pos_wait != abort_pos_wait ||
|
|
!slave_running)
|
|
{
|
|
error= -2;
|
|
}
|
|
DBUG_RETURN( error ? error : event_count );
|
|
}
|
|
|
|
|
|
void Relay_log_info::inc_group_relay_log_pos(ulonglong log_pos,
|
|
rpl_group_info *rgi,
|
|
bool skip_lock)
|
|
{
|
|
DBUG_ENTER("Relay_log_info::inc_group_relay_log_pos");
|
|
|
|
if (skip_lock)
|
|
mysql_mutex_assert_owner(&data_lock);
|
|
else
|
|
mysql_mutex_lock(&data_lock);
|
|
|
|
rgi->inc_event_relay_log_pos();
|
|
DBUG_PRINT("info", ("log_pos: %lu group_master_log_pos: %lu",
|
|
(long) log_pos, (long) group_master_log_pos));
|
|
if (rgi->is_parallel_exec)
|
|
{
|
|
/* In case of parallel replication, do not update the position backwards. */
|
|
int cmp= compare_log_name(group_relay_log_name, rgi->event_relay_log_name);
|
|
if (cmp < 0)
|
|
{
|
|
group_relay_log_pos= rgi->future_event_relay_log_pos;
|
|
strmake_buf(group_relay_log_name, rgi->event_relay_log_name);
|
|
} else if (cmp == 0 && group_relay_log_pos < rgi->future_event_relay_log_pos)
|
|
group_relay_log_pos= rgi->future_event_relay_log_pos;
|
|
|
|
/*
|
|
In the parallel case we need to update the master_log_name here, rather
|
|
than in Rotate_log_event::do_update_pos().
|
|
*/
|
|
cmp= compare_log_name(group_master_log_name, rgi->future_event_master_log_name);
|
|
if (cmp <= 0)
|
|
{
|
|
if (cmp < 0)
|
|
{
|
|
safe_strcpy(group_master_log_name, sizeof(group_master_log_name),
|
|
rgi->future_event_master_log_name);
|
|
group_master_log_pos= log_pos;
|
|
}
|
|
else if (group_master_log_pos < log_pos)
|
|
group_master_log_pos= log_pos;
|
|
}
|
|
|
|
/*
|
|
In the parallel case, we only update the Seconds_Behind_Master at the
|
|
end of a transaction. In the non-parallel case, the value is updated as
|
|
soon as an event is read from the relay log; however this would be too
|
|
confusing for the user, seeing the slave reported as up-to-date when
|
|
potentially thousands of events are still queued up for worker threads
|
|
waiting for execution.
|
|
*/
|
|
if (rgi->last_master_timestamp &&
|
|
rgi->last_master_timestamp > last_master_timestamp)
|
|
last_master_timestamp= rgi->last_master_timestamp;
|
|
}
|
|
else
|
|
{
|
|
/* Non-parallel case. */
|
|
group_relay_log_pos= event_relay_log_pos;
|
|
strmake_buf(group_relay_log_name, event_relay_log_name);
|
|
notify_group_relay_log_name_update();
|
|
if (log_pos) // not 3.23 binlogs (no log_pos there) and not Stop_log_event
|
|
group_master_log_pos= log_pos;
|
|
}
|
|
|
|
/*
|
|
If the slave does not support transactions and replicates a transaction,
|
|
users should not trust group_master_log_pos (which they can display with
|
|
SHOW SLAVE STATUS or read from relay-log.info), because to compute
|
|
group_master_log_pos the slave relies on log_pos stored in the master's
|
|
binlog, but if we are in a master's transaction these positions are always
|
|
the BEGIN's one (excepted for the COMMIT), so group_master_log_pos does
|
|
not advance as it should on the non-transactional slave (it advances by
|
|
big leaps, whereas it should advance by small leaps).
|
|
*/
|
|
/*
|
|
In 4.x we used the event's len to compute the positions here. This is
|
|
wrong if the event was 3.23/4.0 and has been converted to 5.0, because
|
|
then the event's len is not what is was in the master's binlog, so this
|
|
will make a wrong group_master_log_pos (yes it's a bug in 3.23->4.0
|
|
replication: Exec_master_log_pos is wrong). Only way to solve this is to
|
|
have the original offset of the end of the event the relay log. This is
|
|
what we do in 5.0: log_pos has become "end_log_pos" (because the real use
|
|
of log_pos in 4.0 was to compute the end_log_pos; so better to store
|
|
end_log_pos instead of begin_log_pos.
|
|
If we had not done this fix here, the problem would also have appeared
|
|
when the slave and master are 5.0 but with different event length (for
|
|
example the slave is more recent than the master and features the event
|
|
UID). It would give false MASTER_POS_WAIT, false Exec_master_log_pos in
|
|
SHOW SLAVE STATUS, and so the user would do some CHANGE MASTER using this
|
|
value which would lead to badly broken replication.
|
|
Even the relay_log_pos will be corrupted in this case, because the len is
|
|
the relay log is not "val".
|
|
With the end_log_pos solution, we avoid computations involving lengthes.
|
|
*/
|
|
mysql_cond_broadcast(&data_cond);
|
|
if (!skip_lock)
|
|
mysql_mutex_unlock(&data_lock);
|
|
DBUG_VOID_RETURN;
|
|
}
|
|
|
|
|
|
void Relay_log_info::close_temporary_tables()
|
|
{
|
|
DBUG_ENTER("Relay_log_info::close_temporary_tables");
|
|
|
|
TMP_TABLE_SHARE *share;
|
|
TABLE *table;
|
|
|
|
if (!save_temporary_tables)
|
|
{
|
|
/* There are no temporary tables. */
|
|
DBUG_VOID_RETURN;
|
|
}
|
|
|
|
while ((share= save_temporary_tables->pop_front()))
|
|
{
|
|
/*
|
|
Iterate over the list of tables for this TABLE_SHARE and close them.
|
|
*/
|
|
while ((table= share->all_tmp_tables.pop_front()))
|
|
{
|
|
DBUG_PRINT("tmptable", ("closing table: '%s'.'%s'",
|
|
table->s->db.str, table->s->table_name.str));
|
|
|
|
/* Reset in_use as the table may have been created by another thd */
|
|
table->in_use= 0;
|
|
/*
|
|
Lets not free TABLE_SHARE here as there could be multiple TABLEs opened
|
|
for the same table (TABLE_SHARE).
|
|
*/
|
|
closefrm(table);
|
|
my_free(table);
|
|
}
|
|
|
|
/*
|
|
Don't ask for disk deletion. For now, anyway they will be deleted when
|
|
slave restarts, but it is a better intention to not delete them.
|
|
*/
|
|
|
|
free_table_share(share);
|
|
my_free(share);
|
|
}
|
|
|
|
/* By now, there mustn't be any elements left in the list. */
|
|
DBUG_ASSERT(save_temporary_tables->is_empty());
|
|
|
|
my_free(save_temporary_tables);
|
|
save_temporary_tables= NULL;
|
|
slave_open_temp_tables= 0;
|
|
|
|
DBUG_VOID_RETURN;
|
|
}
|
|
|
|
/*
|
|
purge_relay_logs()
|
|
|
|
@param rli Relay log information
|
|
@param thd thread id. May be zero during startup
|
|
|
|
NOTES
|
|
Assumes to have a run lock on rli and that no slave thread are running.
|
|
*/
|
|
|
|
int purge_relay_logs(Relay_log_info* rli, THD *thd, bool just_reset,
|
|
const char** errmsg)
|
|
{
|
|
int error=0;
|
|
const char *ln;
|
|
char name_buf[FN_REFLEN];
|
|
DBUG_ENTER("purge_relay_logs");
|
|
|
|
/*
|
|
Even if rli->inited==0, we still try to empty rli->master_log_* variables.
|
|
Indeed, rli->inited==0 does not imply that they already are empty.
|
|
It could be that slave's info initialization partly succeeded :
|
|
for example if relay-log.info existed but *relay-bin*.*
|
|
have been manually removed, Relay_log_info::init() reads the old
|
|
relay-log.info and fills rli->master_log_*, then Relay_log_info::init()
|
|
checks for the existence of the relay log, this fails and
|
|
Relay_log_info::init() leaves rli->inited to 0.
|
|
In that pathological case, rli->master_log_pos* will be properly reinited
|
|
at the next START SLAVE (as RESET SLAVE or CHANGE
|
|
MASTER, the callers of purge_relay_logs, will delete bogus *.info files
|
|
or replace them with correct files), however if the user does SHOW SLAVE
|
|
STATUS before START SLAVE, he will see old, confusing rli->master_log_*.
|
|
In other words, we reinit rli->master_log_* for SHOW SLAVE STATUS
|
|
to display fine in any case.
|
|
*/
|
|
|
|
rli->group_master_log_name[0]= 0;
|
|
rli->group_master_log_pos= 0;
|
|
|
|
if (!rli->inited)
|
|
{
|
|
DBUG_PRINT("info", ("rli->inited == 0"));
|
|
if (rli->error_on_rli_init_info)
|
|
{
|
|
ln= rli->relay_log.generate_name(opt_relay_logname, "-relay-bin",
|
|
1, name_buf);
|
|
|
|
if (rli->relay_log.open_index_file(opt_relaylog_index_name, ln, TRUE))
|
|
{
|
|
sql_print_error("Unable to purge relay log files. Failed to open relay "
|
|
"log index file:%s.", rli->relay_log.get_index_fname());
|
|
DBUG_RETURN(1);
|
|
}
|
|
mysql_mutex_lock(rli->relay_log.get_log_lock());
|
|
if (rli->relay_log.open(ln, 0, 0, SEQ_READ_APPEND,
|
|
(ulong)(rli->max_relay_log_size ? rli->max_relay_log_size :
|
|
max_binlog_size), 1, TRUE))
|
|
{
|
|
sql_print_error("Unable to purge relay log files. Failed to open relay "
|
|
"log file:%s.", rli->relay_log.get_log_fname());
|
|
mysql_mutex_unlock(rli->relay_log.get_log_lock());
|
|
DBUG_RETURN(1);
|
|
}
|
|
mysql_mutex_unlock(rli->relay_log.get_log_lock());
|
|
}
|
|
else
|
|
DBUG_RETURN(0);
|
|
}
|
|
else
|
|
{
|
|
DBUG_ASSERT(rli->slave_running == 0);
|
|
DBUG_ASSERT(rli->mi->slave_running == 0);
|
|
}
|
|
mysql_mutex_lock(&rli->data_lock);
|
|
|
|
/*
|
|
we close the relay log fd possibly left open by the slave SQL thread,
|
|
to be able to delete it; the relay log fd possibly left open by the slave
|
|
I/O thread will be closed naturally in reset_logs() by the
|
|
close(LOG_CLOSE_TO_BE_OPENED) call
|
|
*/
|
|
if (rli->cur_log_fd >= 0)
|
|
{
|
|
end_io_cache(&rli->cache_buf);
|
|
mysql_file_close(rli->cur_log_fd, MYF(MY_WME));
|
|
rli->cur_log_fd= -1;
|
|
}
|
|
|
|
if (rli->relay_log.reset_logs(thd, !just_reset, NULL, 0, 0))
|
|
{
|
|
*errmsg = "Failed during log reset";
|
|
error=1;
|
|
goto err;
|
|
}
|
|
rli->relay_log_state.load(rpl_global_gtid_slave_state);
|
|
if (!just_reset)
|
|
{
|
|
/* Save name of used relay log file */
|
|
strmake_buf(rli->group_relay_log_name, rli->relay_log.get_log_fname());
|
|
strmake_buf(rli->event_relay_log_name, rli->relay_log.get_log_fname());
|
|
rli->group_relay_log_pos= rli->event_relay_log_pos= BIN_LOG_HEADER_SIZE;
|
|
rli->log_space_total= 0;
|
|
|
|
if (count_relay_log_space(rli))
|
|
{
|
|
*errmsg= "Error counting relay log space";
|
|
error=1;
|
|
goto err;
|
|
}
|
|
error= init_relay_log_pos(rli, rli->group_relay_log_name,
|
|
rli->group_relay_log_pos,
|
|
0 /* do not need data lock */, errmsg, 0);
|
|
}
|
|
else
|
|
{
|
|
/* Ensure relay log names are not used */
|
|
rli->group_relay_log_name[0]= rli->event_relay_log_name[0]= 0;
|
|
}
|
|
|
|
if (!rli->inited && rli->error_on_rli_init_info)
|
|
{
|
|
mysql_mutex_lock(rli->relay_log.get_log_lock());
|
|
rli->relay_log.close(LOG_CLOSE_INDEX | LOG_CLOSE_STOP_EVENT);
|
|
mysql_mutex_unlock(rli->relay_log.get_log_lock());
|
|
}
|
|
err:
|
|
DBUG_PRINT("info",("log_space_total: %llu", uint64(rli->log_space_total)));
|
|
mysql_mutex_unlock(&rli->data_lock);
|
|
DBUG_RETURN(error);
|
|
}
|
|
|
|
|
|
/*
|
|
Check if condition stated in UNTIL clause of START SLAVE is reached.
|
|
SYNOPSYS
|
|
Relay_log_info::is_until_satisfied()
|
|
master_beg_pos position of the beginning of to be executed event
|
|
(not log_pos member of the event that points to the
|
|
beginning of the following event)
|
|
|
|
|
|
DESCRIPTION
|
|
Checks if UNTIL condition is reached. Uses caching result of last
|
|
comparison of current log file name and target log file name. So cached
|
|
value should be invalidated if current log file name changes
|
|
(see Relay_log_info::notify_... functions).
|
|
|
|
This caching is needed to avoid of expensive string comparisons and
|
|
strtol() conversions needed for log names comparison. We don't need to
|
|
compare them each time this function is called, we only need to do this
|
|
when current log name changes. If we have UNTIL_MASTER_POS condition we
|
|
need to do this only after Rotate_log_event::do_apply_event() (which is
|
|
rare, so caching gives real benifit), and if we have UNTIL_RELAY_POS
|
|
condition then we should invalidate cached comarison value after
|
|
inc_group_relay_log_pos() which called for each group of events (so we
|
|
have some benefit if we have something like queries that use
|
|
autoincrement or if we have transactions).
|
|
|
|
Should be called ONLY if until_condition != UNTIL_NONE !
|
|
|
|
In the parallel execution mode and UNTIL_MASTER_POS the file name is
|
|
presented by future_event_master_log_name which may be ahead of
|
|
group_master_log_name. Log_event::log_pos does relate to it nevertheless
|
|
so the pair comprises a correct binlog coordinate.
|
|
Internal group events and events that have zero log_pos also
|
|
produce the zero for the local log_pos which may not lead to the
|
|
function falsely return true.
|
|
In UNTIL_RELAY_POS the original caching and notification are simplified
|
|
to straightforward files comparison when the current event can't be
|
|
a part of an event group.
|
|
|
|
RETURN VALUE
|
|
true - condition met or error happened (condition seems to have
|
|
bad log file name)
|
|
false - condition not met
|
|
*/
|
|
|
|
bool Relay_log_info::is_until_satisfied(Log_event *ev)
|
|
{
|
|
const char *log_name;
|
|
ulonglong log_pos;
|
|
/* Prevents stopping within transaction; needed solely for Relay UNTIL. */
|
|
bool in_trans= false;
|
|
|
|
DBUG_ENTER("Relay_log_info::is_until_satisfied");
|
|
|
|
if (until_condition == UNTIL_MASTER_POS)
|
|
{
|
|
log_name= (mi->using_parallel() ? future_event_master_log_name
|
|
: group_master_log_name);
|
|
log_pos= (get_flag(Relay_log_info::IN_TRANSACTION) || !ev || !ev->log_pos) ?
|
|
(mi->using_parallel() ? 0 : group_master_log_pos) :
|
|
ev->log_pos - ev->data_written;
|
|
}
|
|
else
|
|
{
|
|
DBUG_ASSERT(until_condition == UNTIL_RELAY_POS);
|
|
if (!mi->using_parallel())
|
|
{
|
|
log_name= group_relay_log_name;
|
|
log_pos= group_relay_log_pos;
|
|
}
|
|
else
|
|
{
|
|
log_name= event_relay_log_name;
|
|
log_pos= event_relay_log_pos;
|
|
in_trans= get_flag(Relay_log_info::IN_TRANSACTION);
|
|
/*
|
|
until_log_names_cmp_result is set to UNKNOWN either
|
|
- by a non-group event *and* only when it is in the middle of a group
|
|
- or by a group event when the preceding group made the above
|
|
non-group event to defer the resetting.
|
|
*/
|
|
if ((ev && !Log_event::is_group_event(ev->get_type_code())))
|
|
{
|
|
if (in_trans)
|
|
{
|
|
until_relay_log_names_defer= true;
|
|
}
|
|
else
|
|
{
|
|
until_log_names_cmp_result= UNTIL_LOG_NAMES_CMP_UNKNOWN;
|
|
until_relay_log_names_defer= false;
|
|
}
|
|
}
|
|
else if (!in_trans && until_relay_log_names_defer)
|
|
{
|
|
until_log_names_cmp_result= UNTIL_LOG_NAMES_CMP_UNKNOWN;
|
|
until_relay_log_names_defer= false;
|
|
}
|
|
}
|
|
}
|
|
|
|
DBUG_PRINT("info", ("group_master_log_name='%s', group_master_log_pos=%llu",
|
|
group_master_log_name, group_master_log_pos));
|
|
DBUG_PRINT("info", ("group_relay_log_name='%s', group_relay_log_pos=%llu",
|
|
group_relay_log_name, group_relay_log_pos));
|
|
DBUG_PRINT("info", ("(%s) log_name='%s', log_pos=%llu",
|
|
until_condition == UNTIL_MASTER_POS ? "master" : "relay",
|
|
log_name, log_pos));
|
|
DBUG_PRINT("info", ("(%s) until_log_name='%s', until_log_pos=%llu",
|
|
until_condition == UNTIL_MASTER_POS ? "master" : "relay",
|
|
until_log_name, until_log_pos));
|
|
|
|
if (until_log_names_cmp_result == UNTIL_LOG_NAMES_CMP_UNKNOWN)
|
|
{
|
|
/*
|
|
We have no cached comparison results so we should compare log names
|
|
and cache result.
|
|
If we are after RESET SLAVE, and the SQL slave thread has not processed
|
|
any event yet, it could be that group_master_log_name is "". In that case,
|
|
just wait for more events (as there is no sensible comparison to do).
|
|
*/
|
|
|
|
if (*log_name)
|
|
{
|
|
const char *basename= log_name + dirname_length(log_name);
|
|
|
|
const char *q= (const char*)(fn_ext(basename)+1);
|
|
if (strncmp(basename, until_log_name, (int)(q-basename)) == 0)
|
|
{
|
|
/* Now compare extensions. */
|
|
char *q_end;
|
|
ulong log_name_extension= strtoul(q, &q_end, 10);
|
|
if (log_name_extension < until_log_name_extension)
|
|
until_log_names_cmp_result= UNTIL_LOG_NAMES_CMP_LESS;
|
|
else
|
|
until_log_names_cmp_result=
|
|
(log_name_extension > until_log_name_extension) ?
|
|
UNTIL_LOG_NAMES_CMP_GREATER : UNTIL_LOG_NAMES_CMP_EQUAL ;
|
|
}
|
|
else
|
|
{
|
|
/* Probably error so we aborting */
|
|
sql_print_error("Slave SQL thread is stopped because UNTIL "
|
|
"condition is bad.");
|
|
DBUG_RETURN(TRUE);
|
|
}
|
|
}
|
|
else
|
|
DBUG_RETURN(until_log_pos == 0);
|
|
}
|
|
|
|
DBUG_RETURN(((until_log_names_cmp_result == UNTIL_LOG_NAMES_CMP_EQUAL &&
|
|
(log_pos >= until_log_pos && !in_trans)) ||
|
|
until_log_names_cmp_result == UNTIL_LOG_NAMES_CMP_GREATER));
|
|
}
|
|
|
|
|
|
bool Relay_log_info::stmt_done(my_off_t event_master_log_pos, THD *thd,
|
|
rpl_group_info *rgi)
|
|
{
|
|
int error= 0;
|
|
DBUG_ENTER("Relay_log_info::stmt_done");
|
|
|
|
DBUG_ASSERT(!belongs_to_client());
|
|
DBUG_ASSERT(rgi->rli == this);
|
|
/*
|
|
If in a transaction, and if the slave supports transactions, just
|
|
inc_event_relay_log_pos(). We only have to check for OPTION_BEGIN
|
|
(not OPTION_NOT_AUTOCOMMIT) as transactions are logged with
|
|
BEGIN/COMMIT, not with SET AUTOCOMMIT= .
|
|
|
|
We can't use rgi->rli->get_flag(IN_TRANSACTION) here as OPTION_BEGIN
|
|
is also used for single row transactions.
|
|
|
|
CAUTION: opt_using_transactions means innodb || bdb ; suppose the
|
|
master supports InnoDB and BDB, but the slave supports only BDB,
|
|
problems will arise: - suppose an InnoDB table is created on the
|
|
master, - then it will be MyISAM on the slave - but as
|
|
opt_using_transactions is true, the slave will believe he is
|
|
transactional with the MyISAM table. And problems will come when
|
|
one does START SLAVE; STOP SLAVE; START SLAVE; (the slave will
|
|
resume at BEGIN whereas there has not been any rollback). This is
|
|
the problem of using opt_using_transactions instead of a finer
|
|
"does the slave support _transactional handler used on the
|
|
master_".
|
|
|
|
More generally, we'll have problems when a query mixes a
|
|
transactional handler and MyISAM and STOP SLAVE is issued in the
|
|
middle of the "transaction". START SLAVE will resume at BEGIN
|
|
while the MyISAM table has already been updated.
|
|
*/
|
|
if ((rgi->thd->variables.option_bits & OPTION_BEGIN) &&
|
|
opt_using_transactions)
|
|
rgi->inc_event_relay_log_pos();
|
|
else
|
|
{
|
|
inc_group_relay_log_pos(event_master_log_pos, rgi);
|
|
if (rpl_global_gtid_slave_state->record_and_update_gtid(thd, rgi))
|
|
{
|
|
report(WARNING_LEVEL, ER_CANNOT_UPDATE_GTID_STATE, rgi->gtid_info(),
|
|
"Failed to update GTID state in %s.%s, slave state may become "
|
|
"inconsistent: %d: %s",
|
|
"mysql", rpl_gtid_slave_state_table_name.str,
|
|
thd->get_stmt_da()->sql_errno(), thd->get_stmt_da()->message());
|
|
/*
|
|
At this point we are not in a transaction (for example after DDL),
|
|
so we can not roll back. Anyway, normally updates to the slave
|
|
state table should not fail, and if they do, at least we made the
|
|
DBA aware of the problem in the error log.
|
|
*/
|
|
}
|
|
DBUG_EXECUTE_IF("inject_crash_before_flush_rli", DBUG_SUICIDE(););
|
|
if (mi->using_gtid == Master_info::USE_GTID_NO)
|
|
{
|
|
if (rgi->is_parallel_exec)
|
|
mysql_mutex_lock(&data_lock);
|
|
if (flush())
|
|
error= 1;
|
|
if (rgi->is_parallel_exec)
|
|
mysql_mutex_unlock(&data_lock);
|
|
}
|
|
DBUG_EXECUTE_IF("inject_crash_after_flush_rli", DBUG_SUICIDE(););
|
|
}
|
|
DBUG_RETURN(error);
|
|
}
|
|
|
|
|
|
int
|
|
Relay_log_info::alloc_inuse_relaylog(const char *name)
|
|
{
|
|
inuse_relaylog *ir;
|
|
uint32 gtid_count;
|
|
rpl_gtid *gtid_list;
|
|
|
|
gtid_count= relay_log_state.count();
|
|
if (!(gtid_list= (rpl_gtid *)my_malloc(PSI_INSTRUMENT_ME,
|
|
sizeof(*gtid_list)*gtid_count, MYF(MY_WME))))
|
|
{
|
|
my_error(ER_OUTOFMEMORY, MYF(0), (int)sizeof(*gtid_list)*gtid_count);
|
|
return 1;
|
|
}
|
|
if (!(ir= new inuse_relaylog(this, gtid_list, gtid_count, name)))
|
|
{
|
|
my_free(gtid_list);
|
|
my_error(ER_OUTOFMEMORY, MYF(0), (int) sizeof(*ir));
|
|
return 1;
|
|
}
|
|
if (relay_log_state.get_gtid_list(gtid_list, gtid_count))
|
|
{
|
|
my_free(gtid_list);
|
|
delete ir;
|
|
DBUG_ASSERT(0 /* Should not be possible as we allocated correct length */);
|
|
my_error(ER_OUT_OF_RESOURCES, MYF(0));
|
|
return 1;
|
|
}
|
|
|
|
if (!inuse_relaylog_list)
|
|
inuse_relaylog_list= ir;
|
|
else
|
|
{
|
|
last_inuse_relaylog->completed= true;
|
|
last_inuse_relaylog->next= ir;
|
|
}
|
|
last_inuse_relaylog= ir;
|
|
|
|
return 0;
|
|
}
|
|
|
|
|
|
void
|
|
Relay_log_info::free_inuse_relaylog(inuse_relaylog *ir)
|
|
{
|
|
my_free(ir->relay_log_state);
|
|
delete ir;
|
|
}
|
|
|
|
|
|
void
|
|
Relay_log_info::reset_inuse_relaylog()
|
|
{
|
|
inuse_relaylog *cur= inuse_relaylog_list;
|
|
while (cur)
|
|
{
|
|
DBUG_ASSERT(cur->queued_count == cur->dequeued_count);
|
|
inuse_relaylog *next= cur->next;
|
|
free_inuse_relaylog(cur);
|
|
cur= next;
|
|
}
|
|
inuse_relaylog_list= last_inuse_relaylog= NULL;
|
|
}
|
|
|
|
|
|
int
|
|
Relay_log_info::update_relay_log_state(rpl_gtid *gtid_list, uint32 count)
|
|
{
|
|
int res= 0;
|
|
while (count)
|
|
{
|
|
if (relay_log_state.update_nolock(gtid_list))
|
|
res= 1;
|
|
++gtid_list;
|
|
--count;
|
|
}
|
|
return res;
|
|
}
|
|
|
|
|
|
#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
|
|
struct gtid_pos_element { uint64 sub_id; rpl_gtid gtid; void *hton; };
|
|
|
|
static int
|
|
scan_one_gtid_slave_pos_table(THD *thd, HASH *hash, DYNAMIC_ARRAY *array,
|
|
LEX_CSTRING *tablename, void **out_hton)
|
|
{
|
|
TABLE_LIST tlist;
|
|
TABLE *UNINIT_VAR(table);
|
|
bool table_opened= false;
|
|
bool table_scanned= false;
|
|
struct gtid_pos_element tmp_entry, *entry;
|
|
int err= 0;
|
|
|
|
thd->reset_for_next_command();
|
|
tlist.init_one_table(&MYSQL_SCHEMA_NAME, tablename, NULL, TL_READ);
|
|
if ((err= open_and_lock_tables(thd, &tlist, FALSE, 0)))
|
|
goto end;
|
|
table_opened= true;
|
|
table= tlist.table;
|
|
|
|
if ((err= gtid_check_rpl_slave_state_table(table)))
|
|
goto end;
|
|
|
|
bitmap_set_all(table->read_set);
|
|
if (unlikely(err= table->file->ha_rnd_init_with_error(1)))
|
|
goto end;
|
|
|
|
table_scanned= true;
|
|
for (;;)
|
|
{
|
|
uint32 domain_id, server_id;
|
|
uint64 sub_id, seq_no;
|
|
uchar *rec;
|
|
|
|
if ((err= table->file->ha_rnd_next(table->record[0])))
|
|
{
|
|
if (err == HA_ERR_END_OF_FILE)
|
|
break;
|
|
else
|
|
{
|
|
table->file->print_error(err, MYF(0));
|
|
goto end;
|
|
}
|
|
}
|
|
domain_id= (uint32)table->field[0]->val_int();
|
|
sub_id= (ulonglong)table->field[1]->val_int();
|
|
server_id= (uint32)table->field[2]->val_int();
|
|
seq_no= (ulonglong)table->field[3]->val_int();
|
|
DBUG_PRINT("info", ("Read slave state row: %u-%u-%lu sub_id=%lu",
|
|
(unsigned)domain_id, (unsigned)server_id,
|
|
(ulong)seq_no, (ulong)sub_id));
|
|
|
|
tmp_entry.sub_id= sub_id;
|
|
tmp_entry.gtid.domain_id= domain_id;
|
|
tmp_entry.gtid.server_id= server_id;
|
|
tmp_entry.gtid.seq_no= seq_no;
|
|
tmp_entry.hton= table->s->db_type();
|
|
if ((err= insert_dynamic(array, (uchar *)&tmp_entry)))
|
|
{
|
|
my_error(ER_OUT_OF_RESOURCES, MYF(0));
|
|
goto end;
|
|
}
|
|
|
|
if ((rec= my_hash_search(hash, (const uchar *)&domain_id,
|
|
sizeof(domain_id))))
|
|
{
|
|
entry= (struct gtid_pos_element *)rec;
|
|
if (entry->sub_id >= sub_id)
|
|
continue;
|
|
entry->sub_id= sub_id;
|
|
DBUG_ASSERT(entry->gtid.domain_id == domain_id);
|
|
entry->gtid.server_id= server_id;
|
|
entry->gtid.seq_no= seq_no;
|
|
entry->hton= table->s->db_type();
|
|
}
|
|
else
|
|
{
|
|
if (!(entry= (struct gtid_pos_element *)my_malloc(PSI_INSTRUMENT_ME,
|
|
sizeof(*entry), MYF(MY_WME))))
|
|
{
|
|
my_error(ER_OUTOFMEMORY, MYF(0), (int)sizeof(*entry));
|
|
err= 1;
|
|
goto end;
|
|
}
|
|
entry->sub_id= sub_id;
|
|
entry->gtid.domain_id= domain_id;
|
|
entry->gtid.server_id= server_id;
|
|
entry->gtid.seq_no= seq_no;
|
|
entry->hton= table->s->db_type();
|
|
if ((err= my_hash_insert(hash, (uchar *)entry)))
|
|
{
|
|
my_free(entry);
|
|
my_error(ER_OUT_OF_RESOURCES, MYF(0));
|
|
goto end;
|
|
}
|
|
}
|
|
}
|
|
err= 0; /* Clear HA_ERR_END_OF_FILE */
|
|
|
|
end:
|
|
if (table_scanned)
|
|
{
|
|
table->file->ha_index_or_rnd_end();
|
|
ha_commit_trans(thd, FALSE);
|
|
trans_commit(thd);
|
|
}
|
|
if (table_opened)
|
|
{
|
|
*out_hton= table->s->db_type();
|
|
close_thread_tables(thd);
|
|
thd->release_transactional_locks();
|
|
}
|
|
return err;
|
|
}
|
|
|
|
|
|
/*
|
|
Look for all tables mysql.gtid_slave_pos*. Read all rows from each such
|
|
table found into ARRAY. For each domain id, put the row with highest sub_id
|
|
into HASH.
|
|
*/
|
|
static int
|
|
scan_all_gtid_slave_pos_table(THD *thd, int (*cb)(THD *, LEX_CSTRING *, void *),
|
|
void *cb_data)
|
|
{
|
|
char path[FN_REFLEN];
|
|
MY_DIR *dirp;
|
|
|
|
thd->reset_for_next_command();
|
|
if (lock_schema_name(thd, Lex_ident_db_normalized(MYSQL_SCHEMA_NAME)))
|
|
return 1;
|
|
|
|
build_table_filename(path, sizeof(path) - 1, MYSQL_SCHEMA_NAME.str, "", "", 0);
|
|
if (!(dirp= my_dir(path, MYF(MY_DONT_SORT))))
|
|
{
|
|
my_error(ER_FILE_NOT_FOUND, MYF(0), path, my_errno);
|
|
close_thread_tables(thd);
|
|
thd->release_transactional_locks();
|
|
return 1;
|
|
}
|
|
else
|
|
{
|
|
size_t i;
|
|
Dynamic_array<LEX_CSTRING*> files(PSI_INSTRUMENT_MEM,
|
|
dirp->number_of_files);
|
|
Discovered_table_list tl(thd, &files);
|
|
int err;
|
|
|
|
err= ha_discover_table_names(thd, &MYSQL_SCHEMA_NAME, dirp, &tl, false);
|
|
my_dirend(dirp);
|
|
close_thread_tables(thd);
|
|
thd->release_transactional_locks();
|
|
if (err)
|
|
return err;
|
|
|
|
for (i = 0; i < files.elements(); ++i)
|
|
{
|
|
if (strncmp(files.at(i)->str,
|
|
rpl_gtid_slave_state_table_name.str,
|
|
rpl_gtid_slave_state_table_name.length) == 0)
|
|
{
|
|
if ((err= (*cb)(thd, files.at(i), cb_data)))
|
|
return err;
|
|
}
|
|
}
|
|
}
|
|
|
|
return 0;
|
|
}
|
|
|
|
|
|
struct load_gtid_state_cb_data {
|
|
HASH *hash;
|
|
DYNAMIC_ARRAY *array;
|
|
struct rpl_slave_state::gtid_pos_table *table_list;
|
|
struct rpl_slave_state::gtid_pos_table *default_entry;
|
|
};
|
|
|
|
static int
|
|
process_gtid_pos_table(THD *thd, LEX_CSTRING *table_name, void *hton,
|
|
struct load_gtid_state_cb_data *data)
|
|
{
|
|
struct rpl_slave_state::gtid_pos_table *p, *entry, **next_ptr;
|
|
bool is_default=
|
|
(strcmp(table_name->str, rpl_gtid_slave_state_table_name.str) == 0);
|
|
|
|
/*
|
|
Ignore tables with duplicate storage engine, with a warning.
|
|
Prefer the default mysql.gtid_slave_pos over another table
|
|
mysql.gtid_slave_posXXX with the same storage engine.
|
|
*/
|
|
next_ptr= &data->table_list;
|
|
entry= data->table_list;
|
|
while (entry)
|
|
{
|
|
if (entry->table_hton == hton)
|
|
{
|
|
static const char *warning_msg= "Ignoring redundant table mysql.%s "
|
|
"since mysql.%s has the same storage engine";
|
|
if (!is_default)
|
|
{
|
|
/* Ignore the redundant table. */
|
|
sql_print_warning(warning_msg, table_name->str, entry->table_name.str);
|
|
return 0;
|
|
}
|
|
else
|
|
{
|
|
sql_print_warning(warning_msg, entry->table_name.str, table_name->str);
|
|
/* Delete the redundant table, and proceed to add this one instead. */
|
|
*next_ptr= entry->next;
|
|
my_free(entry);
|
|
break;
|
|
}
|
|
}
|
|
next_ptr= &entry->next;
|
|
entry= entry->next;
|
|
}
|
|
|
|
p= rpl_global_gtid_slave_state->alloc_gtid_pos_table(table_name,
|
|
hton, rpl_slave_state::GTID_POS_AVAILABLE);
|
|
if (!p)
|
|
return 1;
|
|
p->next= data->table_list;
|
|
data->table_list= p;
|
|
if (is_default)
|
|
data->default_entry= p;
|
|
return 0;
|
|
}
|
|
|
|
|
|
/*
|
|
Put tables corresponding to @@gtid_pos_auto_engines at the end of the list,
|
|
marked to be auto-created if needed.
|
|
*/
|
|
static int
|
|
gtid_pos_auto_create_tables(rpl_slave_state::gtid_pos_table **list_ptr)
|
|
{
|
|
plugin_ref *auto_engines;
|
|
int err= 0;
|
|
mysql_mutex_lock(&LOCK_global_system_variables);
|
|
for (auto_engines= opt_gtid_pos_auto_plugins;
|
|
!err && auto_engines && *auto_engines;
|
|
++auto_engines)
|
|
{
|
|
void *hton= plugin_hton(*auto_engines);
|
|
char buf[FN_REFLEN+1];
|
|
LEX_CSTRING table_name;
|
|
char *p;
|
|
rpl_slave_state::gtid_pos_table *entry, **next_ptr;
|
|
|
|
/* See if this engine is already in the list. */
|
|
next_ptr= list_ptr;
|
|
entry= *list_ptr;
|
|
while (entry)
|
|
{
|
|
if (entry->table_hton == hton)
|
|
break;
|
|
next_ptr= &entry->next;
|
|
entry= entry->next;
|
|
}
|
|
if (entry)
|
|
continue;
|
|
|
|
/* Add an auto-create entry for this engine at end of list. */
|
|
p= strmake(buf, rpl_gtid_slave_state_table_name.str, FN_REFLEN);
|
|
p= strmake(p, "_", FN_REFLEN - (p - buf));
|
|
p= strmake(p, plugin_name(*auto_engines)->str, FN_REFLEN - (p - buf));
|
|
table_name.str= buf;
|
|
table_name.length= p - buf;
|
|
table_case_convert(const_cast<char*>(table_name.str),
|
|
static_cast<uint>(table_name.length));
|
|
entry= rpl_global_gtid_slave_state->alloc_gtid_pos_table
|
|
(&table_name, hton, rpl_slave_state::GTID_POS_AUTO_CREATE);
|
|
if (!entry)
|
|
{
|
|
err= 1;
|
|
break;
|
|
}
|
|
*next_ptr= entry;
|
|
}
|
|
mysql_mutex_unlock(&LOCK_global_system_variables);
|
|
return err;
|
|
}
|
|
|
|
|
|
static int
|
|
load_gtid_state_cb(THD *thd, LEX_CSTRING *table_name, void *arg)
|
|
{
|
|
int err;
|
|
load_gtid_state_cb_data *data= static_cast<load_gtid_state_cb_data *>(arg);
|
|
void *hton;
|
|
|
|
if ((err= scan_one_gtid_slave_pos_table(thd, data->hash, data->array,
|
|
table_name, &hton)))
|
|
return err;
|
|
return process_gtid_pos_table(thd, table_name, hton, data);
|
|
}
|
|
|
|
|
|
int
|
|
rpl_load_gtid_slave_state(THD *thd)
|
|
{
|
|
bool array_inited= false;
|
|
struct gtid_pos_element tmp_entry, *entry;
|
|
HASH hash;
|
|
DYNAMIC_ARRAY array;
|
|
int err= 0;
|
|
uint32 i;
|
|
load_gtid_state_cb_data cb_data;
|
|
rpl_slave_state::list_element *old_gtids_list;
|
|
DBUG_ENTER("rpl_load_gtid_slave_state");
|
|
|
|
mysql_mutex_lock(&rpl_global_gtid_slave_state->LOCK_slave_state);
|
|
bool loaded= rpl_global_gtid_slave_state->loaded;
|
|
mysql_mutex_unlock(&rpl_global_gtid_slave_state->LOCK_slave_state);
|
|
if (loaded)
|
|
DBUG_RETURN(0);
|
|
|
|
cb_data.table_list= NULL;
|
|
cb_data.default_entry= NULL;
|
|
my_hash_init(PSI_INSTRUMENT_ME, &hash, &my_charset_bin, 32,
|
|
offsetof(gtid_pos_element, gtid) + offsetof(rpl_gtid, domain_id),
|
|
sizeof(uint32), NULL, my_free, HASH_UNIQUE);
|
|
if ((err= my_init_dynamic_array(PSI_INSTRUMENT_ME, &array,
|
|
sizeof(gtid_pos_element), 0, 0, MYF(0))))
|
|
goto end;
|
|
array_inited= true;
|
|
|
|
cb_data.hash = &hash;
|
|
cb_data.array = &array;
|
|
if ((err= scan_all_gtid_slave_pos_table(thd, load_gtid_state_cb, &cb_data)))
|
|
goto end;
|
|
|
|
if (!cb_data.default_entry)
|
|
{
|
|
/*
|
|
If the mysql.gtid_slave_pos table does not exist, but at least one other
|
|
table is available, arbitrarily pick the first in the list to use as
|
|
default.
|
|
*/
|
|
cb_data.default_entry= cb_data.table_list;
|
|
}
|
|
if ((err= gtid_pos_auto_create_tables(&cb_data.table_list)))
|
|
goto end;
|
|
|
|
mysql_mutex_lock(&rpl_global_gtid_slave_state->LOCK_slave_state);
|
|
if (rpl_global_gtid_slave_state->loaded)
|
|
{
|
|
mysql_mutex_unlock(&rpl_global_gtid_slave_state->LOCK_slave_state);
|
|
goto end;
|
|
}
|
|
|
|
if (!cb_data.table_list)
|
|
{
|
|
my_error(ER_NO_SUCH_TABLE, MYF(0), "mysql",
|
|
rpl_gtid_slave_state_table_name.str);
|
|
mysql_mutex_unlock(&rpl_global_gtid_slave_state->LOCK_slave_state);
|
|
err= 1;
|
|
goto end;
|
|
}
|
|
|
|
for (i= 0; i < array.elements; ++i)
|
|
{
|
|
get_dynamic(&array, (uchar *)&tmp_entry, i);
|
|
if ((err= rpl_global_gtid_slave_state->update_nolock(tmp_entry.gtid.domain_id,
|
|
tmp_entry.gtid.server_id,
|
|
tmp_entry.sub_id,
|
|
tmp_entry.gtid.seq_no,
|
|
tmp_entry.hton,
|
|
NULL)))
|
|
{
|
|
mysql_mutex_unlock(&rpl_global_gtid_slave_state->LOCK_slave_state);
|
|
my_error(ER_OUT_OF_RESOURCES, MYF(0));
|
|
goto end;
|
|
}
|
|
}
|
|
|
|
for (i= 0; i < hash.records; ++i)
|
|
{
|
|
entry= (struct gtid_pos_element *)my_hash_element(&hash, i);
|
|
if (opt_bin_log &&
|
|
mysql_bin_log.bump_seq_no_counter_if_needed(entry->gtid.domain_id,
|
|
entry->gtid.seq_no))
|
|
{
|
|
mysql_mutex_unlock(&rpl_global_gtid_slave_state->LOCK_slave_state);
|
|
my_error(ER_OUT_OF_RESOURCES, MYF(0));
|
|
goto end;
|
|
}
|
|
}
|
|
|
|
rpl_global_gtid_slave_state->set_gtid_pos_tables_list(cb_data.table_list,
|
|
cb_data.default_entry);
|
|
cb_data.table_list= NULL;
|
|
rpl_global_gtid_slave_state->loaded= true;
|
|
mysql_mutex_unlock(&rpl_global_gtid_slave_state->LOCK_slave_state);
|
|
|
|
/* Clear out no longer needed elements now. */
|
|
old_gtids_list=
|
|
rpl_global_gtid_slave_state->gtid_grab_pending_delete_list();
|
|
rpl_global_gtid_slave_state->gtid_delete_pending(thd, &old_gtids_list);
|
|
if (old_gtids_list)
|
|
rpl_global_gtid_slave_state->put_back_list(old_gtids_list);
|
|
|
|
end:
|
|
if (array_inited)
|
|
delete_dynamic(&array);
|
|
my_hash_free(&hash);
|
|
if (cb_data.table_list)
|
|
rpl_global_gtid_slave_state->free_gtid_pos_tables(cb_data.table_list);
|
|
DBUG_RETURN(err);
|
|
}
|
|
|
|
|
|
static int
|
|
find_gtid_pos_tables_cb(THD *thd, LEX_CSTRING *table_name, void *arg)
|
|
{
|
|
load_gtid_state_cb_data *data= static_cast<load_gtid_state_cb_data *>(arg);
|
|
TABLE_LIST tlist;
|
|
TABLE *table= NULL;
|
|
int err;
|
|
|
|
thd->reset_for_next_command();
|
|
tlist.init_one_table(&MYSQL_SCHEMA_NAME, table_name, NULL, TL_READ);
|
|
if ((err= open_and_lock_tables(thd, &tlist, FALSE, 0)))
|
|
goto end;
|
|
table= tlist.table;
|
|
|
|
if ((err= gtid_check_rpl_slave_state_table(table)))
|
|
goto end;
|
|
err= process_gtid_pos_table(thd, table_name, table->s->db_type(), data);
|
|
|
|
end:
|
|
if (table)
|
|
{
|
|
ha_commit_trans(thd, FALSE);
|
|
ha_commit_trans(thd, TRUE);
|
|
close_thread_tables(thd);
|
|
thd->release_transactional_locks();
|
|
}
|
|
|
|
return err;
|
|
}
|
|
|
|
|
|
/*
|
|
Re-compute the list of available mysql.gtid_slave_posXXX tables.
|
|
|
|
This is done at START SLAVE to pick up any newly created tables without
|
|
requiring server restart.
|
|
*/
|
|
int
|
|
find_gtid_slave_pos_tables(THD *thd)
|
|
{
|
|
int err= 0;
|
|
load_gtid_state_cb_data cb_data;
|
|
uint num_running;
|
|
|
|
mysql_mutex_lock(&rpl_global_gtid_slave_state->LOCK_slave_state);
|
|
bool loaded= rpl_global_gtid_slave_state->loaded;
|
|
mysql_mutex_unlock(&rpl_global_gtid_slave_state->LOCK_slave_state);
|
|
if (!loaded)
|
|
return 0;
|
|
|
|
cb_data.table_list= NULL;
|
|
cb_data.default_entry= NULL;
|
|
if ((err= scan_all_gtid_slave_pos_table(thd, find_gtid_pos_tables_cb, &cb_data)))
|
|
goto end;
|
|
|
|
if (!cb_data.table_list)
|
|
{
|
|
my_error(ER_NO_SUCH_TABLE, MYF(0), "mysql",
|
|
rpl_gtid_slave_state_table_name.str);
|
|
err= 1;
|
|
goto end;
|
|
}
|
|
if (!cb_data.default_entry)
|
|
{
|
|
/*
|
|
If the mysql.gtid_slave_pos table does not exist, but at least one other
|
|
table is available, arbitrarily pick the first in the list to use as
|
|
default.
|
|
*/
|
|
cb_data.default_entry= cb_data.table_list;
|
|
}
|
|
if ((err= gtid_pos_auto_create_tables(&cb_data.table_list)))
|
|
goto end;
|
|
|
|
mysql_mutex_lock(&LOCK_active_mi);
|
|
num_running= any_slave_sql_running(true);
|
|
mysql_mutex_lock(&rpl_global_gtid_slave_state->LOCK_slave_state);
|
|
if (num_running <= 1)
|
|
{
|
|
/*
|
|
If no slave is running now, the count will be 1, since this SQL thread
|
|
which is starting is included in the count. In this case, we can safely
|
|
replace the list, no-one can be trying to read it without lock.
|
|
*/
|
|
DBUG_ASSERT(num_running == 1);
|
|
rpl_global_gtid_slave_state->set_gtid_pos_tables_list(cb_data.table_list,
|
|
cb_data.default_entry);
|
|
cb_data.table_list= NULL;
|
|
}
|
|
else
|
|
{
|
|
/*
|
|
If there are SQL threads running, we cannot safely remove the old list.
|
|
However we can add new entries, and warn about any tables that
|
|
disappeared, but may still be visible to running SQL threads.
|
|
*/
|
|
rpl_slave_state::gtid_pos_table *new_entry, **next_ptr_ptr;
|
|
auto old_entry= rpl_global_gtid_slave_state->
|
|
gtid_pos_tables.load(std::memory_order_relaxed);
|
|
while (old_entry)
|
|
{
|
|
new_entry= cb_data.table_list;
|
|
while (new_entry)
|
|
{
|
|
if (new_entry->table_hton == old_entry->table_hton)
|
|
break;
|
|
new_entry= new_entry->next;
|
|
}
|
|
if (!new_entry)
|
|
sql_print_warning("The table mysql.%s was removed. "
|
|
"This change will not take full effect "
|
|
"until all SQL threads have been restarted",
|
|
old_entry->table_name.str);
|
|
old_entry= old_entry->next;
|
|
}
|
|
next_ptr_ptr= &cb_data.table_list;
|
|
new_entry= cb_data.table_list;
|
|
while (new_entry)
|
|
{
|
|
/* Check if we already have a table with this storage engine. */
|
|
old_entry= rpl_global_gtid_slave_state->
|
|
gtid_pos_tables.load(std::memory_order_relaxed);
|
|
while (old_entry)
|
|
{
|
|
if (new_entry->table_hton == old_entry->table_hton)
|
|
break;
|
|
old_entry= old_entry->next;
|
|
}
|
|
if (old_entry)
|
|
{
|
|
/* This new_entry is already available in the list. */
|
|
next_ptr_ptr= &new_entry->next;
|
|
new_entry= new_entry->next;
|
|
}
|
|
else
|
|
{
|
|
/* Move this new_entry to the list. */
|
|
rpl_slave_state::gtid_pos_table *next= new_entry->next;
|
|
rpl_global_gtid_slave_state->add_gtid_pos_table(new_entry);
|
|
*next_ptr_ptr= next;
|
|
new_entry= next;
|
|
}
|
|
}
|
|
}
|
|
mysql_mutex_unlock(&rpl_global_gtid_slave_state->LOCK_slave_state);
|
|
mysql_mutex_unlock(&LOCK_active_mi);
|
|
|
|
end:
|
|
if (cb_data.table_list)
|
|
rpl_global_gtid_slave_state->free_gtid_pos_tables(cb_data.table_list);
|
|
return err;
|
|
}
|
|
|
|
|
|
void
|
|
rpl_group_info::reinit(Relay_log_info *rli)
|
|
{
|
|
this->rli= rli;
|
|
tables_to_lock= NULL;
|
|
tables_to_lock_count= 0;
|
|
trans_retries= 0;
|
|
last_event_start_time= 0;
|
|
gtid_sub_id= 0;
|
|
commit_id= 0;
|
|
gtid_pending= false;
|
|
worker_error= 0;
|
|
row_stmt_start_timestamp= 0;
|
|
long_find_row_note_printed= false;
|
|
did_mark_start_commit= false;
|
|
gtid_ev_flags2= 0;
|
|
gtid_ev_flags_extra= 0;
|
|
gtid_ev_sa_seq_no= 0;
|
|
last_master_timestamp = 0;
|
|
gtid_ignore_duplicate_state= GTID_DUPLICATE_NULL;
|
|
speculation= SPECULATE_NO;
|
|
rpt= NULL;
|
|
start_alter_ev= NULL;
|
|
direct_commit_alter= false;
|
|
commit_orderer.reinit();
|
|
}
|
|
|
|
rpl_group_info::rpl_group_info(Relay_log_info *rli)
|
|
: thd(0), wait_commit_sub_id(0),
|
|
wait_commit_group_info(0), parallel_entry(0),
|
|
deferred_events(NULL), m_annotate_event(0), is_parallel_exec(false),
|
|
gtid_ev_flags2(0), gtid_ev_flags_extra(0), gtid_ev_sa_seq_no(0),
|
|
reserved_start_alter_thread(0), finish_event_group_called(0), rpt(NULL),
|
|
start_alter_ev(NULL), direct_commit_alter(false), sa_info(NULL)
|
|
{
|
|
reinit(rli);
|
|
bzero(¤t_gtid, sizeof(current_gtid));
|
|
mysql_mutex_init(key_rpl_group_info_sleep_lock, &sleep_lock,
|
|
MY_MUTEX_INIT_FAST);
|
|
mysql_cond_init(key_rpl_group_info_sleep_cond, &sleep_cond, NULL);
|
|
}
|
|
|
|
rpl_group_info::~rpl_group_info()
|
|
{
|
|
free_annotate_event();
|
|
delete deferred_events;
|
|
mysql_mutex_destroy(&sleep_lock);
|
|
mysql_cond_destroy(&sleep_cond);
|
|
}
|
|
|
|
|
|
int
|
|
event_group_new_gtid(rpl_group_info *rgi, Gtid_log_event *gev)
|
|
{
|
|
uint64 sub_id= rpl_global_gtid_slave_state->next_sub_id(gev->domain_id);
|
|
if (!sub_id)
|
|
{
|
|
/* Out of memory caused hash insertion to fail. */
|
|
return 1;
|
|
}
|
|
rgi->gtid_sub_id= sub_id;
|
|
rgi->current_gtid.domain_id= gev->domain_id;
|
|
rgi->current_gtid.server_id= gev->server_id;
|
|
rgi->current_gtid.seq_no= gev->seq_no;
|
|
rgi->commit_id= gev->commit_id;
|
|
rgi->gtid_pending= true;
|
|
rgi->sa_info= NULL;
|
|
return 0;
|
|
}
|
|
|
|
|
|
void
|
|
delete_or_keep_event_post_apply(rpl_group_info *rgi,
|
|
Log_event_type typ, Log_event *ev)
|
|
{
|
|
/*
|
|
ToDo: This needs to work on rpl_group_info, not Relay_log_info, to be
|
|
thread-safe for parallel replication.
|
|
*/
|
|
|
|
switch (typ) {
|
|
case FORMAT_DESCRIPTION_EVENT:
|
|
/*
|
|
Format_description_log_event should not be deleted because it
|
|
will be used to read info about the relay log's format;
|
|
it will be deleted when the SQL thread does not need it,
|
|
i.e. when this thread terminates.
|
|
*/
|
|
break;
|
|
case ANNOTATE_ROWS_EVENT:
|
|
/*
|
|
Annotate_rows event should not be deleted because after it has
|
|
been applied, thd->query points to the string inside this event.
|
|
The thd->query will be used to generate new Annotate_rows event
|
|
during applying the subsequent Rows events.
|
|
*/
|
|
rgi->set_annotate_event((Annotate_rows_log_event*) ev);
|
|
break;
|
|
case DELETE_ROWS_EVENT_V1:
|
|
case UPDATE_ROWS_EVENT_V1:
|
|
case WRITE_ROWS_EVENT_V1:
|
|
case DELETE_ROWS_EVENT:
|
|
case UPDATE_ROWS_EVENT:
|
|
case WRITE_ROWS_EVENT:
|
|
case WRITE_ROWS_COMPRESSED_EVENT:
|
|
case DELETE_ROWS_COMPRESSED_EVENT:
|
|
case UPDATE_ROWS_COMPRESSED_EVENT:
|
|
case WRITE_ROWS_COMPRESSED_EVENT_V1:
|
|
case UPDATE_ROWS_COMPRESSED_EVENT_V1:
|
|
case DELETE_ROWS_COMPRESSED_EVENT_V1:
|
|
/*
|
|
After the last Rows event has been applied, the saved Annotate_rows
|
|
event (if any) is not needed anymore and can be deleted.
|
|
*/
|
|
if (((Rows_log_event*)ev)->get_flags(Rows_log_event::STMT_END_F))
|
|
rgi->free_annotate_event();
|
|
/* fall through */
|
|
default:
|
|
DBUG_PRINT("info", ("Deleting the event after it has been executed"));
|
|
if (!rgi->is_deferred_event(ev))
|
|
delete ev;
|
|
break;
|
|
}
|
|
}
|
|
|
|
|
|
void rpl_group_info::cleanup_context(THD *thd, bool error, bool keep_domain_owner)
|
|
{
|
|
DBUG_ENTER("rpl_group_info::cleanup_context");
|
|
DBUG_PRINT("enter", ("error: %d", (int) error));
|
|
|
|
DBUG_ASSERT(this->thd == thd);
|
|
/*
|
|
1) Instances of Table_map_log_event, if ::do_apply_event() was called on them,
|
|
may have opened tables, which we cannot be sure have been closed (because
|
|
maybe the Rows_log_event have not been found or will not be, because slave
|
|
SQL thread is stopping, or relay log has a missing tail etc). So we close
|
|
all thread's tables. And so the table mappings have to be cancelled.
|
|
2) Rows_log_event::do_apply_event() may even have started statements or
|
|
transactions on them, which we need to rollback in case of error.
|
|
3) If finding a Format_description_log_event after a BEGIN, we also need
|
|
to rollback before continuing with the next events.
|
|
4) so we need this "context cleanup" function.
|
|
*/
|
|
if (unlikely(error))
|
|
{
|
|
trans_rollback_stmt(thd); // if a "statement transaction"
|
|
/* trans_rollback() also resets OPTION_GTID_BEGIN */
|
|
trans_rollback(thd); // if a "real transaction"
|
|
/*
|
|
Now that we have rolled back the transaction, make sure we do not
|
|
erroneously update the GTID position.
|
|
*/
|
|
gtid_pending= false;
|
|
}
|
|
m_table_map.clear_tables();
|
|
slave_close_thread_tables(thd);
|
|
|
|
if (unlikely(error))
|
|
{
|
|
// leave alone any XA prepared transactions
|
|
if (thd->transaction->xid_state.is_explicit_XA() &&
|
|
thd->transaction->xid_state.get_state_code() != XA_PREPARED)
|
|
xa_trans_force_rollback(thd);
|
|
|
|
thd->release_transactional_locks();
|
|
|
|
if (thd == rli->sql_driver_thd)
|
|
{
|
|
/*
|
|
Reset flags. This is needed to handle incident events and errors in
|
|
the relay log noticed by the sql driver thread.
|
|
*/
|
|
rli->clear_flag(Relay_log_info::IN_STMT);
|
|
rli->clear_flag(Relay_log_info::IN_TRANSACTION);
|
|
}
|
|
|
|
/*
|
|
Ensure we always release the domain for others to process, when using
|
|
--gtid-ignore-duplicates.
|
|
*/
|
|
if (gtid_ignore_duplicate_state != GTID_DUPLICATE_NULL && !keep_domain_owner)
|
|
rpl_global_gtid_slave_state->release_domain_owner(this);
|
|
}
|
|
|
|
/*
|
|
Cleanup for the flags that have been set at do_apply_event.
|
|
*/
|
|
thd->variables.option_bits&= ~(OPTION_NO_FOREIGN_KEY_CHECKS |
|
|
OPTION_RELAXED_UNIQUE_CHECKS |
|
|
OPTION_NO_CHECK_CONSTRAINT_CHECKS);
|
|
|
|
/*
|
|
Reset state related to long_find_row notes in the error log:
|
|
- timestamp
|
|
- flag that decides whether the slave prints or not
|
|
*/
|
|
reset_row_stmt_start_timestamp();
|
|
unset_long_find_row_note_printed();
|
|
|
|
DBUG_EXECUTE_IF("inject_sleep_gtid_100_x_x", {
|
|
if (current_gtid.domain_id == 100)
|
|
my_sleep(50000);
|
|
};);
|
|
|
|
DBUG_VOID_RETURN;
|
|
}
|
|
|
|
|
|
void rpl_group_info::clear_tables_to_lock()
|
|
{
|
|
DBUG_ENTER("rpl_group_info::clear_tables_to_lock()");
|
|
#ifndef DBUG_OFF
|
|
/**
|
|
When replicating in RBR and MyISAM Merge tables are involved
|
|
open_and_lock_tables (called in do_apply_event) appends the
|
|
base tables to the list of tables_to_lock. Then these are
|
|
removed from the list in close_thread_tables (which is called
|
|
before we reach this point).
|
|
|
|
This assertion just confirms that we get no surprises at this
|
|
point.
|
|
*/
|
|
uint i=0;
|
|
for (TABLE_LIST *ptr= tables_to_lock ; ptr ; ptr= ptr->next_global, i++) ;
|
|
DBUG_ASSERT(i == tables_to_lock_count);
|
|
#endif
|
|
|
|
while (tables_to_lock)
|
|
{
|
|
uchar* to_free= reinterpret_cast<uchar*>(tables_to_lock);
|
|
if (tables_to_lock->m_tabledef_valid)
|
|
{
|
|
tables_to_lock->m_tabledef.table_def::~table_def();
|
|
tables_to_lock->m_tabledef_valid= FALSE;
|
|
}
|
|
|
|
/*
|
|
If blob fields were used during conversion of field values
|
|
from the master table into the slave table, then we need to
|
|
free the memory used temporarily to store their values before
|
|
copying into the slave's table.
|
|
*/
|
|
if (tables_to_lock->m_conv_table)
|
|
free_blobs(tables_to_lock->m_conv_table);
|
|
|
|
tables_to_lock=
|
|
static_cast<RPL_TABLE_LIST*>(tables_to_lock->next_global);
|
|
tables_to_lock_count--;
|
|
my_free(to_free);
|
|
}
|
|
DBUG_ASSERT(tables_to_lock == NULL && tables_to_lock_count == 0);
|
|
DBUG_VOID_RETURN;
|
|
}
|
|
|
|
|
|
void rpl_group_info::slave_close_thread_tables(THD *thd)
|
|
{
|
|
DBUG_ENTER("rpl_group_info::slave_close_thread_tables(THD *thd)");
|
|
thd->get_stmt_da()->set_overwrite_status(true);
|
|
#ifdef WITH_WSREP
|
|
// This can happen e.g. when table_def::compatible_with fails and sets a error
|
|
// but thd->is_error() is false then. However, we do not want to commit
|
|
// statement on Galera instead we want to rollback it as later in
|
|
// apply_write_set we rollback transaction and that can't be done
|
|
// after wsrep transaction state is s_committed.
|
|
if (WSREP(thd))
|
|
(thd->is_error() || thd->is_slave_error) ? trans_rollback_stmt(thd) : trans_commit_stmt(thd);
|
|
else
|
|
#endif
|
|
thd->is_error() ? trans_rollback_stmt(thd) : trans_commit_stmt(thd);
|
|
thd->get_stmt_da()->set_overwrite_status(false);
|
|
|
|
close_thread_tables(thd);
|
|
/*
|
|
- If transaction rollback was requested due to deadlock
|
|
perform it and release metadata locks.
|
|
- If inside a multi-statement transaction,
|
|
defer the release of metadata locks until the current
|
|
transaction is either committed or rolled back. This prevents
|
|
other statements from modifying the table for the entire
|
|
duration of this transaction. This provides commit ordering
|
|
and guarantees serializability across multiple transactions.
|
|
- If in autocommit mode, or outside a transactional context,
|
|
automatically release metadata locks of the current statement.
|
|
*/
|
|
if (thd->transaction_rollback_request)
|
|
{
|
|
trans_rollback_implicit(thd);
|
|
thd->release_transactional_locks();
|
|
}
|
|
else if (! thd->in_multi_stmt_transaction_mode())
|
|
thd->release_transactional_locks();
|
|
else
|
|
thd->mdl_context.release_statement_locks();
|
|
|
|
clear_tables_to_lock();
|
|
DBUG_VOID_RETURN;
|
|
}
|
|
|
|
|
|
|
|
static void
|
|
mark_start_commit_inner(rpl_parallel_entry *e, group_commit_orderer *gco,
|
|
rpl_group_info *rgi)
|
|
{
|
|
group_commit_orderer *tmp;
|
|
uint64 count= ++e->count_committing_event_groups;
|
|
/* Signal any following GCO whose wait_count has been reached now. */
|
|
tmp= gco;
|
|
|
|
DBUG_ASSERT(!tmp->gc_done);
|
|
|
|
while ((tmp= tmp->next_gco))
|
|
{
|
|
DBUG_ASSERT(!tmp->gc_done);
|
|
|
|
uint64 wait_count= tmp->wait_count;
|
|
if (wait_count > count)
|
|
break;
|
|
mysql_cond_broadcast(&tmp->COND_group_commit_orderer);
|
|
}
|
|
}
|
|
|
|
|
|
void
|
|
rpl_group_info::mark_start_commit_no_lock()
|
|
{
|
|
if (did_mark_start_commit)
|
|
return;
|
|
did_mark_start_commit= true;
|
|
mark_start_commit_inner(parallel_entry, gco, this);
|
|
}
|
|
|
|
|
|
void
|
|
rpl_group_info::mark_start_commit()
|
|
{
|
|
rpl_parallel_entry *e;
|
|
|
|
if (did_mark_start_commit)
|
|
return;
|
|
did_mark_start_commit= true;
|
|
|
|
e= this->parallel_entry;
|
|
mysql_mutex_lock(&e->LOCK_parallel_entry);
|
|
mark_start_commit_inner(e, gco, this);
|
|
mysql_mutex_unlock(&e->LOCK_parallel_entry);
|
|
}
|
|
|
|
|
|
/*
|
|
Format the current GTID as a string suitable for printing in error messages.
|
|
|
|
The string is stored in a buffer inside rpl_group_info, so remains valid
|
|
until next call to gtid_info() or until destruction of rpl_group_info.
|
|
|
|
If no GTID is available, then NULL is returned.
|
|
*/
|
|
char *
|
|
rpl_group_info::gtid_info() const
|
|
{
|
|
if (!gtid_sub_id || !current_gtid.seq_no)
|
|
return NULL;
|
|
my_snprintf(gtid_info_buf, sizeof(gtid_info_buf), "Gtid %u-%u-%llu",
|
|
current_gtid.domain_id, current_gtid.server_id,
|
|
current_gtid.seq_no);
|
|
return gtid_info_buf;
|
|
}
|
|
|
|
|
|
/*
|
|
Undo the effect of a prior mark_start_commit().
|
|
|
|
This is only used for retrying a transaction in parallel replication, after
|
|
we have encountered a deadlock or other temporary error.
|
|
|
|
When we get such a deadlock, it means that the current group of transactions
|
|
did not yet all start committing (else they would not have deadlocked). So
|
|
we will not yet have woken up anything in the next group, our rgi->gco is
|
|
still live, and we can simply decrement the counter (to be incremented again
|
|
later, when the retry succeeds and reaches the commit step).
|
|
*/
|
|
void
|
|
rpl_group_info::unmark_start_commit()
|
|
{
|
|
rpl_parallel_entry *e;
|
|
|
|
if (!did_mark_start_commit)
|
|
return;
|
|
did_mark_start_commit= false;
|
|
|
|
e= this->parallel_entry;
|
|
mysql_mutex_lock(&e->LOCK_parallel_entry);
|
|
--e->count_committing_event_groups;
|
|
mysql_mutex_unlock(&e->LOCK_parallel_entry);
|
|
}
|
|
|
|
|
|
rpl_sql_thread_info::rpl_sql_thread_info(Rpl_filter *filter)
|
|
: rpl_filter(filter)
|
|
{
|
|
cached_charset_invalidate();
|
|
}
|
|
|
|
|
|
void rpl_sql_thread_info::cached_charset_invalidate()
|
|
{
|
|
DBUG_ENTER("rpl_group_info::cached_charset_invalidate");
|
|
|
|
/* Full of zeroes means uninitialized. */
|
|
bzero(cached_charset, sizeof(cached_charset));
|
|
DBUG_VOID_RETURN;
|
|
}
|
|
|
|
|
|
bool rpl_sql_thread_info::cached_charset_compare(char *charset) const
|
|
{
|
|
DBUG_ENTER("rpl_group_info::cached_charset_compare");
|
|
|
|
if (memcmp(cached_charset, charset, sizeof(cached_charset)))
|
|
{
|
|
memcpy(const_cast<char*>(cached_charset), charset, sizeof(cached_charset));
|
|
DBUG_RETURN(1);
|
|
}
|
|
DBUG_RETURN(0);
|
|
}
|
|
|
|
|
|
/**
|
|
Store the file and position where the slave's SQL thread are in the
|
|
relay log.
|
|
|
|
Notes:
|
|
|
|
- This function should be called either from the slave SQL thread,
|
|
or when the slave thread is not running. (It reads the
|
|
group_{relay|master}_log_{pos|name} and delay fields in the rli
|
|
object. These may only be modified by the slave SQL thread or by
|
|
a client thread when the slave SQL thread is not running.)
|
|
|
|
- If there is an active transaction, then we do not update the
|
|
position in the relay log. This is to ensure that we re-execute
|
|
statements if we die in the middle of an transaction that was
|
|
rolled back.
|
|
|
|
- As a transaction never spans binary logs, we don't have to handle
|
|
the case where we do a relay-log-rotation in the middle of the
|
|
transaction. If transactions could span several binlogs, we would
|
|
have to ensure that we do not delete the relay log file where the
|
|
transaction started before switching to a new relay log file.
|
|
|
|
- Error can happen if writing to file fails or if flushing the file
|
|
fails.
|
|
|
|
@param rli The object representing the Relay_log_info.
|
|
|
|
@todo Change the log file information to a binary format to avoid
|
|
calling longlong2str.
|
|
|
|
@return 0 on success, 1 on error.
|
|
*/
|
|
bool Relay_log_info::flush()
|
|
{
|
|
bool error=0;
|
|
|
|
DBUG_ENTER("Relay_log_info::flush()");
|
|
|
|
IO_CACHE *file = &info_file;
|
|
// 2*file name, 2*long long, 2*unsigned long, 6*'\n'
|
|
char buff[FN_REFLEN * 2 + 22 * 2 + 10 * 2 + 6], *pos;
|
|
my_b_seek(file, 0L);
|
|
pos= longlong10_to_str(LINES_IN_RELAY_LOG_INFO_WITH_DELAY, buff, 10);
|
|
*pos++='\n';
|
|
pos=strmov(pos, group_relay_log_name);
|
|
*pos++='\n';
|
|
pos=longlong10_to_str(group_relay_log_pos, pos, 10);
|
|
*pos++='\n';
|
|
pos=strmov(pos, group_master_log_name);
|
|
*pos++='\n';
|
|
pos=longlong10_to_str(group_master_log_pos, pos, 10);
|
|
*pos++='\n';
|
|
pos= longlong10_to_str(sql_delay, pos, 10);
|
|
*pos++= '\n';
|
|
if (my_b_write(file, (uchar*) buff, (size_t) (pos-buff)))
|
|
error=1;
|
|
if (flush_io_cache(file))
|
|
error=1;
|
|
if (sync_relayloginfo_period &&
|
|
!error &&
|
|
++sync_counter >= sync_relayloginfo_period)
|
|
{
|
|
if (my_sync(info_fd, MYF(MY_WME)))
|
|
error=1;
|
|
sync_counter= 0;
|
|
}
|
|
/*
|
|
Flushing the relay log is done by the slave I/O thread
|
|
or by the user on STOP SLAVE.
|
|
*/
|
|
DBUG_RETURN(error);
|
|
}
|
|
|
|
#endif
|