mirror of
				https://github.com/MariaDB/server.git
				synced 2025-10-31 10:56:12 +01:00 
			
		
		
		
	
		
			
				
	
	
		
			2669 lines
		
	
	
	
		
			86 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
			
		
		
	
	
			2669 lines
		
	
	
	
		
			86 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
| /* Copyright (c) 2006, 2017, Oracle and/or its affiliates.
 | |
|    Copyright (c) 2010, 2020, MariaDB Corporation.
 | |
| 
 | |
|    This program is free software; you can redistribute it and/or modify
 | |
|    it under the terms of the GNU General Public License as published by
 | |
|    the Free Software Foundation; version 2 of the License.
 | |
| 
 | |
|    This program is distributed in the hope that it will be useful,
 | |
|    but WITHOUT ANY WARRANTY; without even the implied warranty of
 | |
|    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 | |
|    GNU General Public License for more details.
 | |
| 
 | |
|    You should have received a copy of the GNU General Public License
 | |
|    along with this program; if not, write to the Free Software Foundation,
 | |
|    51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA */
 | |
| 
 | |
| #include "mariadb.h"
 | |
| #include "sql_priv.h"
 | |
| #include "unireg.h"                             // HAVE_*
 | |
| #include "rpl_mi.h"
 | |
| #include "rpl_rli.h"
 | |
| #include "sql_base.h"                        // close_thread_tables
 | |
| #include <my_dir.h>    // For MY_STAT
 | |
| #include "sql_repl.h"  // For check_binlog_magic
 | |
| #include "log_event.h" // Format_description_log_event, Log_event,
 | |
|                        // FORMAT_DESCRIPTION_LOG_EVENT, ROTATE_EVENT,
 | |
|                        // PREFIX_SQL_LOAD
 | |
| #include "rpl_utility.h"
 | |
| #include "transaction.h"
 | |
| #include "sql_parse.h"                          // end_trans, ROLLBACK
 | |
| #include "slave.h"
 | |
| #include <mysql/plugin.h>
 | |
| #include <mysql/service_thd_wait.h>
 | |
| #include "lock.h"
 | |
| #include "sql_table.h"
 | |
| 
 | |
| static int count_relay_log_space(Relay_log_info* rli);
 | |
| bool xa_trans_force_rollback(THD *thd);
 | |
| /**
 | |
|    Current replication state (hash of last GTID executed, per replication
 | |
|    domain).
 | |
| */
 | |
| rpl_slave_state *rpl_global_gtid_slave_state;
 | |
| /* Object used for MASTER_GTID_WAIT(). */
 | |
| gtid_waiting rpl_global_gtid_waiting;
 | |
| 
 | |
| Relay_log_info::Relay_log_info(bool is_slave_recovery, const char* thread_name)
 | |
|   :Slave_reporting_capability(thread_name),
 | |
|    replicate_same_server_id(::replicate_same_server_id),
 | |
|    info_fd(-1), cur_log_fd(-1), relay_log(&sync_relaylog_period),
 | |
|    sync_counter(0), is_relay_log_recovery(is_slave_recovery),
 | |
|    save_temporary_tables(0),
 | |
|    mi(0), inuse_relaylog_list(0), last_inuse_relaylog(0),
 | |
|    cur_log_old_open_count(0), error_on_rli_init_info(false),
 | |
|    group_relay_log_pos(0), event_relay_log_pos(0),
 | |
|    group_master_log_pos(0), log_space_total(0), ignore_log_space_limit(0),
 | |
|    sql_thread_caught_up(true),
 | |
|    last_master_timestamp(0), newest_master_timestamp(0), slave_timestamp(0),
 | |
|    slave_skip_counter(0),
 | |
|    abort_pos_wait(0), slave_run_id(0), sql_driver_thd(),
 | |
|    gtid_skip_flag(GTID_SKIP_NOT), inited(0), abort_slave(0), stop_for_until(0),
 | |
|    slave_running(MYSQL_SLAVE_NOT_RUN), until_condition(UNTIL_NONE),
 | |
|    until_log_pos(0), is_until_before_gtids(false),
 | |
|    retried_trans(0), executed_entries(0),
 | |
|    last_trans_retry_count(0), sql_delay(0), sql_delay_end(0),
 | |
|    until_relay_log_names_defer(false),
 | |
|    m_flags(0)
 | |
| {
 | |
|   DBUG_ENTER("Relay_log_info::Relay_log_info");
 | |
| 
 | |
|   relay_log.is_relay_log= TRUE;
 | |
|   relay_log_state.init();
 | |
| #ifdef HAVE_PSI_INTERFACE
 | |
|   relay_log.set_psi_keys(key_RELAYLOG_LOCK_index,
 | |
|                          key_RELAYLOG_COND_relay_log_updated,
 | |
|                          key_RELAYLOG_COND_bin_log_updated,
 | |
|                          key_file_relaylog,
 | |
|                          key_file_relaylog_cache,
 | |
|                          key_file_relaylog_index,
 | |
|                          key_file_relaylog_index_cache,
 | |
|                          key_RELAYLOG_COND_queue_busy,
 | |
|                          key_LOCK_relaylog_end_pos);
 | |
| #endif
 | |
| 
 | |
|   group_relay_log_name[0]= event_relay_log_name[0]=
 | |
|     group_master_log_name[0]= 0;
 | |
|   until_log_name[0]= ign_master_log_name_end[0]= 0;
 | |
|   max_relay_log_size= global_system_variables.max_relay_log_size;
 | |
|   bzero((char*) &info_file, sizeof(info_file));
 | |
|   bzero((char*) &cache_buf, sizeof(cache_buf));
 | |
|   bzero(&last_seen_gtid, sizeof(last_seen_gtid));
 | |
|   mysql_mutex_init(key_relay_log_info_run_lock, &run_lock, MY_MUTEX_INIT_FAST);
 | |
|   mysql_mutex_init(key_relay_log_info_data_lock,
 | |
|                    &data_lock, MY_MUTEX_INIT_FAST);
 | |
|   mysql_mutex_init(key_relay_log_info_log_space_lock,
 | |
|                    &log_space_lock, MY_MUTEX_INIT_FAST);
 | |
|   mysql_cond_init(key_relay_log_info_data_cond, &data_cond, NULL);
 | |
|   mysql_cond_init(key_relay_log_info_start_cond, &start_cond, NULL);
 | |
|   mysql_cond_init(key_relay_log_info_stop_cond, &stop_cond, NULL);
 | |
|   mysql_cond_init(key_relay_log_info_log_space_cond, &log_space_cond, NULL);
 | |
|   relay_log.init_pthread_objects();
 | |
|   DBUG_VOID_RETURN;
 | |
| }
 | |
| 
 | |
| 
 | |
| Relay_log_info::~Relay_log_info()
 | |
| {
 | |
|   DBUG_ENTER("Relay_log_info::~Relay_log_info");
 | |
| 
 | |
|   reset_inuse_relaylog();
 | |
|   mysql_mutex_destroy(&run_lock);
 | |
|   mysql_mutex_destroy(&data_lock);
 | |
|   mysql_mutex_destroy(&log_space_lock);
 | |
|   mysql_cond_destroy(&data_cond);
 | |
|   mysql_cond_destroy(&start_cond);
 | |
|   mysql_cond_destroy(&stop_cond);
 | |
|   mysql_cond_destroy(&log_space_cond);
 | |
|   relay_log.cleanup();
 | |
|   DBUG_VOID_RETURN;
 | |
| }
 | |
| 
 | |
| 
 | |
| /**
 | |
|   Read the relay_log.info file.
 | |
| 
 | |
|   @param info_fname The name of the file to read from.
 | |
|   @retval 0 success
 | |
|   @retval 1 failure
 | |
| */
 | |
| int Relay_log_info::init(const char* info_fname)
 | |
