/* Copyright 2018 Codership Oy

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; version 2 of the License.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */

#include "wsrep_client_service.h"
#include "wsrep_high_priority_service.h"
#include "wsrep_applier.h"  /* wsrep_apply_events() */
#include "wsrep_binlog.h"   /* wsrep_dump_rbr_buf() */
#include "wsrep_schema.h"   /* remove_fragments() */
#include "wsrep_thd.h"
#include "wsrep_xid.h"
#include "wsrep_trans_observer.h"

#include "sql_base.h"    /* close_temporary_table() */
#include "sql_class.h"   /* THD */
#include "sql_parse.h"   /* stmt_causes_implicit_commit() */
#include "rpl_filter.h"  /* binlog_filter */
#include "rpl_rli.h"     /* Relay_log_info */
#include "slave.h"       /* opt_log_slave_updates */
#include "transaction.h" /* trans_commit()... */
#include "log.h"         /* stmt_has_updated_trans_table() */
//#include "debug_sync.h"
#include "mysql/service_debug_sync.h"

namespace
{
  /*
    Hand a named synchronization point over to whichever debug sync
    facility the build enables. When neither ENABLED_DEBUG_SYNC_OUT nor
    ENABLED_DEBUG_SYNC is defined the function body is empty.

    @param thd        Thread the sync point is raised for.
    @param sync_point NUL-terminated sync point name; its length is taken
                      with strlen().
  */
  void debug_sync_caller(THD* thd, const char* sync_point)
  {
#ifdef ENABLED_DEBUG_SYNC_OUT
    debug_sync_set_action(thd, sync_point, strlen(sync_point));
#endif
#ifdef ENABLED_DEBUG_SYNC
    /* The service pointer may be unset, hence the guard. */
    if (debug_sync_service) debug_sync_service(thd,sync_point,strlen(sync_point));
#endif
  }
}

/*
  Bind the service to a THD and to the client state passed by the caller.
  Both are stored as-is; nothing else is initialized here.
*/
Wsrep_client_service::Wsrep_client_service(THD* thd,
                                           Wsrep_client_state& client_state)
  : wsrep::client_service()
  , m_thd(thd)
  , m_client_state(client_state)
{ }

/* Forward to THD::store_globals() of the bound THD. */
void Wsrep_client_service::store_globals()
{
  DBUG_ENTER("Wsrep_client_service::store_globals");
  m_thd->store_globals();
  DBUG_VOID_RETURN;
}

/* Forward to THD::reset_globals() of the bound THD. */
void Wsrep_client_service::reset_globals()
{
  DBUG_ENTER("Wsrep_client_service::reset_globals");
  m_thd->reset_globals();
  DBUG_VOID_RETURN;
}

/*
  Tell whether the bound THD has been interrupted.

  The kill flag and the transaction state are sampled while holding
  LOCK_thd_data.

  @return true when THD::killed is set and the wsrep transaction is not in
          one of the s_must_abort / s_aborting / s_aborted states,
          false otherwise.
*/
bool Wsrep_client_service::interrupted() const
{
  DBUG_ASSERT(m_thd == current_thd);
  mysql_mutex_lock(&m_thd->LOCK_thd_data);

  /* wsrep state can be interrupted only if THD was explicitly killed,
     for wsrep conflicts, we use deadlock error only */
  bool ret= (m_thd->killed != NOT_KILLED &&
             m_thd->wsrep_trx().state() != wsrep::transaction::s_must_abort &&
             m_thd->wsrep_trx().state() != wsrep::transaction::s_aborting &&
             m_thd->wsrep_trx().state() != wsrep::transaction::s_aborted);
  mysql_mutex_unlock(&m_thd->LOCK_thd_data);

  if (ret)
  {
    WSREP_DEBUG("wsrep state is interrupted, THD::killed %d trx state %d",
                m_thd->killed, m_thd->wsrep_trx().state());
  }
  return ret;
}

/*
  Flush pending row events and pass the transaction cache contents to
  wsrep_write_cache().

  An empty result (data_len == 0) is not treated as an error; it is only
  reported through WSREP_DEBUG, with extra detail when the statement
  succeeded with affected rows, no binlog filter is on and the transaction
  is not streaming.

  @return 0 on success (including the empty case), 1 if
          wsrep_write_cache() reported a failure.
*/
int Wsrep_client_service::prepare_data_for_replication()
{
  DBUG_ASSERT(m_thd == current_thd);
  DBUG_ENTER("Wsrep_client_service::prepare_data_for_replication");
  size_t data_len= 0;
  IO_CACHE* cache= wsrep_get_trans_cache(m_thd);

  if (cache)
  {
    m_thd->binlog_flush_pending_rows_event(true);
    if (wsrep_write_cache(m_thd, cache, &data_len))
    {
      WSREP_ERROR("rbr write fail, data_len: %zu",
                  data_len);
      // wsrep_override_error(m_thd, ER_ERROR_DURING_COMMIT);
      DBUG_RETURN(1);
    }
  }

  if (data_len == 0)
  {
    if (m_thd->get_stmt_da()->is_ok()              &&
        m_thd->get_stmt_da()->affected_rows() > 0  &&
        !binlog_filter->is_on()                    &&
        !m_thd->wsrep_trx().is_streaming())
    {
      WSREP_DEBUG("empty rbr buffer, query: %s, "
                  "affected rows: %llu, "
                  "changed tables: %d, "
                  "sql_log_bin: %d",
                  WSREP_QUERY(m_thd),
                  m_thd->get_stmt_da()->affected_rows(),
                  stmt_has_updated_trans_table(m_thd),
                  m_thd->variables.sql_log_bin);
    }
    else
    {
      WSREP_DEBUG("empty rbr buffer, query: %s", WSREP_QUERY(m_thd));
    }
  }
  DBUG_RETURN(0);
}

