mirror of
				https://github.com/MariaDB/server.git
				synced 2025-11-04 12:56:14 +01:00 
			
		
		
		
	
		
			
				
	
	
		
			608 lines
		
	
	
	
		
			18 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
			
		
		
	
	
			608 lines
		
	
	
	
		
			18 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
/* Copyright 2016-2023 Codership Oy <http://www.codership.com>
 | 
						|
 | 
						|
   This program is free software; you can redistribute it and/or modify
 | 
						|
   it under the terms of the GNU General Public License as published by
 | 
						|
   the Free Software Foundation; version 2 of the License.
 | 
						|
 | 
						|
   This program is distributed in the hope that it will be useful,
 | 
						|
   but WITHOUT ANY WARRANTY; without even the implied warranty of
 | 
						|
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 | 
						|
   GNU General Public License for more details.
 | 
						|
 | 
						|
   You should have received a copy of the GNU General Public License
 | 
						|
   along with this program; if not, write to the Free Software
 | 
						|
   Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111-1301 USA */
 | 
						|
 | 
						|
#ifndef WSREP_TRANS_OBSERVER_H
 | 
						|
#define WSREP_TRANS_OBSERVER_H
 | 
						|
 | 
						|
#include "my_global.h"
 | 
						|
#include "mysql/service_wsrep.h"
 | 
						|
#include "wsrep_applier.h" /* wsrep_apply_error */
 | 
						|
#include "wsrep_xid.h"
 | 
						|
#include "wsrep_thd.h"
 | 
						|
#include "wsrep_binlog.h" /* register/deregister group commit */
 | 
						|
#include "my_dbug.h"
 | 
						|
 | 
						|
class THD;
 | 
						|
 | 
						|
void wsrep_commit_empty(THD* thd, bool all);
 | 
						|
 | 
						|
/*
 | 
						|
   Return true if THD has active wsrep transaction.
 | 
						|
 */
 | 
						|
static inline bool wsrep_is_active(THD* thd)
 | 
						|
{
 | 
						|
  return (thd->wsrep_cs().state() != wsrep::client_state::s_none  &&
 | 
						|
          thd->wsrep_cs().transaction().active() &&
 | 
						|
          !thd->internal_transaction());
 | 
						|
}
 | 
						|
 | 
						|
/*
 | 
						|
  Return true if transaction is ordered.
 | 
						|
 */
 | 
						|
static inline bool wsrep_is_ordered(THD* thd)
 | 
						|
{
 | 
						|
  return thd->wsrep_trx().ordered();
 | 
						|
}
 | 
						|
 | 
						|
/*
 | 
						|
  Return true if transaction has been BF aborted but has not been
 | 
						|
  rolled back yet.
 | 
						|
 | 
						|
  It is required that the caller holds thd->LOCK_thd_data.
 | 
						|
*/
 | 
						|
static inline bool wsrep_must_abort(THD* thd)
 | 
						|
{
 | 
						|
  mysql_mutex_assert_owner(&thd->LOCK_thd_data);
 | 
						|
  return (thd->wsrep_trx().state() == wsrep::transaction::s_must_abort);
 | 
						|
}
 | 
						|
 | 
						|
/*
 | 
						|
  Return true if the transaction must be replayed.
 | 
						|
 */
 | 
						|
static inline bool wsrep_must_replay(THD* thd)
 | 
						|
{
 | 
						|
  return (thd->wsrep_trx().state() == wsrep::transaction::s_must_replay);
 | 
						|
}
 | 
						|
/*
 | 
						|
  Return true if transaction has not been committed.
 | 
						|
 | 
						|
  Note that we don't require thd->LOCK_thd_data here. Calling this method
 | 
						|
  makes sense only from codepaths which are past ordered_commit state
 | 
						|
  and the wsrep transaction is immune to BF aborts at that point.
 | 
						|
*/
 | 
						|
static inline bool wsrep_not_committed(THD* thd)
 | 
						|
{
 | 
						|
  return (thd->wsrep_trx().state() != wsrep::transaction::s_committed);
 | 
						|
}
 | 
						|
 | 
						|
