mirror of
				https://github.com/MariaDB/server.git
				synced 2025-11-04 04:46:15 +01:00 
			
		
		
		
	
		
			
				
	
	
		
			1528 lines
		
	
	
	
		
			46 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
			
		
		
	
	
			1528 lines
		
	
	
	
		
			46 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
/* Copyright (C) 2015-2021 Codership Oy <info@codership.com>
 | 
						|
 | 
						|
   This program is free software; you can redistribute it and/or modify
 | 
						|
   it under the terms of the GNU General Public License as published by
 | 
						|
   the Free Software Foundation; version 2 of the License.
 | 
						|
 | 
						|
   This program is distributed in the hope that it will be useful,
 | 
						|
   but WITHOUT ANY WARRANTY; without even the implied warranty of
 | 
						|
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 | 
						|
   GNU General Public License for more details.
 | 
						|
 | 
						|
   You should have received a copy of the GNU General Public License along
 | 
						|
   with this program; if not, write to the Free Software Foundation, Inc.,
 | 
						|
   51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. */
 | 
						|
 | 
						|
#include "mariadb.h"
 | 
						|
 | 
						|
#include "table.h"
 | 
						|
#include "key.h"
 | 
						|
#include "sql_base.h"
 | 
						|
#include "sql_parse.h"
 | 
						|
#include "sql_update.h"
 | 
						|
#include "transaction.h"
 | 
						|
 | 
						|
#include "mysql/service_wsrep.h"
 | 
						|
#include "wsrep_schema.h"
 | 
						|
#include "wsrep_applier.h"
 | 
						|
#include "wsrep_xid.h"
 | 
						|
#include "wsrep_binlog.h"
 | 
						|
#include "wsrep_high_priority_service.h"
 | 
						|
#include "wsrep_storage_service.h"
 | 
						|
#include "wsrep_thd.h"
 | 
						|
#include "wsrep_server_state.h"
 | 
						|
 | 
						|
#include <string>
 | 
						|
#include <sstream>
 | 
						|
 | 
						|
const char* wsrep_sr_table_name_full= WSREP_SCHEMA "/" WSREP_STREAMING_TABLE;
 | 
						|
 | 
						|
static const std::string wsrep_schema_str= WSREP_SCHEMA;
 | 
						|
static const std::string sr_table_str= WSREP_STREAMING_TABLE;
 | 
						|
static const std::string cluster_table_str= WSREP_CLUSTER_TABLE;
 | 
						|
static const std::string members_table_str= WSREP_MEMBERS_TABLE;
 | 
						|
 | 
						|
static const std::string create_cluster_table_str=
 | 
						|
  "CREATE TABLE IF NOT EXISTS " + wsrep_schema_str + "." + cluster_table_str +
 | 
						|
  "("
 | 
						|
  "cluster_uuid CHAR(36) PRIMARY KEY,"
 | 
						|
  "view_id BIGINT NOT NULL,"
 | 
						|
  "view_seqno BIGINT NOT NULL,"
 | 
						|
  "protocol_version INT NOT NULL,"
 | 
						|
  "capabilities INT NOT NULL"
 | 
						|
  ") ENGINE=InnoDB STATS_PERSISTENT=0 CHARSET=latin1";
 | 
						|
 | 
						|
static const std::string create_members_table_str=
 | 
						|
  "CREATE TABLE IF NOT EXISTS " + wsrep_schema_str + "." + members_table_str +
 | 
						|
  "("
 | 
						|
  "node_uuid CHAR(36) PRIMARY KEY,"
 | 
						|
  "cluster_uuid CHAR(36) NOT NULL,"
 | 
						|
  "node_name CHAR(32) NOT NULL,"
 | 
						|
  "node_incoming_address VARCHAR(256) NOT NULL"
 | 
						|
  ") ENGINE=InnoDB STATS_PERSISTENT=0 CHARSET=latin1";
 | 
						|
 | 
						|
#ifdef WSREP_SCHEMA_MEMBERS_HISTORY
 | 
						|
static const std::string cluster_member_history_table_str= "wsrep_cluster_member_history";
 | 
						|
static const std::string create_members_history_table_str=
 | 
						|
  "CREATE TABLE IF NOT EXISTS " + wsrep_schema_str + "." + cluster_member_history_table_str +
 | 
						|
  "("
 | 
						|
  "node_uuid CHAR(36) PRIMARY KEY,"
 | 
						|
  "cluster_uuid CHAR(36) NOT NULL,"
 | 
						|
  "last_view_id BIGINT NOT NULL,"
 | 
						|
  "last_view_seqno BIGINT NOT NULL,"
 | 
						|
  "node_name CHAR(32) NOT NULL,"
 | 
						|
  "node_incoming_address VARCHAR(256) NOT NULL"
 | 
						|
  ") ENGINE=InnoDB STATS_PERSISTENT=0 CHARSET=latin1";
 | 
						|
#endif /* WSREP_SCHEMA_MEMBERS_HISTORY */
 | 
						|
 | 
						|
static const std::string create_frag_table_str=
 | 
						|
  "CREATE TABLE IF NOT EXISTS " + wsrep_schema_str + "." + sr_table_str +
 | 
						|
  "("
 | 
						|
  "node_uuid CHAR(36), "
 | 
						|
  "trx_id BIGINT, "
 | 
						|
  "seqno BIGINT, "
 | 
						|
  "flags INT NOT NULL, "
 | 
						|
  "frag LONGBLOB NOT NULL, "
 | 
						|
  "PRIMARY KEY (node_uuid, trx_id, seqno)"
 | 
						|
  ") ENGINE=InnoDB STATS_PERSISTENT=0 CHARSET=latin1";
 | 
						|
 | 
						|
static const std::string delete_from_cluster_table=
 | 
						|
  "DELETE FROM " + wsrep_schema_str + "." + cluster_table_str;
 | 
						|
 | 
						|
static const std::string delete_from_members_table=
 | 
						|
  "DELETE FROM " + wsrep_schema_str + "." + members_table_str;
 | 
						|
 | 
						|
/* For rolling upgrade we need to use ALTER. We do not want
 | 
						|
persistent statistics to be collected from these tables. */
 | 
						|
static const std::string alter_cluster_table=
 | 
						|
  "ALTER TABLE " + wsrep_schema_str + "." + cluster_table_str +
 | 
						|
  " STATS_PERSISTENT=0 CHARSET=latin1";
 | 
						|
 | 
						|
static const std::string alter_members_table=
 | 
						|
  "ALTER TABLE " + wsrep_schema_str + "." + members_table_str +
 | 
						|
  " STATS_PERSISTENT=0 CHARSET=latin1";
 | 
						|
 | 
						|
#ifdef WSREP_SCHEMA_MEMBERS_HISTORY
 | 
						|
static const std::string alter_members_history_table=
 | 
						|
  "ALTER TABLE " + wsrep_schema_str + "." + members_history_table_str +
 | 
						|
  " STATS_PERSISTENT=0 CHARSET=latin1";
 | 
						|
#endif
 | 
						|
 | 
						|
static const std::string alter_frag_table=
 | 
						|
  "ALTER TABLE " + wsrep_schema_str + "." + sr_table_str +
 | 
						|
  " STATS_PERSISTENT=0 CHARSET=latin1";
 | 
						|
 | 
						|
