/* Copyright (C) 2013 Codership Oy

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; version 2 of the License.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License along
   with this program; if not, write to the Free Software Foundation, Inc.,
   51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA. */

#include "mariadb.h"
#include "wsrep_thd.h"
#include "wsrep_trans_observer.h"
#include "wsrep_high_priority_service.h"
#include "wsrep_storage_service.h"
#include "transaction.h"
#include "rpl_rli.h"
#include "log_event.h"
#include "sql_parse.h"
#include "sql_base.h"      // close_thread_tables()
#include "mysqld.h"        // start_wsrep_THD();
#include "wsrep_applier.h" // start_wsrep_THD();
#include "mysql/service_wsrep.h"
#include "debug_sync.h"
#include "slave.h"
#include "rpl_rli.h"
#include "rpl_mi.h"

extern "C" pthread_key(struct st_my_thread_var*, THR_KEY_mysys);

/* Queue of THDs waiting to be rolled back by the rollbacker thread.
   Created and destroyed by wsrep_rollback_process(). */
static Wsrep_thd_queue* wsrep_rollback_queue= 0;

/* Number of successful BF aborts, incremented in wsrep_bf_abort(). */
static Atomic_counter<uint64_t> wsrep_bf_aborts_counter;

/*
  Status variable callback: publish the current BF abort count.

  Copies the atomic counter into wsrep_local_bf_aborts and points the
  SHOW_VAR at that copy. Always returns 0.
*/
int wsrep_show_bf_aborts (THD *thd, SHOW_VAR *var, char *buff,
                          enum enum_var_type scope)
{
  wsrep_local_bf_aborts= wsrep_bf_aborts_counter;
  var->type= SHOW_LONGLONG;
  var->value= (char*)&wsrep_local_bf_aborts;
  return 0;
}

/*
  Applier thread body.

  Runs the provider's applier loop with a Wsrep_applier_service bound to
  thd, and once the loop returns closes the applier and releases the
  replication structures hanging off thd->wsrep_rgi.
*/
static void wsrep_replication_process(THD *thd,
                                      void* arg __attribute__((unused)))
{
  DBUG_ENTER("wsrep_replication_process");

  Wsrep_applier_service applier_service(thd);

  /* thd->system_thread_info.rpl_sql_info isn't initialized. */
  thd->system_thread_info.rpl_sql_info=
    new rpl_sql_thread_info(thd->wsrep_rgi->rli->mi->rpl_filter);

  WSREP_INFO("Starting applier thread %llu", thd->thread_id);
  enum wsrep::provider::status
    ret= Wsrep_server_state::get_provider().run_applier(&applier_service);

  WSREP_INFO("Applier thread exiting ret: %d thd: %llu", ret, thd->thread_id);

  /* Close the applier and wake up anyone waiting on the applier count. */
  mysql_mutex_lock(&LOCK_wsrep_slave_threads);
  wsrep_close_applier(thd);
  mysql_cond_broadcast(&COND_wsrep_slave_threads);
  mysql_mutex_unlock(&LOCK_wsrep_slave_threads);

  /* Tear down in dependency order: mi and rli are reached through
     wsrep_rgi, so wsrep_rgi itself is deleted last. */
  delete thd->system_thread_info.rpl_sql_info;
  delete thd->wsrep_rgi->rli->mi;
  delete thd->wsrep_rgi->rli;

  thd->wsrep_rgi->cleanup_after_session();
  delete thd->wsrep_rgi;
  thd->wsrep_rgi= NULL;

  if(thd->has_thd_temporary_tables())
  {
    /* thread_id is printed as unsigned, consistently with the messages
       above. */
    WSREP_WARN("Applier %llu has temporary tables at exit.",
               thd->thread_id);
  }
  DBUG_VOID_RETURN;
}