/*
 | 
						|
  Return true if THD is either committing a transaction or statement
 | 
						|
  is autocommit.
 | 
						|
 */
 | 
						|
static inline bool wsrep_is_real(THD* thd, bool all)
 | 
						|
{
 | 
						|
  return (all || thd->transaction->all.ha_list == 0);
 | 
						|
}
 | 
						|
 | 
						|
/*
 | 
						|
  Check if a transaction has generated changes.
 | 
						|
 */
 | 
						|
static inline bool wsrep_has_changes(THD* thd)
 | 
						|
{
 | 
						|
  // Transaction has changes to replicate if it
 | 
						|
  // has appended one or more certification keys,
 | 
						|
  // and has actual changes to replicate in binlog
 | 
						|
  // cache. Except for streaming replication,
 | 
						|
  // where commit message may have no payload.
 | 
						|
  return !thd->wsrep_trx().is_empty() &&
 | 
						|
    (!wsrep_is_binlog_cache_empty(thd) || thd->wsrep_trx().is_streaming());
 | 
						|
}
 | 
						|
 | 
						|
/*
 | 
						|
  Check if an active transaction has been BF aborted.
 | 
						|
 */
 | 
						|
static inline bool wsrep_is_bf_aborted(THD* thd)
 | 
						|
{
 | 
						|
  return (thd->wsrep_trx().active() && thd->wsrep_trx().bf_aborted());
 | 
						|
}
 | 
						|
 | 
						|
static inline int wsrep_check_pk(THD* thd)
 | 
						|
{
 | 
						|
  if (!wsrep_certify_nonPK)
 | 
						|
  {
 | 
						|
    for (TABLE* table= thd->open_tables; table != NULL; table= table->next)
 | 
						|
    {
 | 
						|
      if (table->key_info == NULL || table->s->primary_key == MAX_KEY)
 | 
						|
      {
 | 
						|
        WSREP_DEBUG("No primary key found for table %s.%s",
 | 
						|
                    table->s->db.str, table->s->table_name.str);
 | 
						|
        wsrep_override_error(thd, ER_LOCK_DEADLOCK);
 | 
						|
        return 1;
 | 
						|
      }
 | 
						|
    }
 | 
						|
  }
 | 
						|
  return 0;
 | 
						|
}
 | 
						|
 | 
						|
static inline bool wsrep_streaming_enabled(THD* thd)
 | 
						|
{
 | 
						|
  return (thd->wsrep_sr().fragment_size() > 0);
 | 
						|
}
 | 
						|
 | 
						|
/*
 | 
						|
  Return number of fragments successfully certified for the
 | 
						|
  current statement.
 | 
						|
 */
 | 
						|
static inline size_t wsrep_fragments_certified_for_stmt(THD* thd)
 | 
						|
{
 | 
						|
    return thd->wsrep_trx().fragments_certified_for_statement();
 | 
						|
}
 | 
						|
 | 
						|
static inline int wsrep_start_transaction(THD* thd, wsrep_trx_id_t trx_id)
 | 
						|
{
 | 
						|
  if (thd->wsrep_cs().state() != wsrep::client_state::s_none) {
 | 
						|
    if (wsrep_is_active(thd) == false)
 | 
						|
      return thd->wsrep_cs().start_transaction(wsrep::transaction_id(trx_id));
 | 
						|
  }
 | 
						|
  return 0;
 | 
						|
}
 | 
						|
 | 
						|
/**/
 | 
						|
static inline int wsrep_start_trx_if_not_started(THD* thd)
 | 
						|
{
 | 
						|
  int ret= 0;
 | 
						|
  DBUG_ASSERT(thd->wsrep_next_trx_id() != WSREP_UNDEFINED_TRX_ID);
 | 
						|
  DBUG_ASSERT(thd->wsrep_cs().mode() == Wsrep_client_state::m_local);
 | 
						|
  if (thd->wsrep_trx().active() == false)
 | 
						|
  {
 | 
						|
    ret= wsrep_start_transaction(thd, thd->wsrep_next_trx_id());
 | 
						|
  }
 | 
						|
  return ret;
 | 
						|
}
 | 
						|
 | 
						|
