mirror of
https://github.com/MariaDB/server.git
synced 2025-01-16 03:52:35 +01:00
3f59bbeeae
The problem seems to be a deadlock between KILL command execution and BF abort issued by an applier, where: * KILL has locked victim's LOCK_thd_kill and LOCK_thd_data. * Applier has innodb side global lock mutex and victim trx mutex. * KILL is calling innobase_kill_query, and is blocked by innodb global lock mutex. * Applier is in wsrep_innobase_kill_one_trx and is blocked by victim's LOCK_thd_kill. The fix in this commit removes the TOI replication of KILL command and makes KILL execution less intrusive operation. Aborting the victim happens now by using awake_no_mutex() and ha_abort_transaction(). If the KILL happens when the transaction is committing, the KILL operation is postponed to happen after the statement has completed in order to avoid KILL to interrupt commit processing. Notable changes in this commit: * wsrep client connections's error state may remain sticky after client connection is closed. This error message will then pop up for the next client session issuing first SQL statement. This problem raised with test galera.galera_bf_kill. The fix is to reset wsrep client error state, before a THD is reused for next connetion. * Release THD locks in wsrep_abort_transaction when locking innodb mutexes. This guarantees same locking order as with applier BF aborting. * BF abort from MDL was changed to do BF abort on server/wsrep-lib side first, and only then do the BF abort on InnoDB side. This removes the need to call back from InnoDB for BF aborts which originate from MDL and simplifies the locking. * Removed wsrep_thd_set_wsrep_aborter() from service_wsrep.h. The manipulation of the wsrep_aborter can be done solely on server side. Moreover, it is now debug only variable and could be excluded from optimized builds. * Remove LOCK_thd_kill from wsrep_thd_LOCK/UNLOCK to allow more fine grained locking for SR BF abort which may require locking of victim LOCK_thd_kill. Added explicit call for wsrep_thd_kill_LOCK/UNLOCK where appropriate. * Wsrep-lib was updated to version which allows external locking for BF abort calls. Changes to MTR tests: * Disable galera_bf_abort_group_commit. This test is going to be removed (MDEV-30855). * Record galera_gcache_recover_manytrx as result file was incomplete. Trivial change. * Make galera_create_table_as_select more deterministic: Wait until CTAS execution has reached MDL wait for multi-master conflict case. Expected error from multi-master conflict is ER_QUERY_INTERRUPTED. This is because CTAS does not yet have open wsrep transaction when it is waiting for MDL, query gets interrupted instead of BF aborted. This should be addressed in separate task. * A new test galera_kill_group_commit to verify correct behavior when KILL is executed while the transaction is committing. Co-authored-by: Seppo Jaakola <seppo.jaakola@iki.fi> Co-authored-by: Jan Lindström <jan.lindstrom@galeracluster.com> Signed-off-by: Julius Goryavsky <julius.goryavsky@mariadb.com>
585 lines
19 KiB
C++
585 lines
19 KiB
C++
/* Copyright (C) 2013-2023 Codership Oy <info@codership.com>
|
|
|
|
This program is free software; you can redistribute it and/or modify
|
|
it under the terms of the GNU General Public License as published by
|
|
the Free Software Foundation; version 2 of the License.
|
|
|
|
This program is distributed in the hope that it will be useful,
|
|
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
GNU General Public License for more details.
|
|
|
|
You should have received a copy of the GNU General Public License along
|
|
with this program; if not, write to the Free Software Foundation, Inc.,
|
|
51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA. */
|
|
|
|
#include "mariadb.h"
|
|
#include "wsrep_thd.h"
|
|
#include "wsrep_trans_observer.h"
|
|
#include "wsrep_high_priority_service.h"
|
|
#include "wsrep_storage_service.h"
|
|
#include "wsrep_server_state.h"
|
|
#include "transaction.h"
|
|
#include "rpl_rli.h"
|
|
#include "log_event.h"
|
|
#include "sql_parse.h"
|
|
#include "wsrep_mysqld.h" // start_wsrep_THD();
|
|
#include "mysql/service_wsrep.h"
|
|
#include "debug_sync.h"
|
|
#include "slave.h"
|
|
#include "rpl_rli.h"
|
|
#include "rpl_mi.h"
|
|
|
|
extern "C" pthread_key(struct st_my_thread_var*, THR_KEY_mysys);
|
|
|
|
static Wsrep_thd_queue* wsrep_rollback_queue= 0;
|
|
static Atomic_counter<uint64_t> wsrep_bf_aborts_counter;
|
|
|
|
|
|
int wsrep_show_bf_aborts (THD *thd, SHOW_VAR *var, char *buff,
|
|
enum enum_var_type scope)
|
|
{
|
|
wsrep_local_bf_aborts= wsrep_bf_aborts_counter;
|
|
var->type= SHOW_LONGLONG;
|
|
var->value= (char*)&wsrep_local_bf_aborts;
|
|
return 0;
|
|
}
|
|
|
|
static void wsrep_replication_process(THD *thd,
|
|
void* arg __attribute__((unused)))
|
|
{
|
|
DBUG_ENTER("wsrep_replication_process");
|
|
|
|
Wsrep_applier_service applier_service(thd);
|
|
|
|
WSREP_INFO("Starting applier thread %llu", thd->thread_id);
|
|
enum wsrep::provider::status
|
|
ret= Wsrep_server_state::get_provider().run_applier(&applier_service);
|
|
|
|
WSREP_INFO("Applier thread exiting ret: %d thd: %llu", ret, thd->thread_id);
|
|
mysql_mutex_lock(&LOCK_wsrep_slave_threads);
|
|
wsrep_close_applier(thd);
|
|
mysql_cond_broadcast(&COND_wsrep_slave_threads);
|
|
mysql_mutex_unlock(&LOCK_wsrep_slave_threads);
|
|
|
|
delete thd->wsrep_rgi->rli->mi;
|
|
delete thd->wsrep_rgi->rli;
|
|
|
|
thd->wsrep_rgi->cleanup_after_session();
|
|
delete thd->wsrep_rgi;
|
|
thd->wsrep_rgi= NULL;
|
|
|
|
|
|
if(thd->has_thd_temporary_tables())
|
|
{
|
|
WSREP_WARN("Applier %lld has temporary tables at exit.",
|
|
thd->thread_id);
|
|
}
|
|
DBUG_VOID_RETURN;
|
|
}
|
|
|
|
static bool create_wsrep_THD(Wsrep_thd_args* args, bool mutex_protected)
|
|
{
|
|
if (!mutex_protected)
|
|
mysql_mutex_lock(&LOCK_wsrep_slave_threads);
|
|
|
|
ulong old_wsrep_running_threads= wsrep_running_threads;
|
|
|
|
DBUG_ASSERT(args->thread_type() == WSREP_APPLIER_THREAD ||
|
|
args->thread_type() == WSREP_ROLLBACKER_THREAD);
|
|
|
|
bool res= mysql_thread_create(args->thread_type() == WSREP_APPLIER_THREAD
|
|
? key_wsrep_applier : key_wsrep_rollbacker,
|
|
args->thread_id(), &connection_attrib,
|
|
start_wsrep_THD, (void*)args);
|
|
|
|
if (res)
|
|
WSREP_ERROR("Can't create wsrep thread");
|
|
|
|
/*
|
|
if starting a thread on server startup, wait until the this thread's THD
|
|
is fully initialized (otherwise a THD initialization code might
|
|
try to access a partially initialized server data structure - MDEV-8208).
|
|
*/
|
|
if (!mysqld_server_initialized)
|
|
{
|
|
while (old_wsrep_running_threads == wsrep_running_threads)
|
|
{
|
|
mysql_cond_wait(&COND_wsrep_slave_threads, &LOCK_wsrep_slave_threads);
|
|
}
|
|
}
|
|
|
|
if (!mutex_protected)
|
|
mysql_mutex_unlock(&LOCK_wsrep_slave_threads);
|
|
|
|
return res;
|
|
}
|
|
|
|
bool wsrep_create_appliers(long threads, bool mutex_protected)
|
|
{
|
|
/* Dont' start slave threads if wsrep-provider or wsrep-cluster-address
|
|
is not set.
|
|
*/
|
|
if (!WSREP_PROVIDER_EXISTS)
|
|
{
|
|
return false;
|
|
}
|
|
|
|
DBUG_ASSERT(wsrep_cluster_address[0]);
|
|
|
|
long wsrep_threads=0;
|
|
|
|
while (wsrep_threads++ < threads)
|
|
{
|
|
Wsrep_thd_args* args(new Wsrep_thd_args(wsrep_replication_process,
|
|
WSREP_APPLIER_THREAD,
|
|
pthread_self()));
|
|
if (create_wsrep_THD(args, mutex_protected))
|
|
{
|
|
WSREP_WARN("Can't create thread to manage wsrep replication");
|
|
return true;
|
|
}
|
|
}
|
|
|
|
return false;
|
|
}
|
|
|
|
static void wsrep_remove_streaming_fragments(THD* thd, const char* ctx)
|
|
{
|
|
wsrep::transaction_id transaction_id(thd->wsrep_trx().id());
|
|
Wsrep_storage_service* storage_service= wsrep_create_storage_service(thd, ctx);
|
|
storage_service->store_globals();
|
|
storage_service->adopt_transaction(thd->wsrep_trx());
|
|
storage_service->remove_fragments();
|
|
storage_service->commit(wsrep::ws_handle(transaction_id, 0),
|
|
wsrep::ws_meta());
|
|
Wsrep_server_state::instance().server_service()
|
|
.release_storage_service(storage_service);
|
|
wsrep_store_threadvars(thd);
|
|
}
|
|
|
|
static void wsrep_rollback_high_priority(THD *thd, THD *rollbacker)
|
|
{
|
|
WSREP_DEBUG("Rollbacker aborting SR applier thd (%llu %lu)",
|
|
thd->thread_id, thd->real_id);
|
|
char* orig_thread_stack= thd->thread_stack;
|
|
thd->thread_stack= rollbacker->thread_stack;
|
|
DBUG_ASSERT(thd->wsrep_cs().mode() == Wsrep_client_state::m_high_priority);
|
|
/* Must be streaming and must have been removed from the
|
|
server state streaming appliers map. */
|
|
DBUG_ASSERT(thd->wsrep_trx().is_streaming());
|
|
DBUG_ASSERT(!Wsrep_server_state::instance().find_streaming_applier(
|
|
thd->wsrep_trx().server_id(),
|
|
thd->wsrep_trx().id()));
|
|
DBUG_ASSERT(thd->wsrep_applier_service);
|
|
|
|
/* Fragment removal should happen before rollback to make
|
|
the transaction non-observable in SR table after the rollback
|
|
completes. For correctness the order does not matter here,
|
|
but currently it is mandated by checks in some MTR tests. */
|
|
wsrep_remove_streaming_fragments(thd, "high priority");
|
|
thd->wsrep_applier_service->rollback(wsrep::ws_handle(),
|
|
wsrep::ws_meta());
|
|
thd->wsrep_applier_service->after_apply();
|
|
thd->thread_stack= orig_thread_stack;
|
|
WSREP_DEBUG("rollbacker aborted thd: (%llu %lu)",
|
|
thd->thread_id, thd->real_id);
|
|
/* Will free THD */
|
|
Wsrep_server_state::instance().server_service()
|
|
.release_high_priority_service(thd->wsrep_applier_service);
|
|
}
|
|
|
|
static void wsrep_rollback_local(THD *thd, THD *rollbacker)
|
|
{
|
|
WSREP_DEBUG("Rollbacker aborting local thd (%llu %lu)",
|
|
thd->thread_id, thd->real_id);
|
|
char* orig_thread_stack= thd->thread_stack;
|
|
thd->thread_stack= rollbacker->thread_stack;
|
|
if (thd->wsrep_trx().is_streaming())
|
|
{
|
|
wsrep_remove_streaming_fragments(thd, "local");
|
|
}
|
|
/* Set thd->event_scheduler.data temporarily to NULL to avoid
|
|
callbacks to threadpool wait_begin() during rollback. */
|
|
auto saved_esd= thd->event_scheduler.data;
|
|
thd->event_scheduler.data= 0;
|
|
mysql_mutex_lock(&thd->LOCK_thd_data);
|
|
/* prepare THD for rollback processing */
|
|
thd->reset_for_next_command();
|
|
thd->lex->sql_command= SQLCOM_ROLLBACK;
|
|
mysql_mutex_unlock(&thd->LOCK_thd_data);
|
|
/* Perform a client rollback, restore globals and signal
|
|
the victim only when all the resources have been
|
|
released */
|
|
thd->wsrep_cs().client_service().bf_rollback();
|
|
wsrep_reset_threadvars(thd);
|
|
/* Assign saved event_scheduler.data back before letting
|
|
client to continue. */
|
|
thd->event_scheduler.data= saved_esd;
|
|
thd->thread_stack= orig_thread_stack;
|
|
thd->wsrep_cs().sync_rollback_complete();
|
|
WSREP_DEBUG("rollbacker aborted thd: (%llu %lu)",
|
|
thd->thread_id, thd->real_id);
|
|
}
|
|
|
|
static void wsrep_rollback_process(THD *rollbacker,
|
|
void *arg __attribute__((unused)))
|
|
{
|
|
DBUG_ENTER("wsrep_rollback_process");
|
|
|
|
THD* thd= NULL;
|
|
DBUG_ASSERT(!wsrep_rollback_queue);
|
|
wsrep_rollback_queue= new Wsrep_thd_queue(rollbacker);
|
|
WSREP_INFO("Starting rollbacker thread %llu", rollbacker->thread_id);
|
|
|
|
thd_proc_info(rollbacker, "wsrep aborter idle");
|
|
while ((thd= wsrep_rollback_queue->pop_front()) != NULL)
|
|
{
|
|
mysql_mutex_lock(&thd->LOCK_thd_data);
|
|
wsrep::client_state& cs(thd->wsrep_cs());
|
|
const wsrep::transaction& tx(cs.transaction());
|
|
if (tx.state() == wsrep::transaction::s_aborted)
|
|
{
|
|
WSREP_DEBUG("rollbacker thd already aborted: %llu state: %d",
|
|
(long long)thd->real_id,
|
|
tx.state());
|
|
mysql_mutex_unlock(&thd->LOCK_thd_data);
|
|
continue;
|
|
}
|
|
mysql_mutex_unlock(&thd->LOCK_thd_data);
|
|
|
|
wsrep_reset_threadvars(rollbacker);
|
|
wsrep_store_threadvars(thd);
|
|
thd->wsrep_cs().acquire_ownership();
|
|
|
|
thd_proc_info(rollbacker, "wsrep aborter active");
|
|
|
|
/* Rollback methods below may free thd pointer. Do not try
|
|
to access it after method returns. */
|
|
if (wsrep_thd_is_applying(thd))
|
|
{
|
|
wsrep_rollback_high_priority(thd, rollbacker);
|
|
}
|
|
else
|
|
{
|
|
wsrep_rollback_local(thd, rollbacker);
|
|
}
|
|
wsrep_store_threadvars(rollbacker);
|
|
thd_proc_info(rollbacker, "wsrep aborter idle");
|
|
}
|
|
|
|
delete wsrep_rollback_queue;
|
|
wsrep_rollback_queue= NULL;
|
|
|
|
WSREP_INFO("rollbacker thread exiting %llu", rollbacker->thread_id);
|
|
|
|
DBUG_ASSERT(rollbacker->killed != NOT_KILLED);
|
|
DBUG_PRINT("wsrep",("wsrep rollbacker thread exiting"));
|
|
DBUG_VOID_RETURN;
|
|
}
|
|
|
|
void wsrep_create_rollbacker()
|
|
{
|
|
DBUG_ASSERT(wsrep_cluster_address[0]);
|
|
Wsrep_thd_args* args(new Wsrep_thd_args(wsrep_rollback_process,
|
|
WSREP_ROLLBACKER_THREAD,
|
|
pthread_self()));
|
|
|
|
/* create rollbacker */
|
|
if (create_wsrep_THD(args, false))
|
|
WSREP_WARN("Can't create thread to manage wsrep rollback");
|
|
}
|
|
|
|
/*
|
|
Start async rollback process
|
|
|
|
Asserts thd->LOCK_thd_data ownership
|
|
*/
|
|
void wsrep_fire_rollbacker(THD *thd)
|
|
{
|
|
DBUG_ASSERT(thd->wsrep_trx().state() == wsrep::transaction::s_aborting);
|
|
DBUG_PRINT("wsrep",("enqueuing trx abort for %llu", thd->thread_id));
|
|
WSREP_DEBUG("enqueuing trx abort for (%llu)", thd->thread_id);
|
|
if (wsrep_rollback_queue->push_back(thd))
|
|
{
|
|
WSREP_WARN("duplicate thd %llu for rollbacker",
|
|
thd->thread_id);
|
|
}
|
|
}
|
|
|
|
static bool wsrep_bf_abort_low(THD *bf_thd, THD *victim_thd)
|
|
{
|
|
mysql_mutex_assert_owner(&victim_thd->LOCK_thd_data);
|
|
|
|
#ifdef ENABLED_DEBUG_SYNC
|
|
DBUG_EXECUTE_IF("sync.wsrep_bf_abort",
|
|
{
|
|
const char act[]=
|
|
"now "
|
|
"SIGNAL sync.wsrep_bf_abort_reached "
|
|
"WAIT_FOR signal.wsrep_bf_abort";
|
|
DBUG_ASSERT(!debug_sync_set_action(bf_thd,
|
|
STRING_WITH_LEN(act)));
|
|
};);
|
|
#endif
|
|
|
|
wsrep::seqno bf_seqno(bf_thd->wsrep_trx().ws_meta().seqno());
|
|
bool ret;
|
|
|
|
{
|
|
/* Adopt the lock, it is being held by the caller. */
|
|
Wsrep_mutex wsm{&victim_thd->LOCK_thd_data};
|
|
wsrep::unique_lock<wsrep::mutex> lock{wsm, std::adopt_lock};
|
|
|
|
if (wsrep_thd_is_toi(bf_thd))
|
|
{
|
|
ret= victim_thd->wsrep_cs().total_order_bf_abort(lock, bf_seqno);
|
|
}
|
|
else
|
|
{
|
|
DBUG_ASSERT(WSREP(victim_thd) ? victim_thd->wsrep_trx().active() : 1);
|
|
ret= victim_thd->wsrep_cs().bf_abort(lock, bf_seqno);
|
|
}
|
|
if (ret)
|
|
{
|
|
/* BF abort should be allowed only once by wsrep-lib.*/
|
|
DBUG_ASSERT(victim_thd->wsrep_aborter == 0);
|
|
victim_thd->wsrep_aborter= bf_thd->thread_id;
|
|
wsrep_bf_aborts_counter++;
|
|
}
|
|
lock.release(); /* No unlock at the end of the scope. */
|
|
}
|
|
|
|
/* Sanity check for wsrep-lib calls to return with LOCK_thd_data held. */
|
|
mysql_mutex_assert_owner(&victim_thd->LOCK_thd_data);
|
|
|
|
return ret;
|
|
}
|
|
|
|
|
|
void wsrep_abort_thd(THD *bf_thd,
|
|
THD *victim_thd,
|
|
my_bool signal)
|
|
{
|
|
DBUG_ENTER("wsrep_abort_thd");
|
|
|
|
mysql_mutex_assert_owner(&victim_thd->LOCK_thd_data);
|
|
mysql_mutex_assert_owner(&victim_thd->LOCK_thd_kill);
|
|
|
|
/* Note that when you use RSU node is desynced from cluster, thus WSREP(thd)
|
|
might not be true.
|
|
*/
|
|
if ((WSREP(bf_thd)
|
|
|| ((WSREP_ON || bf_thd->variables.wsrep_OSU_method == WSREP_OSU_RSU)
|
|
&& wsrep_thd_is_toi(bf_thd))
|
|
|| bf_thd->lex->sql_command == SQLCOM_KILL)
|
|
&& !wsrep_thd_is_aborting(victim_thd) &&
|
|
wsrep_bf_abort_low(bf_thd, victim_thd) &&
|
|
!victim_thd->wsrep_cs().is_rollbacker_active())
|
|
{
|
|
WSREP_DEBUG("wsrep_abort_thd, by: %llu, victim: %llu",
|
|
(long long)bf_thd->real_id, (long long)victim_thd->real_id);
|
|
victim_thd->awake_no_mutex(KILL_QUERY_HARD);
|
|
ha_abort_transaction(bf_thd, victim_thd, signal);
|
|
}
|
|
else
|
|
{
|
|
WSREP_DEBUG("wsrep_abort_thd not effective: bf %llu victim %llu "
|
|
"wsrep %d wsrep_on %d RSU %d TOI %d aborting %d",
|
|
(long long)bf_thd->real_id, (long long)victim_thd->real_id,
|
|
WSREP_NNULL(bf_thd), WSREP_ON,
|
|
bf_thd->variables.wsrep_OSU_method == WSREP_OSU_RSU,
|
|
wsrep_thd_is_toi(bf_thd),
|
|
wsrep_thd_is_aborting(victim_thd));
|
|
mysql_mutex_unlock(&victim_thd->LOCK_thd_data);
|
|
mysql_mutex_unlock(&victim_thd->LOCK_thd_kill);
|
|
}
|
|
|
|
DBUG_VOID_RETURN;
|
|
}
|
|
|
|
bool wsrep_bf_abort(THD* bf_thd, THD* victim_thd)
|
|
{
|
|
WSREP_LOG_THD(bf_thd, "BF aborter before");
|
|
WSREP_LOG_THD(victim_thd, "victim before");
|
|
|
|
mysql_mutex_assert_owner(&victim_thd->LOCK_thd_data);
|
|
|
|
if (WSREP(victim_thd) && !victim_thd->wsrep_trx().active())
|
|
{
|
|
WSREP_DEBUG("wsrep_bf_abort, BF abort for non active transaction");
|
|
switch (victim_thd->wsrep_trx().state())
|
|
{
|
|
case wsrep::transaction::s_aborting: /* fall through */
|
|
case wsrep::transaction::s_aborted:
|
|
WSREP_DEBUG("victim thd is already aborted or in aborting state.");
|
|
return false;
|
|
default:
|
|
break;
|
|
}
|
|
/* Test: galera_create_table_as_select. Here we enter wsrep-lib
|
|
were LOCK_thd_data will be acquired, thus we need to release it.
|
|
However, we can still hold LOCK_thd_kill to protect from
|
|
disconnect or delete. */
|
|
mysql_mutex_unlock(&victim_thd->LOCK_thd_data);
|
|
wsrep_start_transaction(victim_thd, victim_thd->wsrep_next_trx_id());
|
|
mysql_mutex_lock(&victim_thd->LOCK_thd_data);
|
|
}
|
|
|
|
return wsrep_bf_abort_low(bf_thd, victim_thd);
|
|
}
|
|
|
|
uint wsrep_kill_thd(THD *thd, THD *victim_thd, killed_state kill_signal)
|
|
{
|
|
DBUG_ENTER("wsrep_kill_thd");
|
|
DBUG_ASSERT(WSREP(victim_thd));
|
|
mysql_mutex_assert_owner(&victim_thd->LOCK_thd_kill);
|
|
mysql_mutex_assert_owner(&victim_thd->LOCK_thd_data);
|
|
using trans= wsrep::transaction;
|
|
auto trx_state= victim_thd->wsrep_trx().state();
|
|
#ifndef DBUG_OFF
|
|
victim_thd->wsrep_killed_state= trx_state;
|
|
#endif /* DBUG_OFF */
|
|
/*
|
|
Already killed or in commit codepath. Mark the victim as killed,
|
|
the killed status will be restored in wsrep_after_commit() and
|
|
will be processed after the commit is over. In case of multiple
|
|
KILLs happened on commit codepath, the last one will be effective.
|
|
*/
|
|
if (victim_thd->wsrep_abort_by_kill ||
|
|
trx_state == trans::s_preparing ||
|
|
trx_state == trans::s_committing ||
|
|
trx_state == trans::s_ordered_commit)
|
|
{
|
|
victim_thd->wsrep_abort_by_kill= kill_signal;
|
|
mysql_mutex_unlock(&victim_thd->LOCK_thd_data);
|
|
mysql_mutex_unlock(&victim_thd->LOCK_thd_kill);
|
|
DBUG_RETURN(0);
|
|
}
|
|
/*
|
|
Mark killed victim_thd with kill_signal so that awake_no_mutex does
|
|
not dive into storage engine. We use ha_abort_transaction()
|
|
to do the storage engine part for wsrep THDs.
|
|
*/
|
|
DEBUG_SYNC(thd, "wsrep_kill_before_awake_no_mutex");
|
|
victim_thd->wsrep_abort_by_kill= kill_signal;
|
|
victim_thd->awake_no_mutex(kill_signal);
|
|
/* ha_abort_transaction() releases tmp->LOCK_thd_kill, so tmp
|
|
is not safe to access anymore. */
|
|
ha_abort_transaction(thd, victim_thd, 1);
|
|
DBUG_RETURN(0);
|
|
}
|
|
|
|
void wsrep_backup_kill_for_commit(THD *thd)
|
|
{
|
|
DBUG_ASSERT(WSREP(thd));
|
|
mysql_mutex_assert_owner(&thd->LOCK_thd_kill);
|
|
DBUG_ASSERT(thd->killed != NOT_KILLED);
|
|
mysql_mutex_lock(&thd->LOCK_thd_data);
|
|
/* If the transaction will roll back, keep the killed state.
|
|
For must replay, the replay will happen in different THD context
|
|
which is high priority and cannot be killed. The owning thread will
|
|
pick the killed state in after statement processing. */
|
|
if (thd->wsrep_trx().state() != wsrep::transaction::s_cert_failed &&
|
|
thd->wsrep_trx().state() != wsrep::transaction::s_must_abort &&
|
|
thd->wsrep_trx().state() != wsrep::transaction::s_aborting &&
|
|
thd->wsrep_trx().state() != wsrep::transaction::s_must_replay)
|
|
{
|
|
thd->wsrep_abort_by_kill= thd->killed;
|
|
thd->wsrep_abort_by_kill_err= thd->killed_err;
|
|
thd->killed= NOT_KILLED;
|
|
thd->killed_err= 0;
|
|
}
|
|
mysql_mutex_unlock(&thd->LOCK_thd_data);
|
|
}
|
|
|
|
void wsrep_restore_kill_after_commit(THD *thd)
|
|
{
|
|
DBUG_ASSERT(WSREP(thd));
|
|
mysql_mutex_assert_owner(&thd->LOCK_thd_kill);
|
|
thd->killed= thd->wsrep_abort_by_kill;
|
|
thd->killed_err= thd->wsrep_abort_by_kill_err;
|
|
thd->wsrep_abort_by_kill= NOT_KILLED;
|
|
thd->wsrep_abort_by_kill_err= 0;
|
|
}
|
|
|
|
int wsrep_create_threadvars()
|
|
{
|
|
int ret= 0;
|
|
if (thread_handling == SCHEDULER_TYPES_COUNT)
|
|
{
|
|
/* Caller should have called wsrep_reset_threadvars() before this
|
|
method. */
|
|
DBUG_ASSERT(!pthread_getspecific(THR_KEY_mysys));
|
|
pthread_setspecific(THR_KEY_mysys, 0);
|
|
ret= my_thread_init();
|
|
}
|
|
return ret;
|
|
}
|
|
|
|
void wsrep_delete_threadvars()
|
|
{
|
|
if (thread_handling == SCHEDULER_TYPES_COUNT)
|
|
{
|
|
/* The caller should have called wsrep_store_threadvars() before
|
|
this method. */
|
|
DBUG_ASSERT(pthread_getspecific(THR_KEY_mysys));
|
|
/* Reset psi state to avoid deallocating applier thread
|
|
psi_thread. */
|
|
#ifdef HAVE_PSI_INTERFACE
|
|
PSI_thread *psi_thread= PSI_CALL_get_thread();
|
|
if (PSI_server)
|
|
{
|
|
PSI_server->set_thread(0);
|
|
}
|
|
#endif /* HAVE_PSI_INTERFACE */
|
|
my_thread_end();
|
|
PSI_CALL_set_thread(psi_thread);
|
|
pthread_setspecific(THR_KEY_mysys, 0);
|
|
}
|
|
}
|
|
|
|
void wsrep_assign_from_threadvars(THD *thd)
|
|
{
|
|
if (thread_handling == SCHEDULER_TYPES_COUNT)
|
|
{
|
|
st_my_thread_var *mysys_var= (st_my_thread_var *)pthread_getspecific(THR_KEY_mysys);
|
|
DBUG_ASSERT(mysys_var);
|
|
thd->set_mysys_var(mysys_var);
|
|
}
|
|
}
|
|
|
|
Wsrep_threadvars wsrep_save_threadvars()
|
|
{
|
|
return Wsrep_threadvars{
|
|
current_thd,
|
|
(st_my_thread_var*) pthread_getspecific(THR_KEY_mysys)
|
|
};
|
|
}
|
|
|
|
void wsrep_restore_threadvars(const Wsrep_threadvars& globals)
|
|
{
|
|
set_current_thd(globals.cur_thd);
|
|
pthread_setspecific(THR_KEY_mysys, globals.mysys_var);
|
|
}
|
|
|
|
void wsrep_store_threadvars(THD *thd)
|
|
{
|
|
if (thread_handling == SCHEDULER_TYPES_COUNT)
|
|
{
|
|
pthread_setspecific(THR_KEY_mysys, thd->mysys_var);
|
|
}
|
|
thd->store_globals();
|
|
}
|
|
|
|
void wsrep_reset_threadvars(THD *thd)
|
|
{
|
|
if (thread_handling == SCHEDULER_TYPES_COUNT)
|
|
{
|
|
pthread_setspecific(THR_KEY_mysys, 0);
|
|
}
|
|
else
|
|
{
|
|
thd->reset_globals();
|
|
}
|
|
}
|