mirror of
https://github.com/MariaDB/server.git
synced 2025-01-18 21:12:26 +01:00
a940151eec
Set temporary `SERVER_STATUS_IN_TRANS` so assert is not triggered in `trans_commit_stmt`.
1381 lines
40 KiB
C++
1381 lines
40 KiB
C++
/* Copyright (C) 2015-2019 Codership Oy <info@codership.com>
|
|
|
|
This program is free software; you can redistribute it and/or modify
|
|
it under the terms of the GNU General Public License as published by
|
|
the Free Software Foundation; version 2 of the License.
|
|
|
|
This program is distributed in the hope that it will be useful,
|
|
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
GNU General Public License for more details.
|
|
|
|
You should have received a copy of the GNU General Public License along
|
|
with this program; if not, write to the Free Software Foundation, Inc.,
|
|
51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. */
|
|
|
|
#include "mariadb.h"
|
|
|
|
#include "table.h"
|
|
#include "key.h"
|
|
#include "sql_base.h"
|
|
#include "sql_parse.h"
|
|
#include "sql_update.h"
|
|
#include "transaction.h"
|
|
|
|
#include "mysql/service_wsrep.h"
|
|
#include "wsrep_schema.h"
|
|
#include "wsrep_applier.h"
|
|
#include "wsrep_xid.h"
|
|
#include "wsrep_binlog.h"
|
|
#include "wsrep_high_priority_service.h"
|
|
#include "wsrep_storage_service.h"
|
|
#include "wsrep_thd.h"
|
|
|
|
#include <string>
|
|
#include <sstream>
|
|
|
|
/* Schema and table names used for the wsrep system tables. */
#define WSREP_SCHEMA "mysql"
#define WSREP_STREAMING_TABLE "wsrep_streaming_log"
#define WSREP_CLUSTER_TABLE "wsrep_cluster"
#define WSREP_MEMBERS_TABLE "wsrep_cluster_members"

/* Streaming log table name in "schema/table" form. */
const char* wsrep_sr_table_name_full= WSREP_SCHEMA "/" WSREP_STREAMING_TABLE;

/* The same names as std::string, used to build the statements below. */
static const std::string wsrep_schema_str(WSREP_SCHEMA);
static const std::string sr_table_str(WSREP_STREAMING_TABLE);
static const std::string cluster_table_str(WSREP_CLUSTER_TABLE);
static const std::string members_table_str(WSREP_MEMBERS_TABLE);

/* DDL for the cluster table: one row keyed by cluster_uuid. */
static const std::string create_cluster_table_str=
  "CREATE TABLE IF NOT EXISTS " + wsrep_schema_str + "." + cluster_table_str +
  "("
  "cluster_uuid CHAR(36) PRIMARY KEY,"
  "view_id BIGINT NOT NULL,"
  "view_seqno BIGINT NOT NULL,"
  "protocol_version INT NOT NULL,"
  "capabilities INT NOT NULL"
  ") ENGINE=InnoDB";

/* DDL for the members table: one row per node, keyed by node_uuid. */
static const std::string create_members_table_str=
  "CREATE TABLE IF NOT EXISTS " + wsrep_schema_str + "." + members_table_str +
  "("
  "node_uuid CHAR(36) PRIMARY KEY,"
  "cluster_uuid CHAR(36) NOT NULL,"
  "node_name CHAR(32) NOT NULL,"
  "node_incoming_address VARCHAR(256) NOT NULL"
  ") ENGINE=InnoDB";

#ifdef WSREP_SCHEMA_MEMBERS_HISTORY
/* Optional members history table, compiled in only on request. */
static const std::string cluster_member_history_table_str= "wsrep_cluster_member_history";
static const std::string create_members_history_table_str=
  "CREATE TABLE IF NOT EXISTS " + wsrep_schema_str + "." + cluster_member_history_table_str +
  "("
  "node_uuid CHAR(36) PRIMARY KEY,"
  "cluster_uuid CHAR(36) NOT NULL,"
  "last_view_id BIGINT NOT NULL,"
  "last_view_seqno BIGINT NOT NULL,"
  "node_name CHAR(32) NOT NULL,"
  "node_incoming_address VARCHAR(256) NOT NULL"
  ") ENGINE=InnoDB";
#endif /* WSREP_SCHEMA_MEMBERS_HISTORY */

/* DDL for the streaming log table, keyed by (node_uuid, trx_id, seqno). */
static const std::string create_frag_table_str=
  "CREATE TABLE IF NOT EXISTS " + wsrep_schema_str + "." + sr_table_str +
  "("
  "node_uuid CHAR(36), "
  "trx_id BIGINT, "
  "seqno BIGINT, "
  "flags INT NOT NULL, "
  "frag LONGBLOB NOT NULL, "
  "PRIMARY KEY (node_uuid, trx_id, seqno)"
  ") ENGINE=InnoDB";

/* Statements that empty the cluster and members tables. */
static const std::string delete_from_cluster_table=
  "DELETE FROM " + wsrep_schema_str + "." + cluster_table_str;

static const std::string delete_from_members_table=
  "DELETE FROM " + wsrep_schema_str + "." + members_table_str;
