MDEV-16329 [2/5] refactor binlog and cache_mngr

pump up binlog and cache manager to level of binlog_log_row_internal
This commit is contained in:
Nikita Malyavin 2020-10-06 19:56:03 +10:00 committed by Sergei Golubchik
parent 0dfbb05cd0
commit 429f635f30
8 changed files with 163 additions and 137 deletions

View file

@ -7249,7 +7249,6 @@ int handler::binlog_log_row(TABLE *table,
const uchar *after_record,
Log_func *log_func)
{
bool error;
THD *thd= table->in_use;
DBUG_ENTER("binlog_log_row");
@ -7257,8 +7256,21 @@ int handler::binlog_log_row(TABLE *table,
thd->binlog_write_table_maps())
DBUG_RETURN(HA_ERR_RBR_LOGGING_FAILED);
error= (*log_func)(thd, table, row_logging_has_trans,
before_record, after_record);
DBUG_ASSERT(thd->is_current_stmt_binlog_format_row());
DBUG_ASSERT((WSREP_NNULL(thd) && wsrep_emulate_bin_log)
|| mysql_bin_log.is_open());
auto *cache_mngr= thd->binlog_setup_trx_data();
if (cache_mngr == NULL)
DBUG_RETURN(HA_ERR_OUT_OF_MEM);
bool is_trans= row_logging_has_trans;
/* Ensure that all events in a GTID group are in the same cache */
if (thd->variables.option_bits & OPTION_GTID_BEGIN)
is_trans= 1;
bool error= (*log_func)(thd, table, &mysql_bin_log, cache_mngr,
is_trans, before_record, after_record);
DBUG_RETURN(error ? HA_ERR_RBR_LOGGING_FAILED : 0);
}

View file