/*
  Create an applier or rollbacker thread running start_wsrep_THD().

  @param args             thread arguments handed to the new thread
  @param mutex_protected  true if the caller already holds
                          LOCK_wsrep_slave_threads

  @return false on success, true if the thread could not be created

  NOTE(review): on failure args is not released here nor by the visible
  callers; presumably start_wsrep_THD() owns it on success - confirm
  whether the failure path leaks it.
*/
static bool create_wsrep_THD(Wsrep_thd_args* args, bool mutex_protected)
{
  if (!mutex_protected)
    mysql_mutex_lock(&LOCK_wsrep_slave_threads);

  /* Snapshot taken under the mutex; compared against below to detect
     that the running thread count has changed. */
  ulong old_wsrep_running_threads= wsrep_running_threads;

  DBUG_ASSERT(args->thread_type() == WSREP_APPLIER_THREAD ||
              args->thread_type() == WSREP_ROLLBACKER_THREAD);

  bool res= mysql_thread_create(args->thread_type() == WSREP_APPLIER_THREAD
                                ? key_wsrep_applier : key_wsrep_rollbacker,
                                args->thread_id(), &connection_attrib,
                                start_wsrep_THD, (void*)args);

  if (res)
  {
    WSREP_ERROR("Can't create wsrep thread");
  }
  else if (!mysqld_server_initialized)
  {
    /*
      If starting a thread on server startup, wait until this thread's THD
      is fully initialized (otherwise a THD initialization code might
      try to access a partially initialized server data structure -
      MDEV-8208).

      The wait is skipped when thread creation failed: no new thread
      exists to change wsrep_running_threads, so waiting would never end.
    */
    while (old_wsrep_running_threads == wsrep_running_threads)
    {
      mysql_cond_wait(&COND_wsrep_slave_threads, &LOCK_wsrep_slave_threads);
    }
  }

  if (!mutex_protected)
    mysql_mutex_unlock(&LOCK_wsrep_slave_threads);

  return res;
}

/*
  Start the requested number of applier threads.

  @param threads          number of applier threads to create
  @param mutex_protected  passed through to create_wsrep_THD()

  @return false on success or when nothing needs to be started,
          true as soon as one thread fails to start
*/
bool wsrep_create_appliers(long threads, bool mutex_protected)
{
  /* Don't start slave threads if wsrep-provider or wsrep-cluster-address
     is not set. */
  if (!WSREP_PROVIDER_EXISTS)
  {
    return false;
  }

  if (!wsrep_cluster_address || wsrep_cluster_address[0]== 0)
  {
    WSREP_DEBUG("wsrep_create_appliers exit due to empty address");
    return false;
  }

  long wsrep_threads=0;

  while (wsrep_threads++ < threads)
  {
    Wsrep_thd_args* args(new Wsrep_thd_args(wsrep_replication_process,
                                            WSREP_APPLIER_THREAD,
                                            pthread_self()));
    if (create_wsrep_THD(args, mutex_protected))
    {
      WSREP_WARN("Can't create thread to manage wsrep replication");
      return true;
    }
  }

  return false;
}

/*
  Remove the streaming fragments of thd's transaction through a
  temporary storage service.

  @param thd  THD whose transaction's fragments are removed
  @param ctx  context string passed to wsrep_create_storage_service()

  The storage service installs its own globals, so thd's thread
  variables are stored again before returning.
*/
static void wsrep_remove_streaming_fragments(THD* thd, const char* ctx)
{
  wsrep::transaction_id transaction_id(thd->wsrep_trx().id());
  Wsrep_storage_service* storage_service=
    wsrep_create_storage_service(thd, ctx);
  storage_service->store_globals();
  storage_service->adopt_transaction(thd->wsrep_trx());
  storage_service->remove_fragments();
  storage_service->commit(wsrep::ws_handle(transaction_id, 0),
                          wsrep::ws_meta());
  Wsrep_server_state::instance().server_service()
    .release_storage_service(storage_service);
  wsrep_store_threadvars(thd);
}