/*
  Per-transaction cleanup: reset the binlog transaction state when
  WSREP_EMULATE_BINLOG() holds for the THD and zero wsrep_affected_rows.
*/
void Wsrep_client_service::cleanup_transaction()
{
  DBUG_ASSERT(m_thd == current_thd);
  if (WSREP_EMULATE_BINLOG(m_thd)) wsrep_thd_binlog_trx_reset(m_thd);
  m_thd->wsrep_affected_rows= 0;
}

/*
  Append the not yet certified part of the transaction cache to buffer.

  The cache is switched to read mode starting at
  wsrep_sr().bytes_certified(), drained chunk by chunk into buffer, and
  finally switched back to write mode at the position it had on entry.

  @param[out] buffer  Receives the bytes read from the cache.

  @return 0 on success or when there is no transaction cache,
          1 if the cache could not be reinitialized or the accumulated
          size exceeded wsrep_max_ws_size.
*/
int Wsrep_client_service::prepare_fragment_for_replication(wsrep::mutable_buffer& buffer)
{
  DBUG_ASSERT(m_thd == current_thd);
  THD* thd= m_thd;
  DBUG_ENTER("Wsrep_client_service::prepare_fragment_for_replication");
  IO_CACHE* cache= wsrep_get_trans_cache(thd);
  thd->binlog_flush_pending_rows_event(true);

  if (!cache)
  {
    DBUG_RETURN(0);
  }

  /* Remember the write position so that it can be restored in cleanup. */
  const my_off_t saved_pos(my_b_tell(cache));
  if (reinit_io_cache(cache, READ_CACHE, thd->wsrep_sr().bytes_certified(), 0, 0))
  {
    DBUG_RETURN(1);
  }

  int ret= 0;
  size_t total_length= 0;
  size_t length= my_b_bytes_in_cache(cache);

  if (!length)
  {
    length= my_b_fill(cache);
  }

  if (length > 0)
  {
    do
    {
      total_length+= length;
      if (total_length > wsrep_max_ws_size)
      {
        WSREP_WARN("transaction size limit (%lu) exceeded: %zu",
                   wsrep_max_ws_size, total_length);
        ret= 1;
        goto cleanup;
      }

      buffer.push_back(reinterpret_cast<const char*>(cache->read_pos),
                       reinterpret_cast<const char*>(cache->read_pos + length));
      /* Mark the in-memory chunk as consumed before refilling. */
      cache->read_pos= cache->read_end;
    }
    while (cache->file >= 0 && (length= my_b_fill(cache)));
  }
  /* NOTE(review): holds only if buffer was empty on entry - confirm callers. */
  DBUG_ASSERT(total_length == buffer.size());
cleanup:
  if (reinit_io_cache(cache, WRITE_CACHE, saved_pos, 0, 0))
  {
    WSREP_WARN("Failed to reinitialize IO cache");
    ret= 1;
  }
  DBUG_RETURN(ret);
}

/*
  Ask wsrep_schema to remove the fragments recorded in wsrep_sr() for the
  current transaction of this server.

  @return 0 on success, 1 if wsrep_schema->remove_fragments() failed
          (the failure is logged with WSREP_DEBUG).
*/
int Wsrep_client_service::remove_fragments()
{
  DBUG_ENTER("Wsrep_client_service::remove_fragments");
  if (wsrep_schema->remove_fragments(m_thd,
                                     Wsrep_server_state::instance().id(),
                                     m_thd->wsrep_trx().id(),
                                     m_thd->wsrep_sr().fragments()))
  {
    WSREP_DEBUG("Failed to remove fragments from SR storage for transaction "
                "%llu, %llu",
                m_thd->thread_id, m_thd->wsrep_trx().id().get());
    DBUG_RETURN(1);
  }
  DBUG_RETURN(0);
}

/*
  Currently every statement is allowed for streaming replication.

  @return always true.
*/
bool Wsrep_client_service::statement_allowed_for_streaming() const
{
  /*
    Todo: Decide if implicit commit is allowed with streaming
    replication.
    !stmt_causes_implicit_commit(m_thd, CF_IMPLICIT_COMMIT_BEGIN);
  */
  return true;
}