/*
 | 
						|
  Called after each row operation.
 | 
						|
 | 
						|
  Return zero on success, non-zero on failure.
 | 
						|
 */
 | 
						|
static inline int wsrep_after_row_internal(THD* thd)
 | 
						|
{
 | 
						|
  if (thd->wsrep_cs().state() != wsrep::client_state::s_none  &&
 | 
						|
      wsrep_thd_is_local(thd))
 | 
						|
  {
 | 
						|
    if (wsrep_check_pk(thd))
 | 
						|
    {
 | 
						|
      return 1;
 | 
						|
    }
 | 
						|
    else if (wsrep_streaming_enabled(thd))
 | 
						|
    {
 | 
						|
      return thd->wsrep_cs().after_row();
 | 
						|
    }
 | 
						|
  }
 | 
						|
  return 0;
 | 
						|
}
 | 
						|
 | 
						|
/*
 | 
						|
  Helper method to determine whether commit time hooks
 | 
						|
  should be run for the transaction.
 | 
						|
 | 
						|
  Commit hooks must be run in the following cases:
 | 
						|
  - The transaction is local and has generated write set and is committing.
 | 
						|
  - The transaction has been BF aborted
 | 
						|
  - Is running in high priority mode and is ordered. This can be replayer,
 | 
						|
    applier or storage access.
 | 
						|
 */
 | 
						|
static inline bool wsrep_run_commit_hook(THD* thd, bool all)
 | 
						|
{
 | 
						|
  DBUG_ENTER("wsrep_run_commit_hook");
 | 
						|
  DBUG_PRINT("wsrep", ("Is_active: %d is_real %d has_changes %d is_applying %d "
 | 
						|
                       "is_ordered: %d",
 | 
						|
                       wsrep_is_active(thd), wsrep_is_real(thd, all),
 | 
						|
                       wsrep_has_changes(thd), wsrep_thd_is_applying(thd),
 | 
						|
                       wsrep_is_ordered(thd)));
 | 
						|
 | 
						|
  /* skipping non-wsrep threads */
 | 
						|
  if (!WSREP(thd))
 | 
						|
    DBUG_RETURN(false);
 | 
						|
 | 
						|
  /* Is MST commit or autocommit? */
 | 
						|
  bool ret= wsrep_is_active(thd) && wsrep_is_real(thd, all);
 | 
						|
  /* Do not commit if we are aborting */
 | 
						|
  ret= ret && (thd->wsrep_trx().state() != wsrep::transaction::s_aborting);
 | 
						|
  if (ret && !(wsrep_has_changes(thd) ||  /* Has generated write set */
 | 
						|
               /* Is high priority (replay, applier, storage) and the
 | 
						|
                  transaction is scheduled for commit ordering */
 | 
						|
               (wsrep_thd_is_applying(thd) && wsrep_is_ordered(thd))))
 | 
						|
  {
 | 
						|
    mysql_mutex_lock(&thd->LOCK_thd_data);
 | 
						|
    DBUG_PRINT("wsrep", ("state: %s",
 | 
						|
                         wsrep::to_c_string(thd->wsrep_trx().state())));
 | 
						|
    /* Transaction is local but has no changes, the commit hooks will
 | 
						|
       be skipped and the wsrep transaction is terminated in
 | 
						|
       wsrep_commit_empty() */
 | 
						|
    if (thd->wsrep_trx().state() == wsrep::transaction::s_executing)
 | 
						|
    {
 | 
						|
      ret= false;
 | 
						|
    }
 | 
						|
    mysql_mutex_unlock(&thd->LOCK_thd_data);
 | 
						|
  }
 | 
						|
 | 
						|
  mysql_mutex_lock(&thd->LOCK_thd_data);
 | 
						|
  /* Transaction creating sequence is TOI or RSU,
 | 
						|
  CREATE SEQUENCE = CREATE + INSERT (initial value)
 | 
						|
  and replicated using statement based replication, thus
 | 
						|
  the commit hooks will be skipped.
 | 
						|
 | 
						|
  For TEMPORARY SEQUENCES commit hooks will be done as
 | 
						|
  CREATE + INSERT is not replicated and needs to be
 | 
						|
  committed locally. */
 | 
						|
  if (ret &&
 | 
						|
      (thd->wsrep_cs().mode() == wsrep::client_state::m_toi ||
 | 
						|
       thd->wsrep_cs().mode() == wsrep::client_state::m_rsu) &&
 | 
						|
      thd->lex->sql_command == SQLCOM_CREATE_SEQUENCE &&
 | 
						|
      !thd->lex->tmp_table())
 | 
						|
    ret= false;
 | 
						|
  mysql_mutex_unlock(&thd->LOCK_thd_data);
 | 
						|
 | 
						|
  DBUG_PRINT("wsrep", ("return: %d", ret));
 | 
						|
  DBUG_RETURN(ret);
 | 
						|
}
 | 
						|
 | 
						|
