mariadb/sql/wsrep_applier.cc
Denis Protivensky c5688a17a0 MDEV-34822: Skip FK checks in Galera during applying in IST
Appliers need to verify foreign key constraints during normal
operation, in multi-active topologies, and for this reason appliers
are configured to enable FK checking.

However, during node joining, in IST and latter catch up period,
the node is still idle (from local connections), and only source
for incoming transactions is the cluster sending certified write
sets for applying. IST happens with parallel applying, and there
is a possibility that foreign key check cause lock conflicts between
appliers accessing FK child and parent tables. Also, the excessive
FK checking will slow down IST process somewhat.

For this reasons, we could relax FK checks for appliers during IST
and catch up periods. The relaxed FK check mode should, however, be
configurable e.g. by wsrep_mode flag: SKIP_APPLIER_FK_CHECKS_IN_IST.
When this operation mode is set, and the node is processing IST or
catch up, appliers should skip FK checking.

Signed-off-by: Julius Goryavsky <julius.goryavsky@mariadb.com>
2025-03-18 18:28:20 +01:00

269 lines
7.6 KiB
C++

/* Copyright (C) 2013-2024 Codership Oy <info@codership.com>
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; version 2 of the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License along
with this program; if not, write to the Free Software Foundation, Inc.,
51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA. */
#include "mariadb.h"
#include "mysql/service_wsrep.h"
#include "wsrep_applier.h"
#include "wsrep_priv.h"
#include "wsrep_binlog.h" // wsrep_dump_rbr_buf()
#include "wsrep_xid.h"
#include "wsrep_thd.h"
#include "wsrep_trans_observer.h"
#include "wsrep_schema.h" // wsrep_schema
#include "slave.h" // opt_log_slave_updates
#include "debug_sync.h"
/*
read the first event from (*buf). The size of the (*buf) is (*buf_len).
At the end (*buf) is shitfed to point to the following event or NULL and
(*buf_len) will be changed to account just being read bytes of the 1st event.
*/
static Log_event* wsrep_read_log_event(
char **arg_buf, size_t *arg_buf_len,
const Format_description_log_event *description_event)
{
DBUG_ENTER("wsrep_read_log_event");
char *head= (*arg_buf);
uint data_len= uint4korr(head + EVENT_LEN_OFFSET);
uchar *buf= (uchar*) (*arg_buf);
const char *error= 0;
Log_event *res= 0;
res= Log_event::read_log_event(buf, data_len, &error, description_event,
true);
if (!res)
{
DBUG_ASSERT(error != 0);
sql_print_error("Error in Log_event::read_log_event(): "
"'%s', data_len: %d, event_type: %d",
error,data_len,head[EVENT_TYPE_OFFSET]);
}
(*arg_buf)+= data_len;
(*arg_buf_len)-= data_len;
DBUG_RETURN(res);
}
#include "transaction.h" // trans_commit(), trans_rollback()
void wsrep_set_apply_format(THD* thd, Format_description_log_event* ev)
{
if (thd->wsrep_apply_format)
{
delete (Format_description_log_event*)thd->wsrep_apply_format;
}
thd->wsrep_apply_format= ev;
}
Format_description_log_event*
wsrep_get_apply_format(THD* thd)
{
if (thd->wsrep_apply_format)
{
return (Format_description_log_event*) thd->wsrep_apply_format;
}
DBUG_ASSERT(thd->wsrep_rgi);
return thd->wsrep_rgi->rli->relay_log.description_event_for_exec;
}
void wsrep_store_error(const THD* const thd,
wsrep::mutable_buffer& dst,
bool const include_msg)
{
Diagnostics_area::Sql_condition_iterator it=
thd->get_stmt_da()->sql_conditions();
const Sql_condition* cond;
static size_t const max_len= 2*MAX_SLAVE_ERRMSG; // 2x so that we have enough
dst.resize(max_len);
char* slider= dst.data();
const char* const buf_end= slider + max_len - 1; // -1: leave space for \0
for (cond= it++; cond && slider < buf_end; cond= it++)
{
uint const err_code= cond->get_sql_errno();
const char* const err_str= cond->get_message_text();
if (include_msg)
{
slider+= snprintf(slider, buf_end - slider, " %s, Error_code: %d;",
err_str, err_code);
}
else
{
slider+= snprintf(slider, buf_end - slider, " Error_code: %d;",
err_code);
}
}
if (slider != dst.data())
{
*slider= '\0';
slider++;
}
dst.resize(slider - dst.data());
WSREP_DEBUG("Error buffer for thd %llu seqno %lld, %zu bytes: '%s'",
thd->thread_id, (long long)wsrep_thd_trx_seqno(thd),
dst.size(), dst.size() ? dst.data() : "(null)");
}
int wsrep_apply_events(THD* thd,
Relay_log_info* rli,
const void* events_buf,
size_t buf_len)
{
char *buf= (char *)events_buf;
int rcode= 0;
int event= 1;
Log_event_type typ;
DBUG_ENTER("wsrep_apply_events");
if (!buf_len) WSREP_DEBUG("empty rbr buffer to apply: %lld",
(long long) wsrep_thd_trx_seqno(thd));
thd->variables.gtid_seq_no= 0;
if (wsrep_gtid_mode)
thd->variables.gtid_domain_id= wsrep_gtid_server.domain_id;
else
thd->variables.gtid_domain_id= global_system_variables.gtid_domain_id;
while (buf_len)
{
int exec_res;
Log_event* ev= wsrep_read_log_event(&buf, &buf_len,
wsrep_get_apply_format(thd));
if (!ev)
{
WSREP_ERROR("applier could not read binlog event, seqno: %lld, len: %zu",
(long long)wsrep_thd_trx_seqno(thd), buf_len);
rcode= WSREP_ERR_BAD_EVENT;
goto error;
}
typ= ev->get_type_code();
switch (typ) {
case FORMAT_DESCRIPTION_EVENT:
wsrep_set_apply_format(thd, (Format_description_log_event*)ev);
continue;
case GTID_EVENT:
{
Gtid_log_event *gtid_ev= (Gtid_log_event*)ev;
thd->variables.server_id= gtid_ev->server_id;
thd->variables.gtid_domain_id= gtid_ev->domain_id;
if ((gtid_ev->server_id == wsrep_gtid_server.server_id) &&
(gtid_ev->domain_id == wsrep_gtid_server.domain_id))
{
thd->variables.wsrep_gtid_seq_no= gtid_ev->seq_no;
}
else
{
thd->variables.gtid_seq_no= gtid_ev->seq_no;
}
if (wsrep_gtid_mode)
wsrep_schema->store_gtid_event(thd, gtid_ev);
delete ev;
}
continue;
default:
break;
}
if (!thd->variables.gtid_seq_no && wsrep_thd_is_toi(thd) &&
(ev->get_type_code() == QUERY_EVENT))
{
uint64 seqno= wsrep_gtid_server.seqno_inc();
thd->wsrep_current_gtid_seqno= seqno;
if (mysql_bin_log.is_open() && wsrep_gtid_mode)
{
thd->variables.gtid_seq_no= seqno;
}
}
if (LOG_EVENT_IS_WRITE_ROW(typ) ||
LOG_EVENT_IS_UPDATE_ROW(typ) ||
LOG_EVENT_IS_DELETE_ROW(typ))
{
Rows_log_event* rle = static_cast<Rows_log_event*>(ev);
if (thd_test_options(thd, OPTION_RELAXED_UNIQUE_CHECKS))
{
rle->set_flags(Rows_log_event::RELAXED_UNIQUE_CHECKS_F);
}
if (thd_test_options(thd, OPTION_NO_FOREIGN_KEY_CHECKS))
{
rle->set_flags(Rows_log_event::NO_FOREIGN_KEY_CHECKS_F);
}
}
/* Use the original server id for logging. */
thd->set_server_id(ev->server_id);
thd->lex->current_select= 0;
thd->variables.option_bits=
(thd->variables.option_bits & ~OPTION_SKIP_REPLICATION) |
(ev->flags & LOG_EVENT_SKIP_REPLICATION_F ? OPTION_SKIP_REPLICATION : 0);
if (ev->get_type_code() == GTID_EVENT)
{
thd->variables.option_bits &= ~OPTION_GTID_BEGIN;
}
ev->thd= thd;
thd->set_time();
if (!ev->when)
{
my_hrtime_t hrtime= my_hrtime();
ev->when= hrtime_to_my_time(hrtime);
ev->when_sec_part= hrtime_sec_part(hrtime);
}
exec_res= ev->apply_event(thd->wsrep_rgi);
DBUG_PRINT("info", ("exec_event result: %d", exec_res));
if (exec_res)
{
WSREP_WARN("Event %d %s apply failed: %d, seqno %lld",
event, ev->get_type_str(), exec_res,
(long long) wsrep_thd_trx_seqno(thd));
rcode= exec_res;
/* stop processing for the first error */
delete ev;
goto error;
}
event++;
delete_or_keep_event_post_apply(thd->wsrep_rgi, typ, ev);
}
error:
if (thd->killed == KILL_CONNECTION)
WSREP_INFO("applier aborted: %lld", (long long)wsrep_thd_trx_seqno(thd));
wsrep_set_apply_format(thd, NULL);
DBUG_RETURN(rcode);
}