/*
  Roll back a streaming replication applier on behalf of the rollbacker.

  @param thd         the high priority (applier) THD to roll back
  @param rollbacker  the rollbacker thread's THD; its thread_stack is
                     borrowed while the rollback runs

  thd must not be accessed after this function returns: releasing the
  high priority service at the end frees it.
*/
static void wsrep_rollback_high_priority(THD *thd, THD *rollbacker)
{
  WSREP_DEBUG("Rollbacker aborting SR applier thd (%llu %lu)",
              thd->thread_id, thd->real_id);
  char* orig_thread_stack= thd->thread_stack;
  thd->thread_stack= rollbacker->thread_stack;
  DBUG_ASSERT(thd->wsrep_cs().mode() == Wsrep_client_state::m_high_priority);
  /* Must be streaming and must have been removed from the
     server state streaming appliers map. */
  DBUG_ASSERT(thd->wsrep_trx().is_streaming());
  DBUG_ASSERT(!Wsrep_server_state::instance().find_streaming_applier(
                thd->wsrep_trx().server_id(),
                thd->wsrep_trx().id()));
  DBUG_ASSERT(thd->wsrep_applier_service);

  /* Fragment removal should happen before rollback to make
     the transaction non-observable in SR table after the rollback
     completes. For correctness the order does not matter here,
     but currently it is mandated by checks in some MTR tests. */
  wsrep_remove_streaming_fragments(thd, "high priority");
  thd->wsrep_applier_service->rollback(wsrep::ws_handle(),
                                       wsrep::ws_meta());
  thd->wsrep_applier_service->after_apply();
  thd->thread_stack= orig_thread_stack;
  WSREP_DEBUG("rollbacker aborted thd: (%llu %lu)",
              thd->thread_id, thd->real_id);
  /* Will free THD */
  Wsrep_server_state::instance().server_service()
    .release_high_priority_service(thd->wsrep_applier_service);
}

/*
  Roll back a local client transaction on behalf of the rollbacker.

  @param thd         the victim client THD
  @param rollbacker  the rollbacker thread's THD; its thread_stack is
                     borrowed while the rollback runs
*/
static void wsrep_rollback_local(THD *thd, THD *rollbacker)
{
  WSREP_DEBUG("Rollbacker aborting local thd (%llu %lu)",
              thd->thread_id, thd->real_id);
  char* orig_thread_stack= thd->thread_stack;
  thd->thread_stack= rollbacker->thread_stack;
  if (thd->wsrep_trx().is_streaming())
  {
    wsrep_remove_streaming_fragments(thd, "local");
  }
  /* Set thd->event_scheduler.data temporarily to NULL to avoid
     callbacks to threadpool wait_begin() during rollback. */
  auto saved_esd= thd->event_scheduler.data;
  thd->event_scheduler.data= 0;
  mysql_mutex_lock(&thd->LOCK_thd_data);
  /* prepare THD for rollback processing */
  thd->reset_for_next_command();
  thd->lex->sql_command= SQLCOM_ROLLBACK;
  mysql_mutex_unlock(&thd->LOCK_thd_data);
  /* Perform a client rollback, restore globals and signal
     the victim only when all the resources have been
     released */
  thd->wsrep_cs().client_service().bf_rollback();
  wsrep_reset_threadvars(thd);
  /* Assign saved event_scheduler.data back before letting
     client to continue. */
  thd->event_scheduler.data= saved_esd;
  thd->thread_stack= orig_thread_stack;
  thd->wsrep_cs().sync_rollback_complete();
  WSREP_DEBUG("rollbacker aborted thd: (%llu %lu)",
              thd->thread_id, thd->real_id);
}

