MDEV-34705: Binlog in Engine: Refactor to add rpl_binlog_state_base::iterate()

Signed-off-by: Kristian Nielsen <knielsen@knielsen-hq.org>
This commit is contained in:
Kristian Nielsen 2024-10-02 12:36:36 +02:00
parent 82ec0e4cde
commit 91ae144e53
2 changed files with 68 additions and 112 deletions

View file

@ -79,6 +79,51 @@ struct rpl_binlog_state_base
int get_gtid_list_nolock(rpl_gtid *gtid_list, uint32 list_size);
rpl_gtid *find_nolock(uint32 domain_id, uint32 server_id);
bool is_before_pos(slave_connection_state *pos);
/*
Inline iterator over a binlog state, most recent GTID comes last for each
domain just like get_gtid_list_nolock().
The ITERATOR_FUNC should have signature bool f(const rpl_gtid *), and
return true in case of error (in which case iterate() aborts and also
returns true).
Intended to do custom GTID state processing without requiring the overhead
of an intermediate list, and where the extra code generation is justified.
*/
template <typename F> bool iterate(F iterator_func)
{
uint32 i, j;
ulong outer_records= hash.records;
for (i= 0; i < outer_records; ++i)
{
element *e= (element *)my_hash_element(&hash, i);
ulong inner_records= e->hash.records;
const rpl_gtid *last_gtid= e->last_gtid;
if (unlikely(!last_gtid))
{
DBUG_ASSERT(inner_records==0);
continue;
}
for (j= 0; j <= inner_records; ++j)
{
const rpl_gtid *gtid;
if (j < inner_records)
{
gtid= (rpl_gtid *)my_hash_element(&e->hash, j);
if (gtid == last_gtid)
continue;
}
else
gtid= e->last_gtid;
if (iterator_func(gtid))
return true;
}
}
return false; // No error
}
};

View file

@ -1609,28 +1609,11 @@ rpl_binlog_state_base::load_nolock(struct rpl_gtid *list, uint32 count)
bool
rpl_binlog_state_base::load_nolock(rpl_binlog_state_base *orig_state)
{
ulong i, j;
HASH *h1= &orig_state->hash;
reset_nolock();
for (i= 0; i < h1->records; ++i)
{
element *e= (element *)my_hash_element(h1, i);
HASH *h2= &e->hash;
const rpl_gtid *last_gtid= e->last_gtid;
for (j= 0; j < h2->records; ++j)
{
const rpl_gtid *gtid= (const rpl_gtid *)my_hash_element(h2, j);
if (gtid == last_gtid)
continue;
if (update_nolock(gtid))
return true;
}
if (likely(last_gtid) && update_nolock(last_gtid))
return true;
}
return false;
return orig_state->iterate(
[this] (const rpl_gtid *gtid) {
return update_nolock(gtid);
});
}
@ -1719,36 +1702,14 @@ rpl_binlog_state_base::count_nolock()
int
rpl_binlog_state_base::get_gtid_list_nolock(rpl_gtid *gtid_list, uint32 list_size)
{
uint32 i, j, pos;
pos= 0;
for (i= 0; i < hash.records; ++i)
{
element *e= (element *)my_hash_element(&hash, i);
if (!e->last_gtid)
{
DBUG_ASSERT(e->hash.records==0);
continue;
}
for (j= 0; j <= e->hash.records; ++j)
{
const rpl_gtid *gtid;
if (j < e->hash.records)
{
gtid= (rpl_gtid *)my_hash_element(&e->hash, j);
if (gtid == e->last_gtid)
continue;
}
else
gtid= e->last_gtid;
uint32_t pos= 0;
return iterate(
[&gtid_list, list_size, &pos] (const rpl_gtid *gtid) {
if (pos >= list_size)
return 1;
return true;
memcpy(&gtid_list[pos++], gtid, sizeof(*gtid));
}
}
return 0;
return false;
});
}
@ -2099,42 +2060,17 @@ end:
int
rpl_binlog_state::write_to_iocache(IO_CACHE *dest)
{
ulong i, j;
char buf[21];
int res= 0;
mysql_mutex_lock(&LOCK_binlog_state);
for (i= 0; i < hash.records; ++i)
{
element *e= (element *)my_hash_element(&hash, i);
if (!e->last_gtid)
{
DBUG_ASSERT(e->hash.records == 0);
continue;
}
for (j= 0; j <= e->hash.records; ++j)
{
const rpl_gtid *gtid;
if (j < e->hash.records)
{
gtid= (const rpl_gtid *)my_hash_element(&e->hash, j);
if (gtid == e->last_gtid)
continue;
}
else
gtid= e->last_gtid;
longlong10_to_str(gtid->seq_no, buf, 10);
if (my_b_printf(dest, "%u-%u-%s\n", gtid->domain_id, gtid->server_id,
buf))
{
res= 1;
goto end;
}
}
}
int res= iterate([&buf, dest] (const rpl_gtid *gtid) {
longlong10_to_str(gtid->seq_no, buf, 10);
if (my_b_printf(dest, "%u-%u-%s\n", gtid->domain_id, gtid->server_id, buf))
return true;
return false;
});
end:
mysql_mutex_unlock(&LOCK_binlog_state);
return res;
}
@ -2286,43 +2222,18 @@ rpl_binlog_state::append_pos(String *str)
bool
rpl_binlog_state::append_state(String *str)
{
uint32 i, j;
bool res= false;
mysql_mutex_lock(&LOCK_binlog_state);
reset_dynamic(&gtid_sort_array);
for (i= 0; i < hash.records; ++i)
{
element *e= (element *)my_hash_element(&hash, i);
if (!e->last_gtid)
{
DBUG_ASSERT(e->hash.records==0);
continue;
}
for (j= 0; j <= e->hash.records; ++j)
{
const rpl_gtid *gtid;
if (j < e->hash.records)
{
gtid= (rpl_gtid *)my_hash_element(&e->hash, j);
if (gtid == e->last_gtid)
continue;
}
else
gtid= e->last_gtid;
bool res= iterate([this] (const rpl_gtid *gtid) {
if (insert_dynamic(&gtid_sort_array, (const void *) gtid))
return true;
return false;
});
if (insert_dynamic(&gtid_sort_array, (const void *) gtid))
{
res= true;
goto end;
}
}
}
if (likely(!res))
rpl_slave_state_tostring_helper(&gtid_sort_array, str);
rpl_slave_state_tostring_helper(&gtid_sort_array, str);
end:
mysql_mutex_unlock(&LOCK_binlog_state);
return res;
}