namespace Wsrep_schema_impl
 | 
						|
{
 | 
						|
 | 
						|
class binlog_off
 | 
						|
{
 | 
						|
public:
 | 
						|
  binlog_off(THD* thd)
 | 
						|
    : m_thd(thd)
 | 
						|
    , m_option_bits(thd->variables.option_bits)
 | 
						|
    , m_sql_log_bin(thd->variables.sql_log_bin)
 | 
						|
  {
 | 
						|
    thd->variables.option_bits&= ~OPTION_BIN_LOG;
 | 
						|
    thd->variables.sql_log_bin= 0;
 | 
						|
  }
 | 
						|
  ~binlog_off()
 | 
						|
  {
 | 
						|
    m_thd->variables.option_bits= m_option_bits;
 | 
						|
    m_thd->variables.sql_log_bin= m_sql_log_bin;
 | 
						|
  }
 | 
						|
private:
 | 
						|
  THD* m_thd;
 | 
						|
  ulonglong m_option_bits;
 | 
						|
  my_bool m_sql_log_bin;
 | 
						|
};
 | 
						|
 | 
						|
class wsrep_off
 | 
						|
{
 | 
						|
public:
 | 
						|
  wsrep_off(THD* thd)
 | 
						|
    : m_thd(thd)
 | 
						|
    , m_wsrep_on(thd->variables.wsrep_on)
 | 
						|
  {
 | 
						|
    thd->variables.wsrep_on= 0;
 | 
						|
  }
 | 
						|
  ~wsrep_off()
 | 
						|
  {
 | 
						|
    m_thd->variables.wsrep_on= m_wsrep_on;
 | 
						|
  }
 | 
						|
private:
 | 
						|
  THD* m_thd;
 | 
						|
  my_bool m_wsrep_on;
 | 
						|
};
 | 
						|
 | 
						|
class thd_server_status
 | 
						|
{
 | 
						|
public:
 | 
						|
  thd_server_status(THD* thd, uint server_status, bool condition)
 | 
						|
    : m_thd(thd)
 | 
						|
    , m_thd_server_status(thd->server_status)
 | 
						|
  {
 | 
						|
    if (condition)
 | 
						|
      thd->server_status= server_status;
 | 
						|
  }
 | 
						|
  ~thd_server_status()
 | 
						|
  {
 | 
						|
    m_thd->server_status= m_thd_server_status;
 | 
						|
  }
 | 
						|
private:
 | 
						|
  THD* m_thd;
 | 
						|
  uint m_thd_server_status;
 | 
						|
};
 | 
						|
 | 
						|
class thd_context_switch
 | 
						|
{
 | 
						|
public:
 | 
						|
  thd_context_switch(THD *orig_thd, THD *cur_thd)
 | 
						|
    : m_orig_thd(orig_thd)
 | 
						|
    , m_cur_thd(cur_thd)
 | 
						|
  {
 | 
						|
    wsrep_reset_threadvars(m_orig_thd);
 | 
						|
    wsrep_store_threadvars(m_cur_thd);
 | 
						|
  }
 | 
						|
  ~thd_context_switch()
 | 
						|
  {
 | 
						|
    wsrep_reset_threadvars(m_cur_thd);
 | 
						|
    wsrep_store_threadvars(m_orig_thd);
 | 
						|
  }
 | 
						|
private:
 | 
						|
  THD *m_orig_thd;
 | 
						|
  THD *m_cur_thd;
 | 
						|
};
 | 
						|
 | 
						|
class sql_safe_updates
 | 
						|
{
 | 
						|
public:
 | 
						|
  sql_safe_updates(THD* thd)
 | 
						|
    : m_thd(thd)
 | 
						|
    , m_option_bits(thd->variables.option_bits)
 | 
						|
  {
 | 
						|
    thd->variables.option_bits&= ~OPTION_SAFE_UPDATES;
 | 
						|
  }
 | 
						|
  ~sql_safe_updates()
 | 
						|
  {
 | 
						|
    m_thd->variables.option_bits= m_option_bits;
 | 
						|
  }
 | 
						|
private:
 | 
						|
  THD* m_thd;
 | 
						|
  ulonglong m_option_bits;
 | 
						|
};
 | 
						|
 | 
						|
/*
  Parse and execute one SQL statement in the context of thd.

  The statement's PSI locker is detached while the statement runs and
  reattached afterwards. Tables opened by the statement are closed
  before returning.

  @param thd     Session used to run the statement.
  @param sql     Statement text.
  @param length  Length of the statement text in bytes.

  @return 0 on success, 1 if the parser state could not be initialized
          or the statement left an error in thd.
*/
static int execute_SQL(THD* thd, const char* sql, uint length) {
  DBUG_ENTER("Wsrep_schema::execute_SQL()");
  int err= 0;

  PSI_statement_locker *parent_locker= thd->m_statement_psi;
  Parser_state parser_state;

  /* length is unsigned, so it is printed with %u. */
  WSREP_DEBUG("SQL: %u %s thd: %lld", length, sql, (long long)thd->thread_id);

  if (parser_state.init(thd, (char*)sql, length) == 0) {
    thd->reset_for_next_command();
    lex_start(thd);

    thd->m_statement_psi= NULL;

    thd->set_query((char*)sql, length);
    thd->set_query_id(next_query_id());

    mysql_parse(thd, (char*)sql, length, & parser_state, FALSE, FALSE);

    if (thd->is_error()) {
      WSREP_WARN("Wsrep_schema::execute_sql() failed, %d %s\nSQL: %s",
                 thd->get_stmt_da()->sql_errno(),
                 thd->get_stmt_da()->message(),
                 sql);
      err= 1;
    }
    thd->m_statement_psi= parent_locker;
    thd->end_statement();
    thd->reset_query();
    close_thread_tables(thd);
    delete_explain_query(thd->lex);
  }
  else {
    WSREP_WARN("SR init failure");
    /* Nothing was executed; the caller must not see this as success. */
    err= 1;
  }
  thd->cleanup_after_query();
  DBUG_RETURN(err);
}
 | 
						|
 | 
						|
/*
 | 
						|
  Initialize thd for next "statement"
 | 
						|
 */
 | 
						|
static void init_stmt(THD* thd) {
 | 
						|
  thd->reset_for_next_command();
 | 
						|
}
 | 
						|
 | 
						|
/*
  Finish a "statement": commit the statement transaction and close the
  tables opened by thd. The result of the commit is not inspected.
 */
static void finish_stmt(THD* thd)
{
  (void) trans_commit_stmt(thd);
  close_thread_tables(thd);
}
 | 
						|
 | 
						|
/*
  Open and lock a single table with the given lock type.

  The open is skipped if thd has been killed or carries an
  ER_QUERY_INTERRUPTED error. On any failure the tables of thd are
  closed. On success all columns of the table are marked as used.

  @param thd          Session which opens the table.
  @param schema_name  Schema of the table.
  @param table_name   Name of the table.
  @param lock_type    Lock type to request.
  @param table_list   Table list element to initialize; must not be NULL.

  @return 0 on success, 1 on failure or interruption.
 */
static int open_table(THD *thd, const LEX_CSTRING *schema_name,
                      const LEX_CSTRING *table_name,
                      enum thr_lock_type const lock_type,
                      TABLE_LIST *table_list)
{
  assert(table_list);
  DBUG_ENTER("Wsrep_schema::open_table()");

  table_list->init_one_table(schema_name, table_name, NULL, lock_type);
  thd->lex->query_tables_own_last= 0;

  const uint open_flags= MYSQL_OPEN_IGNORE_GLOBAL_READ_LOCK |
                         MYSQL_LOCK_IGNORE_GLOBAL_READ_ONLY |
                         MYSQL_OPEN_IGNORE_FLUSH |
                         MYSQL_LOCK_IGNORE_TIMEOUT;

  // A bf aborted query does not need the table, the client will get
  // ER_LOCK_DEADLOCK in the end.
  bool was_interrupted= false;
  if (thd->killed)
  {
    was_interrupted= true;
  }
  else if (thd->is_error())
  {
    was_interrupted=
      (thd->get_stmt_da()->sql_errno() == ER_QUERY_INTERRUPTED);
  }

  bool opened= false;
  if (!was_interrupted)
  {
    opened= open_n_lock_single_table(thd, table_list,
                                     table_list->lock_type,
                                     open_flags) != NULL;
  }

  if (!opened)
  {
    close_thread_tables(thd);
    DBUG_RETURN(1);
  }

  table_list->table->use_all_columns();
  DBUG_RETURN(0);
}
 | 
						|
 | 
						|
/*
  Open a table of the wsrep schema with a TL_WRITE lock and reset
  record[0] to the table's default values.

  A failure is logged unless thd was killed or carries an
  ER_QUERY_INTERRUPTED error.

  @return 0 on success, 1 on failure.
 */
static int open_for_write(THD* thd, const char* table_name, TABLE_LIST* table_list)
{
  LEX_CSTRING schema_str= { wsrep_schema_str.c_str(), wsrep_schema_str.length() };
  LEX_CSTRING table_str= { table_name, strlen(table_name) };

  const int open_failed=
    Wsrep_schema_impl::open_table(thd, &schema_str, &table_str, TL_WRITE,
                                  table_list);
  if (!open_failed)
  {
    TABLE* const table= table_list->table;
    empty_record(table);
    table->use_all_columns();
    restore_record(table, s->default_values);
    return 0;
  }

  // A bf aborted query is not worth an error message, the client will
  // get ER_LOCK_DEADLOCK in the end.
  bool was_interrupted= false;
  if (thd->killed)
  {
    was_interrupted= true;
  }
  else if (thd->is_error())
  {
    was_interrupted=
      (thd->get_stmt_da()->sql_errno() == ER_QUERY_INTERRUPTED);
  }

  if (!was_interrupted)
  {
    WSREP_ERROR("Failed to open table %s.%s for writing",
                schema_str.str, table_name);
  }
  return 1;
}
 | 
						|
 | 
						|
/*
  Store the textual form of id (as produced by operator<<) into the
  given field of the table's current record.

  @param table  Table whose field array is written.
  @param field  Index of the field; must be less than table->s->fields.
  @param id     Identifier to store.
 */
static void store(TABLE* table, uint field, const Wsrep_id& id) {
  assert(field < table->s->fields);
  std::ostringstream os;
  os << id;
  /* str() returns a copy, so take it once instead of once per argument. */
  const std::string text(os.str());
  table->field[field]->store(text.c_str(),
                             text.size(),
                             &my_charset_bin);
}
 | 
						|
 | 
						|
 | 
						|
template <typename INTTYPE>
 | 
						|
static void store(TABLE* table, uint field, const INTTYPE val) {
 | 
						|
  assert(field < table->s->fields);
 | 
						|
  table->field[field]->store(val);
 | 
						|
}
 | 
						|
 | 
						|
template <typename CHARTYPE>
 | 
						|
static void store(TABLE* table, uint field, const CHARTYPE* str, size_t str_len) {
 | 
						|
  assert(field < table->s->fields);
 | 
						|
  table->field[field]->store((const char*)str,
 | 
						|
                             str_len,
 | 
						|
                             &my_charset_bin);
 | 
						|
}
 | 
						|
 | 
						|
/*
  Store the contents of a std::string into the given field by
  forwarding its data pointer and size to the pointer/length overload.
 */
static void store(TABLE* table, uint field, const std::string& str)
{
  const char* const data= str.c_str();
  const size_t size= str.size();
  store(table, field, data, size);
}
 | 
						|
 | 
						|
/*
  Write record[0] of the table: update the row which has the same
  primary key if the index lookup finds one, otherwise insert a new row.

  Any non-zero result of the primary key lookup is treated as "row not
  found" and leads to an insert attempt.

  @param table  Opened table with the new row image in record[0].

  @return 0 on success, 1 on error (no usable primary key, key buffer
          allocation failure, or write/update failure).
 */
static int update_or_insert(TABLE* table) {
  DBUG_ENTER("Wsrep_schema::update_or_insert()");
  int ret= 0;
  char* key;
  int error;

  /*
    Verify that the table has primary key defined.
  */
  if (table->s->primary_key >= MAX_KEY ||
      !table->s->keys_in_use.is_set(table->s->primary_key)) {
    WSREP_ERROR("No primary key for %s.%s",
                table->s->db.str, table->s->table_name.str);
    DBUG_RETURN(1);
  }

  /*
    Find the record and update or insert a new one if not found.
  */
  if (!(key= (char*) my_safe_alloca(table->s->max_unique_length))) {
    /* "%u", not "%ud": the latter prints a stray 'd' after the number. */
    WSREP_ERROR("Error allocating %u bytes for key",
                table->s->max_unique_length);
    DBUG_RETURN(1);
  }

  /* Build the primary key image from the row which is about to be written. */
  key_copy((uchar*) key, table->record[0],
           table->key_info + table->s->primary_key, 0);

  if ((error= table->file->ha_index_read_idx_map(table->record[1],
                                                 table->s->primary_key,
                                                 (uchar*) key,
                                                 HA_WHOLE_KEY,
                                                 HA_READ_KEY_EXACT))) {
    /*
      Row not found, insert a new one.
    */
    if ((error= table->file->ha_write_row(table->record[0]))) {
      WSREP_ERROR("Error writing into %s.%s: %d",
                  table->s->db.str,
                  table->s->table_name.str,
                  error);
      ret= 1;
    }
  }
  else if (!records_are_comparable(table) || compare_record(table)) {
    /*
      Record has changed
    */
    if ((error= table->file->ha_update_row(table->record[1],
                                           table->record[0])) &&
        error != HA_ERR_RECORD_IS_THE_SAME) {
      WSREP_ERROR("Error updating record in %s.%s: %d",
                  table->s->db.str,
                  table->s->table_name.str,
                  error);
      ret= 1;
    }
  }

  my_safe_afree(key, table->s->max_unique_length);

  DBUG_RETURN(ret);
}
 | 
						|
 | 
						|
/*
  Insert record[0] of the table as a new row.

  @return 0 on success, 1 if the table has no usable primary key or the
          write fails.
 */
static int insert(TABLE* table)
{
  DBUG_ENTER("Wsrep_schema::insert()");

  /* The table is required to have a primary key which is in use. */
  const bool has_primary_key=
    table->s->primary_key < MAX_KEY &&
    table->s->keys_in_use.is_set(table->s->primary_key);
  if (!has_primary_key)
  {
    WSREP_ERROR("No primary key for %s.%s",
                table->s->db.str, table->s->table_name.str);
    DBUG_RETURN(1);
  }

  const int write_error= table->file->ha_write_row(table->record[0]);
  if (write_error)
  {
    WSREP_ERROR("Error writing into %s.%s: %d",
                table->s->db.str,
                table->s->table_name.str,
                write_error);
    DBUG_RETURN(1);
  }

  DBUG_RETURN(0);
}
 | 
						|
 | 
						|
/*
  Delete the row held in record[0]. The delete is attempted up to three
  times and stops at the first attempt which succeeds.

  @return 0 on success, 1 if all attempts failed.
 */
static int delete_row(TABLE* table)
{
  int error= 0;

  for (int attempts_left= 3; attempts_left > 0; --attempts_left)
  {
    error= table->file->ha_delete_row(table->record[0]);
    if (!error)
      break;
  }

  if (!error)
    return 0;

  WSREP_ERROR("Error deleting row from %s.%s: %d",
              table->s->db.str,
              table->s->table_name.str,
              error);
  return 1;
}
 | 
						|
 | 
						|
/*
  Open a table of the wsrep schema with a TL_READ lock and reset
  record[0] to the table's default values.

  @return 0 on success, 1 on failure (which is logged).
 */
static int open_for_read(THD *thd, const char *table_name,
                         TABLE_LIST *table_list)
{
  LEX_CSTRING schema_str= { wsrep_schema_str.c_str(), wsrep_schema_str.length() };
  LEX_CSTRING table_str= { table_name, strlen(table_name) };

  const int open_failed=
    Wsrep_schema_impl::open_table(thd, &schema_str, &table_str, TL_READ,
                                  table_list);
  if (open_failed)
  {
    WSREP_ERROR("Failed to open table %s.%s for reading",
                schema_str.str, table_name);
    return 1;
  }

  TABLE* const table= table_list->table;
  empty_record(table);
  table->use_all_columns();
  restore_record(table, s->default_values);
  return 0;
}
 | 
						|
 | 
						|
/*
 | 
						|
  Init table for sequential scan.
 | 
						|
 | 
						|
  @return 0 in case of success, 1 in case of error.
 | 
						|
 */
 | 
						|
static int init_for_scan(TABLE* table) {
 | 
						|
  int error;
 | 
						|
  if ((error= table->file->ha_rnd_init(TRUE))) {
 | 
						|
    WSREP_ERROR("Failed to init table for scan: %d", error);
 | 
						|
    return 1;
 | 
						|
  }
 | 
						|
  return 0;
 | 
						|
}
 | 
						|
/*
 | 
						|
  Scan next record. For return codes see handler::ha_rnd_next()
 | 
						|
 | 
						|
  @return 0 in case of success, error code in case of error
 | 
						|
 */
 | 
						|
static int next_record(TABLE* table) {
 | 
						|
  int error;
 | 
						|
  if ((error= table->file->ha_rnd_next(table->record[0])) &&
 | 
						|
      error != HA_ERR_END_OF_FILE) {
 | 
						|
    WSREP_ERROR("Failed to read next record: %d", error);
 | 
						|
  }
 | 
						|
  return error;
 | 
						|
}
 | 
						|
 | 
						|
/*
 | 
						|
  End scan.
 | 
						|
 | 
						|
  @return 0 in case of success, 1 in case of error.
 | 
						|
 */
 | 
						|
static int end_scan(TABLE* table) {
 | 
						|
  int error;
 | 
						|
  if ((error= table->file->ha_rnd_end())) {
 | 
						|
    WSREP_ERROR("Failed to end scan: %d", error);
 | 
						|
    return 1;
 | 
						|
  }
 | 
						|
  return 0;
 | 
						|
}
 | 
						|
 | 
						|
/*
  Read the given field of the current record as a string and build a
  wsrep::id from it. field must be less than table->s->fields.

  @return always 0.
 */
static int scan(TABLE* table, uint field, wsrep::id& id)
{
  assert(field < table->s->fields);

  String field_value;
  (void)table->field[field]->val_str(&field_value);

  const std::string text(field_value.c_ptr(), field_value.length());
  id= wsrep::id(text);
  return 0;
}
 | 
						|
 | 
						|
template <typename INTTYPE>
 | 
						|
static int scan(TABLE* table, uint field, INTTYPE& val)
 | 
						|
{
 | 
						|
  assert(field < table->s->fields);
 | 
						|
  val= table->field[field]->val_int();
 | 
						|
  return 0;
 | 
						|
}
 | 
						|
 | 
						|
/*
  Read the given field of the current record as a string and copy it
  into strbuf, truncated to at most strbuf_len - 1 characters and
  NUL terminated.

  @param table       Table positioned on the record to read.
  @param field       Index of the field; must be less than table->s->fields.
  @param strbuf      Destination buffer.
  @param strbuf_len  Size of the destination buffer in bytes.

  @return 0 on success, 1 if strbuf_len is zero (nothing is written).
 */
static int scan(TABLE* table, uint field, char* strbuf, uint strbuf_len)
{
  /* Same bounds check as the other scan() overloads. */
  assert(field < table->s->fields);

  if (strbuf_len == 0)
  {
    /*
      strbuf_len - 1 would wrap around to the largest uint and the
      length limit below would no longer protect the buffer.
    */
    return 1;
  }

  StringBuffer<STRING_BUFFER_USUAL_SIZE> str;
  (void) table->field[field]->val_str(&str);
  const uint len= str.length();
  strmake(strbuf, str.ptr(), MY_MIN(len, strbuf_len-1));
  return 0;
}
 | 
						|
 | 
						|
/*
 | 
						|
  Scan member
 | 
						|
  TODO: filter members by cluster UUID
 | 
						|
 */
 | 
						|
static int scan_member(TABLE* table,
 | 
						|
                       const Wsrep_id& cluster_uuid,
 | 
						|
                       std::vector<Wsrep_view::member>& members)
 | 
						|
{
 | 
						|
  Wsrep_id member_id;
 | 
						|
  char member_name[128]= { 0, };
 | 
						|
  char member_incoming[128]= { 0, };
 | 
						|
 | 
						|
  if (scan(table, 0, member_id) ||
 | 
						|
      scan(table, 2, member_name, sizeof(member_name)) ||
 | 
						|
      scan(table, 3, member_incoming, sizeof(member_incoming))) {
 | 
						|
    return 1;
 | 
						|
  }
 | 
						|
 | 
						|
  if (members.empty() == false) {
 | 
						|
    assert(members.rbegin()->id() < member_id);
 | 
						|
  }
 | 
						|
 | 
						|
  try {
 | 
						|
    members.push_back(Wsrep_view::member(member_id,
 | 
						|
                                         member_name,
 | 
						|
                                         member_incoming));
 | 
						|
  }
 | 
						|
  catch (...) {
 | 
						|
    WSREP_ERROR("Caught exception while scanning members table");
 | 
						|
    return 1;
 | 
						|
  }
 | 
						|
  return 0;
 | 
						|
}
 | 
						|
 | 
						|
/*
 | 
						|
  Init table for index scan and retrieve first record
 | 
						|
 | 
						|
  @return 0 in case of success, error code in case of error.
 | 
						|
 */
 | 
						|
static int init_for_index_scan(TABLE* table, const uchar* key,
 | 
						|
                               key_part_map map) {
 | 
						|
  int error;
 | 
						|
  if ((error= table->file->ha_index_init(table->s->primary_key, true))) {
 | 
						|
    WSREP_ERROR("Failed to init table for index scan: %d", error);
 | 
						|
    return error;
 | 
						|
  }
 | 
						|
 | 
						|
  error= table->file->ha_index_read_map(table->record[0],
 | 
						|
                                        key, map, HA_READ_KEY_EXACT);
 | 
						|
  switch(error) {
 | 
						|
  case 0:
 | 
						|
  case HA_ERR_END_OF_FILE:
 | 
						|
  case HA_ERR_KEY_NOT_FOUND:
 | 
						|
  case HA_ERR_ABORTED_BY_USER:
 | 
						|
    break;
 | 
						|
  case -1:
 | 
						|
    WSREP_DEBUG("init_for_index_scan interrupted");
 | 
						|
    break;
 | 
						|
  default:
 | 
						|
    WSREP_ERROR("init_for_index_scan failed to read first record, error %d", error);
 | 
						|
  }
 | 
						|
  return error;
 | 
						|
}
 | 
						|
 | 
						|
/*
 | 
						|
  End index scan.
 | 
						|
 | 
						|
  @return 0 in case of success, 1 in case of error.
 | 
						|
 */
 | 
						|
static int end_index_scan(TABLE* table) {
 | 
						|
  int error;
 | 
						|
  if (table->file->inited) {
 | 
						|
    if ((error= table->file->ha_index_end())) {
 | 
						|
      WSREP_ERROR("Failed to end scan: %d", error);
 | 
						|
      return 1;
 | 
						|
    }
 | 
						|
  }
 | 
						|
  return 0;
 | 
						|
}
 | 
						|
 | 
						|
/*
  Allocate a key buffer and fill it with a prefix of the key built from
  record[0], covering the first `parts` key parts of table->key_info.

  *map receives the key part map for `parts` parts and *key receives the
  buffer obtained from my_malloc().

  NOTE(review): if the allocation fails the error is logged and
  assert(0) fires; where assert is compiled out, key_copy() is still
  called with a NULL destination.
 */
static void make_key(TABLE* table, uchar** key, key_part_map* map, int parts)
{
  KEY_PART_INFO* const part_info= table->key_info->key_part;

  /* Sum of the stored lengths of the requested key parts. */
  uint prefix_length= 0;
  for (int part= 0; part < parts; ++part)
  {
    prefix_length+= part_info[part].store_length;
  }

  *map= make_prev_keypart_map(parts);

  *key= (uchar *) my_malloc(PSI_NOT_INSTRUMENTED, prefix_length + 1, MYF(MY_WME));
  if (*key == NULL)
  {
    WSREP_ERROR("Failed to allocate memory for key prefix_length %u", prefix_length);
    assert(0);
  }

  key_copy(*key, table->record[0], table->key_info, prefix_length);
}
 | 
						|
 | 
						|
} /* namespace Wsrep_schema_impl */
 | 
						|
 | 
						|
 | 
						|