/*
 | 
						|
  Called before the transaction is prepared.
 | 
						|
 | 
						|
  Return zero on success, non-zero on failure.
 | 
						|
 */
 | 
						|
static inline int wsrep_before_prepare(THD* thd, bool all)
 | 
						|
{
 | 
						|
  DBUG_ENTER("wsrep_before_prepare");
 | 
						|
  WSREP_DEBUG("wsrep_before_prepare: %d", wsrep_is_real(thd, all));
 | 
						|
  int ret= 0;
 | 
						|
  DBUG_ASSERT(wsrep_run_commit_hook(thd, all));
 | 
						|
  if ((ret= thd->wsrep_parallel_slave_wait_for_prior_commit()))
 | 
						|
  {
 | 
						|
    DBUG_RETURN(ret);
 | 
						|
  }
 | 
						|
  if ((ret= thd->wsrep_cs().before_prepare()) == 0)
 | 
						|
  {
 | 
						|
    DBUG_ASSERT(!thd->wsrep_trx().ws_meta().gtid().is_undefined());
 | 
						|
    wsrep_xid_init(&thd->wsrep_xid,
 | 
						|
                   thd->wsrep_trx().ws_meta().gtid(),
 | 
						|
                   wsrep_gtid_server.gtid());
 | 
						|
  }
 | 
						|
 | 
						|
  mysql_mutex_lock(&thd->LOCK_thd_kill);
 | 
						|
  if (thd->killed) wsrep_backup_kill_for_commit(thd);
 | 
						|
  mysql_mutex_unlock(&thd->LOCK_thd_kill);
 | 
						|
 | 
						|
  DBUG_RETURN(ret);
 | 
						|
}
 | 
						|
 | 
						|
/*
 | 
						|
  Called after the transaction has been prepared.
 | 
						|
 | 
						|
  Return zero on success, non-zero on failure.
 | 
						|
 */
 | 
						|
static inline int wsrep_after_prepare(THD* thd, bool all)
 | 
						|
{
 | 
						|
  DBUG_ENTER("wsrep_after_prepare");
 | 
						|
  WSREP_DEBUG("wsrep_after_prepare: %d", wsrep_is_real(thd, all));
 | 
						|
  DBUG_ASSERT(wsrep_run_commit_hook(thd, all));
 | 
						|
  int ret= thd->wsrep_cs().after_prepare();
 | 
						|
  DBUG_ASSERT(ret == 0 || thd->wsrep_cs().current_error() ||
 | 
						|
              thd->wsrep_cs().transaction().state() == wsrep::transaction::s_must_replay);
 | 
						|
  DBUG_RETURN(ret);
 | 
						|
}
 | 
						|
 | 
						|
/*
 | 
						|
  Called before the transaction is committed.
 | 
						|
 | 
						|
  This function must be called from both client and
 | 
						|
  applier contexts before commit.
 | 
						|
 | 
						|
  Return zero on success, non-zero on failure.
 | 
						|
 */
 | 
						|