@ -654,7 +654,11 @@ given at all. */
#define HA_CREATE_PRINT_ALL_OPTIONS (1UL << 26)
typedef ulonglong alter_table_operations;
typedef bool Log_func(THD*, TABLE*, bool, const uchar*, const uchar*);
class MYSQL_BIN_LOG;
class binlog_cache_mngr;
typedef bool Log_func(THD*, TABLE*, MYSQL_BIN_LOG *, binlog_cache_mngr *, bool,
const uchar*, const uchar*);
/*
These flags are set by the parser and describes the type of
@ -5623,11 +5627,6 @@ inline const LEX_CSTRING *table_case_name(HA_CREATE_INFO *info, const LEX_CSTRIN
return ((lower_case_table_names == 2 && info->alias.str) ? &info->alias : name);
}
typedef bool Log_func(THD*, TABLE*, bool, const uchar*, const uchar*);
int binlog_log_row(TABLE* table,
const uchar *before_record,
const uchar *after_record,
Log_func *log_func);
/**
@def MYSQL_TABLE_IO_WAIT

View file

@ -2088,7 +2088,7 @@ binlog_truncate_trx_cache(THD *thd, binlog_cache_mngr *cache_mngr, bool all)
else
cache_mngr->trx_cache.restore_prev_position();
DBUG_ASSERT(thd->binlog_get_pending_rows_event(is_transactional) == NULL);
DBUG_ASSERT(cache_mngr->trx_cache.pending() == NULL);
DBUG_RETURN(error);
}
@ -6309,16 +6309,16 @@ binlog_cache_mngr *THD::binlog_get_cache_mngr() const
is @c true, the pending event is returned from the transactional cache.
Otherwise from the non-transactional cache.
@param is_transactional @c true indicates a transactional cache,
@param cache_mngr cache manager to return pending row from
@param use_trans_cache @c true indicates a transactional cache,
otherwise @c false a non-transactional.
@return
The row event if any.
*/
Rows_log_event*
THD::binlog_get_pending_rows_event(bool is_transactional) const
Rows_log_event* binlog_get_pending_rows_event(binlog_cache_mngr *cache_mngr,
bool use_trans_cache)
{
Rows_log_event* rows= NULL;
binlog_cache_mngr *const cache_mngr= binlog_get_cache_mngr();
/*
This is less than ideal, but here's the story: If there is no cache_mngr,
@ -6326,13 +6326,34 @@ THD::binlog_get_pending_rows_event(bool is_transactional) const
is set up there). In that case, we just return NULL.
*/
if (cache_mngr)
{
binlog_cache_data *cache_data=
cache_mngr->get_binlog_cache_data(use_trans_cache(this, is_transactional));
rows= cache_mngr->get_binlog_cache_data(use_trans_cache)->pending();
return rows;
}
rows= cache_data->pending();
int binlog_flush_pending_rows_event(THD *thd, bool stmt_end,
bool is_transactional,
MYSQL_BIN_LOG *bin_log,
binlog_cache_mngr *cache_mngr,
bool use_trans_cache)
{
/*
Mark the event as the last event of a statement if the stmt_end
flag is set.
*/
int error= 0;
auto *pending= cache_mngr->get_binlog_cache_data(use_trans_cache)->pending();
if (pending)
{
if (stmt_end)
{
pending->set_flags(Rows_log_event::STMT_END_F);
thd->reset_binlog_for_next_statement();
}
error= bin_log->flush_and_set_pending_rows_event(thd, 0, cache_mngr,
is_transactional);
}
return (rows);
return error;
}
/**
@ -6342,18 +6363,18 @@ THD::binlog_get_pending_rows_event(bool is_transactional) const
into the non-transactional cache.
@param evt a pointer to the row event.
@param is_transactional @c true indicates a transactional cache,
@param use_trans_cache @c true indicates a transactional cache,
otherwise @c false a non-transactional.
*/
void
THD::binlog_set_pending_rows_event(Rows_log_event* ev, bool is_transactional)
THD::binlog_set_pending_rows_event(Rows_log_event* ev, bool use_trans_cache)
{
binlog_cache_mngr *const cache_mngr= binlog_setup_trx_data();
DBUG_ASSERT(cache_mngr);
binlog_cache_data *cache_data=
cache_mngr->get_binlog_cache_data(use_trans_cache(this, is_transactional));
cache_mngr->get_binlog_cache_data(use_trans_cache);
cache_data->set_pending(ev);
}
@ -6402,18 +6423,18 @@ MYSQL_BIN_LOG::remove_pending_rows_event(THD *thd, bool is_transactional)
int
MYSQL_BIN_LOG::flush_and_set_pending_rows_event(THD *thd,
Rows_log_event* event,
binlog_cache_mngr *cache_mngr,
bool is_transactional)
{
DBUG_ENTER("MYSQL_BIN_LOG::flush_and_set_pending_rows_event(event)");
DBUG_ASSERT(WSREP_EMULATE_BINLOG(thd) || mysql_bin_log.is_open());
DBUG_PRINT("enter", ("event: %p", event));
binlog_cache_mngr *const cache_mngr= thd->binlog_get_cache_mngr();
DBUG_ASSERT(cache_mngr);
bool should_use_trans_cache= use_trans_cache(thd, is_transactional);
binlog_cache_data *cache_data=
cache_mngr->get_binlog_cache_data(use_trans_cache(thd, is_transactional));
cache_mngr->get_binlog_cache_data(should_use_trans_cache);
DBUG_PRINT("info", ("cache_mngr->pending(): %p", cache_data->pending()));
@ -6442,7 +6463,7 @@ MYSQL_BIN_LOG::flush_and_set_pending_rows_event(THD *thd,
delete pending;
}
thd->binlog_set_pending_rows_event(event, is_transactional);
thd->binlog_set_pending_rows_event(event, should_use_trans_cache);
DBUG_RETURN(0);
}

View file

@ -725,6 +725,7 @@ public:
#if !defined(MYSQL_CLIENT)
int flush_and_set_pending_rows_event(THD *thd, Rows_log_event* event,
binlog_cache_mngr *cache_mngr,
bool is_transactional);
int remove_pending_rows_event(THD *thd, bool is_transactional);
@ -1175,6 +1176,13 @@ File open_binlog(IO_CACHE *log, const char *log_file_name,
void make_default_log_name(char **out, const char* log_ext, bool once);
void binlog_reset_cache(THD *thd);
bool write_annotated_row(THD *thd);
int binlog_flush_pending_rows_event(THD *thd, bool stmt_end,
bool is_transactional,
MYSQL_BIN_LOG *bin_log,
binlog_cache_mngr *cache_mngr,
bool use_trans_cache);
Rows_log_event* binlog_get_pending_rows_event(binlog_cache_mngr *cache_mngr,
bool use_trans_cache);
extern MYSQL_PLUGIN_IMPORT MYSQL_BIN_LOG mysql_bin_log;
extern handlerton *binlog_hton;

View file

@ -4926,13 +4926,16 @@ public:
#endif
#if defined(MYSQL_SERVER)
static bool binlog_row_logging_function(THD *thd, TABLE *table,
MYSQL_BIN_LOG *bin_log,
binlog_cache_mngr *cache_mngr,
bool is_transactional,
const uchar *before_record
__attribute__((unused)),
const uchar *after_record)
{
DBUG_ASSERT(!table->versioned(VERS_TRX_ID));
return thd->binlog_write_row(table, is_transactional, after_record);
return thd->binlog_write_row(table, bin_log, cache_mngr, is_transactional,
after_record);
}
#endif
@ -5009,12 +5012,14 @@ public:
#ifdef MYSQL_SERVER
static bool binlog_row_logging_function(THD *thd, TABLE *table,
MYSQL_BIN_LOG *bin_log,
binlog_cache_mngr *cache_mngr,
bool is_transactional,
const uchar *before_record,
const uchar *after_record)
{
DBUG_ASSERT(!table->versioned(VERS_TRX_ID));
return thd->binlog_update_row(table, is_transactional,
return thd->binlog_update_row(table, bin_log, cache_mngr, is_transactional,
before_record, after_record);
}
#endif
@ -5098,13 +5103,15 @@ public:
#endif
#ifdef MYSQL_SERVER
static bool binlog_row_logging_function(THD *thd, TABLE *table,
MYSQL_BIN_LOG *bin_log,
binlog_cache_mngr *cache_mngr,
bool is_transactional,
const uchar *before_record,
const uchar *after_record
__attribute__((unused)))
{
DBUG_ASSERT(!table->versioned(VERS_TRX_ID));
return thd->binlog_delete_row(table, is_transactional,
return thd->binlog_delete_row(table, bin_log, cache_mngr, is_transactional,
before_record);
}
#endif

View file

@ -6899,6 +6899,32 @@ bool THD::binlog_table_should_be_logged(const LEX_CSTRING *db)
binlog_filter->db_ok(db->str)));
}
struct RowsEventFactory
{
int type_code;
Rows_log_event *(*create)(THD*, TABLE*, ulong, bool is_transactional);
};
/**
Creates RowsEventFactory, responsible for creating Rows_log_event descendant.
@tparam RowsEventT is a type which will be constructed by
RowsEventFactory::create.
@return a RowsEventFactory object with type_code equal to RowsEventT::TYPE_CODE
and create containing pointer to a RowsEventT constructor callback.
*/
template<class RowsEventT>
static RowsEventFactory binlog_get_rows_event_creator()
{
return { RowsEventT::TYPE_CODE,
[](THD* thd, TABLE* table, ulong flags, bool is_transactional)
-> Rows_log_event*
{
return new RowsEventT(thd, table, flags, is_transactional);
}
};
}
/*
Template member function for ensuring that there is an rows log
event of the apropriate type before proceeding.
@ -6920,31 +6946,25 @@ bool THD::binlog_table_should_be_logged(const LEX_CSTRING *db)
If error, NULL.
*/
template <class RowsEventT> Rows_log_event*
THD::binlog_prepare_pending_rows_event(TABLE* table, uint32 serv_id,
size_t needed,
bool is_transactional,
RowsEventT *hint __attribute__((unused)))
Rows_log_event*
binlog_prepare_pending_rows_event(THD *thd, TABLE* table,
MYSQL_BIN_LOG *bin_log,
binlog_cache_mngr *cache_mngr,
uint32 serv_id, size_t needed,
bool is_transactional,
RowsEventFactory event_factory)
{
DBUG_ENTER("binlog_prepare_pending_rows_event");
/* Pre-conditions */
DBUG_ASSERT(table->s->table_map_id != ~0UL);
/* Fetch the type code for the RowsEventT template parameter */
int const general_type_code= RowsEventT::TYPE_CODE;
/* Ensure that all events in a GTID group are in the same cache */
if (variables.option_bits & OPTION_GTID_BEGIN)
is_transactional= 1;
/*
There is no good place to set up the transactional data, so we
have to do it here.
*/
if (binlog_setup_trx_data() == NULL)
DBUG_RETURN(NULL);
Rows_log_event* pending= binlog_get_pending_rows_event(is_transactional);
Rows_log_event* pending= binlog_get_pending_rows_event(cache_mngr,
use_trans_cache(thd,
is_transactional));
if (unlikely(pending && !pending->is_valid()))
DBUG_RETURN(NULL);
@ -6962,14 +6982,14 @@ THD::binlog_prepare_pending_rows_event(TABLE* table, uint32 serv_id,
if (!pending ||
pending->server_id != serv_id ||
pending->get_table_id() != table->s->table_map_id ||
pending->get_general_type_code() != general_type_code ||
pending->get_general_type_code() != event_factory.type_code ||
pending->get_data_size() + needed > opt_binlog_rows_event_max_size ||
pending->read_write_bitmaps_cmp(table) == FALSE)
{
/* Create a new RowsEventT... */
Rows_log_event* const
ev= new RowsEventT(this, table, table->s->table_map_id,
is_transactional);
ev= event_factory.create(thd, table, table->s->table_map_id,
is_transactional);
if (unlikely(!ev))
DBUG_RETURN(NULL);
ev->server_id= serv_id; // I don't like this, it's too easy to forget.
@ -6977,9 +6997,8 @@ THD::binlog_prepare_pending_rows_event(TABLE* table, uint32 serv_id,
flush the pending event and replace it with the newly created
event...
*/
if (unlikely(
mysql_bin_log.flush_and_set_pending_rows_event(this, ev,
is_transactional)))
if (unlikely(bin_log->flush_and_set_pending_rows_event(thd, ev, cache_mngr,
is_transactional)))
{
delete ev;
DBUG_RETURN(NULL);
@ -7114,13 +7133,10 @@ CPP_UNNAMED_NS_START
CPP_UNNAMED_NS_END
int THD::binlog_write_row(TABLE* table, bool is_trans,
int THD::binlog_write_row(TABLE* table, MYSQL_BIN_LOG *bin_log,
binlog_cache_mngr *cache_mngr, bool is_trans,
uchar const *record)
{
DBUG_ASSERT(is_current_stmt_binlog_format_row());
DBUG_ASSERT((WSREP_NNULL(this) && wsrep_emulate_bin_log) ||
mysql_bin_log.is_open());
/*
Pack records into format for transfer. We are allocating more
memory than needed, but that doesn't matter.
@ -7134,21 +7150,14 @@ int THD::binlog_write_row(TABLE* table, bool is_trans,
size_t const len= pack_row(table, table->rpl_write_set, row_data, record);
/* Ensure that all events in a GTID group are in the same cache */
if (variables.option_bits & OPTION_GTID_BEGIN)
is_trans= 1;
auto creator= binlog_should_compress(len) ?
binlog_get_rows_event_creator<Write_rows_compressed_log_event>() :
binlog_get_rows_event_creator<Write_rows_log_event>();
Rows_log_event* ev;
if (binlog_should_compress(len))
ev =
binlog_prepare_pending_rows_event(table, variables.server_id,
len, is_trans,
static_cast<Write_rows_compressed_log_event*>(0));
else
ev =
binlog_prepare_pending_rows_event(table, variables.server_id,
len, is_trans,
static_cast<Write_rows_log_event*>(0));
auto *ev= binlog_prepare_pending_rows_event(this, table,
&mysql_bin_log, cache_mngr,
variables.server_id,
len, is_trans, creator);
if (unlikely(ev == 0))
return HA_ERR_OUT_OF_MEM;
@ -7156,14 +7165,11 @@ int THD::binlog_write_row(TABLE* table, bool is_trans,
return ev->add_row_data(row_data, len);
}
int THD::binlog_update_row(TABLE* table, bool is_trans,
int THD::binlog_update_row(TABLE* table, MYSQL_BIN_LOG *bin_log,
binlog_cache_mngr *cache_mngr, bool is_trans,
const uchar *before_record,
const uchar *after_record)
{
DBUG_ASSERT(is_current_stmt_binlog_format_row());
DBUG_ASSERT((WSREP_NNULL(this) && wsrep_emulate_bin_log) ||
mysql_bin_log.is_open());
/**
Save a reference to the original read bitmaps
We will need this to restore the bitmaps at the end as
@ -7196,11 +7202,6 @@ int THD::binlog_update_row(TABLE* table, bool is_trans,
before_record);
size_t const after_size= pack_row(table, table->rpl_write_set, after_row,
after_record);
/* Ensure that all events in a GTID group are in the same cache */
if (variables.option_bits & OPTION_GTID_BEGIN)
is_trans= 1;
/*
Don't print debug messages when running valgrind since they can
trigger false warnings.
@ -7212,17 +7213,14 @@ int THD::binlog_update_row(TABLE* table, bool is_trans,
DBUG_DUMP("after_row", after_row, after_size);
#endif
Rows_log_event* ev;
if(binlog_should_compress(before_size + after_size))
ev =
binlog_prepare_pending_rows_event(table, variables.server_id,
before_size + after_size, is_trans,
static_cast<Update_rows_compressed_log_event*>(0));
else
ev =
binlog_prepare_pending_rows_event(table, variables.server_id,
before_size + after_size, is_trans,
static_cast<Update_rows_log_event*>(0));
auto creator= binlog_should_compress(before_size + after_size) ?
binlog_get_rows_event_creator<Update_rows_compressed_log_event>() :
binlog_get_rows_event_creator<Update_rows_log_event>();
auto *ev= binlog_prepare_pending_rows_event(this, table,
&mysql_bin_log, cache_mngr,
variables.server_id,
before_size + after_size,
is_trans, creator);
if (unlikely(ev == 0))
return HA_ERR_OUT_OF_MEM;
@ -7237,12 +7235,10 @@ int THD::binlog_update_row(TABLE* table, bool is_trans,
}
int THD::binlog_delete_row(TABLE* table, bool is_trans,
int THD::binlog_delete_row(TABLE* table, MYSQL_BIN_LOG *bin_log,
binlog_cache_mngr *cache_mngr, bool is_trans,
uchar const *record)
{
DBUG_ASSERT(is_current_stmt_binlog_format_row());
DBUG_ASSERT((WSREP_NNULL(this) && wsrep_emulate_bin_log) ||
mysql_bin_log.is_open());
/**
Save a reference to the original read bitmaps
We will need this to restore the bitmaps at the end as
@ -7273,21 +7269,13 @@ int THD::binlog_delete_row(TABLE* table, bool is_trans,
DBUG_DUMP("table->read_set", (uchar*) table->read_set->bitmap, (table->s->fields + 7) / 8);
size_t const len= pack_row(table, table->read_set, row_data, record);
/* Ensure that all events in a GTID group are in the same cache */
if (variables.option_bits & OPTION_GTID_BEGIN)
is_trans= 1;
Rows_log_event* ev;
if(binlog_should_compress(len))
ev =
binlog_prepare_pending_rows_event(table, variables.server_id,
len, is_trans,
static_cast<Delete_rows_compressed_log_event*>(0));
else
ev =
binlog_prepare_pending_rows_event(table, variables.server_id,
len, is_trans,
static_cast<Delete_rows_log_event*>(0));
auto creator= binlog_should_compress(len) ?
binlog_get_rows_event_creator<Delete_rows_compressed_log_event>() :
binlog_get_rows_event_creator<Delete_rows_log_event>();
auto *ev= binlog_prepare_pending_rows_event(this, table,
&mysql_bin_log, cache_mngr,
variables.server_id,
len, is_trans, creator);
if (unlikely(ev == 0))
return HA_ERR_OUT_OF_MEM;
@ -7404,22 +7392,14 @@ int THD::binlog_flush_pending_rows_event(bool stmt_end, bool is_transactional)
if (variables.option_bits & OPTION_GTID_BEGIN)
is_transactional= 1;
/*
Mark the event as the last event of a statement if the stmt_end
flag is set.
*/
int error= 0;
if (Rows_log_event *pending= binlog_get_pending_rows_event(is_transactional))
{
if (stmt_end)
{
pending->set_flags(Rows_log_event::STMT_END_F);
reset_binlog_for_next_statement();
}
error= mysql_bin_log.flush_and_set_pending_rows_event(this, 0,
is_transactional);
}
auto *cache_mngr= binlog_get_cache_mngr();
if (!cache_mngr)
DBUG_RETURN(0);
int error=
::binlog_flush_pending_rows_event(this, stmt_end, is_transactional,
&mysql_bin_log, cache_mngr,
use_trans_cache(this, is_transactional));
DBUG_RETURN(error);
}

View file

@ -2946,11 +2946,14 @@ public:
*/
void binlog_start_trans_and_stmt();
void binlog_set_stmt_begin();
int binlog_write_row(TABLE* table, bool is_transactional,
int binlog_write_row(TABLE* table, MYSQL_BIN_LOG *bin_log,
binlog_cache_mngr *cache_mngr, bool is_transactional,
const uchar *buf);
int binlog_delete_row(TABLE* table, bool is_transactional,
int binlog_delete_row(TABLE* table, MYSQL_BIN_LOG *bin_log,
binlog_cache_mngr *cache_mngr, bool is_transactional,
const uchar *buf);
int binlog_update_row(TABLE* table, bool is_transactional,
int binlog_update_row(TABLE* table, MYSQL_BIN_LOG *bin_log,
binlog_cache_mngr *cache_mngr, bool is_transactional,
const uchar *old_data, const uchar *new_data);
bool prepare_handlers_for_update(uint flag);
bool binlog_write_annotated_row(Log_event_writer *writer);
@ -2965,13 +2968,7 @@ public:
Member functions to handle pending event for row-level logging.
*/
binlog_cache_mngr *binlog_get_cache_mngr() const;
template <class RowsEventT> Rows_log_event*
binlog_prepare_pending_rows_event(TABLE* table, uint32 serv_id,
size_t needed,
bool is_transactional,
RowsEventT* hint);
Rows_log_event* binlog_get_pending_rows_event(bool is_transactional) const;
void binlog_set_pending_rows_event(Rows_log_event* ev, bool is_transactional);
void binlog_set_pending_rows_event(Rows_log_event* ev, bool use_trans_cache);
inline int binlog_flush_pending_rows_event(bool stmt_end)
{
return (binlog_flush_pending_rows_event(stmt_end, FALSE) ||
@ -2983,7 +2980,8 @@ public:
bool binlog_need_stmt_format(bool is_transactional) const
{
return log_current_statement() &&
!binlog_get_pending_rows_event(is_transactional);
!binlog_get_pending_rows_event(binlog_get_cache_mngr(),
use_trans_cache(this, is_transactional));
}
bool binlog_for_noop_dml(bool transactional_table);

View file

@ -243,7 +243,8 @@ size_t Wsrep_client_service::bytes_generated() const
if (cache)
{
size_t pending_rows_event_length= 0;
if (Rows_log_event* ev= m_thd->binlog_get_pending_rows_event(true))
auto *cache_mngr= m_thd->binlog_get_cache_mngr();
if (auto* ev= binlog_get_pending_rows_event(cache_mngr, true))
{
pending_rows_event_length= ev->get_data_size();
}