/* Default constructor; compiler-generated behavior is requested explicitly. */
Wsrep_schema::Wsrep_schema() = default;
 | 
						|
 | 
						|
/* Destructor; compiler-generated behavior is requested explicitly. */
Wsrep_schema::~Wsrep_schema() = default;
 | 
						|
 | 
						|
/*
  Configure a THD for maintenance of the wsrep schema tables: grants
  are skipped, Galera replication, binary logging, safe updates and the
  general log are switched off, and READ COMMITTED isolation is used.
  Finally the thread variables are assigned from and stored for thd.
 */
static void wsrep_init_thd_for_schema(THD *thd)
{
  thd->security_ctx->skip_grants();
  thd->system_thread= SYSTEM_THREAD_GENERIC;

  thd->real_id=pthread_self(); // Keep purify happy

  thd->prior_thr_create_utime= thd->start_utime= thd->thr_create_utime;

  /* Galera replication and binlogging are not wanted here */
  thd->variables.wsrep_on= 0;
  thd->variables.sql_log_bin= 0;

  /* Neither binlog nor safe updates, and no general log */
  thd->variables.option_bits&= ~(OPTION_BIN_LOG | OPTION_SAFE_UPDATES);
  thd->variables.option_bits|= OPTION_LOG_OFF;

  /* Read committed isolation to avoid gap locking */
  thd->variables.tx_isolation= ISO_READ_COMMITTED;

  wsrep_assign_from_threadvars(thd);
  wsrep_store_threadvars(thd);
}
 | 
						|
 | 
						|