static inline int wsrep_before_commit(THD* thd, bool all)
 | 
						|
{
 | 
						|
  DBUG_ENTER("wsrep_before_commit");
 | 
						|
  WSREP_DEBUG("wsrep_before_commit: %d, %lld",
 | 
						|
              wsrep_is_real(thd, all),
 | 
						|
              (long long)wsrep_thd_trx_seqno(thd));
 | 
						|
  THD_STAGE_INFO(thd, stage_waiting_certification);
 | 
						|
  int ret= 0;
 | 
						|
  DBUG_ASSERT(wsrep_run_commit_hook(thd, all));
 | 
						|
 | 
						|
  if ((ret= thd->wsrep_cs().before_commit()) == 0)
 | 
						|
  {
 | 
						|
    DBUG_ASSERT(!thd->wsrep_trx().ws_meta().gtid().is_undefined());
 | 
						|
    if (!thd->variables.gtid_seq_no &&
 | 
						|
        (thd->wsrep_trx().ws_meta().flags() & wsrep::provider::flag::commit))
 | 
						|
    {
 | 
						|
        uint64 seqno= 0;
 | 
						|
        if (thd->variables.wsrep_gtid_seq_no &&
 | 
						|
            thd->variables.wsrep_gtid_seq_no > wsrep_gtid_server.seqno())
 | 
						|
        {
 | 
						|
          seqno= thd->variables.wsrep_gtid_seq_no;
 | 
						|
          wsrep_gtid_server.seqno(thd->variables.wsrep_gtid_seq_no);
 | 
						|
        }
 | 
						|
        else
 | 
						|
        {
 | 
						|
          seqno= wsrep_gtid_server.seqno_inc();
 | 
						|
        }
 | 
						|
        thd->variables.wsrep_gtid_seq_no= 0;
 | 
						|
        thd->wsrep_current_gtid_seqno= seqno;
 | 
						|
        if (mysql_bin_log.is_open() && wsrep_gtid_mode)
 | 
						|
        {
 | 
						|
          thd->variables.gtid_seq_no= seqno;
 | 
						|
          thd->variables.gtid_domain_id= wsrep_gtid_server.domain_id;
 | 
						|
          thd->variables.server_id= wsrep_gtid_server.server_id;
 | 
						|
        }
 | 
						|
    }
 | 
						|
 | 
						|
    wsrep_xid_init(&thd->wsrep_xid,
 | 
						|
                   thd->wsrep_trx().ws_meta().gtid(),
 | 
						|
                   wsrep_gtid_server.gtid());
 | 
						|
    wsrep_register_for_group_commit(thd);
 | 
						|
  }
 | 
						|
 | 
						|
  mysql_mutex_lock(&thd->LOCK_thd_kill);
 | 
						|
  if (thd->killed) wsrep_backup_kill_for_commit(thd);
 | 
						|
  mysql_mutex_unlock(&thd->LOCK_thd_kill);
 | 
						|
 | 
						|
  DBUG_RETURN(ret);
 | 
						|
}
 | 
						|
 | 
						|
/*
 | 
						|
  Called after the transaction has been ordered for commit.
 | 
						|
 | 
						|
  This function must be called from both client and
 | 
						|
  applier contexts after the commit has been ordered.
 | 
						|
 | 
						|
  @param thd Pointer to THD
 | 
						|
  @param all 
 | 
						|
  @param err Error buffer in case of applying error
 | 
						|
 | 
						|
  Return zero on success, non-zero on failure.
 | 
						|
 */
 | 
						|
static inline int wsrep_ordered_commit(THD* thd, bool all)
 | 
						|
{
 | 
						|
  DBUG_ENTER("wsrep_ordered_commit");
 | 
						|
  WSREP_DEBUG("wsrep_ordered_commit: %d %lld", wsrep_is_real(thd, all),
 | 
						|
              (long long) wsrep_thd_trx_seqno(thd));
 | 
						|
  DBUG_ASSERT(wsrep_run_commit_hook(thd, all));
 | 
						|
  DBUG_RETURN(thd->wsrep_cs().ordered_commit());
 | 
						|
}
 | 
						|
 | 
						|
