mariadb/sql/wsrep_schema.cc
2024-09-02 03:49:03 +02:00

1831 lines
53 KiB
C++

/* Copyright (C) 2015-2023 Codership Oy <info@codership.com>
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; version 2 of the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License along
with this program; if not, write to the Free Software Foundation, Inc.,
51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. */
#include "mariadb.h"
#include "table.h"
#include "key.h"
#include "sql_base.h"
#include "sql_parse.h"
#include "sql_update.h"
#include "transaction.h"
#include "mysql/service_wsrep.h"
#include "wsrep_schema.h"
#include "wsrep_applier.h"
#include "wsrep_xid.h"
#include "wsrep_binlog.h"
#include "wsrep_high_priority_service.h"
#include "wsrep_storage_service.h"
#include "wsrep_thd.h"
#include "wsrep_server_state.h"
#include <string>
#include <sstream>
/* Name of the schema holding the wsrep tables, followed by the table names. */
#define WSREP_SCHEMA "mysql"
#define WSREP_STREAMING_TABLE "wsrep_streaming_log"
#define WSREP_CLUSTER_TABLE "wsrep_cluster"
#define WSREP_MEMBERS_TABLE "wsrep_cluster_members"
#define WSREP_ALLOWLIST_TABLE "wsrep_allowlist"
/* The same names as LEX_CSTRING (pointer + length) objects. These are not
   static, so they have external linkage. */
LEX_CSTRING WSREP_LEX_SCHEMA= {STRING_WITH_LEN(WSREP_SCHEMA)};
LEX_CSTRING WSREP_LEX_STREAMING= {STRING_WITH_LEN(WSREP_STREAMING_TABLE)};
LEX_CSTRING WSREP_LEX_CLUSTER= {STRING_WITH_LEN(WSREP_CLUSTER_TABLE)};
LEX_CSTRING WSREP_LEX_MEMBERS= {STRING_WITH_LEN(WSREP_MEMBERS_TABLE)};
LEX_CSTRING WSREP_LEX_ALLOWLIST= {STRING_WITH_LEN(WSREP_ALLOWLIST_TABLE)};
/* Schema and streaming table name joined with '/':
   "mysql/wsrep_streaming_log". */
const char* wsrep_sr_table_name_full= WSREP_SCHEMA "/" WSREP_STREAMING_TABLE;
/* File-local std::string copies of the names; the SQL statement strings
   below are assembled from them. */
static const std::string wsrep_schema_str= WSREP_SCHEMA;
static const std::string sr_table_str= WSREP_STREAMING_TABLE;
static const std::string cluster_table_str= WSREP_CLUSTER_TABLE;
static const std::string members_table_str= WSREP_MEMBERS_TABLE;
static const std::string allowlist_table_str= WSREP_ALLOWLIST_TABLE;
/* CREATE TABLE IF NOT EXISTS statement for the cluster table:
   cluster_uuid (primary key), view_id, view_seqno, protocol_version,
   capabilities. */
static const std::string create_cluster_table_str=
"CREATE TABLE IF NOT EXISTS " + wsrep_schema_str + "." + cluster_table_str +
"("
"cluster_uuid CHAR(36) PRIMARY KEY,"
"view_id BIGINT NOT NULL,"
"view_seqno BIGINT NOT NULL,"
"protocol_version INT NOT NULL,"
"capabilities INT NOT NULL"
") ENGINE=InnoDB STATS_PERSISTENT=0 CHARSET=latin1";
/* CREATE TABLE IF NOT EXISTS statement for the members table:
   node_uuid (primary key), cluster_uuid, node_name,
   node_incoming_address. */
static const std::string create_members_table_str=
"CREATE TABLE IF NOT EXISTS " + wsrep_schema_str + "." + members_table_str +
"("
"node_uuid CHAR(36) PRIMARY KEY,"
"cluster_uuid CHAR(36) NOT NULL,"
"node_name CHAR(32) NOT NULL,"
"node_incoming_address VARCHAR(256) NOT NULL"
") ENGINE=InnoDB STATS_PERSISTENT=0 CHARSET=latin1";
#ifdef WSREP_SCHEMA_MEMBERS_HISTORY
/* Optional members history table, compiled in only when
   WSREP_SCHEMA_MEMBERS_HISTORY is defined. Same columns as the members
   table plus last_view_id and last_view_seqno. */
static const std::string cluster_member_history_table_str= "wsrep_cluster_member_history";
static const std::string create_members_history_table_str=
"CREATE TABLE IF NOT EXISTS " + wsrep_schema_str + "." + cluster_member_history_table_str +
"("
"node_uuid CHAR(36) PRIMARY KEY,"
"cluster_uuid CHAR(36) NOT NULL,"
"last_view_id BIGINT NOT NULL,"
"last_view_seqno BIGINT NOT NULL,"
"node_name CHAR(32) NOT NULL,"
"node_incoming_address VARCHAR(256) NOT NULL"
") ENGINE=InnoDB STATS_PERSISTENT=0 CHARSET=latin1";
#endif /* WSREP_SCHEMA_MEMBERS_HISTORY */
/* CREATE TABLE IF NOT EXISTS statement for the streaming log table. One row
   per fragment, with composite primary key (node_uuid, trx_id, seqno). */
static const std::string create_frag_table_str=
"CREATE TABLE IF NOT EXISTS " + wsrep_schema_str + "." + sr_table_str +
"("
"node_uuid CHAR(36), "
"trx_id BIGINT, "
"seqno BIGINT, "
"flags INT NOT NULL, "
"frag LONGBLOB NOT NULL, "
"PRIMARY KEY (node_uuid, trx_id, seqno)"
") ENGINE=InnoDB STATS_PERSISTENT=0 CHARSET=latin1";
/* CREATE TABLE IF NOT EXISTS statement for the allowlist table, keyed by ip.
   Unlike the statements above, this one does not specify a CHARSET. */
static const std::string create_allowlist_table_str=
"CREATE TABLE IF NOT EXISTS " + wsrep_schema_str + "." + allowlist_table_str +
"("
"ip CHAR(64) NOT NULL,"
"PRIMARY KEY (ip)"
") ENGINE=InnoDB STATS_PERSISTENT=0";
/* Statements that remove every row from the cluster and members tables. */
static const std::string delete_from_cluster_table=
  "DELETE FROM " + wsrep_schema_str + "." + cluster_table_str;
static const std::string delete_from_members_table=
  "DELETE FROM " + wsrep_schema_str + "." + members_table_str;

/* For rolling upgrade we need to use ALTER. We do not want
   persistent statistics to be collected from these tables. */
static const std::string alter_cluster_table=
  "ALTER TABLE " + wsrep_schema_str + "." + cluster_table_str +
  " STATS_PERSISTENT=0 CHARSET=latin1";
static const std::string alter_members_table=
  "ALTER TABLE " + wsrep_schema_str + "." + members_table_str +
  " STATS_PERSISTENT=0 CHARSET=latin1";
#ifdef WSREP_SCHEMA_MEMBERS_HISTORY
/* The history table name is declared as cluster_member_history_table_str
   next to its CREATE statement; use that identifier here. */
static const std::string alter_members_history_table=
  "ALTER TABLE " + wsrep_schema_str + "." + cluster_member_history_table_str +
  " STATS_PERSISTENT=0 CHARSET=latin1";
#endif
static const std::string alter_frag_table=
  "ALTER TABLE " + wsrep_schema_str + "." + sr_table_str +
  " STATS_PERSISTENT=0 CHARSET=latin1";