int Wsrep_schema::init()
 | 
						|
{
 | 
						|
  DBUG_ENTER("Wsrep_schema::init()");
 | 
						|
  int ret;
 | 
						|
  THD* thd= new THD(next_thread_id());
 | 
						|
  if (!thd) {
 | 
						|
    WSREP_ERROR("Unable to get thd");
 | 
						|
    DBUG_RETURN(1);
 | 
						|
  }
 | 
						|
  thd->thread_stack= (char*)&thd;
 | 
						|
  wsrep_init_thd_for_schema(thd);
 | 
						|
 | 
						|
  if (Wsrep_schema_impl::execute_SQL(thd, create_cluster_table_str.c_str(),
 | 
						|
                                     create_cluster_table_str.size()) ||
 | 
						|
      Wsrep_schema_impl::execute_SQL(thd, create_members_table_str.c_str(),
 | 
						|
                                     create_members_table_str.size()) ||
 | 
						|
#ifdef WSREP_SCHEMA_MEMBERS_HISTORY
 | 
						|
      Wsrep_schema_impl::execute_SQL(thd,
 | 
						|
                                     create_members_history_table_str.c_str(),
 | 
						|
                                     create_members_history_table_str.size()) ||
 | 
						|
      Wsrep_schema_impl::execute_SQL(thd,
 | 
						|
                                     alter_members_history_table.c_str(),
 | 
						|
                                     alter_members_history_table.size()) ||
 | 
						|
#endif /* WSREP_SCHEMA_MEMBERS_HISTORY */
 | 
						|
      Wsrep_schema_impl::execute_SQL(thd,
 | 
						|
                                     create_frag_table_str.c_str(),
 | 
						|
                                     create_frag_table_str.size()) ||
 | 
						|
      Wsrep_schema_impl::execute_SQL(thd,
 | 
						|
                                     alter_cluster_table.c_str(),
 | 
						|
                                     alter_cluster_table.size()) ||
 | 
						|
      Wsrep_schema_impl::execute_SQL(thd,
 | 
						|
                                     alter_members_table.c_str(),
 | 
						|
                                     alter_members_table.size()) ||
 | 
						|
      Wsrep_schema_impl::execute_SQL(thd,
 | 
						|
                                     alter_frag_table.c_str(),
 | 
						|
	                             alter_frag_table.size()))
 | 
						|
  {
 | 
						|
    ret= 1;
 | 
						|
  }
 | 
						|
  else
 | 
						|
  {
 | 
						|
    ret= 0;
 | 
						|
  }
 | 
						|
 | 
						|
  delete thd;
 | 
						|
  DBUG_RETURN(ret);
 | 
						|
}
 | 
						|
 | 
						|
/*
  Persist a primary view into the wsrep schema tables.

  Replication, binary logging and safe updates are switched off for thd
  for the duration of the call. All rows are first deleted from the
  cluster and members tables. Then one row describing the view is
  written into the cluster table and one row per view member into the
  members table. With WSREP_SCHEMA_MEMBERS_HISTORY defined, one row per
  member is also written into the member history table.

  NOTE(review): the error paths jump to `out` without calling
  finish_stmt(); confirm that callers clean up the open tables.

  @param thd   Session used for the table operations.
  @param view  View to store; its status must be Wsrep_view::primary.

  @return 0 on success, 1 on error.
 */