/*
  Rollbacker thread body.

  Creates wsrep_rollback_queue, then pops THDs from it until pop_front()
  returns NULL. Each THD whose transaction is not already aborted is
  rolled back either as a high priority applier or as a local client.
  The queue is destroyed before the thread exits.
*/
static void wsrep_rollback_process(THD *rollbacker,
                                   void *arg __attribute__((unused)))
{
  DBUG_ENTER("wsrep_rollback_process");

  THD* thd= NULL;
  DBUG_ASSERT(!wsrep_rollback_queue);
  wsrep_rollback_queue= new Wsrep_thd_queue(rollbacker);
  WSREP_INFO("Starting rollbacker thread %llu", rollbacker->thread_id);

  thd_proc_info(rollbacker, "wsrep aborter idle");
  while ((thd= wsrep_rollback_queue->pop_front()) != NULL)
  {
    /* Check the transaction state under LOCK_thd_data and skip victims
       that have been aborted already. */
    mysql_mutex_lock(&thd->LOCK_thd_data);
    wsrep::client_state& cs(thd->wsrep_cs());
    const wsrep::transaction& tx(cs.transaction());
    if (tx.state() == wsrep::transaction::s_aborted)
    {
      WSREP_DEBUG("rollbacker thd already aborted: %llu state: %d",
                  (long long)thd->real_id,
                  tx.state());
      mysql_mutex_unlock(&thd->LOCK_thd_data);
      continue;
    }
    mysql_mutex_unlock(&thd->LOCK_thd_data);

    /* Switch this OS thread from the rollbacker's context to the
       victim's for the duration of the rollback. */
    wsrep_reset_threadvars(rollbacker);
    wsrep_store_threadvars(thd);
    thd->wsrep_cs().acquire_ownership();

    thd_proc_info(rollbacker, "wsrep aborter active");

    /* Rollback methods below may free thd pointer. Do not try
       to access it after method returns. */
    if (wsrep_thd_is_applying(thd))
    {
      wsrep_rollback_high_priority(thd, rollbacker);
    }
    else
    {
      wsrep_rollback_local(thd, rollbacker);
    }
    wsrep_store_threadvars(rollbacker);
    thd_proc_info(rollbacker, "wsrep aborter idle");
  }

  delete wsrep_rollback_queue;
  wsrep_rollback_queue= NULL;

  WSREP_INFO("rollbacker thread exiting %llu", rollbacker->thread_id);

  DBUG_ASSERT(rollbacker->killed != NOT_KILLED);
  DBUG_PRINT("wsrep",("wsrep rollbacker thread exiting"));
  DBUG_VOID_RETURN;
}

/*
  Start the rollbacker thread, but only when wsrep_cluster_address is
  set to a non-empty string. A creation failure is logged as a warning.
*/
void wsrep_create_rollbacker()
{
  if (wsrep_cluster_address && wsrep_cluster_address[0] != 0)
  {
    Wsrep_thd_args* args(new Wsrep_thd_args(wsrep_rollback_process,
                                            WSREP_ROLLBACKER_THREAD,
                                            pthread_self()));

    /* create rollbacker */
    if (create_wsrep_THD(args, false))
      WSREP_WARN("Can't create thread to manage wsrep rollback");
  }
}

/*
  Start async rollback process

  Asserts thd->LOCK_thd_data ownership

  Pushes thd to wsrep_rollback_queue; a warning is logged when
  push_back() reports that thd was queued already.
*/
void wsrep_fire_rollbacker(THD *thd)
{
  DBUG_ASSERT(thd->wsrep_trx().state() == wsrep::transaction::s_aborting);
  DBUG_PRINT("wsrep",("enqueuing trx abort for %llu", thd->thread_id));
  WSREP_DEBUG("enqueuing trx abort for (%llu)", thd->thread_id);
  if (wsrep_rollback_queue->push_back(thd))
  {
    WSREP_WARN("duplicate thd %llu for rollbacker",
               thd->thread_id);
  }
}

