mirror of
https://github.com/MariaDB/server.git
synced 2025-01-27 09:14:17 +01:00
789 lines
26 KiB
C++
789 lines
26 KiB
C++
/* Copyright 2018-2023 Codership Oy <info@codership.com>
|
|
|
|
This program is free software; you can redistribute it and/or modify
|
|
it under the terms of the GNU General Public License as published by
|
|
the Free Software Foundation; version 2 of the License.
|
|
|
|
This program is distributed in the hope that it will be useful,
|
|
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
GNU General Public License for more details.
|
|
|
|
You should have received a copy of the GNU General Public License
|
|
along with this program; if not, write to the Free Software
|
|
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
|
|
|
|
#include "wsrep_high_priority_service.h"
|
|
#include "wsrep_applier.h"
|
|
#include "wsrep_binlog.h"
|
|
#include "wsrep_schema.h"
|
|
#include "wsrep_xid.h"
|
|
#include "wsrep_trans_observer.h"
|
|
#include "wsrep_server_state.h"
|
|
|
|
#include "sql_class.h" /* THD */
|
|
#include "transaction.h"
|
|
#include "debug_sync.h"
|
|
/* RLI */
|
|
#include "rpl_rli.h"
|
|
#define NUMBER_OF_FIELDS_TO_IDENTIFY_COORDINATOR 1
|
|
#define NUMBER_OF_FIELDS_TO_IDENTIFY_WORKER 2
|
|
#include "slave.h"
|
|
#include "rpl_mi.h"
|
|
#include "rpl_constants.h"
|
|
|
|
namespace
|
|
{
|
|
/*
|
|
Scoped mode for applying non-transactional write sets (TOI)
|
|
*/
|
|
class Wsrep_non_trans_mode
|
|
{
|
|
public:
|
|
Wsrep_non_trans_mode(THD* thd, const wsrep::ws_meta& ws_meta)
|
|
: m_thd(thd)
|
|
, m_option_bits(thd->variables.option_bits)
|
|
, m_server_status(thd->server_status)
|
|
{
|
|
m_thd->variables.option_bits&= ~OPTION_BEGIN;
|
|
m_thd->server_status&= ~(SERVER_STATUS_IN_TRANS | SERVER_STATUS_IN_TRANS_READONLY);
|
|
m_thd->wsrep_cs().enter_toi_mode(ws_meta);
|
|
}
|
|
~Wsrep_non_trans_mode()
|
|
{
|
|
m_thd->variables.option_bits= m_option_bits;
|
|
m_thd->server_status= m_server_status;
|
|
m_thd->wsrep_cs().leave_toi_mode();
|
|
}
|
|
private:
|
|
Wsrep_non_trans_mode(const Wsrep_non_trans_mode&);
|
|
Wsrep_non_trans_mode& operator=(const Wsrep_non_trans_mode&);
|
|
THD* m_thd;
|
|
ulonglong m_option_bits;
|
|
uint m_server_status;
|
|
};
|
|
}
|
|
|
|
static rpl_group_info* wsrep_relay_group_init(THD* thd, const char* log_fname)
|
|
{
|
|
Relay_log_info* rli= new Relay_log_info(false);
|
|
|
|
if (!rli->relay_log.description_event_for_exec)
|
|
{
|
|
rli->relay_log.description_event_for_exec=
|
|
new Format_description_log_event(4, 0, BINLOG_CHECKSUM_ALG_OFF);
|
|
}
|
|
|
|
static LEX_CSTRING connection_name= { STRING_WITH_LEN("wsrep") };
|
|
|
|
/*
|
|
Master_info's constructor initializes rpl_filter by either an already
|
|
constructed Rpl_filter object from global 'rpl_filters' list if the
|
|
specified connection name is same, or it constructs a new Rpl_filter
|
|
object and adds it to rpl_filters. This object is later destructed by
|
|
Mater_info's destructor by looking it up based on connection name in
|
|
rpl_filters list.
|
|
|
|
However, since all Master_info objects created here would share same
|
|
connection name ("wsrep"), destruction of any of the existing Master_info
|
|
objects (in wsrep_return_from_bf_mode()) would free rpl_filter referenced
|
|
by any/all existing Master_info objects.
|
|
|
|
In order to avoid that, we have added a check in Master_info's destructor
|
|
to not free the "wsrep" rpl_filter. It will eventually be freed by
|
|
free_all_rpl_filters() when server terminates.
|
|
*/
|
|
rli->mi= new Master_info(&connection_name, false);
|
|
|
|
struct rpl_group_info *rgi= new rpl_group_info(rli);
|
|
rgi->thd= rli->sql_driver_thd= thd;
|
|
|
|
if ((rgi->deferred_events_collecting= rli->mi->rpl_filter->is_on()))
|
|
{
|
|
rgi->deferred_events= new Deferred_log_events(rli);
|
|
}
|
|
|
|
return rgi;
|
|
}
|
|
|
|
static void wsrep_setup_uk_and_fk_checks(THD* thd)
|
|
{
|
|
/* Tune FK and UK checking policy. These are reset back to original
|
|
in Wsrep_high_priority_service destructor. */
|
|
if (wsrep_slave_UK_checks == FALSE)
|
|
thd->variables.option_bits|= OPTION_RELAXED_UNIQUE_CHECKS;
|
|
else
|
|
thd->variables.option_bits&= ~OPTION_RELAXED_UNIQUE_CHECKS;
|
|
|
|
if (wsrep_slave_FK_checks == FALSE)
|
|
thd->variables.option_bits|= OPTION_NO_FOREIGN_KEY_CHECKS;
|
|
else
|
|
thd->variables.option_bits&= ~OPTION_NO_FOREIGN_KEY_CHECKS;
|
|
}
|
|
|
|
static int apply_events(THD* thd,
|
|
Relay_log_info* rli,
|
|
const wsrep::const_buffer& data,
|
|
wsrep::mutable_buffer& err,
|
|
bool const include_msg)
|
|
{
|
|
int const ret= wsrep_apply_events(thd, rli, data.data(), data.size());
|
|
if (ret || wsrep_thd_has_ignored_error(thd))
|
|
{
|
|
if (ret)
|
|
{
|
|
wsrep_store_error(thd, err, include_msg);
|
|
}
|
|
wsrep_dump_rbr_buf_with_header(thd, data.data(), data.size());
|
|
}
|
|
return ret;
|
|
}
|
|
|
|
/****************************************************************************
|
|
High priority service
|
|
*****************************************************************************/
|
|
|
|
Wsrep_high_priority_service::Wsrep_high_priority_service(THD* thd)
|
|
: wsrep::high_priority_service(Wsrep_server_state::instance())
|
|
, wsrep::high_priority_context(thd->wsrep_cs())
|
|
, m_thd(thd)
|
|
, m_rli()
|
|
{
|
|
LEX_CSTRING db_str= { NULL, 0 };
|
|
m_shadow.option_bits = thd->variables.option_bits;
|
|
m_shadow.server_status= thd->server_status;
|
|
m_shadow.vio = thd->net.vio;
|
|
m_shadow.tx_isolation = thd->variables.tx_isolation;
|
|
m_shadow.db = (char *)thd->db.str;
|
|
m_shadow.db_length = thd->db.length;
|
|
m_shadow.user_time = thd->user_time;
|
|
m_shadow.row_count_func= thd->get_row_count_func();
|
|
m_shadow.wsrep_applier= thd->wsrep_applier;
|
|
|
|
/* Disable general logging on applier threads */
|
|
thd->variables.option_bits |= OPTION_LOG_OFF;
|
|
|
|
/* enable binlogging regardless of log_slave_updates setting
|
|
this is for ensuring that both local and applier transaction go through
|
|
same commit ordering algorithm in group commit control
|
|
*/
|
|
thd->variables.option_bits|= OPTION_BIN_LOG;
|
|
|
|
thd->net.vio= 0;
|
|
thd->reset_db(&db_str);
|
|
thd->clear_error();
|
|
thd->variables.tx_isolation= ISO_READ_COMMITTED;
|
|
thd->tx_isolation = ISO_READ_COMMITTED;
|
|
|
|
/* From trans_begin() */
|
|
thd->variables.option_bits|= OPTION_BEGIN;
|
|
thd->server_status|= SERVER_STATUS_IN_TRANS;
|
|
|
|
/* Make THD wsrep_applier so that it cannot be killed */
|
|
thd->wsrep_applier= true;
|
|
|
|
if (!thd->wsrep_rgi) thd->wsrep_rgi= wsrep_relay_group_init(thd, "wsrep_relay");
|
|
|
|
m_rgi= thd->wsrep_rgi;
|
|
m_rgi->thd= thd;
|
|
m_rli= m_rgi->rli;
|
|
thd_proc_info(thd, "wsrep applier idle");
|
|
}
|
|
|
|
Wsrep_high_priority_service::~Wsrep_high_priority_service()
|
|
{
|
|
THD* thd= m_thd;
|
|
thd->variables.option_bits = m_shadow.option_bits;
|
|
thd->server_status = m_shadow.server_status;
|
|
thd->net.vio = m_shadow.vio;
|
|
thd->variables.tx_isolation= m_shadow.tx_isolation;
|
|
LEX_CSTRING db_str= { m_shadow.db, m_shadow.db_length };
|
|
thd->reset_db(&db_str);
|
|
thd->user_time = m_shadow.user_time;
|
|
|
|
if (thd->wsrep_rgi && thd->wsrep_rgi->rli)
|
|
delete thd->wsrep_rgi->rli->mi;
|
|
if (thd->wsrep_rgi)
|
|
delete thd->wsrep_rgi->rli;
|
|
delete thd->wsrep_rgi;
|
|
thd->wsrep_rgi= NULL;
|
|
|
|
thd->set_row_count_func(m_shadow.row_count_func);
|
|
thd->wsrep_applier = m_shadow.wsrep_applier;
|
|
}
|
|
|
|
int Wsrep_high_priority_service::start_transaction(
|
|
const wsrep::ws_handle& ws_handle, const wsrep::ws_meta& ws_meta)
|
|
{
|
|
DBUG_ENTER(" Wsrep_high_priority_service::start_transaction");
|
|
DBUG_RETURN(m_thd->wsrep_cs().start_transaction(ws_handle, ws_meta) ||
|
|
trans_begin(m_thd));
|
|
}
|
|
|
|
const wsrep::transaction& Wsrep_high_priority_service::transaction() const
|
|
{
|
|
DBUG_ENTER(" Wsrep_high_priority_service::transaction");
|
|
DBUG_RETURN(m_thd->wsrep_trx());
|
|
}
|
|
|
|
int Wsrep_high_priority_service::next_fragment(const wsrep::ws_meta& ws_meta)
|
|
{
|
|
DBUG_ENTER(" Wsrep_high_priority_service::next_fragment");
|
|
DBUG_RETURN(m_thd->wsrep_cs().next_fragment(ws_meta));
|
|
}
|
|
|
|
int Wsrep_high_priority_service::adopt_transaction(
|
|
const wsrep::transaction& transaction)
|
|
{
|
|
DBUG_ENTER(" Wsrep_high_priority_service::adopt_transaction");
|
|
/* Adopt transaction first to set up transaction meta data for
|
|
trans begin. If trans_begin() fails for some reason, roll back
|
|
the wsrep transaction before return. */
|
|
m_thd->wsrep_cs().adopt_transaction(transaction);
|
|
int ret= trans_begin(m_thd);
|
|
if (ret)
|
|
{
|
|
m_thd->wsrep_cs().before_rollback();
|
|
m_thd->wsrep_cs().after_rollback();
|
|
}
|
|
DBUG_RETURN(ret);
|
|
}
|
|
|
|
int Wsrep_high_priority_service::append_fragment_and_commit(
|
|
const wsrep::ws_handle& ws_handle,
|
|
const wsrep::ws_meta& ws_meta,
|
|
const wsrep::const_buffer& data,
|
|
const wsrep::xid& xid WSREP_UNUSED)
|
|
{
|
|
DBUG_ENTER("Wsrep_high_priority_service::append_fragment_and_commit");
|
|
int ret= start_transaction(ws_handle, ws_meta);
|
|
/*
|
|
Start transaction explicitly to avoid early commit via
|
|
trans_commit_stmt() in append_fragment()
|
|
*/
|
|
ret= ret || trans_begin(m_thd);
|
|
ret= ret || wsrep_schema->append_fragment(m_thd,
|
|
ws_meta.server_id(),
|
|
ws_meta.transaction_id(),
|
|
ws_meta.seqno(),
|
|
ws_meta.flags(),
|
|
data);
|
|
|
|
/*
|
|
Note: The commit code below seems to be identical to
|
|
Wsrep_storage_service::commit(). Consider implementing
|
|
common utility function to deal with commit.
|
|
*/
|
|
const bool do_binlog_commit= (opt_log_slave_updates &&
|
|
wsrep_gtid_mode &&
|
|
m_thd->variables.gtid_seq_no);
|
|
/*
|
|
Write skip event into binlog if gtid_mode is on. This is to
|
|
maintain gtid continuity.
|
|
*/
|
|
if (do_binlog_commit)
|
|
{
|
|
ret= wsrep_write_skip_event(m_thd);
|
|
}
|
|
|
|
if (!ret)
|
|
{
|
|
ret= m_thd->wsrep_cs().prepare_for_ordering(ws_handle,
|
|
ws_meta, true);
|
|
}
|
|
|
|
ret= ret || trans_commit(m_thd);
|
|
ret= ret || (m_thd->wsrep_cs().after_applying(), 0);
|
|
|
|
m_thd->release_transactional_locks();
|
|
|
|
free_root(m_thd->mem_root, MYF(MY_KEEP_PREALLOC));
|
|
|
|
thd_proc_info(m_thd, "wsrep applier committed");
|
|
|
|
DBUG_RETURN(ret);
|
|
}
|
|
|
|
int Wsrep_high_priority_service::remove_fragments(const wsrep::ws_meta& ws_meta)
|
|
{
|
|
DBUG_ENTER("Wsrep_high_priority_service::remove_fragments");
|
|
int ret= wsrep_schema->remove_fragments(m_thd,
|
|
ws_meta.server_id(),
|
|
ws_meta.transaction_id(),
|
|
m_thd->wsrep_sr().fragments());
|
|
DBUG_RETURN(ret);
|
|
}
|
|
|
|
int Wsrep_high_priority_service::commit(const wsrep::ws_handle& ws_handle,
|
|
const wsrep::ws_meta& ws_meta)
|
|
{
|
|
DBUG_ENTER("Wsrep_high_priority_service::commit");
|
|
THD* thd= m_thd;
|
|
DBUG_ASSERT(thd->wsrep_trx().active());
|
|
thd->wsrep_cs().prepare_for_ordering(ws_handle, ws_meta, true);
|
|
thd_proc_info(thd, "committing");
|
|
int ret=0;
|
|
|
|
const bool is_ordered= !ws_meta.seqno().is_undefined();
|
|
|
|
if (!thd->transaction->stmt.is_empty())
|
|
ret= trans_commit_stmt(thd);
|
|
|
|
if (ret == 0)
|
|
ret= trans_commit(thd);
|
|
|
|
if (ret == 0)
|
|
{
|
|
m_rgi->cleanup_context(thd, 0);
|
|
}
|
|
|
|
m_thd->release_transactional_locks();
|
|
|
|
thd_proc_info(thd, "wsrep applier committed");
|
|
|
|
if (!is_ordered)
|
|
{
|
|
m_thd->wsrep_cs().before_rollback();
|
|
m_thd->wsrep_cs().after_rollback();
|
|
}
|
|
else if (m_thd->wsrep_trx().state() == wsrep::transaction::s_executing)
|
|
{
|
|
/*
|
|
Wsrep commit was ordered but it did not go through commit time
|
|
hooks and remains active. Cycle through commit hooks to release
|
|
commit order and to make cleanup happen in after_applying() call.
|
|
|
|
This is a workaround for CTAS with empty result set.
|
|
*/
|
|
WSREP_DEBUG("Commit not finished for applier %llu", thd->thread_id);
|
|
ret= ret || m_thd->wsrep_cs().before_commit() ||
|
|
m_thd->wsrep_cs().ordered_commit() ||
|
|
m_thd->wsrep_cs().after_commit();
|
|
}
|
|
|
|
thd->lex->sql_command= SQLCOM_END;
|
|
|
|
free_root(thd->mem_root, MYF(MY_KEEP_PREALLOC));
|
|
|
|
must_exit_= check_exit_status();
|
|
DBUG_RETURN(ret);
|
|
}
|
|
|
|
int Wsrep_high_priority_service::rollback(const wsrep::ws_handle& ws_handle,
|
|
const wsrep::ws_meta& ws_meta)
|
|
{
|
|
DBUG_ENTER("Wsrep_high_priority_service::rollback");
|
|
if (ws_meta.ordered())
|
|
{
|
|
m_thd->wsrep_cs().prepare_for_ordering(ws_handle, ws_meta, false);
|
|
}
|
|
else
|
|
{
|
|
assert(ws_meta == wsrep::ws_meta());
|
|
assert(ws_handle == wsrep::ws_handle());
|
|
}
|
|
int ret= (trans_rollback_stmt(m_thd) || trans_rollback(m_thd));
|
|
|
|
WSREP_DEBUG("::rollback() thread: %lu, client_state %s "
|
|
"client_mode %s trans_state %s killed %d",
|
|
thd_get_thread_id(m_thd),
|
|
wsrep_thd_client_state_str(m_thd),
|
|
wsrep_thd_client_mode_str(m_thd),
|
|
wsrep_thd_transaction_state_str(m_thd),
|
|
m_thd->killed);
|
|
|
|
#ifdef ENABLED_DEBUG_SYNC
|
|
DBUG_EXECUTE_IF("sync.wsrep_rollback_mdl_release",
|
|
{
|
|
const char act[]=
|
|
"now "
|
|
"SIGNAL sync.wsrep_rollback_mdl_release_reached "
|
|
"WAIT_FOR signal.wsrep_rollback_mdl_release";
|
|
DBUG_ASSERT(!debug_sync_set_action(m_thd,
|
|
STRING_WITH_LEN(act)));
|
|
};);
|
|
#endif
|
|
|
|
m_thd->release_transactional_locks();
|
|
|
|
free_root(m_thd->mem_root, MYF(MY_KEEP_PREALLOC));
|
|
|
|
DBUG_RETURN(ret);
|
|
}
|
|
|
|
int Wsrep_high_priority_service::apply_toi(const wsrep::ws_meta& ws_meta,
|
|
const wsrep::const_buffer& data,
|
|
wsrep::mutable_buffer& err)
|
|
{
|
|
DBUG_ENTER("Wsrep_high_priority_service::apply_toi");
|
|
THD* thd= m_thd;
|
|
Wsrep_non_trans_mode non_trans_mode(thd, ws_meta);
|
|
|
|
wsrep::client_state& client_state(thd->wsrep_cs());
|
|
DBUG_ASSERT(client_state.in_toi());
|
|
|
|
thd_proc_info(thd, "wsrep applier toi");
|
|
|
|
WSREP_DEBUG("Wsrep_high_priority_service::apply_toi: %lld",
|
|
client_state.toi_meta().seqno().get());
|
|
|
|
#ifdef ENABLED_DEBUG_SYNC
|
|
DBUG_EXECUTE_IF("sync.wsrep_apply_toi",
|
|
{
|
|
const char act[]=
|
|
"now "
|
|
"SIGNAL sync.wsrep_apply_toi_reached "
|
|
"WAIT_FOR signal.wsrep_apply_toi";
|
|
DBUG_ASSERT(!debug_sync_set_action(thd,
|
|
STRING_WITH_LEN(act)));
|
|
};);
|
|
#endif
|
|
|
|
thd->set_time();
|
|
int ret= apply_events(thd, m_rli, data, err, false);
|
|
wsrep_thd_set_ignored_error(thd, false);
|
|
trans_commit(thd);
|
|
|
|
thd->close_temporary_tables();
|
|
thd->lex->sql_command= SQLCOM_END;
|
|
|
|
wsrep_gtid_server.signal_waiters(thd->wsrep_current_gtid_seqno, false);
|
|
wsrep_set_SE_checkpoint(client_state.toi_meta().gtid(), wsrep_gtid_server.gtid());
|
|
|
|
must_exit_= check_exit_status();
|
|
|
|
DBUG_RETURN(ret);
|
|
}
|
|
|
|
void Wsrep_high_priority_service::store_globals()
|
|
{
|
|
wsrep_store_threadvars(m_thd);
|
|
m_thd->wsrep_cs().acquire_ownership();
|
|
}
|
|
|
|
void Wsrep_high_priority_service::reset_globals()
|
|
{
|
|
wsrep_reset_threadvars(m_thd);
|
|
}
|
|
|
|
void Wsrep_high_priority_service::switch_execution_context(wsrep::high_priority_service& orig_high_priority_service)
|
|
{
|
|
DBUG_ENTER("Wsrep_high_priority_service::switch_execution_context");
|
|
Wsrep_high_priority_service&
|
|
orig_hps= static_cast<Wsrep_high_priority_service&>(orig_high_priority_service);
|
|
m_thd->thread_stack= orig_hps.m_thd->thread_stack;
|
|
DBUG_VOID_RETURN;
|
|
}
|
|
|
|
int Wsrep_high_priority_service::log_dummy_write_set(const wsrep::ws_handle& ws_handle,
|
|
const wsrep::ws_meta& ws_meta,
|
|
wsrep::mutable_buffer& err)
|
|
{
|
|
DBUG_ENTER("Wsrep_high_priority_service::log_dummy_write_set");
|
|
int ret= 0;
|
|
DBUG_PRINT("info",
|
|
("Wsrep_high_priority_service::log_dummy_write_set: seqno=%lld",
|
|
ws_meta.seqno().get()));
|
|
#ifdef ENABLED_DEBUG_SYNC
|
|
DBUG_EXECUTE_IF("sync.wsrep_log_dummy_write_set",
|
|
{
|
|
const char act[]=
|
|
"now "
|
|
"SIGNAL sync.wsrep_log_dummy_write_set_reached ";
|
|
DBUG_ASSERT(!debug_sync_set_action(m_thd,
|
|
STRING_WITH_LEN(act)));
|
|
};);
|
|
#endif
|
|
|
|
if (ws_meta.ordered())
|
|
{
|
|
wsrep::client_state& cs(m_thd->wsrep_cs());
|
|
if (!cs.transaction().active())
|
|
{
|
|
cs.start_transaction(ws_handle, ws_meta);
|
|
}
|
|
adopt_apply_error(err);
|
|
WSREP_DEBUG("Log dummy write set %lld", ws_meta.seqno().get());
|
|
ret= cs.provider().commit_order_enter(ws_handle, ws_meta);
|
|
if (!(ret && opt_log_slave_updates && wsrep_gtid_mode &&
|
|
m_thd->variables.gtid_seq_no))
|
|
{
|
|
cs.before_rollback();
|
|
cs.after_rollback();
|
|
}
|
|
|
|
if (!WSREP_EMULATE_BINLOG(m_thd))
|
|
{
|
|
wsrep_register_for_group_commit(m_thd);
|
|
/* wait_for_prior_commit() ensures that all preceding transactions
|
|
have been committed and seqno has been synced into
|
|
storage engine. We don't release commit order here yet to
|
|
avoid following transactions to sync seqno before
|
|
wsrep_set_SE_checkpoint() below returns. This effectively pauses
|
|
group commit for the checkpoint operation, but is the only way to
|
|
ensure proper ordering. */
|
|
m_thd->wait_for_prior_commit();
|
|
}
|
|
|
|
WSREP_DEBUG("checkpointing dummy write set %lld", ws_meta.seqno().get());
|
|
wsrep_set_SE_checkpoint(ws_meta.gtid(), wsrep_gtid_server.gtid());
|
|
|
|
if (!WSREP_EMULATE_BINLOG(m_thd))
|
|
{
|
|
wsrep_unregister_from_group_commit(m_thd);
|
|
}
|
|
ret= ret || cs.provider().commit_order_leave(ws_handle, ws_meta, err);
|
|
cs.after_applying();
|
|
}
|
|
DBUG_RETURN(ret);
|
|
}
|
|
|
|
void Wsrep_high_priority_service::adopt_apply_error(wsrep::mutable_buffer& err)
|
|
{
|
|
m_thd->wsrep_cs().adopt_apply_error(err);
|
|
}
|
|
|
|
void Wsrep_high_priority_service::debug_crash(const char* crash_point)
|
|
{
|
|
DBUG_ASSERT(m_thd == current_thd);
|
|
DBUG_EXECUTE_IF(crash_point, DBUG_SUICIDE(););
|
|
}
|
|
|
|
/****************************************************************************
|
|
Applier service
|
|
*****************************************************************************/
|
|
|
|
Wsrep_applier_service::Wsrep_applier_service(THD* thd)
|
|
: Wsrep_high_priority_service(thd)
|
|
{
|
|
thd->wsrep_applier_service= this;
|
|
thd->wsrep_cs().open(wsrep::client_id(thd->thread_id));
|
|
thd->wsrep_cs().before_command();
|
|
thd->wsrep_cs().debug_log_level(wsrep_debug);
|
|
if (!thd->slave_thread)
|
|
thd->system_thread_info.rpl_sql_info=
|
|
new rpl_sql_thread_info(thd->wsrep_rgi->rli->mi->rpl_filter);
|
|
}
|
|
|
|
Wsrep_applier_service::~Wsrep_applier_service()
|
|
{
|
|
if (!m_thd->slave_thread)
|
|
delete m_thd->system_thread_info.rpl_sql_info;
|
|
m_thd->wsrep_cs().after_command_before_result();
|
|
m_thd->wsrep_cs().after_command_after_result();
|
|
m_thd->wsrep_cs().close();
|
|
m_thd->wsrep_cs().cleanup();
|
|
m_thd->wsrep_applier_service= NULL;
|
|
}
|
|
|
|
int Wsrep_applier_service::apply_write_set(const wsrep::ws_meta& ws_meta,
|
|
const wsrep::const_buffer& data,
|
|
wsrep::mutable_buffer& err)
|
|
{
|
|
DBUG_ENTER("Wsrep_applier_service::apply_write_set");
|
|
THD* thd= m_thd;
|
|
|
|
thd->variables.option_bits |= OPTION_BEGIN;
|
|
thd->variables.option_bits |= OPTION_GTID_BEGIN;
|
|
thd->variables.option_bits |= OPTION_NOT_AUTOCOMMIT;
|
|
DBUG_ASSERT(thd->wsrep_trx().active());
|
|
DBUG_ASSERT(thd->wsrep_trx().state() == wsrep::transaction::s_executing);
|
|
|
|
thd_proc_info(thd, "applying write set");
|
|
|
|
/* moved dbug sync point here, after possible THD switch for SR transactions
|
|
has ben done
|
|
*/
|
|
#ifdef ENABLED_DEBUG_SYNC
|
|
/* Allow tests to block the applier thread using the DBUG facilities */
|
|
DBUG_EXECUTE_IF("sync.wsrep_apply_cb",
|
|
{
|
|
const char act[]=
|
|
"now "
|
|
"SIGNAL sync.wsrep_apply_cb_reached "
|
|
"WAIT_FOR signal.wsrep_apply_cb";
|
|
DBUG_ASSERT(!debug_sync_set_action(thd,
|
|
STRING_WITH_LEN(act)));
|
|
};);
|
|
#endif /* ENABLED_DEBUG_SYNC */
|
|
|
|
wsrep_setup_uk_and_fk_checks(thd);
|
|
int ret= apply_events(thd, m_rli, data, err, true);
|
|
|
|
thd->close_temporary_tables();
|
|
if (!ret && !(ws_meta.flags() & wsrep::provider::flag::commit))
|
|
{
|
|
thd->wsrep_cs().fragment_applied(ws_meta.seqno());
|
|
}
|
|
thd_proc_info(thd, "wsrep applied write set");
|
|
|
|
thd->variables.option_bits &= ~OPTION_GTID_BEGIN;
|
|
DBUG_RETURN(ret);
|
|
}
|
|
|
|
int Wsrep_applier_service::apply_nbo_begin(const wsrep::ws_meta& ws_meta,
|
|
const wsrep::const_buffer& data,
|
|
wsrep::mutable_buffer& err)
|
|
{
|
|
DBUG_ENTER("Wsrep_applier_service::apply_nbo_begin");
|
|
DBUG_RETURN(0);
|
|
}
|
|
|
|
void Wsrep_applier_service::after_apply()
|
|
{
|
|
DBUG_ENTER("Wsrep_applier_service::after_apply");
|
|
wsrep_after_apply(m_thd);
|
|
DBUG_VOID_RETURN;
|
|
}
|
|
|
|
bool Wsrep_applier_service::check_exit_status() const
|
|
{
|
|
bool ret= false;
|
|
mysql_mutex_lock(&LOCK_wsrep_slave_threads);
|
|
if (wsrep_slave_count_change < 0)
|
|
{
|
|
++wsrep_slave_count_change;
|
|
ret= true;
|
|
}
|
|
mysql_mutex_unlock(&LOCK_wsrep_slave_threads);
|
|
return ret;
|
|
}
|
|
|
|
/****************************************************************************
|
|
Replayer service
|
|
*****************************************************************************/
|
|
|
|
Wsrep_replayer_service::Wsrep_replayer_service(THD* replayer_thd, THD* orig_thd)
|
|
: Wsrep_high_priority_service(replayer_thd)
|
|
, m_orig_thd(orig_thd)
|
|
, m_da_shadow()
|
|
, m_replay_status()
|
|
{
|
|
/* Response must not have been sent to client */
|
|
DBUG_ASSERT(!orig_thd->get_stmt_da()->is_sent());
|
|
/* PS reprepare observer should have been removed already
|
|
open_table() will fail if we have dangling observer here */
|
|
DBUG_ASSERT(!orig_thd->m_reprepare_observer);
|
|
/* Replaying should happen always from after_statement() hook
|
|
after rollback, which should guarantee that there are no
|
|
transactional locks */
|
|
DBUG_ASSERT(!orig_thd->mdl_context.has_transactional_locks());
|
|
|
|
replayer_thd->system_thread_info.rpl_sql_info=
|
|
new rpl_sql_thread_info(replayer_thd->wsrep_rgi->rli->mi->rpl_filter);
|
|
|
|
/* Make a shadow copy of diagnostics area and reset */
|
|
m_da_shadow.status= orig_thd->get_stmt_da()->status();
|
|
if (m_da_shadow.status == Diagnostics_area::DA_OK)
|
|
{
|
|
m_da_shadow.affected_rows= orig_thd->get_stmt_da()->affected_rows();
|
|
m_da_shadow.last_insert_id= orig_thd->get_stmt_da()->last_insert_id();
|
|
strmake(m_da_shadow.message, orig_thd->get_stmt_da()->message(),
|
|
sizeof(m_da_shadow.message) - 1);
|
|
}
|
|
orig_thd->get_stmt_da()->reset_diagnostics_area();
|
|
|
|
/* Release explicit locks */
|
|
if (orig_thd->locked_tables_mode && orig_thd->lock)
|
|
{
|
|
WSREP_WARN("releasing table lock for replaying (%llu)",
|
|
orig_thd->thread_id);
|
|
orig_thd->locked_tables_list.unlock_locked_tables(orig_thd);
|
|
orig_thd->variables.option_bits&= ~(OPTION_TABLE_LOCK);
|
|
}
|
|
|
|
thd_proc_info(orig_thd, "wsrep replaying trx");
|
|
|
|
/*
|
|
Switch execution context to replayer_thd and prepare it for
|
|
replay execution.
|
|
*/
|
|
/* Copy thd vars from orig_thd before reset, otherwise reset
|
|
for orig thd clears thread local storage before copy. */
|
|
wsrep_assign_from_threadvars(replayer_thd);
|
|
wsrep_reset_threadvars(orig_thd);
|
|
wsrep_store_threadvars(replayer_thd);
|
|
wsrep_open(replayer_thd);
|
|
wsrep_before_command(replayer_thd);
|
|
replayer_thd->wsrep_cs().clone_transaction_for_replay(orig_thd->wsrep_trx());
|
|
}
|
|
|
|
Wsrep_replayer_service::~Wsrep_replayer_service()
|
|
{
|
|
/* Switch execution context back to original. */
|
|
wsrep_after_apply(m_thd);
|
|
wsrep_after_command_ignore_result(m_thd);
|
|
wsrep_close(m_thd);
|
|
wsrep_reset_threadvars(m_thd);
|
|
wsrep_store_threadvars(m_orig_thd);
|
|
|
|
DBUG_ASSERT(!m_orig_thd->get_stmt_da()->is_sent());
|
|
DBUG_ASSERT(!m_orig_thd->get_stmt_da()->is_set());
|
|
|
|
delete m_thd->system_thread_info.rpl_sql_info;
|
|
m_thd->system_thread_info.rpl_sql_info= nullptr;
|
|
|
|
if (m_replay_status == wsrep::provider::success)
|
|
{
|
|
DBUG_ASSERT(m_thd->wsrep_cs().current_error() == wsrep::e_success);
|
|
m_orig_thd->reset_kill_query();
|
|
my_ok(m_orig_thd, m_da_shadow.affected_rows, m_da_shadow.last_insert_id);
|
|
}
|
|
else if (m_replay_status == wsrep::provider::error_certification_failed)
|
|
{
|
|
wsrep_override_error(m_orig_thd, ER_LOCK_DEADLOCK);
|
|
}
|
|
else
|
|
{
|
|
DBUG_ASSERT(0);
|
|
WSREP_ERROR("trx_replay failed for: %d, schema: %s, query: %s",
|
|
m_replay_status,
|
|
m_orig_thd->db.str, wsrep_thd_query(m_orig_thd));
|
|
unireg_abort(1);
|
|
}
|
|
}
|
|
|
|
int Wsrep_replayer_service::apply_write_set(const wsrep::ws_meta& ws_meta,
|
|
const wsrep::const_buffer& data,
|
|
wsrep::mutable_buffer& err)
|
|
{
|
|
DBUG_ENTER("Wsrep_replayer_service::apply_write_set");
|
|
THD* thd= m_thd;
|
|
|
|
DBUG_ASSERT(thd->wsrep_trx().active());
|
|
DBUG_ASSERT(thd->wsrep_trx().state() == wsrep::transaction::s_replaying);
|
|
|
|
#ifdef ENABLED_DEBUG_SYNC
|
|
/* Allow tests to block the replayer thread using the DBUG facilities */
|
|
DBUG_EXECUTE_IF("sync.wsrep_replay_cb",
|
|
{
|
|
const char act[]=
|
|
"now "
|
|
"SIGNAL sync.wsrep_replay_cb_reached "
|
|
"WAIT_FOR signal.wsrep_replay_cb";
|
|
DBUG_ASSERT(!debug_sync_set_action(thd,
|
|
STRING_WITH_LEN(act)));
|
|
};);
|
|
#endif
|
|
|
|
wsrep_setup_uk_and_fk_checks(thd);
|
|
|
|
int ret= 0;
|
|
if (!wsrep::starts_transaction(ws_meta.flags()))
|
|
{
|
|
DBUG_ASSERT(thd->wsrep_trx().is_streaming());
|
|
ret= wsrep_schema->replay_transaction(thd,
|
|
m_rli,
|
|
ws_meta,
|
|
thd->wsrep_sr().fragments());
|
|
}
|
|
ret= ret || apply_events(thd, m_rli, data, err, true);
|
|
thd->close_temporary_tables();
|
|
if (!ret && !(ws_meta.flags() & wsrep::provider::flag::commit))
|
|
{
|
|
thd->wsrep_cs().fragment_applied(ws_meta.seqno());
|
|
}
|
|
|
|
thd_proc_info(thd, "wsrep replayed write set");
|
|
DBUG_RETURN(ret);
|
|
}
|