namespace Wsrep_schema_impl
{
/**
  Scoped guard that switches binary logging off for a THD.

  The constructor saves thd->variables.option_bits and
  thd->variables.sql_log_bin, then clears OPTION_BIN_LOG and sets
  sql_log_bin to 0. The destructor writes both saved values back. The whole
  option_bits word is restored, not just the OPTION_BIN_LOG bit.
*/
class binlog_off
{
public:
  explicit binlog_off(THD* thd)
    : m_thd(thd)
    , m_option_bits(thd->variables.option_bits)
    , m_sql_log_bin(thd->variables.sql_log_bin)
  {
    thd->variables.option_bits&= ~OPTION_BIN_LOG;
    thd->variables.sql_log_bin= 0;
  }
  ~binlog_off()
  {
    m_thd->variables.option_bits= m_option_bits;
    m_thd->variables.sql_log_bin= m_sql_log_bin;
  }
  /* A copy would write the saved values back a second time when it is
     destroyed, so the guard is not copyable. */
  binlog_off(const binlog_off&)= delete;
  binlog_off& operator=(const binlog_off&)= delete;
private:
  THD* m_thd;
  ulonglong m_option_bits;
  my_bool m_sql_log_bin;
};
/**
  Scoped guard that sets thd->variables.wsrep_on to 0 for its lifetime.

  The previous value is saved in the constructor and written back by the
  destructor.
*/
class wsrep_off
{
public:
  explicit wsrep_off(THD* thd)
    : m_thd(thd)
    , m_wsrep_on(thd->variables.wsrep_on)
  {
    thd->variables.wsrep_on= 0;
  }
  ~wsrep_off()
  {
    m_thd->variables.wsrep_on= m_wsrep_on;
  }
  /* A copy would write the saved value back a second time when it is
     destroyed, so the guard is not copyable. */
  wsrep_off(const wsrep_off&)= delete;
  wsrep_off& operator=(const wsrep_off&)= delete;
private:
  THD* m_thd;
  my_bool m_wsrep_on;
};
/**
  Scoped guard that sets thd->wsrep_ignore_table to true for its lifetime.

  The previous value is saved in the constructor and written back by the
  destructor.
*/
class wsrep_ignore_table
{
public:
  explicit wsrep_ignore_table(THD* thd)
    : m_thd(thd)
    , m_wsrep_ignore_table(thd->wsrep_ignore_table)
  {
    thd->wsrep_ignore_table= true;
  }
  ~wsrep_ignore_table()
  {
    m_thd->wsrep_ignore_table= m_wsrep_ignore_table;
  }
  /* A copy would write the saved value back a second time when it is
     destroyed, so the guard is not copyable. */
  wsrep_ignore_table(const wsrep_ignore_table&)= delete;
  wsrep_ignore_table& operator=(const wsrep_ignore_table&)= delete;
private:
  THD* m_thd;
  my_bool m_wsrep_ignore_table;
};
/**
  Scoped guard that conditionally overrides thd->server_status.

  The constructor always saves the current thd->server_status. Only when
  `condition` is true does it assign `server_status` to the THD. The
  destructor writes the saved value back unconditionally.
*/
class thd_server_status
{
public:
  thd_server_status(THD* thd, uint server_status, bool condition)
    : m_thd(thd)
    , m_thd_server_status(thd->server_status)
  {
    if (condition)
      thd->server_status= server_status;
  }
  ~thd_server_status()
  {
    m_thd->server_status= m_thd_server_status;
  }
  /* A copy would write the saved value back a second time when it is
     destroyed, so the guard is not copyable. */
  thd_server_status(const thd_server_status&)= delete;
  thd_server_status& operator=(const thd_server_status&)= delete;
private:
  THD* m_thd;
  uint m_thd_server_status;
};
/**
  Scoped guard that switches thread variables from one THD to another.

  The constructor calls wsrep_reset_threadvars() on orig_thd and
  wsrep_store_threadvars() on cur_thd. The destructor performs the reverse
  pair of calls, so the original THD's thread variables are stored again
  when the guard goes out of scope.
*/
class thd_context_switch
{
public:
  thd_context_switch(THD *orig_thd, THD *cur_thd)
    : m_orig_thd(orig_thd)
    , m_cur_thd(cur_thd)
  {
    wsrep_reset_threadvars(m_orig_thd);
    wsrep_store_threadvars(m_cur_thd);
  }
  ~thd_context_switch()
  {
    wsrep_reset_threadvars(m_cur_thd);
    wsrep_store_threadvars(m_orig_thd);
  }
  /* A copy would repeat the switch back in its own destructor, so the guard
     is not copyable. */
  thd_context_switch(const thd_context_switch&)= delete;
  thd_context_switch& operator=(const thd_context_switch&)= delete;
private:
  THD *m_orig_thd;
  THD *m_cur_thd;
};
/**
  Scoped guard that clears OPTION_SAFE_UPDATES in thd->variables.option_bits.

  The constructor saves the complete option_bits word before clearing the
  bit. The destructor writes the saved word back, so every bit of
  option_bits returns to its value at construction time.
*/
class sql_safe_updates
{
public:
  explicit sql_safe_updates(THD* thd)
    : m_thd(thd)
    , m_option_bits(thd->variables.option_bits)
  {
    thd->variables.option_bits&= ~OPTION_SAFE_UPDATES;
  }
  ~sql_safe_updates()
  {
    m_thd->variables.option_bits= m_option_bits;
  }
  /* A copy would write the saved value back a second time when it is
     destroyed, so the guard is not copyable. */
  sql_safe_updates(const sql_safe_updates&)= delete;
  sql_safe_updates& operator=(const sql_safe_updates&)= delete;
private:
  THD* m_thd;
  ulonglong m_option_bits;
};
/**
  Parse and execute one SQL statement using the given THD.

  The statement PSI locker of the THD is set to NULL while the statement
  runs and put back afterwards. After execution the statement is ended, the
  query is reset, thread tables are closed and the explain query is deleted.
  thd->cleanup_after_query() is called on every path.

  @param thd     THD used to run the statement
  @param sql     statement text
  @param length  length of the statement text in bytes

  @return 0 on success, 1 if the parser state could not be initialized or
          the THD is in error state after mysql_parse().
*/
static int execute_SQL(THD* thd, const char* sql, uint length)
{
  DBUG_ENTER("Wsrep_schema::execute_SQL()");
  int err= 0;
  PSI_statement_locker *parent_locker= thd->m_statement_psi;
  Parser_state parser_state;

  WSREP_DEBUG("SQL: %d %s thd: %lld", length, sql, (long long)thd->thread_id);

  if (parser_state.init(thd, (char*)sql, length) == 0)
  {
    thd->reset_for_next_command();
    lex_start(thd);

    thd->m_statement_psi= NULL;

    thd->set_query((char*)sql, length);
    thd->set_query_id(next_query_id());

    mysql_parse(thd, (char*)sql, length, & parser_state);

    if (thd->is_error())
    {
      WSREP_WARN("Wsrep_schema::execute_sql() failed, %d %s\nSQL: %s",
                 thd->get_stmt_da()->sql_errno(),
                 thd->get_stmt_da()->message(),
                 sql);
      err= 1;
    }
    thd->m_statement_psi= parent_locker;
    thd->end_statement();
    thd->reset_query();
    close_thread_tables(thd);
    delete_explain_query(thd->lex);
  }
  else
  {
    WSREP_WARN("SR init failure");
    /* The statement was never executed, so the caller must not be told
       that it succeeded. */
    err= 1;
  }
  thd->cleanup_after_query();
  DBUG_RETURN(err);
}
/*
  Prepare thd for the next "statement". This only forwards to
  THD::reset_for_next_command().
*/
static void init_stmt(THD* thd)
{
  thd->reset_for_next_command();
}
/*
  Finish the current "statement": commit it at statement level and close
  the tables opened by the thread. The result of trans_commit_stmt() is
  not inspected.
*/
static void finish_stmt(THD* thd)
{
  trans_commit_stmt(thd);
  close_thread_tables(thd);
}
/*
  Open and lock a single table, marking all of its columns as used.

  table_list is initialised for schema_name.table_name with lock_type. If
  the THD has been killed, or carries an ER_QUERY_INTERRUPTED error, no open
  is attempted.

  @return 0 when the table was opened, 1 otherwise (thread tables are
          closed before returning 1).
*/
static int open_table(THD *thd, const LEX_CSTRING *schema_name,
                      const LEX_CSTRING *table_name,
                      enum thr_lock_type const lock_type,
                      TABLE_LIST *table_list)
{
  assert(table_list);
  DBUG_ENTER("Wsrep_schema::open_table()");

  table_list->init_one_table(schema_name, table_name, NULL, lock_type);
  thd->lex->query_tables_own_last= 0;

  // A BF aborted query does not need the table; the client will
  // get ER_LOCK_DEADLOCK in the end.
  const bool was_interrupted= thd->killed ||
    (thd->is_error() &&
     (thd->get_stmt_da()->sql_errno() == ER_QUERY_INTERRUPTED));

  const uint open_flags= (MYSQL_OPEN_IGNORE_GLOBAL_READ_LOCK |
                          MYSQL_LOCK_IGNORE_GLOBAL_READ_ONLY |
                          MYSQL_OPEN_IGNORE_FLUSH | MYSQL_LOCK_IGNORE_TIMEOUT);

  if (!was_interrupted &&
      open_n_lock_single_table(thd, table_list, table_list->lock_type,
                               open_flags))
  {
    table_list->table->use_all_columns();
    DBUG_RETURN(0);
  }

  close_thread_tables(thd);
  DBUG_RETURN(1);
}
/*
  Open a table of the wsrep schema with TL_WRITE and reset record[0] to the
  table defaults.

  @return 0 on success, 1 if the table could not be opened. The failure is
          logged unless the THD was killed or interrupted.
*/
static int open_for_write(THD* thd, const char* table_name, TABLE_LIST* table_list)
{
  LEX_CSTRING schema_str= { wsrep_schema_str.c_str(), wsrep_schema_str.length() };
  LEX_CSTRING table_str= { table_name, strlen(table_name) };

  if (!Wsrep_schema_impl::open_table(thd, &schema_str, &table_str, TL_WRITE,
                                     table_list))
  {
    empty_record(table_list->table);
    table_list->table->use_all_columns();
    restore_record(table_list->table, s->default_values);
    return 0;
  }

  // A BF aborted query fails silently here; the client will get
  // ER_LOCK_DEADLOCK in the end.
  const bool was_interrupted= thd->killed ||
    (thd->is_error() &&
     (thd->get_stmt_da()->sql_errno() == ER_QUERY_INTERRUPTED));
  if (was_interrupted == false)
  {
    WSREP_ERROR("Failed to open table %s.%s for writing",
                schema_str.str, table_name);
  }
  return 1;
}
/*
  Store the textual form of id (as produced by its operator<<) into the
  given field of table, using my_charset_bin.
*/
static void store(TABLE* table, uint field, const Wsrep_id& id)
{
  assert(field < table->s->fields);
  std::ostringstream formatter;
  formatter << id;
  const std::string text= formatter.str();
  table->field[field]->store(text.c_str(), text.size(), &my_charset_bin);
}
/*
  Store a numeric value into the given field of table. The Field::store()
  overload is selected by INTTYPE.
*/
template <typename INTTYPE>
static void store(TABLE* table, uint field, const INTTYPE val)
{
  assert(field < table->s->fields);
  auto* const target= table->field[field];
  target->store(val);
}
/*
  Store str_len bytes starting at str into the given field of table, using
  my_charset_bin. The pointer is reinterpreted as const char*.
*/
template <typename CHARTYPE>
static void store(TABLE* table, uint field, const CHARTYPE* str, size_t str_len)
{
  assert(field < table->s->fields);
  const char* const bytes= (const char*)str;
  table->field[field]->store(bytes, str_len, &my_charset_bin);
}
/*
  Store the contents of a std::string into the given field of table by
  forwarding to the pointer + length overload.
*/
static void store(TABLE* table, uint field, const std::string& str)
{
  const char* const data= str.c_str();
  const size_t length= str.size();
  store(table, field, data, length);
}
/**
  Write table->record[0] into the table, replacing an existing row that has
  the same primary key.

  The primary key is built from record[0] and looked up with an exact match
  into record[1]. If the lookup returns any error, record[0] is inserted as
  a new row. If a row is found and differs from record[0] (or the records
  cannot be compared), the row is updated; HA_ERR_RECORD_IS_THE_SAME from
  the update is not treated as a failure.

  @param table  opened table with the new row image in record[0]

  @return 0 on success, 1 if the table has no usable primary key, the key
          buffer cannot be allocated, or the write/update fails.
*/
static int update_or_insert(TABLE* table)
{
  DBUG_ENTER("Wsrep_schema::update_or_insert()");
  int ret= 0;
  char* key;
  int error;

  /*
    Verify that the table has primary key defined.
  */
  if (table->s->primary_key >= MAX_KEY ||
      !table->s->keys_in_use.is_set(table->s->primary_key))
  {
    WSREP_ERROR("No primary key for %s.%s",
                table->s->db.str, table->s->table_name.str);
    DBUG_RETURN(1);
  }

  /*
    Find the record and update or insert a new one if not found.
  */
  if (!(key= (char*) my_safe_alloca(table->s->max_unique_length)))
  {
    /* "%u", not "%ud": the latter prints the number followed by a 'd'. */
    WSREP_ERROR("Error allocating %u bytes for key",
                table->s->max_unique_length);
    DBUG_RETURN(1);
  }

  key_copy((uchar*) key, table->record[0],
           table->key_info + table->s->primary_key, 0);

  if ((error= table->file->ha_index_read_idx_map(table->record[1],
                                                 table->s->primary_key,
                                                 (uchar*) key,
                                                 HA_WHOLE_KEY,
                                                 HA_READ_KEY_EXACT)))
  {
    /*
      Row not found, insert a new one.
    */
    if ((error= table->file->ha_write_row(table->record[0])))
    {
      WSREP_ERROR("Error writing into %s.%s: %d",
                  table->s->db.str,
                  table->s->table_name.str,
                  error);
      ret= 1;
    }
  }
  else if (!records_are_comparable(table) || compare_record(table))
  {
    /*
      Record has changed
    */
    if ((error= table->file->ha_update_row(table->record[1],
                                           table->record[0])) &&
        error != HA_ERR_RECORD_IS_THE_SAME)
    {
      WSREP_ERROR("Error updating record in %s.%s: %d",
                  table->s->db.str,
                  table->s->table_name.str,
                  error);
      ret= 1;
    }
  }

  /* Every path that allocated the key buffer releases it here. */
  my_safe_afree(key, table->s->max_unique_length);

  DBUG_RETURN(ret);
}
/*
  Insert table->record[0] as a new row.

  @return 0 on success, HA_ERR_FOUND_DUPP_KEY if the row collides with an
          existing key, 1 if the table has no usable primary key or the
          write fails for another reason.
*/
static int insert(TABLE* table)
{
  DBUG_ENTER("Wsrep_schema::insert()");

  /* The table must have a usable primary key. */
  const bool has_primary_key=
    table->s->primary_key < MAX_KEY &&
    table->s->keys_in_use.is_set(table->s->primary_key);
  if (!has_primary_key)
  {
    WSREP_ERROR("No primary key for %s.%s",
                table->s->db.str, table->s->table_name.str);
    DBUG_RETURN(1);
  }

  const int error= table->file->ha_write_row(table->record[0]);
  if (error == 0)
  {
    DBUG_RETURN(0);
  }

  if (error != HA_ERR_FOUND_DUPP_KEY)
  {
    WSREP_ERROR("Error writing into %s.%s: %d",
                table->s->db.str,
                table->s->table_name.str,
                error);
    DBUG_RETURN(1);
  }

  WSREP_WARN("Duplicate key found when writing into %s.%s",
             table->s->db.str,
             table->s->table_name.str);
  DBUG_RETURN(HA_ERR_FOUND_DUPP_KEY);
}
/*
  Delete the row held in table->record[0], making at most three attempts.

  @return 0 once an attempt succeeds, 1 if all attempts fail.
*/
static int delete_row(TABLE* table)
{
  int error= 0;
  for (int attempts_left= 3; attempts_left > 0; --attempts_left)
  {
    error= table->file->ha_delete_row(table->record[0]);
    if (!error)
    {
      break;
    }
  }

  if (!error)
  {
    return 0;
  }

  WSREP_ERROR("Error deleting row from %s.%s: %d",
              table->s->db.str,
              table->s->table_name.str,
              error);
  return 1;
}
/*
  Open a table of the wsrep schema with TL_READ and reset record[0] to the
  table defaults.

  @return 0 on success, 1 (after logging an error) if the open fails.
*/
static int open_for_read(THD *thd, const char *table_name,
                         TABLE_LIST *table_list)
{
  LEX_CSTRING schema_str= { wsrep_schema_str.c_str(), wsrep_schema_str.length() };
  LEX_CSTRING table_str= { table_name, strlen(table_name) };

  if (!Wsrep_schema_impl::open_table(thd, &schema_str, &table_str, TL_READ,
                                     table_list))
  {
    empty_record(table_list->table);
    table_list->table->use_all_columns();
    restore_record(table_list->table, s->default_values);
    return 0;
  }

  WSREP_ERROR("Failed to open table %s.%s for reading",
              schema_str.str, table_name);
  return 1;
}
/*
  Prepare the table for a sequential scan via ha_rnd_init(TRUE).
  @return 0 on success, 1 (after logging the handler error) on failure.
*/
static int init_for_scan(TABLE* table)
{
  const int error= table->file->ha_rnd_init(TRUE);
  if (error == 0)
  {
    return 0;
  }
  WSREP_ERROR("Failed to init table for scan: %d", error);
  return 1;
}
/*
  Read the next record of a sequential scan into table->record[0].
  Every error except HA_ERR_END_OF_FILE is logged.
  @return the value returned by handler::ha_rnd_next(): 0 on success,
          handler error code otherwise.
*/
static int next_record(TABLE* table)
{
  const int error= table->file->ha_rnd_next(table->record[0]);
  if (error != 0 && error != HA_ERR_END_OF_FILE)
  {
    WSREP_ERROR("Failed to read next record: %d", error);
  }
  return error;
}
/*
  Finish a sequential scan via ha_rnd_end().
  @return 0 on success, 1 (after logging the handler error) on failure.
*/
static int end_scan(TABLE* table)
{
  const int error= table->file->ha_rnd_end();
  if (error == 0)
  {
    return 0;
  }
  WSREP_ERROR("Failed to end scan: %d", error);
  return 1;
}
/*
  Read the given field of the current record as a string and construct a
  wsrep::id from it.
  @return always 0.
*/
static int scan(TABLE* table, uint field, wsrep::id& id)
{
  assert(field < table->s->fields);
  String raw_value;
  (void)table->field[field]->val_str(&raw_value);
  const std::string text(raw_value.c_ptr(), raw_value.length());
  id= wsrep::id(text);
  return 0;
}
/*
  Read the given field of the current record as an integer (val_int()) and
  assign it to val, converting to INTTYPE.
  @return always 0.
*/
template <typename INTTYPE>
static int scan(TABLE* table, uint field, INTTYPE& val)
{
  assert(field < table->s->fields);
  const auto raw_value= table->field[field]->val_int();
  val= raw_value;
  return 0;
}
/*
  Read the given field of the current record as a string and copy it into
  strbuf as a NUL terminated string. At most strbuf_len - 1 bytes of the
  value are copied, so longer values are truncated.

  @param table       table positioned on the record to read
  @param field       index of the field to read
  @param strbuf      destination buffer
  @param strbuf_len  size of the destination buffer in bytes

  @return 0 on success, 1 if strbuf_len is 0 (no room for the terminator).
*/
static int scan(TABLE* table, uint field, char* strbuf, uint strbuf_len)
{
  /* Same bounds check as the other scan()/store() overloads. */
  assert(field < table->s->fields);

  /* strbuf_len - 1 below would wrap around for an empty buffer and let
     strmake() write past it. */
  if (strbuf_len == 0)
  {
    return 1;
  }

  StringBuffer<STRING_BUFFER_USUAL_SIZE> str;
  (void) table->field[field]->val_str(&str);
  const uint len= str.length();
  strmake(strbuf, str.ptr(), MY_MIN(len, strbuf_len - 1));
  return 0;
}
/*
  Scan one member from the current record of the members table and append
  it to members.

  Fields 0, 2 and 3 of the record (node_uuid, node_name and
  node_incoming_address in the CREATE statement of this file) are read.
  cluster_uuid is currently not used.

  TODO: filter members by cluster UUID

  @return 0 on success, 1 if a field cannot be read or appending to the
          vector throws.
*/
static int scan_member(TABLE* table,
                       const Wsrep_id& cluster_uuid,
                       std::vector<Wsrep_view::member>& members)
{
  Wsrep_id member_id;
  char member_name[128]= { 0, };
  /* node_incoming_address is declared VARCHAR(256); reserve room for the
     whole value plus the terminating NUL so that it is not truncated. */
  char member_incoming[256 + 1]= { 0, };

  if (scan(table, 0, member_id) ||
      scan(table, 2, member_name, sizeof(member_name)) ||
      scan(table, 3, member_incoming, sizeof(member_incoming)))
  {
    return 1;
  }

  /* Members are expected to arrive in ascending id order. */
  if (members.empty() == false)
  {
    assert(members.rbegin()->id() < member_id);
  }

  try
  {
    members.push_back(Wsrep_view::member(member_id,
                                         member_name,
                                         member_incoming));
  }
  catch (...)
  {
    WSREP_ERROR("Caught exception while scanning members table");
    return 1;
  }
  return 0;
}
/*
  Start an index scan on the primary key and read the first record that
  matches key exactly into table->record[0].

  HA_ERR_END_OF_FILE, HA_ERR_KEY_NOT_FOUND and HA_ERR_ABORTED_BY_USER are
  returned without logging, -1 is logged at debug level, every other read
  error is logged as an error.

  @return 0 in case of success, error code in case of error.
*/
static int init_for_index_scan(TABLE* table, const uchar* key,
                               key_part_map map)
{
  const int init_error= table->file->ha_index_init(table->s->primary_key, true);
  if (init_error)
  {
    WSREP_ERROR("Failed to init table for index scan: %d", init_error);
    return init_error;
  }

  const int error= table->file->ha_index_read_map(table->record[0],
                                                  key, map, HA_READ_KEY_EXACT);
  if (error == -1)
  {
    WSREP_DEBUG("init_for_index_scan interrupted");
  }
  else if (error != 0 &&
           error != HA_ERR_END_OF_FILE &&
           error != HA_ERR_KEY_NOT_FOUND &&
           error != HA_ERR_ABORTED_BY_USER)
  {
    WSREP_ERROR("init_for_index_scan failed to read first record, error %d", error);
  }
  return error;
}
/*
  Finish an index scan. Nothing is done if the handler reports that no scan
  is initialised.
  @return 0 in case of success, 1 in case of error.
*/
static int end_index_scan(TABLE* table)
{
  if (!table->file->inited)
  {
    return 0;
  }

  const int error= table->file->ha_index_end();
  if (error)
  {
    WSREP_ERROR("Failed to end scan: %d", error);
    return 1;
  }
  return 0;
}
/*
  Build a key over the first `parts` key parts of table->key_info from the
  values in table->record[0].

  *map receives the key part map for `parts` parts and *key a buffer
  allocated with my_malloc(); the caller releases it. An allocation failure
  is logged and trips an assert.
*/
static void make_key(TABLE* table, uchar** key, key_part_map* map, int parts)
{
  const KEY_PART_INFO* const first_part= table->key_info->key_part;

  /* Total stored length of the key parts taking part in the key. */
  uint key_bytes= 0;
  int idx= 0;
  while (idx < parts)
  {
    key_bytes+= first_part[idx].store_length;
    ++idx;
  }

  *map= make_prev_keypart_map(parts);

  *key= (uchar *) my_malloc(PSI_NOT_INSTRUMENTED, key_bytes + 1, MYF(MY_WME));
  if (!*key)
  {
    WSREP_ERROR("Failed to allocate memory for key prefix_length %u", key_bytes);
    assert(0);
  }

  key_copy(*key, table->record[0], table->key_info, key_bytes);
}
} /* namespace Wsrep_schema_impl */
/* Constructor and destructor are defaulted here in the implementation file;
   neither performs any explicit work. */