int Wsrep_schema::store_view(THD* thd, const Wsrep_view& view)
{
  DBUG_ENTER("Wsrep_schema::store_view()");
  assert(view.status() == Wsrep_view::primary);
  int ret= 1;
  int error;
  /* All locals are declared up front because of the gotos below. */
  TABLE_LIST cluster_table_l;
  TABLE* cluster_table= 0;
  TABLE_LIST members_table_l;
  TABLE* members_table = 0;
#ifdef WSREP_SCHEMA_MEMBERS_HISTORY
  TABLE_LIST members_history_table_l;
  TABLE* members_history_table= 0;
#endif /* WSREP_SCHEMA_MEMBERS_HISTORY */

  Wsrep_schema_impl::wsrep_off wsrep_off(thd);
  Wsrep_schema_impl::binlog_off binlog_off(thd);
  Wsrep_schema_impl::sql_safe_updates sql_safe_updates(thd);

  /*
    Clean up cluster table and members table.
  */
  if (Wsrep_schema_impl::execute_SQL(thd,
                                     delete_from_cluster_table.c_str(),
                                     delete_from_cluster_table.size()) ||
      Wsrep_schema_impl::execute_SQL(thd,
                                     delete_from_members_table.c_str(),
                                     delete_from_members_table.size())) {
    goto out;
  }

  /*
    Store cluster view info
  */
  Wsrep_schema_impl::init_stmt(thd);
  if (Wsrep_schema_impl::open_for_write(thd, cluster_table_str.c_str(), &cluster_table_l))
  {
    goto out;
  }

  cluster_table= cluster_table_l.table;

  Wsrep_schema_impl::store(cluster_table, 0, view.state_id().id());
  Wsrep_schema_impl::store(cluster_table, 1, view.view_seqno().get());
  Wsrep_schema_impl::store(cluster_table, 2, view.state_id().seqno().get());
  Wsrep_schema_impl::store(cluster_table, 3, view.protocol_version());
  Wsrep_schema_impl::store(cluster_table, 4, view.capabilities());

  if ((error= Wsrep_schema_impl::update_or_insert(cluster_table)))
  {
    WSREP_ERROR("failed to write to cluster table: %d", error);
    goto out;
  }

  Wsrep_schema_impl::finish_stmt(thd);

  /*
    Store info about current members
  */
  Wsrep_schema_impl::init_stmt(thd);
  if (Wsrep_schema_impl::open_for_write(thd, members_table_str.c_str(),
                                        &members_table_l))
  {
    WSREP_ERROR("failed to open wsrep.members table");
    goto out;
  }

  members_table= members_table_l.table;

  for (size_t i= 0; i < view.members().size(); ++i)
  {
    Wsrep_schema_impl::store(members_table, 0, view.members()[i].id());
    Wsrep_schema_impl::store(members_table, 1, view.state_id().id());
    Wsrep_schema_impl::store(members_table, 2, view.members()[i].name());
    Wsrep_schema_impl::store(members_table, 3, view.members()[i].incoming());
    if ((error= Wsrep_schema_impl::update_or_insert(members_table)))
    {
      WSREP_ERROR("failed to write wsrep.members table: %d", error);
      goto out;
    }
  }
  Wsrep_schema_impl::finish_stmt(thd);

#ifdef WSREP_SCHEMA_MEMBERS_HISTORY
  /*
    Store members history
  */
  Wsrep_schema_impl::init_stmt(thd);
  /* Opened through a TABLE_LIST like the tables above. */
  if (Wsrep_schema_impl::open_for_write(thd,
                                        cluster_member_history_table_str.c_str(),
                                        &members_history_table_l)) {
    WSREP_ERROR("failed to open wsrep_cluster_member_history table");
    goto out;
  }

  members_history_table= members_history_table_l.table;

  for (size_t i= 0; i < view.members().size(); ++i) {
    Wsrep_schema_impl::store(members_history_table, 0, view.members()[i].id());
    Wsrep_schema_impl::store(members_history_table, 1, view.state_id().id());
    /* Sequence numbers are stored as plain integers, as for the cluster table. */
    Wsrep_schema_impl::store(members_history_table, 2, view.view_seqno().get());
    Wsrep_schema_impl::store(members_history_table, 3, view.state_id().seqno().get());
    Wsrep_schema_impl::store(members_history_table, 4,
                             view.members()[i].name());
    Wsrep_schema_impl::store(members_history_table, 5,
                             view.members()[i].incoming());
    if ((error= Wsrep_schema_impl::update_or_insert(members_history_table))) {
      WSREP_ERROR("failed to write wsrep_cluster_member_history table: %d", error);
      goto out;
    }
  }
  Wsrep_schema_impl::finish_stmt(thd);
#endif /* WSREP_SCHEMA_MEMBERS_HISTORY */
  ret= 0;
 out:

  DBUG_RETURN(ret);
}
 | 
						|
 | 
						|
/*
  Rebuild a Wsrep_view from the contents of the cluster and members tables.

  One row is read from the cluster table into the cluster UUID, view id,
  view seqno, protocol version and capabilities. Hitting end of file on that
  read is not treated as an error; the initial values (-1 / 0 / undefined)
  are then used as they are. Every row of the members table is passed to
  scan_member() together with the cluster UUID and the member vector.
  If own_id is defined, the position of the matching member becomes the own
  index of the returned view, otherwise the index stays -1.

  wsrep_sync_wait of the session is forced to zero while the tables are
  read and restored before returning.

  @param thd     Session in which the read-only transaction is run.
  @param own_id  Id to look up among the members; may be undefined.

  @return The restored view on success, a default constructed Wsrep_view
          on failure.
*/
Wsrep_view Wsrep_schema::restore_view(THD* thd, const Wsrep_id& own_id) const
{
  DBUG_ENTER("Wsrep_schema::restore_view()");

  /*
    Every local is declared before the first "goto out" so that no jump
    bypasses an initialization.
  */
  int status= 1;
  int rc;

  TABLE_LIST cluster_table_l;
  TABLE* cluster_table= 0;
  bool cluster_scan_open= false;
  TABLE_LIST members_table_l;
  TABLE* members_table= 0;
  bool members_scan_open= false;

  /* Defaults which stay in effect when the cluster table has no rows. */
  Wsrep_id uuid;
  wsrep_seqno_t stored_view_id= -1;
  wsrep_seqno_t stored_view_seqno= -1;
  int own_index= -1;
  int protocol= 0;
  wsrep_cap_t caps= 0;
  std::vector<Wsrep_view::member> member_list;

  /* No causal waits are wanted for reading non-replicated private data. */
  const int saved_sync_wait= thd->variables.wsrep_sync_wait;
  thd->variables.wsrep_sync_wait= 0;

  if (trans_begin(thd, MYSQL_START_TRANS_OPT_READ_ONLY))
  {
    WSREP_ERROR("wsrep_schema::restore_view(): Failed to start transaction");
    goto out;
  }

  /* Step 1: cluster table. */
  Wsrep_schema_impl::init_stmt(thd);
  if (Wsrep_schema_impl::open_for_read(thd, cluster_table_str.c_str(),
                                       &cluster_table_l))
    goto out;
  cluster_table= cluster_table_l.table;

  if (Wsrep_schema_impl::init_for_scan(cluster_table))
    goto out;

  rc= Wsrep_schema_impl::next_record(cluster_table);
  if (rc == 0)
  {
    /* A row was found, all five columns must be readable. */
    if (Wsrep_schema_impl::scan(cluster_table, 0, uuid) ||
        Wsrep_schema_impl::scan(cluster_table, 1, stored_view_id) ||
        Wsrep_schema_impl::scan(cluster_table, 2, stored_view_seqno) ||
        Wsrep_schema_impl::scan(cluster_table, 3, protocol) ||
        Wsrep_schema_impl::scan(cluster_table, 4, caps))
    {
      cluster_scan_open= true;
      goto out;
    }
  }
  else if (rc != HA_ERR_END_OF_FILE)
  {
    /* Anything but end of file is a genuine read error. */
    cluster_scan_open= true;
    goto out;
  }

  if (Wsrep_schema_impl::end_scan(cluster_table))
    goto out;
  Wsrep_schema_impl::finish_stmt(thd);

  /* Step 2: members table. */
  Wsrep_schema_impl::init_stmt(thd);
  if (Wsrep_schema_impl::open_for_read(thd, members_table_str.c_str(),
                                       &members_table_l))
    goto out;

  members_table= members_table_l.table;
  if (Wsrep_schema_impl::init_for_scan(members_table))
    goto out;

  /* From here on the scan has to be ended by the cleanup code on error. */
  members_scan_open= true;
  for (;;)
  {
    rc= Wsrep_schema_impl::next_record(members_table);
    if (rc == HA_ERR_END_OF_FILE)
      break;
    if (rc != 0 ||
        Wsrep_schema_impl::scan_member(members_table, uuid, member_list))
      goto out;
  }
  members_scan_open= false;

  if (Wsrep_schema_impl::end_scan(members_table))
    goto out;
  Wsrep_schema_impl::finish_stmt(thd);

  /* Step 3: locate own_id among the members that were read. */
  if (!own_id.is_undefined())
  {
    for (uint pos= 0; pos < member_list.size(); ++pos)
    {
      if (member_list[pos].id() == own_id)
      {
        own_index= pos;
        break;
      }
    }
  }

  (void)trans_commit(thd);
  status= 0; /* Success */

 out:

  if (cluster_scan_open)
    Wsrep_schema_impl::end_scan(cluster_table);
  if (members_scan_open)
    Wsrep_schema_impl::end_scan(members_table);

  if (status != 0)
  {
    trans_rollback_stmt(thd);
    if (!trans_rollback(thd))
    {
      close_thread_tables(thd);
    }
  }
  thd->release_transactional_locks();

  thd->variables.wsrep_sync_wait= saved_sync_wait;

  if (status != 0)
  {
    WSREP_ERROR("wsrep_schema::restore_view() failed.");
    Wsrep_view empty_view;
    DBUG_RETURN(empty_view);
  }

  Wsrep_view restored(
    wsrep::gtid(uuid, Wsrep_seqno(stored_view_seqno)),
    Wsrep_seqno(stored_view_id),
    wsrep::view::primary,
    caps,
    own_index,
    protocol,
    member_list
  );

  if (wsrep_debug)
  {
    std::ostringstream os;
    os << "Restored cluster view:\n" << restored;
    WSREP_INFO("%s", os.str().c_str());
  }
  DBUG_RETURN(restored);
}
 | 
						|
 | 
						|