/*
 | 
						|
  Called after the transaction has been committed.
 | 
						|
 | 
						|
  Return zero on success, non-zero on failure.
 | 
						|
 */
 | 
						|
static inline int wsrep_after_commit(THD* thd, bool all)
 | 
						|
{
 | 
						|
  DBUG_ENTER("wsrep_after_commit");
 | 
						|
  WSREP_DEBUG("wsrep_after_commit: %d, %d, %lld, %d",
 | 
						|
              wsrep_is_real(thd, all),
 | 
						|
              wsrep_is_active(thd),
 | 
						|
              (long long)wsrep_thd_trx_seqno(thd),
 | 
						|
              wsrep_has_changes(thd));
 | 
						|
  DBUG_ASSERT(wsrep_run_commit_hook(thd, all));
 | 
						|
  if (thd->internal_transaction())
 | 
						|
    DBUG_RETURN(0);
 | 
						|
  int ret= 0;
 | 
						|
  if (thd->wsrep_trx().state() == wsrep::transaction::s_committing)
 | 
						|
  {
 | 
						|
    ret= thd->wsrep_cs().ordered_commit();
 | 
						|
  }
 | 
						|
  wsrep_unregister_from_group_commit(thd);
 | 
						|
  thd->wsrep_xid.null();
 | 
						|
  DBUG_RETURN(ret || thd->wsrep_cs().after_commit());
 | 
						|
}
 | 
						|
 | 
						|
/*
 | 
						|
  Called before the transaction is rolled back.
 | 
						|
 | 
						|
  Return zero on success, non-zero on failure.
 | 
						|
 */
 | 
						|
static inline int wsrep_before_rollback(THD* thd, bool all)
 | 
						|
{
 | 
						|
  DBUG_ENTER("wsrep_before_rollback");
 | 
						|
  int ret= 0;
 | 
						|
  if (wsrep_is_active(thd))
 | 
						|
  {
 | 
						|
    if (!all && thd->in_active_multi_stmt_transaction())
 | 
						|
    {
 | 
						|
      if (wsrep_emulate_bin_log)
 | 
						|
      {
 | 
						|
        wsrep_thd_binlog_stmt_rollback(thd);
 | 
						|
      }
 | 
						|
 | 
						|
      if (thd->wsrep_trx().is_streaming() &&
 | 
						|
          (wsrep_fragments_certified_for_stmt(thd) > 0))
 | 
						|
      {
 | 
						|
        /* Non-safe statement rollback during SR multi statement
 | 
						|
           transaction. A statement rollback is considered unsafe, if
 | 
						|
           the same statement has already replicated one or more fragments.
 | 
						|
           Self abort the transaction, the actual rollback and error
 | 
						|
           handling will be done in after statement phase. */
 | 
						|
        WSREP_DEBUG("statement rollback is not safe for streaming replication");
 | 
						|
        wsrep_thd_self_abort(thd);
 | 
						|
        ret= 0;
 | 
						|
      }
 | 
						|
    }
 | 
						|
    else if (wsrep_is_real(thd, all) &&
 | 
						|
             thd->wsrep_trx().state() != wsrep::transaction::s_aborted)
 | 
						|
    {
 | 
						|
      /* Real transaction rolling back and wsrep abort not completed
 | 
						|
         yet */
 | 
						|
      /* Reset XID so that it does not trigger writing serialization
 | 
						|
         history in InnoDB. This needs to be avoided because rollback
 | 
						|
         may happen out of order and replay may follow. */
 | 
						|
      thd->wsrep_xid.null();
 | 
						|
      ret= thd->wsrep_cs().before_rollback();
 | 
						|
    }
 | 
						|
  }
 | 
						|
  DBUG_RETURN(ret);
 | 
						|
}
 | 
						|
 | 
						|
/*
 | 
						|
  Called after the transaction has been rolled back.
 | 
						|
 | 
						|
  Return zero on success, non-zero on failure.
 | 
						|
 */
 | 
						|