Wsrep_schema::Wsrep_schema() = default;
Wsrep_schema::~Wsrep_schema() = default;
/*
  Configure a THD for internal wsrep schema work: grants are skipped,
  replication, binary logging, safe updates and the general log are turned
  off, and READ COMMITTED isolation is selected.
*/
static void wsrep_init_thd_for_schema(THD *thd)
{
  thd->security_ctx->skip_grants();
  thd->system_thread= SYSTEM_THREAD_GENERIC;
  thd->real_id=pthread_self(); // Keep purify happy
  thd->prior_thr_create_utime= thd->start_utime= thd->thr_create_utime;

  /* Galera replication and binlogging are both disabled */
  thd->variables.wsrep_on= 0;
  thd->variables.sql_log_bin= 0;

  /* Option bits: no binlog, no safe updates, no general log */
  thd->variables.option_bits&= ~(OPTION_BIN_LOG | OPTION_SAFE_UPDATES);
  thd->variables.option_bits|= OPTION_LOG_OFF;

  /* Read committed isolation to avoid gap locking */
  thd->variables.tx_isolation= ISO_READ_COMMITTED;

  wsrep_assign_from_threadvars(thd);
  wsrep_store_threadvars(thd);
}
/* Set to true once Wsrep_schema::init() has run all of its statements
   successfully. */