| {
 | |
|   char fname[FN_REFLEN+128];
 | |
|   const char* msg = 0;
 | |
|   int error = 0;
 | |
|   mysql_mutex_t *log_lock;
 | |
|   DBUG_ENTER("Relay_log_info::init");
 | |
| 
 | |
|   if (inited)                       // Set if this function called
 | |
|     DBUG_RETURN(0);
 | |
| 
 | |
|   log_lock= relay_log.get_log_lock();
 | |
|   fn_format(fname, info_fname, mysql_data_home, "", 4+32);
 | |
|   mysql_mutex_lock(&data_lock);
 | |
|   cur_log_fd = -1;
 | |
|   slave_skip_counter=0;
 | |
|   abort_pos_wait=0;
 | |
|   log_space_limit= relay_log_space_limit;
 | |
|   log_space_total= 0;
 | |
| 
 | |
|   if (unlikely(error_on_rli_init_info))
 | |
|     goto err;
 | |
| 
 | |
|   char pattern[FN_REFLEN];
 | |
|   (void) my_realpath(pattern, slave_load_tmpdir, 0);
 | |
|   if (fn_format(pattern, PREFIX_SQL_LOAD, pattern, "",
 | |
|             MY_SAFE_PATH | MY_RETURN_REAL_PATH) == NullS)
 | |
|   {
 | |
|     mysql_mutex_unlock(&data_lock);
 | |
|     sql_print_error("Unable to use slave's temporary directory %s",
 | |
|                     slave_load_tmpdir);
 | |
|     DBUG_RETURN(1);
 | |
|   }
 | |
|   unpack_filename(slave_patternload_file, pattern);
 | |
|   slave_patternload_file_size= strlen(slave_patternload_file);
 | |
| 
 | |
|   /*
 | |
|     The relay log will now be opened, as a SEQ_READ_APPEND IO_CACHE.
 | |
|     Note that the I/O thread flushes it to disk after writing every
 | |
|     event, in flush_master_info(mi, 1, ?).
 | |
|   */
 | |
| 
 | |
|   {
 | |
|     /* Reports an error and returns, if the --relay-log's path 
 | |
|        is a directory.*/
 | |
|     if (opt_relay_logname && 
 | |
|         opt_relay_logname[strlen(opt_relay_logname) - 1] == FN_LIBCHAR)
 | |
|     {
 | |
|       mysql_mutex_unlock(&data_lock);
 | |
|       sql_print_error("Path '%s' is a directory name, please specify \
 | |
| a file name for --relay-log option", opt_relay_logname);
 | |
|       DBUG_RETURN(1);
 | |
|     }
 | |
| 
 | |
|     /* Reports an error and returns, if the --relay-log-index's path 
 | |
|        is a directory.*/
 | |
|     if (opt_relaylog_index_name && 
 | |
|         opt_relaylog_index_name[strlen(opt_relaylog_index_name) - 1] 
 | |
|         == FN_LIBCHAR)
 | |
|     {
 | |
|       mysql_mutex_unlock(&data_lock);
 | |
|       sql_print_error("Path '%s' is a directory name, please specify \
 | |
| a file name for --relay-log-index option", opt_relaylog_index_name);
 | |
|       DBUG_RETURN(1);
 | |
|     }
 | |
| 
 | |
|     char buf[FN_REFLEN];
 | |
|     const char *ln;
 | |
|     static bool name_warning_sent= 0;
 | |
|     ln= relay_log.generate_name(opt_relay_logname, "-relay-bin",
 | |
|                                      1, buf);
 | |
|     /* We send the warning only at startup, not after every RESET SLAVE */
 | |
|     if (!opt_relay_logname && !opt_relaylog_index_name && !name_warning_sent &&
 | |
|         !opt_bootstrap)
 | |
|     {
 | |
|       /*
 | |
|         User didn't give us info to name the relay log index file.
 | |
|         Picking `hostname`-relay-bin.index like we do, causes replication to
 | |
|         fail if this slave's hostname is changed later. So, we would like to
 | |
|         instead require a name. But as we don't want to break many existing
 | |
|         setups, we only give warning, not error.
 | |
|       */
 | |
|       sql_print_warning("Neither --relay-log nor --relay-log-index were used;"
 | |
|                         " so replication "
 | |
|                         "may break when this MariaDB server acts as a "
 | |
|                         "replica and has its hostname changed. Please "
 | |
|                         "use '--log-basename=#' or '--relay-log=%s' to avoid "
 | |
|                         "this problem.", ln);
 | |
|       name_warning_sent= 1;
 | |
|     }
 | |
| 
 | |
|     /* For multimaster, add connection name to relay log filenames */
 | |
|     char buf_relay_logname[FN_REFLEN], buf_relaylog_index_name_buff[FN_REFLEN];
 | |
|     char *buf_relaylog_index_name= opt_relaylog_index_name;
 | |
| 
 | |
|     create_logfile_name_with_suffix(buf_relay_logname,
 | |
|                                     sizeof(buf_relay_logname),
 | |
|                                     ln, 1, &mi->cmp_connection_name);
 | |
|     ln= buf_relay_logname;
 | |
| 
 | |
|     if (opt_relaylog_index_name)
 | |
|     {
 | |
|       buf_relaylog_index_name= buf_relaylog_index_name_buff; 
 | |
|       create_logfile_name_with_suffix(buf_relaylog_index_name_buff,
 | |
|                                       sizeof(buf_relaylog_index_name_buff),
 | |
|                                       opt_relaylog_index_name, 0,
 | |
|                                       &mi->cmp_connection_name);
 | |
|     }
 | |
| 
 | |
|     /*
 | |
|       note, that if open() fails, we'll still have index file open
 | |
|       but a destructor will take care of that
 | |
|     */
 | |
|     mysql_mutex_lock(log_lock);
 | |
|     if (relay_log.open_index_file(buf_relaylog_index_name, ln, TRUE) ||
 | |
|         relay_log.open(ln, 0, 0, SEQ_READ_APPEND,
 | |
|                        (ulong)max_relay_log_size, 1, TRUE))
 | |
|     {
 | |
|       mysql_mutex_unlock(log_lock);
 | |
|       mysql_mutex_unlock(&data_lock);
 | |
|       sql_print_error("Failed when trying to open logs for '%s' in Relay_log_info::init(). Error: %iE", ln, my_errno);
 | |
|       DBUG_RETURN(1);
 | |
|     }
 | |
|     mysql_mutex_unlock(log_lock);
 | |
|   }
 | |
| 
 | |
|   /* if file does not exist */
 | |
|   if (access(fname,F_OK))
 | |
|   {
 | |
|     /*
 | |
|       If someone removed the file from underneath our feet, just close
 | |
|       the old descriptor and re-create the old file
 | |
|     */
 | |
|     if (info_fd >= 0)
 | |
|       mysql_file_close(info_fd, MYF(MY_WME));
 | |
|     if ((info_fd= mysql_file_open(key_file_relay_log_info,
 | |
|                                   fname, O_CREAT|O_RDWR|O_BINARY, MYF(MY_WME))) < 0)
 | |
|     {
 | |
|       sql_print_error("Failed to create a new relay log info file ("
 | |
|                       "file '%s', errno %d)", fname, my_errno);
 | |
|       msg= current_thd->get_stmt_da()->message();
 | |
|       goto err;
 | |
|     }
 | |
|     if (init_io_cache(&info_file, info_fd, LOG_BIN_IO_SIZE, READ_CACHE, 0L,0,
 | |
|                       MYF(MY_WME)))
 | |
|     {
 | |
|       sql_print_error("Failed to create a cache on relay log info file '%s'",
 | |
|                       fname);
 | |
|       msg= current_thd->get_stmt_da()->message();
 | |
|       goto err;
 | |
|     }
 | |
| 
 | |
|     /* Init relay log with first entry in the relay index file */
 | |
|     if (init_relay_log_pos(this,NullS,BIN_LOG_HEADER_SIZE,0 /* no data lock */,
 | |
|                            &msg, 0))
 | |
|     {
 | |
|       sql_print_error("Failed to open the relay log 'FIRST' (relay_log_pos 4)");
 | |
|       goto err;
 | |
|     }
 | |
|     group_master_log_name[0]= 0;
 | |
|     group_master_log_pos= 0;
 | |
|   }
 | |
|   else // file exists
 | |
|   {
 | |
|     if (info_fd >= 0)
 | |
|       reinit_io_cache(&info_file, READ_CACHE, 0L,0,0);
 | |
|     else
 | |
|     {
 | |
|       int error=0;
 | |
|       if ((info_fd= mysql_file_open(key_file_relay_log_info,
 | |
|                                     fname, O_RDWR|O_BINARY, MYF(MY_WME))) < 0)
 | |
|       {
 | |
|         sql_print_error("\
 | |
| Failed to open the existing relay log info file '%s' (errno %d)",
 | |
|                         fname, my_errno);
 | |
|         error= 1;
 | |
|       }
 | |
|       else if (init_io_cache(&info_file, info_fd,
 | |
|                              LOG_BIN_IO_SIZE, READ_CACHE, 0L, 0, MYF(MY_WME)))
 | |
|       {
 | |
|         sql_print_error("Failed to create a cache on relay log info file '%s'",
 | |
|                         fname);
 | |
|         error= 1;
 | |
|       }
 | |
|       if (unlikely(error))
 | |
|       {
 | |
|         if (info_fd >= 0)
 | |
|           mysql_file_close(info_fd, MYF(0));
 | |
|         info_fd= -1;
 | |
|         mysql_mutex_lock(log_lock);
 | |
|         relay_log.close(LOG_CLOSE_INDEX | LOG_CLOSE_STOP_EVENT);
 | |
|         mysql_mutex_unlock(log_lock);
 | |
|         mysql_mutex_unlock(&data_lock);
 | |
|         DBUG_RETURN(1);
 | |
|       }
 | |
|     }
 | |
| 
 | |
|     int relay_log_pos, master_log_pos, lines;
 | |
|     char *first_non_digit;
 | |
| 
 | |
|     /*
 | |
|       Starting from MySQL 5.6.x, relay-log.info has a new format.
 | |
|       Now, its first line contains the number of lines in the file.
 | |
|       By reading this number we can determine which version our master.info
 | |
|       comes from. We can't simply count the lines in the file, since
 | |
|       versions before 5.6.x could generate files with more lines than
 | |
|       needed. If first line doesn't contain a number, or if it
 | |
|       contains a number less than LINES_IN_RELAY_LOG_INFO_WITH_DELAY,
 | |
|       then the file is treated like a file from pre-5.6.x version.
 | |
|       There is no ambiguity when reading an old master.info: before
 | |
|       5.6.x, the first line contained the binlog's name, which is
 | |
|       either empty or has an extension (contains a '.'), so can't be
 | |
|       confused with an integer.
 | |
| 
 | |
|       So we're just reading first line and trying to figure which
 | |
|       version is this.
 | |
|     */
 | |
| 
 | |
|     /*
 | |
|       The first row is temporarily stored in mi->master_log_name, if
 | |
|       it is line count and not binlog name (new format) it will be
 | |
|       overwritten by the second row later.
 | |
|     */
 | |
|     if (init_strvar_from_file(group_relay_log_name,
 | |
|                               sizeof(group_relay_log_name),
 | |
|                               &info_file, ""))
 | |
|     {
 | |
|       msg="Error reading slave log configuration";
 | |
|       goto err;
 | |
|     }
 | |
| 
 | |
|     lines= strtoul(group_relay_log_name, &first_non_digit, 10);
 | |
| 
 | |
|     if (group_relay_log_name[0] != '\0' &&
 | |
|         *first_non_digit == '\0' &&
 | |
|         lines >= LINES_IN_RELAY_LOG_INFO_WITH_DELAY)
 | |
|     {
 | |
|       DBUG_PRINT("info", ("relay_log_info file is in new format."));
 | |
|       /* Seems to be new format => read relay log name from next line */
 | |
|       if (init_strvar_from_file(group_relay_log_name,
 | |
|                                 sizeof(group_relay_log_name),
 | |
|                                 &info_file, ""))
 | |
|       {
 | |
|         msg="Error reading slave log configuration";
 | |
|         goto err;
 | |
|       }
 | |
|     }
 | |
|     else
 | |
|       DBUG_PRINT("info", ("relay_log_info file is in old format."));
 | |
| 
 | |
|     if (init_intvar_from_file(&relay_log_pos,
 | |
|                               &info_file, BIN_LOG_HEADER_SIZE) ||
 | |
|         init_strvar_from_file(group_master_log_name,
 | |
|                               sizeof(group_master_log_name),
 | |
|                               &info_file, "") ||
 | |
|         init_intvar_from_file(&master_log_pos, &info_file, 0) ||
 | |
|         (lines >= LINES_IN_RELAY_LOG_INFO_WITH_DELAY &&
 | |
|          init_intvar_from_file(&sql_delay, &info_file, 0)))
 | |
|     {
 | |
|       msg="Error reading slave log configuration";
 | |
|       goto err;
 | |
|     }
 | |
| 
 | |
|     strmake_buf(event_relay_log_name,group_relay_log_name);
 | |
|     group_relay_log_pos= event_relay_log_pos= relay_log_pos;
 | |
|     group_master_log_pos= master_log_pos;
 | |
| 
 | |
|     if (is_relay_log_recovery && init_recovery(mi, &msg))
 | |
|       goto err;
 | |
| 
 | |
|     relay_log_state.load(rpl_global_gtid_slave_state);
 | |
|     if (init_relay_log_pos(this,
 | |
|                            group_relay_log_name,
 | |
|                            group_relay_log_pos,
 | |
|                            0 /* no data lock*/,
 | |
|                            &msg, 0))
 | |
|     {
 | |
|       sql_print_error("Failed to open the relay log '%s' (relay_log_pos %llu)",
 | |
|                       group_relay_log_name, group_relay_log_pos);
 | |
|       goto err;
 | |
|     }
 | |
|   }
 | |
| 
 | |
|   DBUG_PRINT("info", ("my_b_tell(cur_log)=%llu event_relay_log_pos=%llu",
 | |
|                       my_b_tell(cur_log), event_relay_log_pos));
 | |
|   DBUG_ASSERT(event_relay_log_pos >= BIN_LOG_HEADER_SIZE);
 | |
|   DBUG_ASSERT(my_b_tell(cur_log) == event_relay_log_pos);
 | |
| 
 | |
|   /*
 | |
|     Now change the cache from READ to WRITE - must do this
 | |
|     before Relay_log_info::flush()
 | |
|   */
 | |
|   reinit_io_cache(&info_file, WRITE_CACHE,0L,0,1);
 | |
|   if (unlikely((error= flush())))
 | |
|   {
 | |
|     msg= "Failed to flush relay log info file";
 | |
|     goto err;
 | |
|   }
 | |
|   if (count_relay_log_space(this))
 | |
|   {
 | |
|     msg="Error counting relay log space";
 | |
|     goto err;
 | |
|   }
 | |
|   inited= 1;
 | |
|   error_on_rli_init_info= false;
 | |
|   mysql_mutex_unlock(&data_lock);
 | |
|   DBUG_RETURN(0);
 | |
| 
 | |
| err:
 | |
|   error_on_rli_init_info= true;
 | |
|   if (msg)
 | |
|     sql_print_error("%s", msg);
 | |
|   end_io_cache(&info_file);
 | |
|   if (info_fd >= 0)
 | |
|     mysql_file_close(info_fd, MYF(0));
 | |
|   info_fd= -1;
 | |
|   mysql_mutex_lock(log_lock);
 | |
|   relay_log.close(LOG_CLOSE_INDEX | LOG_CLOSE_STOP_EVENT);
 | |
|   mysql_mutex_unlock(log_lock);
 | |
|   mysql_mutex_unlock(&data_lock);
 | |
|   DBUG_RETURN(1);
 | |
| }
 | |
| 
 | |
| 
 | |
| static inline int add_relay_log(Relay_log_info* rli,LOG_INFO* linfo)
 | |
| {
 | |
|   MY_STAT s;
 | |
|   DBUG_ENTER("add_relay_log");
 | |
|   if (!mysql_file_stat(key_file_relaylog,
 | |
|                        linfo->log_file_name, &s, MYF(0)))
 | |
|   {
 | |
|     sql_print_error("log %s listed in the index, but failed to stat",
 | |
|                     linfo->log_file_name);
 | |
|     DBUG_RETURN(1);
 | |
|   }
 | |
|   rli->log_space_total += s.st_size;
 | |
|   DBUG_PRINT("info",("log_space_total: %llu", uint64(rli->log_space_total)));
 | |
|   DBUG_RETURN(0);
 | |
| }
 | |
| 
 | |
| 
 | |
| static int count_relay_log_space(Relay_log_info* rli)
 | |
| {
 | |
|   LOG_INFO linfo;
 | |
|   DBUG_ENTER("count_relay_log_space");
 | |
|   rli->log_space_total= 0;
 | |
|   if (rli->relay_log.find_log_pos(&linfo, NullS, 1))
 | |
|   {
 | |
|     sql_print_error("Could not find first log while counting relay log space");
 | |
|     DBUG_RETURN(1);
 | |
|   }
 | |
|   do
 | |
|   {
 | |
|     if (add_relay_log(rli,&linfo))
 | |
|       DBUG_RETURN(1);
 | |
|   } while (!rli->relay_log.find_next_log(&linfo, 1));
 | |
|   /*
 | |
|      As we have counted everything, including what may have written in a
 | |
|      preceding write, we must reset bytes_written, or we may count some space
 | |
|      twice.
 | |
|   */
 | |
|   rli->relay_log.reset_bytes_written();
 | |
|   DBUG_RETURN(0);
 | |
| }
 | |
| 
 | |
| 
 | |
| /*
 | |
|    Reset UNTIL condition for Relay_log_info
 | |
| 
 | |
|    SYNOPSYS
 | |
|     clear_until_condition()
 | |
|       rli - Relay_log_info structure where UNTIL condition should be reset
 | |
|  */
 | |
| 
 | |
| void Relay_log_info::clear_until_condition()
 | |
| {
 | |
|   DBUG_ENTER("clear_until_condition");
 | |
| 
 | |
|   until_condition= Relay_log_info::UNTIL_NONE;
 | |
|   until_log_name[0]= 0;
 | |
|   until_log_pos= 0;
 | |
|   until_relay_log_names_defer= false;
 | |
| 
 | |
|   DBUG_VOID_RETURN;
 | |
| }
 | |
| 
 | |
| 
 | |
| /*
 | |
|   Read the correct format description event for starting to replicate from
 | |
|   a given position in a relay log file.
 | |
| */
 | |
| Format_description_log_event *
 | |
| read_relay_log_description_event(IO_CACHE *cur_log, ulonglong start_pos,
 | |
|                                  const char **errmsg)
 | |
| {
 | |
|   Log_event *ev;
 | |
|   Format_description_log_event *fdev;
 | |
|   bool found= false;
 | |
| 
 | |
|   fdev= new Format_description_log_event(4);
 | |
| 
 | |
|   while (!found)
 | |
|   {
 | |
|     Log_event_type typ;
 | |
| 
 | |
|     /*
 | |
|       Read the possible Format_description_log_event; if position
 | |
|       was 4, no need, it will be read naturally.
 | |
|     */
 | |
|     DBUG_PRINT("info",("looking for a Format_description_log_event"));
 | |
| 
 | |
|     if (my_b_tell(cur_log) >= start_pos)
 | |
|       break;
 | |
| 
 | |
|     int read_error;
 | |
|     if (!(ev= Log_event::read_log_event(cur_log, &read_error, fdev,
 | |
|                                         opt_slave_sql_verify_checksum)))
 | |
|     {
 | |
|       DBUG_PRINT("info",("could not read event, read_error=%d",
 | |
|                          read_error));
 | |
|       if (read_error) /* not EOF */
 | |
|       {
 | |
|         *errmsg= "I/O error reading event at position 4";
 | |
|         delete fdev;
 | |
|         return NULL;
 | |
|       }
 | |
|       break;
 | |
|     }
 | |
|     typ= ev->get_type_code();
 | |
|     if (typ == FORMAT_DESCRIPTION_EVENT)
 | |
|     {
 | |
|       Format_description_log_event *old= fdev;
 | |
|       DBUG_PRINT("info",("found Format_description_log_event"));
 | |
|       fdev= (Format_description_log_event*) ev;
 | |
|       fdev->copy_crypto_data(old);
 | |
|       delete old;
 | |
| 
 | |
|       /*
 | |
|         As ev was returned by read_log_event, it has passed is_valid(), so
 | |
|         my_malloc() in ctor worked, no need to check again.
 | |
|       */
 | |
|       /*
 | |
|         Ok, we found a Format_description event. But it is not sure that this
 | |
|         describes the whole relay log; indeed, one can have this sequence
 | |
|         (starting from position 4):
 | |
|         Format_desc (of slave)
 | |
|         Rotate (of master)
 | |
|         Format_desc (of master)
 | |
|         So the Format_desc which really describes the rest of the relay log
 | |
|         is the 3rd event (it can't be further than that, because we rotate
 | |
|         the relay log when we queue a Rotate event from the master).
 | |
|         But what describes the Rotate is the first Format_desc.
 | |
|         So what we do is:
 | |
|         go on searching for Format_description events, until you exceed the
 | |
|         position (argument 'pos') or until you find another event than Rotate
 | |
|         or Format_desc.
 | |
|       */
 | |
|     }
 | |
|     else if (typ == START_ENCRYPTION_EVENT)
 | |
|     {
 | |
|       if (fdev->start_decryption((Start_encryption_log_event*) ev))
 | |
|       {
 | |
|         *errmsg= "Unable to set up decryption of binlog.";
 | |
|         delete ev;
 | |
|         delete fdev;
 | |
|         return NULL;
 | |
|       }
 | |
|       delete ev;
 | |
|     }
 | |
|     else
 | |
|     {
 | |
|       DBUG_PRINT("info",("found event of another type=%d", typ));
 | |
|       found= (typ != ROTATE_EVENT);
 | |
|       delete ev;
 | |
|     }
 | |
|   }
 | |
|   return fdev;
 | |
| }
 | |
| 
 | |
| 
 | |
| /*
 | |
|   Open the given relay log
 | |
| 
 | |
|   SYNOPSIS
 | |
|     init_relay_log_pos()
 | |
|     rli                 Relay information (will be initialized)
 | |
|     log                 Name of relay log file to read from. NULL = First log
 | |
|     pos                 Position in relay log file
 | |
|     need_data_lock      Set to 1 if this functions should do mutex locks
 | |
|     errmsg              Store pointer to error message here
 | |
|     look_for_description_event
 | |
|                         1 if we should look for such an event. We only need
 | |
|                         this when the SQL thread starts and opens an existing
 | |
|                         relay log and has to execute it (possibly from an
 | |
|                         offset >4); then we need to read the first event of
 | |
|                         the relay log to be able to parse the events we have
 | |
|                         to execute.
 | |
| 
 | |
|   DESCRIPTION
 | |
|   - Close old open relay log files.
 | |
|   - If we are using the same relay log as the running IO-thread, then set
 | |
|     rli->cur_log to point to the same IO_CACHE entry.
 | |
|   - If not, open the 'log' binary file.
 | |
| 
 | |
|   TODO
 | |
|     - check proper initialization of group_master_log_name/group_master_log_pos
 | |
| 
 | |
|   RETURN VALUES
 | |
|     0   ok
 | |
|     1   error.  errmsg is set to point to the error message
 | |
| */
 | |
| 
 | |
| int init_relay_log_pos(Relay_log_info* rli,const char* log,
 | |
|                        ulonglong pos, bool need_data_lock,
 | |
|                        const char** errmsg,
 | |
|                        bool look_for_description_event)
 | |
| {
 | |
|   DBUG_ENTER("init_relay_log_pos");
 | |
|   DBUG_PRINT("info", ("pos: %lu", (ulong) pos));
 | |
| 
 | |
|   *errmsg=0;
 | |
|   mysql_mutex_t *log_lock= rli->relay_log.get_log_lock();
 | |
| 
 | |
|   if (need_data_lock)
 | |
|     mysql_mutex_lock(&rli->data_lock);
 | |
| 
 | |
|   /*
 | |
|     Slave threads are not the only users of init_relay_log_pos(). CHANGE MASTER
 | |
|     is, too, and init_slave() too; these 2 functions allocate a description
 | |
|     event in init_relay_log_pos, which is not freed by the terminating SQL slave
 | |
|     thread as that thread is not started by these functions. So we have to free
 | |
|     the description_event here, in case, so that there is no memory leak in
 | |
|     running, say, CHANGE MASTER.
 | |
|   */
 | |
|   delete rli->relay_log.description_event_for_exec;
 | |
|   rli->relay_log.description_event_for_exec= new Format_description_log_event(4);
 | |
| 
 | |
|   mysql_mutex_lock(log_lock);
 | |
| 
 | |
|   /* Close log file and free buffers if it's already open */
 | |
|   if (rli->cur_log_fd >= 0)
 | |
|   {
 | |
|     end_io_cache(&rli->cache_buf);
 | |
|     mysql_file_close(rli->cur_log_fd, MYF(MY_WME));
 | |
|     rli->cur_log_fd = -1;
 | |
|   }
 | |
| 
 | |
|   rli->group_relay_log_pos = rli->event_relay_log_pos = pos;
 | |
|   rli->clear_flag(Relay_log_info::IN_STMT);
 | |
|   rli->clear_flag(Relay_log_info::IN_TRANSACTION);
 | |
| 
 | |
|   /*
 | |
|     Test to see if the previous run was with the skip of purging
 | |
|     If yes, we do not purge when we restart
 | |
|   */
 | |
|   if (rli->relay_log.find_log_pos(&rli->linfo, NullS, 1))
 | |
|   {
 | |
|     *errmsg="Could not find first log during relay log initialization";
 | |
|     goto err;
 | |
|   }
 | |
| 
 | |
|   if (log && rli->relay_log.find_log_pos(&rli->linfo, log, 1))
 | |
|   {
 | |
|     *errmsg="Could not find target log during relay log initialization";
 | |
|     goto err;
 | |
|   }
 | |
|   strmake_buf(rli->group_relay_log_name,rli->linfo.log_file_name);
 | |
|   strmake_buf(rli->event_relay_log_name,rli->linfo.log_file_name);
 | |
|   if (rli->relay_log.is_active(rli->linfo.log_file_name))
 | |
|   {
 | |
|     /*
 | |
|       The IO thread is using this log file.
 | |
|       In this case, we will use the same IO_CACHE pointer to
 | |
|       read data as the IO thread is using to write data.
 | |
|     */
 | |
|     my_b_seek((rli->cur_log=rli->relay_log.get_log_file()), (off_t)0);
 | |
|     if (check_binlog_magic(rli->cur_log,errmsg))
 | |
|       goto err;
 | |
|     rli->cur_log_old_open_count=rli->relay_log.get_open_count();
 | |
|   }
 | |
|   else
 | |
|   {
 | |
|     /*
 | |
|       Open the relay log and set rli->cur_log to point at this one
 | |
|     */
 | |
|     if ((rli->cur_log_fd=open_binlog(&rli->cache_buf,
 | |
|                                      rli->linfo.log_file_name,errmsg)) < 0)
 | |
|       goto err;
 | |
|     rli->cur_log = &rli->cache_buf;
 | |
|   }
 | |
|   /*
 | |
|     In all cases, check_binlog_magic() has been called so we're at offset 4 for
 | |
|     sure.
 | |
|   */
 | |
|   if (pos > BIN_LOG_HEADER_SIZE) /* If pos<=4, we stay at 4 */
 | |
|   {
 | |
|     if (look_for_description_event)
 | |
|     {
 | |
|       Format_description_log_event *fdev;
 | |
|       if (!(fdev= read_relay_log_description_event(rli->cur_log, pos, errmsg)))
 | |
|         goto err;
 | |
|       delete rli->relay_log.description_event_for_exec;
 | |
|       rli->relay_log.description_event_for_exec= fdev;
 | |
|     }
 | |
|     my_b_seek(rli->cur_log,(off_t)pos);
 | |
|     DBUG_PRINT("info", ("my_b_tell(rli->cur_log)=%llu rli->event_relay_log_pos=%llu",
 | |
|                         my_b_tell(rli->cur_log), rli->event_relay_log_pos));
 | |
| 
 | |
|   }
 | |
| 
 | |
| err:
 | |
|   /*
 | |
|     If we don't purge, we can't honour relay_log_space_limit ;
 | |
|     silently discard it
 | |
|   */
 | |
|   if (!relay_log_purge)
 | |
|     rli->log_space_limit= 0;
 | |
|   mysql_cond_broadcast(&rli->data_cond);
 | |
| 
 | |
|   mysql_mutex_unlock(log_lock);
 | |
| 
 | |
|   if (need_data_lock)
 | |
|     mysql_mutex_unlock(&rli->data_lock);
 | |
|   if (!rli->relay_log.description_event_for_exec->is_valid() && !*errmsg)
 | |
|     *errmsg= "Invalid Format_description log event; could be out of memory";
 | |
| 
 | |
|   DBUG_PRINT("info", ("Returning %d from init_relay_log_pos", (*errmsg)?1:0));
 | |
| 
 | |
|   DBUG_RETURN ((*errmsg) ? 1 : 0);
 | |
| }
 | |
| 
 | |
| 
 | |
| /*
 | |
|   Waits until the SQL thread reaches (has executed up to) the
 | |
|   log/position or timed out.
 | |
| 
 | |
|   SYNOPSIS
 | |
|     wait_for_pos()
 | |
|     thd             client thread that sent SELECT MASTER_POS_WAIT
 | |
|     log_name        log name to wait for
 | |
|     log_pos         position to wait for
 | |
|     timeout         timeout in seconds before giving up waiting
 | |
| 
 | |
|   NOTES
 | |
|     timeout is longlong whereas it should be ulong ; but this is
 | |
|     to catch if the user submitted a negative timeout.
 | |
| 
 | |
|   RETURN VALUES
 | |
|     -2          improper arguments (log_pos<0)
 | |
|                 or slave not running, or master info changed
 | |
|                 during the function's execution,
 | |
|                 or client thread killed. -2 is translated to NULL by caller
 | |
|     -1          timed out
 | |
|     >=0         number of log events the function had to wait
 | |
|                 before reaching the desired log/position
 | |
|  */
 | |
| 
 | |
| int Relay_log_info::wait_for_pos(THD* thd, String* log_name,
 | |
|                                     longlong log_pos,
 | |
|                                     longlong timeout)
 | |
| {
 | |
|   int event_count = 0;
 | |
|   ulong init_abort_pos_wait;
 | |
|   int error=0;
 | |
|   struct timespec abstime; // for timeout checking
 | |
|   PSI_stage_info old_stage;
 | |
|   DBUG_ENTER("Relay_log_info::wait_for_pos");
 | |
| 
 | |
|   if (!inited)
 | |
|     DBUG_RETURN(-2);
 | |
| 
 | |
|   DBUG_PRINT("enter",("log_name: '%s'  log_pos: %lu  timeout: %lu",
 | |
|                       log_name->c_ptr(), (ulong) log_pos, (ulong) timeout));
 | |
| 
 | |
|   set_timespec(abstime,timeout);
 | |
|   mysql_mutex_lock(&data_lock);
 | |
|   thd->ENTER_COND(&data_cond, &data_lock,
 | |
|                   &stage_waiting_for_the_slave_thread_to_advance_position,
 | |
|                   &old_stage);
 | |
|   /*
 | |
|      This function will abort when it notices that some CHANGE MASTER or
 | |
|      RESET MASTER has changed the master info.
 | |
|      To catch this, these commands modify abort_pos_wait ; We just monitor
 | |
|      abort_pos_wait and see if it has changed.
 | |
|      Why do we have this mechanism instead of simply monitoring slave_running
 | |
|      in the loop (we do this too), as CHANGE MASTER/RESET SLAVE require that
 | |
|      the SQL thread be stopped?
 | |
|      This is becasue if someones does:
 | |
|      STOP SLAVE;CHANGE MASTER/RESET SLAVE; START SLAVE;
 | |
|      the change may happen very quickly and we may not notice that
 | |
|      slave_running briefly switches between 1/0/1.
 | |
|   */
 | |
|   init_abort_pos_wait= abort_pos_wait;
 | |
| 
 | |
|   /*
 | |
|     We'll need to
 | |
|     handle all possible log names comparisons (e.g. 999 vs 1000).
 | |
|     We use ulong for string->number conversion ; this is no
 | |
|     stronger limitation than in find_uniq_filename in sql/log.cc
 | |
|   */
 | |
|   ulong log_name_extension;
 | |
|   char log_name_tmp[FN_REFLEN]; //make a char[] from String
 | |
| 
 | |
|   strmake(log_name_tmp, log_name->ptr(), MY_MIN(log_name->length(), FN_REFLEN-1));
 | |
| 
 | |
|   char *p= fn_ext(log_name_tmp);
 | |
|   char *p_end;
 | |
|   if (!*p || log_pos<0)
 | |
|   {
 | |
|     error= -2; //means improper arguments
 | |
|     goto err;
 | |
|   }
 | |
|   // Convert 0-3 to 4
 | |
|   log_pos= MY_MAX(log_pos, BIN_LOG_HEADER_SIZE);
 | |
|   /* p points to '.' */
 | |
|   log_name_extension= strtoul(++p, &p_end, 10);
 | |
|   /*
 | |
|     p_end points to the first invalid character.
 | |
|     If it equals to p, no digits were found, error.
 | |
|     If it contains '\0' it means conversion went ok.
 | |
|   */
 | |
|   if (p_end==p || *p_end)
 | |
|   {
 | |
|     error= -2;
 | |
|     goto err;
 | |
|   }
 | |
| 
 | |
|   /* The "compare and wait" main loop */
 | |
|   while (!thd->killed &&
 | |
|          init_abort_pos_wait == abort_pos_wait &&
 | |
|          slave_running)
 | |
|   {
 | |
|     bool pos_reached;
 | |
|     int cmp_result= 0;
 | |
| 
 | |
|     DBUG_PRINT("info",
 | |
|                ("init_abort_pos_wait: %ld  abort_pos_wait: %ld",
 | |
|                 init_abort_pos_wait, abort_pos_wait));
 | |
|     DBUG_PRINT("info",("group_master_log_name: '%s'  pos: %lu",
 | |
|                        group_master_log_name, (ulong) group_master_log_pos));
 | |
| 
 | |
|     /*
 | |
|       group_master_log_name can be "", if we are just after a fresh
 | |
|       replication start or after a CHANGE MASTER TO MASTER_HOST/PORT
 | |
|       (before we have executed one Rotate event from the master) or
 | |
|       (rare) if the user is doing a weird slave setup (see next
 | |
|       paragraph).  If group_master_log_name is "", we assume we don't
 | |
|       have enough info to do the comparison yet, so we just wait until
 | |
|       more data. In this case master_log_pos is always 0 except if
 | |
|       somebody (wrongly) sets this slave to be a slave of itself
 | |
|       without using --replicate-same-server-id (an unsupported
 | |
|       configuration which does nothing), then group_master_log_pos
 | |
|       will grow and group_master_log_name will stay "".
 | |
|     */
 | |
|     if (*group_master_log_name)
 | |
|     {
 | |
|       char *basename= (group_master_log_name +
 | |
|                        dirname_length(group_master_log_name));
 | |
|       /*
 | |
|         First compare the parts before the extension.
 | |
|         Find the dot in the master's log basename,
 | |
|         and protect against user's input error :
 | |
|         if the names do not match up to '.' included, return error
 | |
|       */
 | |
|       char *q= (char*)(fn_ext(basename)+1);
 | |
|       if (strncmp(basename, log_name_tmp, (int)(q-basename)))
 | |
|       {
 | |
|         error= -2;
 | |
|         break;
 | |
|       }
 | |
|       // Now compare extensions.
 | |
|       char *q_end;
 | |
|       ulong group_master_log_name_extension= strtoul(q, &q_end, 10);
 | |
|       if (group_master_log_name_extension < log_name_extension)
 | |
|         cmp_result= -1 ;
 | |
|       else
 | |
|         cmp_result= (group_master_log_name_extension > log_name_extension) ? 1 : 0 ;
 | |
| 
 | |
|       pos_reached= ((!cmp_result && group_master_log_pos >= (ulonglong)log_pos) ||
 | |
|                     cmp_result > 0);
 | |
|       if (pos_reached || thd->killed)
 | |
|         break;
 | |
|     }
 | |
| 
 | |
|     //wait for master update, with optional timeout.
 | |
| 
 | |
|     DBUG_PRINT("info",("Waiting for master update"));
 | |
|     /*
 | |
|       We are going to mysql_cond_(timed)wait(); if the SQL thread stops it
 | |
|       will wake us up.
 | |
|     */
 | |
|     thd_wait_begin(thd, THD_WAIT_BINLOG);
 | |
|     if (timeout > 0)
 | |
|     {
 | |
|       /*
 | |
|         Note that mysql_cond_timedwait checks for the timeout
 | |
|         before for the condition ; i.e. it returns ETIMEDOUT
 | |
|         if the system time equals or exceeds the time specified by abstime
 | |
|         before the condition variable is signaled or broadcast, _or_ if
 | |
|         the absolute time specified by abstime has already passed at the time
 | |
|         of the call.
 | |
|         For that reason, mysql_cond_timedwait will do the "timeoutting" job
 | |
|         even if its condition is always immediately signaled (case of a loaded
 | |
|         master).
 | |
|       */
 | |
|       error= mysql_cond_timedwait(&data_cond, &data_lock, &abstime);
 | |
|     }
 | |
|     else
 | |
|       mysql_cond_wait(&data_cond, &data_lock);
 | |
|     thd_wait_end(thd);
 | |
|     DBUG_PRINT("info",("Got signal of master update or timed out"));
 | |
|     if (error == ETIMEDOUT || error == ETIME)
 | |
|     {
 | |
|       my_printf_error(ER_UNKNOWN_ERROR,
 | |
|                       "Timeout waiting for %s:%llu. Current pos is %s:%llu",
 | |
|                       MYF(ME_ERROR_LOG | ME_NOTE),
 | |
|                       log_name_tmp, (ulonglong) log_pos,
 | |
|                       group_master_log_name, (ulonglong) group_master_log_pos);
 | |
|       error= -1;
 | |
|       break;
 | |
|     }
 | |
|     error=0;
 | |
|     event_count++;
 | |
|     DBUG_PRINT("info",("Testing if killed or SQL thread not running"));
 | |
|   }
 | |
| 
 | |
| err:
 | |
|   thd->EXIT_COND(&old_stage);
 | |
|   DBUG_PRINT("exit",("killed: %d  abort: %d  slave_running: %d \
 | |
| improper_arguments: %d  timed_out: %d",
 | |
|                      thd->killed_errno(),
 | |
|                      (int) (init_abort_pos_wait != abort_pos_wait),
 | |
|                      (int) slave_running,
 | |
|                      (int) (error == -2),
 | |
|                      (int) (error == -1)));
 | |
|   if (thd->killed || init_abort_pos_wait != abort_pos_wait ||
 | |
|       !slave_running)
 | |
|   {
 | |
|     const char *cause= 0;
 | |
|     if (init_abort_pos_wait != abort_pos_wait)
 | |
|       cause= "CHANGE MASTER detected";
 | |
|     else if (!slave_running)
 | |
|       cause="slave is not running";
 | |
|     else
 | |
|       cause="connection was killed";
 | |
|     my_printf_error(ER_UNKNOWN_ERROR,
 | |
|                     "master_pos_wait() was aborted because %s",
 | |
|                     MYF(ME_ERROR_LOG | ME_NOTE),
 | |
|                     cause);
 | |
|     error= -2;
 | |
|   }
 | |
|   DBUG_RETURN( error ? error : event_count );
 | |
| }
 | |
| 
 | |
| 
 | |
| void Relay_log_info::inc_group_relay_log_pos(ulonglong log_pos,
 | |
|                                              rpl_group_info *rgi,
 | |
|                                              bool skip_lock)
 | |
| {
 | |
|   DBUG_ENTER("Relay_log_info::inc_group_relay_log_pos");
 | |
| 
 | |
|   if (skip_lock)
 | |
|     mysql_mutex_assert_owner(&data_lock);
 | |
|   else
 | |
|     mysql_mutex_lock(&data_lock);
 | |
| 
 | |
|   rgi->inc_event_relay_log_pos();
 | |
|   DBUG_PRINT("info", ("log_pos: %lu  group_master_log_pos: %lu",
 | |
|                       (long) log_pos, (long) group_master_log_pos));
 | |
|   if (rgi->is_parallel_exec)
 | |
|   {
 | |
|     /* In case of parallel replication, do not update the position backwards. */
 | |
|     int cmp= compare_log_name(group_relay_log_name, rgi->event_relay_log_name);
 | |
|     if (cmp < 0)
 | |
|     {
 | |
|       group_relay_log_pos= rgi->future_event_relay_log_pos;
 | |
|       strmake_buf(group_relay_log_name, rgi->event_relay_log_name);
 | |
|     } else if (cmp == 0 && group_relay_log_pos < rgi->future_event_relay_log_pos)
 | |
|       group_relay_log_pos= rgi->future_event_relay_log_pos;
 | |
| 
 | |
|     /*
 | |
|       In the parallel case we need to update the master_log_name here, rather
 | |
|       than in Rotate_log_event::do_update_pos().
 | |
|     */
 | |
|     cmp= compare_log_name(group_master_log_name, rgi->future_event_master_log_name);
 | |
|     if (cmp <= 0)
 | |
|     {
 | |
|       if (cmp < 0)
 | |
|       {
 | |
|         safe_strcpy(group_master_log_name, sizeof(group_master_log_name),
 | |
|                     rgi->future_event_master_log_name);
 | |
|         group_master_log_pos= log_pos;
 | |
|       }
 | |
|       else if (group_master_log_pos < log_pos)
 | |
|         group_master_log_pos= log_pos;
 | |
|     }
 | |
| 
 | |
|     /*
 | |
|       In the parallel case, we only update the Seconds_Behind_Master at the
 | |
|       end of a transaction. In the non-parallel case, the value is updated as
 | |
|       soon as an event is read from the relay log; however this would be too
 | |
|       confusing for the user, seeing the slave reported as up-to-date when
 | |
|       potentially thousands of events are still queued up for worker threads
 | |
|       waiting for execution.
 | |
|     */
 | |
|     set_if_bigger(last_master_timestamp, rgi->last_master_timestamp);
 | |
|   }
 | |
|   else
 | |
|   {
 | |
|     /* Non-parallel case. */
 | |
|     group_relay_log_pos= event_relay_log_pos;
 | |
|     strmake_buf(group_relay_log_name, event_relay_log_name);
 | |
|     notify_group_relay_log_name_update();
 | |
|     if (log_pos) // not 3.23 binlogs (no log_pos there) and not Stop_log_event
 | |
|       group_master_log_pos= log_pos;
 | |
|   }
 | |
|   set_if_bigger(slave_timestamp, rgi->last_master_timestamp);
 | |
| 
 | |
|   /*
 | |
|     If the slave does not support transactions and replicates a transaction,
 | |
|     users should not trust group_master_log_pos (which they can display with
 | |
|     SHOW SLAVE STATUS or read from relay-log.info), because to compute
 | |
|     group_master_log_pos the slave relies on log_pos stored in the master's
 | |
|     binlog, but if we are in a master's transaction these positions are always
 | |
|     the BEGIN's one (excepted for the COMMIT), so group_master_log_pos does
 | |
|     not advance as it should on the non-transactional slave (it advances by
 | |
|     big leaps, whereas it should advance by small leaps).
 | |
|   */
 | |
|   /*
 | |
|     In 4.x we used the event's len to compute the positions here. This is
 | |
|     wrong if the event was 3.23/4.0 and has been converted to 5.0, because
 | |
|     then the event's len is not what is was in the master's binlog, so this
 | |
|     will make a wrong group_master_log_pos (yes it's a bug in 3.23->4.0
 | |
|     replication: Exec_master_log_pos is wrong). Only way to solve this is to
 | |
|     have the original offset of the end of the event the relay log. This is
 | |
|     what we do in 5.0: log_pos has become "end_log_pos" (because the real use
 | |
|     of log_pos in 4.0 was to compute the end_log_pos; so better to store
 | |
|     end_log_pos instead of begin_log_pos.
 | |
|     If we had not done this fix here, the problem would also have appeared
 | |
|     when the slave and master are 5.0 but with different event length (for
 | |
|     example the slave is more recent than the master and features the event
 | |
|     UID). It would give false MASTER_POS_WAIT, false Exec_master_log_pos in
 | |
|     SHOW SLAVE STATUS, and so the user would do some CHANGE MASTER using this
 | |
|     value which would lead to badly broken replication.
 | |
|     Even the relay_log_pos will be corrupted in this case, because the len is
 | |
|     the relay log is not "val".
 | |
|     With the end_log_pos solution, we avoid computations involving lengthes.
 | |
|   */
 | |
|   mysql_cond_broadcast(&data_cond);
 | |
|   if (!skip_lock)
 | |
|     mysql_mutex_unlock(&data_lock);
 | |
|   DBUG_VOID_RETURN;
 | |
| }
 | |
| 
 | |
| 
 | |
| void Relay_log_info::close_temporary_tables()
 | |
| {
 | |
|   DBUG_ENTER("Relay_log_info::close_temporary_tables");
 | |
| 
 | |
|   TMP_TABLE_SHARE *share;
 | |
|   TABLE *table;
 | |
| 
 | |
|   if (!save_temporary_tables)
 | |
|   {
 | |
|     /* There are no temporary tables. */
 | |
|     DBUG_VOID_RETURN;
 | |
|   }
 | |
| 
 | |
|   while ((share= save_temporary_tables->pop_front()))
 | |
|   {
 | |
|     /*
 | |
|       Iterate over the list of tables for this TABLE_SHARE and close them.
 | |
|     */
 | |
|     while ((table= share->all_tmp_tables.pop_front()))
 | |
|     {
 | |
|       DBUG_PRINT("tmptable", ("closing table: '%s'.'%s'",
 | |
|                               table->s->db.str, table->s->table_name.str));
 | |
| 
 | |
|       /* Reset in_use as the table may have been created by another thd */
 | |
|       table->in_use= 0;
 | |
|       /*
 | |
|         Lets not free TABLE_SHARE here as there could be multiple TABLEs opened
 | |
|         for the same table (TABLE_SHARE).
 | |
|       */
 | |
|       closefrm(table);
 | |
|       my_free(table);
 | |
|     }
 | |
| 
 | |
|     /*
 | |
|       Don't ask for disk deletion. For now, anyway they will be deleted when
 | |
|       slave restarts, but it is a better intention to not delete them.
 | |
|     */
 | |
| 
 | |
|     free_table_share(share);
 | |
|     my_free(share);
 | |
|   }
 | |
| 
 | |
|   /* By now, there mustn't be any elements left in the list. */
 | |
|   DBUG_ASSERT(save_temporary_tables->is_empty());
 | |
| 
 | |
|   my_free(save_temporary_tables);
 | |
|   save_temporary_tables= NULL;
 | |
|   slave_open_temp_tables= 0;
 | |
| 
 | |
|   DBUG_VOID_RETURN;
 | |
| }
 | |
| 
 | |
| /*
 | |
|   purge_relay_logs()
 | |
| 
 | |
|   @param rli		Relay log information
 | |
|   @param thd		thread id. May be zero during startup
 | |
| 
 | |
|   NOTES
 | |
|     Assumes to have a run lock on rli and that no slave thread are running.
 | |
| */
 | |
| 
 | |
| int purge_relay_logs(Relay_log_info* rli, THD *thd, bool just_reset,
 | |
|                      const char** errmsg)
 | |
| {
 | |
|   int error=0;
 | |
|   const char *ln;
 | |
|   char name_buf[FN_REFLEN];
 | |
|   DBUG_ENTER("purge_relay_logs");
 | |
| 
 | |
|   /*
 | |
|     Even if rli->inited==0, we still try to empty rli->master_log_* variables.
 | |
|     Indeed, rli->inited==0 does not imply that they already are empty.
 | |
|     It could be that slave's info initialization partly succeeded :
 | |
|     for example if relay-log.info existed but *relay-bin*.*
 | |
|     have been manually removed, Relay_log_info::init() reads the old
 | |
|     relay-log.info and fills rli->master_log_*, then Relay_log_info::init()
 | |
|     checks for the existence of the relay log, this fails and
 | |
|     Relay_log_info::init() leaves rli->inited to 0.
 | |
|     In that pathological case, rli->master_log_pos* will be properly reinited
 | |
|     at the next START SLAVE (as RESET SLAVE or CHANGE
 | |
|     MASTER, the callers of purge_relay_logs, will delete bogus *.info files
 | |
|     or replace them with correct files), however if the user does SHOW SLAVE
 | |
|     STATUS before START SLAVE, he will see old, confusing rli->master_log_*.
 | |
|     In other words, we reinit rli->master_log_* for SHOW SLAVE STATUS
 | |
|     to display fine in any case.
 | |
|   */
 | |
| 
 | |
|   rli->group_master_log_name[0]= 0;
 | |
|   rli->group_master_log_pos= 0;
 | |
| 
 | |
|   if (!rli->inited)
 | |
|   {
 | |
|     DBUG_PRINT("info", ("rli->inited == 0"));
 | |
|     if (rli->error_on_rli_init_info)
 | |
|     {
 | |
|       ln= rli->relay_log.generate_name(opt_relay_logname, "-relay-bin",
 | |
|                                        1, name_buf);
 | |
| 
 | |
|       if (rli->relay_log.open_index_file(opt_relaylog_index_name, ln, TRUE))
 | |
|       {
 | |
|         sql_print_error("Unable to purge relay log files. Failed to open relay "
 | |
|                         "log index file:%s.", rli->relay_log.get_index_fname());
 | |
|         DBUG_RETURN(1);
 | |
|       }
 | |
|       mysql_mutex_lock(rli->relay_log.get_log_lock());
 | |
|       if (rli->relay_log.open(ln, 0, 0, SEQ_READ_APPEND,
 | |
|                              (ulong)(rli->max_relay_log_size ? rli->max_relay_log_size :
 | |
|                               max_binlog_size), 1, TRUE))
 | |
|       {
 | |
|         sql_print_error("Unable to purge relay log files. Failed to open relay "
 | |
|                         "log file:%s.", rli->relay_log.get_log_fname());
 | |
|         mysql_mutex_unlock(rli->relay_log.get_log_lock());
 | |
|         DBUG_RETURN(1);
 | |
|       }
 | |
|       mysql_mutex_unlock(rli->relay_log.get_log_lock());
 | |
|     }
 | |
|     else
 | |
|       DBUG_RETURN(0);
 | |
|   }
 | |
|   else
 | |
|   {
 | |
|     DBUG_ASSERT(rli->slave_running == 0);
 | |
|     DBUG_ASSERT(rli->mi->slave_running == 0);
 | |
|   }
 | |
|   mysql_mutex_lock(&rli->data_lock);
 | |
| 
 | |
|   /*
 | |
|     we close the relay log fd possibly left open by the slave SQL thread,
 | |
|     to be able to delete it; the relay log fd possibly left open by the slave
 | |
|     I/O thread will be closed naturally in reset_logs() by the
 | |
|     close(LOG_CLOSE_TO_BE_OPENED) call
 | |
|   */
 | |
|   if (rli->cur_log_fd >= 0)
 | |
|   {
 | |
|     end_io_cache(&rli->cache_buf);
 | |
|     mysql_file_close(rli->cur_log_fd, MYF(MY_WME));
 | |
|     rli->cur_log_fd= -1;
 | |
|   }
 | |
| 
 | |
|   if (rli->relay_log.reset_logs(thd, !just_reset, NULL, 0, 0))
 | |
|   {
 | |
|     *errmsg = "Failed during log reset";
 | |
|     error=1;
 | |
|     goto err;
 | |
|   }
 | |
|   rli->relay_log_state.load(rpl_global_gtid_slave_state);
 | |
|   if (!just_reset)
 | |
|   {
 | |
|     /* Save name of used relay log file */
 | |
|     strmake_buf(rli->group_relay_log_name, rli->relay_log.get_log_fname());
 | |
|     strmake_buf(rli->event_relay_log_name, rli->relay_log.get_log_fname());
 | |
|     rli->group_relay_log_pos= rli->event_relay_log_pos= BIN_LOG_HEADER_SIZE;
 | |
|     rli->log_space_total= 0;
 | |
| 
 | |
|     if (count_relay_log_space(rli))
 | |
|     {
 | |
|       *errmsg= "Error counting relay log space";
 | |
|       error=1;
 | |
|       goto err;
 | |
|     }
 | |
|     error= init_relay_log_pos(rli, rli->group_relay_log_name,
 | |
|                               rli->group_relay_log_pos,
 | |
|                               0 /* do not need data lock */, errmsg, 0);
 | |
|   }
 | |
|   else
 | |
|   {
 | |
|     /* Ensure relay log names are not used */
 | |
|     rli->group_relay_log_name[0]= rli->event_relay_log_name[0]= 0;
 | |
|   }
 | |
| 
 | |
|   if (!rli->inited && rli->error_on_rli_init_info)
 | |
|   {
 | |
|     mysql_mutex_lock(rli->relay_log.get_log_lock());
 | |
|     rli->relay_log.close(LOG_CLOSE_INDEX | LOG_CLOSE_STOP_EVENT);
 | |
|     mysql_mutex_unlock(rli->relay_log.get_log_lock());
 | |
|   }
 | |
| err:
 | |
|   DBUG_PRINT("info",("log_space_total: %llu", uint64(rli->log_space_total)));
 | |
|   mysql_mutex_unlock(&rli->data_lock);
 | |
|   DBUG_RETURN(error);
 | |
| }
 | |
| 
 | |
| 
 | |
| /*
 | |
|      Check if condition stated in UNTIL clause of START SLAVE is reached.
 | |
|    SYNOPSYS
 | |
|      Relay_log_info::is_until_satisfied()
 | |
|      master_beg_pos    position of the beginning of to be executed event
 | |
|                        (not log_pos member of the event that points to the
 | |
|                         beginning of the following event)
 | |
| 
 | |
| 
 | |
|    DESCRIPTION
 | |
|      Checks if UNTIL condition is reached. Uses caching result of last
 | |
|      comparison of current log file name and target log file name. So cached
 | |
|      value should be invalidated if current log file name changes
 | |
|      (see Relay_log_info::notify_... functions).
 | |
| 
 | |
|      This caching is needed to avoid of expensive string comparisons and
 | |
|      strtol() conversions needed for log names comparison. We don't need to
 | |
|      compare them each time this function is called, we only need to do this
 | |
|      when current log name changes. If we have UNTIL_MASTER_POS condition we
 | |
|      need to do this only after Rotate_log_event::do_apply_event() (which is
 | |
|      rare, so caching gives real benefit), and if we have UNTIL_RELAY_POS
 | |
|      condition then we should invalidate cached comarison value after
 | |
|      inc_group_relay_log_pos() which called for each group of events (so we
 | |
|      have some benefit if we have something like queries that use
 | |
|      autoincrement or if we have transactions).
 | |
| 
 | |
|      Should be called ONLY if until_condition != UNTIL_NONE !
 | |
| 
 | |
|      In the parallel execution mode and UNTIL_MASTER_POS the file name is
 | |
|      presented by future_event_master_log_name which may be ahead of
 | |
|      group_master_log_name. Log_event::log_pos does relate to it nevertheless
 | |
|      so the pair comprises a correct binlog coordinate.
 | |
|      Internal group events and events that have zero log_pos also
 | |
|      produce the zero for the local log_pos which may not lead to the
 | |
|      function falsely return true.
 | |
|      In UNTIL_RELAY_POS the original caching and notification are simplified
 | |
|      to straightforward files comparison when the current event can't be
 | |
|      a part of an event group.
 | |
| 
 | |
|    RETURN VALUE
 | |
|      true - condition met or error happened (condition seems to have
 | |
|             bad log file name)
 | |
|      false - condition not met
 | |
| */
 | |
| 
 | |
| bool Relay_log_info::is_until_satisfied(Log_event *ev)
 | |
| {
 | |
|   const char *log_name;
 | |
|   ulonglong log_pos;
 | |
|   /* Prevents stopping within transaction; needed solely for Relay UNTIL. */
 | |
|   bool in_trans= false;
 | |
| 
 | |
|   DBUG_ENTER("Relay_log_info::is_until_satisfied");
 | |
| 
 | |
|   if (until_condition == UNTIL_MASTER_POS)
 | |
|   {
 | |
|     log_name= (mi->using_parallel() ? future_event_master_log_name
 | |
|                                     : group_master_log_name);
 | |
|     log_pos= (get_flag(Relay_log_info::IN_TRANSACTION) || !ev || !ev->log_pos) ?
 | |
|       (mi->using_parallel() ? 0 : group_master_log_pos) :
 | |
|       ev->log_pos - ev->data_written;
 | |
|   }
 | |
|   else
 | |
|   {
 | |
|     DBUG_ASSERT(until_condition == UNTIL_RELAY_POS);
 | |
|     if (!mi->using_parallel())
 | |
|     {
 | |
|       log_name= group_relay_log_name;
 | |
|       log_pos= group_relay_log_pos;
 | |
|     }
 | |
|     else
 | |
|     {
 | |
|       log_name= event_relay_log_name;
 | |
|       log_pos=  event_relay_log_pos;
 | |
|       in_trans= get_flag(Relay_log_info::IN_TRANSACTION);
 | |
|       /*
 | |
|         until_log_names_cmp_result is set to UNKNOWN either
 | |
|         -  by a non-group event *and* only when it is in the middle of a group
 | |
|         -  or by a group event when the preceding group made the above
 | |
|            non-group event to defer the resetting.
 | |
|       */
 | |
|       if ((ev && !Log_event::is_group_event(ev->get_type_code())))
 | |
|       {
 | |
|         if (in_trans)
 | |
|         {
 | |
|           until_relay_log_names_defer= true;
 | |
|         }
 | |
|         else
 | |
|         {
 | |
|           until_log_names_cmp_result= UNTIL_LOG_NAMES_CMP_UNKNOWN;
 | |
|           until_relay_log_names_defer= false;
 | |
|         }
 | |
|       }
 | |
|       else if (!in_trans && until_relay_log_names_defer)
 | |
|       {
 | |
|         until_log_names_cmp_result= UNTIL_LOG_NAMES_CMP_UNKNOWN;
 | |
|         until_relay_log_names_defer= false;
 | |
|       }
 | |
|     }
 | |
|   }
 | |
| 
 | |
|   DBUG_PRINT("info", ("group_master_log_name='%s', group_master_log_pos=%llu",
 | |
|                       group_master_log_name, group_master_log_pos));
 | |
|   DBUG_PRINT("info", ("group_relay_log_name='%s', group_relay_log_pos=%llu",
 | |
|                       group_relay_log_name, group_relay_log_pos));
 | |
|   DBUG_PRINT("info", ("(%s) log_name='%s', log_pos=%llu",
 | |
|                       until_condition == UNTIL_MASTER_POS ? "master" : "relay",
 | |
|                       log_name, log_pos));
 | |
|   DBUG_PRINT("info", ("(%s) until_log_name='%s', until_log_pos=%llu",
 | |
|                       until_condition == UNTIL_MASTER_POS ? "master" : "relay",
 | |
|                       until_log_name, until_log_pos));
 | |
| 
 | |
|   if (until_log_names_cmp_result == UNTIL_LOG_NAMES_CMP_UNKNOWN)
 | |
|   {
 | |
|     /*
 | |
|       We have no cached comparison results so we should compare log names
 | |
|       and cache result.
 | |
|       If we are after RESET SLAVE, and the SQL slave thread has not processed
 | |
|       any event yet, it could be that group_master_log_name is "". In that case,
 | |
|       just wait for more events (as there is no sensible comparison to do).
 | |
|     */
 | |
| 
 | |
|     if (*log_name)
 | |
|     {
 | |
|       const char *basename= log_name + dirname_length(log_name);
 | |
| 
 | |
|       const char *q= (const char*)(fn_ext(basename)+1);
 | |
|       if (strncmp(basename, until_log_name, (int)(q-basename)) == 0)
 | |
|       {
 | |
|         /* Now compare extensions. */
 | |
|         char *q_end;
 | |
|         ulong log_name_extension= strtoul(q, &q_end, 10);
 | |
|         if (log_name_extension < until_log_name_extension)
 | |
|           until_log_names_cmp_result= UNTIL_LOG_NAMES_CMP_LESS;
 | |
|         else
 | |
|           until_log_names_cmp_result=
 | |
|             (log_name_extension > until_log_name_extension) ?
 | |
|             UNTIL_LOG_NAMES_CMP_GREATER : UNTIL_LOG_NAMES_CMP_EQUAL ;
 | |
|       }
 | |
|       else
 | |
|       {
 | |
|         /* Probably error so we aborting */
 | |
|         sql_print_error("Slave SQL thread is stopped because UNTIL "
 | |
|                         "condition is bad.");
 | |
|         DBUG_RETURN(TRUE);
 | |
|       }
 | |
|     }
 | |
|     else
 | |
|       DBUG_RETURN(until_log_pos == 0);
 | |
|   }
 | |
| 
 | |
|   DBUG_RETURN(((until_log_names_cmp_result == UNTIL_LOG_NAMES_CMP_EQUAL &&
 | |
|                 (log_pos >= until_log_pos && !in_trans)) ||
 | |
|                until_log_names_cmp_result == UNTIL_LOG_NAMES_CMP_GREATER));
 | |
| }
 | |
| 
 | |
| 
 | |
| bool Relay_log_info::stmt_done(my_off_t event_master_log_pos, THD *thd,
 | |
|                                rpl_group_info *rgi)
 | |
| {
 | |
|   int error= 0;
 | |
|   DBUG_ENTER("Relay_log_info::stmt_done");
 | |
| 
 | |
|   DBUG_ASSERT(!belongs_to_client());
 | |
|   DBUG_ASSERT(rgi->rli == this);
 | |
|   /*
 | |
|     If in a transaction, and if the slave supports transactions, just
 | |
|     inc_event_relay_log_pos(). We only have to check for OPTION_BEGIN
 | |
|     (not OPTION_NOT_AUTOCOMMIT) as transactions are logged with
 | |
|     BEGIN/COMMIT, not with SET AUTOCOMMIT= .
 | |
| 
 | |
|     We can't use rgi->rli->get_flag(IN_TRANSACTION) here as OPTION_BEGIN
 | |
|     is also used for single row transactions.
 | |
| 
 | |
|     CAUTION: opt_using_transactions means innodb || bdb ; suppose the
 | |
|     master supports InnoDB and BDB, but the slave supports only BDB,
 | |
|     problems will arise: - suppose an InnoDB table is created on the
 | |
|     master, - then it will be MyISAM on the slave - but as
 | |
|     opt_using_transactions is true, the slave will believe he is
 | |
|     transactional with the MyISAM table. And problems will come when
 | |
|     one does START SLAVE; STOP SLAVE; START SLAVE; (the slave will
 | |
|     resume at BEGIN whereas there has not been any rollback).  This is
 | |
|     the problem of using opt_using_transactions instead of a finer
 | |
|     "does the slave support _transactional handler used on the
 | |
|     master_".
 | |
| 
 | |
|     More generally, we'll have problems when a query mixes a
 | |
|     transactional handler and MyISAM and STOP SLAVE is issued in the
 | |
|     middle of the "transaction". START SLAVE will resume at BEGIN
 | |
|     while the MyISAM table has already been updated.
 | |
|   */
 | |
|   if ((rgi->thd->variables.option_bits & OPTION_BEGIN) &&
 | |
|       opt_using_transactions)
 | |
|     rgi->inc_event_relay_log_pos();
 | |
|   else
 | |
|   {
 | |
|     inc_group_relay_log_pos(event_master_log_pos, rgi);
 | |
|     if (rpl_global_gtid_slave_state->record_and_update_gtid(thd, rgi))
 | |
|     {
 | |
|       report(WARNING_LEVEL, ER_CANNOT_UPDATE_GTID_STATE, rgi->gtid_info(),
 | |
|              "Failed to update GTID state in %s.%s, slave state may become "
 | |
|              "inconsistent: %d: %s",
 | |
|              "mysql", rpl_gtid_slave_state_table_name.str,
 | |
|              thd->get_stmt_da()->sql_errno(), thd->get_stmt_da()->message());
 | |
|       /*
 | |
|         At this point we are not in a transaction (for example after DDL),
 | |
|         so we can not roll back. Anyway, normally updates to the slave
 | |
|         state table should not fail, and if they do, at least we made the
 | |
|         DBA aware of the problem in the error log.
 | |
|       */
 | |
|     }
 | |
|     DBUG_EXECUTE_IF("inject_crash_before_flush_rli", DBUG_SUICIDE(););
 | |
|     if (mi->using_gtid == Master_info::USE_GTID_NO)
 | |
|     {
 | |
|       if (rgi->is_parallel_exec)
 | |
|         mysql_mutex_lock(&data_lock);
 | |
|       if (flush())
 | |
|         error= 1;
 | |
|       if (rgi->is_parallel_exec)
 | |
|         mysql_mutex_unlock(&data_lock);
 | |
|     }
 | |
|     DBUG_EXECUTE_IF("inject_crash_after_flush_rli", DBUG_SUICIDE(););
 | |
|   }
 | |
|   DBUG_RETURN(error);
 | |
| }
 | |
| 
 | |
| 
 | |
| int
 | |
| Relay_log_info::alloc_inuse_relaylog(const char *name)
 | |
| {
 | |
|   inuse_relaylog *ir;
 | |
|   uint32 gtid_count;
 | |
|   rpl_gtid *gtid_list;
 | |
| 
 | |
|   gtid_count= relay_log_state.count();
 | |
|   if (!(gtid_list= (rpl_gtid *)my_malloc(PSI_INSTRUMENT_ME,
 | |
|                                  sizeof(*gtid_list)*gtid_count, MYF(MY_WME))))
 | |
|   {
 | |
|     my_error(ER_OUTOFMEMORY, MYF(0), (int)sizeof(*gtid_list)*gtid_count);
 | |
|     return 1;
 | |
|   }
 | |
|   if (!(ir= new inuse_relaylog(this, gtid_list, gtid_count, name)))
 | |
|   {
 | |
|     my_free(gtid_list);
 | |
|     my_error(ER_OUTOFMEMORY, MYF(0), (int) sizeof(*ir));
 | |
|     return 1;
 | |
|   }
 | |
|   if (relay_log_state.get_gtid_list(gtid_list, gtid_count))
 | |
|   {
 | |
|     my_free(gtid_list);
 | |
|     delete ir;
 | |
|     DBUG_ASSERT(0 /* Should not be possible as we allocated correct length */);
 | |
|     my_error(ER_OUT_OF_RESOURCES, MYF(0));
 | |
|     return 1;
 | |
|   }
 | |
| 
 | |
|   if (!inuse_relaylog_list)
 | |
|     inuse_relaylog_list= ir;
 | |
|   else
 | |
|   {
 | |
|     last_inuse_relaylog->completed= true;
 | |
|     last_inuse_relaylog->next= ir;
 | |
|   }
 | |
|   last_inuse_relaylog= ir;
 | |
| 
 | |
|   return 0;
 | |
| }
 | |
| 
 | |
| 
 | |
| void
 | |
| Relay_log_info::free_inuse_relaylog(inuse_relaylog *ir)
 | |
| {
 | |
|   my_free(ir->relay_log_state);
 | |
|   delete ir;
 | |
| }
 | |
| 
 | |
| 
 | |
| void
 | |
| Relay_log_info::reset_inuse_relaylog()
 | |
| {
 | |
|   inuse_relaylog *cur= inuse_relaylog_list;
 | |
|   while (cur)
 | |
|   {
 | |
|     DBUG_ASSERT(cur->queued_count == cur->dequeued_count);
 | |
|     inuse_relaylog *next= cur->next;
 | |
|     free_inuse_relaylog(cur);
 | |
|     cur= next;
 | |
|   }
 | |
|   inuse_relaylog_list= last_inuse_relaylog= NULL;
 | |
| }
 | |
| 
 | |
| 
 | |
| int
 | |
| Relay_log_info::update_relay_log_state(rpl_gtid *gtid_list, uint32 count)
 | |
| {
 | |
|   int res= 0;
 | |
|   while (count)
 | |
|   {
 | |
|     if (relay_log_state.update_nolock(gtid_list))
 | |
|       res= 1;
 | |
|     ++gtid_list;
 | |
|     --count;
 | |
|   }
 | |
|   return res;
 | |
| }
 | |
| 
 | |
| 
 | |
| #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
 | |
| struct gtid_pos_element { uint64 sub_id; rpl_gtid gtid; void *hton; };
 | |
| 
 | |
| static int
 | |
| scan_one_gtid_slave_pos_table(THD *thd, HASH *hash, DYNAMIC_ARRAY *array,
 | |
|                               LEX_CSTRING *tablename, void **out_hton)
 | |
| {
 | |
|   TABLE_LIST tlist;
 | |
|   TABLE *UNINIT_VAR(table);
 | |
|   bool table_opened= false;
 | |
|   bool table_scanned= false;
 | |
|   struct gtid_pos_element tmp_entry, *entry;
 | |
|   int err= 0;
 | |
| 
 | |
|   thd->reset_for_next_command();
 | |
|   tlist.init_one_table(&MYSQL_SCHEMA_NAME, tablename, NULL, TL_READ);
 | |
|   if ((err= open_and_lock_tables(thd, &tlist, FALSE, 0)))
 | |
|     goto end;
 | |
|   table_opened= true;
 | |
|   table= tlist.table;
 | |
| 
 | |
|   if ((err= gtid_check_rpl_slave_state_table(table)))
 | |
|     goto end;
 | |
| 
 | |
|   bitmap_set_all(table->read_set);
 | |
|   if (unlikely(err= table->file->ha_rnd_init_with_error(1)))
 | |
|     goto end;
 | |
| 
 | |
|   table_scanned= true;
 | |
|   for (;;)
 | |
|   {
 | |
|     uint32 domain_id, server_id;
 | |
|     uint64 sub_id, seq_no;
 | |
|     uchar *rec;
 | |
| 
 | |
|     if ((err= table->file->ha_rnd_next(table->record[0])))
 | |
|     {
 | |
|       if (err == HA_ERR_END_OF_FILE)
 | |
|         break;
 | |
|       else
 | |
|       {
 | |
|         table->file->print_error(err, MYF(0));
 | |
|         goto end;
 | |
|       }
 | |
|     }
 | |
|     domain_id= (uint32)table->field[0]->val_int();
 | |
|     sub_id= (ulonglong)table->field[1]->val_int();
 | |
|     server_id= (uint32)table->field[2]->val_int();
 | |
|     seq_no= (ulonglong)table->field[3]->val_int();
 | |
|     DBUG_PRINT("info", ("Read slave state row: %u-%u-%lu sub_id=%lu",
 | |
|                         (unsigned)domain_id, (unsigned)server_id,
 | |
|                         (ulong)seq_no, (ulong)sub_id));
 | |
| 
 | |
|     tmp_entry.sub_id= sub_id;
 | |
|     tmp_entry.gtid.domain_id= domain_id;
 | |
|     tmp_entry.gtid.server_id= server_id;
 | |
|     tmp_entry.gtid.seq_no= seq_no;
 | |
|     tmp_entry.hton= table->s->db_type();
 | |
|     if ((err= insert_dynamic(array, (uchar *)&tmp_entry)))
 | |
|     {
 | |
|       my_error(ER_OUT_OF_RESOURCES, MYF(0));
 | |
|       goto end;
 | |
|     }
 | |
| 
 | |
|     if ((rec= my_hash_search(hash, (const uchar *)&domain_id,
 | |
|                              sizeof(domain_id))))
 | |
|     {
 | |
|       entry= (struct gtid_pos_element *)rec;
 | |
|       if (entry->sub_id >= sub_id)
 | |
|         continue;
 | |
|       entry->sub_id= sub_id;
 | |
|       DBUG_ASSERT(entry->gtid.domain_id == domain_id);
 | |
|       entry->gtid.server_id= server_id;
 | |
|       entry->gtid.seq_no= seq_no;
 | |
|       entry->hton= table->s->db_type();
 | |
|     }
 | |
|     else
 | |
|     {
 | |
|       if (!(entry= (struct gtid_pos_element *)my_malloc(PSI_INSTRUMENT_ME,
 | |
|                                                 sizeof(*entry), MYF(MY_WME))))
 | |
|       {
 | |
|         my_error(ER_OUTOFMEMORY, MYF(0), (int)sizeof(*entry));
 | |
|         err= 1;
 | |
|         goto end;
 | |
|       }
 | |
|       entry->sub_id= sub_id;
 | |
|       entry->gtid.domain_id= domain_id;
 | |
|       entry->gtid.server_id= server_id;
 | |
|       entry->gtid.seq_no= seq_no;
 | |
|       entry->hton= table->s->db_type();
 | |
|       if ((err= my_hash_insert(hash, (uchar *)entry)))
 | |
|       {
 | |
|         my_free(entry);
 | |
|         my_error(ER_OUT_OF_RESOURCES, MYF(0));
 | |
|         goto end;
 | |
|       }
 | |
|     }
 | |
|   }
 | |
|   err= 0;                                       /* Clear HA_ERR_END_OF_FILE */
 | |
| 
 | |
| end:
 | |
|   if (table_scanned)
 | |
|   {
 | |
|     table->file->ha_index_or_rnd_end();
 | |
|     ha_commit_trans(thd, FALSE);
 | |
|     trans_commit(thd);
 | |
|   }
 | |
|   if (table_opened)
 | |
|   {
 | |
|     *out_hton= table->s->db_type();
 | |
|     close_thread_tables(thd);
 | |
|     thd->release_transactional_locks();
 | |
|   }
 | |
|   return err;
 | |
| }
 | |
| 
 | |
| 
 | |
| /*
 | |
|   Look for all tables mysql.gtid_slave_pos*. Read all rows from each such
 | |
|   table found into ARRAY. For each domain id, put the row with highest sub_id
 | |
|   into HASH.
 | |
| */
 | |
| static int
 | |
| scan_all_gtid_slave_pos_table(THD *thd, int (*cb)(THD *, LEX_CSTRING *, void *),
 | |
|                               void *cb_data)
 | |
| {
 | |
|   char path[FN_REFLEN];
 | |
|   MY_DIR *dirp;
 | |
| 
 | |
|   thd->reset_for_next_command();
 | |
|   if (lock_schema_name(thd, Lex_ident_db_normalized(MYSQL_SCHEMA_NAME)))
 | |
|     return 1;
 | |
| 
 | |
|   build_table_filename(path, sizeof(path) - 1, MYSQL_SCHEMA_NAME.str, "", "", 0);
 | |
|   if (!(dirp= my_dir(path, MYF(MY_DONT_SORT))))
 | |
|   {
 | |
|     my_error(ER_FILE_NOT_FOUND, MYF(0), path, my_errno);
 | |
|     close_thread_tables(thd);
 | |
|     thd->release_transactional_locks();
 | |
|     return 1;
 | |
|   }
 | |
|   else
 | |
|   {
 | |
|     size_t i;
 | |
|     Dynamic_array<LEX_CSTRING*> files(PSI_INSTRUMENT_MEM,
 | |
|                                       dirp->number_of_files);
 | |
|     Discovered_table_list tl(thd, &files);
 | |
|     int err;
 | |
| 
 | |
|     err= ha_discover_table_names(thd, &MYSQL_SCHEMA_NAME, dirp, &tl, false);
 | |
|     my_dirend(dirp);
 | |
|     close_thread_tables(thd);
 | |
|     thd->release_transactional_locks();
 | |
|     if (err)
 | |
|       return err;
 | |
| 
 | |
|     for (i = 0; i < files.elements(); ++i)
 | |
|     {
 | |
|       if (strncmp(files.at(i)->str,
 | |
|                   rpl_gtid_slave_state_table_name.str,
 | |
|                   rpl_gtid_slave_state_table_name.length) == 0)
 | |
|       {
 | |
|         if ((err= (*cb)(thd, files.at(i), cb_data)))
 | |
|           return err;
 | |
|       }
 | |
|     }
 | |
|   }
 | |
| 
 | |
|   return 0;
 | |
| }
 | |
| 
 | |
| 
 | |
| struct load_gtid_state_cb_data {
 | |
|   HASH *hash;
 | |
|   DYNAMIC_ARRAY *array;
 | |
|   struct rpl_slave_state::gtid_pos_table *table_list;
 | |
|   struct rpl_slave_state::gtid_pos_table *default_entry;
 | |
| };
 | |
| 
 | |
| static int
 | |
| process_gtid_pos_table(THD *thd, LEX_CSTRING *table_name, void *hton,
 | |
|                        struct load_gtid_state_cb_data *data)
 | |
| {
 | |
|   struct rpl_slave_state::gtid_pos_table *p, *entry, **next_ptr;
 | |
|   bool is_default=
 | |
|     (strcmp(table_name->str, rpl_gtid_slave_state_table_name.str) == 0);
 | |
| 
 | |
|   /*
 | |
|     Ignore tables with duplicate storage engine, with a warning.
 | |
|     Prefer the default mysql.gtid_slave_pos over another table
 | |
|     mysql.gtid_slave_posXXX with the same storage engine.
 | |
|   */
 | |
|   next_ptr= &data->table_list;
 | |
|   entry= data->table_list;
 | |
|   while (entry)
 | |
|   {
 | |
|     if (entry->table_hton == hton)
 | |
|     {
 | |
|       static const char *warning_msg= "Ignoring redundant table mysql.%s "
 | |
|         "since mysql.%s has the same storage engine";
 | |
|       if (!is_default)
 | |
|       {
 | |
|         /* Ignore the redundant table. */
 | |
|         sql_print_warning(warning_msg, table_name->str, entry->table_name.str);
 | |
|         return 0;
 | |
|       }
 | |
|       else
 | |
|       {
 | |
|         sql_print_warning(warning_msg, entry->table_name.str, table_name->str);
 | |
|         /* Delete the redundant table, and proceed to add this one instead. */
 | |
|         *next_ptr= entry->next;
 | |
|         my_free(entry);
 | |
|         break;
 | |
|       }
 | |
|     }
 | |
|     next_ptr= &entry->next;
 | |
|     entry= entry->next;
 | |
|   }
 | |
| 
 | |
|   p= rpl_global_gtid_slave_state->alloc_gtid_pos_table(table_name,
 | |
|       hton, rpl_slave_state::GTID_POS_AVAILABLE);
 | |
|   if (!p)
 | |
|     return 1;
 | |
|   p->next= data->table_list;
 | |
|   data->table_list= p;
 | |
|   if (is_default)
 | |
|     data->default_entry= p;
 | |
|   return 0;
 | |
| }
 | |
| 
 | |
| 
 | |
| /*
 | |
|   Put tables corresponding to @@gtid_pos_auto_engines at the end of the list,
 | |
|   marked to be auto-created if needed.
 | |
| */
 | |
| static int
 | |
| gtid_pos_auto_create_tables(rpl_slave_state::gtid_pos_table **list_ptr)
 | |
| {
 | |
|   plugin_ref *auto_engines;
 | |
|   int err= 0;
 | |
|   mysql_mutex_lock(&LOCK_global_system_variables);
 | |
|   for (auto_engines= opt_gtid_pos_auto_plugins;
 | |
|        !err && auto_engines && *auto_engines;
 | |
|        ++auto_engines)
 | |
|   {
 | |
|     void *hton= plugin_hton(*auto_engines);
 | |
|     CharBuffer<FN_REFLEN> buf;
 | |
|     LEX_CSTRING table_name;
 | |
|     rpl_slave_state::gtid_pos_table *entry, **next_ptr;
 | |
| 
 | |
|     /* See if this engine is already in the list. */
 | |
|     next_ptr= list_ptr;
 | |
|     entry= *list_ptr;
 | |
|     while (entry)
 | |
|     {
 | |
|       if (entry->table_hton == hton)
 | |
|         break;
 | |
|       next_ptr= &entry->next;
 | |
|       entry= entry->next;
 | |
|     }
 | |
|     if (entry)
 | |
|       continue;
 | |
| 
 | |
|     /* Add an auto-create entry for this engine at end of list. */
 | |
|     buf.append_opt_casedn(files_charset_info, rpl_gtid_slave_state_table_name,
 | |
|                           lower_case_table_names)
 | |
|        .append({STRING_WITH_LEN("_")})
 | |
|        .append_opt_casedn(files_charset_info, *plugin_name(*auto_engines),
 | |
|                           lower_case_table_names);
 | |
|     table_name= buf.to_lex_cstring();
 | |
|     entry= rpl_global_gtid_slave_state->alloc_gtid_pos_table
 | |
|       (&table_name, hton, rpl_slave_state::GTID_POS_AUTO_CREATE);
 | |
|     if (!entry)
 | |
|     {
 | |
|       err= 1;
 | |
|       break;
 | |
|     }
 | |
|     *next_ptr= entry;
 | |
|   }
 | |
|   mysql_mutex_unlock(&LOCK_global_system_variables);
 | |
|   return err;
 | |
| }
 | |
| 
 | |
| 
 | |
| static int
 | |
| load_gtid_state_cb(THD *thd, LEX_CSTRING *table_name, void *arg)
 | |
| {
 | |
|   int err;
 | |
|   load_gtid_state_cb_data *data= static_cast<load_gtid_state_cb_data *>(arg);
 | |
|   void *hton;
 | |
| 
 | |
|   if ((err= scan_one_gtid_slave_pos_table(thd, data->hash, data->array,
 | |
|                                           table_name, &hton)))
 | |
|     return err;
 | |
|   return process_gtid_pos_table(thd, table_name, hton, data);
 | |
| }
 | |
| 
 | |
| 
 | |
| int
 | |
| rpl_load_gtid_slave_state(THD *thd)
 | |
| {
 | |
|   bool array_inited= false;
 | |
|   struct gtid_pos_element tmp_entry, *entry;
 | |
|   HASH hash;
 | |
|   DYNAMIC_ARRAY array;
 | |
|   int err= 0;
 | |
|   uint32 i;
 | |
|   load_gtid_state_cb_data cb_data;
 | |
|   rpl_slave_state::list_element *old_gtids_list;
 | |
|   DBUG_ENTER("rpl_load_gtid_slave_state");
 | |
| 
 | |
|   mysql_mutex_lock(&rpl_global_gtid_slave_state->LOCK_slave_state);
 | |
|   bool loaded= rpl_global_gtid_slave_state->loaded;
 | |
|   mysql_mutex_unlock(&rpl_global_gtid_slave_state->LOCK_slave_state);
 | |
|   if (loaded)
 | |
|     DBUG_RETURN(0);
 | |
| 
 | |
|   cb_data.table_list= NULL;
 | |
|   cb_data.default_entry= NULL;
 | |
|   my_hash_init(PSI_INSTRUMENT_ME, &hash, &my_charset_bin, 32,
 | |
|                offsetof(gtid_pos_element, gtid) + offsetof(rpl_gtid, domain_id),
 | |
|                sizeof(uint32), NULL, my_free, HASH_UNIQUE);
 | |
|   if ((err= my_init_dynamic_array(PSI_INSTRUMENT_ME, &array,
 | |
|                                   sizeof(gtid_pos_element), 0, 0, MYF(0))))
 | |
|     goto end;
 | |
|   array_inited= true;
 | |
| 
 | |
|   cb_data.hash = &hash;
 | |
|   cb_data.array = &array;
 | |
|   if ((err= scan_all_gtid_slave_pos_table(thd, load_gtid_state_cb, &cb_data)))
 | |
|     goto end;
 | |
| 
 | |
|   if (!cb_data.default_entry)
 | |
|   {
 | |
|     /*
 | |
|       If the mysql.gtid_slave_pos table does not exist, but at least one other
 | |
|       table is available, arbitrarily pick the first in the list to use as
 | |
|       default.
 | |
|     */
 | |
|     cb_data.default_entry= cb_data.table_list;
 | |
|   }
 | |
|   if ((err= gtid_pos_auto_create_tables(&cb_data.table_list)))
 | |
|     goto end;
 | |
| 
 | |
|   mysql_mutex_lock(&rpl_global_gtid_slave_state->LOCK_slave_state);
 | |
|   if (rpl_global_gtid_slave_state->loaded)
 | |
|   {
 | |
|     mysql_mutex_unlock(&rpl_global_gtid_slave_state->LOCK_slave_state);
 | |
|     goto end;
 | |
|   }
 | |
| 
 | |
|   if (!cb_data.table_list)
 | |
|   {
 | |
|     my_error(ER_NO_SUCH_TABLE, MYF(0), "mysql",
 | |
|              rpl_gtid_slave_state_table_name.str);
 | |
|     mysql_mutex_unlock(&rpl_global_gtid_slave_state->LOCK_slave_state);
 | |
|     err= 1;
 | |
|     goto end;
 | |
|   }
 | |
| 
 | |
|   for (i= 0; i < array.elements; ++i)
 | |
|   {
 | |
|     get_dynamic(&array, (uchar *)&tmp_entry, i);
 | |
|     if ((err= rpl_global_gtid_slave_state->update_nolock(tmp_entry.gtid.domain_id,
 | |
|                                                   tmp_entry.gtid.server_id,
 | |
|                                                   tmp_entry.sub_id,
 | |
|                                                   tmp_entry.gtid.seq_no,
 | |
|                                                   tmp_entry.hton,
 | |
|                                                   NULL)))
 | |
|     {
 | |
|       mysql_mutex_unlock(&rpl_global_gtid_slave_state->LOCK_slave_state);
 | |
|       my_error(ER_OUT_OF_RESOURCES, MYF(0));
 | |
|       goto end;
 | |
|     }
 | |
|   }
 | |
| 
 | |
|   for (i= 0; i < hash.records; ++i)
 | |
|   {
 | |
|     entry= (struct gtid_pos_element *)my_hash_element(&hash, i);
 | |
|     if (opt_bin_log &&
 | |
|         mysql_bin_log.bump_seq_no_counter_if_needed(entry->gtid.domain_id,
 | |
|                                                     entry->gtid.seq_no))
 | |
|     {
 | |
|       mysql_mutex_unlock(&rpl_global_gtid_slave_state->LOCK_slave_state);
 | |
|       my_error(ER_OUT_OF_RESOURCES, MYF(0));
 | |
|       goto end;
 | |
|     }
 | |
|   }
 | |
| 
 | |
|   rpl_global_gtid_slave_state->set_gtid_pos_tables_list(cb_data.table_list,
 | |
|                                                         cb_data.default_entry);
 | |
|   cb_data.table_list= NULL;
 | |
|   rpl_global_gtid_slave_state->loaded= true;
 | |
|   mysql_mutex_unlock(&rpl_global_gtid_slave_state->LOCK_slave_state);
 | |
| 
 | |
|   /* Clear out no longer needed elements now. */
 | |
|   old_gtids_list=
 | |
|     rpl_global_gtid_slave_state->gtid_grab_pending_delete_list();
 | |
|   rpl_global_gtid_slave_state->gtid_delete_pending(thd, &old_gtids_list);
 | |
|   if (old_gtids_list)
 | |
|     rpl_global_gtid_slave_state->put_back_list(old_gtids_list);
 | |
| 
 | |
| end:
 | |
|   if (array_inited)
 | |
|     delete_dynamic(&array);
 | |
|   my_hash_free(&hash);
 | |
|   if (cb_data.table_list)
 | |
|     rpl_global_gtid_slave_state->free_gtid_pos_tables(cb_data.table_list);
 | |
|   DBUG_RETURN(err);
 | |
| }
 | |
| 
 | |
| 
 | |
| static int
 | |
| find_gtid_pos_tables_cb(THD *thd, LEX_CSTRING *table_name, void *arg)
 | |
| {
 | |
|   load_gtid_state_cb_data *data= static_cast<load_gtid_state_cb_data *>(arg);
 | |
|   TABLE_LIST tlist;
 | |
|   TABLE *table= NULL;
 | |
|   int err;
 | |
| 
 | |
|   thd->reset_for_next_command();
 | |
|   tlist.init_one_table(&MYSQL_SCHEMA_NAME, table_name, NULL, TL_READ);
 | |
|   if ((err= open_and_lock_tables(thd, &tlist, FALSE, 0)))
 | |
|     goto end;
 | |
|   table= tlist.table;
 | |
| 
 | |
|   if ((err= gtid_check_rpl_slave_state_table(table)))
 | |
|     goto end;
 | |
|   err= process_gtid_pos_table(thd, table_name, table->s->db_type(), data);
 | |
| 
 | |
| end:
 | |
|   if (table)
 | |
|   {
 | |
|     ha_commit_trans(thd, FALSE);
 | |
|     ha_commit_trans(thd, TRUE);
 | |
|     close_thread_tables(thd);
 | |
|     thd->release_transactional_locks();
 | |
|   }
 | |
| 
 | |
|   return err;
 | |
| }
 | |
| 
 | |
| 
 | |
| /*
 | |
|   Re-compute the list of available mysql.gtid_slave_posXXX tables.
 | |
| 
 | |
|   This is done at START SLAVE to pick up any newly created tables without
 | |
|   requiring server restart.
 | |
| */
 | |
| int
 | |
| find_gtid_slave_pos_tables(THD *thd)
 | |
| {
 | |
|   int err= 0;
 | |
|   load_gtid_state_cb_data cb_data;
 | |
|   uint num_running;
 | |
| 
 | |
|   mysql_mutex_lock(&rpl_global_gtid_slave_state->LOCK_slave_state);
 | |
|   bool loaded= rpl_global_gtid_slave_state->loaded;
 | |
|   mysql_mutex_unlock(&rpl_global_gtid_slave_state->LOCK_slave_state);
 | |
|   if (!loaded)
 | |
|     return 0;
 | |
| 
 | |
|   cb_data.table_list= NULL;
 | |
|   cb_data.default_entry= NULL;
 | |
|   if ((err= scan_all_gtid_slave_pos_table(thd, find_gtid_pos_tables_cb, &cb_data)))
 | |
|     goto end;
 | |
| 
 | |
|   if (!cb_data.table_list)
 | |
|   {
 | |
|     my_error(ER_NO_SUCH_TABLE, MYF(0), "mysql",
 | |
|              rpl_gtid_slave_state_table_name.str);
 | |
|     err= 1;
 | |
|     goto end;
 | |
|   }
 | |
|   if (!cb_data.default_entry)
 | |
|   {
 | |
|     /*
 | |
|       If the mysql.gtid_slave_pos table does not exist, but at least one other
 | |
|       table is available, arbitrarily pick the first in the list to use as
 | |
|       default.
 | |
|     */
 | |
|     cb_data.default_entry= cb_data.table_list;
 | |
|   }
 | |
|   if ((err= gtid_pos_auto_create_tables(&cb_data.table_list)))
 | |
|     goto end;
 | |
| 
 | |
|   mysql_mutex_lock(&LOCK_active_mi);
 | |
|   num_running= any_slave_sql_running(true);
 | |
|   mysql_mutex_lock(&rpl_global_gtid_slave_state->LOCK_slave_state);
 | |
|   if (num_running <= 1)
 | |
|   {
 | |
|     /*
 | |
|       If no slave is running now, the count will be 1, since this SQL thread
 | |
|       which is starting is included in the count. In this case, we can safely
 | |
|       replace the list, no-one can be trying to read it without lock.
 | |
|     */
 | |
|     DBUG_ASSERT(num_running == 1);
 | |
|     rpl_global_gtid_slave_state->set_gtid_pos_tables_list(cb_data.table_list,
 | |
|                                                           cb_data.default_entry);
 | |
|     cb_data.table_list= NULL;
 | |
|   }
 | |
|   else
 | |
|   {
 | |
|     /*
 | |
|       If there are SQL threads running, we cannot safely remove the old list.
 | |
|       However we can add new entries, and warn about any tables that
 | |
|       disappeared, but may still be visible to running SQL threads.
 | |
|     */
 | |
|     rpl_slave_state::gtid_pos_table *new_entry, **next_ptr_ptr;
 | |
|     auto old_entry= rpl_global_gtid_slave_state->
 | |
|                     gtid_pos_tables.load(std::memory_order_relaxed);
 | |
|     while (old_entry)
 | |
|     {
 | |
|       new_entry= cb_data.table_list;
 | |
|       while (new_entry)
 | |
|       {
 | |
|         if (new_entry->table_hton == old_entry->table_hton)
 | |
|           break;
 | |
|         new_entry= new_entry->next;
 | |
|       }
 | |
|       if (!new_entry)
 | |
|         sql_print_warning("The table mysql.%s was removed. "
 | |
|                           "This change will not take full effect "
 | |
|                           "until all SQL threads have been restarted",
 | |
|                           old_entry->table_name.str);
 | |
|       old_entry= old_entry->next;
 | |
|     }
 | |
|     next_ptr_ptr= &cb_data.table_list;
 | |
|     new_entry= cb_data.table_list;
 | |
|     while (new_entry)
 | |
|     {
 | |
|       /* Check if we already have a table with this storage engine. */
 | |
|       old_entry= rpl_global_gtid_slave_state->
 | |
|                  gtid_pos_tables.load(std::memory_order_relaxed);
 | |
|       while (old_entry)
 | |
|       {
 | |
|         if (new_entry->table_hton == old_entry->table_hton)
 | |
|           break;
 | |
|         old_entry= old_entry->next;
 | |
|       }
 | |
|       if (old_entry)
 | |
|       {
 | |
|         /* This new_entry is already available in the list. */
 | |
|         next_ptr_ptr= &new_entry->next;
 | |
|         new_entry= new_entry->next;
 | |
|       }
 | |
|       else
 | |
|       {
 | |
|         /* Move this new_entry to the list. */
 | |
|         rpl_slave_state::gtid_pos_table *next= new_entry->next;
 | |
|         rpl_global_gtid_slave_state->add_gtid_pos_table(new_entry);
 | |
|         *next_ptr_ptr= next;
 | |
|         new_entry= next;
 | |
|       }
 | |
|     }
 | |
|   }
 | |
|   mysql_mutex_unlock(&rpl_global_gtid_slave_state->LOCK_slave_state);
 | |
|   mysql_mutex_unlock(&LOCK_active_mi);
 | |
| 
 | |
| end:
 | |
|   if (cb_data.table_list)
 | |
|     rpl_global_gtid_slave_state->free_gtid_pos_tables(cb_data.table_list);
 | |
|   return err;
 | |
| }
 | |
| 
 | |
| 
 | |
| void
 | |
| rpl_group_info::reinit(Relay_log_info *rli)
 | |
| {
 | |
|   this->rli= rli;
 | |
|   tables_to_lock= NULL;
 | |
|   tables_to_lock_count= 0;
 | |
|   trans_retries= 0;
 | |
|   last_event_start_time= 0;
 | |
|   gtid_sub_id= 0;
 | |
|   commit_id= 0;
 | |
|   gtid_pending= false;
 | |
|   worker_error= 0;
 | |
|   row_stmt_start_timestamp= 0;
 | |
|   long_find_row_note_printed= false;
 | |
|   did_mark_start_commit= false;
 | |
|   gtid_ev_flags2= 0;
 | |
|   gtid_ev_flags_extra= 0;
 | |
|   gtid_ev_sa_seq_no= 0;
 | |
|   last_master_timestamp = 0;
 | |
|   orig_exec_time= 0;
 | |
|   gtid_ignore_duplicate_state= GTID_DUPLICATE_NULL;
 | |
|   speculation= SPECULATE_NO;
 | |
|   rpt= NULL;
 | |
|   start_alter_ev= NULL;
 | |
|   direct_commit_alter= false;
 | |
|   commit_orderer.reinit();
 | |
| }
 | |
| 
 | |
| rpl_group_info::rpl_group_info(Relay_log_info *rli)
 | |
|   : thd(0), wait_commit_sub_id(0),
 | |
|     wait_commit_group_info(0), parallel_entry(0),
 | |
|     deferred_events(NULL), m_annotate_event(0), is_parallel_exec(false),
 | |
|     gtid_ev_flags2(0), gtid_ev_flags_extra(0), gtid_ev_sa_seq_no(0),
 | |
|     reserved_start_alter_thread(0), finish_event_group_called(0), rpt(NULL),
 | |
|     start_alter_ev(NULL), direct_commit_alter(false), sa_info(NULL)
 | |
| {
 | |
|   reinit(rli);
 | |
|   bzero(¤t_gtid, sizeof(current_gtid));
 | |
|   mysql_mutex_init(key_rpl_group_info_sleep_lock, &sleep_lock,
 | |
|                    MY_MUTEX_INIT_FAST);
 | |
|   mysql_cond_init(key_rpl_group_info_sleep_cond, &sleep_cond, NULL);
 | |
| }
 | |
| 
 | |
| rpl_group_info::~rpl_group_info()
 | |
| {
 | |
|   free_annotate_event();
 | |
|   delete deferred_events;
 | |
|   mysql_mutex_destroy(&sleep_lock);
 | |
|   mysql_cond_destroy(&sleep_cond);
 | |
| }
 | |
| 
 | |
| 
 | |
| int
 | |
| event_group_new_gtid(rpl_group_info *rgi, Gtid_log_event *gev)
 | |
| {
 | |
|   uint64 sub_id= rpl_global_gtid_slave_state->next_sub_id(gev->domain_id);
 | |
|   if (!sub_id)
 | |
|   {
 | |
|     /* Out of memory caused hash insertion to fail. */
 | |
|     return 1;
 | |
|   }
 | |
|   rgi->gtid_sub_id= sub_id;
 | |
|   rgi->current_gtid.domain_id= gev->domain_id;
 | |
|   rgi->current_gtid.server_id= gev->server_id;
 | |
|   rgi->current_gtid.seq_no= gev->seq_no;
 | |
|   rgi->commit_id= gev->commit_id;
 | |
|   rgi->gtid_pending= true;
 | |
|   rgi->sa_info= NULL;
 | |
|   return 0;
 | |
| }
 | |
| 
 | |
| 
 | |
| void
 | |
| delete_or_keep_event_post_apply(rpl_group_info *rgi,
 | |
|                                 Log_event_type typ, Log_event *ev)
 | |
| {
 | |
|   /*
 | |
|     ToDo: This needs to work on rpl_group_info, not Relay_log_info, to be
 | |
|     thread-safe for parallel replication.
 | |
|   */
 | |
| 
 | |
|   switch (typ) {
 | |
|   case FORMAT_DESCRIPTION_EVENT:
 | |
|     /*
 | |
|       Format_description_log_event should not be deleted because it
 | |
|       will be used to read info about the relay log's format;
 | |
|       it will be deleted when the SQL thread does not need it,
 | |
|       i.e. when this thread terminates.
 | |
|     */
 | |
|     break;
 | |
|   case ANNOTATE_ROWS_EVENT:
 | |
|     /*
 | |
|       Annotate_rows event should not be deleted because after it has
 | |
|       been applied, thd->query points to the string inside this event.
 | |
|       The thd->query will be used to generate new Annotate_rows event
 | |
|       during applying the subsequent Rows events.
 | |
|     */
 | |
|     rgi->set_annotate_event((Annotate_rows_log_event*) ev);
 | |
|     break;
 | |
|   case DELETE_ROWS_EVENT_V1:
 | |
|   case UPDATE_ROWS_EVENT_V1:
 | |
|   case WRITE_ROWS_EVENT_V1:
 | |
|   case DELETE_ROWS_EVENT:
 | |
|   case UPDATE_ROWS_EVENT:
 | |
|   case WRITE_ROWS_EVENT:
 | |
|   case WRITE_ROWS_COMPRESSED_EVENT:
 | |
|   case DELETE_ROWS_COMPRESSED_EVENT:
 | |
|   case UPDATE_ROWS_COMPRESSED_EVENT:
 | |
|   case WRITE_ROWS_COMPRESSED_EVENT_V1:
 | |
|   case UPDATE_ROWS_COMPRESSED_EVENT_V1:
 | |
|   case DELETE_ROWS_COMPRESSED_EVENT_V1:
 | |
|     /*
 | |
|       After the last Rows event has been applied, the saved Annotate_rows
 | |
|       event (if any) is not needed anymore and can be deleted.
 | |
|     */
 | |
|     if (((Rows_log_event*)ev)->get_flags(Rows_log_event::STMT_END_F))
 | |
|       rgi->free_annotate_event();
 | |
|     /* fall through */
 | |
|   default:
 | |
|     DBUG_PRINT("info", ("Deleting the event after it has been executed"));
 | |
|     if (!rgi->is_deferred_event(ev))
 | |
|       delete ev;
 | |
|     break;
 | |
|   }
 | |
| }
 | |
| 
 | |
| 
 | |
| void rpl_group_info::cleanup_context(THD *thd, bool error, bool keep_domain_owner)
 | |
| {
 | |
|   DBUG_ENTER("rpl_group_info::cleanup_context");
 | |
|   DBUG_PRINT("enter", ("error: %d", (int) error));
 | |
|   
 | |
|   DBUG_ASSERT(this->thd == thd);
 | |
|   /*
 | |
|     1) Instances of Table_map_log_event, if ::do_apply_event() was
 | |
|     called on them, may have opened tables, which we cannot be sure
 | |
|     have been closed (because maybe the Rows_log_event have not been
 | |
|     found or will not be, because slave SQL thread is stopping, or
 | |
|     relay log has a missing tail etc). So we close all thread's
 | |
|     tables. And so the table mappings have to be cancelled.
 | |
|     2) Rows_log_event::do_apply_event() may even have started statements or
 | |
|     transactions on them, which we need to rollback in case of error.
 | |
|     3) If finding a Format_description_log_event after a BEGIN, we also need
 | |
|     to rollback before continuing with the next events.
 | |
|     4) so we need this "context cleanup" function.
 | |
|   */
 | |
|   if (unlikely(error))
 | |
|   {
 | |
|     /*
 | |
|       We have to reset the error as otherwise we get an assert in
 | |
|       trans_rollback() when it checks if the rollback caused an error.
 | |
|     */
 | |
|     thd->clear_error();
 | |
|     trans_rollback_stmt(thd); // if a "statement transaction"
 | |
|     /* trans_rollback() also resets OPTION_GTID_BEGIN */
 | |
|     trans_rollback(thd);      // if a "real transaction"
 | |
|     /*
 | |
|       Now that we have rolled back the transaction, make sure we do not
 | |
|       erroneously update the GTID position.
 | |
|     */
 | |
|     gtid_pending= false;
 | |
|   }
 | |
|   m_table_map.clear_tables();
 | |
|   slave_close_thread_tables(thd);
 | |
| 
 | |
|   if (unlikely(error))
 | |
|   {
 | |
|     // leave alone any XA prepared transactions
 | |
|     if (thd->transaction->xid_state.is_explicit_XA() &&
 | |
|         thd->transaction->xid_state.get_state_code() != XA_PREPARED)
 | |
|       xa_trans_force_rollback(thd);
 | |
| 
 | |
|     thd->release_transactional_locks();
 | |
| 
 | |
|     if (thd == rli->sql_driver_thd)
 | |
|     {
 | |
|       /*
 | |
|         Reset flags. This is needed to handle incident events and errors in
 | |
|         the relay log noticed by the sql driver thread.
 | |
|       */
 | |
|       rli->clear_flag(Relay_log_info::IN_STMT);
 | |
|       rli->clear_flag(Relay_log_info::IN_TRANSACTION);
 | |
|     }
 | |
| 
 | |
|     /*
 | |
|       Ensure we always release the domain for others to process, when using
 | |
|       --gtid-ignore-duplicates.
 | |
|     */
 | |
|     if (gtid_ignore_duplicate_state != GTID_DUPLICATE_NULL && !keep_domain_owner)
 | |
|       rpl_global_gtid_slave_state->release_domain_owner(this);
 | |
|   }
 | |
| 
 | |
|   /*
 | |
|     Cleanup for the flags that have been set at do_apply_event.
 | |
|   */
 | |
|   thd->variables.option_bits&= ~(OPTION_NO_FOREIGN_KEY_CHECKS |
 | |
|                                  OPTION_RELAXED_UNIQUE_CHECKS |
 | |
|                                  OPTION_NO_CHECK_CONSTRAINT_CHECKS);
 | |
| 
 | |
|   /*
 | |
|     Reset state related to long_find_row notes in the error log:
 | |
|     - timestamp
 | |
|     - flag that decides whether the slave prints or not
 | |
|   */
 | |
|   reset_row_stmt_start_timestamp();
 | |
|   unset_long_find_row_note_printed();
 | |
| 
 | |
|   DBUG_EXECUTE_IF("inject_sleep_gtid_100_x_x", {
 | |
|       if (current_gtid.domain_id == 100)
 | |
|         my_sleep(50000);
 | |
|     };);
 | |
| 
 | |
|   DBUG_VOID_RETURN;
 | |
| }
 | |
| 
 | |
| 
 | |
| void rpl_group_info::clear_tables_to_lock()
 | |
| {
 | |
|   DBUG_ENTER("rpl_group_info::clear_tables_to_lock()");
 | |
| #ifndef DBUG_OFF
 | |
|   /**
 | |
|     When replicating in RBR and MyISAM Merge tables are involved
 | |
|     open_and_lock_tables (called in do_apply_event) appends the 
 | |
|     base tables to the list of tables_to_lock. Then these are 
 | |
|     removed from the list in close_thread_tables (which is called 
 | |
|     before we reach this point).
 | |
| 
 | |
|     This assertion just confirms that we get no surprises at this
 | |
|     point.
 | |
|    */
 | |
|   uint i=0;
 | |
|   for (TABLE_LIST *ptr= tables_to_lock ; ptr ; ptr= ptr->next_global, i++) ;
 | |
|   DBUG_ASSERT(i == tables_to_lock_count);
 | |
| #endif  
 | |
| 
 | |
|   while (tables_to_lock)
 | |
|   {
 | |
|     uchar* to_free= reinterpret_cast<uchar*>(tables_to_lock);
 | |
|     if (tables_to_lock->m_tabledef_valid)
 | |
|     {
 | |
|       tables_to_lock->m_tabledef.table_def::~table_def();
 | |
|       tables_to_lock->m_tabledef_valid= FALSE;
 | |
|     }
 | |
| 
 | |
|     /*
 | |
|       If blob fields were used during conversion of field values 
 | |
|       from the master table into the slave table, then we need to 
 | |
|       free the memory used temporarily to store their values before
 | |
|       copying into the slave's table.
 | |
|     */
 | |
|     if (tables_to_lock->m_conv_table)
 | |
|       free_blobs(tables_to_lock->m_conv_table);
 | |
| 
 | |
|     tables_to_lock=
 | |
|       static_cast<RPL_TABLE_LIST*>(tables_to_lock->next_global);
 | |
|     tables_to_lock_count--;
 | |
|     my_free(to_free);
 | |
|   }
 | |
|   DBUG_ASSERT(tables_to_lock == NULL && tables_to_lock_count == 0);
 | |
|   DBUG_VOID_RETURN;
 | |
| }
 | |
| 
 | |
| 
 | |
| void rpl_group_info::slave_close_thread_tables(THD *thd)
 | |
| {
 | |
|   DBUG_ENTER("rpl_group_info::slave_close_thread_tables(THD *thd)");
 | |
|   thd->get_stmt_da()->set_overwrite_status(true);
 | |
| #ifdef WITH_WSREP
 | |
|   // This can happen e.g. when table_def::compatible_with fails and sets a error
 | |
|   // but thd->is_error() is false then. However, we do not want to commit
 | |
|   // statement on Galera instead we want to rollback it as later in
 | |
|   // apply_write_set we rollback transaction and that can't be done
 | |
|   // after wsrep transaction state is s_committed.
 | |
|   if (WSREP(thd))
 | |
|     (thd->is_error() || thd->is_slave_error) ? trans_rollback_stmt(thd) : trans_commit_stmt(thd);
 | |
|   else
 | |
| #endif
 | |
|     thd->is_error() ? trans_rollback_stmt(thd) : trans_commit_stmt(thd);
 | |
|   thd->get_stmt_da()->set_overwrite_status(false);
 | |
| 
 | |
|   close_thread_tables(thd);
 | |
|   /*
 | |
|     - If transaction rollback was requested due to deadlock
 | |
|     perform it and release metadata locks.
 | |
|     - If inside a multi-statement transaction,
 | |
|     defer the release of metadata locks until the current
 | |
|     transaction is either committed or rolled back. This prevents
 | |
|     other statements from modifying the table for the entire
 | |
|     duration of this transaction.  This provides commit ordering
 | |
|     and guarantees serializability across multiple transactions.
 | |
|     - If in autocommit mode, or outside a transactional context,
 | |
|     automatically release metadata locks of the current statement.
 | |
|   */
 | |
|   if (thd->transaction_rollback_request)
 | |
|   {
 | |
|     trans_rollback_implicit(thd);
 | |
|     thd->release_transactional_locks();
 | |
|   }
 | |
|   else if (! thd->in_multi_stmt_transaction_mode())
 | |
|     thd->release_transactional_locks();
 | |
|   else
 | |
|     thd->mdl_context.release_statement_locks();
 | |
| 
 | |
|   clear_tables_to_lock();
 | |
|   DBUG_VOID_RETURN;
 | |
| }
 | |
| 
 | |
| 
 | |
| 
 | |
| static void
 | |
| mark_start_commit_inner(rpl_parallel_entry *e, group_commit_orderer *gco,
 | |
|                         rpl_group_info *rgi)
 | |
| {
 | |
|   group_commit_orderer *tmp;
 | |
|   uint64 count= ++e->count_committing_event_groups;
 | |
|   /* Signal any following GCO whose wait_count has been reached now. */
 | |
|   tmp= gco;
 | |
| 
 | |
|   DBUG_ASSERT(!tmp->gc_done);
 | |
| 
 | |
|   while ((tmp= tmp->next_gco))
 | |
|   {
 | |
|     DBUG_ASSERT(!tmp->gc_done);
 | |
| 
 | |
|     uint64 wait_count= tmp->wait_count;
 | |
|     if (wait_count > count)
 | |
|       break;
 | |
|     mysql_cond_broadcast(&tmp->COND_group_commit_orderer);
 | |
|   }
 | |
| }
 | |
| 
 | |
| 
 | |
| void
 | |
| rpl_group_info::mark_start_commit_no_lock()
 | |
| {
 | |
|   if (did_mark_start_commit)
 | |
|     return;
 | |
|   did_mark_start_commit= true;
 | |
|   mark_start_commit_inner(parallel_entry, gco, this);
 | |
| }
 | |
| 
 | |
| 
 | |
| void
 | |
| rpl_group_info::mark_start_commit()
 | |
| {
 | |
|   rpl_parallel_entry *e;
 | |
| 
 | |
|   if (did_mark_start_commit)
 | |
|     return;
 | |
|   did_mark_start_commit= true;
 | |
| 
 | |
|   e= this->parallel_entry;
 | |
|   mysql_mutex_lock(&e->LOCK_parallel_entry);
 | |
|   mark_start_commit_inner(e, gco, this);
 | |
|   mysql_mutex_unlock(&e->LOCK_parallel_entry);
 | |
| }
 | |
| 
 | |
| 
 | |
| /*
 | |
|   Format the current GTID as a string suitable for printing in error messages.
 | |
| 
 | |
|   The string is stored in a buffer inside rpl_group_info, so remains valid
 | |
|   until next call to gtid_info() or until destruction of rpl_group_info.
 | |
| 
 | |
|   If no GTID is available, then NULL is returned.
 | |
| */
 | |
| char *
 | |
| rpl_group_info::gtid_info() const
 | |
| {
 | |
|   if (!gtid_sub_id || !current_gtid.seq_no)
 | |
|     return NULL;
 | |
|   my_snprintf(gtid_info_buf, sizeof(gtid_info_buf), "Gtid %u-%u-%llu",
 | |
|               current_gtid.domain_id, current_gtid.server_id,
 | |
|               current_gtid.seq_no);
 | |
|   return gtid_info_buf;
 | |
| }
 | |
| 
 | |
| 
 | |
| /*
 | |
|   Undo the effect of a prior mark_start_commit().
 | |
| 
 | |
|   This is only used for retrying a transaction in parallel replication, after
 | |
|   we have encountered a deadlock or other temporary error.
 | |
| 
 | |
|   When we get such a deadlock, it means that the current group of transactions
 | |
|   did not yet all start committing (else they would not have deadlocked). So
 | |
|   we will not yet have woken up anything in the next group, our rgi->gco is
 | |
|   still live, and we can simply decrement the counter (to be incremented again
 | |
|   later, when the retry succeeds and reaches the commit step).
 | |
| */
 | |
| void
 | |
| rpl_group_info::unmark_start_commit()
 | |
| {
 | |
|   rpl_parallel_entry *e;
 | |
| 
 | |
|   if (!did_mark_start_commit)
 | |
|     return;
 | |
|   did_mark_start_commit= false;
 | |
| 
 | |
|   e= this->parallel_entry;
 | |
|   mysql_mutex_lock(&e->LOCK_parallel_entry);
 | |
|   /*
 | |
|     Assert that we have not already wrongly completed this GCO and signalled
 | |
|     the next one to start, only to now unmark and make the signal invalid.
 | |
|     This is to catch problems like MDEV-34696.
 | |
| 
 | |
|     The error inject rpl_parallel_simulate_temp_err_xid is used to test this
 | |
|     precise situation, that we handle it gracefully if it somehow occurs in a
 | |
|     release build. So disable the assert in this case.
 | |
|   */
 | |
| #ifndef DBUG_OFF
 | |
|   bool allow_unmark_after_complete= false;
 | |
|   DBUG_EXECUTE_IF("rpl_parallel_simulate_temp_err_xid",
 | |
|                   allow_unmark_after_complete= true;);
 | |
|   DBUG_ASSERT(!gco->next_gco ||
 | |
|               gco->next_gco->wait_count > e->count_committing_event_groups ||
 | |
|               allow_unmark_after_complete);
 | |
| #endif
 | |
|   --e->count_committing_event_groups;
 | |
|   mysql_mutex_unlock(&e->LOCK_parallel_entry);
 | |
| }
 | |
| 
 | |
| 
 | |
| rpl_sql_thread_info::rpl_sql_thread_info(Rpl_filter *filter)
 | |
|   : rpl_filter(filter)
 | |
| {
 | |
|   cached_charset_invalidate();
 | |
| }
 | |
| 
 | |
| 
 | |
| void rpl_sql_thread_info::cached_charset_invalidate()
 | |
| {
 | |
|   DBUG_ENTER("rpl_group_info::cached_charset_invalidate");
 | |
| 
 | |
|   /* Full of zeroes means uninitialized. */
 | |
|   bzero(cached_charset, sizeof(cached_charset));
 | |
|   DBUG_VOID_RETURN;
 | |
| }
 | |
| 
 | |
| 
 | |
| bool rpl_sql_thread_info::cached_charset_compare(char *charset) const
 | |
| {
 | |
|   DBUG_ENTER("rpl_group_info::cached_charset_compare");
 | |
| 
 | |
|   if (memcmp(cached_charset, charset, sizeof(cached_charset)))
 | |
|   {
 | |
|     memcpy(const_cast<char*>(cached_charset), charset, sizeof(cached_charset));
 | |
|     DBUG_RETURN(1);
 | |
|   }
 | |
|   DBUG_RETURN(0);
 | |
| }
 | |
| 
 | |
| 
 | |
| /**
 | |
|   Store the file and position where the slave's SQL thread are in the
 | |
|   relay log.
 | |
| 
 | |
|   Notes:
 | |
| 
 | |
|   - This function should be called either from the slave SQL thread,
 | |
|     or when the slave thread is not running.  (It reads the
 | |
|     group_{relay|master}_log_{pos|name} and delay fields in the rli
 | |
|     object.  These may only be modified by the slave SQL thread or by
 | |
|     a client thread when the slave SQL thread is not running.)
 | |
| 
 | |
|   - If there is an active transaction, then we do not update the
 | |
|     position in the relay log.  This is to ensure that we re-execute
 | |
|     statements if we die in the middle of an transaction that was
 | |
|     rolled back.
 | |
| 
 | |
|   - As a transaction never spans binary logs, we don't have to handle
 | |
|     the case where we do a relay-log-rotation in the middle of the
 | |
|     transaction.  If transactions could span several binlogs, we would
 | |
|     have to ensure that we do not delete the relay log file where the
 | |
|     transaction started before switching to a new relay log file.
 | |
| 
 | |
|   - Error can happen if writing to file fails or if flushing the file
 | |
|     fails.
 | |
| 
 | |
|   @param rli The object representing the Relay_log_info.
 | |
| 
 | |
|   @todo Change the log file information to a binary format to avoid
 | |
|   calling longlong2str.
 | |
| 
 | |
|   @return 0 on success, 1 on error.
 | |
| */
 | |
| bool Relay_log_info::flush()
 | |
| {
 | |
|   bool error=0;
 | |
| 
 | |
|   DBUG_ENTER("Relay_log_info::flush()");
 | |
| 
 | |
|   IO_CACHE *file = &info_file;
 | |
|   // 2*file name, 2*long long, 2*unsigned long, 6*'\n'
 | |
|   char buff[FN_REFLEN * 2 + 22 * 2 + 10 * 2 + 6], *pos;
 | |
|   my_b_seek(file, 0L);
 | |
|   pos= longlong10_to_str(LINES_IN_RELAY_LOG_INFO_WITH_DELAY, buff, 10);
 | |
|   *pos++='\n';
 | |
|   pos=strmov(pos, group_relay_log_name);
 | |
|   *pos++='\n';
 | |
|   pos=longlong10_to_str(group_relay_log_pos, pos, 10);
 | |
|   *pos++='\n';
 | |
|   pos=strmov(pos, group_master_log_name);
 | |
|   *pos++='\n';
 | |
|   pos=longlong10_to_str(group_master_log_pos, pos, 10);
 | |
|   *pos++='\n';
 | |
|   pos= longlong10_to_str(sql_delay, pos, 10);
 | |
|   *pos++= '\n';
 | |
|   if (my_b_write(file, (uchar*) buff, (size_t) (pos-buff)))
 | |
|     error=1;
 | |
|   if (flush_io_cache(file))
 | |
|     error=1;
 | |
|   if (sync_relayloginfo_period &&
 | |
|       !error &&
 | |
|       ++sync_counter >= sync_relayloginfo_period)
 | |
|   {
 | |
|     if (my_sync(info_fd, MYF(MY_WME)))
 | |
|       error=1;
 | |
|     sync_counter= 0;
 | |
|   }
 | |
|   /* 
 | |
|     Flushing the relay log is done by the slave I/O thread 
 | |
|     or by the user on STOP SLAVE. 
 | |
|    */
 | |
|   DBUG_RETURN(error);
 | |
| }
 | |
| 
 | |
| #endif
 |