/*
  Abort victim_thd's transaction on behalf of bf_thd.

  @param bf_thd_ptr      the aborting (brute force) THD
  @param victim_thd_ptr  the THD to abort; NULL is tolerated and treated
                         as "not effective"
  @param signal          passed through to ha_abort_transaction()

  @return always 1
*/
int wsrep_abort_thd(THD *bf_thd_ptr, THD *victim_thd_ptr, my_bool signal)
{
  DBUG_ENTER("wsrep_abort_thd");
  THD *victim_thd= victim_thd_ptr;
  THD *bf_thd= bf_thd_ptr;

  /* The victim must be checked before its mutex is touched; locking
     first would dereference a NULL victim. */
  if (!victim_thd)
  {
    WSREP_DEBUG("wsrep_abort_thd not effective: %p %p", bf_thd, victim_thd);
    DBUG_RETURN(1);
  }

  mysql_mutex_lock(&victim_thd->LOCK_thd_data);

  if ( (WSREP(bf_thd) ||
         ( (WSREP_ON || bf_thd->variables.wsrep_OSU_method == WSREP_OSU_RSU) &&
           wsrep_thd_is_toi(bf_thd)) )                         &&
       !wsrep_thd_is_aborting(victim_thd))
  {
    WSREP_DEBUG("wsrep_abort_thd, by: %llu, victim: %llu", (bf_thd) ?
                (long long)bf_thd->real_id : 0, (long long)victim_thd->real_id);
    /* LOCK_thd_data is dropped around the abort call and re-taken so
       that the single unlock below is balanced on both branches. */
    mysql_mutex_unlock(&victim_thd->LOCK_thd_data);
    ha_abort_transaction(bf_thd, victim_thd, signal);
    mysql_mutex_lock(&victim_thd->LOCK_thd_data);
  }
  else
  {
    WSREP_DEBUG("wsrep_abort_thd not effective: %p %p", bf_thd, victim_thd);
  }

  mysql_mutex_unlock(&victim_thd->LOCK_thd_data);
  DBUG_RETURN(1);
}

/*
  BF abort victim_thd using the seqno of bf_thd's transaction.

  If the victim has wsrep enabled but no active transaction, one is
  started first. A TOI aborter uses total_order_bf_abort(), any other
  aborter uses bf_abort().

  @return the result of the abort call; the BF abort counter is
          incremented when it is true
*/
bool wsrep_bf_abort(const THD* bf_thd, THD* victim_thd)
{
  WSREP_LOG_THD(bf_thd, "BF aborter before");
  WSREP_LOG_THD(victim_thd, "victim before");
  wsrep::seqno bf_seqno(bf_thd->wsrep_trx().ws_meta().seqno());

  if (WSREP(victim_thd) && !victim_thd->wsrep_trx().active())
  {
    WSREP_DEBUG("wsrep_bf_abort, BF abort for non active transaction");
    wsrep_start_transaction(victim_thd, victim_thd->wsrep_next_trx_id());
  }

  bool ret;
  if (wsrep_thd_is_toi(bf_thd))
  {
    ret= victim_thd->wsrep_cs().total_order_bf_abort(bf_seqno);
  }
  else
  {
    ret= victim_thd->wsrep_cs().bf_abort(bf_seqno);
  }
  if (ret)
  {
    wsrep_bf_aborts_counter++;
  }
  return ret;
}

/*
  Get auto increment variables for THD. Use global settings for
  applier threads.

  An applying THD whose transaction is in replaying state still uses
  its own session values.
*/
void wsrep_thd_auto_increment_variables(THD* thd,
                                        unsigned long long* offset,
                                        unsigned long long* increment)
{
  if (wsrep_thd_is_applying(thd) &&
      thd->wsrep_trx().state() != wsrep::transaction::s_replaying)
  {
    *offset= global_system_variables.auto_increment_offset;
    *increment= global_system_variables.auto_increment_increment;
    return;
  }
  *offset= thd->variables.auto_increment_offset;
  *increment= thd->variables.auto_increment_increment;
}