static inline int wsrep_after_rollback(THD* thd, bool all)
 | 
						|
{
 | 
						|
  DBUG_ENTER("wsrep_after_rollback");
 | 
						|
  DBUG_RETURN((wsrep_is_real(thd, all) && wsrep_is_active(thd) &&
 | 
						|
               thd->wsrep_cs().transaction().state() !=
 | 
						|
               wsrep::transaction::s_aborted) ?
 | 
						|
              thd->wsrep_cs().after_rollback() : 0);
 | 
						|
}
 | 
						|
 | 
						|
static inline int wsrep_before_statement(THD* thd)
 | 
						|
{
 | 
						|
  return (thd->wsrep_cs().state() != wsrep::client_state::s_none &&
 | 
						|
          !thd->internal_transaction() ?
 | 
						|
	  thd->wsrep_cs().before_statement() : 0);
 | 
						|
}
 | 
						|
 | 
						|
static inline
 | 
						|
int wsrep_after_statement(THD* thd)
 | 
						|
{
 | 
						|
  DBUG_ENTER("wsrep_after_statement");
 | 
						|
  WSREP_DEBUG("wsrep_after_statement for %lu client_state %s "
 | 
						|
              " client_mode %s trans_state %s",
 | 
						|
              thd_get_thread_id(thd),
 | 
						|
              wsrep::to_c_string(thd->wsrep_cs().state()),
 | 
						|
              wsrep::to_c_string(thd->wsrep_cs().mode()),
 | 
						|
              wsrep::to_c_string(thd->wsrep_cs().transaction().state()));
 | 
						|
  int ret= ((thd->wsrep_cs().state() != wsrep::client_state::s_none &&
 | 
						|
               thd->wsrep_cs().mode() == Wsrep_client_state::m_local) &&
 | 
						|
              !thd->internal_transaction() ?
 | 
						|
              thd->wsrep_cs().after_statement() : 0);
 | 
						|
 | 
						|
  if (wsrep_is_active(thd))
 | 
						|
  {
 | 
						|
    mysql_mutex_lock(&thd->LOCK_thd_kill);
 | 
						|
    wsrep_restore_kill_after_commit(thd);
 | 
						|
    mysql_mutex_unlock(&thd->LOCK_thd_kill);
 | 
						|
  }
 | 
						|
  DBUG_RETURN(ret);
 | 
						|
}
 | 
						|
 | 
						|
static inline void wsrep_after_apply(THD* thd)
 | 
						|
{
 | 
						|
  DBUG_ASSERT(wsrep_thd_is_applying(thd));
 | 
						|
  WSREP_DEBUG("wsrep_after_apply %lld", thd->thread_id);
 | 
						|
  if (!thd->internal_transaction())
 | 
						|
    thd->wsrep_cs().after_applying();
 | 
						|
}
 | 
						|
 | 
						|
static inline void wsrep_open(THD* thd)
 | 
						|
{
 | 
						|
  DBUG_ENTER("wsrep_open");
 | 
						|
  if (WSREP_ON_)
 | 
						|
  {
 | 
						|
    /* WSREP_PROVIDER_EXISTS_ cannot be set if WSREP_ON_ is not set */
 | 
						|
    DBUG_ASSERT(WSREP_PROVIDER_EXISTS_);
 | 
						|
    thd->wsrep_cs().open(wsrep::client_id(thd->thread_id));
 | 
						|
    thd->wsrep_cs().debug_log_level(wsrep_debug);
 | 
						|
    if (!thd->wsrep_applier && thd->variables.wsrep_trx_fragment_size)
 | 
						|
    {
 | 
						|
      thd->wsrep_cs().enable_streaming(
 | 
						|
        wsrep_fragment_unit(thd->variables.wsrep_trx_fragment_unit),
 | 
						|
        size_t(thd->variables.wsrep_trx_fragment_size));
 | 
						|
    }
 | 
						|
  }
 | 
						|
  DBUG_VOID_RETURN;
 | 
						|
}
 | 
						|
 | 
						|
