MDEV-4478: Implement GTID "strict mode"

When @@GLOBAL.gtid_strict_mode=1, certain operations that would
otherwise produce out-of-order binlog files between servers instead
result in an error.

GTID sequence numbers are now allocated independently per replication
domain; this leaves fewer (ideally no) holes in GTID sequences,
increasing the likelihood that diverging binlogs will be caught by the
slave when GTID strict mode is enabled.
unknown 2013-05-28 13:28:31 +02:00
commit ee2b7db3f8
24 changed files with 896 additions and 169 deletions
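To make the commit message concrete, here is a minimal sketch of the two ideas involved (an editor's illustration, not MariaDB code; all names are made up): each replication domain keeps its own counter for allocating seq_no, and strict mode rejects any GTID whose seq_no does not strictly increase within its domain.

    #include <cstdint>
    #include <unordered_map>

    // Toy model of per-domain GTID sequence bookkeeping.
    struct ToyGtidState
    {
      // domain_id -> highest seq_no seen or generated in that domain.
      std::unordered_map<uint32_t, uint64_t> seq_no_counter;

      // Allocate the next seq_no for a locally generated GTID; the first
      // GTID in a new domain gets seq_no 1, leaving no holes.
      uint64_t next_seq_no(uint32_t domain_id)
      {
        return ++seq_no_counter[domain_id];   // starts at 0 on first use
      }

      // Strict-mode check: reject a GTID whose seq_no does not strictly
      // increase within its domain (a sign of diverging binlogs).
      bool strict_ok(uint32_t domain_id, uint64_t seq_no) const
      {
        auto it= seq_no_counter.find(domain_id);
        return it == seq_no_counter.end() || seq_no > it->second;
      }
    };

With one counter per domain, seq_no values within a domain are consecutive, so a gap or a non-increasing value is immediate evidence of divergence.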

sql/rpl_gtid.cc

@@ -370,7 +370,10 @@ rpl_slave_state::record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id,
   }
   table->file->ha_index_end();
-  mysql_bin_log.bump_seq_no_counter_if_needed(gtid->seq_no);
+  if (!err && opt_bin_log &&
+      (err= mysql_bin_log.bump_seq_no_counter_if_needed(gtid->domain_id,
+                                                        gtid->seq_no)))
+    my_error(ER_OUT_OF_RESOURCES, MYF(0));
 end:
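The hunk above makes bump_seq_no_counter_if_needed() per-domain: when a GTID is recorded that the binlog did not itself generate, only that domain's counter is advanced. The toy-model equivalent (a hypothetical helper, building on ToyGtidState above):

    // Advance a domain's counter to at least seq_no, so GTIDs generated
    // locally later in this domain start above anything already seen.
    void bump_if_needed(ToyGtidState &s, uint32_t domain_id, uint64_t seq_no)
    {
      uint64_t &c= s.seq_no_counter[domain_id];
      if (c < seq_no)
        c= seq_no;
    }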
@@ -719,7 +722,7 @@ rpl_binlog_state::load(struct rpl_gtid *list, uint32 count)
   reset();
   for (i= 0; i < count; ++i)
   {
-    if (update(&(list[i])))
+    if (update(&(list[i]), false))
       return true;
   }
   return false;
@@ -741,48 +744,111 @@ rpl_binlog_state::~rpl_binlog_state()
   Returns 0 for ok, 1 for error.
 */
 int
-rpl_binlog_state::update(const struct rpl_gtid *gtid)
+rpl_binlog_state::update(const struct rpl_gtid *gtid, bool strict)
 {
-  rpl_gtid *lookup_gtid;
   element *elem;
 
-  elem= (element *)my_hash_search(&hash, (const uchar *)(&gtid->domain_id), 0);
-  if (elem)
+  if ((elem= (element *)my_hash_search(&hash,
+                                       (const uchar *)(&gtid->domain_id), 0)))
   {
-    /*
-      By far the most common case is that successive events within same
-      replication domain have the same server id (it changes only when
-      switching to a new master). So save a hash lookup in this case.
-    */
-    if (likely(elem->last_gtid->server_id == gtid->server_id))
+    if (strict && elem->last_gtid && elem->last_gtid->seq_no >= gtid->seq_no)
     {
-      elem->last_gtid->seq_no= gtid->seq_no;
-      return 0;
-    }
-
-    lookup_gtid= (rpl_gtid *)
-      my_hash_search(&elem->hash, (const uchar *)&gtid->server_id, 0);
-    if (lookup_gtid)
-    {
-      lookup_gtid->seq_no= gtid->seq_no;
-      elem->last_gtid= lookup_gtid;
-      return 0;
-    }
-
-    /* Allocate a new GTID and insert it. */
-    lookup_gtid= (rpl_gtid *)my_malloc(sizeof(*lookup_gtid), MYF(MY_WME));
-    if (!lookup_gtid)
-      return 1;
-    memcpy(lookup_gtid, gtid, sizeof(*lookup_gtid));
-    if (my_hash_insert(&elem->hash, (const uchar *)lookup_gtid))
-    {
-      my_free(lookup_gtid);
+      my_error(ER_GTID_STRICT_OUT_OF_ORDER, MYF(0), gtid->domain_id,
+               gtid->server_id, gtid->seq_no, elem->last_gtid->domain_id,
+               elem->last_gtid->server_id, elem->last_gtid->seq_no);
       return 1;
     }
-    elem->last_gtid= lookup_gtid;
-    return 0;
+    if (elem->seq_no_counter < gtid->seq_no)
+      elem->seq_no_counter= gtid->seq_no;
+    if (!elem->update_element(gtid))
+      return 0;
   }
+  else if (!alloc_element(gtid))
+    return 0;
+
+  my_error(ER_OUT_OF_RESOURCES, MYF(0));
+  return 1;
+}
+
+
+/*
+  Fill in a new GTID, allocating next sequence number, and update state
+  accordingly.
+*/
+int
+rpl_binlog_state::update_with_next_gtid(uint32 domain_id, uint32 server_id,
+                                        rpl_gtid *gtid)
+{
+  element *elem;
+
+  gtid->domain_id= domain_id;
+  gtid->server_id= server_id;
+
+  if ((elem= (element *)my_hash_search(&hash, (const uchar *)(&domain_id), 0)))
+  {
+    gtid->seq_no= ++elem->seq_no_counter;
+    if (!elem->update_element(gtid))
+      return 0;
+  }
+  else
+  {
+    gtid->seq_no= 1;
+    if (!alloc_element(gtid))
+      return 0;
+  }
+
+  my_error(ER_OUT_OF_RESOURCES, MYF(0));
+  return 1;
+}
+
+
+/* Helper functions for update. */
+int
+rpl_binlog_state::element::update_element(const rpl_gtid *gtid)
+{
+  rpl_gtid *lookup_gtid;
+
+  /*
+    By far the most common case is that successive events within same
+    replication domain have the same server id (it changes only when
+    switching to a new master). So save a hash lookup in this case.
+  */
+  if (likely(last_gtid && last_gtid->server_id == gtid->server_id))
+  {
+    last_gtid->seq_no= gtid->seq_no;
+    return 0;
+  }
+
+  lookup_gtid= (rpl_gtid *)
+    my_hash_search(&hash, (const uchar *)&gtid->server_id, 0);
+  if (lookup_gtid)
+  {
+    lookup_gtid->seq_no= gtid->seq_no;
+    last_gtid= lookup_gtid;
+    return 0;
+  }
+
+  /* Allocate a new GTID and insert it. */
+  lookup_gtid= (rpl_gtid *)my_malloc(sizeof(*lookup_gtid), MYF(MY_WME));
+  if (!lookup_gtid)
+    return 1;
+  memcpy(lookup_gtid, gtid, sizeof(*lookup_gtid));
+  if (my_hash_insert(&hash, (const uchar *)lookup_gtid))
+  {
+    my_free(lookup_gtid);
+    return 1;
+  }
+  last_gtid= lookup_gtid;
+  return 0;
+}
+
+
+int
+rpl_binlog_state::alloc_element(const rpl_gtid *gtid)
+{
+  element *elem;
+  rpl_gtid *lookup_gtid;
+
   /* First time we see this domain_id; allocate a new element. */
   elem= (element *)my_malloc(sizeof(*elem), MYF(MY_WME));
   lookup_gtid= (rpl_gtid *)my_malloc(sizeof(*lookup_gtid), MYF(MY_WME));
@@ -793,6 +859,7 @@ rpl_binlog_state::update(const struct rpl_gtid *gtid)
                offsetof(rpl_gtid, server_id), sizeof(uint32), NULL, my_free,
                HASH_UNIQUE);
   elem->last_gtid= lookup_gtid;
+  elem->seq_no_counter= gtid->seq_no;
   memcpy(lookup_gtid, gtid, sizeof(*lookup_gtid));
   if (0 == my_hash_insert(&elem->hash, (const uchar *)lookup_gtid))
   {
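The one added line above seeds the new domain's seq_no_counter from the first GTID seen, so a later update_with_next_gtid() continues the sequence instead of restarting at 1. In the toy model (again hypothetical):

    // First GTID observed in a previously unseen domain seeds the counter;
    // ToyGtidState::next_seq_no() then returns seq_no+1 for this domain.
    void seed_domain(ToyGtidState &s, uint32_t domain_id, uint64_t seq_no)
    {
      s.seq_no_counter.emplace(domain_id, seq_no);
    }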
@@ -812,23 +879,64 @@ rpl_binlog_state::update(const struct rpl_gtid *gtid)
 }
 
-uint64
-rpl_binlog_state::seq_no_from_state()
+/*
+  Check that a new GTID can be logged without creating an out-of-order
+  sequence number with existing GTIDs.
+*/
+bool
+rpl_binlog_state::check_strict_sequence(uint32 domain_id, uint32 server_id,
+                                        uint64 seq_no)
 {
-  ulong i, j;
-  uint64 seq_no= 0;
+  element *elem;
 
-  for (i= 0; i < hash.records; ++i)
+  if ((elem= (element *)my_hash_search(&hash,
+                                       (const uchar *)(&domain_id), 0)) &&
+      elem->last_gtid && elem->last_gtid->seq_no >= seq_no)
   {
-    element *e= (element *)my_hash_element(&hash, i);
-    for (j= 0; j < e->hash.records; ++j)
-    {
-      const rpl_gtid *gtid= (const rpl_gtid *)my_hash_element(&e->hash, j);
-      if (gtid->seq_no > seq_no)
-        seq_no= gtid->seq_no;
-    }
+    my_error(ER_GTID_STRICT_OUT_OF_ORDER, MYF(0), domain_id, server_id, seq_no,
+             elem->last_gtid->domain_id, elem->last_gtid->server_id,
+             elem->last_gtid->seq_no);
+    return 1;
   }
-  return seq_no;
+  return 0;
 }
+
+
+/*
+  When we see a new GTID that will not be binlogged (eg. slave thread
+  with --log-slave-updates=0), then we need to remember to allocate any
+  GTID seq_no of our own within that domain starting from there.
+
+  Returns 0 if ok, non-zero if out-of-memory.
+*/
+int
+rpl_binlog_state::bump_seq_no_if_needed(uint32 domain_id, uint64 seq_no)
+{
+  element *elem;
+
+  if ((elem= (element *)my_hash_search(&hash, (const uchar *)(&domain_id), 0)))
+  {
+    if (elem->seq_no_counter < seq_no)
+      elem->seq_no_counter= seq_no;
+    return 0;
+  }
+
+  /* We need to allocate a new, empty element to remember the next seq_no. */
+  if (!(elem= (element *)my_malloc(sizeof(*elem), MYF(MY_WME))))
+    return 1;
+
+  elem->domain_id= domain_id;
+  my_hash_init(&elem->hash, &my_charset_bin, 32,
+               offsetof(rpl_gtid, server_id), sizeof(uint32), NULL, my_free,
+               HASH_UNIQUE);
+  elem->last_gtid= NULL;
+  elem->seq_no_counter= seq_no;
+  if (0 == my_hash_insert(&hash, (const uchar *)elem))
+    return 0;
+
+  my_hash_free(&elem->hash);
+  my_free(elem);
+  return 1;
+}
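A worked example of the scenario described in the comment above, as an editor's sketch using the hypothetical ToyGtidState and bump_if_needed from earlier on this page: a slave running with --log-slave-updates=0 applies transactions from domain 1 up to seq_no 100 without binlogging them; if it is later promoted to master, its first own GTID in that domain must continue from 101, and strict mode must reject a stale seq_no arriving afterwards.

    #include <cassert>

    void promoted_slave_example()
    {
      ToyGtidState state;

      // Apply 100 transactions from domain 1 without binlogging them;
      // only the highest seq_no per domain is remembered.
      bump_if_needed(state, /*domain_id=*/1, /*seq_no=*/100);

      // After promotion, the next locally generated GTID continues the
      // sequence instead of restarting at 1.
      assert(state.next_seq_no(1) == 101);

      // And strict mode now rejects an out-of-order seq_no from elsewhere.
      assert(!state.strict_ok(1, 100));
    }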
@@ -849,6 +957,11 @@ rpl_binlog_state::write_to_iocache(IO_CACHE *dest)
   {
     size_t res;
     element *e= (element *)my_hash_element(&hash, i);
+    if (!e->last_gtid)
+    {
+      DBUG_ASSERT(e->hash.records == 0);
+      continue;
+    }
     for (j= 0; j <= e->hash.records; ++j)
     {
       const rpl_gtid *gtid;
@@ -890,7 +1003,7 @@ rpl_binlog_state::read_from_iocache(IO_CACHE *src)
     end= buf + res;
     if (gtid_parser_helper(&p, end, &gtid))
       return 1;
-    if (update(&gtid))
+    if (update(&gtid, false))
       return 1;
   }
   return 0;
@@ -906,6 +1019,17 @@ rpl_binlog_state::find(uint32 domain_id, uint32 server_id)
   return (rpl_gtid *)my_hash_search(&elem->hash, (const uchar *)&server_id, 0);
 }
 
+rpl_gtid *
+rpl_binlog_state::find_most_recent(uint32 domain_id)
+{
+  element *elem;
+
+  elem= (element *)my_hash_search(&hash, (const uchar *)&domain_id, 0);
+  if (elem && elem->last_gtid)
+    return elem->last_gtid;
+  return NULL;
+}
+
 uint32
 rpl_binlog_state::count()
@@ -929,6 +1053,11 @@ rpl_binlog_state::get_gtid_list(rpl_gtid *gtid_list, uint32 list_size)
   for (i= 0; i < hash.records; ++i)
   {
     element *e= (element *)my_hash_element(&hash, i);
+    if (!e->last_gtid)
+    {
+      DBUG_ASSERT(e->hash.records == 0);
+      continue;
+    }
     for (j= 0; j <= e->hash.records; ++j)
     {
       const rpl_gtid *gtid;
@@ -965,16 +1094,22 @@ int
 rpl_binlog_state::get_most_recent_gtid_list(rpl_gtid **list, uint32 *size)
 {
   uint32 i;
+  uint32 alloc_size, out_size;
 
-  *size= hash.records;
-  if (!(*list= (rpl_gtid *)my_malloc(*size * sizeof(rpl_gtid), MYF(MY_WME))))
+  alloc_size= hash.records;
+  if (!(*list= (rpl_gtid *)my_malloc(alloc_size * sizeof(rpl_gtid),
+                                     MYF(MY_WME))))
     return 1;
-  for (i= 0; i < *size; ++i)
+  out_size= 0;
+  for (i= 0; i < alloc_size; ++i)
   {
     element *e= (element *)my_hash_element(&hash, i);
-    memcpy(&((*list)[i]), e->last_gtid, sizeof(rpl_gtid));
+    if (!e->last_gtid)
+      continue;
+    memcpy(&((*list)[out_size++]), e->last_gtid, sizeof(rpl_gtid));
   }
+  *size= out_size;
 
   return 0;
 }
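The !e->last_gtid guards in this and the surrounding hunks all handle the same new case: a domain element created only by bump_seq_no_if_needed() carries a seq_no_counter but owns no binlogged GTID yet, so it must be skipped when enumerating GTIDs. A self-contained sketch of that distinction in the toy model (editor's illustration, hypothetical names):

    #include <cstdint>
    #include <optional>
    #include <unordered_map>

    struct ToyDomain
    {
      uint64_t seq_no_counter= 0;           // always meaningful
      std::optional<uint64_t> last_seq_no;  // empty until a GTID is binlogged
    };

    // Enumerating binlogged GTIDs must skip counter-only domains.
    uint64_t count_binlogged(const std::unordered_map<uint32_t, ToyDomain> &m)
    {
      uint64_t n= 0;
      for (const auto &kv : m)
        if (kv.second.last_seq_no)          // mirrors the !e->last_gtid guard
          ++n;
      return n;
    }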
@@ -988,7 +1123,8 @@ rpl_binlog_state::append_pos(String *str)
   for (i= 0; i < hash.records; ++i)
   {
     element *e= (element *)my_hash_element(&hash, i);
-    if (rpl_slave_state_tostring_helper(str, e->last_gtid, &first))
+    if (e->last_gtid &&
+        rpl_slave_state_tostring_helper(str, e->last_gtid, &first))
       return true;
   }