MDEV-34705: out-of-band binlogging; partial, untested commit made ahead of a separate refactoring of end_event

Signed-off-by: Kristian Nielsen <knielsen@knielsen-hq.org>
This commit is contained in:
Kristian Nielsen 2024-11-17 20:20:53 +01:00
parent 72a0978301
commit 10be4f500a
7 changed files with 374 additions and 9 deletions

View file

@ -1534,8 +1534,21 @@ struct handlerton
/* Optional implementation of binlog in the engine. */
bool (*binlog_init)(size_t binlog_size);
/* Binlog an event group that doesn't go through commit_ordered. */
bool (*binlog_write_direct)(IO_CACHE *cache, size_t main_size,
const rpl_gtid *gtid);
/*
Binlog parts of large transactions out-of-band, in different chunks in the
binlog as the transaction executes. This limits the amount of data that
must be binlogged transactionally during COMMIT. The engine_data points to
a pointer location that the engine can set to maintain its own context
for the out-of-band data.
*/
bool (*binlog_oob_data)(THD *thd, const unsigned char *data, size_t data_len,
void **engine_data);
/* Call to allow engine to release the engine_data from binlog_oob_data(). */
void (*binlog_oob_free)(THD *thd, void *engine_data);
/* Obtain an object to allow reading from the binlog. */
handler_binlog_reader * (*get_binlog_reader)();
/*

View file

@ -362,15 +362,19 @@ Log_event::select_checksum_alg(const binlog_cache_data *data)
class binlog_cache_mngr {
public:
binlog_cache_mngr(my_off_t param_max_binlog_stmt_cache_size,
binlog_cache_mngr(THD *thd_arg,
my_off_t param_max_binlog_stmt_cache_size,
my_off_t param_max_binlog_cache_size,
ulong *param_ptr_binlog_stmt_cache_use,
ulong *param_ptr_binlog_stmt_cache_disk_use,
ulong *param_ptr_binlog_cache_use,
ulong *param_ptr_binlog_cache_disk_use,
bool precompute_checksums)
: stmt_cache(precompute_checksums), trx_cache(precompute_checksums),
last_commit_pos_offset(0), gtid_cache_offset(0), using_xa(FALSE), xa_xid(0)
: thd(thd_arg), engine_ptr(0),
stmt_cache(precompute_checksums),
trx_cache(precompute_checksums),
last_commit_pos_offset(0), gtid_cache_offset(0), did_oob_spill(false),
using_xa(FALSE), xa_xid(0)
{
stmt_cache.set_binlog_cache_info(param_max_binlog_stmt_cache_size,
param_ptr_binlog_stmt_cache_use,
@ -383,6 +387,8 @@ public:
void reset(bool do_stmt, bool do_trx)
{
if (engine_ptr)
(*opt_binlog_engine_hton->binlog_oob_free)(thd, engine_ptr);
if (do_stmt)
stmt_cache.reset();
if (do_trx)
@ -405,6 +411,10 @@ public:
return (is_transactional ? &trx_cache.cache_log : &stmt_cache.cache_log);
}
THD *thd;
/* For use by engine when --binlog-storage-engine. */
void *engine_ptr;
binlog_cache_data stmt_cache;
binlog_cache_data trx_cache;
@ -421,6 +431,11 @@ public:
/* Point in trx cache where GTID event sits after end event. */
my_off_t gtid_cache_offset;
/*
Flag to remember if we spilled any partial transaction data to the binlog
implemented in storage engine.
*/
bool did_oob_spill;
/*
Flag set true if this transaction is committed with log_xid() as part of
XA, false if not.
@ -6242,6 +6257,18 @@ bool stmt_has_updated_non_trans_table(const THD* thd)
return (thd->transaction->stmt.modified_non_trans_table);
}
static int
binlog_spill_to_engine(struct st_io_cache *cache, const uchar *data, size_t len)
{
binlog_cache_mngr *mngr= (binlog_cache_mngr *)cache->append_read_pos;
bool res= (*opt_binlog_engine_hton->binlog_oob_data)(mngr->thd, data, len,
&mngr->engine_ptr);
mngr->did_oob_spill= true;
return res;
}
/*
These functions are placed in this file since they need access to
binlog_hton, which has internal linkage.
@ -6254,9 +6281,36 @@ static binlog_cache_mngr *binlog_setup_cache_mngr(THD *thd)
MYF(MY_ZEROFILL));
if (!cache_mngr ||
open_cached_file(&cache_mngr->stmt_cache.cache_log, mysql_tmpdir,
LOG_PREFIX, (size_t)binlog_stmt_cache_size, MYF(MY_WME)) ||
open_cached_file(&cache_mngr->trx_cache.cache_log, mysql_tmpdir,
LOG_PREFIX, (size_t)binlog_cache_size, MYF(MY_WME)))
LOG_PREFIX, (size_t)binlog_stmt_cache_size, MYF(MY_WME)))
{
my_free(cache_mngr);
return NULL;
}
IO_CACHE *trx_cache= &cache_mngr->trx_cache.cache_log;
my_bool res;
if (likely(opt_binlog_engine_hton) &&
likely(opt_binlog_engine_hton->binlog_oob_data))
{
/*
With binlog implementation in engine, we do not need to spill large
transactions to temporary file, we will binlog data out-of-band spread
through the binlog as the transaction runs. Setting the file to INT_MIN
makes IO_CACHE not attempt to create the temporary file.
*/
res= init_io_cache(trx_cache, (File)INT_MIN, (size_t)binlog_cache_size,
WRITE_CACHE, 0L, 0, MYF(MY_WME | MY_NABP));
/*
Use a custom write_function to spill to the engine-implemented binlog.
And re-use the IO_CACHE::append_read_pos as a handle for our
write_function; it is unused when the cache is not SEQ_READ_APPEND.
*/
trx_cache->write_function= binlog_spill_to_engine;
trx_cache->append_read_pos= (uchar *)cache_mngr;
}
else
res= open_cached_file(trx_cache, mysql_tmpdir, LOG_PREFIX,
(size_t)binlog_cache_size, MYF(MY_WME));
if (unlikely(res))
{
my_free(cache_mngr);
return NULL;
@ -6271,7 +6325,7 @@ static binlog_cache_mngr *binlog_setup_cache_mngr(THD *thd)
bool precompute_checksums=
!WSREP_NNULL(thd) && !encrypt_binlog && !opt_binlog_legacy_event_pos;
cache_mngr= new (cache_mngr)
binlog_cache_mngr(max_binlog_stmt_cache_size,
binlog_cache_mngr(thd, max_binlog_stmt_cache_size,
max_binlog_cache_size,
&binlog_stmt_cache_use,
&binlog_stmt_cache_disk_use,

View file

@ -4290,6 +4290,8 @@ enum fsp_binlog_chunk_types {
FSP_BINLOG_TYPE_COMMIT= 1,
/* A binlog GTID state record. */
FSP_BINLOG_TYPE_GTID_STATE= 2,
/* Out-of-band event group data. */
FSP_BINLOG_TYPE_OOB_DATA= 3,
/* Padding data at end of page. */
FSP_BINLOG_TYPE_FILLER= 0xff
@ -4401,6 +4403,59 @@ struct chunk_data_base {
};
/* Structure holding context for out-of-band chunks of binlogged event group. */
struct binlog_oob_context {
#ifdef _MSC_VER
/* Flexible array member is not standard C++, disable compiler warning. */
#pragma warning(disable : 4200)
#endif
  /* Number of entries of node_list currently in use. */
  uint32_t node_list_len;
  /* Number of entries allocated for node_list. */
  uint32_t node_list_alloc_len;
  /*
    The node_list contains the root of each tree in the forest of perfect
    binary trees.
    NOTE: node_list[] is a flexible array member and must remain the last
    data member of this struct (the nested type and member function below
    are not data members, so they may follow it).
  */
  struct node_info {
    /* Binlog file number in which this node's chunk was written. */
    uint64_t file_no;
    /* Byte offset of the chunk within that binlog file. */
    uint64_t offset;
    /* Sequence number of the node within this event group (0-based). */
    uint64_t node_index;
    /* Height of the perfect binary tree rooted at this node (leaf is 1). */
    uint32_t height;
  } node_list [];
  /*
    Structure used to encapsulate the data to be binlogged in an out-of-band
    chunk, for use by fsp_binlog_write_chunk().
  */
  struct chunk_data_oob : public chunk_data_base {
    /*
      Need room for 5 numbers:
        node index
        left child file_no
        left child offset
        right child file_no
        right child offset
    */
    static constexpr uint32_t max_buffer= 5*COMPR_INT_MAX64;
    /* Total bytes (header + main data) already copied out via copy_data(). */
    uint64_t sofar;
    /* Length of the main chunk data in bytes. */
    uint64_t main_len;
    /* The main chunk data; referenced, not owned, by this object. */
    byte *main_data;
    /* Actual serialized length of the header in header_buf. */
    uint32_t header_len;
    /* Header holding the 5 numbers in compressed-integer encoding. */
    byte header_buf[max_buffer];

    chunk_data_oob(uint64_t idx,
                   uint64_t left_file_no, uint64_t left_offset,
                   uint64_t right_file_no, uint64_t right_offset,
                   byte *data, size_t data_len);
    virtual ~chunk_data_oob() {};
    virtual std::pair<uint32_t, bool> copy_data(byte *p, uint32_t max_len) final;
  };

  /* Binlog a new tree node and store it at position `node` in node_list. */
  bool binlog_node(uint32_t node, uint64_t new_idx,
                   uint32_t left_node, uint32_t right_node,
                   chunk_data_oob *oob_data);
};
class ha_innodb_binlog_reader : public handler_binlog_reader {
/* Buffer to hold a page read directly from the binlog file. */
uchar *page_buf;
@ -5441,8 +5496,8 @@ binlog_state_recover()
}
void fsp_binlog_write_chunk(chunk_data_base *chunk_data, mtr_t *mtr,
byte chunk_type)
std::pair<uint64_t, uint64_t>
fsp_binlog_write_chunk(chunk_data_base *chunk_data, mtr_t *mtr, byte chunk_type)
{
uint32_t page_size= (uint32_t)srv_page_size;
uint32_t page_size_shift= srv_page_size_shift;
@ -5454,6 +5509,8 @@ void fsp_binlog_write_chunk(chunk_data_base *chunk_data, mtr_t *mtr,
buf_block_t *block= binlog_cur_block;
uint64_t file_no= active_binlog_file_no.load(std::memory_order_relaxed);
uint64_t pending_prev_end_offset= 0;
uint64_t start_file_no= 0;
uint64_t start_offset= 0;
/*
Write out the event data in chunks of whatever size will fit in the current
@ -5588,6 +5645,11 @@ void fsp_binlog_write_chunk(chunk_data_base *chunk_data, mtr_t *mtr,
page_offset= FIL_PAGE_DATA;
continue;
}
if (start_offset == 0)
{
start_file_no= file_no;
start_offset= (page_no << page_size_shift) + page_offset;
}
page_remain-= 3; /* Type byte and 2-byte length. */
std::pair<uint32_t, bool> size_last=
chunk_data->copy_data(ptr+3, page_remain);
@ -5621,6 +5683,7 @@ void fsp_binlog_write_chunk(chunk_data_base *chunk_data, mtr_t *mtr,
std::memory_order_relaxed);
binlog_cur_end_offset[file_no & 1].store((page_no << page_size_shift) + page_offset,
std::memory_order_relaxed);
return {start_file_no, start_offset};
}
@ -5708,6 +5771,232 @@ fsp_binlog_write_cache(IO_CACHE *cache, size_t main_size, mtr_t *mtr)
}
/* Allocate a context for out-of-band binlogging. */
/*
  Allocate a context for out-of-band binlogging, with room for list_length
  entries in the trailing flexible node_list array.
  Reports ER_OUTOFMEMORY and returns nullptr on allocation failure.
*/
static binlog_oob_context *
alloc_oob_context(uint32 list_length)
{
  const size_t alloc_size= sizeof(binlog_oob_context) +
    list_length * sizeof(binlog_oob_context::node_info);
  binlog_oob_context *ctx=
    (binlog_oob_context *) ut_malloc(alloc_size, mem_key_binlog);
  if (!ctx)
  {
    my_error(ER_OUTOFMEMORY, MYF(0), alloc_size);
    return nullptr;
  }
  ctx->node_list_alloc_len= list_length;
  ctx->node_list_len= 0;
  return ctx;
}
static inline void
free_oob_context(binlog_oob_context *c)
{
ut_free(c);
}
/*
  Ensure the out-of-band context in *engine_data has room for at least
  needed_len entries in its node_list, re-allocating (and updating
  *engine_data) if necessary.

  Returns the (possibly moved) context, or nullptr on out-of-memory; in the
  error case the original context is left valid in *engine_data.
*/
static binlog_oob_context *
ensure_oob_context(void **engine_data, uint32_t needed_len)
{
  binlog_oob_context *c= (binlog_oob_context *)*engine_data;
  if (c->node_list_alloc_len >= needed_len)
    return c;
  /* Grow by at least 10 entries to amortize re-allocations. */
  if (needed_len < c->node_list_alloc_len + 10)
    needed_len= c->node_list_alloc_len + 10;
  binlog_oob_context *new_c= alloc_oob_context(needed_len);
  if (UNIV_UNLIKELY(!new_c))
    return nullptr;
  /*
    Copy only the node entries actually present in the old context; copying
    needed_len entries would read past the end of the old, smaller
    allocation.
  */
  memcpy(new_c, c, sizeof(binlog_oob_context) +
         c->node_list_len * sizeof(binlog_oob_context::node_info));
  new_c->node_list_alloc_len= needed_len;
  *engine_data= new_c;
  free_oob_context(c);
  return new_c;
}
/*
Binlog an out-of-band piece of event group data.
For large transactions, we binlog the data in pieces spread out over the
binlog file(s), to avoid a large stall to write large amounts of data during
transaction commit, and to avoid having to keep all of the transaction in
memory or spill it to temporary file.
The chunks of data are written out in a binary tree structure, to allow
efficiently reading the transaction back in order from start to end. Note
that the binlog is written append-only, so we cannot simply link each chunk
to the following chunk, as the following chunk is unknown when binlogging the
prior chunk. With a binary tree structure, the reader can do a post-order
traversal and only need to keep log_2(N) node pointers in-memory at any time.
A perfect binary tree of height h has 2**h - 1 nodes. At any time during a
transaction, the out-of-band data in the binary log for that transaction
consists of a forest (i.e. a list) of perfect binary trees of strictly
decreasing height, except that the last two trees may have the same height.
For example, here is how it looks for a transaction where 13 nodes (0-12)
have been binlogged out-of-band so far:
6
_ / \_
2 5 9 12
/ \ / \ / \ / \
0 1 3 4 7 8 10 11
In addition to the shown binary tree parent->child pointers, each leaf has a
(single) link to the root node of the prior (at the time the leaf was added)
tree. In the example this means the following links:
11->10, 10->9, 8->7, 7->6, 4->3, 3->2, 1->0
This allows to fully traverse the forest of perfect binary trees starting
from the last node (12 in the example). In the example, only 10->9 and 7->6
will be needed, but the other links would be needed if the tree had been
completed at earlier stages.
As a new node is added, there are two different cases on how to maintain
the binary tree forest structure:
1. If the last two trees in the forest have the same height h, then those
two trees are replaced by a single tree of height (h+1) with the new
node as root and the two trees as left and right child. The number of
trees in the forest thus decrease by one.
2. Otherwise the new node is added at the end of the forest as a tree of
height 1; in this case the forest increases by one tree.
In both cases, we maintain the invariants that the forest consist of a list
of perfect binary trees, and that the heights of the trees are strictly
decreasing except that the last two trees can have the same height.
When a transaction is committed, the commit record contains a pointer to
the root node of the last tree in the forest. If the transaction is never
committed (explicitly rolled back or lost due to disconnect or server
restart or crash), then the out-of-band data is simply left in place; it
will be ignored by readers and eventually discarded as the old binlog files
are purged.
*/
/*
  Binlog one out-of-band piece of event group data, maintaining the forest
  of perfect binary trees described above. *engine_data holds the
  binlog_oob_context, allocated on first use and re-allocated as the
  node_list grows. Returns true on error, false on success.
*/
bool
fsp_binlog_oob(THD *thd, const unsigned char *data, size_t data_len,
               void **engine_data)
{
  binlog_oob_context *c= (binlog_oob_context *)*engine_data;
  if (!c)
    *engine_data= c= alloc_oob_context(10);
  if (UNIV_UNLIKELY(!c))
    return true;

  uint32_t i= c->node_list_len;
  uint64_t new_idx= i==0 ? 0 : c->node_list[i-1].node_index + 1;
  if (i >= 2 && c->node_list[i-2].height == c->node_list[i-1].height)
  {
    /* Case 1: Replace two trees with a tree rooted in a new node. */
    binlog_oob_context::chunk_data_oob oob_data
      (new_idx,
       c->node_list[i-2].file_no, c->node_list[i-2].offset,
       c->node_list[i-1].file_no, c->node_list[i-1].offset,
       (byte *)data, data_len);
    if (c->binlog_node(i-2, new_idx, i-2, i-1, &oob_data))
      return true;
    c->node_list_len= i - 1;
  }
  else
  {
    /* Case 2: Add the new node as a singleton tree. */
    c= ensure_oob_context(engine_data, i+1);
    if (!c)
      return true;
    /*
      The leaf links to the root of the prior tree. The very first node of
      the transaction (i==0) has no prior tree; use a (0, 0) link for it
      instead of reading node_list[-1] (offset 0 cannot denote a real chunk,
      as chunk data starts at FIL_PAGE_DATA within a page).
    */
    uint64_t prior_file_no= i==0 ? 0 : c->node_list[i-1].file_no;
    uint64_t prior_offset= i==0 ? 0 : c->node_list[i-1].offset;
    binlog_oob_context::chunk_data_oob oob_data
      (new_idx,
       0, 0, /* NULL left child signifies a leaf */
       prior_file_no, prior_offset,
       (byte *)data, data_len);
    if (c->binlog_node(i, new_idx, i-1, i-1, &oob_data))
      return true;
    c->node_list_len= i + 1;
  }

  return false;
}
/*
Binlog a new out-of-band tree node and put it at position `node` in the list
of trees. A leaf node is denoted by left and right child being identical (and
in this case they point to the root of the prior tree).
*/
bool
binlog_oob_context::binlog_node(uint32_t node, uint64_t new_idx,
uint32_t left_node, uint32_t right_node,
chunk_data_oob *oob_data)
{
uint32_t new_height=
left_node == right_node ? 1 : 1 + node_list[left_node].height;
mtr_t mtr;
mtr.start();
std::pair<uint64_t, uint64_t> new_file_no_offset=
fsp_binlog_write_chunk(oob_data, &mtr, FSP_BINLOG_TYPE_OOB_DATA);
mtr.commit();
node_list[node].file_no= new_file_no_offset.first;
node_list[node].offset= new_file_no_offset.second;
node_list[node].node_index= new_idx;
node_list[node].height= new_height;
return false; // ToDo: Error handling?
}
/*
  Construct an out-of-band chunk: a small compressed-integer header holding
  the node index and the (file_no, offset) links of the left and right
  children, followed by the main chunk data supplied by the caller (the data
  is referenced, not copied, so it must stay valid while the chunk is
  written).
*/
binlog_oob_context::chunk_data_oob::chunk_data_oob(uint64_t idx,
    uint64_t left_file_no, uint64_t left_offset,
    uint64_t right_file_no, uint64_t right_offset,
    byte *data, size_t data_len)
  : sofar(0), main_len(data_len), main_data(data)
{
  ut_ad(data_len > 0);
  /* Serialize the five header numbers in compressed-integer format. */
  byte *wptr= header_buf;
  wptr= compr_int_write(wptr, idx);
  wptr= compr_int_write(wptr, left_file_no);
  wptr= compr_int_write(wptr, left_offset);
  wptr= compr_int_write(wptr, right_file_no);
  wptr= compr_int_write(wptr, right_offset);
  ut_ad(wptr - header_buf <= max_buffer);
  header_len= (uint32_t)(wptr - header_buf);
}
/*
  Copy out up to max_len bytes of the chunk into p: first any remaining
  header bytes, then the main chunk data. `sofar` tracks the total bytes
  already copied across calls.

  Returns {bytes copied in THIS call, whether the chunk is now complete}.
  Note: when a single call copies both the tail of the header and some main
  data, the returned count must include both parts; returning only the
  main-data part (as before) under-reports what was written into the page.
*/
std::pair<uint32_t, bool>
binlog_oob_context::chunk_data_oob::copy_data(byte *p, uint32_t max_len)
{
  uint32_t copied= 0;
  uint32_t size;

  /* First write header data, if any left. */
  if (sofar < header_len)
  {
    size= std::min(header_len - (uint32_t)sofar, max_len);
    memcpy(p, header_buf + sofar, size);
    p+= size;
    sofar+= size;
    copied= size;
    if (UNIV_UNLIKELY(max_len == size))
      return {copied, sofar == header_len + main_len};
    max_len-= size;
  }

  /* Then write the main chunk data. */
  ut_ad(sofar >= header_len);
  ut_ad(main_len > 0);
  size= (uint32_t)std::min(header_len + main_len - sofar, (uint64_t)max_len);
  memcpy(p, main_data + (sofar - header_len), size);
  sofar+= size;
  copied+= size;
  return {copied, sofar == header_len + main_len};
}
/*
  Handlerton hook (binlog_oob_free): release the out-of-band binlog context
  that the engine kept in engine_data for this transaction.
*/
void
fsp_free_oob(THD *thd, void *engine_data)
{
  binlog_oob_context *ctx= static_cast<binlog_oob_context *>(engine_data);
  free_oob_context(ctx);
}
extern "C" void binlog_get_cache(THD *, IO_CACHE **, size_t *,
const rpl_gtid **);

View file

@ -4159,6 +4159,8 @@ static int innodb_init(void* p)
innobase_hton->update_optimizer_costs= innobase_update_optimizer_costs;
innobase_hton->binlog_init= innodb_binlog_init;
innobase_hton->binlog_write_direct= innobase_binlog_write_direct;
innobase_hton->binlog_oob_data= fsp_binlog_oob;
innobase_hton->binlog_oob_free= fsp_free_oob;
innobase_hton->get_binlog_reader= innodb_get_binlog_reader;
innodb_remember_check_sysvar_funcs();

View file

@ -561,6 +561,9 @@ extern void fsp_binlog_trx(trx_t *trx, mtr_t *mtr);
class handler_binlog_reader;
extern bool innobase_binlog_write_direct(IO_CACHE *cache, size_t main_size,
const struct rpl_gtid *gtid);
extern bool fsp_binlog_oob(THD *thd, const unsigned char *data, size_t data_len,
void **engine_data);
extern void fsp_free_oob(THD *thd, void *engine_data);
extern handler_binlog_reader *innodb_get_binlog_reader();
extern void fsp_binlog_init();
extern bool innodb_binlog_init(size_t binlog_size);

View file

@ -165,6 +165,7 @@ ut_allocator::get_mem_key()):
happens then that means that the list of predefined names must be extended.
Keep this list alphabetically sorted. */
extern PSI_memory_key mem_key_ahi;
extern PSI_memory_key mem_key_binlog;
extern PSI_memory_key mem_key_buf_buf_pool;
extern PSI_memory_key mem_key_dict_stats_bg_recalc_pool_t;
extern PSI_memory_key mem_key_dict_stats_index_map_t;
@ -850,6 +851,7 @@ constexpr const char* const auto_event_names[] =
"fil0crypt",
"fil0fil",
"fsp0file",
"fsp0fsp",
"fts0ast",
"fts0blex",
"fts0config",

View file

@ -38,6 +38,7 @@ Keep this list alphabetically sorted. */
#ifdef BTR_CUR_HASH_ADAPT
PSI_memory_key mem_key_ahi;
#endif /* BTR_CUR_HASH_ADAPT */
PSI_memory_key mem_key_binlog;
PSI_memory_key mem_key_buf_buf_pool;
PSI_memory_key mem_key_dict_stats_bg_recalc_pool_t;
PSI_memory_key mem_key_dict_stats_index_map_t;
@ -65,6 +66,7 @@ static PSI_memory_info pfs_info[] = {
#ifdef BTR_CUR_HASH_ADAPT
{&mem_key_ahi, "adaptive hash index", 0},
#endif /* BTR_CUR_HASH_ADAPT */
{&mem_key_binlog, "innodb binlog implementation", 0},
{&mem_key_buf_buf_pool, "buf_buf_pool", 0},
{&mem_key_dict_stats_bg_recalc_pool_t, "dict_stats_bg_recalc_pool_t", 0},
{&mem_key_dict_stats_index_map_t, "dict_stats_index_map_t", 0},