MDEV-34705: Binlog-in-engine: Handful of fixes

Fix missing WORDS_BIGENDIAN define in ut0compr_int.cc.

Fix misaligned read buffer for O_DIRECT.

Fix wrong/missing update_binlog_end_pos() in binlog group commit.

Fix race where active_binlog_file_no was incremented too early.

Fix wrong assertion when reader reaches the very start of (active+1).

Signed-off-by: Kristian Nielsen <knielsen@knielsen-hq.org>
This commit is contained in:
Kristian Nielsen 2025-01-22 18:20:32 +00:00
parent 18932be567
commit 4814d204e0
4 changed files with 35 additions and 8 deletions

View file

@ -9090,6 +9090,9 @@ MYSQL_BIN_LOG::write_transaction_to_binlog_events(group_commit_entry *entry)
}
else
{
if (opt_binlog_engine_hton)
update_binlog_end_pos();
/*
If we rotated the binlog, and if we are using the unoptimized thread
scheduling where every thread runs its own commit_ordered(), then we
@ -9330,8 +9333,12 @@ MYSQL_BIN_LOG::trx_group_commit_leader(group_commit_entry *leader)
Note: must be _after_ the RUN_HOOK(after_flush) or else
semi-sync might not have put the transaction into
its list before dump-thread tries to send it.
When --binlog-storage-engine is used, the binlog write happens during
commit_ordered(), so postpone the update until then.
*/
update_binlog_end_pos(commit_offset);
if (!opt_binlog_engine_hton)
update_binlog_end_pos(commit_offset);
if (unlikely(any_error))
sql_print_error("Failed to run 'after_flush' hooks");
@ -9490,6 +9497,9 @@ MYSQL_BIN_LOG::trx_group_commit_leader(group_commit_entry *leader)
mysql_mutex_unlock(&LOCK_commit_ordered);
DEBUG_SYNC(leader->thd, "commit_after_group_release_commit_ordered");
if (opt_binlog_engine_hton)
update_binlog_end_pos();
if (check_purge)
checkpoint_and_purge(binlog_id);

View file

@ -378,6 +378,7 @@ fsp_binlog_write_rec(chunk_data_base *chunk_data, mtr_t *mtr, byte chunk_type)
to re-use tablespace ids between just two, SRV_SPACE_ID_BINLOG0 and
SRV_SPACE_ID_BINLOG1.
*/
ut_ad(!pending_prev_end_offset);
pending_prev_end_offset= page_no << page_size_shift;
mysql_mutex_lock(&active_binlog_mutex);
/* ToDo: Make this wait killable? */
@ -390,7 +391,6 @@ fsp_binlog_write_rec(chunk_data_base *chunk_data, mtr_t *mtr, byte chunk_type)
++file_no;
binlog_cur_written_offset[file_no & 1].store(0, std::memory_order_relaxed);
binlog_cur_end_offset[file_no & 1].store(0, std::memory_order_relaxed);
active_binlog_file_no.store(file_no, std::memory_order_release);
active_binlog_space= space= last_created_binlog_space;
pthread_cond_signal(&active_binlog_cond);
mysql_mutex_unlock(&active_binlog_mutex);
@ -528,9 +528,15 @@ fsp_binlog_write_rec(chunk_data_base *chunk_data, mtr_t *mtr, byte chunk_type)
binlog_cur_page_no= page_no;
binlog_cur_page_offset= page_offset;
if (UNIV_UNLIKELY(pending_prev_end_offset))
{
mysql_mutex_lock(&active_binlog_mutex);
binlog_cur_end_offset[(file_no-1) & 1].store(pending_prev_end_offset,
std::memory_order_relaxed);
binlog_cur_end_offset[file_no & 1].store((page_no << page_size_shift) + page_offset,
active_binlog_file_no.store(file_no, std::memory_order_release);
pthread_cond_signal(&active_binlog_cond);
mysql_mutex_unlock(&active_binlog_mutex);
}
binlog_cur_end_offset[file_no & 1].store(((uint64_t)page_no << page_size_shift) + page_offset,
std::memory_order_relaxed);
return {start_file_no, start_offset};
}
@ -656,7 +662,16 @@ binlog_chunk_reader::fetch_current_page()
uint64_t active= active2;
uint64_t end_offset=
binlog_cur_end_offset[s.file_no&1].load(std::memory_order_acquire);
ut_ad(s.file_no <= active);
if (s.file_no > active)
{
ut_ad(s.page_no == 0);
ut_ad(s.in_page_offset == 0);
/*
Allow a reader that reached the very end of the active binlog file to
have moved ahead early to the start of the coming binlog file.
*/
return CHUNK_READER_EOF;
}
if (s.file_no + 1 >= active) {
/* Check if we should read from the buffer pool or from the file. */
@ -1020,8 +1035,8 @@ bool binlog_chunk_reader::data_available()
uint64_t active= active_binlog_file_no.load(std::memory_order_acquire);
if (active != s.file_no)
{
ut_ad(active > s.file_no);
return true;
ut_ad(active > s.file_no || (s.page_no == 0 && s.in_page_offset == 0));
return active > s.file_no;
}
uint64_t end_offset=
binlog_cur_end_offset[s.file_no&1].load(std::memory_order_acquire);

View file

@ -697,8 +697,9 @@ innodb_binlog_discover()
*/
fil_space_t *space, *prev_space;
uint32_t page_no, prev_page_no, pos_in_page, prev_pos_in_page;
// ToDo: Do we need aligned_malloc() for page_buf, to be able to read a page into it (like IO_DIRECT maybe) ?
std::unique_ptr<byte[]> page_buf(new byte[page_size]);
std::unique_ptr<byte, void (*)(void *)>
page_buf(static_cast<byte*>(aligned_malloc(page_size, page_size)),
&aligned_free);
if (!page_buf)
return -1;
if (binlog_files.found_binlogs >= 1) {

View file

@ -22,6 +22,7 @@ Reading and writing of compressed integers.
Created 2024-10-01 Kristian Nielsen <knielsen@knielsen-hq.org>
*******************************************************/
#include "univ.i"
#include "ut0compr_int.h"
/* Read and write compressed (up to) 64-bit integers. */