/*
  Report the current position in the transaction cache after flushing
  pending row events.

  @return my_b_tell() of the transaction cache, or 0 if the THD has none.
*/
size_t Wsrep_client_service::bytes_generated() const
{
  IO_CACHE* cache= wsrep_get_trans_cache(m_thd);
  if (cache)
  {
    m_thd->binlog_flush_pending_rows_event(true);
    return my_b_tell(cache);
  }
  return 0;
}

/*
  Register an upcoming replay by incrementing wsrep_replaying under
  LOCK_wsrep_replaying. The matching decrement is done in replay().
*/
void Wsrep_client_service::will_replay()
{
  DBUG_ASSERT(m_thd == current_thd);
  mysql_mutex_lock(&LOCK_wsrep_replaying);
  ++wsrep_replaying;
  mysql_mutex_unlock(&LOCK_wsrep_replaying);
}

/*
  Replay the current transaction through the provider.

  The kill flag of the THD is cleared first, then provider.replay() is
  invoked with a Wsrep_replayer_service built on the same THD, and the
  outcome is passed to replay_status(). Finally the replayer counter is
  decremented and waiters are woken up.

  @return the status returned by wsrep::provider::replay().
*/
enum wsrep::provider::status Wsrep_client_service::replay()
{
  DBUG_ASSERT(m_thd == current_thd);
  Wsrep_replayer_service replayer_service(m_thd);
  wsrep::provider& provider(m_thd->wsrep_cs().provider());

  mysql_mutex_lock(&m_thd->LOCK_thd_data);
  m_thd->killed= NOT_KILLED;
  mysql_mutex_unlock(&m_thd->LOCK_thd_data);

  enum wsrep::provider::status ret=
    provider.replay(m_thd->wsrep_trx().ws_handle(), &replayer_service);
  replayer_service.replay_status(ret);

  mysql_mutex_lock(&LOCK_wsrep_replaying);
  --wsrep_replaying;
  /*
    wait_for_replayers() sleeps on COND_wsrep_replaying until the counter
    drops to zero, so the change must be announced or waiters may never
    wake up. Waiters re-check the counter, so a spurious wakeup is harmless.
  */
  mysql_cond_broadcast(&COND_wsrep_replaying);
  mysql_mutex_unlock(&LOCK_wsrep_replaying);
  return ret;
}

/*
  Block until no replay is in progress (wsrep_replaying == 0).

  @param lock  Lock held by the caller; it is released for the duration of
               the wait and re-acquired before returning.
*/
void Wsrep_client_service::wait_for_replayers(wsrep::unique_lock<wsrep::mutex>& lock)
{
  DBUG_ASSERT(m_thd == current_thd);
  lock.unlock();
  mysql_mutex_lock(&LOCK_wsrep_replaying);
  while (wsrep_replaying > 0)
  {
    mysql_cond_wait(&COND_wsrep_replaying, &LOCK_wsrep_replaying);
  }
  mysql_mutex_unlock(&LOCK_wsrep_replaying);
  lock.lock();
}

/* Raise the named debug sync point for the bound THD. */
void Wsrep_client_service::debug_sync(const char* sync_point)
{
  DBUG_ASSERT(m_thd == current_thd);
  debug_sync_caller(m_thd, sync_point);
}

/*
  Terminate the process via DBUG_SUICIDE() if the DBUG keyword named by
  crash_point is active.
*/
void Wsrep_client_service::debug_crash(const char* crash_point)
{
  // DBUG_ASSERT(m_thd == current_thd);
  DBUG_EXECUTE_IF(crash_point, DBUG_SUICIDE(); );
}

/*
  Roll back the statement and the transaction of the bound THD and release
  the locks it holds: locked tables (when in locked tables mode), the
  global read lock (when acquired), and transactional as well as explicit
  metadata locks.

  @return non-zero if trans_rollback_stmt() or trans_rollback() reported
          failure. Note that trans_rollback() is not attempted when
          trans_rollback_stmt() fails, because of the short-circuit ||.
*/
int Wsrep_client_service::bf_rollback()
{
  DBUG_ASSERT(m_thd == current_thd);
  DBUG_ENTER("Wsrep_client_service::rollback");

  int ret= (trans_rollback_stmt(m_thd) || trans_rollback(m_thd));

  if (m_thd->locked_tables_mode && m_thd->lock)
  {
    m_thd->locked_tables_list.unlock_locked_tables(m_thd);
    m_thd->variables.option_bits&= ~OPTION_TABLE_LOCK;
  }
  if (m_thd->global_read_lock.is_acquired())
  {
    m_thd->global_read_lock.unlock_global_read_lock(m_thd);
  }
  m_thd->mdl_context.release_transactional_locks();
  m_thd->mdl_context.release_explicit_locks();

  DBUG_RETURN(ret);
}