/* Copyright (c) 2013, Kristian Nielsen and MariaDB Services Ab.
   Copyright (c) 2020, 2022, MariaDB Corporation.

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; version 2 of the License.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1335  USA */


/* Definitions for MariaDB global transaction ID (GTID). */
#include <type_traits>

#ifndef MYSQL_CLIENT
#include "mariadb.h"
#include "sql_priv.h"
#include "unireg.h"
#include "sql_base.h"
#include "sql_parse.h"
#include "key.h"
#include "rpl_rli.h"
#include "slave.h"
#include "log_event.h"

const LEX_CSTRING rpl_gtid_slave_state_table_name=
  { STRING_WITH_LEN("gtid_slave_pos") };

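/*
  Add a committed GTID to the in-memory slave state hash. A failure here is
  non-fatal, as explained in the warning below.
*/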
void
rpl_slave_state::update_state_hash(uint64 sub_id, rpl_gtid *gtid, void *hton,
                                   rpl_group_info *rgi)
{
  int err;
  /*
    Add the gtid to the HASH in the replication slave state.

    We must do this only _after_ commit, so that for parallel replication,
    there will not be an attempt to delete the corresponding table row before
    it is even committed.
  */
  err= update(gtid->domain_id, gtid->server_id, sub_id, gtid->seq_no, hton, rgi);
  if (err)
  {
    sql_print_warning("Slave: Out of memory during slave state maintenance. "
                      "Some no longer necessary rows in table "
                      "mysql.%s may be left undeleted.",
                      rpl_gtid_slave_state_table_name.str);
    /*
      Such failure is not fatal. We will fail to delete the row for this
      GTID, but it will do no harm and will be removed automatically on next
      server restart.
    */
  }
}

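/*
  If a GTID position update is pending for this event group, record the GTID
  in the mysql.gtid_slave_pos table and then update the in-memory state hash.
*/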
int
rpl_slave_state::record_and_update_gtid(THD *thd, rpl_group_info *rgi)
{
  DBUG_ENTER("rpl_slave_state::record_and_update_gtid");

  /*
    Update the GTID position, if we have it and did not already update
    it in a GTID transaction.
  */
  if (rgi->gtid_pending)
  {
    uint64 sub_id= rgi->gtid_sub_id;
    void *hton= NULL;

    rgi->gtid_pending= false;
    if (rgi->gtid_ignore_duplicate_state!=rpl_group_info::GTID_DUPLICATE_IGNORE)
    {
      if (record_gtid(thd, &rgi->current_gtid, sub_id, false, false, &hton))
        DBUG_RETURN(1);
      update_state_hash(sub_id, &rgi->current_gtid, hton, rgi);
    }
    rgi->gtid_ignore_duplicate_state= rpl_group_info::GTID_DUPLICATE_NULL;
  }
  DBUG_RETURN(0);
}

/*
  Check GTID event execution when --gtid-ignore-duplicates is enabled.

  The idea with --gtid-ignore-duplicates is that we allow multiple master
  connections (in multi-source replication) to all receive the same GTIDs and
  event groups. Only one instance of each is applied; we use the sequence
  number in the GTID to decide whether a GTID has already been applied.

  So if the seq_no of a GTID (or a higher sequence number) has already been
  applied, then the event should be skipped. If not, then the event should be
  applied.

  To avoid two master connections trying to apply the same event
  simultaneously, only one is allowed to work in any given domain at any point
  in time. The associated Relay_log_info object is called the owner of the
  domain (and there can be multiple parallel worker threads working in that
  domain for that Relay_log_info). Any other Relay_log_info/master connection
  must wait for the domain to become free, or for their GTID to have been
  applied, before being allowed to proceed.

  Returns:
    0  This GTID is already applied, it should be skipped.
    1  The GTID is not yet applied; this rli is now the owner, and must apply
       the event and release the domain afterwards.
   -1  Error (out of memory to allocate a new element for the domain).
*/
int
rpl_slave_state::check_duplicate_gtid(rpl_gtid *gtid, rpl_group_info *rgi)
{
  uint32 domain_id= gtid->domain_id;
  uint64 seq_no= gtid->seq_no;
  rpl_slave_state::element *elem;
  int res;
  bool did_enter_cond= false;
  PSI_stage_info old_stage;
  THD *UNINIT_VAR(thd);
  Relay_log_info *rli= rgi->rli;

  mysql_mutex_lock(&LOCK_slave_state);
  if (!(elem= get_element(domain_id)))
  {
    my_error(ER_OUT_OF_RESOURCES, MYF(0));
    res= -1;
    goto err;
  }
  /*
    Note that the elem pointer does not change once inserted in the hash. So
    we can re-use the pointer without looking it up again in the hash after
    each lock release and re-take.
  */

  for (;;)
  {
    if (elem->highest_seq_no >= seq_no)
    {
      /* This sequence number is already applied, ignore it. */
      res= 0;
      rgi->gtid_ignore_duplicate_state= rpl_group_info::GTID_DUPLICATE_IGNORE;
      break;
    }
    if (!elem->owner_rli)
    {
      /* The domain became free, grab it and apply the event. */
      elem->owner_rli= rli;
      elem->owner_count= 1;
      rgi->gtid_ignore_duplicate_state= rpl_group_info::GTID_DUPLICATE_OWNER;
      res= 1;
      break;
    }
    if (elem->owner_rli == rli)
    {
      /* Already own this domain, increment reference count and apply event. */
      ++elem->owner_count;
      rgi->gtid_ignore_duplicate_state= rpl_group_info::GTID_DUPLICATE_OWNER;
      res= 1;
      break;
    }
    thd= rgi->thd;
    if (unlikely(thd->check_killed()))
    {
      res= -1;
      break;
    }
    /*
      Someone else is currently processing this GTID (or an earlier one).
      Wait for them to complete (or fail), and then check again.
    */
    if (!did_enter_cond)
    {
      thd->ENTER_COND(&elem->COND_gtid_ignore_duplicates, &LOCK_slave_state,
                      &stage_gtid_wait_other_connection, &old_stage);
      did_enter_cond= true;
    }
    mysql_cond_wait(&elem->COND_gtid_ignore_duplicates,
                    &LOCK_slave_state);
  }

err:
  if (did_enter_cond)
    thd->EXIT_COND(&old_stage);
  else
    mysql_mutex_unlock(&LOCK_slave_state);
  return res;
}

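/*
  Release ownership of a domain acquired in check_duplicate_gtid(), waking up
  any other master connection waiting to apply events in that domain. Used on
  the error path (transaction failure and rollback).
*/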
void
rpl_slave_state::release_domain_owner(rpl_group_info *rgi)
{
  element *elem= NULL;

  mysql_mutex_lock(&LOCK_slave_state);
  if (!(elem= get_element(rgi->current_gtid.domain_id)))
  {
    /*
      We cannot really deal with error here, as we are already called in an
      error handling case (transaction failure and rollback).

      However, get_element() only fails if the element did not exist already
      and could not be allocated due to out-of-memory - and if it did not
      exist, then we would not get here in the first place.
    */
    mysql_mutex_unlock(&LOCK_slave_state);
    return;
  }

  if (rgi->gtid_ignore_duplicate_state == rpl_group_info::GTID_DUPLICATE_OWNER)
  {
    uint32 count= elem->owner_count;
    DBUG_ASSERT(count > 0);
    DBUG_ASSERT(elem->owner_rli == rgi->rli);
    --count;
    elem->owner_count= count;
    if (count == 0)
    {
      elem->owner_rli= NULL;
      mysql_cond_broadcast(&elem->COND_gtid_ignore_duplicates);
    }
  }
  rgi->gtid_ignore_duplicate_state= rpl_group_info::GTID_DUPLICATE_NULL;
  mysql_mutex_unlock(&LOCK_slave_state);
}

static void
rpl_slave_state_free_element(void *arg)
{
  struct rpl_slave_state::element *elem= (struct rpl_slave_state::element *)arg;
  mysql_cond_destroy(&elem->COND_wait_gtid);
  mysql_cond_destroy(&elem->COND_gtid_ignore_duplicates);
  my_free(elem);
}


rpl_slave_state::rpl_slave_state()
  : pending_gtid_count(0), last_sub_id(0), gtid_pos_tables(0), loaded(false)
{
  mysql_mutex_init(key_LOCK_slave_state, &LOCK_slave_state,
                   MY_MUTEX_INIT_SLOW);
  my_hash_init(PSI_INSTRUMENT_ME, &hash, &my_charset_bin, 32,
               offsetof(element, domain_id), sizeof(element::domain_id),
               NULL, rpl_slave_state_free_element, HASH_UNIQUE);
  my_init_dynamic_array(PSI_INSTRUMENT_ME, &gtid_sort_array, sizeof(rpl_gtid),
                        8, 8, MYF(0));
}


rpl_slave_state::~rpl_slave_state()
{
  free_gtid_pos_tables(gtid_pos_tables.load(std::memory_order_relaxed));
  truncate_hash();
  my_hash_free(&hash);
  delete_dynamic(&gtid_sort_array);
  mysql_mutex_destroy(&LOCK_slave_state);
}

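/* Free all per-domain GTID lists and remove all elements from the hash. */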
void
rpl_slave_state::truncate_hash()
{
  uint32 i;

  for (i= 0; i < hash.records; ++i)
  {
    element *e= (element *)my_hash_element(&hash, i);
    list_element *l= e->list;
    list_element *next;
    while (l)
    {
      next= l->next;
      my_free(l);
      l= next;
    }
    /* The element itself is freed by the hash element free function. */
  }
  my_hash_reset(&hash);
}

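/*
  Register a newly committed GTID in the in-memory slave state: record the
  highest seen seq_no for the domain, wake up any MASTER_GTID_WAIT() waiters,
  release --gtid-ignore-duplicates domain ownership, and queue the entry so
  that old table rows can later be deleted in a batch. update() is a wrapper
  that takes LOCK_slave_state around update_nolock().
*/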
int
rpl_slave_state::update(uint32 domain_id, uint32 server_id, uint64 sub_id,
                        uint64 seq_no, void *hton, rpl_group_info *rgi)
{
  int res;
  mysql_mutex_lock(&LOCK_slave_state);
  res= update_nolock(domain_id, server_id, sub_id, seq_no, hton, rgi);
  mysql_mutex_unlock(&LOCK_slave_state);
  return res;
}

int
rpl_slave_state::update_nolock(uint32 domain_id, uint32 server_id, uint64 sub_id,
                               uint64 seq_no, void *hton, rpl_group_info *rgi)
{
  element *elem= NULL;
  list_element *list_elem= NULL;

  DBUG_ASSERT(hton || !loaded);
  mysql_mutex_assert_owner(&LOCK_slave_state);
  if (!(elem= get_element(domain_id)))
    return 1;

  if (seq_no > elem->highest_seq_no)
    elem->highest_seq_no= seq_no;
  if (elem->gtid_waiter && elem->min_wait_seq_no <= seq_no)
  {
    /*
      Someone was waiting in MASTER_GTID_WAIT() for this GTID to appear.
      Signal (and remove) them. The waiter will handle all the processing
      of all pending MASTER_GTID_WAIT(), so we do not slow down the
      replication SQL thread.
    */
    elem->gtid_waiter= NULL;
    mysql_cond_broadcast(&elem->COND_wait_gtid);
  }

  if (rgi)
  {
    if (rgi->gtid_ignore_duplicate_state==rpl_group_info::GTID_DUPLICATE_OWNER)
    {
#ifdef DBUG_ASSERT_EXISTS
      Relay_log_info *rli= rgi->rli;
#endif
      uint32 count= elem->owner_count;
      DBUG_ASSERT(count > 0);
      DBUG_ASSERT(elem->owner_rli == rli);
      --count;
      elem->owner_count= count;
      if (count == 0)
      {
        elem->owner_rli= NULL;
        mysql_cond_broadcast(&elem->COND_gtid_ignore_duplicates);
      }
    }
    rgi->gtid_ignore_duplicate_state= rpl_group_info::GTID_DUPLICATE_NULL;
  }

  if (!(list_elem= (list_element *)my_malloc(PSI_INSTRUMENT_ME,
                                             sizeof(*list_elem), MYF(MY_WME))))
    return 1;
  list_elem->domain_id= domain_id;
  list_elem->server_id= server_id;
  list_elem->sub_id= sub_id;
  list_elem->seq_no= seq_no;
  list_elem->hton= hton;

  elem->add(list_elem);
  if (last_sub_id < sub_id)
    last_sub_id= sub_id;

#ifdef HAVE_REPLICATION
  ++pending_gtid_count;
  if (pending_gtid_count >= opt_gtid_cleanup_batch_size)
  {
    pending_gtid_count = 0;
    slave_background_gtid_pending_delete_request();
  }
#endif

  return 0;
}

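/*
  Look up the state element for a replication domain, allocating and
  inserting a new one if this is the first time the domain is seen.
  Returns NULL on out-of-memory.
*/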
struct rpl_slave_state::element *
rpl_slave_state::get_element(uint32 domain_id)
{
  struct element *elem;

  elem= (element *)my_hash_search(&hash, (const uchar *)&domain_id,
                                  sizeof(domain_id));
  if (elem)
    return elem;

  if (!(elem= (element *)my_malloc(PSI_INSTRUMENT_ME, sizeof(*elem), MYF(MY_WME))))
    return NULL;
  elem->list= NULL;
  elem->domain_id= domain_id;
  elem->highest_seq_no= 0;
  elem->gtid_waiter= NULL;
  elem->owner_rli= NULL;
  elem->owner_count= 0;
  mysql_cond_init(key_COND_wait_gtid, &elem->COND_wait_gtid, 0);
  mysql_cond_init(key_COND_gtid_ignore_duplicates,
                  &elem->COND_gtid_ignore_duplicates, 0);
  if (my_hash_insert(&hash, (uchar *)elem))
  {
    my_free(elem);
    return NULL;
  }
  return elem;
}

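/*
  Put a list of GTID entries back into the per-domain lists in the hash, eg.
  when entries taken out for deletion need to be restored. Returns non-zero
  if a domain element could not be found.
*/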
int
rpl_slave_state::put_back_list(list_element *list)
{
  element *e= NULL;
  int err= 0;

  mysql_mutex_lock(&LOCK_slave_state);
  while (list)
  {
    list_element *next= list->next;

    if ((!e || e->domain_id != list->domain_id) &&
        !(e= (element *)my_hash_search(&hash, (const uchar *)&list->domain_id,
                                       sizeof(list->domain_id))))
    {
      err= 1;
      goto end;
    }
    e->add(list);
    list= next;
  }

end:
  mysql_mutex_unlock(&LOCK_slave_state);
  return err;
}

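/*
  Delete all rows from the mysql.gtid_slave_pos table, in a separate
  transaction, holding an exclusive MDL lock on the table.
*/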
int
rpl_slave_state::truncate_state_table(THD *thd)
{
  TABLE_LIST tlist;
  int err= 0;

  tlist.init_one_table(&MYSQL_SCHEMA_NAME, &rpl_gtid_slave_state_table_name,
                       NULL, TL_WRITE);
  tlist.mdl_request.set_type(MDL_EXCLUSIVE);
  if (!(err= open_and_lock_tables(thd, &tlist, FALSE,
                                  MYSQL_OPEN_IGNORE_LOGGING_FORMAT)))
  {
    DBUG_ASSERT(!tlist.table->file->row_logging);
    tlist.table->s->tdc->flush(thd, true);
    err= tlist.table->file->ha_truncate();

    if (err)
    {
      ha_rollback_trans(thd, FALSE);
      close_thread_tables(thd);
      ha_rollback_trans(thd, TRUE);
    }
    else
    {
      ha_commit_trans(thd, FALSE);
      close_thread_tables(thd);
      ha_commit_trans(thd, TRUE);
    }
    thd->release_transactional_locks();
  }
  return err;
}

static const TABLE_FIELD_TYPE mysql_rpl_slave_state_coltypes[4]= {
  { { STRING_WITH_LEN("domain_id") },
    { STRING_WITH_LEN("int(10) unsigned") },
    {NULL, 0} },
  { { STRING_WITH_LEN("sub_id") },
    { STRING_WITH_LEN("bigint(20) unsigned") },
    {NULL, 0} },
  { { STRING_WITH_LEN("server_id") },
    { STRING_WITH_LEN("int(10) unsigned") },
    {NULL, 0} },
  { { STRING_WITH_LEN("seq_no") },
    { STRING_WITH_LEN("bigint(20) unsigned") },
    {NULL, 0} },
};

static const uint mysql_rpl_slave_state_pk_parts[]= {0, 1};

static const TABLE_FIELD_DEF mysql_gtid_slave_pos_tabledef= {
  array_elements(mysql_rpl_slave_state_coltypes),
  mysql_rpl_slave_state_coltypes,
  array_elements(mysql_rpl_slave_state_pk_parts),
  mysql_rpl_slave_state_pk_parts
};

static Table_check_intact_log_error gtid_table_intact;

/*
  Check that the mysql.gtid_slave_pos table has the correct definition.
*/
int
gtid_check_rpl_slave_state_table(TABLE *table)
{
  int err;

  if ((err= gtid_table_intact.check(table, &mysql_gtid_slave_pos_tabledef)))
    my_error(ER_GTID_OPEN_TABLE_FAILED, MYF(0), "mysql",
             rpl_gtid_slave_state_table_name.str);
  return err;
}

/*
  Attempt to find a mysql.gtid_slave_posXXX table that has a storage engine
  that is already in use by the current transaction, if any.
*/
void
rpl_slave_state::select_gtid_pos_table(THD *thd, LEX_CSTRING *out_tablename)
{
  /*
    See comments on rpl_slave_state::gtid_pos_tables for rules around proper
    access to the list.
  */
  auto list= gtid_pos_tables.load(std::memory_order_acquire);

  Ha_trx_info *ha_info;
  uint count = 0;
  for (ha_info= thd->transaction->all.ha_list; ha_info; ha_info= ha_info->next())
  {
    void *trx_hton= ha_info->ht();
    auto table_entry= list;

    if (!ha_info->is_trx_read_write() || trx_hton == &binlog_tp)
      continue;
    while (table_entry)
    {
      if (table_entry->table_hton == trx_hton)
      {
        if (likely(table_entry->state == GTID_POS_AVAILABLE))
        {
          *out_tablename= table_entry->table_name;
          /*
            Check if this is a cross-engine transaction, so we can correctly
            maintain the rpl_transactions_multi_engine status variable.
          */
          if (count >= 1)
            statistic_increment(rpl_transactions_multi_engine, LOCK_status);
          else
          {
            for (;;)
            {
              ha_info= ha_info->next();
              if (!ha_info)
                break;
              if (ha_info->is_trx_read_write() && ha_info->ht() != &binlog_tp)
              {
                statistic_increment(rpl_transactions_multi_engine, LOCK_status);
                break;
              }
            }
          }
          return;
        }
        /*
          This engine is marked to automatically create the table.
          We cannot easily do this here (possibly in the middle of a
          transaction). But we can request the slave background thread
          to create it, and in a short while it should become available
          for following transactions.
        */
#ifdef HAVE_REPLICATION
        slave_background_gtid_pos_create_request(table_entry);
#endif
        break;
      }
      table_entry= table_entry->next;
    }
    ++count;
  }
  /*
    If we cannot find any table whose engine matches an engine that is
    already active in the transaction, or if no transactional engine is
    active in the current transaction at all, we return the default
    gtid_slave_pos table.
  */
  *out_tablename=
    default_gtid_pos_table.load(std::memory_order_acquire)->table_name;
  /* Record in status that we failed to find a suitable gtid_pos table. */
  if (count > 0)
  {
    statistic_increment(transactions_gtid_foreign_engine, LOCK_status);
    if (count > 1)
      statistic_increment(rpl_transactions_multi_engine, LOCK_status);
  }
}

/*
  Write a gtid to the replication slave state table.

  Do it as part of the transaction, to get slave crash safety, or as a separate
  transaction if !in_transaction (eg. MyISAM or DDL).

    gtid    The global transaction id for this event group.
    sub_id  Value allocated for the sub_id when the event group was read
            (sub_id must be consistent with commit order in master binlog).

  Note that caller must later ensure that the new gtid and sub_id are inserted
  into the appropriate HASH element with rpl_slave_state.add(), so that it can
  be deleted later. But this must only be done after COMMIT if in transaction.
*/
int
rpl_slave_state::record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id,
                             bool in_transaction, bool in_statement,
                             void **out_hton)
{
  TABLE_LIST tlist;
  int err= 0, not_sql_thread;
  bool table_opened= false;
  TABLE *table;
  ulonglong thd_saved_option= thd->variables.option_bits;
  Query_tables_list lex_backup;
  wait_for_commit* suspended_wfc;
  void *hton= NULL;
  LEX_CSTRING gtid_pos_table_name;
  TABLE *tbl= nullptr;
  MDL_savepoint m_start_of_statement_svp(thd->mdl_context.mdl_savepoint());
  DBUG_ENTER("record_gtid");

  *out_hton= NULL;
  if (unlikely(!loaded))
  {
    /*
      Probably the mysql.gtid_slave_pos table is missing (eg. upgrade) or
      corrupt.

      We already complained loudly about this, but we can try to continue
      until the DBA fixes it.
    */
    DBUG_RETURN(0);
  }

  if (!in_statement)
    thd->reset_for_next_command();

  if (thd->rgi_slave && (thd->rgi_slave->gtid_ev_flags_extra &
                         Gtid_log_event::FL_START_ALTER_E1))
  {
    /*
      Save the open table list, so that the tables opened by Start Alter are
      not closed when close_thread_tables() is called below.
    */
    mysql_mutex_lock(&thd->LOCK_thd_data);
    tbl= thd->open_tables;
    thd->open_tables= nullptr;
    mysql_mutex_unlock(&thd->LOCK_thd_data);
  }
  /*
    Only the SQL thread can call select_gtid_pos_table() without a mutex.
    Other threads need to use a mutex and take into account that the
    result may change during execution, so we have to make a copy.
  */

  if ((not_sql_thread= (thd->system_thread != SYSTEM_THREAD_SLAVE_SQL)))
    mysql_mutex_lock(&LOCK_slave_state);
  select_gtid_pos_table(thd, &gtid_pos_table_name);
  if (not_sql_thread)
  {
    LEX_CSTRING *tmp= thd->make_clex_string(gtid_pos_table_name.str,
                                            gtid_pos_table_name.length);
    mysql_mutex_unlock(&LOCK_slave_state);
    if (!tmp)
      DBUG_RETURN(1);
    gtid_pos_table_name= *tmp;
  }

  DBUG_EXECUTE_IF("gtid_inject_record_gtid",
                  {
                    my_error(ER_CANNOT_UPDATE_GTID_STATE, MYF(0));
                    DBUG_RETURN(1);
                  } );

  /*
    If we are applying a non-transactional event group, we will be committing
    here a transaction, but that does not imply that the event group has
    completed or has been binlogged. So we should not trigger
    wakeup_subsequent_commits() here.

    Note: An alternative here could be to put a call to mark_start_commit() in
    stmt_done() before the call to record_and_update_gtid(). This would
    prevent later calling mark_start_commit() after we have run
    wakeup_subsequent_commits() from committing the GTID update transaction
    (which must be avoided to avoid accessing freed group_commit_orderer
    object). It would also allow following event groups to start slightly
    earlier. And in the cases where record_gtid() is called without an active
    transaction, the current statement should have been binlogged already, so
    binlog order is preserved.

    But this is rather subtle, and potentially fragile. And it does not really
    seem worth it; non-transactional loads are unlikely to benefit much from
    parallel replication in any case. So for now, we go with the simple
    suspend/resume of wakeup_subsequent_commits() here in record_gtid().
  */
  suspended_wfc= thd->suspend_subsequent_commits();
  thd->lex->reset_n_backup_query_tables_list(&lex_backup);
  tlist.init_one_table(&MYSQL_SCHEMA_NAME, &gtid_pos_table_name, NULL, TL_WRITE);
  if ((err= open_and_lock_tables(thd, &tlist, FALSE, 0)))
    goto end;
  table_opened= true;
  table= tlist.table;
  hton= table->s->db_type();
  table->file->row_logging= 0;                  // No binary logging

  if ((err= gtid_check_rpl_slave_state_table(table)))
    goto end;

#ifdef WITH_WSREP
  thd->wsrep_ignore_table= true; // Do not replicate mysql.gtid_slave_pos table
#endif

  if (!in_transaction)
  {
    DBUG_PRINT("info", ("resetting OPTION_BEGIN"));
    thd->variables.option_bits&=
      ~(ulonglong)(OPTION_NOT_AUTOCOMMIT |OPTION_BEGIN |OPTION_BIN_LOG |
                   OPTION_GTID_BEGIN);
  }
  else
    thd->variables.option_bits&= ~(ulonglong)OPTION_BIN_LOG;

  bitmap_set_all(table->write_set);
  table->rpl_write_set= table->write_set;

  table->field[0]->store((ulonglong)gtid->domain_id, true);
  table->field[1]->store(sub_id, true);
  table->field[2]->store((ulonglong)gtid->server_id, true);
  table->field[3]->store(gtid->seq_no, true);
  DBUG_EXECUTE_IF("inject_crash_before_write_rpl_slave_state", DBUG_SUICIDE(););
  if ((err= table->file->ha_write_row(table->record[0])))
  {
    table->file->print_error(err, MYF(0));
    goto end;
  }
  *out_hton= hton;

  if (opt_bin_log &&
      (err= mysql_bin_log.bump_seq_no_counter_if_needed(gtid->domain_id,
                                                        gtid->seq_no)))
  {
    my_error(ER_OUT_OF_RESOURCES, MYF(0));
    goto end;
  }
end:

  if (table_opened)
  {
    if (err || (err= ha_commit_trans(thd, FALSE)))
      ha_rollback_trans(thd, FALSE);
    close_thread_tables(thd);
    if (!thd->rgi_slave || !(thd->rgi_slave->gtid_ev_flags_extra &
                             Gtid_log_event::FL_START_ALTER_E1))
    {
      if (in_transaction)
        thd->mdl_context.release_statement_locks();
      else
        thd->release_transactional_locks();
    }
  }
  if (thd->rgi_slave &&
      thd->rgi_slave->gtid_ev_flags_extra & Gtid_log_event::FL_START_ALTER_E1)
  {
    mysql_mutex_lock(&thd->LOCK_thd_data);
    thd->open_tables= tbl;
    mysql_mutex_unlock(&thd->LOCK_thd_data);
    thd->mdl_context.rollback_to_savepoint(m_start_of_statement_svp);
  }

#ifdef WITH_WSREP
  thd->wsrep_ignore_table= false;
#endif
  thd->lex->restore_backup_query_tables_list(&lex_backup);
  thd->variables.option_bits= thd_saved_option;
  thd->resume_subsequent_commits(suspended_wfc);
  DBUG_EXECUTE_IF("inject_record_gtid_serverid_100_sleep",
    {
      if (gtid->server_id == 100)
        my_sleep(500000);
    });
  DBUG_RETURN(err);
}

/*
  Return a list of all old GTIDs in any mysql.gtid_slave_pos* table that are
  no longer needed and can be deleted from the table.

  Within each domain, we need to keep around the latest GTID (the one with the
  highest sub_id), but any others in that domain can be deleted.
*/
rpl_slave_state::list_element *
rpl_slave_state::gtid_grab_pending_delete_list()
{
  uint32 i;
  list_element *full_list;

  mysql_mutex_lock(&LOCK_slave_state);
  full_list= NULL;
  for (i= 0; i < hash.records; ++i)
  {
    element *elem= (element *)my_hash_element(&hash, i);
    list_element *elist= elem->list;
    list_element *last_elem, **best_ptr_ptr, *cur, *next;
    uint64 best_sub_id;

    if (!elist)
      continue;                                 /* Nothing here */

    /* Delete any old stuff, but keep around the most recent one. */
    cur= elist;
    best_sub_id= cur->sub_id;
    best_ptr_ptr= &elist;
    last_elem= cur;
    while ((next= cur->next)) {
      last_elem= next;
      if (next->sub_id > best_sub_id)
      {
        best_sub_id= next->sub_id;
        best_ptr_ptr= &cur->next;
      }
      cur= next;
    }
    /*
      Append the new elements to the full list. Note the order is important;
      we do it here so that we do not break the list if best_sub_id is the
      last of the new elements.
    */
    last_elem->next= full_list;
    /*
      Delete the highest sub_id element from the old list, and put it back as
      the single-element new list.
    */
    cur= *best_ptr_ptr;
    *best_ptr_ptr= cur->next;
    cur->next= NULL;
    elem->list= cur;

    /*
      Collect the full list so far here. Note that elist may have moved if we
      deleted the first element, so order is again important.
    */
    full_list= elist;
  }
  mysql_mutex_unlock(&LOCK_slave_state);

  return full_list;
}

/* Find the mysql.gtid_slave_posXXX table associated with a given hton. */
LEX_CSTRING *
rpl_slave_state::select_gtid_pos_table(void *hton)
{
  /*
    See comments on rpl_slave_state::gtid_pos_tables for rules around proper
    access to the list.
  */
  auto table_entry= gtid_pos_tables.load(std::memory_order_acquire);

  while (table_entry)
  {
    if (table_entry->table_hton == hton)
    {
      if (likely(table_entry->state == GTID_POS_AVAILABLE))
        return &table_entry->table_name;
    }
    table_entry= table_entry->next;
  }

  return &default_gtid_pos_table.load(std::memory_order_acquire)->table_name;
}

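/*
  Delete the rows for a list of no longer needed GTIDs from the
  mysql.gtid_slave_posXXX tables, handling the list one storage engine at a
  time. Elements are unlinked from *list_ptr and freed as they are processed;
  on error, the remaining elements are left in the list for a later retry.
*/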
void
rpl_slave_state::gtid_delete_pending(THD *thd,
                                     rpl_slave_state::list_element **list_ptr)
{
  int err= 0;
  ulonglong thd_saved_option;

  if (unlikely(!loaded))
    return;

#ifdef WITH_WSREP
  thd->wsrep_ignore_table= true; // No Galera replication for mysql.gtid_pos_table
#endif

  thd_saved_option= thd->variables.option_bits;
  thd->variables.option_bits&=
    ~(ulonglong)(OPTION_NOT_AUTOCOMMIT |OPTION_BEGIN |OPTION_BIN_LOG |
                 OPTION_GTID_BEGIN);

  while (*list_ptr)
  {
    LEX_CSTRING *gtid_pos_table_name, *tmp_table_name;
    Query_tables_list lex_backup;
    TABLE_LIST tlist;
    TABLE *table;
    handler::Table_flags direct_pos= 0;
    list_element *cur, **cur_ptr_ptr;
    bool table_opened= false;
    bool index_inited= false;
    void *hton= (*list_ptr)->hton;

    thd->reset_for_next_command();

    /*
      Only the SQL thread can call select_gtid_pos_table() without a mutex.
      Other threads need to use a mutex and take into account that the
      result may change during execution, so we have to make a copy.
    */
    mysql_mutex_lock(&LOCK_slave_state);
    tmp_table_name= select_gtid_pos_table(hton);
    gtid_pos_table_name= thd->make_clex_string(tmp_table_name->str,
                                               tmp_table_name->length);
    mysql_mutex_unlock(&LOCK_slave_state);
    if (!gtid_pos_table_name)
    {
      /* Out of memory - we can try again later. */
      break;
    }

    thd->lex->reset_n_backup_query_tables_list(&lex_backup);
    tlist.init_one_table(&MYSQL_SCHEMA_NAME, gtid_pos_table_name, NULL, TL_WRITE);
    if ((err= open_and_lock_tables(thd, &tlist, FALSE, 0)))
      goto end;
    table_opened= true;
    table= tlist.table;

    if ((err= gtid_check_rpl_slave_state_table(table)))
      goto end;

    direct_pos= table->file->ha_table_flags() & HA_PRIMARY_KEY_REQUIRED_FOR_POSITION;
    bitmap_set_all(table->write_set);
    table->rpl_write_set= table->write_set;

    /* Now delete any already committed GTIDs. */
    bitmap_set_bit(table->read_set, table->field[0]->field_index);
    bitmap_set_bit(table->read_set, table->field[1]->field_index);

    if (!direct_pos)
    {
      if ((err= table->file->ha_index_init(0, 0)))
      {
        table->file->print_error(err, MYF(0));
        goto end;
      }
      index_inited= true;
    }

    cur = *list_ptr;
    cur_ptr_ptr = list_ptr;
    do
    {
      uchar key_buffer[4+8];
      list_element *next= cur->next;

      if (cur->hton == hton)
      {
        int res;

        table->field[0]->store((ulonglong)cur->domain_id, true);
        table->field[1]->store(cur->sub_id, true);
        if (direct_pos)
        {
          res= table->file->ha_rnd_pos_by_record(table->record[0]);
        }
        else
        {
          key_copy(key_buffer, table->record[0], &table->key_info[0], 0, false);
          res= table->file->ha_index_read_map(table->record[0], key_buffer,
                                              HA_WHOLE_KEY, HA_READ_KEY_EXACT);
        }
        DBUG_EXECUTE_IF("gtid_slave_pos_simulate_failed_delete",
              { res= 1;
                err= ENOENT;
                sql_print_error("<DEBUG> Error deleting old GTID row");
              });
        if (res)
          /* We cannot find the row, assume it is already deleted. */
          ;
        else if ((err= table->file->ha_delete_row(table->record[0])))
        {
          sql_print_error("Error deleting old GTID row: %s",
                          thd->get_stmt_da()->message());
          /*
            In case of error, we still discard the element from the list. We do
            not want to endlessly error on the same element in case of table
            corruption or such.
          */
        }
        *cur_ptr_ptr= next;
        my_free(cur);
      }
      else
      {
        /* Leave this one in the list until we get to the table for its hton. */
        cur_ptr_ptr= &cur->next;
      }
      cur= next;
      if (err)
        break;
    } while (cur);
end:
    if (table_opened)
    {
      DBUG_ASSERT(direct_pos || index_inited || err);
      /*
        The index may not be initialized if there was a failure during
        ha_index_init(). Hence check that index initialization was successful
        before invoking ha_index_end(); ending an index that was never
        initialized will lead to an assert.
      */
      if (index_inited)
        table->file->ha_index_end();

      if (err || (err= ha_commit_trans(thd, FALSE)))
        ha_rollback_trans(thd, FALSE);
    }
    close_thread_tables(thd);
    thd->release_transactional_locks();
    thd->lex->restore_backup_query_tables_list(&lex_backup);

    if (err)
      break;
  }
  thd->variables.option_bits= thd_saved_option;

#ifdef WITH_WSREP
  thd->wsrep_ignore_table= false;
#endif
}

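/*
  Allocate the next sub_id, under LOCK_slave_state. The sub_id is taken from
  a single global counter; the domain_id argument is not used.
*/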
uint64
rpl_slave_state::next_sub_id(uint32 domain_id)
{
  uint64 sub_id= 0;

  mysql_mutex_lock(&LOCK_slave_state);
  sub_id= ++last_sub_id;
  mysql_mutex_unlock(&LOCK_slave_state);

  return sub_id;
}

/* A callback used in sorting of gtid list based on domain_id. */
static int rpl_gtid_cmp_cb(const void *id1, const void *id2)
{
  uint32 d1= ((rpl_gtid *)id1)->domain_id;
  uint32 d2= ((rpl_gtid *)id2)->domain_id;

  if (d1 < d2)
    return -1;
  else if (d1 > d2)
    return 1;
  return 0;
}

/* Format the specified gtid and store it in the given string buffer. */
bool
rpl_slave_state_tostring_helper(String *dest, const rpl_gtid *gtid, bool *first)
{
  if (*first)
    *first= false;
  else
    if (dest->append(','))
      return true;
  return
    dest->append_ulonglong(gtid->domain_id) ||
    dest->append('-') ||
    dest->append_ulonglong(gtid->server_id) ||
    dest->append('-') ||
    dest->append_ulonglong(gtid->seq_no);
}

/*
  Sort the given gtid list based on domain_id and store them in the specified
  string.
*/
static bool
rpl_slave_state_tostring_helper(DYNAMIC_ARRAY *gtid_dynarr, String *str)
{
  bool first= true, res= true;

  sort_dynamic(gtid_dynarr, rpl_gtid_cmp_cb);

  for (uint i= 0; i < gtid_dynarr->elements; i++)
  {
    rpl_gtid *gtid= dynamic_element(gtid_dynarr, i, rpl_gtid *);
    if (rpl_slave_state_tostring_helper(str, gtid, &first))
      goto err;
  }
  res= false;

err:
  return res;
}


/* Sort the given gtid list based on domain_id and call cb for each gtid. */
static bool
rpl_slave_state_tostring_helper(DYNAMIC_ARRAY *gtid_dynarr,
                                int (*cb)(rpl_gtid *, void *),
                                void *data)
{
  rpl_gtid *gtid;
  bool res= true;

  sort_dynamic(gtid_dynarr, rpl_gtid_cmp_cb);

  for (uint i= 0; i < gtid_dynarr->elements; i++)
  {
    gtid= dynamic_element(gtid_dynarr, i, rpl_gtid *);
    if ((*cb)(gtid, data))
      goto err;
  }
  res= false;

err:
  return res;
}

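/*
  Iterate over the current slave position: for each domain, find the most
  recently applied GTID (the one with the highest sub_id), replace it with a
  newer GTID from extra_gtids (if one with our own server_id exists), and
  call cb for each resulting GTID, sorted by domain_id if sort is true.
*/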
int
rpl_slave_state::iterate(int (*cb)(rpl_gtid *, void *), void *data,
                         rpl_gtid *extra_gtids, uint32 num_extra,
                         bool sort)
{
  uint32 i;
  HASH gtid_hash;
  uchar *rec;
  rpl_gtid *gtid;
  int res= 1;
  bool locked= false;

  my_hash_init(PSI_INSTRUMENT_ME, &gtid_hash, &my_charset_bin, 32,
               offsetof(rpl_gtid, domain_id), sizeof(rpl_gtid::domain_id),
               NULL, NULL, HASH_UNIQUE);
  for (i= 0; i < num_extra; ++i)
    if (extra_gtids[i].server_id == global_system_variables.server_id &&
        my_hash_insert(&gtid_hash, (uchar *)(&extra_gtids[i])))
      goto err;

  mysql_mutex_lock(&LOCK_slave_state);
  locked= true;
  reset_dynamic(&gtid_sort_array);

  for (i= 0; i < hash.records; ++i)
  {
    uint64 best_sub_id;
    rpl_gtid best_gtid;
    element *e= (element *)my_hash_element(&hash, i);
    list_element *l= e->list;

    if (!l)
      continue;                                 /* Nothing here */

    best_gtid.domain_id= e->domain_id;
    best_gtid.server_id= l->server_id;
    best_gtid.seq_no= l->seq_no;
    best_sub_id= l->sub_id;
    while ((l= l->next))
    {
      if (l->sub_id > best_sub_id)
      {
        best_sub_id= l->sub_id;
        best_gtid.server_id= l->server_id;
        best_gtid.seq_no= l->seq_no;
      }
    }

    /* Check if we have something newer in the extra list. */
    rec= my_hash_search(&gtid_hash, (const uchar *)&best_gtid.domain_id,
                        sizeof(best_gtid.domain_id));
    if (rec)
    {
      gtid= (rpl_gtid *)rec;
      if (gtid->seq_no > best_gtid.seq_no)
        memcpy(&best_gtid, gtid, sizeof(best_gtid));
      if (my_hash_delete(&gtid_hash, rec))
      {
        goto err;
      }
    }

    if ((res= sort ? insert_dynamic(&gtid_sort_array,
                                    (const void *) &best_gtid) :
         (*cb)(&best_gtid, data)))
    {
      goto err;
    }
  }

  /* Also add any remaining extra domain_ids. */
  for (i= 0; i < gtid_hash.records; ++i)
  {
    gtid= (rpl_gtid *)my_hash_element(&gtid_hash, i);
    if ((res= sort ? insert_dynamic(&gtid_sort_array, (const void *) gtid) :
         (*cb)(gtid, data)))
    {
      goto err;
    }
  }

  if (sort && rpl_slave_state_tostring_helper(&gtid_sort_array, cb, data))
  {
    goto err;
  }

  res= 0;

err:
  if (locked) mysql_mutex_unlock(&LOCK_slave_state);
  my_hash_free(&gtid_hash);

  return res;
}

struct rpl_slave_state_tostring_data {
  String *dest;
  bool first;
};
static int
rpl_slave_state_tostring_cb(rpl_gtid *gtid, void *data)
{
  rpl_slave_state_tostring_data *p= (rpl_slave_state_tostring_data *)data;
  return rpl_slave_state_tostring_helper(p->dest, gtid, &p->first);
}


/*
  Prepare the current slave state as a string, suitable for sending to the
  master to request to receive binlog events starting from that GTID state.

  The state consists of the most recently applied GTID for each domain_id,
  ie. the one with the highest sub_id within each domain_id.

  Optionally, extra_gtids is a list of GTIDs from the binlog. This is used when
  a server was previously a master and now needs to connect to a new master as
  a slave. For each domain_id, if the GTID in the binlog was logged with our
  own server_id _and_ has a higher seq_no than what is in the slave state,
  then this should be used as the position to start replicating at. This
  allows promoting a slave as the new master, and connecting the old master
  as a slave with MASTER_GTID_POS=AUTO.
*/
int
rpl_slave_state::tostring(String *dest, rpl_gtid *extra_gtids, uint32 num_extra)
{
  struct rpl_slave_state_tostring_data data;
  data.first= true;
  data.dest= dest;

  return iterate(rpl_slave_state_tostring_cb, &data, extra_gtids,
                 num_extra, true);
}


/*
  Lookup a domain_id in the current replication slave state.

  Returns false if the domain_id has no entries in the slave state.
  Otherwise returns true, and fills in out_gtid with the corresponding
  GTID.
*/
bool
rpl_slave_state::domain_to_gtid(uint32 domain_id, rpl_gtid *out_gtid)
{
  element *elem;
  list_element *list;
  uint64 best_sub_id;

  mysql_mutex_lock(&LOCK_slave_state);
  elem= (element *)my_hash_search(&hash, (const uchar *)&domain_id,
                                  sizeof(domain_id));
  if (!elem || !(list= elem->list))
  {
    mysql_mutex_unlock(&LOCK_slave_state);
    return false;
  }

  out_gtid->domain_id= domain_id;
  out_gtid->server_id= list->server_id;
  out_gtid->seq_no= list->seq_no;
  best_sub_id= list->sub_id;

  while ((list= list->next))
  {
    if (best_sub_id > list->sub_id)
      continue;
    best_sub_id= list->sub_id;
    out_gtid->server_id= list->server_id;
    out_gtid->seq_no= list->seq_no;
  }

  mysql_mutex_unlock(&LOCK_slave_state);
  return true;
}

#endif

/*
  Parse a GTID at the start of a string, and update the pointer to point
  at the first character after the parsed GTID.

  Returns 0 on ok, non-zero on parse error.
*/
static int
gtid_parser_helper(const char **ptr, const char *end, rpl_gtid *out_gtid)
{
  char *q;
  const char *p= *ptr;
  uint64 v1, v2, v3;
  int err= 0;

  q= (char*) end;
  v1= (uint64)my_strtoll10(p, &q, &err);
  if (err != 0 || v1 > (uint32)0xffffffff || q == end || *q != '-')
    return 1;
  p= q+1;
  q= (char*) end;
  v2= (uint64)my_strtoll10(p, &q, &err);
  if (err != 0 || v2 > (uint32)0xffffffff || q == end || *q != '-')
    return 1;
  p= q+1;
  q= (char*) end;
  v3= (uint64)my_strtoll10(p, &q, &err);
  if (err != 0)
    return 1;

  out_gtid->domain_id= (uint32) v1;
  out_gtid->server_id= (uint32) v2;
  out_gtid->seq_no= v3;
  *ptr= q;
  return 0;
}

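/*
  Parse a comma-separated list of GTIDs into a my_malloc()ed array of
  rpl_gtid, storing the element count in *out_len. Returns NULL on parse
  error or out-of-memory; the caller must my_free() the returned list.
*/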
rpl_gtid *
gtid_parse_string_to_list(const char *str, size_t str_len, uint32 *out_len)
{
  const char *p= str;
  const char *end= p + str_len;
  uint32 len= 0, alloc_len= 5;
  rpl_gtid *list= NULL;

  for (;;)
  {
    rpl_gtid gtid;

    if (len >= (((uint32)1 << 28)-1) || gtid_parser_helper(&p, end, &gtid))
    {
      my_free(list);
      return NULL;
    }
    if ((!list || len >= alloc_len) &&
        !(list=
          (rpl_gtid *)my_realloc(PSI_INSTRUMENT_ME, list,
                                 (alloc_len= alloc_len*2) * sizeof(rpl_gtid),
                                 MYF(MY_FREE_ON_ERROR|MY_ALLOW_ZERO_PTR))))
      return NULL;
    list[len++]= gtid;

    if (p == end)
      break;
    if (*p != ',')
    {
      my_free(list);
      return NULL;
    }
    ++p;
  }
  *out_len= len;
  return list;
}

#ifndef MYSQL_CLIENT

/*
  Update the slave replication state with the GTID position obtained from
  master when connecting with old-style (filename,offset) position.

  If RESET is true then all existing entries are removed. Otherwise only
  domain_ids mentioned in the STATE_FROM_MASTER are changed.

  Returns 0 if ok, non-zero if error.
*/
int
rpl_slave_state::load(THD *thd, const char *state_from_master, size_t len,
                      bool reset, bool in_statement)
{
  const char *end= state_from_master + len;

  mysql_mutex_assert_not_owner(&LOCK_slave_state);
  if (reset)
  {
    if (truncate_state_table(thd))
      return 1;
    truncate_hash();
  }
  if (state_from_master == end)
    return 0;
  for (;;)
  {
    rpl_gtid gtid;
    uint64 sub_id;
    void *hton= NULL;

    if (gtid_parser_helper(&state_from_master, end, &gtid) ||
        !(sub_id= next_sub_id(gtid.domain_id)) ||
        record_gtid(thd, &gtid, sub_id, false, in_statement, &hton) ||
        update(gtid.domain_id, gtid.server_id, sub_id, gtid.seq_no, hton, NULL))
      return 1;
    if (state_from_master == end)
      break;
    if (*state_from_master != ',')
      return 1;
    ++state_from_master;
  }
  return 0;
}

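/* Return true if no GTID is recorded in any domain. */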
bool
rpl_slave_state::is_empty()
{
  uint32 i;
  bool result= true;

  mysql_mutex_lock(&LOCK_slave_state);
  for (i= 0; i < hash.records; ++i)
  {
    element *e= (element *)my_hash_element(&hash, i);
    if (e->list)
    {
      result= false;
      break;
    }
  }
  mysql_mutex_unlock(&LOCK_slave_state);

  return result;
}

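/* Free a linked list of gtid_pos_table entries. */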
void
rpl_slave_state::free_gtid_pos_tables(struct rpl_slave_state::gtid_pos_table *list)
{
  struct gtid_pos_table *cur, *next;

  cur= list;
  while (cur)
  {
    next= cur->next;
    my_free(cur);
    cur= next;
  }
}

/*
  Replace the list of available mysql.gtid_slave_posXXX tables with a new list.
  The caller must be holding LOCK_slave_state. Additionally, this function
  must only be called while all SQL threads are stopped.
*/
void
rpl_slave_state::set_gtid_pos_tables_list(rpl_slave_state::gtid_pos_table *new_list,
                                          rpl_slave_state::gtid_pos_table *default_entry)
{
  mysql_mutex_assert_owner(&LOCK_slave_state);
  auto old_list= gtid_pos_tables.load(std::memory_order_relaxed);
  gtid_pos_tables.store(new_list, std::memory_order_release);
  default_gtid_pos_table.store(default_entry, std::memory_order_release);
  free_gtid_pos_tables(old_list);
}

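/*
  Prepend a new entry to the list of gtid_pos tables. The caller must be
  holding LOCK_slave_state.
*/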
void
rpl_slave_state::add_gtid_pos_table(rpl_slave_state::gtid_pos_table *entry)
{
  mysql_mutex_assert_owner(&LOCK_slave_state);
  entry->next= gtid_pos_tables.load(std::memory_order_relaxed);
  gtid_pos_tables.store(entry, std::memory_order_release);
}

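/*
  Allocate a new gtid_pos_table entry, with the table name copied into the
  same allocation. Reports ER_OUTOFMEMORY and returns NULL on failure.
*/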
struct rpl_slave_state::gtid_pos_table *
rpl_slave_state::alloc_gtid_pos_table(LEX_CSTRING *table_name, void *hton,
                                      rpl_slave_state::gtid_pos_table_state state)
{
  struct gtid_pos_table *p;
  char *allocated_str;

  if (!my_multi_malloc(PSI_INSTRUMENT_ME, MYF(MY_WME), &p, sizeof(*p),
                       &allocated_str, table_name->length+1, NULL))
  {
    my_error(ER_OUTOFMEMORY, MYF(0), (int)(sizeof(*p) + table_name->length+1));
    return NULL;
  }
  memcpy(allocated_str, table_name->str, table_name->length+1); // Also copy '\0'
  p->next = NULL;
  p->table_hton= hton;
  p->table_name.str= allocated_str;
  p->table_name.length= table_name->length;
  p->state= state;
  return p;
}

void
rpl_binlog_state_base::init()
{
  my_hash_init(PSI_INSTRUMENT_ME, &hash, &my_charset_bin, 32,
               offsetof(element, domain_id), sizeof(element::domain_id),
               NULL, my_free, HASH_UNIQUE);
  initialized= 1;
}


void
rpl_binlog_state_base::reset_nolock()
{
  uint32 i;

  for (i= 0; i < hash.records; ++i)
    my_hash_free(&((element *)my_hash_element(&hash, i))->hash);
  my_hash_reset(&hash);
}


void
rpl_binlog_state_base::free()
{
  if (initialized)
  {
    initialized= 0;
    reset_nolock();
    my_hash_free(&hash);
  }
}


rpl_binlog_state_base::~rpl_binlog_state_base()
{
  free();
}

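/*
  Load the binlog state from a plain list of GTIDs, or from another state
  object, replacing any previous contents. Returns true on out-of-memory.
*/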
bool
rpl_binlog_state_base::load_nolock(struct rpl_gtid *list, uint32 count)
{
  uint32 i;
  bool res= false;

  reset_nolock();
  for (i= 0; i < count; ++i)
  {
    if (update_nolock(&(list[i])))
    {
      res= true;
      break;
    }
  }
  return res;
}


bool
rpl_binlog_state_base::load_nolock(rpl_binlog_state_base *orig_state)
{
  ulong i, j;
  HASH *h1= &orig_state->hash;

  reset_nolock();
  for (i= 0; i < h1->records; ++i)
  {
    element *e= (element *)my_hash_element(h1, i);
    HASH *h2= &e->hash;
    const rpl_gtid *last_gtid= e->last_gtid;
    for (j= 0; j < h2->records; ++j)
    {
      const rpl_gtid *gtid= (const rpl_gtid *)my_hash_element(h2, j);
      if (gtid == last_gtid)
        continue;
      if (update_nolock(gtid))
        return true;
    }
    if (likely(last_gtid) && update_nolock(last_gtid))
      return true;
  }

  return false;
}

 | |
| /*
 | |
|   Update replication state with a new GTID.
 | |
| 
 | |
|   If the (domain_id, server_id) pair already exists, then the new GTID replaces
 | |
|   the old one for that domain id. Else a new entry is inserted.
 | |
| 
 | |
|   Note that rpl_binlog_state_base::update_nolock() does not call my_error()
 | |
|   for out-of-memory, caller must do that if needed (eg. ER_OUT_OF_RESOURCES).
 | |
| 
 | |
|   Returns 0 for ok, 1 for error.
 | |
| */
 | |
| int
 | |
| rpl_binlog_state_base::update_nolock(const struct rpl_gtid *gtid)
 | |
| {
 | |
|   element *elem;
 | |
| 
 | |
|   if ((elem= (element *)my_hash_search(&hash,
 | |
|                                        (const uchar *)(>id->domain_id),
 | |
|                                        sizeof(gtid->domain_id))))
 | |
|   {
 | |
|     if (elem->seq_no_counter < gtid->seq_no)
 | |
|       elem->seq_no_counter= gtid->seq_no;
 | |
|     if (!elem->update_element(gtid))
 | |
|       return 0;
 | |
|   }
 | |
|   else if (!alloc_element_nolock(gtid))
 | |
|     return 0;
 | |
| 
 | |
|   return 1;
 | |
| }
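

/*
  Illustrative sketch only (not part of the server; the guard macro below is
  hypothetical and never defined). It shows the replace-vs-insert behaviour
  of update_nolock() described above, using arbitrary example GTID values.
*/
#ifdef RPL_GTID_USAGE_SKETCHES
static void example_update_nolock_semantics()
{
  rpl_binlog_state_base st;
  st.init();
  rpl_gtid g1= {1, 100, 10};    // domain 1, server 100, seq_no 10
  rpl_gtid g2= {1, 100, 11};    // same (domain_id, server_id): replaces g1
  rpl_gtid g3= {1, 200, 5};     // new server_id in domain 1: new entry
  st.update_nolock(&g1);
  st.update_nolock(&g2);
  st.update_nolock(&g3);
  /* count_nolock() is now 2: one GTID per (domain_id, server_id) pair. */
  st.free();
}
#endif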


int
rpl_binlog_state_base::alloc_element_nolock(const rpl_gtid *gtid)
{
  element *elem;
  rpl_gtid *lookup_gtid;

  /* First time we see this domain_id; allocate a new element. */
  elem= (element *)my_malloc(PSI_INSTRUMENT_ME, sizeof(*elem), MYF(0));
  lookup_gtid= (rpl_gtid *)my_malloc(PSI_INSTRUMENT_ME, sizeof(*lookup_gtid),
                                     MYF(0));
  if (elem && lookup_gtid)
  {
    elem->domain_id= gtid->domain_id;
    my_hash_init(PSI_INSTRUMENT_ME, &elem->hash, &my_charset_bin, 32,
                 offsetof(rpl_gtid, server_id), sizeof(rpl_gtid::server_id),
                 NULL, my_free, HASH_UNIQUE);
    elem->last_gtid= lookup_gtid;
    elem->seq_no_counter= gtid->seq_no;
    memcpy(lookup_gtid, gtid, sizeof(*lookup_gtid));
    if (0 == my_hash_insert(&elem->hash, (const uchar *)lookup_gtid))
    {
      lookup_gtid= NULL;                        /* Do not free. */
      if (0 == my_hash_insert(&hash, (const uchar *)elem))
        return 0;
    }
    my_hash_free(&elem->hash);
  }

  /* An error. */
  if (elem)
    my_free(elem);
  if (lookup_gtid)
    my_free(lookup_gtid);
  return 1;
}


uint32
rpl_binlog_state_base::count_nolock()
{
  uint32 c= 0;
  uint32 i;

  for (i= 0; i < hash.records; ++i)
    c+= ((element *)my_hash_element(&hash, i))->hash.records;

  return c;
}



int
rpl_binlog_state_base::get_gtid_list_nolock(rpl_gtid *gtid_list, uint32 list_size)
{
  uint32 i, j, pos;

  pos= 0;
  for (i= 0; i < hash.records; ++i)
  {
    element *e= (element *)my_hash_element(&hash, i);
    if (!e->last_gtid)
    {
      DBUG_ASSERT(e->hash.records==0);
      continue;
    }
    for (j= 0; j <= e->hash.records; ++j)
    {
      const rpl_gtid *gtid;
      if (j < e->hash.records)
      {
        gtid= (rpl_gtid *)my_hash_element(&e->hash, j);
        if (gtid == e->last_gtid)
          continue;
      }
      else
        gtid= e->last_gtid;

      if (pos >= list_size)
        return 1;
      memcpy(&gtid_list[pos++], gtid, sizeof(*gtid));
    }
  }

  return 0;
}



rpl_gtid *
rpl_binlog_state_base::find_nolock(uint32 domain_id, uint32 server_id)
{
  element *elem;
  if (!(elem= (element *)my_hash_search(&hash, (const uchar *)&domain_id,
                                        sizeof(domain_id))))
    return NULL;
  return (rpl_gtid *)my_hash_search(&elem->hash, (const uchar *)&server_id,
                                    sizeof(server_id));
}



/*
  Return true if this binlog state is before the position specified by the
  passed-in slave_connection_state, false otherwise.
  Note that if the GTID D-S-N is the last GTID added to the state in the
  domain D, then the state is considered to come before the position D-S-N
  within domain D.
*/
bool
rpl_binlog_state_base::is_before_pos(slave_connection_state *pos)
{
  /*
    First check each GTID in the slave position, if it comes after what is
    in the state.
  */
  for (uint32 i= 0; i < pos->hash.records; ++i)
  {
    const slave_connection_state::entry *e=
      (const slave_connection_state::entry *)my_hash_element(&pos->hash, i);
    /*
      IF we have an entry with the same (domain_id, server_id),
      AND either
        (    we are ahead in that server_id
          OR we are identical, but there's some other server_id after)
      THEN that position lies before our state.
    */
    element *elem;
    if ((elem= (element *)my_hash_search(&hash,
                                         (const uchar *)&e->gtid.domain_id,
                                         sizeof(e->gtid.domain_id))))
    {
      const rpl_gtid *g= (rpl_gtid *)
        my_hash_search(&elem->hash, (const uchar *)&e->gtid.server_id,
                       sizeof(e->gtid.server_id));
      if (g != nullptr &&
           ( g->seq_no > e->gtid.seq_no ||
             ( g->seq_no == e->gtid.seq_no && g != elem->last_gtid) ))
        return false;
    }
  }

  /*
    Then check the state, if there are any domains present that are missing
    from the position.
  */
  for (uint32 i= 0; i < hash.records; ++i)
  {
    const element *elem= (const element *) my_hash_element(&hash, i);
    if (likely(elem->hash.records > 0) &&
        !pos->find(elem->domain_id))
      return false;
  }

  /* Nothing in our state lies after anything in the position. */
  return true;
}
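

/*
  Illustrative sketch only (hypothetical guard macro, arbitrary GTIDs): it
  demonstrates the last-GTID subtlety of is_before_pos() documented above.
  A state whose newest GTID in a domain equals the position still counts as
  "before" that position; one more event flips the answer.
*/
#ifdef RPL_GTID_USAGE_SKETCHES
static void example_is_before_pos()
{
  rpl_binlog_state_base state;
  state.init();
  rpl_gtid g= {1, 100, 10};
  state.update_nolock(&g);

  slave_connection_state pos;
  pos.update(&g);
  /* 1-100-10 is the last GTID in domain 1, so the state is still "before". */
  bool before= state.is_before_pos(&pos);        // true

  rpl_gtid g2= {1, 100, 11};
  state.update_nolock(&g2);
  before= state.is_before_pos(&pos);             // now false: 11 > 10
  state.free();
}
#endif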


void rpl_binlog_state::init()
{
  rpl_binlog_state_base::init();
  my_init_dynamic_array(PSI_INSTRUMENT_ME, &gtid_sort_array, sizeof(rpl_gtid), 8, 8, MYF(0));
  mysql_mutex_init(key_LOCK_binlog_state, &LOCK_binlog_state,
                   MY_MUTEX_INIT_SLOW);
}



void
rpl_binlog_state::reset()
{
  mysql_mutex_lock(&LOCK_binlog_state);
  reset_nolock();
  mysql_mutex_unlock(&LOCK_binlog_state);
}



void rpl_binlog_state::free()
{
  if (initialized)
  {
    rpl_binlog_state_base::free();
    delete_dynamic(&gtid_sort_array);
    mysql_mutex_destroy(&LOCK_binlog_state);
  }
}



rpl_binlog_state::~rpl_binlog_state()
{
  free();
}



bool
rpl_binlog_state::load(struct rpl_gtid *list, uint32 count)
{
  mysql_mutex_lock(&LOCK_binlog_state);
  bool res= load_nolock(list, count);
  mysql_mutex_unlock(&LOCK_binlog_state);
  if (res)
    my_error(ER_OUT_OF_RESOURCES, MYF(0));
  return res;
}



static int rpl_binlog_state_load_cb(rpl_gtid *gtid, void *data)
{
  rpl_binlog_state *self= (rpl_binlog_state *)data;
  return self->update_nolock(gtid);
}



bool
rpl_binlog_state::load(rpl_slave_state *slave_pos)
{
  bool res= false;

  mysql_mutex_lock(&LOCK_binlog_state);
  reset_nolock();
  if (slave_pos->iterate(rpl_binlog_state_load_cb, this, NULL, 0, false))
  {
    my_error(ER_OUT_OF_RESOURCES, MYF(0));
    res= true;
  }
  mysql_mutex_unlock(&LOCK_binlog_state);
  return res;
}



int
rpl_binlog_state::update(const struct rpl_gtid *gtid, bool strict)
{
  int res= 0;
  element *elem;

  mysql_mutex_lock(&LOCK_binlog_state);
  if ((elem= (element *)my_hash_search(&hash,
                                       (const uchar *)(&gtid->domain_id),
                                       sizeof(gtid->domain_id))))
  {
    if (strict && elem->last_gtid && elem->last_gtid->seq_no >= gtid->seq_no)
    {
      my_error(ER_GTID_STRICT_OUT_OF_ORDER, MYF(0), gtid->domain_id,
               gtid->server_id, gtid->seq_no, elem->last_gtid->domain_id,
               elem->last_gtid->server_id, elem->last_gtid->seq_no);
      res= 1;
    }
    else
    {
      if (elem->seq_no_counter < gtid->seq_no)
        elem->seq_no_counter= gtid->seq_no;
      if (elem->update_element(gtid))
        res= 1;
    }
  }
  else if (alloc_element_nolock(gtid))
  {
    my_error(ER_OUT_OF_RESOURCES, MYF(0));
    res= 1;
  }
  mysql_mutex_unlock(&LOCK_binlog_state);
  return res;
}



/*
  Fill in a new GTID, allocating next sequence number, and update state
  accordingly.
*/
int
rpl_binlog_state::update_with_next_gtid(uint32 domain_id, uint32 server_id,
                                        rpl_gtid *gtid)
{
  element *elem;
  int res= 0;

  gtid->domain_id= domain_id;
  gtid->server_id= server_id;

  mysql_mutex_lock(&LOCK_binlog_state);
  if ((elem= (element *)my_hash_search(&hash, (const uchar *)(&domain_id),
                                       sizeof(domain_id))))
  {
    gtid->seq_no= ++elem->seq_no_counter;
    if (!elem->update_element(gtid))
      goto end;
  }
  else
  {
    gtid->seq_no= 1;
    if (!alloc_element_nolock(gtid))
      goto end;
  }

  my_error(ER_OUT_OF_RESOURCES, MYF(0));
  res= 1;
end:
  mysql_mutex_unlock(&LOCK_binlog_state);
  return res;
}
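

/*
  Illustrative sketch only (hypothetical guard macro): shows how
  update_with_next_gtid() hands out consecutive sequence numbers per domain,
  starting from 1 for a domain the state has never seen.
*/
#ifdef RPL_GTID_USAGE_SKETCHES
static void example_next_gtid(rpl_binlog_state *state)
{
  rpl_gtid g;
  state->update_with_next_gtid(1, 100, &g);   // first GTID in domain 1: 1-100-1
  state->update_with_next_gtid(1, 100, &g);   // next one: 1-100-2
  state->update_with_next_gtid(2, 100, &g);   // new domain starts over: 2-100-1
}
#endif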


/* Helper functions for update. */
int
rpl_binlog_state::element::update_element(const rpl_gtid *gtid)
{
  rpl_gtid *lookup_gtid;

  /*
    By far the most common case is that successive events within the same
    replication domain have the same server id (it changes only when
    switching to a new master). So save a hash lookup in this case.
  */
  if (likely(last_gtid && last_gtid->server_id == gtid->server_id))
  {
    last_gtid->seq_no= gtid->seq_no;
    return 0;
  }

  lookup_gtid= (rpl_gtid *)
    my_hash_search(&hash, (const uchar *)&gtid->server_id,
                           sizeof(gtid->server_id));
  if (lookup_gtid)
  {
    lookup_gtid->seq_no= gtid->seq_no;
    last_gtid= lookup_gtid;
    return 0;
  }

  /* Allocate a new GTID and insert it. */
  lookup_gtid= (rpl_gtid *)my_malloc(PSI_INSTRUMENT_ME, sizeof(*lookup_gtid),
                                     MYF(MY_WME));
  if (!lookup_gtid)
    return 1;
  memcpy(lookup_gtid, gtid, sizeof(*lookup_gtid));
  if (my_hash_insert(&hash, (const uchar *)lookup_gtid))
  {
    my_free(lookup_gtid);
    return 1;
  }
  last_gtid= lookup_gtid;
  return 0;
}



/*
  Check that a new GTID can be logged without creating an out-of-order
  sequence number with existing GTIDs.
*/
bool
rpl_binlog_state::check_strict_sequence(uint32 domain_id, uint32 server_id,
                                        uint64 seq_no, bool no_error)
{
  element *elem;
  bool res= 0;

  mysql_mutex_lock(&LOCK_binlog_state);
  if ((elem= (element *)my_hash_search(&hash,
                                       (const uchar *)(&domain_id),
                                       sizeof(domain_id))) &&
      elem->last_gtid && elem->last_gtid->seq_no >= seq_no)
  {
    if (!no_error)
      my_error(ER_GTID_STRICT_OUT_OF_ORDER, MYF(0), domain_id, server_id, seq_no,
               elem->last_gtid->domain_id, elem->last_gtid->server_id,
               elem->last_gtid->seq_no);
    res= 1;
  }
  mysql_mutex_unlock(&LOCK_binlog_state);
  return res;
}
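

/*
  Illustrative sketch only (hypothetical guard macro): under GTID strict mode
  a new GTID must have a seq_no strictly larger than the last one binlogged
  in its domain; check_strict_sequence() returns true on a violation.
*/
#ifdef RPL_GTID_USAGE_SKETCHES
static void example_strict_sequence(rpl_binlog_state *state)
{
  rpl_gtid g;
  state->update_with_next_gtid(5, 1, &g);   // in a fresh domain 5: 5-1-1
  /* The same seq_no again is not > the last one, so this is a violation: */
  bool violation= state->check_strict_sequence(5, 1, g.seq_no, true);
  /* seq_no+1 is fine (returns false); no_error=true suppresses my_error(). */
  bool ok= !state->check_strict_sequence(5, 1, g.seq_no + 1, true);
}
#endif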


/*
  When we see a new GTID that will not be binlogged (eg. slave thread
  with --log-slave-updates=0), then we need to remember to allocate any
  GTID seq_no of our own within that domain starting from there.

  Returns 0 if ok, non-zero if out-of-memory.
*/
int
rpl_binlog_state::bump_seq_no_if_needed(uint32 domain_id, uint64 seq_no)
{
  element *elem;
  int res;

  mysql_mutex_lock(&LOCK_binlog_state);
  if ((elem= (element *)my_hash_search(&hash, (const uchar *)(&domain_id),
                                       sizeof(domain_id))))
  {
    if (elem->seq_no_counter < seq_no)
      elem->seq_no_counter= seq_no;
    res= 0;
    goto end;
  }

  /* We need to allocate a new, empty element to remember the next seq_no. */
  if (!(elem= (element *)my_malloc(PSI_INSTRUMENT_ME, sizeof(*elem),
                                   MYF(MY_WME))))
  {
    res= 1;
    goto end;
  }

  elem->domain_id= domain_id;
  my_hash_init(PSI_INSTRUMENT_ME, &elem->hash, &my_charset_bin, 32,
               offsetof(rpl_gtid, server_id), sizeof(rpl_gtid::server_id),
               NULL, my_free, HASH_UNIQUE);
  elem->last_gtid= NULL;
  elem->seq_no_counter= seq_no;
  if (0 == my_hash_insert(&hash, (const uchar *)elem))
  {
    res= 0;
    goto end;
  }

  my_hash_free(&elem->hash);
  my_free(elem);
  res= 1;

end:
  mysql_mutex_unlock(&LOCK_binlog_state);
  return res;
}
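

/*
  Illustrative sketch only (hypothetical guard macro): after a GTID is seen
  but not binlogged, bump_seq_no_if_needed() makes sure our own next GTID in
  that domain continues past it.
*/
#ifdef RPL_GTID_USAGE_SKETCHES
static void example_bump_seq_no(rpl_binlog_state *state)
{
  /* Say a slave without --log-slave-updates replicated 3-42-200 silently. */
  state->bump_seq_no_if_needed(3, 200);
  rpl_gtid g;
  state->update_with_next_gtid(3, 1, &g);   // g.seq_no == 201, not 1
}
#endif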


/*
  Write binlog state to text file, so we can read it in again without having
  to scan the last binlog file (normal shutdown/startup, not crash recovery).

  The most recent GTID within each domain_id is written after any other GTID
  within this domain.
*/
int
rpl_binlog_state::write_to_iocache(IO_CACHE *dest)
{
  ulong i, j;
  char buf[21];
  int res= 0;

  mysql_mutex_lock(&LOCK_binlog_state);
  for (i= 0; i < hash.records; ++i)
  {
    element *e= (element *)my_hash_element(&hash, i);
    if (!e->last_gtid)
    {
      DBUG_ASSERT(e->hash.records == 0);
      continue;
    }
    for (j= 0; j <= e->hash.records; ++j)
    {
      const rpl_gtid *gtid;
      if (j < e->hash.records)
      {
        gtid= (const rpl_gtid *)my_hash_element(&e->hash, j);
        if (gtid == e->last_gtid)
          continue;
      }
      else
        gtid= e->last_gtid;

      longlong10_to_str(gtid->seq_no, buf, 10);
      if (my_b_printf(dest, "%u-%u-%s\n", gtid->domain_id, gtid->server_id,
                      buf))
      {
        res= 1;
        goto end;
      }
    }
  }

end:
  mysql_mutex_unlock(&LOCK_binlog_state);
  return res;
}



int
rpl_binlog_state::read_from_iocache(IO_CACHE *src)
{
  /* 10-digit - 10-digit - 20-digit \n \0 */
  char buf[10+1+10+1+20+1+1];
  const char *p, *end;
  rpl_gtid gtid;
  int res= 0;

  mysql_mutex_lock(&LOCK_binlog_state);
  reset_nolock();
  for (;;)
  {
    size_t len= my_b_gets(src, buf, sizeof(buf));
    if (!len)
      break;
    p= buf;
    end= buf + len;
    if (gtid_parser_helper(&p, end, &gtid) ||
        update_nolock(&gtid))
    {
      res= 1;
      break;
    }
  }
  mysql_mutex_unlock(&LOCK_binlog_state);
  return res;
}
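

/*
  Illustrative sketch only (hypothetical guard macro, no real IO_CACHE): each
  line of the state file written above is simply "domain-server-seqno", and
  gtid_parser_helper() is the routine that reads one such token back.
*/
#ifdef RPL_GTID_USAGE_SKETCHES
static bool example_parse_state_line()
{
  const char *line= "1-100-10";             // as produced by write_to_iocache()
  const char *p= line, *end= line + 8;
  rpl_gtid g;
  return gtid_parser_helper(&p, end, &g) == 0 &&
         g.domain_id == 1 && g.server_id == 100 && g.seq_no == 10;
}
#endif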


rpl_gtid *
rpl_binlog_state::find(uint32 domain_id, uint32 server_id)
{
  rpl_gtid *p;
  mysql_mutex_lock(&LOCK_binlog_state);
  p= find_nolock(domain_id, server_id);
  mysql_mutex_unlock(&LOCK_binlog_state);
  return p;
}

rpl_gtid *
rpl_binlog_state::find_most_recent(uint32 domain_id)
{
  element *elem;
  rpl_gtid *gtid= NULL;

  mysql_mutex_lock(&LOCK_binlog_state);
  elem= (element *)my_hash_search(&hash, (const uchar *)&domain_id,
                                  sizeof(domain_id));
  if (elem && elem->last_gtid)
    gtid= elem->last_gtid;
  mysql_mutex_unlock(&LOCK_binlog_state);

  return gtid;
}



uint32
rpl_binlog_state::count()
{
  mysql_mutex_lock(&LOCK_binlog_state);
  uint32 c= count_nolock();
  mysql_mutex_unlock(&LOCK_binlog_state);

  return c;
}



int
rpl_binlog_state::get_gtid_list(rpl_gtid *gtid_list, uint32 list_size)
{
  mysql_mutex_lock(&LOCK_binlog_state);
  int res= get_gtid_list_nolock(gtid_list, list_size);
  mysql_mutex_unlock(&LOCK_binlog_state);
  return res;
}



/*
  Get a list of the most recently binlogged GTID, for each domain_id.

  This can be used when switching from being a master to being a slave,
  to know where to start replicating from the new master.

  The returned list must be de-allocated with my_free().

  Returns 0 for ok, non-zero for out-of-memory.
*/
int
rpl_binlog_state::get_most_recent_gtid_list(rpl_gtid **list, uint32 *size)
{
  uint32 i;
  uint32 alloc_size, out_size;
  int res= 0;

  out_size= 0;
  mysql_mutex_lock(&LOCK_binlog_state);
  alloc_size= hash.records;
  if (!(*list= (rpl_gtid *)my_malloc(PSI_INSTRUMENT_ME,
                                     alloc_size * sizeof(rpl_gtid), MYF(MY_WME))))
  {
    res= 1;
    goto end;
  }
  for (i= 0; i < alloc_size; ++i)
  {
    element *e= (element *)my_hash_element(&hash, i);
    if (!e->last_gtid)
      continue;
    memcpy(&((*list)[out_size++]), e->last_gtid, sizeof(rpl_gtid));
  }

end:
  mysql_mutex_unlock(&LOCK_binlog_state);
  *size= out_size;
  return res;
}
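

/*
  Illustrative sketch only (hypothetical guard macro): typical use of
  get_most_recent_gtid_list(), including the my_free() the comment above
  requires of the caller.
*/
#ifdef RPL_GTID_USAGE_SKETCHES
static void example_most_recent_list(rpl_binlog_state *state)
{
  rpl_gtid *list;
  uint32 size;
  if (state->get_most_recent_gtid_list(&list, &size))
    return;                                  // out-of-memory
  for (uint32 i= 0; i < size; ++i)
  {
    /* list[i] is the most recent GTID binlogged in one domain. */
  }
  my_free(list);
}
#endif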

bool
rpl_binlog_state::append_pos(String *str)
{
  uint32 i;

  mysql_mutex_lock(&LOCK_binlog_state);
  reset_dynamic(&gtid_sort_array);

  for (i= 0; i < hash.records; ++i)
  {
    element *e= (element *)my_hash_element(&hash, i);
    if (e->last_gtid &&
        insert_dynamic(&gtid_sort_array, (const void *) e->last_gtid))
    {
      mysql_mutex_unlock(&LOCK_binlog_state);
      return true;
    }
  }
  rpl_slave_state_tostring_helper(&gtid_sort_array, str);
  mysql_mutex_unlock(&LOCK_binlog_state);

  return false;
}



bool
rpl_binlog_state::append_state(String *str)
{
  uint32 i, j;
  bool res= false;

  mysql_mutex_lock(&LOCK_binlog_state);
  reset_dynamic(&gtid_sort_array);

  for (i= 0; i < hash.records; ++i)
  {
    element *e= (element *)my_hash_element(&hash, i);
    if (!e->last_gtid)
    {
      DBUG_ASSERT(e->hash.records==0);
      continue;
    }
    for (j= 0; j <= e->hash.records; ++j)
    {
      const rpl_gtid *gtid;
      if (j < e->hash.records)
      {
        gtid= (rpl_gtid *)my_hash_element(&e->hash, j);
        if (gtid == e->last_gtid)
          continue;
      }
      else
        gtid= e->last_gtid;

      if (insert_dynamic(&gtid_sort_array, (const void *) gtid))
      {
        res= true;
        goto end;
      }
    }
  }

  rpl_slave_state_tostring_helper(&gtid_sort_array, str);

end:
  mysql_mutex_unlock(&LOCK_binlog_state);
  return res;
}

/**
  Remove domains supplied by the first argument from binlog state.
  Removal is done for any domain whose last gtids (from all its servers) match
  ones in the Gtid list event of the 2nd argument.

  @param  ids               gtid domain id sequence, may contain dups
  @param  glev              pointer to Gtid list event describing
                            the match condition
  @param  errbuf [out]      pointer to possible error message array

  @retval NULL              as success when at least one domain is removed
  @retval ""                empty string to indicate ineffective call
                            when no domains removed
  @retval NOT EMPTY string  otherwise an error message
*/
const char*
rpl_binlog_state::drop_domain(DYNAMIC_ARRAY *ids,
                              Gtid_list_log_event *glev,
                              char* errbuf)
{
  DYNAMIC_ARRAY domain_unique; // sequence (unsorted) of unique element*:s
  rpl_binlog_state::element* domain_unique_buffer[16];
  ulong k, l;
  const char* errmsg= NULL;

  DBUG_ENTER("rpl_binlog_state::drop_domain");

  my_init_dynamic_array2(PSI_INSTRUMENT_ME, &domain_unique,
                         sizeof(element*), domain_unique_buffer,
                         sizeof(domain_unique_buffer) / sizeof(element*), 4, 0);

  mysql_mutex_lock(&LOCK_binlog_state);

  /*
    The gtid list is supposed to come from a binlog's Gtid_list event and
    therefore should be a subset of the current binlog state. That is,
    for every domain in the list the binlog state contains a gtid with
    a sequence number not less than that of the list.
    Exceptions to this inclusion rule are:
      A. the list may still refer to gtids from already deleted domains.
         Files containing them must have been purged whereas the file
         with the list is not yet.
      B. out of order groups were injected
      C. a manually built list of binlog files violating the inclusion
         constraint.
    While A is a normal case (not necessarily distinguishable from C though),
    B and C may require the user's attention, so any inconsistency (including
    a suspected case of A) is diagnosed and *warned*.
  */
  for (l= 0, errbuf[0]= 0; l < glev->count; l++, errbuf[0]= 0)
  {
    rpl_gtid* rb_state_gtid= find_nolock(glev->list[l].domain_id,
                                         glev->list[l].server_id);
    if (!rb_state_gtid)
      sprintf(errbuf,
              "missing gtids from the '%u-%u' domain-server pair which is "
              "referred to in the gtid list describing an earlier state. Ignore "
              "if the domain ('%u') was already explicitly deleted",
              glev->list[l].domain_id, glev->list[l].server_id,
              glev->list[l].domain_id);
    else if (rb_state_gtid->seq_no < glev->list[l].seq_no)
      sprintf(errbuf,
              "having a gtid '%u-%u-%llu' which is less than "
              "the '%u-%u-%llu' of the gtid list describing an earlier state. "
              "The state may have been affected by manually injecting "
              "a lower sequence number gtid or via replication",
              rb_state_gtid->domain_id, rb_state_gtid->server_id,
              rb_state_gtid->seq_no, glev->list[l].domain_id,
              glev->list[l].server_id, glev->list[l].seq_no);
    if (strlen(errbuf)) // use strlen() as cheap flag
      push_warning_printf(current_thd, Sql_condition::WARN_LEVEL_WARN,
                          ER_BINLOG_CANT_DELETE_GTID_DOMAIN,
                          "The current gtid binlog state is incompatible with "
                          "a former one %s.", errbuf);
  }

  /*
    For each domain_id from ids
      If the domain is already absent from the binlog state
        Warn && continue
      If any GTID with that domain in binlog state is missing from glev.list
        Error out binlog state can't change
  */
  for (ulong i= 0; i < ids->elements; i++)
  {
    rpl_binlog_state::element *elem= NULL;
    uint32 *ptr_domain_id;
    bool all_found;

    ptr_domain_id= (uint32*) dynamic_array_ptr(ids, i);
    elem= (rpl_binlog_state::element *)
      my_hash_search(&hash, (const uchar *) ptr_domain_id,
                     sizeof(ptr_domain_id[0]));
    if (!elem)
    {
      push_warning_printf(current_thd, Sql_condition::WARN_LEVEL_WARN,
                          ER_BINLOG_CANT_DELETE_GTID_DOMAIN,
                          "The gtid domain being deleted ('%lu') is not in "
                          "the current binlog state", (unsigned long) *ptr_domain_id);
      continue;
    }

    all_found= true;
    for (k= 0; k < elem->hash.records && all_found; k++)
    {
      rpl_gtid *d_gtid= (rpl_gtid *)my_hash_element(&elem->hash, k);
      bool match_found= false;
      for (ulong l= 0; l < glev->count && !match_found; l++)
        match_found= match_found || (*d_gtid == glev->list[l]);
      if (!match_found)
        all_found= false;
    }

    if (!all_found)
    {
      sprintf(errbuf, "binlog files may contain gtids from the domain ('%u') "
              "being deleted. Make sure to first purge those files",
              *ptr_domain_id);
      errmsg= errbuf;
      goto end;
    }
    // compose a sequence of unique pointers to domain object
    for (k= 0; k < domain_unique.elements; k++)
    {
      if (*(rpl_binlog_state::element**) dynamic_array_ptr(&domain_unique, k)
          == elem)
        break; // domain_id's elem is already in
    }
    if (k == domain_unique.elements) // proven not to have duplicates
      insert_dynamic(&domain_unique, (uchar*) &elem);
  }

  // Domain removal from binlog state
  for (k= 0; k < domain_unique.elements; k++)
  {
    rpl_binlog_state::element *elem= *(rpl_binlog_state::element**)
      dynamic_array_ptr(&domain_unique, k);
    my_hash_free(&elem->hash);
    my_hash_delete(&hash, (uchar*) elem);
  }

  DBUG_ASSERT(strlen(errbuf) == 0);

  if (domain_unique.elements == 0)
    errmsg= "";

end:
  mysql_mutex_unlock(&LOCK_binlog_state);
  delete_dynamic(&domain_unique);

  DBUG_RETURN(errmsg);
}

slave_connection_state::slave_connection_state()
{
  my_hash_init(PSI_INSTRUMENT_ME, &hash, &my_charset_bin, 32,
               offsetof(entry, gtid) + offsetof(rpl_gtid, domain_id),
               sizeof(rpl_gtid::domain_id), NULL, my_free, HASH_UNIQUE);
  my_init_dynamic_array(PSI_INSTRUMENT_ME, &gtid_sort_array, sizeof(rpl_gtid), 8, 8, MYF(0));
}



slave_connection_state::~slave_connection_state()
{
  my_hash_free(&hash);
  delete_dynamic(&gtid_sort_array);
}



/*
  Create a hash from the slave GTID state that is sent to master when slave
  connects to start replication.

  The state is sent as <GTID>,<GTID>,...,<GTID>, for example:

     0-2-112,1-4-1022

  The state gives for each domain_id the GTID to start replication from for
  the corresponding replication stream. So domain_id must be unique.

  Returns 0 if ok, non-zero if error due to malformed input.

  Note that the input string is built by the slave server, so it will not be
  malformed unless there is a bug, corruption, or a malicious server. So we
  just need a basic sanity check, not a fancy user-friendly error message.
*/

int
slave_connection_state::load(const char *slave_request, size_t len)
{
  const char *p, *end;
  uchar *rec;
  rpl_gtid *gtid;
  const entry *e;

  reset();
  p= slave_request;
  end= slave_request + len;
  if (p == end)
    return 0;
  for (;;)
  {
    if (!(rec= (uchar *)my_malloc(PSI_INSTRUMENT_ME, sizeof(entry), MYF(MY_WME))))
      return 1;
    gtid= &((entry *)rec)->gtid;
    if (gtid_parser_helper(&p, end, gtid))
    {
      my_free(rec);
      my_error(ER_INCORRECT_GTID_STATE, MYF(0));
      return 1;
    }
    if ((e= (const entry *)
         my_hash_search(&hash, (const uchar *)(&gtid->domain_id),
                        sizeof(gtid->domain_id))))
    {
      my_error(ER_DUPLICATE_GTID_DOMAIN, MYF(0), gtid->domain_id,
               gtid->server_id, (ulonglong)gtid->seq_no, e->gtid.domain_id,
               e->gtid.server_id, (ulonglong)e->gtid.seq_no, gtid->domain_id);
      my_free(rec);
      return 1;
    }
    ((entry *)rec)->flags= 0;
    if (my_hash_insert(&hash, rec))
    {
      my_free(rec);
      my_error(ER_OUT_OF_RESOURCES, MYF(0));
      return 1;
    }
    if (p == end)
      break;                                         /* Finished. */
    if (*p != ',')
    {
      my_error(ER_INCORRECT_GTID_STATE, MYF(0));
      return 1;
    }
    ++p;
  }

  return 0;
}
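

/*
  Illustrative sketch only (hypothetical guard macro): parsing the same
  "<GTID>,<GTID>" format shown in the comment above, then looking one domain
  up in the resulting hash.
*/
#ifdef RPL_GTID_USAGE_SKETCHES
static bool example_load_connection_state()
{
  slave_connection_state st;
  const char state_str[]= "0-2-112,1-4-1022";
  if (st.load(state_str, sizeof(state_str) - 1))
    return true;                    // malformed input or duplicate domain_id
  rpl_gtid *g= st.find(1);          // the start position for domain 1: 1-4-1022
  return g == NULL;
}
#endif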


int
slave_connection_state::load(const rpl_gtid *gtid_list, uint32 count)
{
  uint32 i;

  reset();
  for (i= 0; i < count; ++i)
    if (update(&gtid_list[i]))
      return 1;
  return 0;
}



static int
slave_connection_state_load_cb(rpl_gtid *gtid, void *data)
{
  slave_connection_state *state= (slave_connection_state *)data;
  return state->update(gtid);
}



/*
  Same as rpl_slave_state::tostring(), but populates a slave_connection_state
  instead.
*/
int
slave_connection_state::load(rpl_slave_state *state,
                             rpl_gtid *extra_gtids, uint32 num_extra)
{
  reset();
  return state->iterate(slave_connection_state_load_cb, this,
                        extra_gtids, num_extra, false);
}



slave_connection_state::entry *
slave_connection_state::find_entry(uint32 domain_id)
{
  return (entry *) my_hash_search(&hash, (const uchar *)(&domain_id),
                                  sizeof(domain_id));
}



rpl_gtid *
slave_connection_state::find(uint32 domain_id)
{
  entry *e= find_entry(domain_id);
  if (!e)
    return NULL;
  return &e->gtid;
}



int
slave_connection_state::update(const rpl_gtid *in_gtid)
{
  entry *e;
  uchar *rec= my_hash_search(&hash, (const uchar *)(&in_gtid->domain_id),
                             sizeof(in_gtid->domain_id));
  if (rec)
  {
    e= (entry *)rec;
    e->gtid= *in_gtid;
    return 0;
  }

  if (!(e= (entry *)my_malloc(PSI_INSTRUMENT_ME, sizeof(*e), MYF(MY_WME))))
    return 1;
  e->gtid= *in_gtid;
  e->flags= 0;
  if (my_hash_insert(&hash, (uchar *)e))
  {
    my_free(e);
    return 1;
  }

  return 0;
}



void
slave_connection_state::remove(const rpl_gtid *in_gtid)
{
  uchar *rec= my_hash_search(&hash, (const uchar *)(&in_gtid->domain_id),
                             sizeof(in_gtid->domain_id));
#ifdef DBUG_ASSERT_EXISTS
  bool err;
  rpl_gtid *slave_gtid= &((entry *)rec)->gtid;
  DBUG_ASSERT(rec /* We should never try to remove not present domain_id. */);
  DBUG_ASSERT(slave_gtid->server_id == in_gtid->server_id);
  DBUG_ASSERT(slave_gtid->seq_no == in_gtid->seq_no);
  err=
#endif
    my_hash_delete(&hash, rec);
  DBUG_ASSERT(!err);
}



void
slave_connection_state::remove_if_present(const rpl_gtid *in_gtid)
{
  uchar *rec= my_hash_search(&hash, (const uchar *)(&in_gtid->domain_id),
                             sizeof(in_gtid->domain_id));
  if (rec)
    my_hash_delete(&hash, rec);
}



int
slave_connection_state::to_string(String *out_str)
{
  out_str->length(0);
  return append_to_string(out_str);
}



int
slave_connection_state::append_to_string(String *out_str)
{
  uint32 i;
  bool first;

  first= true;
  for (i= 0; i < hash.records; ++i)
  {
    const entry *e= (const entry *)my_hash_element(&hash, i);
    if (rpl_slave_state_tostring_helper(out_str, &e->gtid, &first))
      return 1;
  }
  return 0;
}



int
slave_connection_state::get_gtid_list(rpl_gtid *gtid_list, uint32 list_size)
{
  uint32 i, pos;

  pos= 0;
  for (i= 0; i < hash.records; ++i)
  {
    entry *e;
    if (pos >= list_size)
      return 1;
    e= (entry *)my_hash_element(&hash, i);
    memcpy(&gtid_list[pos++], &e->gtid, sizeof(e->gtid));
  }

  return 0;
}



/*
  Check if the GTID position has been reached, for mysql_binlog_send().

  The position has not been reached if we have anything in the state, unless
  it has either the START_ON_EMPTY_DOMAIN flag set (which means it does not
  belong to this master at all), or the START_OWN_SLAVE_POS (which means that
  we start on an old position from when the server was a slave with
  --log-slave-updates=0).
*/
bool
slave_connection_state::is_pos_reached()
{
  uint32 i;

  for (i= 0; i < hash.records; ++i)
  {
    entry *e= (entry *)my_hash_element(&hash, i);
    if (!(e->flags & (START_OWN_SLAVE_POS|START_ON_EMPTY_DOMAIN)))
      return false;
  }

  return true;
}


/*
  Execute a MASTER_GTID_WAIT().
  The position to wait for is in gtid_str in string form.
  The timeout in microseconds is in timeout_us; a negative value means no
  timeout.

  Returns:
   1 for error.
   0 for wait completed.
  -1 for wait timed out.
*/
int
gtid_waiting::wait_for_pos(THD *thd, String *gtid_str, longlong timeout_us)
{
  int err;
  rpl_gtid *wait_pos;
  uint32 count, i;
  struct timespec wait_until, *wait_until_ptr;
  ulonglong before;

  /* A wait for the empty position returns immediately. */
  if (gtid_str->length() == 0)
  {
    status_var_increment(thd->status_var.master_gtid_wait_count);
    return 0;
  }

  if (!(wait_pos= gtid_parse_string_to_list(gtid_str->ptr(), gtid_str->length(),
                                            &count)))
  {
    my_error(ER_INCORRECT_GTID_STATE, MYF(0));
    return 1;
  }
  status_var_increment(thd->status_var.master_gtid_wait_count);
  before= microsecond_interval_timer();

  if (timeout_us >= 0)
  {
    set_timespec_nsec(wait_until, (ulonglong)1000*timeout_us);
    wait_until_ptr= &wait_until;
  }
  else
    wait_until_ptr= NULL;
  err= 0;
  for (i= 0; i < count; ++i)
  {
    if ((err= wait_for_gtid(thd, &wait_pos[i], wait_until_ptr)))
      break;
  }
  switch (err)
  {
    case -1:
      status_var_increment(thd->status_var.master_gtid_wait_timeouts);
      /* fall through */
    case 0:
      status_var_add(thd->status_var.master_gtid_wait_time,
                     static_cast<ulong>
                     (microsecond_interval_timer() - before));
  }
  my_free(wait_pos);
  return err;
}
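

/*
  Illustrative sketch only (hypothetical guard macro and caller): how a
  MASTER_GTID_WAIT('<pos>', 1.5) style call maps onto wait_for_pos(). The
  timeout argument is converted to microseconds; passing a negative value
  waits without a timeout.
*/
#ifdef RPL_GTID_USAGE_SKETCHES
static int example_master_gtid_wait(THD *thd, gtid_waiting *waiter,
                                    String *wait_pos)
{
  longlong timeout_us= (longlong)(1.5 * 1000000.0);   // 1.5 seconds
  int res= waiter->wait_for_pos(thd, wait_pos, timeout_us);
  /* res: 0 = position reached, -1 = timed out, 1 = error (already reported) */
  return res;
}
#endif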


void
gtid_waiting::promote_new_waiter(gtid_waiting::hash_element *he)
{
  queue_element *qe;

  mysql_mutex_assert_owner(&LOCK_gtid_waiting);
  if (queue_empty(&he->queue))
    return;
  qe= (queue_element *)queue_top(&he->queue);
  qe->do_small_wait= true;
  mysql_cond_signal(&qe->thd->COND_wakeup_ready);
}

void
gtid_waiting::process_wait_hash(uint64 wakeup_seq_no,
                                gtid_waiting::hash_element *he)
{
  mysql_mutex_assert_owner(&LOCK_gtid_waiting);

  for (;;)
  {
    queue_element *qe;

    if (queue_empty(&he->queue))
      break;
    qe= (queue_element *)queue_top(&he->queue);
    if (qe->wait_seq_no > wakeup_seq_no)
      break;
    DBUG_ASSERT(!qe->done);
    queue_remove_top(&he->queue);
    qe->done= true;
    mysql_cond_signal(&qe->thd->COND_wakeup_ready);
  }
}


/*
  Execute a MASTER_GTID_WAIT() for one specific domain.

  The implementation is optimised primarily for (1) minimal performance impact
  on the slave replication threads, and secondarily for (2) quick performance
  of MASTER_GTID_WAIT() on a single GTID, which can be useful for consistent
  read to clients in an async replication read-scaleout scenario.

  To achieve (1), we have a "small" wait and a "large" wait. The small wait
  contends with the replication threads on the lock on the gtid_slave_pos, so
  only minimal processing is done under that lock, and only a single waiter at
  a time does the small wait.

  If there is already a small waiter, a new thread will either replace the
  small waiter (if it needs to wait for an earlier sequence number), or
  instead do a "large" wait.

  Once awoken on the small wait, the waiting thread releases the lock shared
  with the SQL threads quickly, and then processes all waiters currently doing
  the large wait using a different lock that does not impact replication.

  This way, the SQL threads only need to do a single check + possibly a
  pthread_cond_signal() when updating the gtid_slave_state, and the time that
  non-SQL threads contend for the lock on gtid_slave_state is minimized.

  There is always at least one thread that has the responsibility to ensure
  that there is a small waiter; this thread has queue_element::do_small_wait
  set to true. This thread will do the small wait until it is done, at which
  point it will make sure to pass on the responsibility to another thread.
  Normally only one thread has do_small_wait==true, but it can occasionally
  happen that there is more than one, when threads race one another for the
  lock on the small wait (this results in slightly increased activity on the
  small lock but is otherwise harmless).

  Returns:
     0  Wait completed normally
    -1  Wait completed due to timeout
     1  An error (my_error() will have been called to set the error in the da)
*/
int
gtid_waiting::wait_for_gtid(THD *thd, rpl_gtid *wait_gtid,
                            struct timespec *wait_until)
{
  bool timed_out= false;
#ifdef HAVE_REPLICATION
  queue_element elem;
  uint32 domain_id= wait_gtid->domain_id;
  uint64 seq_no= wait_gtid->seq_no;
  hash_element *he;
  rpl_slave_state::element *slave_state_elem= NULL;
  PSI_stage_info old_stage;
  bool did_enter_cond= false;

  elem.wait_seq_no= seq_no;
  elem.thd= thd;
  elem.done= false;

  mysql_mutex_lock(&LOCK_gtid_waiting);
  if (!(he= get_entry(wait_gtid->domain_id)))
  {
    mysql_mutex_unlock(&LOCK_gtid_waiting);
    return 1;
  }
  /*
    If there is already another waiter with seq_no no larger than our own,
    we are sure that there is already a small waiter that will wake us up
    (or later pass the small wait responsibility to us). So in this case, we
    do not need to touch the small wait lock at all.
  */
  elem.do_small_wait=
    (queue_empty(&he->queue) ||
     ((queue_element *)queue_top(&he->queue))->wait_seq_no > seq_no);

  if (register_in_wait_queue(thd, wait_gtid, he, &elem))
  {
    mysql_mutex_unlock(&LOCK_gtid_waiting);
    return 1;
  }
  /*
    Loop, doing either the small or large wait as appropriate, until either
    the position waited for is reached, or we get a kill or timeout.
  */
  for (;;)
  {
    mysql_mutex_assert_owner(&LOCK_gtid_waiting);

    if (elem.do_small_wait)
    {
      uint64 wakeup_seq_no;
      queue_element *cur_waiter;

      mysql_mutex_lock(&rpl_global_gtid_slave_state->LOCK_slave_state);
      /*
        The elements in the gtid_slave_state_hash are never re-allocated once
        they enter the hash, so we do not need to re-do the lookup after
        releasing and re-acquiring the lock.
      */
      if (!slave_state_elem &&
          !(slave_state_elem= rpl_global_gtid_slave_state->get_element(domain_id)))
      {
        mysql_mutex_unlock(&rpl_global_gtid_slave_state->LOCK_slave_state);
        remove_from_wait_queue(he, &elem);
        promote_new_waiter(he);
        if (did_enter_cond)
          thd->EXIT_COND(&old_stage);
        else
          mysql_mutex_unlock(&LOCK_gtid_waiting);
        my_error(ER_OUT_OF_RESOURCES, MYF(0));
        return 1;
      }

      if ((wakeup_seq_no= slave_state_elem->highest_seq_no) >= seq_no)
      {
        /*
          We do not have to wait. (We will be removed from the wait queue when
          we call process_wait_hash() below.)
        */
        mysql_mutex_unlock(&rpl_global_gtid_slave_state->LOCK_slave_state);
      }
      else if ((cur_waiter= slave_state_elem->gtid_waiter) &&
               slave_state_elem->min_wait_seq_no <= seq_no)
      {
        /*
          There is already a suitable small waiter, go do the large wait.
          (Normally we would not have needed to check the small wait in this
          case, but it can happen if we race with another thread for the small
          lock).
        */
        elem.do_small_wait= false;
        mysql_mutex_unlock(&rpl_global_gtid_slave_state->LOCK_slave_state);
      }
      else
      {
        /*
          We have to do the small wait ourselves (stealing it from any thread
          that might already be waiting for a later seq_no).
        */
        slave_state_elem->gtid_waiter= &elem;
        slave_state_elem->min_wait_seq_no= seq_no;
        if (cur_waiter)
        {
          /* We stole the wait, so wake up the old waiting thread. */
          mysql_cond_signal(&slave_state_elem->COND_wait_gtid);
        }

        /* Release the large lock, and do the small wait. */
        if (did_enter_cond)
        {
          thd->EXIT_COND(&old_stage);
          did_enter_cond= false;
        }
        else
          mysql_mutex_unlock(&LOCK_gtid_waiting);
        thd->ENTER_COND(&slave_state_elem->COND_wait_gtid,
                        &rpl_global_gtid_slave_state->LOCK_slave_state,
                        &stage_master_gtid_wait_primary, &old_stage);
        do
        {
          if (unlikely(thd->check_killed(1)))
            break;
          else if (wait_until)
          {
            int err=
              mysql_cond_timedwait(&slave_state_elem->COND_wait_gtid,
                                   &rpl_global_gtid_slave_state->LOCK_slave_state,
                                   wait_until);
            if (err == ETIMEDOUT || err == ETIME)
            {
              timed_out= true;
              break;
            }
          }
          else
            mysql_cond_wait(&slave_state_elem->COND_wait_gtid,
                            &rpl_global_gtid_slave_state->LOCK_slave_state);
        } while (slave_state_elem->gtid_waiter == &elem);
        wakeup_seq_no= slave_state_elem->highest_seq_no;
        /*
          If we aborted due to timeout or kill, remove us as waiter.

          If we were replaced by another waiter with a smaller seq_no, then we
          no longer have responsibility for the small wait.
        */
        if ((cur_waiter= slave_state_elem->gtid_waiter))
        {
          if (cur_waiter == &elem)
            slave_state_elem->gtid_waiter= NULL;
          else if (slave_state_elem->min_wait_seq_no <= seq_no)
            elem.do_small_wait= false;
        }
        thd->EXIT_COND(&old_stage);

        mysql_mutex_lock(&LOCK_gtid_waiting);
      }

      /*
        Note that hash_entry pointers do not change once allocated, so we do
        not need to lookup `he' again after re-acquiring LOCK_gtid_waiting.
      */
      process_wait_hash(wakeup_seq_no, he);
    }
    else
    {
      /* Do the large wait. */
      if (!did_enter_cond)
      {
        thd->ENTER_COND(&thd->COND_wakeup_ready, &LOCK_gtid_waiting,
                        &stage_master_gtid_wait, &old_stage);
        did_enter_cond= true;
      }
      while (!elem.done && likely(!thd->check_killed(1)))
      {
        thd_wait_begin(thd, THD_WAIT_BINLOG);
        if (wait_until)
        {
          int err= mysql_cond_timedwait(&thd->COND_wakeup_ready,
                                        &LOCK_gtid_waiting, wait_until);
          if (err == ETIMEDOUT || err == ETIME)
            timed_out= true;
        }
        else
          mysql_cond_wait(&thd->COND_wakeup_ready, &LOCK_gtid_waiting);
        thd_wait_end(thd);
        if (elem.do_small_wait || timed_out)
          break;
      }
    }

    if ((thd->killed || timed_out) && !elem.done)
    {
      /* Aborted, so remove ourselves from the hash. */
      remove_from_wait_queue(he, &elem);
      elem.done= true;
    }
    if (elem.done)
    {
      /*
        If our wait is done, but we have (or were passed) responsibility for
        the small wait, then we need to pass on that task to someone else.
      */
      if (elem.do_small_wait)
        promote_new_waiter(he);
      break;
    }
  }

  if (did_enter_cond)
    thd->EXIT_COND(&old_stage);
  else
    mysql_mutex_unlock(&LOCK_gtid_waiting);
  if (thd->killed)
    thd->send_kill_message();
#endif  /* HAVE_REPLICATION */
  return timed_out ? -1 : 0;
}


static void
free_hash_element(void *p)
{
  gtid_waiting::hash_element *e= (gtid_waiting::hash_element *)p;
  delete_queue(&e->queue);
  my_free(e);
}



void
gtid_waiting::init()
{
  my_hash_init(PSI_INSTRUMENT_ME, &hash, &my_charset_bin, 32,
               offsetof(hash_element, domain_id),
               sizeof(hash_element::domain_id), NULL,
               free_hash_element, HASH_UNIQUE);
  mysql_mutex_init(key_LOCK_gtid_waiting, &LOCK_gtid_waiting, 0);
}



void
gtid_waiting::destroy()
{
  mysql_mutex_destroy(&LOCK_gtid_waiting);
  my_hash_free(&hash);
}



static int
cmp_queue_elem(void *, const void *a, const void *b)
{
  auto seq_no_a= *(static_cast<const uint64 *>(a));
  auto seq_no_b= *(static_cast<const uint64 *>(b));
  if (seq_no_a < seq_no_b)
    return -1;
  else if (seq_no_a == seq_no_b)
    return 0;
  else
    return 1;
}



gtid_waiting::hash_element *
gtid_waiting::get_entry(uint32 domain_id)
{
  hash_element *e;

  if ((e= (hash_element *)my_hash_search(&hash, (const uchar *)&domain_id,
                                         sizeof(domain_id))))
    return e;

  if (!(e= (hash_element *)my_malloc(PSI_INSTRUMENT_ME, sizeof(*e), MYF(MY_WME))))
    return NULL;

  if (init_queue(&e->queue, 8, offsetof(queue_element, wait_seq_no), 0,
                 cmp_queue_elem, NULL, 1+offsetof(queue_element, queue_idx), 1))
  {
    my_error(ER_OUT_OF_RESOURCES, MYF(0));
    my_free(e);
    return NULL;
  }
  e->domain_id= domain_id;
  if (my_hash_insert(&hash, (uchar *)e))
  {
    my_error(ER_OUT_OF_RESOURCES, MYF(0));
    delete_queue(&e->queue);
    my_free(e);
    return NULL;
  }
  return e;
}



int
gtid_waiting::register_in_wait_queue(THD *thd, rpl_gtid *wait_gtid,
                                     gtid_waiting::hash_element *he,
                                     gtid_waiting::queue_element *elem)
{
  mysql_mutex_assert_owner(&LOCK_gtid_waiting);

  if (queue_insert_safe(&he->queue, (uchar *)elem))
  {
    my_error(ER_OUT_OF_RESOURCES, MYF(0));
    return 1;
  }

  return 0;
}



void
gtid_waiting::remove_from_wait_queue(gtid_waiting::hash_element *he,
                                     gtid_waiting::queue_element *elem)
{
  mysql_mutex_assert_owner(&LOCK_gtid_waiting);

  queue_remove(&he->queue, elem->queue_idx);
}

#endif
 | |
| 
 | |
| void free_domain_lookup_element(void *p)
 | |
| {
 | |
|   struct Binlog_gtid_state_validator::audit_elem *audit_elem=
 | |
|       (struct Binlog_gtid_state_validator::audit_elem *) p;
 | |
|   delete_dynamic(&audit_elem->late_gtids_previous);
 | |
|   delete_dynamic(&audit_elem->late_gtids_real);
 | |
|   my_free(audit_elem);
 | |
| }
 | |
| 
 | |
| Binlog_gtid_state_validator::Binlog_gtid_state_validator()
 | |
| {
 | |
|   my_hash_init(PSI_INSTRUMENT_ME, &m_audit_elem_domain_lookup, &my_charset_bin, 32,
 | |
|                offsetof(struct audit_elem, domain_id), sizeof(uint32),
 | |
|                NULL, free_domain_lookup_element, HASH_UNIQUE);
 | |
| }
 | |
| 
 | |
| Binlog_gtid_state_validator::~Binlog_gtid_state_validator()
 | |
| {
 | |
|   my_hash_free(&m_audit_elem_domain_lookup);
 | |
| }
 | |
| 
 | |
| void Binlog_gtid_state_validator::initialize_start_gtids(rpl_gtid *start_gtids,
 | |
|                                                          size_t n_gtids)
 | |
| {
 | |
|   size_t i;
 | |
|   for(i= 0; i < n_gtids; i++)
 | |
|   {
 | |
|     rpl_gtid *domain_state_gtid= &start_gtids[i];
 | |
| 
 | |
|     /*
 | |
|       If we are initializing from a GLLE, we can have repeat domain ids from
 | |
|       differing servers, so we want to ensure our start gtid matches the last
 | |
|       known position
 | |
|     */
 | |
|     struct audit_elem *audit_elem= (struct audit_elem *) my_hash_search(
 | |
|         &m_audit_elem_domain_lookup,
 | |
|         (const uchar *) &(domain_state_gtid->domain_id), 0);
 | |
|     if (audit_elem)
 | |
|     {
 | |
|       /*
 | |
|         We have this domain already specified, so try to overwrite with the
 | |
|         more recent GTID
 | |
|       */
 | |
|       if (domain_state_gtid->seq_no > audit_elem->start_gtid.seq_no)
 | |
|         audit_elem->start_gtid = *domain_state_gtid;
 | |
|       continue;
 | |
|     }
 | |
| 
 | |
|     /* Initialize a new domain */
 | |
|     audit_elem= (struct audit_elem *) my_malloc(
 | |
|         PSI_NOT_INSTRUMENTED, sizeof(struct audit_elem), MYF(MY_WME));
 | |
|     if (!audit_elem)
 | |
|     {
 | |
|       my_error(ER_OUT_OF_RESOURCES, MYF(0));
 | |
|       return;
 | |
|     }
 | |
| 
 | |
|     audit_elem->domain_id= start_gtids[i].domain_id;
 | |
|     audit_elem->start_gtid= start_gtids[i];
 | |
|     audit_elem->last_gtid= {audit_elem->domain_id, 0, 0};
 | |
| 
 | |
|     my_init_dynamic_array(PSI_INSTRUMENT_ME, &audit_elem->late_gtids_real,
 | |
|                           sizeof(rpl_gtid), 8, 8, MYF(0));
 | |
|     my_init_dynamic_array(PSI_INSTRUMENT_ME, &audit_elem->late_gtids_previous,
 | |
|                           sizeof(rpl_gtid), 8, 8, MYF(0));
 | |
| 
 | |
|     if (my_hash_insert(&m_audit_elem_domain_lookup, (uchar *) audit_elem))
 | |
|     {
 | |
|       my_free(audit_elem);
 | |
|       my_error(ER_OUT_OF_RESOURCES, MYF(0));
 | |
|       return;
 | |
|     }
 | |
|   }
 | |
| }
 | |
| 
 | |
| my_bool Binlog_gtid_state_validator::initialize_gtid_state(FILE *out,
 | |
|                                                            rpl_gtid *gtids,
 | |
|                                                            size_t n_gtids)
 | |
| {
 | |
|   size_t i;
 | |
|   my_bool err= FALSE;
 | |
| 
 | |
|   /*
 | |
|     We weren't initialized with starting positions explicitly, so assume the
 | |
|     starting positions of the current gtid state
 | |
|   */
 | |
|   if (!m_audit_elem_domain_lookup.records)
 | |
|     initialize_start_gtids(gtids, n_gtids);
 | |
| 
 | |
|   for(i= 0; i < n_gtids; i++)
 | |
|   {
 | |
|     rpl_gtid *domain_state_gtid= >ids[i];
 | |
| 
 | |
|     struct audit_elem *audit_elem= (struct audit_elem *) my_hash_search(
 | |
|         &m_audit_elem_domain_lookup,
 | |
|         (const uchar *) &(domain_state_gtid->domain_id), 0);
 | |
| 
 | |
|     if (!audit_elem)
 | |
|     {
 | |
|       Binlog_gtid_state_validator::error(
 | |
|           out,
 | |
|           "Starting GTID position list does not specify an initial value "
 | |
|           "for domain %u, whose events may be present in the requested binlog "
 | |
|           "file(s). The last known position for this domain was %u-%u-%llu.",
 | |
|           domain_state_gtid->domain_id, PARAM_GTID((*domain_state_gtid)));
 | |
|       err= TRUE;
 | |
|       continue;
 | |
|     }
 | |
| 
 | |
|     if (audit_elem->start_gtid.seq_no < domain_state_gtid->seq_no)
 | |
|     {
 | |
|       Binlog_gtid_state_validator::error(
 | |
|           out,
 | |
|           "Binary logs are missing data for domain %u. Expected data to "
 | |
|           "start from state %u-%u-%llu; however, the initial GTID state of "
 | |
|           "the logs was %u-%u-%llu.",
 | |
|           domain_state_gtid->domain_id, PARAM_GTID(audit_elem->start_gtid),
 | |
|           PARAM_GTID((*domain_state_gtid)));
 | |
|       err= TRUE;
 | |
|       continue;
 | |
|     }
 | |
| 
 | |
|     if (domain_state_gtid->seq_no > audit_elem->last_gtid.seq_no)
 | |
|       audit_elem->last_gtid= *domain_state_gtid;
 | |
|   }
 | |
|   return err;
 | |
| }
 | |
| 
 | |
| my_bool Binlog_gtid_state_validator::verify_stop_state(FILE *out,
 | |
|                                                        rpl_gtid *stop_gtids,
 | |
|                                                        size_t n_stop_gtids)
 | |
| {
 | |
|   size_t i;
 | |
|   for(i= 0; i < n_stop_gtids; i++)
 | |
|   {
 | |
|     rpl_gtid *stop_gtid= &stop_gtids[i];
 | |
| 
 | |
|     struct audit_elem *audit_elem= (struct audit_elem *) my_hash_search(
 | |
|         &m_audit_elem_domain_lookup,
 | |
|         (const uchar *) &(stop_gtid->domain_id), 0);
 | |
| 
 | |
|     /*
 | |
|       It is okay if the stop gtid doesn't exist in the current state,
 | |
|       because it will be treated as a new domain
 | |
|     */
 | |
|     if (audit_elem && stop_gtid->seq_no <= audit_elem->start_gtid.seq_no)
 | |
|     {
 | |
|       Binlog_gtid_state_validator::error(
 | |
|           out,
 | |
|           "--stop-position GTID %u-%u-%llu does not exist in the "
 | |
|           "specified binlog files. The current GTID state of domain %u in the "
 | |
|           "specified binary logs is %u-%u-%llu",
 | |
|           PARAM_GTID((*stop_gtid)), stop_gtid->domain_id,
 | |
|           PARAM_GTID(audit_elem->start_gtid));
 | |
|       return TRUE;
 | |
|     }
 | |
|   }
 | |
| 
 | |
|   /* No issues with any GTIDs */
 | |
|   return FALSE;
 | |
| }
 | |
| 
 | |
| my_bool
 | |
| Binlog_gtid_state_validator::verify_gtid_state(FILE *out,
 | |
|                                                rpl_gtid *domain_state_gtid)
 | |
| {
 | |
|   struct audit_elem *audit_elem= (struct audit_elem *) my_hash_search(
 | |
|       &m_audit_elem_domain_lookup,
 | |
|       (const uchar *) &(domain_state_gtid->domain_id), 0);
 | |
| 
 | |
|   if (!audit_elem)
 | |
|   {
 | |
|     Binlog_gtid_state_validator::warn(
 | |
|         out,
 | |
|         "Binary logs are missing data for domain %u. The current binary log "
 | |
|         "specified its "
 | |
|         "current state for this domain as %u-%u-%llu, but neither the "
 | |
|         "starting GTID position list nor any processed events have "
 | |
|         "mentioned "
 | |
|         "this domain.",
 | |
|         domain_state_gtid->domain_id, PARAM_GTID((*domain_state_gtid)));
 | |
|     return TRUE;
 | |
|   }
 | |
| 
 | |
|   if (audit_elem->last_gtid.seq_no < domain_state_gtid->seq_no)
 | |
|   {
 | |
|     Binlog_gtid_state_validator::warn(
 | |
|         out,
 | |
|         "Binary logs are missing data for domain %u. The current binary log "
 | |
|         "state is %u-%u-%llu, but the last seen event was %u-%u-%llu.",
 | |
|         domain_state_gtid->domain_id, PARAM_GTID((*domain_state_gtid)),
 | |
|         PARAM_GTID(audit_elem->last_gtid));
 | |
|     return TRUE;
 | |
|   }
 | |
| 
 | |
|   return FALSE;
 | |
| }
 | |
| 
 | |
| my_bool Binlog_gtid_state_validator::record(rpl_gtid *gtid)
 | |
| {
 | |
|   struct audit_elem *audit_elem= (struct audit_elem *) my_hash_search(
 | |
|       &m_audit_elem_domain_lookup, (const uchar *) &(gtid->domain_id), 0);
 | |
| 
 | |
|   if (!audit_elem)
 | |
|   {
 | |
|     /*
 | |
|       We haven't seen any GTIDs in this domain yet. Perform initial setup for
 | |
|       this domain so we can monitor its events.
 | |
|     */
 | |
|     audit_elem= (struct audit_elem *) my_malloc(
 | |
|         PSI_NOT_INSTRUMENTED, sizeof(struct audit_elem), MYF(MY_WME));
 | |
|     if (!audit_elem)
 | |
|     {
 | |
|       my_error(ER_OUT_OF_RESOURCES, MYF(0));
 | |
|       return TRUE;
 | |
|     }
 | |
| 
 | |
|     audit_elem->domain_id= gtid->domain_id;
 | |
|     audit_elem->last_gtid= *gtid;
 | |
|     audit_elem->start_gtid= {gtid->domain_id, 0, 0};
 | |
| 
 | |
|     my_init_dynamic_array(PSI_INSTRUMENT_ME, &audit_elem->late_gtids_real,
 | |
|                           sizeof(rpl_gtid), 8, 8, MYF(0));
 | |
|     my_init_dynamic_array(PSI_INSTRUMENT_ME, &audit_elem->late_gtids_previous,
 | |
|                           sizeof(rpl_gtid), 8, 8, MYF(0));
 | |
| 
 | |
|     if (my_hash_insert(&m_audit_elem_domain_lookup, (uchar *) audit_elem))
 | |
|     {
 | |
|       my_free(audit_elem);
 | |
|       my_error(ER_OUT_OF_RESOURCES, MYF(0));
 | |
|       return TRUE;
 | |
|     }
 | |
|   }
 | |
|   else
 | |
|   {
 | |
|     /* Out of order check */
 | |
|     if (gtid->seq_no <= audit_elem->last_gtid.seq_no &&
 | |
|         gtid->seq_no >= audit_elem->start_gtid.seq_no)
 | |
|     {
 | |
|       /* GTID is out of order */
 | |
|       insert_dynamic(&audit_elem->late_gtids_real, (const void *) gtid);
 | |
|       insert_dynamic(&audit_elem->late_gtids_previous,
 | |
|                      (const void *) &(audit_elem->last_gtid));
 | |
| 
 | |
|       return TRUE;
 | |
|     }
 | |
|     else
 | |
|     {
 | |
|       /* GTID is valid */
 | |
|       audit_elem->last_gtid= *gtid;
 | |
|     }
 | |
|   }
 | |
| 
 | |
|   return FALSE;
 | |
| }
 | |
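| 
 | |
| /*
 | |
|   Illustrative sketch of the out-of-order detection above; a hypothetical
 | |
|   example, not compiled into the server (the guard macro and function name
 | |
|   are invented). record() returns TRUE when a GTID's seq_no does not advance
 | |
|   past the last seq_no seen for its domain (while still at or after the
 | |
|   domain's start), and saves the offending pair for later reporting.
 | |
| */
 | |
| #ifdef GTID_VALIDATOR_EXAMPLES
 | |
| static void example_record_out_of_order()
 | |
| {
 | |
|   Binlog_gtid_state_validator validator;
 | |
|   rpl_gtid g1= {0, 1, 100}, g2= {0, 1, 101}, late= {0, 1, 100};
 | |
| 
 | |
|   validator.record(&g1);   /* FALSE: first GTID seen in domain 0 */
 | |
|   validator.record(&g2);   /* FALSE: 101 > 100, strictly increasing */
 | |
|   validator.record(&late); /* TRUE: 100 <= 101, flagged as out of order */
 | |
| }
 | |
| #endif
 | |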
| 
 | |
| /*
 | |
|   Data structure used to help pass data into report_audit_findings because
 | |
|   my_hash_iterate only passes one parameter
 | |
| */
 | |
| struct gtid_report_ctx
 | |
| {
 | |
|   FILE *out_file;
 | |
|   my_bool is_strict_mode;
 | |
|   my_bool contains_err;
 | |
| };
 | |
| 
 | |
| static my_bool report_audit_findings(void *entry, void *report_ctx_arg)
 | |
| {
 | |
|   struct Binlog_gtid_state_validator::audit_elem *audit_el=
 | |
|       (struct Binlog_gtid_state_validator::audit_elem *) entry;
 | |
| 
 | |
|   struct gtid_report_ctx *report_ctx=
 | |
|       (struct gtid_report_ctx *) report_ctx_arg;
 | |
|   FILE *out= report_ctx->out_file;
 | |
|   my_bool is_strict_mode= report_ctx->is_strict_mode;
 | |
|   size_t i;
 | |
|   void (*report_f)(FILE*, const char*, ...);
 | |
| 
 | |
|   if (is_strict_mode)
 | |
|     report_f= Binlog_gtid_state_validator::error;
 | |
|   else
 | |
|     report_f= Binlog_gtid_state_validator::warn;
 | |
| 
 | |
|   if (audit_el)
 | |
|   {
 | |
|     if (audit_el->last_gtid.seq_no < audit_el->start_gtid.seq_no)
 | |
|     {
 | |
|       report_f(out,
 | |
|              "Binary logs never reached expected GTID state of %u-%u-%llu",
 | |
|              PARAM_GTID(audit_el->start_gtid));
 | |
|       report_ctx->contains_err= TRUE;
 | |
|     }
 | |
| 
 | |
|     /* Report any out of order GTIDs */
 | |
|     for(i= 0; i < audit_el->late_gtids_real.elements; i++)
 | |
|     {
 | |
|       rpl_gtid *real_gtid=
 | |
|           (rpl_gtid *) dynamic_array_ptr(&(audit_el->late_gtids_real), i);
 | |
|       rpl_gtid *last_gtid= (rpl_gtid *) dynamic_array_ptr(
 | |
|           &(audit_el->late_gtids_previous), i);
 | |
|       DBUG_ASSERT(real_gtid && last_gtid);
 | |
| 
 | |
|       report_f(out,
 | |
|                "Found out of order GTID. Got %u-%u-%llu after %u-%u-%llu",
 | |
|                PARAM_GTID((*real_gtid)), PARAM_GTID((*last_gtid)));
 | |
|       report_ctx->contains_err= TRUE;
 | |
|     }
 | |
|   }
 | |
| 
 | |
|   return FALSE;
 | |
| }
 | |
| 
 | |
| my_bool Binlog_gtid_state_validator::report(FILE *out, my_bool is_strict_mode)
 | |
| {
 | |
|   struct gtid_report_ctx report_ctx;
 | |
|   report_ctx.out_file= out;
 | |
|   report_ctx.is_strict_mode= is_strict_mode;
 | |
|   report_ctx.contains_err= FALSE;
 | |
|   my_hash_iterate(&m_audit_elem_domain_lookup, report_audit_findings, &report_ctx);
 | |
|   fflush(out);
 | |
|   return is_strict_mode ? report_ctx.contains_err : FALSE;
 | |
| }
 | |
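| 
 | |
| /*
 | |
|   Illustrative sketch of a full audit pass; a hypothetical example, not
 | |
|   compiled into the server (the guard macro and function name are invented).
 | |
|   Findings are printed as errors in strict mode and as warnings otherwise,
 | |
|   and only strict mode propagates them through the return value.
 | |
| */
 | |
| #ifdef GTID_VALIDATOR_EXAMPLES
 | |
| static my_bool example_audit_and_report(FILE *out)
 | |
| {
 | |
|   Binlog_gtid_state_validator validator;
 | |
|   rpl_gtid seen= {0, 1, 7};
 | |
|   validator.record(&seen);
 | |
| 
 | |
|   /* TRUE iff a problem was found, and only when is_strict_mode is TRUE;
 | |
|      in non-strict mode findings are demoted to warnings and the return
 | |
|      value is always FALSE */
 | |
|   return validator.report(out, TRUE /* is_strict_mode */);
 | |
| }
 | |
| #endif
 | |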
| 
 | |
| Window_gtid_event_filter::Window_gtid_event_filter()
 | |
|     : m_has_start(FALSE), m_has_stop(FALSE), m_is_active(FALSE),
 | |
|       m_has_passed(FALSE)
 | |
| {
 | |
|   // m_start and m_stop do not need initial values if unused
 | |
| }
 | |
| 
 | |
| int Window_gtid_event_filter::set_start_gtid(rpl_gtid *start)
 | |
| {
 | |
|   if (m_has_start)
 | |
|   {
 | |
|     sql_print_error(
 | |
|         "Start position cannot have repeated domain "
 | |
|         "ids (found %u-%u-%llu when %u-%u-%llu was previously specified)",
 | |
|         PARAM_GTID((*start)), PARAM_GTID(m_start));
 | |
|     return 1;
 | |
|   }
 | |
| 
 | |
|   m_has_start= TRUE;
 | |
|   m_start= *start;
 | |
|   return 0;
 | |
| }
 | |
| 
 | |
| int Window_gtid_event_filter::set_stop_gtid(rpl_gtid *stop)
 | |
| {
 | |
|   if (m_has_stop)
 | |
|   {
 | |
|     sql_print_error(
 | |
|         "Stop position cannot have repeated domain "
 | |
|         "ids (found %u-%u-%llu when %u-%u-%llu was previously specified)",
 | |
|         PARAM_GTID((*stop)), PARAM_GTID(m_stop));
 | |
|     return 1;
 | |
|   }
 | |
| 
 | |
|   m_has_stop= TRUE;
 | |
|   m_stop= *stop;
 | |
|   return 0;
 | |
| }
 | |
| 
 | |
| my_bool Window_gtid_event_filter::is_range_invalid()
 | |
| {
 | |
|   if (m_has_start && m_has_stop && m_start.seq_no > m_stop.seq_no)
 | |
|   {
 | |
|     sql_print_error(
 | |
|         "Queried GTID range is invalid in strict mode. Stop position "
 | |
|         "%u-%u-%llu is not greater than or equal to start %u-%u-%llu.",
 | |
|         PARAM_GTID(m_stop), PARAM_GTID(m_start));
 | |
|     return TRUE;
 | |
|   }
 | |
|   return FALSE;
 | |
| }
 | |
| 
 | |
| static inline my_bool is_gtid_at_or_after(rpl_gtid *boundary,
 | |
|                                           rpl_gtid *test_gtid)
 | |
| {
 | |
|   return test_gtid->domain_id == boundary->domain_id &&
 | |
|          test_gtid->seq_no >= boundary->seq_no;
 | |
| }
 | |
| 
 | |
| static inline my_bool is_gtid_at_or_before(rpl_gtid *boundary,
 | |
|                                            rpl_gtid *test_gtid)
 | |
| {
 | |
|   return test_gtid->domain_id == boundary->domain_id &&
 | |
|          test_gtid->seq_no <= boundary->seq_no;
 | |
| }
 | |
| 
 | |
| my_bool Window_gtid_event_filter::exclude(rpl_gtid *gtid)
 | |
| {
 | |
|   /* Assume result should be excluded to start */
 | |
|   my_bool should_exclude= TRUE;
 | |
| 
 | |
|   DBUG_ASSERT((m_has_start && gtid->domain_id == m_start.domain_id) ||
 | |
|               (m_has_stop && gtid->domain_id == m_stop.domain_id));
 | |
| 
 | |
|   if (!m_is_active && !m_has_passed)
 | |
|   {
 | |
|     /*
 | |
|       This filter has not yet been activated. Check if the gtid is within the
 | |
|       bounds of this window.
 | |
|     */
 | |
| 
 | |
|     if (!m_has_start && is_gtid_at_or_before(&m_stop, gtid))
 | |
|     {
 | |
|       /*
 | |
|         Start GTID was not provided, so we want to include everything from here
 | |
|         up to m_stop
 | |
|       */
 | |
|       m_is_active= TRUE;
 | |
|       should_exclude= FALSE;
 | |
|     }
 | |
|     else if ((m_has_start && is_gtid_at_or_after(&m_start, gtid)) &&
 | |
|              (!m_has_stop || is_gtid_at_or_before(&m_stop, gtid)))
 | |
|     {
 | |
|       m_is_active= TRUE;
 | |
| 
 | |
|       DBUG_PRINT("gtid-event-filter",
 | |
|                  ("Window: Begin (%d-%d-%llu, %d-%d-%llu]",
 | |
|                   PARAM_GTID(m_start), PARAM_GTID(m_stop)));
 | |
| 
 | |
|       /*
 | |
|         As the start of the range is exclusive, if this gtid is the start of
 | |
|         the range, exclude it
 | |
|       */
 | |
|       if (gtid->seq_no == m_start.seq_no)
 | |
|         should_exclude= TRUE;
 | |
|       else
 | |
|         should_exclude= FALSE;
 | |
| 
 | |
|       if (m_has_stop && gtid->seq_no == m_stop.seq_no)
 | |
|       {
 | |
|         m_has_passed= TRUE;
 | |
|         DBUG_PRINT("gtid-event-filter",
 | |
|                    ("Window: End (%d-%d-%llu, %d-%d-%llu]",
 | |
|                     PARAM_GTID(m_start), PARAM_GTID(m_stop)));
 | |
|       }
 | |
|     }
 | |
|   } /* if (!m_is_active && !m_has_passed) */
 | |
|   else if (m_is_active && !m_has_passed)
 | |
|   {
 | |
|     /*
 | |
|       This window is currently active, so we want the event group to be
 | |
|       included in the results. Additionally, check if we are at the end of
 | |
|       the window. If no end of the window is provided, the window continues
 | |
|       indefinitely
 | |
|     */
 | |
|     should_exclude= FALSE;
 | |
| 
 | |
|     if (m_has_stop && is_gtid_at_or_after(&m_stop, gtid))
 | |
|     {
 | |
|       DBUG_PRINT("gtid-event-filter",
 | |
|                  ("Window: End (%d-%d-%llu, %d-%d-%llu]",
 | |
|                   PARAM_GTID(m_start), PARAM_GTID(m_stop)));
 | |
|       m_is_active= FALSE;
 | |
|       m_has_passed= TRUE;
 | |
| 
 | |
|       if (!is_gtid_at_or_before(&m_stop, gtid))
 | |
|       {
 | |
|         /*
 | |
|           The GTID is after the finite stop of the window; don't let it pass
 | |
|           through
 | |
|         */
 | |
|         should_exclude= TRUE;
 | |
|       }
 | |
|     }
 | |
|   }
 | |
| 
 | |
|   return should_exclude;
 | |
| }
 | |
| 
 | |
| my_bool Window_gtid_event_filter::has_finished()
 | |
| {
 | |
|   return m_has_stop ? m_has_passed : FALSE;
 | |
| }
 | |
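| 
 | |
| /*
 | |
|   Illustrative sketch of the window semantics above; a hypothetical example,
 | |
|   not compiled into the server (the guard macro and function name are
 | |
|   invented). The window is exclusive of its start and inclusive of its stop,
 | |
|   i.e. (start, stop], and the filter is finished once the stop GTID has
 | |
|   been seen.
 | |
| */
 | |
| #ifdef GTID_FILTER_EXAMPLES
 | |
| static void example_window_filter()
 | |
| {
 | |
|   Window_gtid_event_filter wf;
 | |
|   rpl_gtid start= {0, 1, 10}, stop= {0, 1, 20};
 | |
|   wf.set_start_gtid(&start);
 | |
|   wf.set_stop_gtid(&stop);
 | |
| 
 | |
|   rpl_gtid at_start= {0, 1, 10}, inside= {0, 1, 15}, at_stop= {0, 1, 20};
 | |
|   wf.exclude(&at_start); /* TRUE: the start itself is excluded */
 | |
|   wf.exclude(&inside);   /* FALSE: within (start, stop] */
 | |
|   wf.exclude(&at_stop);  /* FALSE: the stop is inclusive */
 | |
|   wf.has_finished();     /* TRUE: the stop GTID has been passed */
 | |
| }
 | |
| #endif
 | |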
| 
 | |
| void free_u32_gtid_filter_element(void *p)
 | |
| {
 | |
|   gtid_filter_element<uint32> *gfe= (gtid_filter_element<uint32> *) p;
 | |
|   if (gfe->filter)
 | |
|     delete gfe->filter;
 | |
|   my_free(gfe);
 | |
| }
 | |
| 
 | |
| template <typename T>
 | |
| Id_delegating_gtid_event_filter<T>::Id_delegating_gtid_event_filter()
 | |
|     : m_num_stateful_filters(0), m_num_completed_filters(0),
 | |
|       m_id_restriction_mode(id_restriction_mode::MODE_NOT_SET)
 | |
| {
 | |
|   void (*free_func)(void *)= NULL;
 | |
|   if (std::is_same<T,uint32>::value)
 | |
|     free_func= free_u32_gtid_filter_element;
 | |
|   else
 | |
|     DBUG_ASSERT(0);
 | |
| 
 | |
|   my_hash_init(PSI_INSTRUMENT_ME, &m_filters_by_id_hash, &my_charset_bin, 32,
 | |
|                offsetof(gtid_filter_element<T>, identifier),
 | |
|                sizeof(T), NULL, free_func,
 | |
|                HASH_UNIQUE);
 | |
| 
 | |
|   m_default_filter= new Accept_all_gtid_filter();
 | |
| }
 | |
| 
 | |
| template <typename T>
 | |
| Id_delegating_gtid_event_filter<T>::~Id_delegating_gtid_event_filter()
 | |
| {
 | |
|   my_hash_free(&m_filters_by_id_hash);
 | |
|   delete m_default_filter;
 | |
| }
 | |
| 
 | |
| template <typename T>
 | |
| void Id_delegating_gtid_event_filter<T>::set_default_filter(
 | |
|     Gtid_event_filter *filter)
 | |
| {
 | |
|   if (m_default_filter)
 | |
|     delete m_default_filter;
 | |
| 
 | |
|   m_default_filter= filter;
 | |
| }
 | |
| 
 | |
| template <typename T>
 | |
| gtid_filter_element<T> *
 | |
| Id_delegating_gtid_event_filter<T>::find_or_create_filter_element_for_id(
 | |
|     T filter_id)
 | |
| {
 | |
|   gtid_filter_element<T> *fe=
 | |
|       (gtid_filter_element<T> *) my_hash_search(
 | |
|           &m_filters_by_id_hash, (const uchar *) &filter_id, 0);
 | |
| 
 | |
|   if (!fe)
 | |
|   {
 | |
|     gtid_filter_element<T> *new_fe= (gtid_filter_element<T> *) my_malloc(
 | |
|         PSI_NOT_INSTRUMENTED, sizeof(gtid_filter_element<T>), MYF(MY_WME));
 | |
|     if (!new_fe)
 | |
|     {
 | |
|       my_error(ER_OUT_OF_RESOURCES, MYF(0));
 | |
|       return NULL;
 | |
|     }
 | |
|     new_fe->filter= NULL;
 | |
|     new_fe->identifier= filter_id;
 | |
|     if (my_hash_insert(&m_filters_by_id_hash, (uchar*) new_fe))
 | |
|     {
 | |
|       my_free(new_fe);
 | |
|       my_error(ER_OUT_OF_RESOURCES, MYF(0));
 | |
|       return NULL;
 | |
|     }
 | |
|     fe= new_fe;
 | |
|   }
 | |
| 
 | |
|   return fe;
 | |
| }
 | |
| 
 | |
| template <typename T>
 | |
| my_bool Id_delegating_gtid_event_filter<T>::has_finished()
 | |
| {
 | |
|   /*
 | |
|     If all user-defined filters have finished, this delegating filter is
 | |
|     effectively finished
 | |
|   */
 | |
|   return m_num_stateful_filters &&
 | |
|          m_num_completed_filters == m_num_stateful_filters;
 | |
| }
 | |
| 
 | |
| template <typename T>
 | |
| my_bool Id_delegating_gtid_event_filter<T>::exclude(rpl_gtid *gtid)
 | |
| {
 | |
|   T filter_id= get_id_from_gtid(gtid);
 | |
|   gtid_filter_element<T> *filter_element=
 | |
|       (gtid_filter_element<T> *) my_hash_search(&m_filters_by_id_hash,
 | |
|                                                 (const uchar *) &filter_id, 0);
 | |
|   Gtid_event_filter *filter=
 | |
|       (filter_element ? filter_element->filter : m_default_filter);
 | |
|   my_bool ret= TRUE;
 | |
| 
 | |
|   if(!filter_element || !filter->has_finished())
 | |
|   {
 | |
|     ret= filter->exclude(gtid);
 | |
| 
 | |
|     /*
 | |
|       If this is an explicitly defined filter, e.g. a window-based filter,
 | |
|       check if it has completed, and update the counter if so.
 | |
|     */
 | |
|     if (filter_element && filter->has_finished())
 | |
|       m_num_completed_filters++;
 | |
|   }
 | |
| 
 | |
|   return ret;
 | |
| }
 | |
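| 
 | |
| /*
 | |
|   Illustrative sketch of the delegation above; a hypothetical example, not
 | |
|   compiled into the server (the guard macro and function name are invented).
 | |
|   Once a per-id filter reports has_finished(), later GTIDs for that id are
 | |
|   excluded without consulting the filter again.
 | |
| */
 | |
| #ifdef GTID_FILTER_EXAMPLES
 | |
| static void example_finished_filter_excludes()
 | |
| {
 | |
|   Domain_gtid_event_filter df;
 | |
|   rpl_gtid stop= {0, 1, 5};
 | |
|   df.add_stop_gtid(&stop);
 | |
| 
 | |
|   rpl_gtid before= {0, 1, 3}, at_stop= {0, 1, 5}, after= {0, 1, 9};
 | |
|   df.exclude(&before);  /* FALSE: within the window */
 | |
|   df.exclude(&at_stop); /* FALSE: inclusive stop; the window finishes */
 | |
|   df.exclude(&after);   /* TRUE: the finished filter excludes everything */
 | |
| }
 | |
| #endif
 | |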
| 
 | |
| 
 | |
| template <typename F> Gtid_event_filter* create_event_filter()
 | |
| {
 | |
|   return new F();
 | |
| }
 | |
| 
 | |
| template <typename T>
 | |
| int Id_delegating_gtid_event_filter<T>::set_id_restrictions(
 | |
|     T *id_list, size_t n_ids, id_restriction_mode mode)
 | |
| {
 | |
|   static const char *WHITELIST_NAME= "do", *BLACKLIST_NAME= "ignore";
 | |
| 
 | |
|   size_t id_ctr;
 | |
|   int err;
 | |
|   const char *filter_name, *opposite_filter_name;
 | |
|   Gtid_event_filter *(*construct_filter)(void);
 | |
|   Gtid_event_filter *(*construct_default_filter)(void);
 | |
| 
 | |
|   /*
 | |
|     Set up variables which put this filter into either whitelist or
 | |
|     blacklist mode
 | |
|   */
 | |
|   if (mode == Gtid_event_filter::id_restriction_mode::WHITELIST_MODE)
 | |
|   {
 | |
|     filter_name= WHITELIST_NAME;
 | |
|     opposite_filter_name= BLACKLIST_NAME;
 | |
|     construct_filter=
 | |
|         create_event_filter<Accept_all_gtid_filter>;
 | |
|     construct_default_filter=
 | |
|         create_event_filter<Reject_all_gtid_filter>;
 | |
|   }
 | |
|   else
 | |
|   {
 | |
|     DBUG_ASSERT(mode ==
 | |
|                 Gtid_event_filter::id_restriction_mode::BLACKLIST_MODE);
 | |
|     filter_name= BLACKLIST_NAME;
 | |
|     opposite_filter_name= WHITELIST_NAME;
 | |
|     construct_filter=
 | |
|         create_event_filter<Reject_all_gtid_filter>;
 | |
|     construct_default_filter=
 | |
|         create_event_filter<Accept_all_gtid_filter>;
 | |
|   }
 | |
| 
 | |
|   if (m_id_restriction_mode !=
 | |
|       Gtid_event_filter::id_restriction_mode::MODE_NOT_SET)
 | |
|   {
 | |
|     if (mode != m_id_restriction_mode)
 | |
|     {
 | |
|       /*
 | |
|         If a rule specifying the opposite version of this has already been set,
 | |
|         error.
 | |
|       */
 | |
|       sql_print_error("Cannot create %s filtering rule for %s id because "
 | |
|                       "%s rule already exists",
 | |
|                       filter_name, get_id_type_name(),
 | |
|                       opposite_filter_name);
 | |
|       err= 1;
 | |
|       goto err;
 | |
|     }
 | |
| 
 | |
|     /* This filter was specified more than once; only use the latest values */
 | |
|     my_hash_reset(&m_filters_by_id_hash);
 | |
|   }
 | |
| 
 | |
|   for (id_ctr= 0; id_ctr < n_ids; id_ctr++)
 | |
|   {
 | |
|     T filter_id= id_list[id_ctr];
 | |
|     gtid_filter_element<T> *map_element=
 | |
|         find_or_create_filter_element_for_id(filter_id);
 | |
| 
 | |
|     if(map_element == NULL)
 | |
|     {
 | |
|       /*
 | |
|         If map_element is NULL, find_or_create_filter_element_for_id failed and
 | |
|         has already written the error message
 | |
|       */
 | |
|       err= 1;
 | |
|       goto err;
 | |
|     }
 | |
|     else if (map_element->filter == NULL)
 | |
|     {
 | |
|       map_element->filter= construct_filter();
 | |
|       m_num_stateful_filters++;
 | |
|     }
 | |
|     else
 | |
|     {
 | |
|       DBUG_ASSERT(map_element->filter->get_filter_type() ==
 | |
|                   (mode ==
 | |
|                    Gtid_event_filter::id_restriction_mode::WHITELIST_MODE
 | |
|                    ? Gtid_event_filter::ACCEPT_ALL_GTID_FILTER_TYPE
 | |
|                    : Gtid_event_filter::REJECT_ALL_GTID_FILTER_TYPE));
 | |
|     }
 | |
|   }
 | |
| 
 | |
|   /*
 | |
|     With a whitelist, we want to accept only the ids which are specified.
 | |
|     Everything else should be denied.
 | |
| 
 | |
|     With a blacklist, we by default want to accept everything that is not
 | |
|     specified in the list
 | |
|   */
 | |
|   set_default_filter(construct_default_filter());
 | |
|   m_id_restriction_mode= mode;
 | |
|   err= 0;
 | |
| 
 | |
| err:
 | |
|   return err;
 | |
| }
 | |
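| 
 | |
| /*
 | |
|   Illustrative sketch of id restrictions; a hypothetical example, not
 | |
|   compiled into the server (the guard macro and function name are invented).
 | |
|   A whitelist accepts only the listed ids and rejects the rest via the
 | |
|   default filter; a blacklist is the inverse. Mixing the two modes is
 | |
|   rejected with an error.
 | |
| */
 | |
| #ifdef GTID_FILTER_EXAMPLES
 | |
| static void example_domain_id_whitelist()
 | |
| {
 | |
|   Domain_gtid_event_filter df;
 | |
|   uint32 do_domain_ids[]= {0, 4};
 | |
|   df.set_id_restrictions(
 | |
|       do_domain_ids, 2,
 | |
|       Gtid_event_filter::id_restriction_mode::WHITELIST_MODE);
 | |
| 
 | |
|   rpl_gtid in_list= {4, 1, 1}, not_in_list= {2, 1, 1};
 | |
|   df.exclude(&in_list);     /* FALSE: domain 4 is whitelisted */
 | |
|   df.exclude(&not_in_list); /* TRUE: rejected by the default filter */
 | |
| }
 | |
| #endif
 | |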
| 
 | |
| Window_gtid_event_filter *
 | |
| Domain_gtid_event_filter::find_or_create_window_filter_for_id(
 | |
|     decltype(rpl_gtid::domain_id) domain_id)
 | |
| {
 | |
|   gtid_filter_element<decltype(rpl_gtid::domain_id)> *filter_element=
 | |
|       find_or_create_filter_element_for_id(domain_id);
 | |
|   Window_gtid_event_filter *wgef= NULL;
 | |
| 
 | |
|   /* Element lookup/creation can fail (OOM); the error was already reported */
 | |
|   if (!filter_element)
 | |
|     return NULL;
 | |
| 
 | |
|   if (filter_element->filter == NULL)
 | |
|   {
 | |
|     /* New filter */
 | |
|     wgef= new Window_gtid_event_filter();
 | |
|     filter_element->filter= wgef;
 | |
|   }
 | |
|   else if (filter_element->filter->get_filter_type() == WINDOW_GTID_FILTER_TYPE)
 | |
|   {
 | |
|     /* We have an existing window filter here */
 | |
|     wgef= (Window_gtid_event_filter *) filter_element->filter;
 | |
|   }
 | |
|   else
 | |
|   {
 | |
|     /*
 | |
|       We have an existing filter but it is not of window type, so propagate a
 | |
|       NULL filter
 | |
|     */
 | |
|     sql_print_error("Cannot subset domain id %u by position, another rule "
 | |
|                     "exists on that domain",
 | |
|                     domain_id);
 | |
|   }
 | |
| 
 | |
|   return wgef;
 | |
| }
 | |
| 
 | |
| static my_bool check_filter_entry_validity(void *entry,
 | |
|                                            void *are_filters_invalid_arg)
 | |
| {
 | |
|   gtid_filter_element<decltype(rpl_gtid::domain_id)> *fe=
 | |
|       (gtid_filter_element<decltype(rpl_gtid::domain_id)> *) entry;
 | |
| 
 | |
|   if (fe)
 | |
|   {
 | |
|     Gtid_event_filter *gef= fe->filter;
 | |
|     if (gef->get_filter_type() == Gtid_event_filter::WINDOW_GTID_FILTER_TYPE)
 | |
|     {
 | |
|       Window_gtid_event_filter *wgef= (Window_gtid_event_filter *) gef;
 | |
|       if (wgef->is_range_invalid())
 | |
|       {
 | |
|         *((int *) are_filters_invalid_arg)= 1;
 | |
|         return TRUE;
 | |
|       }
 | |
|     }
 | |
|   }
 | |
|   return FALSE;
 | |
| }
 | |
| 
 | |
| int Domain_gtid_event_filter::validate_window_filters()
 | |
| {
 | |
|   int are_filters_invalid= 0;
 | |
|   my_hash_iterate(&m_filters_by_id_hash, check_filter_entry_validity,
 | |
|                   &are_filters_invalid);
 | |
|   return are_filters_invalid;
 | |
| }
 | |
| 
 | |
| int Domain_gtid_event_filter::add_start_gtid(rpl_gtid *gtid)
 | |
| {
 | |
|   int err= 0;
 | |
|   Window_gtid_event_filter *filter_to_update=
 | |
|       find_or_create_window_filter_for_id(gtid->domain_id);
 | |
| 
 | |
|   if (filter_to_update == NULL)
 | |
|   {
 | |
|     err= 1;
 | |
|   }
 | |
|   else if (!(err= filter_to_update->set_start_gtid(gtid)))
 | |
|   {
 | |
|     gtid_filter_element<decltype(rpl_gtid::domain_id)> *fe=
 | |
|         (gtid_filter_element<decltype(rpl_gtid::domain_id)> *) my_hash_search(
 | |
|             &m_filters_by_id_hash, (const uchar *) &(gtid->domain_id), 0);
 | |
|     insert_dynamic(&m_start_filters, (const void *) &fe);
 | |
|   }
 | |
| 
 | |
|   return err;
 | |
| }
 | |
| 
 | |
| int Domain_gtid_event_filter::add_stop_gtid(rpl_gtid *gtid)
 | |
| {
 | |
|   int err= 0;
 | |
|   Window_gtid_event_filter *filter_to_update=
 | |
|       find_or_create_window_filter_for_id(gtid->domain_id);
 | |
| 
 | |
|   if (filter_to_update == NULL)
 | |
|   {
 | |
|     err= 1;
 | |
|   }
 | |
|   else if (!(err= filter_to_update->set_stop_gtid(gtid)))
 | |
|   {
 | |
|     gtid_filter_element<decltype(rpl_gtid::domain_id)> *fe=
 | |
|         (gtid_filter_element<decltype(rpl_gtid::domain_id)> *) my_hash_search(
 | |
|             &m_filters_by_id_hash, (const uchar *) &(gtid->domain_id), 0);
 | |
|     insert_dynamic(&m_stop_filters, (const void *) &fe);
 | |
| 
 | |
|     /*
 | |
|       A window with a stop position can be disabled, and is therefore stateful.
 | |
|     */
 | |
|     m_num_stateful_filters++;
 | |
| 
 | |
|     /*
 | |
|       Default filtering behavior changes with GTID stop positions, where we
 | |
|       exclude all domains not present in the stop list
 | |
|     */
 | |
|     if (m_default_filter->get_filter_type() == ACCEPT_ALL_GTID_FILTER_TYPE)
 | |
|     {
 | |
|       delete m_default_filter;
 | |
|       m_default_filter= new Reject_all_gtid_filter();
 | |
|     }
 | |
|   }
 | |
| 
 | |
|   return err;
 | |
| }
 | |
| 
 | |
| rpl_gtid *Domain_gtid_event_filter::get_start_gtids()
 | |
| {
 | |
|   rpl_gtid *gtid_list;
 | |
|   uint32 i;
 | |
|   size_t n_start_gtids= get_num_start_gtids();
 | |
| 
 | |
|   gtid_list= (rpl_gtid *) my_malloc(
 | |
|       PSI_INSTRUMENT_ME, n_start_gtids * sizeof(rpl_gtid), MYF(MY_WME));
 | |
|   if (!gtid_list)
 | |
|     return NULL; /* my_malloc(MY_WME) has already reported the failure */
 | |
| 
 | |
|   for (i = 0; i < n_start_gtids; i++)
 | |
|   {
 | |
|     gtid_filter_element<decltype(rpl_gtid::domain_id)> *fe=
 | |
|         *(gtid_filter_element<decltype(rpl_gtid::domain_id)> **)
 | |
|             dynamic_array_ptr(&m_start_filters, i);
 | |
|     DBUG_ASSERT(fe->filter &&
 | |
|                 fe->filter->get_filter_type() == WINDOW_GTID_FILTER_TYPE);
 | |
|     Window_gtid_event_filter *wgef=
 | |
|         (Window_gtid_event_filter *) fe->filter;
 | |
| 
 | |
|     rpl_gtid win_start_gtid= wgef->get_start_gtid();
 | |
|     gtid_list[i]= win_start_gtid;
 | |
|   }
 | |
| 
 | |
|   return gtid_list;
 | |
| }
 | |
| 
 | |
| rpl_gtid *Domain_gtid_event_filter::get_stop_gtids()
 | |
| {
 | |
|   rpl_gtid *gtid_list;
 | |
|   uint32 i;
 | |
|   size_t n_stop_gtids= get_num_stop_gtids();
 | |
| 
 | |
|   gtid_list= (rpl_gtid *) my_malloc(
 | |
|       PSI_INSTRUMENT_ME, n_stop_gtids * sizeof(rpl_gtid), MYF(MY_WME));
 | |
|   if (!gtid_list)
 | |
|     return NULL; /* my_malloc(MY_WME) has already reported the failure */
 | |
| 
 | |
|   for (i = 0; i < n_stop_gtids; i++)
 | |
|   {
 | |
|     gtid_filter_element<decltype(rpl_gtid::domain_id)> *fe=
 | |
|         *(gtid_filter_element<decltype(rpl_gtid::domain_id)> **)
 | |
|             dynamic_array_ptr(&m_stop_filters, i);
 | |
|     DBUG_ASSERT(fe->filter &&
 | |
|                 fe->filter->get_filter_type() == WINDOW_GTID_FILTER_TYPE);
 | |
|     Window_gtid_event_filter *wgef=
 | |
|         (Window_gtid_event_filter *) fe->filter;
 | |
| 
 | |
|     rpl_gtid win_stop_gtid= wgef->get_stop_gtid();
 | |
|     gtid_list[i]= win_stop_gtid;
 | |
|   }
 | |
| 
 | |
|   return gtid_list;
 | |
| }
 | |
| 
 | |
| void Domain_gtid_event_filter::clear_start_gtids()
 | |
| {
 | |
|   uint32 i;
 | |
|   for (i = 0; i < get_num_start_gtids(); i++)
 | |
|   {
 | |
|     gtid_filter_element<decltype(rpl_gtid::domain_id)> *fe=
 | |
|         *(gtid_filter_element<decltype(rpl_gtid::domain_id)> **)
 | |
|             dynamic_array_ptr(&m_start_filters, i);
 | |
|     DBUG_ASSERT(fe->filter &&
 | |
|                 fe->filter->get_filter_type() == WINDOW_GTID_FILTER_TYPE);
 | |
|     Window_gtid_event_filter *wgef=
 | |
|         (Window_gtid_event_filter *) fe->filter;
 | |
| 
 | |
|     if (wgef->has_stop())
 | |
|     {
 | |
|       /*
 | |
|         Don't delete the whole filter if it already has a stop position attached
 | |
|       */
 | |
|       wgef->clear_start_pos();
 | |
|     }
 | |
|     else
 | |
|     {
 | |
|       /*
 | |
|         This domain only has a start, so delete the whole filter
 | |
|       */
 | |
|       my_hash_delete(&m_filters_by_id_hash, (uchar *) fe);
 | |
|     }
 | |
|   }
 | |
| 
 | |
|   reset_dynamic(&m_start_filters);
 | |
| }
 | |
| 
 | |
| void Domain_gtid_event_filter::clear_stop_gtids()
 | |
| {
 | |
|   uint32 i;
 | |
| 
 | |
|   for (i = 0; i < get_num_stop_gtids(); i++)
 | |
|   {
 | |
|     gtid_filter_element<decltype(rpl_gtid::domain_id)> *fe=
 | |
|         *(gtid_filter_element<decltype(rpl_gtid::domain_id)> **)
 | |
|             dynamic_array_ptr(&m_stop_filters, i);
 | |
|     DBUG_ASSERT(fe->filter &&
 | |
|                 fe->filter->get_filter_type() == WINDOW_GTID_FILTER_TYPE);
 | |
|     Window_gtid_event_filter *wgef=
 | |
|         (Window_gtid_event_filter *) fe->filter;
 | |
| 
 | |
|     if (wgef->has_start())
 | |
|     {
 | |
|       /*
 | |
|         Don't delete the whole filter if it already has a start position
 | |
|         attached
 | |
|       */
 | |
|       wgef->clear_stop_pos();
 | |
|     }
 | |
|     else
 | |
|     {
 | |
|       /*
 | |
|         This domain only has a stop, so delete the whole filter
 | |
|       */
 | |
|       my_hash_delete(&m_filters_by_id_hash, (uchar *) fe);
 | |
|     }
 | |
|     m_num_stateful_filters--;
 | |
|   }
 | |
| 
 | |
|   /*
 | |
|     Stop positions were cleared, so we want to be inclusive of other
 | |
|     domains again
 | |
|   */
 | |
|   if (m_default_filter->get_filter_type() == REJECT_ALL_GTID_FILTER_TYPE)
 | |
|   {
 | |
|     delete m_default_filter;
 | |
|     m_default_filter= new Accept_all_gtid_filter();
 | |
|   }
 | |
| 
 | |
|   reset_dynamic(&m_stop_filters);
 | |
| }
 | |
| 
 | |
| my_bool Domain_gtid_event_filter::exclude(rpl_gtid *gtid)
 | |
| {
 | |
|   my_bool include_domain= TRUE;
 | |
|   /*
 | |
|     If GTID stop positions are provided, we limit the domains which are output
 | |
|     to only be those specified with stop positions
 | |
|   */
 | |
|   if (get_num_stop_gtids())
 | |
|   {
 | |
|     decltype(rpl_gtid::domain_id) filter_id= get_id_from_gtid(gtid);
 | |
|     gtid_filter_element<decltype(rpl_gtid::domain_id)> *filter_element=
 | |
|         (gtid_filter_element<decltype(rpl_gtid::domain_id)> *) my_hash_search(
 | |
|             &m_filters_by_id_hash, (const uchar *) &filter_id, 0);
 | |
|     if (filter_element)
 | |
|     {
 | |
|       Gtid_event_filter *filter= filter_element->filter;
 | |
|       if (filter->get_filter_type() == WINDOW_GTID_FILTER_TYPE)
 | |
|       {
 | |
|         Window_gtid_event_filter *wgef= (Window_gtid_event_filter *) filter;
 | |
|         include_domain= wgef->has_stop();
 | |
|       }
 | |
|     }
 | |
|   }
 | |
| 
 | |
|   return include_domain ? Id_delegating_gtid_event_filter::exclude(gtid)
 | |
|                         : TRUE;
 | |
| }
 | |
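| 
 | |
| /*
 | |
|   Illustrative sketch of the domain limiting above; a hypothetical example,
 | |
|   not compiled into the server (the guard macro and function name are
 | |
|   invented). Once any stop position is set, only domains that themselves
 | |
|   have a stop position are emitted.
 | |
| */
 | |
| #ifdef GTID_FILTER_EXAMPLES
 | |
| static void example_stop_gtid_limits_domains()
 | |
| {
 | |
|   Domain_gtid_event_filter df;
 | |
|   rpl_gtid stop= {0, 1, 50};
 | |
|   df.add_stop_gtid(&stop);
 | |
| 
 | |
|   rpl_gtid domain_with_stop= {0, 1, 10}, other_domain= {1, 1, 10};
 | |
|   df.exclude(&domain_with_stop); /* FALSE: domain 0 has a stop position */
 | |
|   df.exclude(&other_domain);     /* TRUE: domain 1 has no stop position */
 | |
| }
 | |
| #endif
 | |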
| 
 | |
| Intersecting_gtid_event_filter::Intersecting_gtid_event_filter(
 | |
|     Gtid_event_filter *filter1, Gtid_event_filter *filter2)
 | |
| {
 | |
|   my_init_dynamic_array(PSI_INSTRUMENT_ME, &m_filters,
 | |
|                         sizeof(Gtid_event_filter *), 3, 3, MYF(0));
 | |
|   insert_dynamic(&m_filters, (void *) &filter1);
 | |
|   insert_dynamic(&m_filters, (void *) &filter2);
 | |
| }
 | |
| 
 | |
| Intersecting_gtid_event_filter::~Intersecting_gtid_event_filter()
 | |
| {
 | |
|   Gtid_event_filter *tmp_filter= NULL;
 | |
|   ulong i;
 | |
|   for (i= 0; i < m_filters.elements; i++)
 | |
|   {
 | |
|     tmp_filter= *(Gtid_event_filter **) dynamic_array_ptr(&m_filters, i);
 | |
|     delete tmp_filter;
 | |
|   }
 | |
|   delete_dynamic(&m_filters);
 | |
| }
 | |
| 
 | |
| my_bool Intersecting_gtid_event_filter::exclude(rpl_gtid *gtid)
 | |
| {
 | |
|   Gtid_event_filter *tmp_filter= NULL;
 | |
|   ulong i;
 | |
|   for (i= 0; i < m_filters.elements; i++)
 | |
|   {
 | |
|     tmp_filter= *(Gtid_event_filter **) dynamic_array_ptr(&m_filters, i);
 | |
|     DBUG_ASSERT(tmp_filter);
 | |
|     if (tmp_filter->exclude(gtid))
 | |
|       return TRUE;
 | |
|   }
 | |
|   return FALSE;
 | |
| }
 | |
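| 
 | |
| /*
 | |
|   Illustrative sketch of the intersection semantics above; a hypothetical
 | |
|   example, not compiled into the server (the guard macro and function name
 | |
|   are invented). A GTID passes only if every constituent filter accepts it,
 | |
|   and the intersecting filter owns (and deletes) its constituents.
 | |
| */
 | |
| #ifdef GTID_FILTER_EXAMPLES
 | |
| static void example_intersecting_filter()
 | |
| {
 | |
|   Gtid_event_filter *accept= new Accept_all_gtid_filter();
 | |
|   Gtid_event_filter *reject= new Reject_all_gtid_filter();
 | |
|   Intersecting_gtid_event_filter intersection(accept, reject);
 | |
| 
 | |
|   rpl_gtid g= {0, 1, 1};
 | |
|   intersection.exclude(&g); /* TRUE: the reject-all member excludes it */
 | |
| }
 | |
| #endif
 | |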
| 
 | |
| my_bool Intersecting_gtid_event_filter::has_finished()
 | |
| {
 | |
|   Gtid_event_filter *tmp_filter= NULL;
 | |
|   ulong i;
 | |
|   for (i= 0; i < m_filters.elements; i++)
 | |
|   {
 | |
|     tmp_filter= *(Gtid_event_filter **) dynamic_array_ptr(&m_filters, i);
 | |
|     DBUG_ASSERT(tmp_filter);
 | |
|     if (tmp_filter->has_finished())
 | |
|       return TRUE;
 | |
|   }
 | |
|   return FALSE;
 | |
| }
 | 