/*
  Create mysys thread variables for the calling thread.

  Does nothing unless thread_handling == SCHEDULER_TYPES_COUNT.

  @return the result of my_thread_init(), or 0 when nothing was done
*/
int wsrep_create_threadvars()
{
  int ret= 0;
  if (thread_handling == SCHEDULER_TYPES_COUNT)
  {
    /* Caller should have called wsrep_reset_threadvars() before this
       method. */
    DBUG_ASSERT(!pthread_getspecific(THR_KEY_mysys));
    pthread_setspecific(THR_KEY_mysys, 0);
    ret= my_thread_init();
  }
  return ret;
}

/*
  Destroy the mysys thread variables currently installed for the calling
  thread.

  Does nothing unless thread_handling == SCHEDULER_TYPES_COUNT.
*/
void wsrep_delete_threadvars()
{
  if (thread_handling == SCHEDULER_TYPES_COUNT)
  {
    /* The caller should have called wsrep_store_threadvars() before
       this method. */
    DBUG_ASSERT(pthread_getspecific(THR_KEY_mysys));
    /* Reset psi state to avoid deallocating applier thread
       psi_thread. */
#ifdef HAVE_PSI_INTERFACE
    PSI_thread *psi_thread= PSI_CALL_get_thread();
    if (PSI_server)
    {
      PSI_server->set_thread(0);
    }
#endif /* HAVE_PSI_INTERFACE */
    my_thread_end();
    /* NOTE(review): psi_thread is only declared when HAVE_PSI_INTERFACE
       is defined; presumably PSI_CALL_set_thread() discards its argument
       otherwise - confirm. */
    PSI_CALL_set_thread(psi_thread);
    pthread_setspecific(THR_KEY_mysys, 0);
  }
}

/*
  Point thd at the mysys thread variables installed for the calling
  thread.

  Does nothing unless thread_handling == SCHEDULER_TYPES_COUNT.
*/
void wsrep_assign_from_threadvars(THD *thd)
{
  if (thread_handling == SCHEDULER_TYPES_COUNT)
  {
    st_my_thread_var *mysys_var= (st_my_thread_var *)pthread_getspecific(THR_KEY_mysys);
    DBUG_ASSERT(mysys_var);
    thd->set_mysys_var(mysys_var);
  }
}

/*
  Capture the calling thread's current_thd and THR_KEY_mysys value so
  they can be reinstated with wsrep_restore_threadvars().
*/
Wsrep_threadvars wsrep_save_threadvars()
{
  return Wsrep_threadvars{
    current_thd,
    (st_my_thread_var*) pthread_getspecific(THR_KEY_mysys)
  };
}

/*
  Reinstate a current_thd / THR_KEY_mysys pair captured by
  wsrep_save_threadvars().
*/
void wsrep_restore_threadvars(const Wsrep_threadvars& globals)
{
  set_current_thd(globals.cur_thd);
  pthread_setspecific(THR_KEY_mysys, globals.mysys_var);
}

/*
  Install thd as the calling thread's context.

  When thread_handling == SCHEDULER_TYPES_COUNT, thd->mysys_var is first
  installed under THR_KEY_mysys.

  @return the result of thd->store_globals()
*/
int wsrep_store_threadvars(THD *thd)
{
  if (thread_handling == SCHEDULER_TYPES_COUNT)
  {
    pthread_setspecific(THR_KEY_mysys, thd->mysys_var);
  }
  return thd->store_globals();
}

/*
  Detach thd from the calling thread.

  When thread_handling == SCHEDULER_TYPES_COUNT only THR_KEY_mysys is
  cleared; otherwise thd->reset_globals() is called.
*/
void wsrep_reset_threadvars(THD *thd)
{
  if (thread_handling == SCHEDULER_TYPES_COUNT)
  {
    pthread_setspecific(THR_KEY_mysys, 0);
  }
  else
  {
    thd->reset_globals();
  }
}