|
|
|
|
namespace Wsrep_schema_impl
|
|
{
|
|
|
|
class binlog_off
|
|
{
|
|
public:
|
|
binlog_off(THD* thd)
|
|
: m_thd(thd)
|
|
, m_option_bits(thd->variables.option_bits)
|
|
, m_sql_log_bin(thd->variables.sql_log_bin)
|
|
{
|
|
thd->variables.option_bits&= ~OPTION_BIN_LOG;
|
|
thd->variables.sql_log_bin= 0;
|
|
}
|
|
~binlog_off()
|
|
{
|
|
m_thd->variables.option_bits= m_option_bits;
|
|
m_thd->variables.sql_log_bin= m_sql_log_bin;
|
|
}
|
|
private:
|
|
THD* m_thd;
|
|
ulonglong m_option_bits;
|
|
my_bool m_sql_log_bin;
|
|
};
|
|
|
|
class wsrep_off
|
|
{
|
|
public:
|
|
wsrep_off(THD* thd)
|
|
: m_thd(thd)
|
|
, m_wsrep_on(thd->variables.wsrep_on)
|
|
{
|
|
thd->variables.wsrep_on= 0;
|
|
}
|
|
~wsrep_off()
|
|
{
|
|
m_thd->variables.wsrep_on= m_wsrep_on;
|
|
}
|
|
private:
|
|
THD* m_thd;
|
|
my_bool m_wsrep_on;
|
|
};
|
|
|
|
class thd_server_status
|
|
{
|
|
public:
|
|
thd_server_status(THD* thd, uint server_status, bool condition)
|
|
: m_thd(thd)
|
|
, m_thd_server_status(thd->server_status)
|
|
{
|
|
if (condition)
|
|
thd->server_status= server_status;
|
|
}
|
|
~thd_server_status()
|
|
{
|
|
m_thd->server_status= m_thd_server_status;
|
|
}
|
|
private:
|
|
THD* m_thd;
|
|
uint m_thd_server_status;
|
|
};
|
|
|
|
class thd_context_switch
|
|
{
|
|
public:
|
|
thd_context_switch(THD *orig_thd, THD *cur_thd)
|
|
: m_orig_thd(orig_thd)
|
|
, m_cur_thd(cur_thd)
|
|
{
|
|
wsrep_reset_threadvars(m_orig_thd);
|
|
wsrep_store_threadvars(m_cur_thd);
|
|
}
|
|
~thd_context_switch()
|
|
{
|
|
wsrep_reset_threadvars(m_cur_thd);
|
|
wsrep_store_threadvars(m_orig_thd);
|
|
}
|
|
private:
|
|
THD *m_orig_thd;
|
|
THD *m_cur_thd;
|
|
};
|
|
|
|
/*
  Parse and execute one SQL statement in the context of thd.

  @param thd     Session used for execution.
  @param sql     Statement text.
  @param length  Length of the statement text in bytes.

  @return 0 if the statement ran without raising an error on thd,
          1 if the parser state could not be initialized or the
          statement left thd in error state.
*/
static int execute_SQL(THD* thd, const char* sql, uint length) {
  DBUG_ENTER("Wsrep_schema::execute_SQL()");
  int err= 0;

  /* Remembered so that it can be put back once the statement is done. */
  PSI_statement_locker *parent_locker= thd->m_statement_psi;
  Parser_state parser_state;

  WSREP_DEBUG("SQL: %d %s thd: %lld", length, sql, (long long)thd->thread_id);

  if (parser_state.init(thd, (char*)sql, length) == 0) {
    thd->reset_for_next_command();
    lex_start(thd);

    thd->m_statement_psi= NULL;

    thd->set_query((char*)sql, length);
    thd->set_query_id(next_query_id());

    mysql_parse(thd, (char*)sql, length, & parser_state, FALSE, FALSE);

    if (thd->is_error()) {
      WSREP_WARN("Wsrep_schema::execute_sql() failed, %d %s\nSQL: %s",
                 thd->get_stmt_da()->sql_errno(),
                 thd->get_stmt_da()->message(),
                 sql);
      err= 1;
    }
    thd->m_statement_psi= parent_locker;
    thd->end_statement();
    thd->reset_query();
    close_thread_tables(thd);
    delete_explain_query(thd->lex);
  }
  else {
    WSREP_WARN("SR init failure");
    /*
      The statement was never executed, so report failure instead of
      letting the caller believe it succeeded.
    */
    err= 1;
  }
  thd->cleanup_after_query();
  DBUG_RETURN(err);
}
|
|
|
|
/*
|
|
Initialize thd for next "statement"
|
|
*/
|
|
static void init_stmt(THD* thd) {
|
|
thd->reset_for_next_command();
|
|
}
|
|
|
|
/*
  Finish the current "statement": commit it at statement level and then
  close the tables opened by thd.
*/
static void finish_stmt(THD* thd)
{
  trans_commit_stmt(thd);
  close_thread_tables(thd);
}
|
|
|
|
/*
  Open and lock a single table.

  @param thd          Session that opens the table.
  @param schema_name  Schema (database) name.
  @param table_name   Table name.
  @param lock_type    Lock type requested for the table.
  @param table        Out: the opened table, or NULL on failure.

  @return 0 on success. On failure the tables of thd are closed,
          ER_NO_SUCH_TABLE is raised and 1 is returned.
*/
static int open_table(THD* thd,
                      const LEX_CSTRING *schema_name,
                      const LEX_CSTRING *table_name,
                      enum thr_lock_type const lock_type,
                      TABLE** table) {
  assert(table);
  *table= NULL;

  DBUG_ENTER("Wsrep_schema::open_table()");

  const uint open_flags= (MYSQL_OPEN_IGNORE_GLOBAL_READ_LOCK |
                          MYSQL_LOCK_IGNORE_GLOBAL_READ_ONLY |
                          MYSQL_OPEN_IGNORE_FLUSH |
                          MYSQL_LOCK_IGNORE_TIMEOUT);

  TABLE_LIST table_list;
  table_list.init_one_table(schema_name, table_name, NULL, lock_type);

  if (open_n_lock_single_table(thd, &table_list, table_list.lock_type,
                               open_flags)) {
    /* Opened: hand the table out with all columns marked as used. */
    *table= table_list.table;
    (*table)->use_all_columns();
    DBUG_RETURN(0);
  }

  close_thread_tables(thd);
  my_error(ER_NO_SUCH_TABLE, MYF(0), schema_name->str, table_name->str);
  DBUG_RETURN(1);
}
|
|
|
|
|
|
/*
  Open a table of the wsrep schema with a TL_WRITE lock and reset
  record[0] to the table defaults.

  @return 0 on success, 1 if the table could not be opened.
*/
static int open_for_write(THD* thd, const char* table_name, TABLE** table) {
  LEX_CSTRING db_name= { wsrep_schema_str.c_str(), wsrep_schema_str.length() };
  LEX_CSTRING tbl_name= { table_name, strlen(table_name) };

  const int open_failed=
    Wsrep_schema_impl::open_table(thd, &db_name, &tbl_name, TL_WRITE, table);
  if (!open_failed) {
    empty_record(*table);
    (*table)->use_all_columns();
    restore_record(*table, s->default_values);
    return 0;
  }

  WSREP_ERROR("Failed to open table %s.%s for writing",
              db_name.str, table_name);
  return 1;
}
|
|
|
|
/*
  Store a Wsrep_id into the given field of record[0], using the textual
  form produced by streaming the id into an std::ostringstream.
*/
static void store(TABLE* table, uint field, const Wsrep_id& id) {
  assert(field < table->s->fields);
  std::ostringstream formatter;
  formatter << id;
  const std::string text(formatter.str());
  table->field[field]->store(text.c_str(), text.size(), &my_charset_bin);
}
|
|
|
|
|
|
template <typename INTTYPE>
|
|
static void store(TABLE* table, uint field, const INTTYPE val) {
|
|
assert(field < table->s->fields);
|
|
table->field[field]->store(val);
|
|
}
|
|
|
|
template <typename CHARTYPE>
|
|
static void store(TABLE* table, uint field, const CHARTYPE* str, size_t str_len) {
|
|
assert(field < table->s->fields);
|
|
table->field[field]->store((const char*)str,
|
|
str_len,
|
|
&my_charset_bin);
|
|
}
|
|
|
|
/*
  Store the contents of an std::string into the given field by
  forwarding to the pointer/length overload.
*/
static void store(TABLE* table, uint field, const std::string& str)
{
  const char* const text= str.c_str();
  store(table, field, text, str.size());
}
|
|
|
|
/*
  Write record[0] of the table: update the row that has the same primary
  key if the lookup finds one, otherwise insert a new row.

  @param table  Open table whose record[0] holds the row to be written.

  @return 0 on success, 1 if the table has no usable primary key, the
          key buffer could not be allocated, or the write failed.
*/
static int update_or_insert(TABLE* table) {
  DBUG_ENTER("Wsrep_schema::update_or_insert()");
  int ret= 0;
  char* key;
  int error;

  /*
    Verify that the table has primary key defined.
  */
  if (table->s->primary_key >= MAX_KEY ||
      !table->s->keys_in_use.is_set(table->s->primary_key)) {
    WSREP_ERROR("No primary key for %s.%s",
                table->s->db.str, table->s->table_name.str);
    DBUG_RETURN(1);
  }

  /*
    Find the record and update or insert a new one if not found.
  */
  if (!(key= (char*) my_safe_alloca(table->s->max_unique_length))) {
    /* "%u": the original "%ud" printed a stray 'd' after the number. */
    WSREP_ERROR("Error allocating %u bytes for key",
                table->s->max_unique_length);
    DBUG_RETURN(1);
  }

  /* Build the primary key image from the values stored in record[0]. */
  key_copy((uchar*) key, table->record[0],
           table->key_info + table->s->primary_key, 0);

  /* The existing row, if any, is read into record[1]. */
  if ((error= table->file->ha_index_read_idx_map(table->record[1],
                                                 table->s->primary_key,
                                                 (uchar*) key,
                                                 HA_WHOLE_KEY,
                                                 HA_READ_KEY_EXACT))) {
    /*
      Row not found, insert a new one.
      NOTE(review): every non-zero lookup result is treated as
      "not found" here -- confirm that this is intended.
    */
    if ((error= table->file->ha_write_row(table->record[0]))) {
      WSREP_ERROR("Error writing into %s.%s: %d",
                  table->s->db.str,
                  table->s->table_name.str,
                  error);
      ret= 1;
    }
  }
  else if (!records_are_comparable(table) || compare_record(table)) {
    /*
      Record has changed
    */
    if ((error= table->file->ha_update_row(table->record[1],
                                           table->record[0])) &&
        error != HA_ERR_RECORD_IS_THE_SAME) {
      WSREP_ERROR("Error updating record in %s.%s: %d",
                  table->s->db.str,
                  table->s->table_name.str,
                  error);
      ret= 1;
    }
  }

  my_safe_afree(key, table->s->max_unique_length);

  DBUG_RETURN(ret);
}
|
|
|
|
/*
  Insert record[0] as a new row.

  @return 0 on success, 1 if the table has no usable primary key or the
          write failed.
*/
static int insert(TABLE* table) {
  DBUG_ENTER("Wsrep_schema::insert()");

  /* The table is required to have a primary key that is in use. */
  const bool has_primary_key=
    table->s->primary_key < MAX_KEY &&
    table->s->keys_in_use.is_set(table->s->primary_key);
  if (!has_primary_key) {
    WSREP_ERROR("No primary key for %s.%s",
                table->s->db.str, table->s->table_name.str);
    DBUG_RETURN(1);
  }

  const int error= table->file->ha_write_row(table->record[0]);
  if (error == 0) {
    DBUG_RETURN(0);
  }

  WSREP_ERROR("Error writing into %s.%s: %d",
              table->s->db.str,
              table->s->table_name.str,
              error);
  DBUG_RETURN(1);
}
|
|
|
|
/*
  Delete the row held in record[0], making at most three attempts.

  @return 0 if one of the attempts succeeded, 1 otherwise.
*/
static int delete_row(TABLE* table) {
  const int max_attempts= 3;
  int error= 0;

  for (int attempt= 0; attempt < max_attempts; ++attempt) {
    error= table->file->ha_delete_row(table->record[0]);
    if (!error) {
      return 0;
    }
  }

  WSREP_ERROR("Error deleting row from %s.%s: %d",
              table->s->db.str,
              table->s->table_name.str,
              error);
  return 1;
}
|
|
|
|
/*
  Open a table of the wsrep schema with a TL_READ lock and reset
  record[0] to the table defaults.

  @return 0 on success, 1 if the table could not be opened.
*/
static int open_for_read(THD* thd, const char* table_name, TABLE** table) {
  LEX_CSTRING db_name= { wsrep_schema_str.c_str(), wsrep_schema_str.length() };
  LEX_CSTRING tbl_name= { table_name, strlen(table_name) };

  const int open_failed=
    Wsrep_schema_impl::open_table(thd, &db_name, &tbl_name, TL_READ, table);
  if (!open_failed) {
    empty_record(*table);
    (*table)->use_all_columns();
    restore_record(*table, s->default_values);
    return 0;
  }

  WSREP_ERROR("Failed to open table %s.%s for reading",
              db_name.str, table_name);
  return 1;
}
|
|
|
|
/*
|
|
Init table for sequential scan.
|
|
|
|
@return 0 in case of success, 1 in case of error.
|
|
*/
|
|
static int init_for_scan(TABLE* table) {
|
|
int error;
|
|
if ((error= table->file->ha_rnd_init(TRUE))) {
|
|
WSREP_ERROR("Failed to init table for scan: %d", error);
|
|
return 1;
|
|
}
|
|
return 0;
|
|
}
|
|
/*
|
|
Scan next record. For return codes see handler::ha_rnd_next()
|
|
|
|
@return 0 in case of success, error code in case of error
|
|
*/
|
|
static int next_record(TABLE* table) {
|
|
int error;
|
|
if ((error= table->file->ha_rnd_next(table->record[0])) &&
|
|
error != HA_ERR_END_OF_FILE) {
|
|
WSREP_ERROR("Failed to read next record: %d", error);
|
|
}
|
|
return error;
|
|
}
|
|
|
|
/*
|
|
End scan.
|
|
|
|
@return 0 in case of success, 1 in case of error.
|
|
*/
|
|
static int end_scan(TABLE* table) {
|
|
int error;
|
|
if ((error= table->file->ha_rnd_end())) {
|
|
WSREP_ERROR("Failed to end scan: %d", error);
|
|
return 1;
|
|
}
|
|
return 0;
|
|
}
|
|
|
|
/*
  Read the given field of the current record as a string and convert it
  into a wsrep::id.

  @return always 0.
*/
static int scan(TABLE* table, uint field, wsrep::id& id)
{
  assert(field < table->s->fields);
  String raw;
  (void)table->field[field]->val_str(&raw);
  const std::string text(raw.c_ptr(), raw.length());
  id= wsrep::id(text);
  return 0;
}
|
|
|
|
template <typename INTTYPE>
|
|
static int scan(TABLE* table, uint field, INTTYPE& val)
|
|
{
|
|
assert(field < table->s->fields);
|
|
val= table->field[field]->val_int();
|
|
return 0;
|
|
}
|
|
|
|
/*
  Read the given field of the current record as a string into a
  caller-supplied buffer. The value is truncated to strbuf_len - 1 bytes
  if necessary and the result is always NUL terminated right after the
  copied bytes, so the caller does not have to pre-zero the buffer.

  @param table       Table positioned on the record to read.
  @param field       Index of the field to read.
  @param strbuf      Destination buffer.
  @param strbuf_len  Size of the destination buffer in bytes.

  @return 0 on success, 1 if the destination buffer is NULL or empty.
*/
static int scan(TABLE* table, uint field, char* strbuf, uint strbuf_len)
{
  assert(field < table->s->fields);
  if (strbuf == NULL || strbuf_len == 0)
  {
    /* No room even for the terminator. */
    return 1;
  }

  String str;
  (void)table->field[field]->val_str(&str);
  LEX_CSTRING tmp= str.lex_cstring();

  /* Leave one byte for the terminating NUL. */
  const size_t copy_len= std::min<size_t>(tmp.length, strbuf_len - 1);
  if (copy_len > 0)
  {
    memcpy(strbuf, tmp.str, copy_len);
  }
  strbuf[copy_len]= '\0';
  return 0;
}
|
|
|
|
/*
|
|
Scan member
|
|
TODO: filter members by cluster UUID
|
|
*/
|
|
static int scan_member(TABLE* table,
|
|
const Wsrep_id& cluster_uuid,
|
|
std::vector<Wsrep_view::member>& members)
|
|
{
|
|
Wsrep_id member_id;
|
|
char member_name[128]= { 0, };
|
|
char member_incoming[128]= { 0, };
|
|
|
|
if (scan(table, 0, member_id) ||
|
|
scan(table, 2, member_name, sizeof(member_name)) ||
|
|
scan(table, 3, member_incoming, sizeof(member_incoming))) {
|
|
return 1;
|
|
}
|
|
|
|
if (members.empty() == false) {
|
|
assert(members.rbegin()->id() < member_id);
|
|
}
|
|
|
|
try {
|
|
members.push_back(Wsrep_view::member(member_id,
|
|
member_name,
|
|
member_incoming));
|
|
}
|
|
catch (...) {
|
|
WSREP_ERROR("Caught exception while scanning members table");
|
|
return 1;
|
|
}
|
|
return 0;
|
|
}
|
|
|
|
/*
|
|
Init table for index scan and retrieve first record
|
|
|
|
@return 0 in case of success, error code in case of error.
|
|
*/
|
|
static int init_for_index_scan(TABLE* table, const uchar* key,
|
|
key_part_map map) {
|
|
int error;
|
|
if ((error= table->file->ha_index_init(table->s->primary_key, true))) {
|
|
WSREP_ERROR("Failed to init table for index scan: %d", error);
|
|
return error;
|
|
}
|
|
|
|
error= table->file->ha_index_read_map(table->record[0],
|
|
key, map, HA_READ_KEY_EXACT);
|
|
switch(error) {
|
|
case 0:
|
|
case HA_ERR_END_OF_FILE:
|
|
case HA_ERR_KEY_NOT_FOUND:
|
|
case HA_ERR_ABORTED_BY_USER:
|
|
break;
|
|
case -1:
|
|
WSREP_DEBUG("init_for_index_scan interrupted");
|
|
break;
|
|
default:
|
|
WSREP_ERROR("init_for_index_scan failed to read first record, error %d", error);
|
|
}
|
|
return error;
|
|
}
|
|
|
|
/*
|
|
End index scan.
|
|
|
|
@return 0 in case of success, 1 in case of error.
|
|
*/
|
|
static int end_index_scan(TABLE* table) {
|
|
int error;
|
|
if ((error= table->file->ha_index_end())) {
|
|
WSREP_ERROR("Failed to end scan: %d", error);
|
|
return 1;
|
|
}
|
|
return 0;
|
|
}
|
|
|
|
/*
  Build a key image covering the first `parts` key parts of the first
  key in table->key_info, taking the values from record[0].

  @param table  Table whose record[0] holds the key values.
  @param key    Out: buffer receiving the key image.
  @param map    Out: key part map for the first `parts` key parts.
  @param parts  Number of leading key parts to include.
*/
static void make_key(TABLE* table, uchar* key, key_part_map* map, int parts)
{
  KEY* const first_key= table->key_info;
  const KEY_PART_INFO* const key_parts= first_key->key_part;

  /* Total stored length of the requested key prefix. */
  uint prefix_bytes= 0;
  int part= 0;
  while (part < parts)
  {
    prefix_bytes+= key_parts[part].store_length;
    ++part;
  }

  *map= make_prev_keypart_map(parts);
  key_copy(key, table->record[0], first_key, prefix_bytes);
}
|
|
} /* namespace Wsrep_schema_impl */
|
|
|
|
|
|
/* Construction needs no work: the body is intentionally empty. */
Wsrep_schema::Wsrep_schema()
{ }
|
|
|
|
/* Destruction needs no work: the body is intentionally empty. */
Wsrep_schema::~Wsrep_schema()
{
}
|
|
|
|
/*
  Configure a freshly created THD for maintaining the wsrep schema
  tables: grants are skipped, wsrep replication, binary logging and the
  general log are switched off, and READ COMMITTED isolation is used.
*/
static void wsrep_init_thd_for_schema(THD *thd)
{
  thd->security_ctx->skip_grants();
  thd->system_thread= SYSTEM_THREAD_GENERIC;

  thd->real_id= pthread_self();                 // Keep purify happy

  /* Both timestamps start out equal to the thread creation time. */
  thd->start_utime= thd->thr_create_utime;
  thd->prior_thr_create_utime= thd->start_utime;

  /* No wsrep replication for statements run on this thd */
  thd->variables.wsrep_on= 0;
  /* No binlogging */
  thd->variables.sql_log_bin= 0;
  /* Clear the binlog option bit and set the "general log off" bit */
  thd->variables.option_bits=
    (thd->variables.option_bits & ~OPTION_BIN_LOG) | OPTION_LOG_OFF;
  /* Read committed isolation to avoid gap locking */
  thd->variables.tx_isolation= ISO_READ_COMMITTED;

  wsrep_assign_from_threadvars(thd);
  wsrep_store_threadvars(thd);
}
|
|
|
|
int Wsrep_schema::init()
|
|
{
|
|
DBUG_ENTER("Wsrep_schema::init()");
|
|
int ret;
|
|
THD* thd= new THD(next_thread_id());
|
|
if (!thd) {
|
|
WSREP_ERROR("Unable to get thd");
|
|
DBUG_RETURN(1);
|
|
}
|
|
thd->thread_stack= (char*)&thd;
|
|
wsrep_init_thd_for_schema(thd);
|
|
|
|
if (Wsrep_schema_impl::execute_SQL(thd, create_cluster_table_str.c_str(),
|
|
create_cluster_table_str.size()) ||
|
|
Wsrep_schema_impl::execute_SQL(thd, create_members_table_str.c_str(),
|
|
create_members_table_str.size()) ||
|
|
#ifdef WSREP_SCHEMA_MEMBERS_HISTORY
|
|
Wsrep_schema_impl::execute_SQL(thd,
|
|
create_members_history_table_str.c_str(),
|
|
create_members_history_table_str.size()) ||
|
|
#endif /* WSREP_SCHEMA_MEMBERS_HISTORY */
|
|
Wsrep_schema_impl::execute_SQL(thd,
|
|
create_frag_table_str.c_str(),
|
|
create_frag_table_str.size())) {
|
|
ret= 1;
|
|
}
|
|
else {
|
|
ret= 0;
|
|
}
|
|
|
|
delete thd;
|
|
DBUG_RETURN(ret);
|
|
}
|
|
|
|
/*
  Persist a primary view into the wsrep schema tables.

  The cluster and members tables are emptied first, then one row
  describing the view is written into the cluster table and one row per
  member into the members table. When WSREP_SCHEMA_MEMBERS_HISTORY is
  defined, the members are also written into the members history table.
  wsrep replication and binary logging are switched off on thd for the
  duration of the call.

  @param thd   Session used for the table operations.
  @param view  View to store; asserted to have primary status.

  @return 0 on success, 1 on failure.

  NOTE(review): the error paths taken after a successful
  open_for_write() jump to `out` without calling finish_stmt(); confirm
  that the caller cleans up the statement and the open tables.
*/
int Wsrep_schema::store_view(THD* thd, const Wsrep_view& view)
{
  DBUG_ENTER("Wsrep_schema::store_view()");
  assert(view.status() == Wsrep_view::primary);
  int ret= 1;
  int error;
  TABLE* cluster_table= 0;
  TABLE* members_table= 0;
#ifdef WSREP_SCHEMA_MEMBERS_HISTORY
  TABLE* members_history_table= 0;
#endif /* WSREP_SCHEMA_MEMBERS_HISTORY */

  Wsrep_schema_impl::wsrep_off wsrep_off(thd);
  Wsrep_schema_impl::binlog_off binlog_off(thd);

  /*
    Clean up cluster table and members table.
  */
  if (Wsrep_schema_impl::execute_SQL(thd,
                                     delete_from_cluster_table.c_str(),
                                     delete_from_cluster_table.size()) ||
      Wsrep_schema_impl::execute_SQL(thd,
                                     delete_from_members_table.c_str(),
                                     delete_from_members_table.size())) {
    goto out;
  }

  /*
    Store cluster view info
  */
  Wsrep_schema_impl::init_stmt(thd);
  if (Wsrep_schema_impl::open_for_write(thd, cluster_table_str.c_str(), &cluster_table))
  {
    goto out;
  }

  /* Field order follows the column order of the cluster table. */
  Wsrep_schema_impl::store(cluster_table, 0, view.state_id().id());
  Wsrep_schema_impl::store(cluster_table, 1, view.view_seqno().get());
  Wsrep_schema_impl::store(cluster_table, 2, view.state_id().seqno().get());
  Wsrep_schema_impl::store(cluster_table, 3, view.protocol_version());
  Wsrep_schema_impl::store(cluster_table, 4, view.capabilities());

  if ((error= Wsrep_schema_impl::update_or_insert(cluster_table)))
  {
    WSREP_ERROR("failed to write to cluster table: %d", error);
    goto out;
  }

  Wsrep_schema_impl::finish_stmt(thd);

  /*
    Store info about current members
  */
  Wsrep_schema_impl::init_stmt(thd);
  if (Wsrep_schema_impl::open_for_write(thd, members_table_str.c_str(),
                                        &members_table))
  {
    WSREP_ERROR("failed to open wsrep.members table");
    goto out;
  }

  for (size_t i= 0; i < view.members().size(); ++i)
  {
    Wsrep_schema_impl::store(members_table, 0, view.members()[i].id());
    Wsrep_schema_impl::store(members_table, 1, view.state_id().id());
    Wsrep_schema_impl::store(members_table, 2, view.members()[i].name());
    Wsrep_schema_impl::store(members_table, 3, view.members()[i].incoming());
    if ((error= Wsrep_schema_impl::update_or_insert(members_table)))
    {
      WSREP_ERROR("failed to write wsrep.members table: %d", error);
      goto out;
    }
  }
  Wsrep_schema_impl::finish_stmt(thd);

#ifdef WSREP_SCHEMA_MEMBERS_HISTORY
  /*
    Store members history
  */
  Wsrep_schema_impl::init_stmt(thd);
  /*
    The table name constant is cluster_member_history_table_str; the
    name used here before did not exist.
  */
  if (Wsrep_schema_impl::open_for_write(thd,
                                        cluster_member_history_table_str.c_str(),
                                        &members_history_table)) {
    WSREP_ERROR("failed to open wsrep_cluster_member_history table");
    goto out;
  }

  for (size_t i= 0; i < view.members().size(); ++i) {
    Wsrep_schema_impl::store(members_history_table, 0, view.members()[i].id());
    Wsrep_schema_impl::store(members_history_table, 1, view.state_id().id());
    /* Sequence numbers are stored as plain integers, as for the cluster table. */
    Wsrep_schema_impl::store(members_history_table, 2, view.view_seqno().get());
    Wsrep_schema_impl::store(members_history_table, 3,
                             view.state_id().seqno().get());
    Wsrep_schema_impl::store(members_history_table, 4,
                             view.members()[i].name());
    Wsrep_schema_impl::store(members_history_table, 5,
                             view.members()[i].incoming());
    if ((error= Wsrep_schema_impl::update_or_insert(members_history_table))) {
      WSREP_ERROR("failed to write wsrep_cluster_member_history table: %d", error);
      goto out;
    }
  }
  Wsrep_schema_impl::finish_stmt(thd);
#endif /* WSREP_SCHEMA_MEMBERS_HISTORY */
  ret= 0;
 out:

  DBUG_RETURN(ret);
}
|
|
|
|
/*
  Rebuild the last stored primary view from the cluster and members
  tables, reading both inside one read-only transaction.

  @param thd     Session used for reading.
  @param own_id  Id of this node. If it is defined and found among the
                 stored members, its position becomes the own index of
                 the returned view; otherwise the index stays -1.

  @return the restored view on success, a default constructed
          Wsrep_view on failure.
*/
Wsrep_view Wsrep_schema::restore_view(THD* thd, const Wsrep_id& own_id) const {
  DBUG_ENTER("Wsrep_schema::restore_view()");

  int ret= 1;
  int error;

  TABLE* cluster_table= 0;
  bool cluster_scan_open= false;
  TABLE* members_table= 0;
  bool members_scan_open= false;

  /* variables below need to be initialized in case cluster table is empty */
  Wsrep_id cluster_uuid;
  wsrep_seqno_t view_id= -1;
  wsrep_seqno_t view_seqno= -1;
  int own_index= -1;
  int proto_ver= 0;
  wsrep_cap_t capabilities= 0;
  std::vector<Wsrep_view::member> members;

  // we don't want causal waits for reading non-replicated private data
  int const saved_sync_wait= thd->variables.wsrep_sync_wait;
  thd->variables.wsrep_sync_wait= 0;

  if (trans_begin(thd, MYSQL_START_TRANS_OPT_READ_ONLY)) {
    WSREP_ERROR("wsrep_schema::restore_view(): Failed to start transaction");
    goto out;
  }

  /*
    Read cluster info from cluster table
  */
  Wsrep_schema_impl::init_stmt(thd);
  if (Wsrep_schema_impl::open_for_read(thd, cluster_table_str.c_str(), &cluster_table) ||
      Wsrep_schema_impl::init_for_scan(cluster_table)) {
    goto out;
  }

  error= Wsrep_schema_impl::next_record(cluster_table);
  if (error == 0) {
    /* A row was read: pull the view attributes out of it. */
    if (Wsrep_schema_impl::scan(cluster_table, 0, cluster_uuid) ||
        Wsrep_schema_impl::scan(cluster_table, 1, view_id) ||
        Wsrep_schema_impl::scan(cluster_table, 2, view_seqno) ||
        Wsrep_schema_impl::scan(cluster_table, 3, proto_ver) ||
        Wsrep_schema_impl::scan(cluster_table, 4, capabilities)) {
      cluster_scan_open= true;
      goto out;
    }
  }
  else if (error != HA_ERR_END_OF_FILE) {
    /* An empty table is fine, any other read error is not. */
    cluster_scan_open= true;
    goto out;
  }

  if (Wsrep_schema_impl::end_scan(cluster_table)) {
    goto out;
  }
  Wsrep_schema_impl::finish_stmt(thd);

  /*
    Read members from members table
  */
  Wsrep_schema_impl::init_stmt(thd);
  if (Wsrep_schema_impl::open_for_read(thd, members_table_str.c_str(), &members_table) ||
      Wsrep_schema_impl::init_for_scan(members_table)) {
    goto out;
  }
  members_scan_open= true;

  for (;;) {
    error= Wsrep_schema_impl::next_record(members_table);
    if (error == HA_ERR_END_OF_FILE) {
      break;
    }
    if (error != 0) {
      goto out;
    }
    if (Wsrep_schema_impl::scan_member(members_table,
                                       cluster_uuid,
                                       members)) {
      goto out;
    }
  }

  members_scan_open= false;
  if (Wsrep_schema_impl::end_scan(members_table)) {
    goto out;
  }
  Wsrep_schema_impl::finish_stmt(thd);

  /* Locate this node among the restored members. */
  if (!own_id.is_undefined()) {
    for (uint i= 0; i < members.size(); ++i) {
      if (members[i].id() == own_id) {
        own_index= i;
        break;
      }
    }
  }

  (void)trans_commit(thd);
  ret= 0; /* Success*/
 out:

  /* End whichever scan was still open when the failure happened. */
  if (cluster_scan_open) Wsrep_schema_impl::end_scan(cluster_table);
  if (members_scan_open) Wsrep_schema_impl::end_scan(members_table);

  if (ret != 0) {
    trans_rollback_stmt(thd);
    if (!trans_rollback(thd)) {
      close_thread_tables(thd);
    }
  }
  thd->mdl_context.release_transactional_locks();

  thd->variables.wsrep_sync_wait= saved_sync_wait;

  if (ret != 0) {
    WSREP_ERROR("wsrep_schema::restore_view() failed.");
    Wsrep_view empty_view;
    DBUG_RETURN(empty_view);
  }

  Wsrep_view restored_view(
    wsrep::gtid(cluster_uuid, Wsrep_seqno(view_seqno)),
    Wsrep_seqno(view_id),
    wsrep::view::primary,
    capabilities,
    own_index,
    proto_ver,
    members
  );

  if (wsrep_debug) {
    std::ostringstream os;
    os << "Restored cluster view:\n" << restored_view;
    WSREP_INFO("%s", os.str().c_str());
  }
  DBUG_RETURN(restored_view);
}
|
|
|
|
/*
  Append one streaming replication fragment to the streaming log table.
  Binary logging is switched off on thd for the duration of the call.

  @param thd             Session used for the write.
  @param server_id       Id of the originating server (column 0).
  @param transaction_id  Id of the transaction (column 1).
  @param seqno           Sequence number of the fragment (column 2).
  @param flags           Fragment flags (column 3).
  @param data            Fragment payload (column 4).

  @return 0 on success, 1 if the table could not be opened or the row
          could not be inserted; in both cases the statement is rolled
          back.
*/
int Wsrep_schema::append_fragment(THD* thd,
                                  const wsrep::id& server_id,
                                  wsrep::transaction_id transaction_id,
                                  wsrep::seqno seqno,
                                  int flags,
                                  const wsrep::const_buffer& data)
{
  DBUG_ENTER("Wsrep_schema::append_fragment");
  std::ostringstream os;
  os << server_id;
  WSREP_DEBUG("Append fragment(%llu) %s, %llu",
              thd->thread_id,
              os.str().c_str(),
              transaction_id.get());
  Wsrep_schema_impl::binlog_off binlog_off(thd);
  Wsrep_schema_impl::init_stmt(thd);

  TABLE* frag_table= 0;
  if (Wsrep_schema_impl::open_for_write(thd, sr_table_str.c_str(), &frag_table))
  {
    /* open_table() has already closed the tables of thd on this path. */
    trans_rollback_stmt(thd);
    DBUG_RETURN(1);
  }

  Wsrep_schema_impl::store(frag_table, 0, server_id);
  Wsrep_schema_impl::store(frag_table, 1, transaction_id.get());
  Wsrep_schema_impl::store(frag_table, 2, seqno.get());
  Wsrep_schema_impl::store(frag_table, 3, flags);
  Wsrep_schema_impl::store(frag_table, 4, data.data(), data.size());

  int error;
  if ((error= Wsrep_schema_impl::insert(frag_table))) {
    WSREP_ERROR("Failed to write to frag table: %d", error);
    trans_rollback_stmt(thd);
    /*
      The table was opened successfully above, so it has to be closed
      here just as finish_stmt() does on the success path.
    */
    close_thread_tables(thd);
    DBUG_RETURN(1);
  }
  Wsrep_schema_impl::finish_stmt(thd);
  DBUG_RETURN(0);
}
|
|
|
|
/*
  Assign the final sequence number to a fragment that was stored with
  seqno -1: the row keyed by (server id, transaction id, -1) is looked
  up in the streaming log table and its seqno column is replaced with
  ws_meta.seqno(). Binary logging is switched off on thd for the
  duration of the call.

  @param thd      Session used for the update.
  @param ws_meta  Write set meta data carrying server id, transaction id
                  and the (defined) sequence number.

  @return 0 on success, 1 if the table could not be opened, the row was
          not found, or the update failed.
*/
int Wsrep_schema::update_fragment_meta(THD* thd,
                                       const wsrep::ws_meta& ws_meta)
{
  DBUG_ENTER("Wsrep_schema::update_fragment_meta");
  std::ostringstream os;
  os << ws_meta.server_id();
  WSREP_DEBUG("update_frag_seqno(%llu) %s, %llu, seqno %lld",
              thd->thread_id,
              os.str().c_str(),
              ws_meta.transaction_id().get(),
              ws_meta.seqno().get());
  DBUG_ASSERT(ws_meta.seqno().is_undefined() == false);

  Wsrep_schema_impl::binlog_off binlog_off(thd);
  int error;
  uchar key[MAX_KEY_LENGTH];
  key_part_map key_map= 0;
  TABLE* frag_table= 0;

  Wsrep_schema_impl::init_stmt(thd);
  if (Wsrep_schema_impl::open_for_write(thd, sr_table_str.c_str(), &frag_table))
  {
    DBUG_RETURN(1);
  }

  /* Find record with the given uuid, trx id, and seqno -1 */
  Wsrep_schema_impl::store(frag_table, 0, ws_meta.server_id());
  Wsrep_schema_impl::store(frag_table, 1, ws_meta.transaction_id().get());
  Wsrep_schema_impl::store(frag_table, 2, -1);
  Wsrep_schema_impl::make_key(frag_table, key, &key_map, 3);

  if ((error= Wsrep_schema_impl::init_for_index_scan(frag_table,
                                                     key, key_map)))
  {
    if (error == HA_ERR_END_OF_FILE || error == HA_ERR_KEY_NOT_FOUND)
    {
      WSREP_WARN("Record not found in %s.%s: %d",
                 frag_table->s->db.str,
                 frag_table->s->table_name.str,
                 error);
    }
    /*
      init_for_index_scan() reports both a failed index init and a
      failed key lookup. In the latter case the index scan is active
      and has to be ended before the table is closed.
    */
    if (frag_table->file->inited == handler::INDEX)
    {
      Wsrep_schema_impl::end_index_scan(frag_table);
    }
    Wsrep_schema_impl::finish_stmt(thd);
    DBUG_RETURN(1);
  }

  /* Copy the original record to frag_table->record[1] */
  store_record(frag_table, record[1]);

  /* Store seqno in frag_table->record[0] and update the row */
  Wsrep_schema_impl::store(frag_table, 2, ws_meta.seqno().get());
  if ((error= frag_table->file->ha_update_row(frag_table->record[1],
                                              frag_table->record[0]))) {
    WSREP_ERROR("Error updating record in %s.%s: %d",
                frag_table->s->db.str,
                frag_table->s->table_name.str,
                error);
    /* The index scan started above is still active: end it first. */
    Wsrep_schema_impl::end_index_scan(frag_table);
    Wsrep_schema_impl::finish_stmt(thd);
    DBUG_RETURN(1);
  }

  int ret= Wsrep_schema_impl::end_index_scan(frag_table);
  Wsrep_schema_impl::finish_stmt(thd);
  DBUG_RETURN(ret);
}
|
|
|
|
/*
  Delete one fragment row, identified by server id, transaction id and
  seqno, from an already opened streaming log table.

  @return 0 on success, the error code of init_for_index_scan() if the
          row could not be located, 1 if deleting the row failed.
*/
static int remove_fragment(THD*                  thd,
                           TABLE*                frag_table,
                           const wsrep::id&      server_id,
                           wsrep::transaction_id transaction_id,
                           wsrep::seqno          seqno)
{
  WSREP_DEBUG("remove_fragment(%llu) trx %llu, seqno %lld",
              thd->thread_id,
              transaction_id.get(),
              seqno.get());

  DBUG_ASSERT(server_id.is_undefined() == false);
  DBUG_ASSERT(transaction_id.is_undefined() == false);
  DBUG_ASSERT(seqno.is_undefined() == false);

  /*
    Remove record with the given uuid, trx id, and seqno.
    Using a complete key here avoids gap locks.
  */
  uchar key[MAX_KEY_LENGTH];
  key_part_map key_map= 0;
  Wsrep_schema_impl::store(frag_table, 0, server_id);
  Wsrep_schema_impl::store(frag_table, 1, transaction_id.get());
  Wsrep_schema_impl::store(frag_table, 2, seqno.get());
  Wsrep_schema_impl::make_key(frag_table, key, &key_map, 3);

  int result= 0;
  const int lookup_error=
    Wsrep_schema_impl::init_for_index_scan(frag_table, key, key_map);
  if (lookup_error == 0)
  {
    if (Wsrep_schema_impl::delete_row(frag_table))
    {
      result= 1;
    }
  }
  else
  {
    const bool not_found= (lookup_error == HA_ERR_END_OF_FILE ||
                           lookup_error == HA_ERR_KEY_NOT_FOUND);
    if (not_found)
    {
      WSREP_DEBUG("Record not found in %s.%s:trx %llu, seqno %lld, error %d",
                  frag_table->s->db.str,
                  frag_table->s->table_name.str,
                  transaction_id.get(),
                  seqno.get(),
                  lookup_error);
    }
    result= lookup_error;
  }

  Wsrep_schema_impl::end_index_scan(frag_table);
  return result;
}
|
|
|
|
/*
  Delete the given fragments of a transaction from the streaming log
  table. wsrep replication and binary logging are switched off on thd
  for the duration of the call, and the query tables list and the open
  tables state of thd are backed up before the table is opened and
  restored once it has been closed again.

  @param thd             Session used for the deletes.
  @param server_id       Id of the originating server.
  @param transaction_id  Id of the transaction.
  @param fragments       Sequence numbers of the fragments to remove.

  @return 0 on success, 1 if the table could not be opened or removing
          a fragment failed (the remaining fragments are then skipped).
*/
int Wsrep_schema::remove_fragments(THD* thd,
                                   const wsrep::id& server_id,
                                   wsrep::transaction_id transaction_id,
                                   const std::vector<wsrep::seqno>& fragments)
{
  DBUG_ENTER("Wsrep_schema::remove_fragments");
  int ret= 0;

  WSREP_DEBUG("Removing %zu fragments", fragments.size());
  Wsrep_schema_impl::wsrep_off wsrep_off(thd);
  Wsrep_schema_impl::binlog_off binlog_off(thd);

  Query_tables_list saved_query_tables;
  Open_tables_backup saved_open_tables;
  thd->lex->reset_n_backup_query_tables_list(&saved_query_tables);
  thd->reset_n_backup_open_tables_state(&saved_open_tables);

  TABLE* frag_table= 0;
  if (Wsrep_schema_impl::open_for_write(thd, sr_table_str.c_str(), &frag_table))
  {
    ret= 1;
  }
  else
  {
    /* Stop at the first fragment that cannot be removed. */
    for (size_t idx= 0; idx < fragments.size(); ++idx)
    {
      if (remove_fragment(thd,
                          frag_table,
                          server_id,
                          transaction_id, fragments[idx]))
      {
        ret= 1;
        break;
      }
    }
  }
  close_thread_tables(thd);
  thd->restore_backup_open_tables_state(&saved_open_tables);
  thd->lex->restore_backup_query_tables_list(&saved_query_tables);

  /*
    The ugly part: when a locally executing autocommit statement is
    committing and has removed a fragment from stable storage, calling
    finish_stmt() would call trans_commit_stmt(), which would actually
    commit the transaction -- something we really don't want to do at
    this point. Doing nothing in that case seems to work ok, so the
    statement is finished only in the other cases.
  */
  const bool local_autocommit=
    thd->wsrep_cs().mode() == wsrep::client_state::m_local &&
    !thd->in_multi_stmt_transaction_mode();
  if (!local_autocommit)
  {
    /*
      SERVER_STATUS_IN_TRANS is set temporarily when thd is in
      multi-statement transaction mode; the guard restores the previous
      server status when this scope is left.
    */
    Wsrep_schema_impl::thd_server_status
      in_trans_guard(thd, thd->server_status | SERVER_STATUS_IN_TRANS,
                     thd->in_multi_stmt_transaction_mode());
    Wsrep_schema_impl::finish_stmt(thd);
  }

  DBUG_RETURN(ret);
}
|
|
|
|
/*
  Replay a streaming transaction from the fragments stored in the
  streaming log table.

  The fragments are processed in the given order using a local THD for
  table access. For each seqno the row is looked up through a key built
  from the server id and transaction id of ws_meta and the seqno, the
  contents of field 4 are applied with wsrep_apply_events() on orig_thd,
  and finally the row is deleted from the table.

  Every statement started with init_stmt() is finished with
  finish_stmt() on all exit paths, including the failure ones, so that
  the local THD is not destroyed with an index scan or a statement still
  in progress.

  @param orig_thd   Thread handle on which the events are applied
  @param rli        Relay log info passed to wsrep_apply_events()
  @param ws_meta    Provides the server id and the transaction id
  @param fragments  Sequence numbers of the fragments, must not be empty

  @return 0 on success, non-zero on failure
*/
int Wsrep_schema::replay_transaction(THD* orig_thd,
                                     Relay_log_info* rli,
                                     const wsrep::ws_meta& ws_meta,
                                     const std::vector<wsrep::seqno>& fragments)
{
  DBUG_ENTER("Wsrep_schema::replay_transaction");
  DBUG_ASSERT(!fragments.empty());

  THD thd(next_thread_id(), true);
  thd.thread_stack= (orig_thd ? orig_thd->thread_stack :
                     (char*) &thd);
  wsrep_assign_from_threadvars(&thd);

  Wsrep_schema_impl::wsrep_off wsrep_off(&thd);
  Wsrep_schema_impl::binlog_off binlog_off(&thd);
  Wsrep_schema_impl::thd_context_switch thd_context_switch(orig_thd, &thd);

  int ret= 1;
  int error;
  TABLE* frag_table= 0;
  uchar key[MAX_KEY_LENGTH];
  key_part_map key_map= 0;

  for (std::vector<wsrep::seqno>::const_iterator i= fragments.begin();
       i != fragments.end(); ++i)
  {
    /* Step 1: read the fragment. */
    Wsrep_schema_impl::init_stmt(&thd);
    if ((error= Wsrep_schema_impl::open_for_read(&thd, sr_table_str.c_str(), &frag_table)))
    {
      WSREP_WARN("Could not open SR table for read: %d", error);
      Wsrep_schema_impl::finish_stmt(&thd);
      DBUG_RETURN(1);
    }

    Wsrep_schema_impl::store(frag_table, 0, ws_meta.server_id());
    Wsrep_schema_impl::store(frag_table, 1, ws_meta.transaction_id().get());
    Wsrep_schema_impl::store(frag_table, 2, i->get());
    Wsrep_schema_impl::make_key(frag_table, key, &key_map, 3);

    error= Wsrep_schema_impl::init_for_index_scan(frag_table,
                                                  key,
                                                  key_map);
    if (error)
    {
      WSREP_WARN("Failed to init streaming log table for index scan: %d",
                 error);
      Wsrep_schema_impl::end_index_scan(frag_table);
      Wsrep_schema_impl::finish_stmt(&thd);
      ret= 1;
      break;
    }

    int flags= 0;
    Wsrep_schema_impl::scan(frag_table, 3, flags);
    WSREP_DEBUG("replay_fragment(%llu): seqno: %lld flags: %x",
                ws_meta.transaction_id().get(),
                i->get(),
                flags);
    String buf;
    (void) frag_table->field[4]->val_str(&buf);

    /* Step 2: apply the fragment on orig_thd. */
    {
      /* Presumably switches the thread context to orig_thd and back to
         thd when this scope is left - TODO confirm. */
      Wsrep_schema_impl::thd_context_switch thd_context_switch(&thd, orig_thd);

      ret= wsrep_apply_events(orig_thd, rli, buf.c_ptr_quick(), buf.length());
      if (ret)
      {
        WSREP_WARN("Wsrep_schema::replay_transaction: failed to apply fragments");
      }
    }

    /* The read statement is finished also when applying failed. */
    Wsrep_schema_impl::end_index_scan(frag_table);
    Wsrep_schema_impl::finish_stmt(&thd);
    if (ret)
    {
      break;
    }

    /* Step 3: delete the applied fragment. */
    Wsrep_schema_impl::init_stmt(&thd);

    if ((error= Wsrep_schema_impl::open_for_write(&thd,
                                                  sr_table_str.c_str(),
                                                  &frag_table)))
    {
      WSREP_WARN("Could not open SR table for write: %d", error);
      Wsrep_schema_impl::finish_stmt(&thd);
      DBUG_RETURN(1);
    }
    error= Wsrep_schema_impl::init_for_index_scan(frag_table,
                                                  key,
                                                  key_map);
    if (error)
    {
      WSREP_WARN("Failed to init streaming log table for index scan: %d",
                 error);
      Wsrep_schema_impl::end_index_scan(frag_table);
      Wsrep_schema_impl::finish_stmt(&thd);
      ret= 1;
      break;
    }

    error= Wsrep_schema_impl::delete_row(frag_table);
    if (error)
    {
      WSREP_WARN("Could not delete row from streaming log table: %d", error);
      Wsrep_schema_impl::end_index_scan(frag_table);
      Wsrep_schema_impl::finish_stmt(&thd);
      ret= 1;
      break;
    }
    Wsrep_schema_impl::end_index_scan(frag_table);
    Wsrep_schema_impl::finish_stmt(&thd);
  }

  DBUG_RETURN(ret);
}
|
|
|
|
/*
  Recover streaming transactions from the streaming log table.

  The cluster id is read from field 0 of the first row of the cluster
  table. After that the streaming log table is scanned: rows which have
  an undefined seqno are deleted, and every other row is applied through
  the streaming applier registered for its server id and transaction
  id. An applier is created and registered when none is found.

  @param orig_thd  Calling thread handle; a local THD is used for all
                   table access

  @return 0 if the cluster table is empty or the streaming log table was
          scanned to the end, non-zero if a table could not be opened
          or read
*/
int Wsrep_schema::recover_sr_transactions(THD *orig_thd)
{
  DBUG_ENTER("Wsrep_schema::recover_sr_transactions");
  THD storage_thd(next_thread_id(), true);
  storage_thd.thread_stack= (orig_thd ? orig_thd->thread_stack :
                             (char*) &storage_thd);
  wsrep_assign_from_threadvars(&storage_thd);
  TABLE* frag_table= 0;
  TABLE* cluster_table= 0;
  Wsrep_storage_service storage_service(&storage_thd);
  Wsrep_schema_impl::binlog_off binlog_off(&storage_thd);
  Wsrep_schema_impl::wsrep_off wsrep_off(&storage_thd);
  Wsrep_schema_impl::thd_context_switch thd_context_switch(orig_thd,
                                                           &storage_thd);
  Wsrep_server_state& server_state(Wsrep_server_state::instance());

  int ret= 1;
  int error;
  wsrep::id cluster_id;

  /* Read the cluster id from the cluster table. */
  Wsrep_schema_impl::init_stmt(&storage_thd);
  storage_thd.wsrep_skip_locking= FALSE;
  if (Wsrep_schema_impl::open_for_read(&storage_thd,
                                       cluster_table_str.c_str(),
                                       &cluster_table) ||
      Wsrep_schema_impl::init_for_scan(cluster_table))
  {
    Wsrep_schema_impl::finish_stmt(&storage_thd);
    DBUG_RETURN(1);
  }

  if ((error= Wsrep_schema_impl::next_record(cluster_table)))
  {
    Wsrep_schema_impl::end_scan(cluster_table);
    Wsrep_schema_impl::finish_stmt(&storage_thd);
    trans_commit(&storage_thd);
    if (error == HA_ERR_END_OF_FILE)
    {
      WSREP_INFO("Cluster table is empty, not recovering transactions");
      DBUG_RETURN(0);
    }
    else
    {
      WSREP_ERROR("Failed to read cluster table: %d", error);
      DBUG_RETURN(1);
    }
  }

  Wsrep_schema_impl::scan(cluster_table, 0, cluster_id);
  Wsrep_schema_impl::end_scan(cluster_table);
  Wsrep_schema_impl::finish_stmt(&storage_thd);

  std::ostringstream os;
  os << cluster_id;
  WSREP_INFO("Recovered cluster id %s", os.str().c_str());

  storage_thd.wsrep_skip_locking= TRUE;
  Wsrep_schema_impl::init_stmt(&storage_thd);

  /*
    Open the table for reading and writing so that fragments without
    valid seqno can be deleted.
  */
  if (Wsrep_schema_impl::open_for_write(&storage_thd, sr_table_str.c_str(), &frag_table) ||
      Wsrep_schema_impl::init_for_scan(frag_table))
  {
    WSREP_ERROR("Failed to open SR table for write");
    /* Finish the statement started above, like the cluster table
       failure path does. ret still has its initial value 1 here. */
    Wsrep_schema_impl::finish_stmt(&storage_thd);
    DBUG_RETURN(ret);
  }

  /* error is 0 here, the cluster table record was read successfully. */
  while (0 == error)
  {
    if ((error= Wsrep_schema_impl::next_record(frag_table)) == 0)
    {
      wsrep::id server_id;
      Wsrep_schema_impl::scan(frag_table, 0, server_id);
      wsrep::client_id client_id;
      unsigned long long transaction_id_ull;
      Wsrep_schema_impl::scan(frag_table, 1, transaction_id_ull);
      wsrep::transaction_id transaction_id(transaction_id_ull);
      long long seqno_ll;
      Wsrep_schema_impl::scan(frag_table, 2, seqno_ll);
      wsrep::seqno seqno(seqno_ll);

      /* This is possible if the server crashes between inserting the
         fragment into table and updating the fragment seqno after
         certification. */
      if (seqno.is_undefined())
      {
        /* The result is deliberately ignored, the scan goes on with the
           next row either way. */
        (void) Wsrep_schema_impl::delete_row(frag_table);
        continue;
      }

      wsrep::gtid gtid(cluster_id, seqno);
      int flags= 0;
      Wsrep_schema_impl::scan(frag_table, 3, flags);
      String data_str;

      (void)frag_table->field[4]->val_str(&data_str);
      wsrep::const_buffer data(data_str.c_ptr_quick(), data_str.length());
      wsrep::ws_meta ws_meta(gtid,
                             wsrep::stid(server_id,
                                         transaction_id,
                                         client_id),
                             wsrep::seqno::undefined(),
                             flags);

      wsrep::high_priority_service* applier;
      if (!(applier= server_state.find_streaming_applier(server_id,
                                                         transaction_id)))
      {
        /* No applier registered for this transaction yet: create one,
           register it and start the transaction on it. */
        DBUG_ASSERT(wsrep::starts_transaction(flags));
        applier = wsrep_create_streaming_applier(&storage_thd, "recovery");
        server_state.start_streaming_applier(server_id, transaction_id,
                                             applier);
        applier->start_transaction(wsrep::ws_handle(transaction_id, 0),
                                   ws_meta);
      }
      applier->store_globals();
      wsrep::mutable_buffer unused;
      /*
        NOTE(review): a failed apply is only logged. The scan continues
        and reaching the end of the table below resets ret to 0, so the
        failure is not reported to the caller. Confirm that this
        best-effort behavior is intended.
      */
      if ((ret= applier->apply_write_set(ws_meta, data, unused)) != 0)
      {
        WSREP_ERROR("SR trx recovery applying returned %d", ret);
      }
      else
      {
        applier->after_apply();
      }
      storage_service.store_globals();
    }
    else if (error == HA_ERR_END_OF_FILE)
    {
      ret= 0;
    }
    else
    {
      WSREP_ERROR("SR table scan returned error %d", error);
      /* ret may hold 0 from an earlier successful apply, a scan error
         must still be reported as a failure. */
      ret= 1;
    }
  }
  Wsrep_schema_impl::end_scan(frag_table);
  Wsrep_schema_impl::finish_stmt(&storage_thd);
  trans_commit(&storage_thd);
  storage_thd.set_mysys_var(0);
  DBUG_RETURN(ret);
}
|