/*
  Insert one fragment row into the streaming log table.

  Columns 0..4 of the row are filled with server_id, transaction_id, seqno,
  flags and the fragment data, in that order.

  @param thd             Session used for opening the table and inserting.
  @param server_id       Stored in column 0.
  @param transaction_id  Stored in column 1.
  @param seqno           Stored in column 2.
  @param flags           Stored in column 3.
  @param data            Buffer stored in column 4.

  @return 0 on success, 1 if the table could not be opened or the insert
          failed.
*/
int Wsrep_schema::append_fragment(THD* thd,
                                  const wsrep::id& server_id,
                                  wsrep::transaction_id transaction_id,
                                  wsrep::seqno seqno,
                                  int flags,
                                  const wsrep::const_buffer& data)
{
  DBUG_ENTER("Wsrep_schema::append_fragment");
  std::ostringstream server_id_os;
  server_id_os << server_id;
  WSREP_DEBUG("Append fragment(%llu) %s, %llu",
              thd->thread_id,
              server_id_os.str().c_str(),
              transaction_id.get());

  /*
    Work on a private query table list while the fragment is stored: a
    populated list inherited from the "parent DML" may cause problems,
    e.g. for virtual column handling. The saved list is put back on every
    exit path below.
  */
  Query_tables_list saved_query_tables;
  thd->lex->reset_n_backup_query_tables_list(&saved_query_tables);

  Wsrep_schema_impl::binlog_off binlog_guard(thd);
  Wsrep_schema_impl::sql_safe_updates safe_updates_guard(thd);
  Wsrep_schema_impl::init_stmt(thd);

  int result= 1;
  TABLE_LIST frag_table_l;
  if (Wsrep_schema_impl::open_for_write(thd, sr_table_str.c_str(),
                                        &frag_table_l))
  {
    /* Nothing was opened, only the statement has to be rolled back. */
    trans_rollback_stmt(thd);
  }
  else
  {
    TABLE* const sr_table= frag_table_l.table;
    Wsrep_schema_impl::store(sr_table, 0, server_id);
    Wsrep_schema_impl::store(sr_table, 1, transaction_id.get());
    Wsrep_schema_impl::store(sr_table, 2, seqno.get());
    Wsrep_schema_impl::store(sr_table, 3, flags);
    Wsrep_schema_impl::store(sr_table, 4, data.data(), data.size());

    if (Wsrep_schema_impl::insert(sr_table))
    {
      trans_rollback_stmt(thd);
      close_thread_tables(thd);
    }
    else
    {
      Wsrep_schema_impl::finish_stmt(thd);
      result= 0;
    }
  }

  thd->lex->restore_backup_query_tables_list(&saved_query_tables);
  DBUG_RETURN(result);
}
 | 
						|
 | 
						|
/*
  Replace the placeholder seqno (-1) of a stored fragment with the seqno
  carried by ws_meta.

  The row is looked up in the streaming log table through a three part key
  built from ws_meta.server_id(), ws_meta.transaction_id() and seqno -1, and
  column 2 of the row is then overwritten with ws_meta.seqno().

  @param thd      Session used for opening the table and updating the row.
  @param ws_meta  Write set meta data; its seqno must be defined.

  @return 0 on success, 1 if the table could not be opened, the row was not
          found, the update failed or the index scan could not be ended.
*/
int Wsrep_schema::update_fragment_meta(THD* thd,
                                       const wsrep::ws_meta& ws_meta)
{
  DBUG_ENTER("Wsrep_schema::update_fragment_meta");
  std::ostringstream os;
  os << ws_meta.server_id();
  WSREP_DEBUG("update_frag_seqno(%llu) %s, %llu, seqno %lld",
              thd->thread_id,
              os.str().c_str(),
              ws_meta.transaction_id().get(),
              ws_meta.seqno().get());
  DBUG_ASSERT(ws_meta.seqno().is_undefined() == false);

  /*
    Use a private query table list for the duration of the update, a
    populated query table list from the "parent DML" may cause problems,
    e.g. for virtual column handling.
  */
  Query_tables_list query_tables_list_backup;
  thd->lex->reset_n_backup_query_tables_list(&query_tables_list_backup);

  Wsrep_schema_impl::binlog_off binlog_off(thd);
  Wsrep_schema_impl::sql_safe_updates sql_safe_updates(thd);
  int error;
  uchar *key= NULL;
  key_part_map key_map= 0;
  TABLE* frag_table= 0;
  TABLE_LIST frag_table_l;

  Wsrep_schema_impl::init_stmt(thd);
  if (Wsrep_schema_impl::open_for_write(thd, sr_table_str.c_str(),
                                        &frag_table_l))
  {
    thd->lex->restore_backup_query_tables_list(&query_tables_list_backup);
    DBUG_RETURN(1);
  }
  frag_table= frag_table_l.table;

  /* Find record with the given uuid, trx id, and seqno -1 */
  Wsrep_schema_impl::store(frag_table, 0, ws_meta.server_id());
  Wsrep_schema_impl::store(frag_table, 1, ws_meta.transaction_id().get());
  Wsrep_schema_impl::store(frag_table, 2, -1);
  Wsrep_schema_impl::make_key(frag_table, &key, &key_map, 3);

  error= Wsrep_schema_impl::init_for_index_scan(frag_table, key, key_map);
  /* The key buffer is not needed once the lookup has been attempted. */
  my_free(key);
  key= NULL;

  if (error)
  {
    if (error == HA_ERR_END_OF_FILE || error == HA_ERR_KEY_NOT_FOUND)
    {
      WSREP_WARN("Record not found in %s.%s: %d",
                 frag_table->s->db.str,
                 frag_table->s->table_name.str,
                 error);
    }
    /*
      End the index scan before the statement is finished, the same way
      remove_fragment() and replay_transaction() do after a failed
      init_for_index_scan().
    */
    Wsrep_schema_impl::end_index_scan(frag_table);
    Wsrep_schema_impl::finish_stmt(thd);
    thd->lex->restore_backup_query_tables_list(&query_tables_list_backup);
    DBUG_RETURN(1);
  }

  /* Copy the original record to frag_table->record[1] */
  store_record(frag_table, record[1]);

  /* Store seqno in frag_table->record[0] and update the row */
  Wsrep_schema_impl::store(frag_table, 2, ws_meta.seqno().get());
  if ((error= frag_table->file->ha_update_row(frag_table->record[1],
                                              frag_table->record[0])))
  {
    WSREP_ERROR("Error updating record in %s.%s: %d",
                frag_table->s->db.str,
                frag_table->s->table_name.str,
                error);
    /* Do not leave the index scan open when bailing out. */
    Wsrep_schema_impl::end_index_scan(frag_table);
    Wsrep_schema_impl::finish_stmt(thd);
    thd->lex->restore_backup_query_tables_list(&query_tables_list_backup);
    DBUG_RETURN(1);
  }

  int ret= Wsrep_schema_impl::end_index_scan(frag_table);
  Wsrep_schema_impl::finish_stmt(thd);
  thd->lex->restore_backup_query_tables_list(&query_tables_list_backup);
  DBUG_RETURN(ret);
}
 | 
						|
 | 
						|
