mirror of
https://github.com/MariaDB/server.git
synced 2025-01-18 04:53:01 +01:00
f9c2b402f4
Implement @@gtid_binlog_state. This is the internal state of the binlog (most recent GTID logged for every domain_id and server_id). This allows to save the state before RESET MASTER and restore it afterwards.
1546 lines
36 KiB
C++
1546 lines
36 KiB
C++
/* Copyright (c) 2013, Kristian Nielsen and MariaDB Services Ab.
|
|
|
|
This program is free software; you can redistribute it and/or modify
|
|
it under the terms of the GNU General Public License as published by
|
|
the Free Software Foundation; version 2 of the License.
|
|
|
|
This program is distributed in the hope that it will be useful,
|
|
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
GNU General Public License for more details.
|
|
|
|
You should have received a copy of the GNU General Public License
|
|
along with this program; if not, write to the Free Software
|
|
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
|
|
|
|
|
|
/* Definitions for MariaDB global transaction ID (GTID). */
|
|
|
|
|
|
#include "sql_priv.h"
|
|
#include "my_sys.h"
|
|
#include "unireg.h"
|
|
#include "my_global.h"
|
|
#include "sql_base.h"
|
|
#include "sql_parse.h"
|
|
#include "key.h"
|
|
#include "rpl_gtid.h"
|
|
#include "rpl_rli.h"
|
|
|
|
|
|
const LEX_STRING rpl_gtid_slave_state_table_name=
|
|
{ C_STRING_WITH_LEN("gtid_slave_pos") };
|
|
|
|
|
|
void
|
|
rpl_slave_state::update_state_hash(uint64 sub_id, rpl_gtid *gtid)
|
|
{
|
|
int err;
|
|
/*
|
|
Add the gtid to the HASH in the replication slave state.
|
|
|
|
We must do this only _after_ commit, so that for parallel replication,
|
|
there will not be an attempt to delete the corresponding table row before
|
|
it is even committed.
|
|
*/
|
|
lock();
|
|
err= update(gtid->domain_id, gtid->server_id, sub_id, gtid->seq_no);
|
|
unlock();
|
|
if (err)
|
|
{
|
|
sql_print_warning("Slave: Out of memory during slave state maintenance. "
|
|
"Some no longer necessary rows in table "
|
|
"mysql.%s may be left undeleted.",
|
|
rpl_gtid_slave_state_table_name.str);
|
|
/*
|
|
Such failure is not fatal. We will fail to delete the row for this
|
|
GTID, but it will do no harm and will be removed automatically on next
|
|
server restart.
|
|
*/
|
|
}
|
|
}
|
|
|
|
|
|
int
|
|
rpl_slave_state::record_and_update_gtid(THD *thd, Relay_log_info *rli)
|
|
{
|
|
uint64 sub_id;
|
|
|
|
/*
|
|
Update the GTID position, if we have it and did not already update
|
|
it in a GTID transaction.
|
|
*/
|
|
if ((sub_id= rli->gtid_sub_id))
|
|
{
|
|
rli->gtid_sub_id= 0;
|
|
if (record_gtid(thd, &rli->current_gtid, sub_id, false, false))
|
|
return 1;
|
|
update_state_hash(sub_id, &rli->current_gtid);
|
|
}
|
|
return 0;
|
|
}
|
|
|
|
|
|
rpl_slave_state::rpl_slave_state()
|
|
: inited(false), loaded(false)
|
|
{
|
|
my_hash_init(&hash, &my_charset_bin, 32, offsetof(element, domain_id),
|
|
sizeof(uint32), NULL, my_free, HASH_UNIQUE);
|
|
}
|
|
|
|
|
|
rpl_slave_state::~rpl_slave_state()
|
|
{
|
|
}
|
|
|
|
|
|
void
|
|
rpl_slave_state::init()
|
|
{
|
|
DBUG_ASSERT(!inited);
|
|
mysql_mutex_init(key_LOCK_slave_state, &LOCK_slave_state, MY_MUTEX_INIT_SLOW);
|
|
inited= true;
|
|
}
|
|
|
|
|
|
void
|
|
rpl_slave_state::truncate_hash()
|
|
{
|
|
uint32 i;
|
|
|
|
for (i= 0; i < hash.records; ++i)
|
|
{
|
|
element *e= (element *)my_hash_element(&hash, i);
|
|
list_element *l= e->list;
|
|
list_element *next;
|
|
while (l)
|
|
{
|
|
next= l->next;
|
|
my_free(l);
|
|
l= next;
|
|
}
|
|
/* The element itself is freed by the hash element free function. */
|
|
}
|
|
my_hash_reset(&hash);
|
|
}
|
|
|
|
void
|
|
rpl_slave_state::deinit()
|
|
{
|
|
if (!inited)
|
|
return;
|
|
truncate_hash();
|
|
my_hash_free(&hash);
|
|
mysql_mutex_destroy(&LOCK_slave_state);
|
|
}
|
|
|
|
|
|
int
|
|
rpl_slave_state::update(uint32 domain_id, uint32 server_id, uint64 sub_id,
|
|
uint64 seq_no)
|
|
{
|
|
element *elem= NULL;
|
|
list_element *list_elem= NULL;
|
|
|
|
if (!(elem= get_element(domain_id)))
|
|
return 1;
|
|
|
|
if (!(list_elem= (list_element *)my_malloc(sizeof(*list_elem), MYF(MY_WME))))
|
|
return 1;
|
|
list_elem->server_id= server_id;
|
|
list_elem->sub_id= sub_id;
|
|
list_elem->seq_no= seq_no;
|
|
|
|
elem->add(list_elem);
|
|
return 0;
|
|
}
|
|
|
|
|
|
struct rpl_slave_state::element *
|
|
rpl_slave_state::get_element(uint32 domain_id)
|
|
{
|
|
struct element *elem;
|
|
|
|
elem= (element *)my_hash_search(&hash, (const uchar *)&domain_id, 0);
|
|
if (elem)
|
|
return elem;
|
|
|
|
if (!(elem= (element *)my_malloc(sizeof(*elem), MYF(MY_WME))))
|
|
return NULL;
|
|
elem->list= NULL;
|
|
elem->last_sub_id= 0;
|
|
elem->domain_id= domain_id;
|
|
if (my_hash_insert(&hash, (uchar *)elem))
|
|
{
|
|
my_free(elem);
|
|
return NULL;
|
|
}
|
|
return elem;
|
|
}
|
|
|
|
|
|
int
|
|
rpl_slave_state::put_back_list(uint32 domain_id, list_element *list)
|
|
{
|
|
element *e;
|
|
if (!(e= (element *)my_hash_search(&hash, (const uchar *)&domain_id, 0)))
|
|
return 1;
|
|
while (list)
|
|
{
|
|
list_element *next= list->next;
|
|
e->add(list);
|
|
list= next;
|
|
}
|
|
return 0;
|
|
}
|
|
|
|
|
|
int
|
|
rpl_slave_state::truncate_state_table(THD *thd)
|
|
{
|
|
TABLE_LIST tlist;
|
|
int err= 0;
|
|
TABLE *table;
|
|
|
|
tlist.init_one_table(STRING_WITH_LEN("mysql"),
|
|
rpl_gtid_slave_state_table_name.str,
|
|
rpl_gtid_slave_state_table_name.length,
|
|
NULL, TL_WRITE);
|
|
if (!(err= open_and_lock_tables(thd, &tlist, FALSE, 0)))
|
|
{
|
|
table= tlist.table;
|
|
table->no_replicate= 1;
|
|
err= table->file->ha_truncate();
|
|
|
|
if (err)
|
|
{
|
|
ha_rollback_trans(thd, FALSE);
|
|
close_thread_tables(thd);
|
|
ha_rollback_trans(thd, TRUE);
|
|
}
|
|
else
|
|
{
|
|
ha_commit_trans(thd, FALSE);
|
|
close_thread_tables(thd);
|
|
ha_commit_trans(thd, TRUE);
|
|
}
|
|
thd->mdl_context.release_transactional_locks();
|
|
}
|
|
|
|
return err;
|
|
}
|
|
|
|
|
|
static const TABLE_FIELD_TYPE mysql_rpl_slave_state_coltypes[4]= {
|
|
{ { C_STRING_WITH_LEN("domain_id") },
|
|
{ C_STRING_WITH_LEN("int(10) unsigned") },
|
|
{NULL, 0} },
|
|
{ { C_STRING_WITH_LEN("sub_id") },
|
|
{ C_STRING_WITH_LEN("bigint(20) unsigned") },
|
|
{NULL, 0} },
|
|
{ { C_STRING_WITH_LEN("server_id") },
|
|
{ C_STRING_WITH_LEN("int(10) unsigned") },
|
|
{NULL, 0} },
|
|
{ { C_STRING_WITH_LEN("seq_no") },
|
|
{ C_STRING_WITH_LEN("bigint(20) unsigned") },
|
|
{NULL, 0} },
|
|
};
|
|
|
|
static const uint mysql_rpl_slave_state_pk_parts[]= {0, 1};
|
|
|
|
static const TABLE_FIELD_DEF mysql_gtid_slave_pos_tabledef= {
|
|
array_elements(mysql_rpl_slave_state_coltypes),
|
|
mysql_rpl_slave_state_coltypes,
|
|
array_elements(mysql_rpl_slave_state_pk_parts),
|
|
mysql_rpl_slave_state_pk_parts
|
|
};
|
|
|
|
class Gtid_db_intact : public Table_check_intact
|
|
{
|
|
protected:
|
|
void report_error(uint, const char *fmt, ...)
|
|
{
|
|
va_list args;
|
|
va_start(args, fmt);
|
|
error_log_print(ERROR_LEVEL, fmt, args);
|
|
va_end(args);
|
|
}
|
|
};
|
|
|
|
static Gtid_db_intact gtid_table_intact;
|
|
|
|
/*
|
|
Check that the mysql.gtid_slave_pos table has the correct definition.
|
|
*/
|
|
int
|
|
gtid_check_rpl_slave_state_table(TABLE *table)
|
|
{
|
|
int err;
|
|
|
|
if ((err= gtid_table_intact.check(table, &mysql_gtid_slave_pos_tabledef)))
|
|
my_error(ER_GTID_OPEN_TABLE_FAILED, MYF(0), "mysql",
|
|
rpl_gtid_slave_state_table_name.str);
|
|
return err;
|
|
}
|
|
|
|
|
|
/*
|
|
Write a gtid to the replication slave state table.
|
|
|
|
Do it as part of the transaction, to get slave crash safety, or as a separate
|
|
transaction if !in_transaction (eg. MyISAM or DDL).
|
|
|
|
gtid The global transaction id for this event group.
|
|
sub_id Value allocated within the sub_id when the event group was
|
|
read (sub_id must be consistent with commit order in master binlog).
|
|
|
|
Note that caller must later ensure that the new gtid and sub_id is inserted
|
|
into the appropriate HASH element with rpl_slave_state.add(), so that it can
|
|
be deleted later. But this must only be done after COMMIT if in transaction.
|
|
*/
|
|
int
|
|
rpl_slave_state::record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id,
|
|
bool in_transaction, bool in_statement)
|
|
{
|
|
TABLE_LIST tlist;
|
|
int err= 0;
|
|
bool table_opened= false;
|
|
TABLE *table;
|
|
list_element *elist= 0, *next;
|
|
element *elem;
|
|
ulonglong thd_saved_option= thd->variables.option_bits;
|
|
Query_tables_list lex_backup;
|
|
|
|
if (unlikely(!loaded))
|
|
{
|
|
/*
|
|
Probably the mysql.gtid_slave_pos table is missing (eg. upgrade) or
|
|
corrupt.
|
|
|
|
We already complained loudly about this, but we can try to continue
|
|
until the DBA fixes it.
|
|
*/
|
|
return 0;
|
|
}
|
|
|
|
if (!in_statement)
|
|
mysql_reset_thd_for_next_command(thd, 0);
|
|
|
|
DBUG_EXECUTE_IF("gtid_inject_record_gtid",
|
|
{
|
|
my_error(ER_CANNOT_UPDATE_GTID_STATE, MYF(0));
|
|
return 1;
|
|
} );
|
|
|
|
thd->lex->reset_n_backup_query_tables_list(&lex_backup);
|
|
tlist.init_one_table(STRING_WITH_LEN("mysql"),
|
|
rpl_gtid_slave_state_table_name.str,
|
|
rpl_gtid_slave_state_table_name.length,
|
|
NULL, TL_WRITE);
|
|
if ((err= open_and_lock_tables(thd, &tlist, FALSE, 0)))
|
|
goto end;
|
|
table_opened= true;
|
|
table= tlist.table;
|
|
|
|
if ((err= gtid_check_rpl_slave_state_table(table)))
|
|
goto end;
|
|
|
|
table->no_replicate= 1;
|
|
if (!in_transaction)
|
|
thd->variables.option_bits&=
|
|
~(ulonglong)(OPTION_NOT_AUTOCOMMIT|OPTION_BEGIN);
|
|
|
|
bitmap_set_all(table->write_set);
|
|
|
|
table->field[0]->store((ulonglong)gtid->domain_id, true);
|
|
table->field[1]->store(sub_id, true);
|
|
table->field[2]->store((ulonglong)gtid->server_id, true);
|
|
table->field[3]->store(gtid->seq_no, true);
|
|
DBUG_EXECUTE_IF("inject_crash_before_write_rpl_slave_state", DBUG_SUICIDE(););
|
|
if ((err= table->file->ha_write_row(table->record[0])))
|
|
{
|
|
table->file->print_error(err, MYF(0));
|
|
goto end;
|
|
}
|
|
|
|
if(opt_bin_log &&
|
|
(err= mysql_bin_log.bump_seq_no_counter_if_needed(gtid->domain_id,
|
|
gtid->seq_no)))
|
|
{
|
|
my_error(ER_OUT_OF_RESOURCES, MYF(0));
|
|
goto end;
|
|
}
|
|
|
|
lock();
|
|
if ((elem= get_element(gtid->domain_id)) == NULL)
|
|
{
|
|
unlock();
|
|
my_error(ER_OUT_OF_RESOURCES, MYF(0));
|
|
err= 1;
|
|
goto end;
|
|
}
|
|
if ((elist= elem->grab_list()) != NULL)
|
|
{
|
|
/* Delete any old stuff, but keep around the most recent one. */
|
|
list_element *cur= elist;
|
|
uint64 best_sub_id= cur->sub_id;
|
|
list_element **best_ptr_ptr= &elist;
|
|
while ((next= cur->next))
|
|
{
|
|
if (next->sub_id > best_sub_id)
|
|
{
|
|
best_sub_id= next->sub_id;
|
|
best_ptr_ptr= &cur->next;
|
|
}
|
|
cur= next;
|
|
}
|
|
/*
|
|
Delete the highest sub_id element from the old list, and put it back as
|
|
the single-element new list.
|
|
*/
|
|
cur= *best_ptr_ptr;
|
|
*best_ptr_ptr= cur->next;
|
|
cur->next= NULL;
|
|
elem->list= cur;
|
|
}
|
|
unlock();
|
|
|
|
if (!elist)
|
|
goto end;
|
|
|
|
/* Now delete any already committed rows. */
|
|
bitmap_set_bit(table->read_set, table->field[0]->field_index);
|
|
bitmap_set_bit(table->read_set, table->field[1]->field_index);
|
|
|
|
if ((err= table->file->ha_index_init(0, 0)))
|
|
{
|
|
table->file->print_error(err, MYF(0));
|
|
goto end;
|
|
}
|
|
while (elist)
|
|
{
|
|
uchar key_buffer[4+8];
|
|
|
|
DBUG_EXECUTE_IF("gtid_slave_pos_simulate_failed_delete",
|
|
{ err= ENOENT;
|
|
table->file->print_error(err, MYF(0));
|
|
/* `break' does not work inside DBUG_EXECUTE_IF */
|
|
goto dbug_break; });
|
|
|
|
next= elist->next;
|
|
|
|
table->field[1]->store(elist->sub_id, true);
|
|
/* domain_id is already set in table->record[0] from write_row() above. */
|
|
key_copy(key_buffer, table->record[0], &table->key_info[0], 0, false);
|
|
if (table->file->ha_index_read_map(table->record[1], key_buffer,
|
|
HA_WHOLE_KEY, HA_READ_KEY_EXACT))
|
|
/* We cannot find the row, assume it is already deleted. */
|
|
;
|
|
else if ((err= table->file->ha_delete_row(table->record[1])))
|
|
table->file->print_error(err, MYF(0));
|
|
/*
|
|
In case of error, we still discard the element from the list. We do
|
|
not want to endlessly error on the same element in case of table
|
|
corruption or such.
|
|
*/
|
|
my_free(elist);
|
|
elist= next;
|
|
if (err)
|
|
break;
|
|
}
|
|
IF_DBUG(dbug_break:, )
|
|
table->file->ha_index_end();
|
|
|
|
end:
|
|
|
|
if (table_opened)
|
|
{
|
|
if (err)
|
|
{
|
|
/*
|
|
If error, we need to put any remaining elist back into the HASH so we
|
|
can do another delete attempt later.
|
|
*/
|
|
if (elist)
|
|
{
|
|
lock();
|
|
put_back_list(gtid->domain_id, elist);
|
|
unlock();
|
|
}
|
|
|
|
ha_rollback_trans(thd, FALSE);
|
|
close_thread_tables(thd);
|
|
}
|
|
else
|
|
{
|
|
ha_commit_trans(thd, FALSE);
|
|
close_thread_tables(thd);
|
|
}
|
|
if (in_transaction)
|
|
thd->mdl_context.release_statement_locks();
|
|
else
|
|
thd->mdl_context.release_transactional_locks();
|
|
}
|
|
thd->lex->restore_backup_query_tables_list(&lex_backup);
|
|
thd->variables.option_bits= thd_saved_option;
|
|
return err;
|
|
}
|
|
|
|
|
|
uint64
|
|
rpl_slave_state::next_sub_id(uint32 domain_id)
|
|
{
|
|
uint64 sub_id= 0;
|
|
element *elem;
|
|
|
|
lock();
|
|
elem= get_element(domain_id);
|
|
if (elem)
|
|
sub_id= ++elem->last_sub_id;
|
|
unlock();
|
|
|
|
return sub_id;
|
|
}
|
|
|
|
|
|
bool
|
|
rpl_slave_state_tostring_helper(String *dest, const rpl_gtid *gtid, bool *first)
|
|
{
|
|
if (*first)
|
|
*first= false;
|
|
else
|
|
if (dest->append(",",1))
|
|
return true;
|
|
return
|
|
dest->append_ulonglong(gtid->domain_id) ||
|
|
dest->append("-",1) ||
|
|
dest->append_ulonglong(gtid->server_id) ||
|
|
dest->append("-",1) ||
|
|
dest->append_ulonglong(gtid->seq_no);
|
|
}
|
|
|
|
|
|
int
|
|
rpl_slave_state::iterate(int (*cb)(rpl_gtid *, void *), void *data,
|
|
rpl_gtid *extra_gtids, uint32 num_extra)
|
|
{
|
|
uint32 i;
|
|
HASH gtid_hash;
|
|
uchar *rec;
|
|
rpl_gtid *gtid;
|
|
int res= 1;
|
|
|
|
my_hash_init(>id_hash, &my_charset_bin, 32, offsetof(rpl_gtid, domain_id),
|
|
sizeof(uint32), NULL, NULL, HASH_UNIQUE);
|
|
for (i= 0; i < num_extra; ++i)
|
|
if (extra_gtids[i].server_id == global_system_variables.server_id &&
|
|
my_hash_insert(>id_hash, (uchar *)(&extra_gtids[i])))
|
|
goto err;
|
|
|
|
lock();
|
|
|
|
for (i= 0; i < hash.records; ++i)
|
|
{
|
|
uint64 best_sub_id;
|
|
rpl_gtid best_gtid;
|
|
element *e= (element *)my_hash_element(&hash, i);
|
|
list_element *l= e->list;
|
|
|
|
if (!l)
|
|
continue; /* Nothing here */
|
|
|
|
best_gtid.domain_id= e->domain_id;
|
|
best_gtid.server_id= l->server_id;
|
|
best_gtid.seq_no= l->seq_no;
|
|
best_sub_id= l->sub_id;
|
|
while ((l= l->next))
|
|
{
|
|
if (l->sub_id > best_sub_id)
|
|
{
|
|
best_sub_id= l->sub_id;
|
|
best_gtid.server_id= l->server_id;
|
|
best_gtid.seq_no= l->seq_no;
|
|
}
|
|
}
|
|
|
|
/* Check if we have something newer in the extra list. */
|
|
rec= my_hash_search(>id_hash, (const uchar *)&best_gtid.domain_id, 0);
|
|
if (rec)
|
|
{
|
|
gtid= (rpl_gtid *)rec;
|
|
if (gtid->seq_no > best_gtid.seq_no)
|
|
memcpy(&best_gtid, gtid, sizeof(best_gtid));
|
|
if (my_hash_delete(>id_hash, rec))
|
|
{
|
|
unlock();
|
|
goto err;
|
|
}
|
|
}
|
|
|
|
if ((res= (*cb)(&best_gtid, data)))
|
|
{
|
|
unlock();
|
|
goto err;
|
|
}
|
|
}
|
|
|
|
unlock();
|
|
|
|
/* Also add any remaining extra domain_ids. */
|
|
for (i= 0; i < gtid_hash.records; ++i)
|
|
{
|
|
gtid= (rpl_gtid *)my_hash_element(>id_hash, i);
|
|
if ((res= (*cb)(gtid, data)))
|
|
goto err;
|
|
}
|
|
|
|
res= 0;
|
|
|
|
err:
|
|
my_hash_free(>id_hash);
|
|
|
|
return res;
|
|
}
|
|
|
|
|
|
struct rpl_slave_state_tostring_data {
|
|
String *dest;
|
|
bool first;
|
|
};
|
|
static int
|
|
rpl_slave_state_tostring_cb(rpl_gtid *gtid, void *data)
|
|
{
|
|
rpl_slave_state_tostring_data *p= (rpl_slave_state_tostring_data *)data;
|
|
return rpl_slave_state_tostring_helper(p->dest, gtid, &p->first);
|
|
}
|
|
|
|
|
|
/*
|
|
Prepare the current slave state as a string, suitable for sending to the
|
|
master to request to receive binlog events starting from that GTID state.
|
|
|
|
The state consists of the most recently applied GTID for each domain_id,
|
|
ie. the one with the highest sub_id within each domain_id.
|
|
|
|
Optinally, extra_gtids is a list of GTIDs from the binlog. This is used when
|
|
a server was previously a master and now needs to connect to a new master as
|
|
a slave. For each domain_id, if the GTID in the binlog was logged with our
|
|
own server_id _and_ has a higher seq_no than what is in the slave state,
|
|
then this should be used as the position to start replicating at. This
|
|
allows to promote a slave as new master, and connect the old master as a
|
|
slave with MASTER_GTID_POS=AUTO.
|
|
*/
|
|
int
|
|
rpl_slave_state::tostring(String *dest, rpl_gtid *extra_gtids, uint32 num_extra)
|
|
{
|
|
struct rpl_slave_state_tostring_data data;
|
|
data.first= true;
|
|
data.dest= dest;
|
|
|
|
return iterate(rpl_slave_state_tostring_cb, &data, extra_gtids, num_extra);
|
|
}
|
|
|
|
|
|
/*
|
|
Lookup a domain_id in the current replication slave state.
|
|
|
|
Returns false if the domain_id has no entries in the slave state.
|
|
Otherwise returns true, and fills in out_gtid with the corresponding
|
|
GTID.
|
|
*/
|
|
bool
|
|
rpl_slave_state::domain_to_gtid(uint32 domain_id, rpl_gtid *out_gtid)
|
|
{
|
|
element *elem;
|
|
list_element *list;
|
|
uint64 best_sub_id;
|
|
|
|
lock();
|
|
elem= (element *)my_hash_search(&hash, (const uchar *)&domain_id, 0);
|
|
if (!elem || !(list= elem->list))
|
|
{
|
|
unlock();
|
|
return false;
|
|
}
|
|
|
|
out_gtid->domain_id= domain_id;
|
|
out_gtid->server_id= list->server_id;
|
|
out_gtid->seq_no= list->seq_no;
|
|
best_sub_id= list->sub_id;
|
|
|
|
while ((list= list->next))
|
|
{
|
|
if (best_sub_id > list->sub_id)
|
|
continue;
|
|
best_sub_id= list->sub_id;
|
|
out_gtid->server_id= list->server_id;
|
|
out_gtid->seq_no= list->seq_no;
|
|
}
|
|
|
|
unlock();
|
|
return true;
|
|
}
|
|
|
|
|
|
/*
|
|
Parse a GTID at the start of a string, and update the pointer to point
|
|
at the first character after the parsed GTID.
|
|
|
|
Returns 0 on ok, non-zero on parse error.
|
|
*/
|
|
static int
|
|
gtid_parser_helper(char **ptr, char *end, rpl_gtid *out_gtid)
|
|
{
|
|
char *q;
|
|
char *p= *ptr;
|
|
uint64 v1, v2, v3;
|
|
int err= 0;
|
|
|
|
q= end;
|
|
v1= (uint64)my_strtoll10(p, &q, &err);
|
|
if (err != 0 || v1 > (uint32)0xffffffff || q == end || *q != '-')
|
|
return 1;
|
|
p= q+1;
|
|
q= end;
|
|
v2= (uint64)my_strtoll10(p, &q, &err);
|
|
if (err != 0 || v2 > (uint32)0xffffffff || q == end || *q != '-')
|
|
return 1;
|
|
p= q+1;
|
|
q= end;
|
|
v3= (uint64)my_strtoll10(p, &q, &err);
|
|
if (err != 0)
|
|
return 1;
|
|
|
|
out_gtid->domain_id= v1;
|
|
out_gtid->server_id= v2;
|
|
out_gtid->seq_no= v3;
|
|
*ptr= q;
|
|
return 0;
|
|
}
|
|
|
|
|
|
rpl_gtid *
|
|
gtid_parse_string_to_list(const char *str, size_t str_len, uint32 *out_len)
|
|
{
|
|
char *p= const_cast<char *>(str);
|
|
char *end= p + str_len;
|
|
uint32 len= 0, alloc_len= 5;
|
|
rpl_gtid *list= NULL;
|
|
|
|
for (;;)
|
|
{
|
|
rpl_gtid gtid;
|
|
|
|
if (len >= (((uint32)1 << 28)-1) || gtid_parser_helper(&p, end, >id))
|
|
{
|
|
my_free(list);
|
|
return NULL;
|
|
}
|
|
if ((!list || len >= alloc_len) &&
|
|
!(list=
|
|
(rpl_gtid *)my_realloc(list,
|
|
(alloc_len= alloc_len*2) * sizeof(rpl_gtid),
|
|
MYF(MY_FREE_ON_ERROR|MY_ALLOW_ZERO_PTR))))
|
|
return NULL;
|
|
list[len++]= gtid;
|
|
|
|
if (p == end)
|
|
break;
|
|
if (*p != ',')
|
|
{
|
|
my_free(list);
|
|
return NULL;
|
|
}
|
|
++p;
|
|
}
|
|
*out_len= len;
|
|
return list;
|
|
}
|
|
|
|
|
|
/*
|
|
Update the slave replication state with the GTID position obtained from
|
|
master when connecting with old-style (filename,offset) position.
|
|
|
|
If RESET is true then all existing entries are removed. Otherwise only
|
|
domain_ids mentioned in the STATE_FROM_MASTER are changed.
|
|
|
|
Returns 0 if ok, non-zero if error.
|
|
*/
|
|
int
|
|
rpl_slave_state::load(THD *thd, char *state_from_master, size_t len,
|
|
bool reset, bool in_statement)
|
|
{
|
|
char *end= state_from_master + len;
|
|
|
|
if (reset)
|
|
{
|
|
if (truncate_state_table(thd))
|
|
return 1;
|
|
truncate_hash();
|
|
}
|
|
if (state_from_master == end)
|
|
return 0;
|
|
for (;;)
|
|
{
|
|
rpl_gtid gtid;
|
|
uint64 sub_id;
|
|
|
|
if (gtid_parser_helper(&state_from_master, end, >id) ||
|
|
!(sub_id= next_sub_id(gtid.domain_id)) ||
|
|
record_gtid(thd, >id, sub_id, false, in_statement) ||
|
|
update(gtid.domain_id, gtid.server_id, sub_id, gtid.seq_no))
|
|
return 1;
|
|
if (state_from_master == end)
|
|
break;
|
|
if (*state_from_master != ',')
|
|
return 1;
|
|
++state_from_master;
|
|
}
|
|
return 0;
|
|
}
|
|
|
|
|
|
bool
|
|
rpl_slave_state::is_empty()
|
|
{
|
|
uint32 i;
|
|
bool result= true;
|
|
|
|
lock();
|
|
for (i= 0; i < hash.records; ++i)
|
|
{
|
|
element *e= (element *)my_hash_element(&hash, i);
|
|
if (e->list)
|
|
{
|
|
result= false;
|
|
break;
|
|
}
|
|
}
|
|
unlock();
|
|
|
|
return result;
|
|
}
|
|
|
|
|
|
rpl_binlog_state::rpl_binlog_state()
|
|
{
|
|
my_hash_init(&hash, &my_charset_bin, 32, offsetof(element, domain_id),
|
|
sizeof(uint32), NULL, my_free, HASH_UNIQUE);
|
|
mysql_mutex_init(key_LOCK_binlog_state, &LOCK_binlog_state,
|
|
MY_MUTEX_INIT_SLOW);
|
|
initialized= 1;
|
|
}
|
|
|
|
|
|
void
|
|
rpl_binlog_state::reset()
|
|
{
|
|
uint32 i;
|
|
|
|
for (i= 0; i < hash.records; ++i)
|
|
my_hash_free(&((element *)my_hash_element(&hash, i))->hash);
|
|
my_hash_reset(&hash);
|
|
}
|
|
|
|
void rpl_binlog_state::free()
|
|
{
|
|
if (initialized)
|
|
{
|
|
initialized= 0;
|
|
reset();
|
|
my_hash_free(&hash);
|
|
mysql_mutex_destroy(&LOCK_binlog_state);
|
|
}
|
|
}
|
|
|
|
|
|
bool
|
|
rpl_binlog_state::load(struct rpl_gtid *list, uint32 count)
|
|
{
|
|
uint32 i;
|
|
|
|
reset();
|
|
for (i= 0; i < count; ++i)
|
|
{
|
|
if (update(&(list[i]), false))
|
|
return true;
|
|
}
|
|
return false;
|
|
}
|
|
|
|
|
|
rpl_binlog_state::~rpl_binlog_state()
|
|
{
|
|
free();
|
|
}
|
|
|
|
|
|
/*
|
|
Update replication state with a new GTID.
|
|
|
|
If the (domain_id, server_id) pair already exists, then the new GTID replaces
|
|
the old one for that domain id. Else a new entry is inserted.
|
|
|
|
Returns 0 for ok, 1 for error.
|
|
*/
|
|
int
|
|
rpl_binlog_state::update(const struct rpl_gtid *gtid, bool strict)
|
|
{
|
|
element *elem;
|
|
|
|
if ((elem= (element *)my_hash_search(&hash,
|
|
(const uchar *)(>id->domain_id), 0)))
|
|
{
|
|
if (strict && elem->last_gtid && elem->last_gtid->seq_no >= gtid->seq_no)
|
|
{
|
|
my_error(ER_GTID_STRICT_OUT_OF_ORDER, MYF(0), gtid->domain_id,
|
|
gtid->server_id, gtid->seq_no, elem->last_gtid->domain_id,
|
|
elem->last_gtid->server_id, elem->last_gtid->seq_no);
|
|
return 1;
|
|
}
|
|
if (elem->seq_no_counter < gtid->seq_no)
|
|
elem->seq_no_counter= gtid->seq_no;
|
|
if (!elem->update_element(gtid))
|
|
return 0;
|
|
}
|
|
else if (!alloc_element(gtid))
|
|
return 0;
|
|
|
|
my_error(ER_OUT_OF_RESOURCES, MYF(0));
|
|
return 1;
|
|
}
|
|
|
|
|
|
/*
|
|
Fill in a new GTID, allocating next sequence number, and update state
|
|
accordingly.
|
|
*/
|
|
int
|
|
rpl_binlog_state::update_with_next_gtid(uint32 domain_id, uint32 server_id,
|
|
rpl_gtid *gtid)
|
|
{
|
|
element *elem;
|
|
|
|
gtid->domain_id= domain_id;
|
|
gtid->server_id= server_id;
|
|
|
|
if ((elem= (element *)my_hash_search(&hash, (const uchar *)(&domain_id), 0)))
|
|
{
|
|
gtid->seq_no= ++elem->seq_no_counter;
|
|
if (!elem->update_element(gtid))
|
|
return 0;
|
|
}
|
|
else
|
|
{
|
|
gtid->seq_no= 1;
|
|
if (!alloc_element(gtid))
|
|
return 0;
|
|
}
|
|
|
|
my_error(ER_OUT_OF_RESOURCES, MYF(0));
|
|
return 1;
|
|
}
|
|
|
|
|
|
/* Helper functions for update. */
|
|
int
|
|
rpl_binlog_state::element::update_element(const rpl_gtid *gtid)
|
|
{
|
|
rpl_gtid *lookup_gtid;
|
|
|
|
/*
|
|
By far the most common case is that successive events within same
|
|
replication domain have the same server id (it changes only when
|
|
switching to a new master). So save a hash lookup in this case.
|
|
*/
|
|
if (likely(last_gtid && last_gtid->server_id == gtid->server_id))
|
|
{
|
|
last_gtid->seq_no= gtid->seq_no;
|
|
return 0;
|
|
}
|
|
|
|
lookup_gtid= (rpl_gtid *)
|
|
my_hash_search(&hash, (const uchar *)>id->server_id, 0);
|
|
if (lookup_gtid)
|
|
{
|
|
lookup_gtid->seq_no= gtid->seq_no;
|
|
last_gtid= lookup_gtid;
|
|
return 0;
|
|
}
|
|
|
|
/* Allocate a new GTID and insert it. */
|
|
lookup_gtid= (rpl_gtid *)my_malloc(sizeof(*lookup_gtid), MYF(MY_WME));
|
|
if (!lookup_gtid)
|
|
return 1;
|
|
memcpy(lookup_gtid, gtid, sizeof(*lookup_gtid));
|
|
if (my_hash_insert(&hash, (const uchar *)lookup_gtid))
|
|
{
|
|
my_free(lookup_gtid);
|
|
return 1;
|
|
}
|
|
last_gtid= lookup_gtid;
|
|
return 0;
|
|
}
|
|
|
|
|
|
int
|
|
rpl_binlog_state::alloc_element(const rpl_gtid *gtid)
|
|
{
|
|
element *elem;
|
|
rpl_gtid *lookup_gtid;
|
|
|
|
/* First time we see this domain_id; allocate a new element. */
|
|
elem= (element *)my_malloc(sizeof(*elem), MYF(MY_WME));
|
|
lookup_gtid= (rpl_gtid *)my_malloc(sizeof(*lookup_gtid), MYF(MY_WME));
|
|
if (elem && lookup_gtid)
|
|
{
|
|
elem->domain_id= gtid->domain_id;
|
|
my_hash_init(&elem->hash, &my_charset_bin, 32,
|
|
offsetof(rpl_gtid, server_id), sizeof(uint32), NULL, my_free,
|
|
HASH_UNIQUE);
|
|
elem->last_gtid= lookup_gtid;
|
|
elem->seq_no_counter= gtid->seq_no;
|
|
memcpy(lookup_gtid, gtid, sizeof(*lookup_gtid));
|
|
if (0 == my_hash_insert(&elem->hash, (const uchar *)lookup_gtid))
|
|
{
|
|
lookup_gtid= NULL; /* Do not free. */
|
|
if (0 == my_hash_insert(&hash, (const uchar *)elem))
|
|
return 0;
|
|
}
|
|
my_hash_free(&elem->hash);
|
|
}
|
|
|
|
/* An error. */
|
|
if (elem)
|
|
my_free(elem);
|
|
if (lookup_gtid)
|
|
my_free(lookup_gtid);
|
|
return 1;
|
|
}
|
|
|
|
|
|
/*
|
|
Check that a new GTID can be logged without creating an out-of-order
|
|
sequence number with existing GTIDs.
|
|
*/
|
|
bool
|
|
rpl_binlog_state::check_strict_sequence(uint32 domain_id, uint32 server_id,
|
|
uint64 seq_no)
|
|
{
|
|
element *elem;
|
|
|
|
if ((elem= (element *)my_hash_search(&hash,
|
|
(const uchar *)(&domain_id), 0)) &&
|
|
elem->last_gtid && elem->last_gtid->seq_no >= seq_no)
|
|
{
|
|
my_error(ER_GTID_STRICT_OUT_OF_ORDER, MYF(0), domain_id, server_id, seq_no,
|
|
elem->last_gtid->domain_id, elem->last_gtid->server_id,
|
|
elem->last_gtid->seq_no);
|
|
return 1;
|
|
}
|
|
return 0;
|
|
}
|
|
|
|
|
|
/*
|
|
When we see a new GTID that will not be binlogged (eg. slave thread
|
|
with --log-slave-updates=0), then we need to remember to allocate any
|
|
GTID seq_no of our own within that domain starting from there.
|
|
|
|
Returns 0 if ok, non-zero if out-of-memory.
|
|
*/
|
|
int
|
|
rpl_binlog_state::bump_seq_no_if_needed(uint32 domain_id, uint64 seq_no)
|
|
{
|
|
element *elem;
|
|
|
|
if ((elem= (element *)my_hash_search(&hash, (const uchar *)(&domain_id), 0)))
|
|
{
|
|
if (elem->seq_no_counter < seq_no)
|
|
elem->seq_no_counter= seq_no;
|
|
return 0;
|
|
}
|
|
|
|
/* We need to allocate a new, empty element to remember the next seq_no. */
|
|
if (!(elem= (element *)my_malloc(sizeof(*elem), MYF(MY_WME))))
|
|
return 1;
|
|
|
|
elem->domain_id= domain_id;
|
|
my_hash_init(&elem->hash, &my_charset_bin, 32,
|
|
offsetof(rpl_gtid, server_id), sizeof(uint32), NULL, my_free,
|
|
HASH_UNIQUE);
|
|
elem->last_gtid= NULL;
|
|
elem->seq_no_counter= seq_no;
|
|
if (0 == my_hash_insert(&hash, (const uchar *)elem))
|
|
return 0;
|
|
|
|
my_hash_free(&elem->hash);
|
|
my_free(elem);
|
|
return 1;
|
|
}
|
|
|
|
|
|
/*
|
|
Write binlog state to text file, so we can read it in again without having
|
|
to scan last binlog file (normal shutdown/startup, not crash recovery).
|
|
|
|
The most recent GTID within each domain_id is written after any other GTID
|
|
within this domain.
|
|
*/
|
|
int
|
|
rpl_binlog_state::write_to_iocache(IO_CACHE *dest)
|
|
{
|
|
ulong i, j;
|
|
char buf[21];
|
|
|
|
for (i= 0; i < hash.records; ++i)
|
|
{
|
|
size_t res;
|
|
element *e= (element *)my_hash_element(&hash, i);
|
|
if (!e->last_gtid)
|
|
{
|
|
DBUG_ASSERT(e->hash.records == 0);
|
|
continue;
|
|
}
|
|
for (j= 0; j <= e->hash.records; ++j)
|
|
{
|
|
const rpl_gtid *gtid;
|
|
if (j < e->hash.records)
|
|
{
|
|
gtid= (const rpl_gtid *)my_hash_element(&e->hash, j);
|
|
if (gtid == e->last_gtid)
|
|
continue;
|
|
}
|
|
else
|
|
gtid= e->last_gtid;
|
|
|
|
longlong10_to_str(gtid->seq_no, buf, 10);
|
|
res= my_b_printf(dest, "%u-%u-%s\n", gtid->domain_id, gtid->server_id, buf);
|
|
if (res == (size_t) -1)
|
|
return 1;
|
|
}
|
|
}
|
|
|
|
return 0;
|
|
}
|
|
|
|
|
|
int
|
|
rpl_binlog_state::read_from_iocache(IO_CACHE *src)
|
|
{
|
|
/* 10-digit - 10-digit - 20-digit \n \0 */
|
|
char buf[10+1+10+1+20+1+1];
|
|
char *p, *end;
|
|
rpl_gtid gtid;
|
|
|
|
reset();
|
|
for (;;)
|
|
{
|
|
size_t res= my_b_gets(src, buf, sizeof(buf));
|
|
if (!res)
|
|
break;
|
|
p= buf;
|
|
end= buf + res;
|
|
if (gtid_parser_helper(&p, end, >id))
|
|
return 1;
|
|
if (update(>id, false))
|
|
return 1;
|
|
}
|
|
return 0;
|
|
}
|
|
|
|
|
|
rpl_gtid *
|
|
rpl_binlog_state::find(uint32 domain_id, uint32 server_id)
|
|
{
|
|
element *elem;
|
|
if (!(elem= (element *)my_hash_search(&hash, (const uchar *)&domain_id, 0)))
|
|
return NULL;
|
|
return (rpl_gtid *)my_hash_search(&elem->hash, (const uchar *)&server_id, 0);
|
|
}
|
|
|
|
rpl_gtid *
|
|
rpl_binlog_state::find_most_recent(uint32 domain_id)
|
|
{
|
|
element *elem;
|
|
|
|
elem= (element *)my_hash_search(&hash, (const uchar *)&domain_id, 0);
|
|
if (elem && elem->last_gtid)
|
|
return elem->last_gtid;
|
|
return NULL;
|
|
}
|
|
|
|
|
|
uint32
|
|
rpl_binlog_state::count()
|
|
{
|
|
uint32 c= 0;
|
|
uint32 i;
|
|
|
|
for (i= 0; i < hash.records; ++i)
|
|
c+= ((element *)my_hash_element(&hash, i))->hash.records;
|
|
|
|
return c;
|
|
}
|
|
|
|
|
|
int
|
|
rpl_binlog_state::get_gtid_list(rpl_gtid *gtid_list, uint32 list_size)
|
|
{
|
|
uint32 i, j, pos;
|
|
|
|
pos= 0;
|
|
for (i= 0; i < hash.records; ++i)
|
|
{
|
|
element *e= (element *)my_hash_element(&hash, i);
|
|
if (!e->last_gtid)
|
|
{
|
|
DBUG_ASSERT(e->hash.records==0);
|
|
continue;
|
|
}
|
|
for (j= 0; j <= e->hash.records; ++j)
|
|
{
|
|
const rpl_gtid *gtid;
|
|
if (j < e->hash.records)
|
|
{
|
|
gtid= (rpl_gtid *)my_hash_element(&e->hash, j);
|
|
if (gtid == e->last_gtid)
|
|
continue;
|
|
}
|
|
else
|
|
gtid= e->last_gtid;
|
|
|
|
if (pos >= list_size)
|
|
return 1;
|
|
memcpy(>id_list[pos++], gtid, sizeof(*gtid));
|
|
}
|
|
}
|
|
|
|
return 0;
|
|
}
|
|
|
|
|
|
/*
|
|
Get a list of the most recently binlogged GTID, for each domain_id.
|
|
|
|
This can be used when switching from being a master to being a slave,
|
|
to know where to start replicating from the new master.
|
|
|
|
The returned list must be de-allocated with my_free().
|
|
|
|
Returns 0 for ok, non-zero for out-of-memory.
|
|
*/
|
|
int
|
|
rpl_binlog_state::get_most_recent_gtid_list(rpl_gtid **list, uint32 *size)
|
|
{
|
|
uint32 i;
|
|
uint32 alloc_size, out_size;
|
|
|
|
alloc_size= hash.records;
|
|
if (!(*list= (rpl_gtid *)my_malloc(alloc_size * sizeof(rpl_gtid),
|
|
MYF(MY_WME))))
|
|
return 1;
|
|
out_size= 0;
|
|
for (i= 0; i < alloc_size; ++i)
|
|
{
|
|
element *e= (element *)my_hash_element(&hash, i);
|
|
if (!e->last_gtid)
|
|
continue;
|
|
memcpy(&((*list)[out_size++]), e->last_gtid, sizeof(rpl_gtid));
|
|
}
|
|
|
|
*size= out_size;
|
|
return 0;
|
|
}
|
|
|
|
|
|
bool
|
|
rpl_binlog_state::append_pos(String *str)
|
|
{
|
|
uint32 i;
|
|
bool first= true;
|
|
|
|
for (i= 0; i < hash.records; ++i)
|
|
{
|
|
element *e= (element *)my_hash_element(&hash, i);
|
|
if (e->last_gtid &&
|
|
rpl_slave_state_tostring_helper(str, e->last_gtid, &first))
|
|
return true;
|
|
}
|
|
|
|
return false;
|
|
}
|
|
|
|
|
|
bool
|
|
rpl_binlog_state::append_state(String *str)
|
|
{
|
|
uint32 i, j;
|
|
bool first= true;
|
|
|
|
for (i= 0; i < hash.records; ++i)
|
|
{
|
|
element *e= (element *)my_hash_element(&hash, i);
|
|
if (!e->last_gtid)
|
|
{
|
|
DBUG_ASSERT(e->hash.records==0);
|
|
continue;
|
|
}
|
|
for (j= 0; j <= e->hash.records; ++j)
|
|
{
|
|
const rpl_gtid *gtid;
|
|
if (j < e->hash.records)
|
|
{
|
|
gtid= (rpl_gtid *)my_hash_element(&e->hash, j);
|
|
if (gtid == e->last_gtid)
|
|
continue;
|
|
}
|
|
else
|
|
gtid= e->last_gtid;
|
|
|
|
if (rpl_slave_state_tostring_helper(str, gtid, &first))
|
|
return true;
|
|
}
|
|
}
|
|
|
|
return false;
|
|
}
|
|
|
|
|
|
slave_connection_state::slave_connection_state()
|
|
{
|
|
my_hash_init(&hash, &my_charset_bin, 32,
|
|
offsetof(entry, gtid) + offsetof(rpl_gtid, domain_id),
|
|
sizeof(uint32), NULL, my_free, HASH_UNIQUE);
|
|
}
|
|
|
|
|
|
slave_connection_state::~slave_connection_state()
|
|
{
|
|
my_hash_free(&hash);
|
|
}
|
|
|
|
|
|
/*
|
|
Create a hash from the slave GTID state that is sent to master when slave
|
|
connects to start replication.
|
|
|
|
The state is sent as <GTID>,<GTID>,...,<GTID>, for example:
|
|
|
|
0-2-112,1-4-1022
|
|
|
|
The state gives for each domain_id the GTID to start replication from for
|
|
the corresponding replication stream. So domain_id must be unique.
|
|
|
|
Returns 0 if ok, non-zero if error due to malformed input.
|
|
|
|
Note that input string is built by slave server, so it will not be incorrect
|
|
unless bug/corruption/malicious server. So we just need basic sanity check,
|
|
not fancy user-friendly error message.
|
|
*/
|
|
|
|
int
|
|
slave_connection_state::load(char *slave_request, size_t len)
|
|
{
|
|
char *p, *end;
|
|
uchar *rec;
|
|
rpl_gtid *gtid;
|
|
const entry *e;
|
|
|
|
reset();
|
|
p= slave_request;
|
|
end= slave_request + len;
|
|
if (p == end)
|
|
return 0;
|
|
for (;;)
|
|
{
|
|
if (!(rec= (uchar *)my_malloc(sizeof(entry), MYF(MY_WME))))
|
|
{
|
|
my_error(ER_OUTOFMEMORY, MYF(0), sizeof(*gtid));
|
|
return 1;
|
|
}
|
|
gtid= &((entry *)rec)->gtid;
|
|
if (gtid_parser_helper(&p, end, gtid))
|
|
{
|
|
my_free(rec);
|
|
my_error(ER_INCORRECT_GTID_STATE, MYF(0));
|
|
return 1;
|
|
}
|
|
if ((e= (const entry *)
|
|
my_hash_search(&hash, (const uchar *)(>id->domain_id), 0)))
|
|
{
|
|
my_error(ER_DUPLICATE_GTID_DOMAIN, MYF(0), gtid->domain_id,
|
|
gtid->server_id, (ulonglong)gtid->seq_no, e->gtid.domain_id,
|
|
e->gtid.server_id, (ulonglong)e->gtid.seq_no, gtid->domain_id);
|
|
my_free(rec);
|
|
return 1;
|
|
}
|
|
((entry *)rec)->flags= 0;
|
|
if (my_hash_insert(&hash, rec))
|
|
{
|
|
my_free(rec);
|
|
my_error(ER_OUT_OF_RESOURCES, MYF(0));
|
|
return 1;
|
|
}
|
|
if (p == end)
|
|
break; /* Finished. */
|
|
if (*p != ',')
|
|
{
|
|
my_error(ER_INCORRECT_GTID_STATE, MYF(0));
|
|
return 1;
|
|
}
|
|
++p;
|
|
}
|
|
|
|
return 0;
|
|
}
|
|
|
|
|
|
int
|
|
slave_connection_state::load(const rpl_gtid *gtid_list, uint32 count)
|
|
{
|
|
uint32 i;
|
|
|
|
reset();
|
|
for (i= 0; i < count; ++i)
|
|
if (update(>id_list[i]))
|
|
return 1;
|
|
return 0;
|
|
}
|
|
|
|
|
|
static int
|
|
slave_connection_state_load_cb(rpl_gtid *gtid, void *data)
|
|
{
|
|
slave_connection_state *state= (slave_connection_state *)data;
|
|
return state->update(gtid);
|
|
}
|
|
|
|
|
|
/*
|
|
Same as rpl_slave_state::tostring(), but populates a slave_connection_state
|
|
instead.
|
|
*/
|
|
int
|
|
slave_connection_state::load(rpl_slave_state *state,
|
|
rpl_gtid *extra_gtids, uint32 num_extra)
|
|
{
|
|
reset();
|
|
return state->iterate(slave_connection_state_load_cb, this,
|
|
extra_gtids, num_extra);
|
|
}
|
|
|
|
|
|
slave_connection_state::entry *
|
|
slave_connection_state::find_entry(uint32 domain_id)
|
|
{
|
|
return (entry *) my_hash_search(&hash, (const uchar *)(&domain_id), 0);
|
|
}
|
|
|
|
|
|
rpl_gtid *
|
|
slave_connection_state::find(uint32 domain_id)
|
|
{
|
|
entry *e= find_entry(domain_id);
|
|
if (!e)
|
|
return NULL;
|
|
return &e->gtid;
|
|
}
|
|
|
|
|
|
int
|
|
slave_connection_state::update(const rpl_gtid *in_gtid)
|
|
{
|
|
entry *e;
|
|
uchar *rec= my_hash_search(&hash, (const uchar *)(&in_gtid->domain_id), 0);
|
|
if (rec)
|
|
{
|
|
e= (entry *)rec;
|
|
e->gtid= *in_gtid;
|
|
return 0;
|
|
}
|
|
|
|
if (!(e= (entry *)my_malloc(sizeof(*e), MYF(MY_WME))))
|
|
return 1;
|
|
e->gtid= *in_gtid;
|
|
e->flags= 0;
|
|
if (my_hash_insert(&hash, (uchar *)e))
|
|
{
|
|
my_free(e);
|
|
return 1;
|
|
}
|
|
|
|
return 0;
|
|
}
|
|
|
|
|
|
void
|
|
slave_connection_state::remove(const rpl_gtid *in_gtid)
|
|
{
|
|
uchar *rec= my_hash_search(&hash, (const uchar *)(&in_gtid->domain_id), 0);
|
|
#ifndef DBUG_OFF
|
|
bool err;
|
|
rpl_gtid *slave_gtid= &((entry *)rec)->gtid;
|
|
DBUG_ASSERT(rec /* We should never try to remove not present domain_id. */);
|
|
DBUG_ASSERT(slave_gtid->server_id == in_gtid->server_id);
|
|
DBUG_ASSERT(slave_gtid->seq_no == in_gtid->seq_no);
|
|
#endif
|
|
|
|
IF_DBUG(err=, )
|
|
my_hash_delete(&hash, rec);
|
|
DBUG_ASSERT(!err);
|
|
}
|
|
|
|
|
|
void
|
|
slave_connection_state::remove_if_present(const rpl_gtid *in_gtid)
|
|
{
|
|
uchar *rec= my_hash_search(&hash, (const uchar *)(&in_gtid->domain_id), 0);
|
|
if (rec)
|
|
my_hash_delete(&hash, rec);
|
|
}
|
|
|
|
|
|
int
|
|
slave_connection_state::to_string(String *out_str)
|
|
{
|
|
out_str->length(0);
|
|
return append_to_string(out_str);
|
|
}
|
|
|
|
|
|
int
|
|
slave_connection_state::append_to_string(String *out_str)
|
|
{
|
|
uint32 i;
|
|
bool first;
|
|
|
|
first= true;
|
|
for (i= 0; i < hash.records; ++i)
|
|
{
|
|
const entry *e= (const entry *)my_hash_element(&hash, i);
|
|
if (rpl_slave_state_tostring_helper(out_str, &e->gtid, &first))
|
|
return 1;
|
|
}
|
|
return 0;
|
|
}
|
|
|
|
|
|
int
|
|
slave_connection_state::get_gtid_list(rpl_gtid *gtid_list, uint32 list_size)
|
|
{
|
|
uint32 i, pos;
|
|
|
|
pos= 0;
|
|
for (i= 0; i < hash.records; ++i)
|
|
{
|
|
entry *e;
|
|
if (pos >= list_size)
|
|
return 1;
|
|
e= (entry *)my_hash_element(&hash, i);
|
|
memcpy(>id_list[pos++], &e->gtid, sizeof(e->gtid));
|
|
}
|
|
|
|
return 0;
|
|
}
|