static bool wsrep_schema_ready= false;

/*
  Create the wsrep system tables and apply the ALTER statements to them.

  A temporary THD is created for the purpose and deleted before returning.
  The statements are executed in order; execution stops at the first one
  that fails.

  @return 0 on success, 1 on failure.
*/
int Wsrep_schema::init()
{
  DBUG_ENTER("Wsrep_schema::init()");

  THD* thd= new THD(next_thread_id());
  if (!thd)
  {
    WSREP_ERROR("Unable to get thd");
    DBUG_RETURN(1);
  }
  thd->thread_stack= (char*)&thd;
  wsrep_init_thd_for_schema(thd);

  /* Statements in execution order. */
  const std::string* const schema_statements[]=
  {
    &create_cluster_table_str,
    &create_members_table_str,
#ifdef WSREP_SCHEMA_MEMBERS_HISTORY
    &create_members_history_table_str,
    &alter_members_history_table,
#endif /* WSREP_SCHEMA_MEMBERS_HISTORY */
    &create_frag_table_str,
    &alter_cluster_table,
    &alter_members_table,
    &alter_frag_table,
    &create_allowlist_table_str
  };
  const size_t statement_count=
    sizeof(schema_statements) / sizeof(schema_statements[0]);

  int ret= 0;
  for (size_t idx= 0; idx < statement_count; ++idx)
  {
    const std::string& statement= *schema_statements[idx];
    if (Wsrep_schema_impl::execute_SQL(thd, statement.c_str(),
                                       statement.size()))
    {
      ret= 1;
      break;
    }
  }

  if (ret == 0)
  {
    wsrep_schema_ready= true;
  }

  delete thd;
  DBUG_RETURN(ret);
}
/**
  Persist a primary component view into the cluster and members tables.

  Runs inside a read-write transaction with replication, binary logging and
  safe updates switched off for the duration of the call. Both tables are
  emptied first, then one row is written to the cluster table and one row
  per view member to the members table. When WSREP_SCHEMA_MEMBERS_HISTORY
  is defined, one row per member is also written to the history table.

  On any failure the statement and the transaction are rolled back;
  otherwise the transaction is committed. Transactional locks are released
  on both paths once the transaction was started.

  @param thd   THD used for all table operations
  @param view  view to store; asserted to have primary status

  @return 0 on success, 1 on failure.
*/
int Wsrep_schema::store_view(THD* thd, const Wsrep_view& view)
{
  DBUG_ENTER("Wsrep_schema::store_view()");
  assert(view.status() == Wsrep_view::primary);
  int ret= 1;
  int error;
  TABLE_LIST cluster_table_l;
  TABLE* cluster_table= 0;
  TABLE_LIST members_table_l;
  TABLE* members_table = 0;
#ifdef WSREP_SCHEMA_MEMBERS_HISTORY
  TABLE_LIST members_history_table_l;
  TABLE* members_history_table= 0;
#endif /* WSREP_SCHEMA_MEMBERS_HISTORY */

  Wsrep_schema_impl::wsrep_off wsrep_off(thd);
  Wsrep_schema_impl::binlog_off binlog_off(thd);
  Wsrep_schema_impl::sql_safe_updates sql_safe_updates(thd);

  if (trans_begin(thd, MYSQL_START_TRANS_OPT_READ_WRITE))
  {
    WSREP_ERROR("Failed to start transaction for store view");
    goto out_not_started;
  }

  /*
    Clean up cluster table and members table.
  */
  if (Wsrep_schema_impl::execute_SQL(thd,
                                     delete_from_cluster_table.c_str(),
                                     delete_from_cluster_table.size()) ||
      Wsrep_schema_impl::execute_SQL(thd,
                                     delete_from_members_table.c_str(),
                                     delete_from_members_table.size()))
  {
    goto out;
  }

  /*
    Store cluster view info
  */
  Wsrep_schema_impl::init_stmt(thd);
  if (Wsrep_schema_impl::open_for_write(thd, cluster_table_str.c_str(), &cluster_table_l))
  {
    goto out;
  }
  cluster_table= cluster_table_l.table;

  Wsrep_schema_impl::store(cluster_table, 0, view.state_id().id());
  Wsrep_schema_impl::store(cluster_table, 1, view.view_seqno().get());
  Wsrep_schema_impl::store(cluster_table, 2, view.state_id().seqno().get());
  Wsrep_schema_impl::store(cluster_table, 3, view.protocol_version());
  Wsrep_schema_impl::store(cluster_table, 4, view.capabilities());

  if ((error= Wsrep_schema_impl::update_or_insert(cluster_table)))
  {
    WSREP_ERROR("failed to write to cluster table: %d", error);
    goto out;
  }

  Wsrep_schema_impl::finish_stmt(thd);

  /*
    Store info about current members
  */
  Wsrep_schema_impl::init_stmt(thd);
  if (Wsrep_schema_impl::open_for_write(thd, members_table_str.c_str(),
                                        &members_table_l))
  {
    WSREP_ERROR("failed to open wsrep.members table");
    goto out;
  }
  members_table= members_table_l.table;

  for (size_t i= 0; i < view.members().size(); ++i)
  {
    Wsrep_schema_impl::store(members_table, 0, view.members()[i].id());
    Wsrep_schema_impl::store(members_table, 1, view.state_id().id());
    Wsrep_schema_impl::store(members_table, 2, view.members()[i].name());
    Wsrep_schema_impl::store(members_table, 3, view.members()[i].incoming());
    if ((error= Wsrep_schema_impl::update_or_insert(members_table)))
    {
      WSREP_ERROR("failed to write wsrep.members table: %d", error);
      goto out;
    }
  }

  Wsrep_schema_impl::finish_stmt(thd);

#ifdef WSREP_SCHEMA_MEMBERS_HISTORY
  /*
    Store members history
  */
  Wsrep_schema_impl::init_stmt(thd);
  /* open_for_write() takes the table name and a TABLE_LIST; the TABLE
     pointer is taken from the list afterwards, as for the tables above. */
  if (Wsrep_schema_impl::open_for_write(thd,
                                        cluster_member_history_table_str.c_str(),
                                        &members_history_table_l))
  {
    WSREP_ERROR("failed to open wsrep_cluster_member_history table");
    goto out;
  }
  members_history_table= members_history_table_l.table;

  for (size_t i= 0; i < view.members().size(); ++i)
  {
    Wsrep_schema_impl::store(members_history_table, 0, view.members()[i].id());
    Wsrep_schema_impl::store(members_history_table, 1, view.state_id().id());
    /* Store the plain integer values, as done for the cluster table. */
    Wsrep_schema_impl::store(members_history_table, 2,
                             view.view_seqno().get());
    Wsrep_schema_impl::store(members_history_table, 3,
                             view.state_id().seqno().get());
    Wsrep_schema_impl::store(members_history_table, 4,
                             view.members()[i].name());
    Wsrep_schema_impl::store(members_history_table, 5,
                             view.members()[i].incoming());
    if ((error= Wsrep_schema_impl::update_or_insert(members_history_table)))
    {
      WSREP_ERROR("failed to write wsrep_cluster_member_history table: %d", error);
      goto out;
    }
  }

  Wsrep_schema_impl::finish_stmt(thd);
#endif /* WSREP_SCHEMA_MEMBERS_HISTORY */

  ret= 0;
out:
  if (ret)
  {
    trans_rollback_stmt(thd);
    /* Tables are closed here only if the rollback itself succeeded. */
    if (!trans_rollback(thd))
    {
      close_thread_tables(thd);
    }
  }
  else if (trans_commit(thd))
  {
    ret= 1;
    WSREP_ERROR("Failed to commit transaction for store view");
  }

  thd->release_transactional_locks();

out_not_started:
  DBUG_RETURN(ret);
}
/*
  Rebuild a Wsrep_view from the contents of the cluster and members tables.

  The tables are read inside a read-only transaction with
  thd->variables.wsrep_sync_wait temporarily set to 0. The first record of
  the cluster table supplies cluster uuid, view id, view seqno, protocol
  version and capabilities; every record of the members table becomes one
  member. If own_id is defined, the index of the member with that id is
  passed to the view, otherwise -1.

  @return the restored view (status primary) on success, a default
          constructed Wsrep_view on failure.
*/
Wsrep_view Wsrep_schema::restore_view(THD* thd, const Wsrep_id& own_id) const {
DBUG_ENTER("Wsrep_schema::restore_view()");
int ret= 1;
int error;
TABLE_LIST cluster_table_l;
TABLE* cluster_table= 0;
/* Set when the cluster table scan still has to be ended at label out. */
bool end_cluster_scan= false;
TABLE_LIST members_table_l;
TABLE* members_table= 0;
/* Set while the members table scan is open, see label out. */
bool end_members_scan= false;
/* variables below need to be initialized in case cluster table is empty */
Wsrep_id cluster_uuid;
wsrep_seqno_t view_id= -1;
wsrep_seqno_t view_seqno= -1;
int my_idx= -1;
int proto_ver= 0;
wsrep_cap_t capabilities= 0;
std::vector<Wsrep_view::member> members;
// we don't want causal waits for reading non-replicated private data
int const wsrep_sync_wait_saved= thd->variables.wsrep_sync_wait;
thd->variables.wsrep_sync_wait= 0;
if (trans_begin(thd, MYSQL_START_TRANS_OPT_READ_ONLY)) {
WSREP_ERROR("wsrep_schema::restore_view(): Failed to start transaction");
goto out;
}
/*
Read cluster info from cluster table
*/
Wsrep_schema_impl::init_stmt(thd);
if (Wsrep_schema_impl::open_for_read(thd, cluster_table_str.c_str(), &cluster_table_l)) {
goto out;
}
cluster_table = cluster_table_l.table;
if (Wsrep_schema_impl::init_for_scan(cluster_table)) {
goto out;
}
/* Only a read error other than HA_ERR_END_OF_FILE takes the error path.
   With an empty table (end of file) the scan() calls still run against
   record[0], but the result is not treated as a failure. */
if (((error= Wsrep_schema_impl::next_record(cluster_table)) != 0 ||
Wsrep_schema_impl::scan(cluster_table, 0, cluster_uuid) ||
Wsrep_schema_impl::scan(cluster_table, 1, view_id) ||
Wsrep_schema_impl::scan(cluster_table, 2, view_seqno) ||
Wsrep_schema_impl::scan(cluster_table, 3, proto_ver) ||
Wsrep_schema_impl::scan(cluster_table, 4, capabilities)) &&
error != HA_ERR_END_OF_FILE) {
end_cluster_scan= true;
goto out;
}
if (Wsrep_schema_impl::end_scan(cluster_table)) {
goto out;
}
Wsrep_schema_impl::finish_stmt(thd);
/*
Read members from members table
*/
Wsrep_schema_impl::init_stmt(thd);
if (Wsrep_schema_impl::open_for_read(thd, members_table_str.c_str(),
&members_table_l))
{
goto out;
}
members_table= members_table_l.table;
if (Wsrep_schema_impl::init_for_scan(members_table)) {
goto out;
}
/* From here until the loop completes, a jump to out must end the scan. */
end_members_scan= true;
while (true) {
if ((error= Wsrep_schema_impl::next_record(members_table)) == 0) {
if (Wsrep_schema_impl::scan_member(members_table,
cluster_uuid,
members)) {
goto out;
}
}
else if (error == HA_ERR_END_OF_FILE) {
break;
}
else {
goto out;
}
}
/* The scan is ended explicitly below, so out must not end it again. */
end_members_scan= false;
if (Wsrep_schema_impl::end_scan(members_table)) {
goto out;
}
Wsrep_schema_impl::finish_stmt(thd);
/* Locate this node among the members; my_idx stays -1 if own_id is
   undefined or not found. */
if (own_id.is_undefined() == false) {
for (uint i= 0; i < members.size(); ++i) {
if (members[i].id() == own_id) {
my_idx= i;
break;
}
}
}
/* The result of the commit is deliberately ignored. */
(void)trans_commit(thd);
ret= 0; /* Success*/
out:
/* Common exit: end scans still open, roll back on failure, release locks
   and put the saved wsrep_sync_wait value back. */
if (end_cluster_scan) Wsrep_schema_impl::end_scan(cluster_table);
if (end_members_scan) Wsrep_schema_impl::end_scan(members_table);
if (0 != ret) {
trans_rollback_stmt(thd);
if (!trans_rollback(thd)) {
close_thread_tables(thd);
}
}
thd->release_transactional_locks();
thd->variables.wsrep_sync_wait= wsrep_sync_wait_saved;
if (0 == ret) {
Wsrep_view ret_view(
wsrep::gtid(cluster_uuid, Wsrep_seqno(view_seqno)),
Wsrep_seqno(view_id),
wsrep::view::primary,
capabilities,
my_idx,
proto_ver,
members
);
/* The restored view is logged only when wsrep_debug is set. */
if (wsrep_debug) {
std::ostringstream os;
os << "Restored cluster view:\n" << ret_view;
WSREP_INFO("%s", os.str().c_str());
}
DBUG_RETURN(ret_view);
}
else
{
WSREP_ERROR("wsrep_schema::restore_view() failed.");
Wsrep_view ret_view;
DBUG_RETURN(ret_view);
}
}
/*
  Insert one streaming replication fragment into the streaming log table.

  The row consists of server id, transaction id, seqno, flags and the
  fragment data. Binary logging and safe updates are switched off for the
  duration of the call, and the query tables list of thd->lex is swapped
  for a private one and put back before returning.

  @return 0 on success, 1 if the table cannot be opened or the insert fails.
*/
int Wsrep_schema::append_fragment(THD* thd,
                                  const wsrep::id& server_id,
                                  wsrep::transaction_id transaction_id,
                                  wsrep::seqno seqno,
                                  int flags,
                                  const wsrep::const_buffer& data)
{
  DBUG_ENTER("Wsrep_schema::append_fragment");
  std::ostringstream os;
  os << server_id;
  WSREP_DEBUG("Append fragment(%llu) %s, %llu",
              thd->thread_id,
              os.str().c_str(),
              transaction_id.get());

  /* Work on a private query table list while the fragment is stored; a
     populated list inherited from the "parent DML" may cause problems,
     e.g. for virtual column handling. */
  Query_tables_list query_tables_list_backup;
  thd->lex->reset_n_backup_query_tables_list(&query_tables_list_backup);

  Wsrep_schema_impl::binlog_off binlog_off(thd);
  Wsrep_schema_impl::sql_safe_updates sql_safe_updates(thd);
  Wsrep_schema_impl::init_stmt(thd);

  int ret= 1;
  TABLE_LIST frag_table_l;
  if (Wsrep_schema_impl::open_for_write(thd, sr_table_str.c_str(), &frag_table_l))
  {
    trans_rollback_stmt(thd);
  }
  else
  {
    TABLE* frag_table= frag_table_l.table;
    Wsrep_schema_impl::store(frag_table, 0, server_id);
    Wsrep_schema_impl::store(frag_table, 1, transaction_id.get());
    Wsrep_schema_impl::store(frag_table, 2, seqno.get());
    Wsrep_schema_impl::store(frag_table, 3, flags);
    Wsrep_schema_impl::store(frag_table, 4, data.data(), data.size());

    if (Wsrep_schema_impl::insert(frag_table))
    {
      trans_rollback_stmt(thd);
      close_thread_tables(thd);
    }
    else
    {
      Wsrep_schema_impl::finish_stmt(thd);
      ret= 0;
    }
  }

  /* Every path puts the caller's query table list back. */
  thd->lex->restore_backup_query_tables_list(&query_tables_list_backup);
  DBUG_RETURN(ret);
}
/**
  Assign the final seqno to a fragment row of the streaming log table.

  The row is looked up by (server id, transaction id, seqno -1) and its
  seqno column is updated to ws_meta.seqno(). Binary logging and safe
  updates are switched off for the duration of the call, and the query
  tables list of thd->lex is swapped for a private one and put back before
  returning.

  @param thd      THD used for the table operations
  @param ws_meta  write set meta data; its seqno is asserted to be defined

  @return 0 on success, 1 if the table cannot be opened, the row is not
          found, the update fails or the index scan cannot be ended.
*/
int Wsrep_schema::update_fragment_meta(THD* thd,
                                       const wsrep::ws_meta& ws_meta)
{
  DBUG_ENTER("Wsrep_schema::update_fragment_meta");
  std::ostringstream os;
  os << ws_meta.server_id();
  WSREP_DEBUG("update_frag_seqno(%llu) %s, %llu, seqno %lld",
              thd->thread_id,
              os.str().c_str(),
              ws_meta.transaction_id().get(),
              ws_meta.seqno().get());
  DBUG_ASSERT(ws_meta.seqno().is_undefined() == false);

  /* use private query table list for the duration of fragment storing,
     populated query table list from "parent DML" may cause problems,
     e.g. for virtual column handling
  */
  Query_tables_list query_tables_list_backup;
  thd->lex->reset_n_backup_query_tables_list(&query_tables_list_backup);

  Wsrep_schema_impl::binlog_off binlog_off(thd);
  Wsrep_schema_impl::sql_safe_updates sql_safe_updates(thd);
  int error;
  uchar *key=NULL;
  key_part_map key_map= 0;
  TABLE* frag_table= 0;
  TABLE_LIST frag_table_l;

  Wsrep_schema_impl::init_stmt(thd);
  if (Wsrep_schema_impl::open_for_write(thd, sr_table_str.c_str(), &frag_table_l))
  {
    thd->lex->restore_backup_query_tables_list(&query_tables_list_backup);
    DBUG_RETURN(1);
  }
  frag_table= frag_table_l.table;

  /* Find record with the given uuid, trx id, and seqno -1 */
  Wsrep_schema_impl::store(frag_table, 0, ws_meta.server_id());
  Wsrep_schema_impl::store(frag_table, 1, ws_meta.transaction_id().get());
  Wsrep_schema_impl::store(frag_table, 2, -1);
  Wsrep_schema_impl::make_key(frag_table, &key, &key_map, 3);

  if ((error= Wsrep_schema_impl::init_for_index_scan(frag_table,
                                                     key, key_map)))
  {
    if (error == HA_ERR_END_OF_FILE || error == HA_ERR_KEY_NOT_FOUND)
    {
      WSREP_WARN("Record not found in %s.%s: %d",
                 frag_table->s->db.str,
                 frag_table->s->table_name.str,
                 error);
    }
    /* The index may have been initialised even though the read failed;
       end the scan before the tables are closed. end_index_scan() does
       nothing if no scan is initialised. */
    (void) Wsrep_schema_impl::end_index_scan(frag_table);
    Wsrep_schema_impl::finish_stmt(thd);
    thd->lex->restore_backup_query_tables_list(&query_tables_list_backup);
    my_free(key);
    DBUG_RETURN(1);
  }

  my_free(key);
  /* Copy the original record to frag_table->record[1] */
  store_record(frag_table, record[1]);

  /* Store seqno in frag_table->record[0] and update the row */
  Wsrep_schema_impl::store(frag_table, 2, ws_meta.seqno().get());
  if ((error= frag_table->file->ha_update_row(frag_table->record[1],
                                              frag_table->record[0])))
  {
    WSREP_ERROR("Error updating record in %s.%s: %d",
                frag_table->s->db.str,
                frag_table->s->table_name.str,
                error);
    /* End the index scan on this failure path too. */
    (void) Wsrep_schema_impl::end_index_scan(frag_table);
    Wsrep_schema_impl::finish_stmt(thd);
    thd->lex->restore_backup_query_tables_list(&query_tables_list_backup);
    DBUG_RETURN(1);
  }

  int ret= Wsrep_schema_impl::end_index_scan(frag_table);
  Wsrep_schema_impl::finish_stmt(thd);
  thd->lex->restore_backup_query_tables_list(&query_tables_list_backup);
  DBUG_RETURN(ret);
}
/*
  Delete one fragment row, identified by server id, transaction id and
  seqno, from an already opened streaming log table.

  A row that does not exist is not an error and is only logged at debug
  level. The index scan is always ended before returning.

  @return 0 on success or when the row is missing, 1 if the delete fails,
          otherwise the error code from the index lookup.
*/
static int remove_fragment(THD* thd,
                           TABLE* frag_table,
                           const wsrep::id& server_id,
                           wsrep::transaction_id transaction_id,
                           wsrep::seqno seqno)
{
  WSREP_DEBUG("remove_fragment(%llu) trx %llu, seqno %lld",
              thd->thread_id,
              transaction_id.get(),
              seqno.get());

  uchar *key= NULL;
  key_part_map key_map= 0;

  DBUG_ASSERT(server_id.is_undefined() == false);
  DBUG_ASSERT(transaction_id.is_undefined() == false);
  DBUG_ASSERT(seqno.is_undefined() == false);

  /*
    The lookup uses the complete key (uuid, trx id, seqno),
    which avoids gap locks.
  */
  Wsrep_schema_impl::store(frag_table, 0, server_id);
  Wsrep_schema_impl::store(frag_table, 1, transaction_id.get());
  Wsrep_schema_impl::store(frag_table, 2, seqno.get());
  Wsrep_schema_impl::make_key(frag_table, &key, &key_map, 3);

  int ret= 0;
  const int error= Wsrep_schema_impl::init_for_index_scan(frag_table,
                                                          key,
                                                          key_map);
  if (error == 0)
  {
    if (Wsrep_schema_impl::delete_row(frag_table))
    {
      ret= 1;
    }
  }
  else if (error == HA_ERR_END_OF_FILE || error == HA_ERR_KEY_NOT_FOUND)
  {
    WSREP_DEBUG("Record not found in %s.%s:trx %llu, seqno %lld, error %d",
                frag_table->s->db.str,
                frag_table->s->table_name.str,
                transaction_id.get(),
                seqno.get(),
                error);
  }
  else
  {
    ret= error;
  }

  if (key != NULL)
  {
    my_free(key);
  }
  Wsrep_schema_impl::end_index_scan(frag_table);
  return ret;
}
/*
  Remove the given fragments of one transaction from the streaming log
  table.

  The table is opened in a private open tables state and with a private
  query tables list; both are put back before returning. Removal stops at
  the first fragment that fails.

  @return 0 on success, 1 if the table cannot be opened or removing a
          fragment fails.
*/
int Wsrep_schema::remove_fragments(THD* thd,
const wsrep::id& server_id,
wsrep::transaction_id transaction_id,
const std::vector<wsrep::seqno>& fragments)
{
DBUG_ENTER("Wsrep_schema::remove_fragments");
int ret= 0;
WSREP_DEBUG("Removing %zu fragments", fragments.size());
/* Scoped guards: each changes a THD setting here and puts the previous
   value back when the function returns. */
Wsrep_schema_impl::wsrep_ignore_table wsrep_ignore_table(thd);
Wsrep_schema_impl::binlog_off binlog_off(thd);
Wsrep_schema_impl::sql_safe_updates sql_safe_updates(thd);
/* Set the caller's query tables list and open tables state aside; they are
   restored after the fragment table has been closed. */
Query_tables_list query_tables_list_backup;
Open_tables_backup open_tables_backup;
thd->lex->reset_n_backup_query_tables_list(&query_tables_list_backup);
thd->reset_n_backup_open_tables_state(&open_tables_backup);
TABLE* frag_table= 0;
TABLE_LIST frag_table_l;
if (Wsrep_schema_impl::open_for_write(thd, sr_table_str.c_str(), &frag_table_l))
{
ret= 1;
}
else
{
frag_table= frag_table_l.table;
/* Remove the fragments one by one, giving up at the first failure. */
for (std::vector<wsrep::seqno>::const_iterator i= fragments.begin();
i != fragments.end(); ++i)
{
if (remove_fragment(thd,
frag_table,
server_id,
transaction_id, *i))
{
ret= 1;
break;
}
}
}
/* Close the fragment table before the caller's state is restored. */
close_thread_tables(thd);
thd->restore_backup_open_tables_state(&open_tables_backup);
thd->lex->restore_backup_query_tables_list(&query_tables_list_backup);
if (thd->wsrep_cs().mode() == wsrep::client_state::m_local &&
!thd->in_multi_stmt_transaction_mode())
{
/*
The ugly part: Locally executing autocommit statement is
committing and it has removed a fragment from stable storage.
Now calling finish_stmt() will call trans_commit_stmt(), which will
actually commit the transaction, what we really don't want
to do at this point.
Doing nothing at this point seems to work ok, this block is
intentionally no-op and for documentation purposes only.
*/
}
else
{
/* While finish_stmt() runs, SERVER_STATUS_IN_TRANS is added to the server
   status, but only if the THD is in multi statement transaction mode. */
Wsrep_schema_impl::thd_server_status
thd_server_status(thd, thd->server_status | SERVER_STATUS_IN_TRANS,
thd->in_multi_stmt_transaction_mode());
Wsrep_schema_impl::finish_stmt(thd);
}
DBUG_RETURN(ret);
}
/*
  Replay the stored fragments of one streaming transaction.

  For every seqno in `fragments`, in order:
    1. the streaming log table is opened for read through `thd` and the
       row with key (ws_meta.server_id(), ws_meta.transaction_id(), seqno)
       is positioned with an index scan,
    2. column 3 is read as the fragment flags and column 4 as the
       fragment data, which is passed to wsrep_apply_events() together
       with `orig_thd` and `rli`,
    3. the table is reopened for write, the same row is positioned again
       and deleted.

  thd       - THD used for accessing the streaming log table.
  orig_thd  - THD passed to wsrep_apply_events().
  rli       - Relay log info passed to wsrep_apply_events().
  ws_meta   - Supplies the server id and transaction id for the key.
  fragments - Seqnos of the fragments to replay.

  Returns 0 when every fragment was applied and removed. Returns 1 if a
  table open, index scan or row delete fails, or the non-zero value
  returned by wsrep_apply_events(). `ret` starts at 1, so an empty
  `fragments` vector also returns 1.
*/
static int replay_transaction(THD* thd,
THD* orig_thd,
Relay_log_info* rli,
const wsrep::ws_meta& ws_meta,
const std::vector<wsrep::seqno>& fragments)
{
/* Scoped helper objects bound to thd; presumably they adjust thd state
   for the lifetime of this function - confirm in Wsrep_schema_impl. */
Wsrep_schema_impl::wsrep_off wsrep_off(thd);
Wsrep_schema_impl::binlog_off binlog_off(thd);
Wsrep_schema_impl::sql_safe_updates sql_safe_updates(thd);
Wsrep_schema_impl::thd_context_switch thd_context_switch(orig_thd, thd);
int ret= 1;
int error;
TABLE* frag_table= 0;
TABLE_LIST frag_table_l;
/* Key buffer filled by make_key() for each fragment and released with
   my_free() at the end of the iteration or after the loop. */
uchar *key=NULL;
key_part_map key_map= 0;
for (std::vector<wsrep::seqno>::const_iterator i= fragments.begin();
i != fragments.end(); ++i)
{
/* Step 1: open the streaming log table for read and locate the row. */
Wsrep_schema_impl::init_stmt(thd);
if ((error= Wsrep_schema_impl::open_for_read(thd, sr_table_str.c_str(), &frag_table_l)))
{
WSREP_WARN("Could not open SR table for read: %d", error);
Wsrep_schema_impl::finish_stmt(thd);
/* key is NULL here: it is reset at the end of every completed iteration. */
return 1;
}
frag_table= frag_table_l.table;
Wsrep_schema_impl::store(frag_table, 0, ws_meta.server_id());
Wsrep_schema_impl::store(frag_table, 1, ws_meta.transaction_id().get());
Wsrep_schema_impl::store(frag_table, 2, i->get());
Wsrep_schema_impl::make_key(frag_table, &key, &key_map, 3);
/* NOTE(review): this declaration shadows the function-level `error`
   for the rest of the loop body. */
int error= Wsrep_schema_impl::init_for_index_scan(frag_table,
key,
key_map);
if (error)
{
WSREP_WARN("Failed to init streaming log table for index scan: %d",
error);
Wsrep_schema_impl::end_index_scan(frag_table);
ret= 1;
/* NOTE(review): leaves the loop without calling finish_stmt(thd). */
break;
}
/* Step 2: read the fragment flags and data and apply the events. */
int flags;
Wsrep_schema_impl::scan(frag_table, 3, flags);
WSREP_DEBUG("replay_fragment(%llu): seqno: %lld flags: %x",
ws_meta.transaction_id().get(),
i->get(),
flags);
String buf;
frag_table->field[4]->val_str(&buf);
{
/* Scoped switch with the arguments reversed compared to the
   function-level one, limited to the apply call. */
Wsrep_schema_impl::thd_context_switch thd_context_switch(thd, orig_thd);
ret= wsrep_apply_events(orig_thd, rli, buf.ptr(), buf.length());
if (ret)
{
WSREP_WARN("Wsrep_schema::replay_transaction: failed to apply fragments");
/* NOTE(review): leaves the loop without end_index_scan()/finish_stmt(). */
break;
}
}
Wsrep_schema_impl::end_index_scan(frag_table);
Wsrep_schema_impl::finish_stmt(thd);
/* Step 3: reopen the table for write and delete the applied fragment,
   reusing the key built above. */
Wsrep_schema_impl::init_stmt(thd);
if ((error= Wsrep_schema_impl::open_for_write(thd,
sr_table_str.c_str(),
&frag_table_l)))
{
WSREP_WARN("Could not open SR table for write: %d", error);
Wsrep_schema_impl::finish_stmt(thd);
ret= 1;
break;
}
frag_table= frag_table_l.table;
error= Wsrep_schema_impl::init_for_index_scan(frag_table,
key,
key_map);
if (error)
{
WSREP_WARN("Failed to init streaming log table for index scan: %d",
error);
Wsrep_schema_impl::end_index_scan(frag_table);
ret= 1;
break;
}
error= Wsrep_schema_impl::delete_row(frag_table);
if (error)
{
WSREP_WARN("Could not delete row from streaming log table: %d", error);
Wsrep_schema_impl::end_index_scan(frag_table);
ret= 1;
break;
}
Wsrep_schema_impl::end_index_scan(frag_table);
Wsrep_schema_impl::finish_stmt(thd);
my_free(key);
key= NULL;
}
/* Release the key left over when the loop was exited with break. */
if (key)
my_free(key);
return ret;
}
/**
  Replay a streaming transaction from its stored fragments.

  A temporary THD is created for accessing the streaming log table,
  the work is delegated to the file-local replay_transaction() and the
  temporary THD is deleted before returning.

  @param orig_thd   THD on whose behalf the replay is done; may be NULL,
                    in which case the address of a local variable is
                    used as the thread stack of the temporary THD.
  @param rli        Relay log info handed on to the replay.
  @param ws_meta    Write set meta data of the transaction.
  @param fragments  Seqnos of the fragments to replay, must not be empty.

  @return 1 if the THD could not be allocated, otherwise the result of
          the file-local replay_transaction().
*/
int Wsrep_schema::replay_transaction(THD* orig_thd,
                                     Relay_log_info* rli,
                                     const wsrep::ws_meta& ws_meta,
                                     const std::vector<wsrep::seqno>& fragments)
{
  DBUG_ENTER("Wsrep_schema::replay_transaction");
  DBUG_ASSERT(!fragments.empty());

  THD *replay_thd= new THD(next_thread_id(), true);
  if (replay_thd == NULL)
  {
    WSREP_WARN("Could not allocate memory for THD");
    DBUG_RETURN(1);
  }

  if (orig_thd)
  {
    replay_thd->thread_stack= orig_thd->thread_stack;
  }
  else
  {
    replay_thd->thread_stack= (char *) &replay_thd;
  }
  wsrep_assign_from_threadvars(replay_thd);

  const int result= ::replay_transaction(replay_thd, orig_thd, rli,
                                         ws_meta, fragments);
  delete replay_thd;
  DBUG_RETURN(result);
}
/*
  Recover streaming transactions from the streaming log table.

  First the cluster id is read from column 0 of the first row of the
  cluster table. Then the streaming log table is scanned row by row:
  rows with an undefined seqno are deleted, every other row is applied
  through a streaming applier looked up (or created and registered) in
  the server state for the row's (server id, transaction id).

  storage_thd - THD used for all table access in this function.
  orig_thd    - THD passed as the first argument of thd_context_switch.

  Returns 0 if the cluster table is empty or the streaming log table
  scan reached end of file, non-zero otherwise.
*/
static int recover_sr_transactions(THD* storage_thd, THD* orig_thd)
{
TABLE* frag_table= 0;
TABLE_LIST frag_table_l;
TABLE* cluster_table= 0;
TABLE_LIST cluster_table_l;
Wsrep_storage_service storage_service(storage_thd);
/* Scoped helper objects bound to storage_thd; presumably they adjust
   its state for the lifetime of this function - confirm in
   Wsrep_schema_impl. */
Wsrep_schema_impl::binlog_off binlog_off(storage_thd);
Wsrep_schema_impl::wsrep_off wsrep_off(storage_thd);
Wsrep_schema_impl::sql_safe_updates sql_safe_updates(storage_thd);
Wsrep_schema_impl::thd_context_switch thd_context_switch(orig_thd,
storage_thd);
Wsrep_server_state& server_state(Wsrep_server_state::instance());
int ret= 1;
int error;
wsrep::id cluster_id;
/* Phase 1: read the cluster id from the cluster table. */
Wsrep_schema_impl::init_stmt(storage_thd);
storage_thd->wsrep_skip_locking= FALSE;
if (Wsrep_schema_impl::open_for_read(storage_thd, cluster_table_str.c_str(),
&cluster_table_l))
{
Wsrep_schema_impl::finish_stmt(storage_thd);
return 1;
}
cluster_table= cluster_table_l.table;
if (Wsrep_schema_impl::init_for_scan(cluster_table))
{
Wsrep_schema_impl::finish_stmt(storage_thd);
return 1;
}
if ((error= Wsrep_schema_impl::next_record(cluster_table)))
{
Wsrep_schema_impl::end_scan(cluster_table);
Wsrep_schema_impl::finish_stmt(storage_thd);
trans_commit(storage_thd);
if (error == HA_ERR_END_OF_FILE)
{
WSREP_INFO("Cluster table is empty, not recovering transactions");
return 0;
}
else
{
WSREP_ERROR("Failed to read cluster table: %d", error);
return 1;
}
}
Wsrep_schema_impl::scan(cluster_table, 0, cluster_id);
Wsrep_schema_impl::end_scan(cluster_table);
Wsrep_schema_impl::finish_stmt(storage_thd);
std::ostringstream os;
os << cluster_id;
WSREP_INFO("Recovered cluster id %s", os.str().c_str());
/* Phase 2: scan the streaming log table. */
storage_thd->wsrep_skip_locking= TRUE;
Wsrep_schema_impl::init_stmt(storage_thd);
/*
Open the table for reading and writing so that fragments without
valid seqno can be deleted.
*/
if (Wsrep_schema_impl::open_for_write(storage_thd, sr_table_str.c_str(),
&frag_table_l))
{
WSREP_ERROR("Failed to open SR table for write");
/* NOTE(review): jumps past finish_stmt(), trans_commit() and
   set_mysys_var(0) below; ret is still 1 here. */
goto out;
}
frag_table= frag_table_l.table;
if (Wsrep_schema_impl::init_for_scan(frag_table))
{
WSREP_ERROR("Failed to init for index scan");
/* NOTE(review): same skipped cleanup as the goto above. */
goto out;
}
/* error is 0 here because the cluster table read above succeeded. The
   loop ends on the first non-zero result of next_record(). */
while (0 == error)
{
if ((error= Wsrep_schema_impl::next_record(frag_table)) == 0)
{
/* Columns 0..2 hold the server id, transaction id and seqno. */
wsrep::id server_id;
Wsrep_schema_impl::scan(frag_table, 0, server_id);
/* Default-constructed; only passed on to wsrep::stid below. */
wsrep::client_id client_id;
unsigned long long transaction_id_ull;
Wsrep_schema_impl::scan(frag_table, 1, transaction_id_ull);
wsrep::transaction_id transaction_id(transaction_id_ull);
long long seqno_ll;
Wsrep_schema_impl::scan(frag_table, 2, seqno_ll);
wsrep::seqno seqno(seqno_ll);
/* This is possible if the server crashes between inserting the
fragment into table and updating the fragment seqno after
certification. */
if (seqno.is_undefined())
{
Wsrep_schema_impl::delete_row(frag_table);
continue;
}
wsrep::gtid gtid(cluster_id, seqno);
/* Column 3 holds the fragment flags, column 4 the fragment data. */
int flags;
Wsrep_schema_impl::scan(frag_table, 3, flags);
String data_str;
(void)frag_table->field[4]->val_str(&data_str);
wsrep::const_buffer data(data_str.ptr(), data_str.length());
wsrep::ws_meta ws_meta(gtid,
wsrep::stid(server_id,
transaction_id,
client_id),
wsrep::seqno::undefined(),
flags);
/* Reuse the applier registered for this transaction, or create and
   register one when this is the first fragment seen for it. */
wsrep::high_priority_service* applier;
if (!(applier= server_state.find_streaming_applier(server_id,
transaction_id)))
{
DBUG_ASSERT(wsrep::starts_transaction(flags));
applier = wsrep_create_streaming_applier(storage_thd, "recovery");
server_state.start_streaming_applier(server_id, transaction_id,
applier);
applier->start_transaction(wsrep::ws_handle(transaction_id, 0),
ws_meta);
}
/* store_globals() is called on the applier before applying and on the
   storage service afterwards. */
applier->store_globals();
wsrep::mutable_buffer unused;
if ((ret= applier->apply_write_set(ws_meta, data, unused)) != 0)
{
/* NOTE(review): the scan continues after a failed apply and ret is
   overwritten with 0 once end of file is reached - confirm intended. */
WSREP_ERROR("SR trx recovery applying returned %d", ret);
}
else
{
applier->after_apply();
}
storage_service.store_globals();
}
else if (error == HA_ERR_END_OF_FILE)
{
/* The whole table was scanned: report success. */
ret= 0;
}
else
{
WSREP_ERROR("SR table scan returned error %d", error);
}
}
Wsrep_schema_impl::end_scan(frag_table);
Wsrep_schema_impl::finish_stmt(storage_thd);
trans_commit(storage_thd);
storage_thd->set_mysys_var(0);
out:
return ret;
}
/**
  Recover streaming transactions stored in the streaming log table.

  A temporary storage THD is created, the work is delegated to the
  file-local recover_sr_transactions() and the temporary THD is deleted
  before returning.

  @param orig_thd  THD on whose behalf recovery is run; may be NULL, in
                   which case the address of a local variable is used
                   as the thread stack of the temporary THD.

  @return 1 if the THD could not be allocated, otherwise the result of
          the file-local recover_sr_transactions().
*/
int Wsrep_schema::recover_sr_transactions(THD *orig_thd)
{
  DBUG_ENTER("Wsrep_schema::recover_sr_transactions");

  THD *recovery_thd= new THD(next_thread_id(), true);
  if (recovery_thd == NULL)
  {
    WSREP_WARN("Could not allocate memory for THD");
    DBUG_RETURN(1);
  }

  if (orig_thd)
  {
    recovery_thd->thread_stack= orig_thd->thread_stack;
  }
  else
  {
    recovery_thd->thread_stack= (char *) &recovery_thd;
  }
  wsrep_assign_from_threadvars(recovery_thd);

  const int result= ::recover_sr_transactions(recovery_thd, orig_thd);
  delete recovery_thd;
  DBUG_RETURN(result);
}
void Wsrep_schema::clear_allowlist()
{
THD* thd= new THD(next_thread_id());
if (!thd)
{
WSREP_ERROR("Unable to get thd");
return;
}
thd->thread_stack= (char*)&thd;
wsrep_init_thd_for_schema(thd);
TABLE* allowlist_table= 0;
TABLE_LIST allowlist_table_l;
int error= 0;
Wsrep_schema_impl::init_stmt(thd);
if (Wsrep_schema_impl::open_for_write(thd, allowlist_table_str.c_str(),
&allowlist_table_l) ||
(allowlist_table= allowlist_table_l.table,
Wsrep_schema_impl::init_for_scan(allowlist_table)))
{
WSREP_ERROR("Failed to open mysql.wsrep_allowlist table");
goto out;
}
while (0 == error)
{
if ((error= Wsrep_schema_impl::next_record(allowlist_table)) == 0)
{
Wsrep_schema_impl::delete_row(allowlist_table);
}
else if (error == HA_ERR_END_OF_FILE)
{
continue;
}
else
{
WSREP_ERROR("Allowlist table scan returned error %d", error);
}
}
Wsrep_schema_impl::end_scan(allowlist_table);
Wsrep_schema_impl::finish_stmt(thd);
out:
delete thd;
}
/**
  Insert the given entries into the allowlist table.

  A temporary THD is created for the table access and deleted before
  returning. A duplicate key on insert is logged as a warning and the
  remaining entries are still processed. Any other insert error is
  logged and stops processing; the statement is not finished in that
  case, nor when the table cannot be opened.

  @param ip_allowlist  Entries to store, one row per element.
*/
void Wsrep_schema::store_allowlist(std::vector<std::string>& ip_allowlist)
{
  THD* thd= new THD(next_thread_id());
  if (thd == NULL)
  {
    WSREP_ERROR("Unable to get thd");
    return;
  }

  thd->thread_stack= (char*)&thd;
  wsrep_init_thd_for_schema(thd);

  TABLE_LIST allowlist_table_l;

  Wsrep_schema_impl::init_stmt(thd);
  if (Wsrep_schema_impl::open_for_write(thd, allowlist_table_str.c_str(),
                                        &allowlist_table_l))
  {
    WSREP_ERROR("Failed to open mysql.wsrep_allowlist table");
    delete thd;
    return;
  }

  TABLE* allowlist_table= allowlist_table_l.table;
  for (size_t idx= 0; idx < ip_allowlist.size(); ++idx)
  {
    std::string& entry= ip_allowlist[idx];
    Wsrep_schema_impl::store(allowlist_table, 0, entry);

    const int insert_error= Wsrep_schema_impl::insert(allowlist_table);
    if (insert_error == 0)
    {
      continue;
    }

    if (insert_error != HA_ERR_FOUND_DUPP_KEY)
    {
      WSREP_ERROR("Failed to write mysql.wsrep_allowlist table: %d", insert_error);
      delete thd;
      return;
    }

    /* Duplicates are tolerated, only reported. */
    WSREP_WARN("Duplicate entry (%s) found in `wsrep_allowlist` list", entry.c_str());
  }

  Wsrep_schema_impl::finish_stmt(thd);
  delete thd;
}
/**
  Argument block handed to allowlist_check_thread().

  value     Copy of the string to look up in the allowlist table.
  response  Result of the check, written by the thread; starts as false.
*/
struct Allowlist_check_arg
{
  std::string value;
  bool response;