/*
  Delete a single fragment row from an already opened streaming log table.

  The row is addressed with the complete three part key (server id,
  transaction id, seqno); using a complete key avoids gap locks. A row
  which does not exist is only reported through a debug message and does
  not count as a failure.

  @param thd             Session, used here for the debug message only.
  @param frag_table      Opened streaming log table.
  @param server_id       Key part 0, must be defined.
  @param transaction_id  Key part 1, must be defined.
  @param seqno           Key part 2, must be defined.

  @return 0 on success or if the row was not found, the error code of
          init_for_index_scan() for other lookup errors, 1 if deleting
          the row failed.
*/
static int remove_fragment(THD*                  thd,
                           TABLE*                frag_table,
                           const wsrep::id&      server_id,
                           wsrep::transaction_id transaction_id,
                           wsrep::seqno          seqno)
{
  WSREP_DEBUG("remove_fragment(%llu) trx %llu, seqno %lld",
              thd->thread_id,
              transaction_id.get(),
              seqno.get());

  DBUG_ASSERT(server_id.is_undefined() == false);
  DBUG_ASSERT(transaction_id.is_undefined() == false);
  DBUG_ASSERT(seqno.is_undefined() == false);

  /* Build the lookup key out of the three identifying columns. */
  uchar *lookup_key= NULL;
  key_part_map lookup_map= 0;
  Wsrep_schema_impl::store(frag_table, 0, server_id);
  Wsrep_schema_impl::store(frag_table, 1, transaction_id.get());
  Wsrep_schema_impl::store(frag_table, 2, seqno.get());
  Wsrep_schema_impl::make_key(frag_table, &lookup_key, &lookup_map, 3);

  int result= 0;
  const int scan_error= Wsrep_schema_impl::init_for_index_scan(frag_table,
                                                               lookup_key,
                                                               lookup_map);
  if (scan_error == 0)
  {
    /* Positioned on the row, remove it. */
    if (Wsrep_schema_impl::delete_row(frag_table))
    {
      result= 1;
    }
  }
  else if (scan_error != HA_ERR_END_OF_FILE &&
           scan_error != HA_ERR_KEY_NOT_FOUND)
  {
    result= scan_error;
  }
  else
  {
    WSREP_DEBUG("Record not found in %s.%s:trx %llu, seqno %lld, error %d",
               frag_table->s->db.str,
               frag_table->s->table_name.str,
               transaction_id.get(),
               seqno.get(),
               scan_error);
  }

  if (lookup_key != NULL)
  {
    my_free(lookup_key);
  }
  /* The scan is ended regardless of how the lookup went. */
  Wsrep_schema_impl::end_index_scan(frag_table);
  return result;
}
 | 
						|
 | 
						|
/*
  Delete the rows of the given fragments of one streaming transaction from
  the streaming log table.

  The query table list and the open tables state of the session are backed
  up before the table is opened and restored after it has been closed.
  Removal stops at the first fragment for which remove_fragment() reports
  an error.

  @param thd             Session used for opening the table.
  @param server_id       Server id part of the fragment keys.
  @param transaction_id  Transaction id part of the fragment keys.
  @param fragments       Seqnos of the fragments to remove.

  @return 0 on success, 1 if the table could not be opened or removing a
          fragment failed.
*/
int Wsrep_schema::remove_fragments(THD* thd,
                                   const wsrep::id& server_id,
                                   wsrep::transaction_id transaction_id,
                                   const std::vector<wsrep::seqno>& fragments)
{
  DBUG_ENTER("Wsrep_schema::remove_fragments");
  int ret= 0;

  WSREP_DEBUG("Removing %zu fragments", fragments.size());
  /* Scoped helpers from Wsrep_schema_impl, released when they go out of scope. */
  Wsrep_schema_impl::wsrep_off  wsrep_guard(thd);
  Wsrep_schema_impl::binlog_off binlog_guard(thd);
  Wsrep_schema_impl::sql_safe_updates safe_updates_guard(thd);

  Query_tables_list saved_query_tables;
  Open_tables_backup saved_open_tables;
  thd->lex->reset_n_backup_query_tables_list(&saved_query_tables);
  thd->reset_n_backup_open_tables_state(&saved_open_tables);

  TABLE* frag_table= 0;
  TABLE_LIST frag_table_l;
  if (Wsrep_schema_impl::open_for_write(thd, sr_table_str.c_str(),
                                        &frag_table_l))
  {
    ret= 1;
  }
  else
  {
    frag_table= frag_table_l.table;
    std::vector<wsrep::seqno>::const_iterator frag= fragments.begin();
    /* Walk the fragments until all are removed or one removal fails. */
    while (ret == 0 && frag != fragments.end())
    {
      if (remove_fragment(thd, frag_table, server_id, transaction_id, *frag))
      {
        ret= 1;
      }
      ++frag;
    }
  }
  close_thread_tables(thd);
  thd->restore_backup_open_tables_state(&saved_open_tables);
  thd->lex->restore_backup_query_tables_list(&saved_query_tables);

  /*
    The ugly part: when a locally executing autocommit statement is
    committing and has removed a fragment from stable storage, calling
    finish_stmt() would call trans_commit_stmt(), which would actually
    commit the transaction, and that is really not wanted at this point.
    Doing nothing in that case seems to work ok, so the statement is
    finished in all other cases only.
  */
  const bool local_autocommit=
    thd->wsrep_cs().mode() == wsrep::client_state::m_local &&
    !thd->in_multi_stmt_transaction_mode();
  if (!local_autocommit)
  {
    Wsrep_schema_impl::thd_server_status
      status_guard(thd, thd->server_status | SERVER_STATUS_IN_TRANS,
                   thd->in_multi_stmt_transaction_mode());
    Wsrep_schema_impl::finish_stmt(thd);
  }

  DBUG_RETURN(ret);
}
 | 
						|
 | 
						|
/*
  Apply the stored fragments of a streaming transaction in the context of
  orig_thd and delete each fragment from the streaming log table after it
  has been applied.

  Table access is done through a temporary THD created in this function.
  For every seqno in fragments the row is read (flags from column 3, data
  from column 4), the data is passed to wsrep_apply_events() with orig_thd
  switched in, and the row is then deleted in a separate statement.

  All failure paths end the index scan, finish the statement that is in
  progress on the temporary THD and release the key buffer.

  @param orig_thd   Session in whose context the events are applied.
  @param rli        Passed through to wsrep_apply_events().
  @param ws_meta    Provides server id and transaction id of the fragments.
  @param fragments  Seqnos of the fragments, must not be empty.

  @return 0 on success, non-zero if a table could not be opened, a fragment
          could not be located, applied or deleted.
*/
int Wsrep_schema::replay_transaction(THD* orig_thd,
                                     Relay_log_info* rli,
                                     const wsrep::ws_meta& ws_meta,
                                     const std::vector<wsrep::seqno>& fragments)
{
  DBUG_ENTER("Wsrep_schema::replay_transaction");
  DBUG_ASSERT(!fragments.empty());

  THD thd(next_thread_id(), true);
  thd.thread_stack= (orig_thd ? orig_thd->thread_stack :
                     (char*) &thd);
  wsrep_assign_from_threadvars(&thd);

  Wsrep_schema_impl::wsrep_off  wsrep_off(&thd);
  Wsrep_schema_impl::binlog_off binlog_off(&thd);
  Wsrep_schema_impl::sql_safe_updates sql_safe_updates(&thd);
  Wsrep_schema_impl::thd_context_switch thd_context_switch(orig_thd, &thd);

  int ret= 1;
  int error;
  TABLE* frag_table= 0;
  TABLE_LIST frag_table_l;
  uchar *key= NULL;
  key_part_map key_map= 0;

  for (std::vector<wsrep::seqno>::const_iterator i= fragments.begin();
       i != fragments.end(); ++i)
  {
    /* Statement 1: read the fragment and apply it. */
    Wsrep_schema_impl::init_stmt(&thd);
    if ((error= Wsrep_schema_impl::open_for_read(&thd, sr_table_str.c_str(),
                                                 &frag_table_l)))
    {
      WSREP_WARN("Could not open SR table for read: %d", error);
      Wsrep_schema_impl::finish_stmt(&thd);
      /* key is NULL here: it is released at the end of every iteration. */
      DBUG_RETURN(1);
    }
    frag_table= frag_table_l.table;

    Wsrep_schema_impl::store(frag_table, 0, ws_meta.server_id());
    Wsrep_schema_impl::store(frag_table, 1, ws_meta.transaction_id().get());
    Wsrep_schema_impl::store(frag_table, 2, i->get());
    Wsrep_schema_impl::make_key(frag_table, &key, &key_map, 3);

    error= Wsrep_schema_impl::init_for_index_scan(frag_table,
                                                  key,
                                                  key_map);
    if (error)
    {
      WSREP_WARN("Failed to init streaming log table for index scan: %d",
                 error);
      Wsrep_schema_impl::end_index_scan(frag_table);
      Wsrep_schema_impl::finish_stmt(&thd);
      ret= 1;
      break;
    }

    int flags;
    Wsrep_schema_impl::scan(frag_table, 3, flags);
    WSREP_DEBUG("replay_fragment(%llu): seqno: %lld flags: %x",
                ws_meta.transaction_id().get(),
                i->get(),
                flags);
    String buf;
    frag_table->field[4]->val_str(&buf);

    {
      /* Apply with orig_thd switched in; switched back at scope exit. */
      Wsrep_schema_impl::thd_context_switch thd_context_switch(&thd, orig_thd);

      ret= wsrep_apply_events(orig_thd, rli, buf.c_ptr_quick(), buf.length());
      if (ret)
      {
        WSREP_WARN("Wsrep_schema::replay_transaction: failed to apply fragments");
      }
    }

    /* The read statement is closed whether or not applying succeeded. */
    Wsrep_schema_impl::end_index_scan(frag_table);
    Wsrep_schema_impl::finish_stmt(&thd);
    if (ret)
    {
      break;
    }

    /* Statement 2: delete the fragment which was just applied. */
    Wsrep_schema_impl::init_stmt(&thd);

    if ((error= Wsrep_schema_impl::open_for_write(&thd,
                                                  sr_table_str.c_str(),
                                                  &frag_table_l)))
    {
      WSREP_WARN("Could not open SR table for write: %d", error);
      Wsrep_schema_impl::finish_stmt(&thd);
      /* The key built above is still allocated, release it. */
      my_free(key);
      DBUG_RETURN(1);
    }
    frag_table= frag_table_l.table;

    error= Wsrep_schema_impl::init_for_index_scan(frag_table,
                                                  key,
                                                  key_map);
    if (error)
    {
      WSREP_WARN("Failed to init streaming log table for index scan: %d",
                 error);
    }
    else if ((error= Wsrep_schema_impl::delete_row(frag_table)))
    {
      WSREP_WARN("Could not delete row from streaming log table: %d", error);
    }

    Wsrep_schema_impl::end_index_scan(frag_table);
    Wsrep_schema_impl::finish_stmt(&thd);
    if (error)
    {
      ret= 1;
      break;
    }

    my_free(key);
    key= NULL;
  }

  /* Reached with an allocated key only when the loop was left by break. */
  if (key)
    my_free(key);
  DBUG_RETURN(ret);
}
 | 
						|
 | 
						|