static inline void wsrep_close(THD* thd)
 | 
						|
{
 | 
						|
  DBUG_ENTER("wsrep_close");
 | 
						|
  if (thd->wsrep_cs().state() != wsrep::client_state::s_none &&
 | 
						|
      !thd->internal_transaction())
 | 
						|
  {
 | 
						|
    thd->wsrep_cs().close();
 | 
						|
  }
 | 
						|
  DBUG_VOID_RETURN;
 | 
						|
}
 | 
						|
 | 
						|
static inline void wsrep_cleanup(THD* thd)
 | 
						|
{
 | 
						|
  DBUG_ENTER("wsrep_cleanup");
 | 
						|
  if (thd->wsrep_cs().state() != wsrep::client_state::s_none)
 | 
						|
  {
 | 
						|
    thd->wsrep_cs().cleanup();
 | 
						|
  }
 | 
						|
  DBUG_VOID_RETURN;
 | 
						|
}
 | 
						|
 | 
						|
static inline void
 | 
						|
wsrep_wait_rollback_complete_and_acquire_ownership(THD *thd)
 | 
						|
{
 | 
						|
  DBUG_ENTER("wsrep_wait_rollback_complete_and_acquire_ownership");
 | 
						|
  if (thd->wsrep_cs().state() != wsrep::client_state::s_none &&
 | 
						|
      !thd->internal_transaction())
 | 
						|
  {
 | 
						|
    thd->wsrep_cs().wait_rollback_complete_and_acquire_ownership();
 | 
						|
  }
 | 
						|
  DBUG_VOID_RETURN;
 | 
						|
}
 | 
						|
 | 
						|
static inline int wsrep_before_command(THD* thd, bool keep_command_error)
 | 
						|
{
 | 
						|
  return (thd->wsrep_cs().state() != wsrep::client_state::s_none &&
 | 
						|
          !thd->internal_transaction() ?
 | 
						|
          thd->wsrep_cs().before_command(keep_command_error) : 0);
 | 
						|
}
 | 
						|
 | 
						|
static inline int wsrep_before_command(THD* thd)
 | 
						|
{
 | 
						|
  return wsrep_before_command(thd, false);
 | 
						|
}
 | 
						|
 | 
						|
/*
 | 
						|
  Called after each command.
 | 
						|
 | 
						|
  Return zero on success, non-zero on failure.
 | 
						|
*/
 | 
						|
static inline void wsrep_after_command_before_result(THD* thd)
 | 
						|
{
 | 
						|
  if (thd->wsrep_cs().state() != wsrep::client_state::s_none &&
 | 
						|
      !thd->internal_transaction())
 | 
						|
  {
 | 
						|
    thd->wsrep_cs().after_command_before_result();
 | 
						|
  }
 | 
						|
}
 | 
						|
 | 
						|
static inline void wsrep_after_command_after_result(THD* thd)
 | 
						|
{
 | 
						|
  if (thd->wsrep_cs().state() != wsrep::client_state::s_none &&
 | 
						|
      !thd->internal_transaction())
 | 
						|
  {
 | 
						|
    thd->wsrep_cs().after_command_after_result();
 | 
						|
  }
 | 
						|
}
 | 
						|
 | 
						|
static inline void wsrep_after_command_ignore_result(THD* thd)
 | 
						|
{
 | 
						|
  wsrep_after_command_before_result(thd);
 | 
						|
  DBUG_ASSERT(!thd->wsrep_cs().current_error());
 | 
						|
  wsrep_after_command_after_result(thd);
 | 
						|
}
 | 
						|
 | 
						|
static inline enum wsrep::client_error wsrep_current_error(THD* thd)
 | 
						|
{
 | 
						|
  return thd->wsrep_cs().current_error();
 | 
						|
}
 | 
						|
 | 
						|
static inline enum wsrep::provider::status
 | 
						|
wsrep_current_error_status(THD* thd)
 | 
						|
{
 | 
						|
  return thd->wsrep_cs().current_error_status();
 | 
						|
}
 | 
						|
 | 
						|
#endif /* WSREP_TRANS_OBSERVER */
 |