  Allowlist_check_arg(const std::string& lookup_value)
    : value(lookup_value),
      response(false)
  {
  }
};
/*
  Thread entry point that checks one value against the allowlist table.

  param points to an Allowlist_check_arg. The table is scanned row by
  row, column 0 of each row is read into a local buffer and compared
  with arg->value. arg->response is set to true when a matching row is
  found or when the table has no rows at all, and to false otherwise,
  including every path that jumps to `out` on an error.

  Always returns 0; the result is delivered through arg->response.
*/
static void *allowlist_check_thread(void *param)
{
Allowlist_check_arg *arg= (Allowlist_check_arg *) param;
my_thread_init();
/* Stack-allocated THD used only for this lookup. */
THD thd(0);
thd.thread_stack= (char *) &thd;
wsrep_init_thd_for_schema(&thd);
int error;
TABLE *allowlist_table= 0;
TABLE_LIST allowlist_table_l;
/* Stays false unless a match is found or the table turns out empty. */
bool match_found_or_empty= false;
bool table_have_rows= false;
/* Buffer receiving column 0 of the current row. */
char row[64]= {
0,
};
/*
* Read allowlist table
*/
Wsrep_schema_impl::init_stmt(&thd);
/* The comma expression assigns the table pointer only after a
   successful open, before the scan is initialized. */
if (Wsrep_schema_impl::open_for_read(&thd, allowlist_table_str.c_str(),
&allowlist_table_l) ||
(allowlist_table= allowlist_table_l.table,
Wsrep_schema_impl::init_for_scan(allowlist_table)))
{
/* NOTE(review): this and the other `goto out` paths skip
   finish_stmt() and trans_commit(). */
goto out;
}
while (true)
{
if ((error= Wsrep_schema_impl::next_record(allowlist_table)) == 0)
{
if (Wsrep_schema_impl::scan(allowlist_table, 0, row, sizeof(row)))
{
goto out;
}
table_have_rows= true;
/* compare() returns 0 when the strings are equal. */
if (!arg->value.compare(row))
{
match_found_or_empty= true;
break;
}
}
else if (error == HA_ERR_END_OF_FILE)
{
if (!table_have_rows)
{
WSREP_DEBUG("allowlist table empty, allowing all connections.");
// If table is empty we are allowing all connections
match_found_or_empty= true;
}
break;
}
else
{
/* Any other scan error leaves the response false. */
goto out;
}
}
if (Wsrep_schema_impl::end_scan(allowlist_table))
{
/* NOTE(review): a failing end_scan() discards a match found above,
   because the flag is only copied to the response after the label but
   this path still reports it - the flag itself is not reset here. */
goto out;
}
Wsrep_schema_impl::finish_stmt(&thd);
(void) trans_commit(&thd);
out:
/* NOTE(review): my_thread_end() runs before the stack THD is destroyed
   at function exit - confirm that this ordering is intended. */
my_thread_end();
arg->response = match_found_or_empty;
return 0;
}
/**
  Check whether a value is accepted by the allowlist table.

  The lookup runs in a separate thread (allowlist_check_thread), which
  this function creates and then joins before returning.

  @param key    Not referenced by this function.
  @param value  Value to look up.

  @return true if the wsrep schema is not ready yet or the thread
          reported that the value is allowed, false if the thread could
          not be created or the value was rejected.
*/
bool Wsrep_schema::allowlist_check(Wsrep_allowlist_key key,
                                   const std::string &value)
{
  // We don't have wsrep schema initialized at this point
  if (!wsrep_schema_ready)
  {
    return true;
  }

  Allowlist_check_arg check_arg(value);
  pthread_t checker;
  const int create_error= mysql_thread_create(0, /* Not instrumented */
                                              &checker, NULL,
                                              allowlist_check_thread,
                                              &check_arg);
  if (create_error != 0)
  {
    WSREP_ERROR("allowlist_check(): mysql_thread_create() failed: %d (%s)",
                create_error, strerror(create_error));
    return false;
  }

  /* The result is available once the checker thread has finished. */
  pthread_join(checker, NULL);
  return check_arg.response;
}