/*
  Scan the streaming log table and feed every stored fragment to a
  streaming applier.

  The cluster id is first read from column 0 of the first cluster table
  row; an empty cluster table ends recovery successfully without looking
  at the fragments. The streaming log table is then scanned in full:
  rows whose seqno is undefined are deleted, all other rows are applied
  through the streaming applier registered for (server id, transaction
  id), which is created and started when it does not exist yet.

  Table access is done through a temporary THD created in this function.

  NOTE(review): a non-zero result of apply_write_set() is logged but does
  not change the return value; this matches the behaviour of the code
  before this change when the scan reaches end of file - confirm that it
  is intended.

  @param orig_thd  Calling session, may be NULL.

  @return 0 if the cluster table is empty or the streaming log table was
          scanned up to end of file, 1 otherwise.
*/
int Wsrep_schema::recover_sr_transactions(THD *orig_thd)
{
  DBUG_ENTER("Wsrep_schema::recover_sr_transactions");
  THD storage_thd(next_thread_id(), true);
  storage_thd.thread_stack= (orig_thd ? orig_thd->thread_stack :
                             (char*) &storage_thd);
  wsrep_assign_from_threadvars(&storage_thd);
  TABLE* frag_table= 0;
  TABLE_LIST frag_table_l;
  TABLE* cluster_table= 0;
  TABLE_LIST cluster_table_l;
  Wsrep_storage_service storage_service(&storage_thd);
  Wsrep_schema_impl::binlog_off binlog_off(&storage_thd);
  Wsrep_schema_impl::wsrep_off wsrep_off(&storage_thd);
  Wsrep_schema_impl::sql_safe_updates sql_safe_updates(&storage_thd);
  Wsrep_schema_impl::thd_context_switch thd_context_switch(orig_thd,
                                                           &storage_thd);
  Wsrep_server_state& server_state(Wsrep_server_state::instance());

  /* Stays 1 unless the fragment scan below runs to end of file. */
  int ret= 1;
  int error;
  wsrep::id cluster_id;

  Wsrep_schema_impl::init_stmt(&storage_thd);
  storage_thd.wsrep_skip_locking= FALSE;
  if (Wsrep_schema_impl::open_for_read(&storage_thd, cluster_table_str.c_str(),
                                       &cluster_table_l))
  {
    Wsrep_schema_impl::finish_stmt(&storage_thd);
    DBUG_RETURN(1);
  }
  cluster_table= cluster_table_l.table;

  if (Wsrep_schema_impl::init_for_scan(cluster_table))
  {
    Wsrep_schema_impl::finish_stmt(&storage_thd);
    DBUG_RETURN(1);
  }

  if ((error= Wsrep_schema_impl::next_record(cluster_table)))
  {
    Wsrep_schema_impl::end_scan(cluster_table);
    Wsrep_schema_impl::finish_stmt(&storage_thd);
    trans_commit(&storage_thd);
    if (error == HA_ERR_END_OF_FILE)
    {
      WSREP_INFO("Cluster table is empty, not recovering transactions");
      DBUG_RETURN(0);
    }
    else
    {
      WSREP_ERROR("Failed to read cluster table: %d", error);
      DBUG_RETURN(1);
    }
  }

  Wsrep_schema_impl::scan(cluster_table, 0, cluster_id);
  Wsrep_schema_impl::end_scan(cluster_table);
  Wsrep_schema_impl::finish_stmt(&storage_thd);

  std::ostringstream os;
  os << cluster_id;
  WSREP_INFO("Recovered cluster id %s", os.str().c_str());

  storage_thd.wsrep_skip_locking= TRUE;
  Wsrep_schema_impl::init_stmt(&storage_thd);

  /*
    Open the table for reading and writing so that fragments without
    valid seqno can be deleted.
  */
  if (Wsrep_schema_impl::open_for_write(&storage_thd, sr_table_str.c_str(),
                                        &frag_table_l))
  {
    WSREP_ERROR("Failed to open SR table for write");
    /* Finish the statement like the failure paths above do. */
    Wsrep_schema_impl::finish_stmt(&storage_thd);
    DBUG_RETURN(1);
  }
  frag_table= frag_table_l.table;

  if (Wsrep_schema_impl::init_for_scan(frag_table))
  {
    WSREP_ERROR("Failed to init for index scan");
    /* The table is open at this point, finish_stmt() closes it. */
    Wsrep_schema_impl::finish_stmt(&storage_thd);
    DBUG_RETURN(1);
  }

  /* error is 0 here: the cluster table row was read successfully. */
  while (0 == error)
  {
    if ((error= Wsrep_schema_impl::next_record(frag_table)) == 0)
    {
      wsrep::id server_id;
      Wsrep_schema_impl::scan(frag_table, 0, server_id);
      wsrep::client_id client_id;
      unsigned long long transaction_id_ull;
      Wsrep_schema_impl::scan(frag_table, 1, transaction_id_ull);
      wsrep::transaction_id transaction_id(transaction_id_ull);
      long long seqno_ll;
      Wsrep_schema_impl::scan(frag_table, 2, seqno_ll);
      wsrep::seqno seqno(seqno_ll);

      /* This is possible if the server crashes between inserting the
         fragment into table and updating the fragment seqno after
         certification. */
      if (seqno.is_undefined())
      {
        Wsrep_schema_impl::delete_row(frag_table);
        continue;
      }

      wsrep::gtid gtid(cluster_id, seqno);
      int flags;
      Wsrep_schema_impl::scan(frag_table, 3, flags);
      String data_str;

      (void)frag_table->field[4]->val_str(&data_str);
      wsrep::const_buffer data(data_str.c_ptr_quick(), data_str.length());
      wsrep::ws_meta ws_meta(gtid,
                             wsrep::stid(server_id,
                                         transaction_id,
                                         client_id),
                             wsrep::seqno::undefined(),
                             flags);

      wsrep::high_priority_service* applier;
      if (!(applier= server_state.find_streaming_applier(server_id,
                                                         transaction_id)))
      {
        /* No applier yet, so this must be the first fragment. */
        DBUG_ASSERT(wsrep::starts_transaction(flags));
        applier = wsrep_create_streaming_applier(&storage_thd, "recovery");
        server_state.start_streaming_applier(server_id, transaction_id,
                                             applier);
        applier->start_transaction(wsrep::ws_handle(transaction_id, 0),
                                   ws_meta);
      }
      applier->store_globals();
      wsrep::mutable_buffer unused;
      /*
        The apply result is kept apart from ret: sharing one variable let a
        successful apply turn a later table scan error into a 0 return.
      */
      const int apply_error= applier->apply_write_set(ws_meta, data, unused);
      if (apply_error != 0)
      {
        WSREP_ERROR("SR trx recovery applying returned %d", apply_error);
      }
      else
      {
        applier->after_apply();
      }
      storage_service.store_globals();
    }
    else if (error == HA_ERR_END_OF_FILE)
    {
      /* The whole table was scanned. */
      ret= 0;
    }
    else
    {
      WSREP_ERROR("SR table scan returned error %d", error);
      ret= 1;
    }
  }
  Wsrep_schema_impl::end_scan(frag_table);
  Wsrep_schema_impl::finish_stmt(&storage_thd);
  trans_commit(&storage_thd);
  storage_thd.set_mysys_var(0);
  DBUG_RETURN(ret);
}
 |