mariadb/storage/maria/ma_clone.cc
2025-05-30 12:48:36 +05:30

1703 lines
48 KiB
C++

/*
Copyright (c) 2024, 2024, MariaDB Corporation.
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; version 2 of the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA
*/
/**
@file storage/maria/ma_clone.cc
Clone Aria Tables
Part of the implementation is taken from extra/mariabackup/aria_backup_client.cc
and plugin/clone/src/clone_se.cc
*/
#include "handler.h"
#include "clone_handler.h"
#include "mysqld_error.h"
#include "maria_def.h"
#include <aria_backup.h>
#include "log.h"
#include <array>
#include <condition_variable>
#include <cstring>
#include <functional>
#include <memory>
#include <mutex>
#include <queue>
#include <unordered_set>
#include <vector>
namespace aria_engine
{
/**
  Identifies a clone operation. A locator travels in a fixed size serialized
  form made of three 4 byte integers: version, clone id and index.
*/
class Locator
{
public:
  /** Version written into locators created by this code. */
  static constexpr uint32_t S_CUR_VERSION= 1;
  /** Size in bytes of the serialized form. */
  static constexpr size_t S_MAX_LENGTH= 12;

  Locator(const Locator *ref_loc, uint32_t clone_index, bool is_copy);
  Locator(const unsigned char *serial, size_t serial_length);

  /** @return pointer to the serialized bytes and their length. */
  std::pair<const unsigned char *, uint32_t> get_locator() const;

  bool operator==(const Locator& other) const;

  /** @return the index stored in this locator. */
  uint32_t index() const
  {
    return m_index;
  }

private:
  /** Write the member values into m_serial. */
  void serialize();
  /** Load the member values from m_serial. */
  void deserialize();

  uint32_t m_version= S_CUR_VERSION;
  uint32_t m_clone_id= 0;
  uint32_t m_index= 0;
  /** Serialized image of the three fields above. */
  unsigned char m_serial[S_MAX_LENGTH];
};
/**
  Build a locator from its serialized form.
  @param serial         serialized locator bytes
  @param serial_length  number of bytes available at serial; expected to be
                        exactly S_MAX_LENGTH
*/
Locator::Locator(const unsigned char *serial, size_t serial_length)
{
  DBUG_ASSERT(serial_length == S_MAX_LENGTH);
  /* Never copy more than the buffer holds; a short input stays zero padded. */
  const size_t copy_len= std::min<size_t>(serial_length, S_MAX_LENGTH);
  memset(m_serial, 0, S_MAX_LENGTH);
  memcpy(m_serial, serial, copy_len);
  deserialize();
}
/** Store version, clone id and index into m_serial, 4 bytes each. */
void Locator::serialize()
{
  unsigned char *out= m_serial;
  int4store(out, m_version);
  int4store(out + 4, m_clone_id);
  int4store(out + 8, m_index);
}
/** Load version, clone id and index from m_serial, 4 bytes each. */
void Locator::deserialize()
{
  unsigned char *in= m_serial;
  m_version= uint4korr(in);
  m_clone_id= uint4korr(in + 4);
  m_index= uint4korr(in + 8);
}
std::pair<const unsigned char *, uint32_t> Locator::get_locator() const
{
return std::make_pair(&m_serial[0], S_MAX_LENGTH);
}
/**
  Compare two locators. A different clone id means a different locator;
  with the same clone id, version and index are expected to agree as well.
*/
bool Locator::operator==(const Locator& other) const
{
  if (m_clone_id == other.m_clone_id)
  {
    DBUG_ASSERT(m_version == other.m_version);
    DBUG_ASSERT(m_index == other.m_index);
    return m_version == other.m_version && m_index == other.m_index;
  }
  return false;
}
/**
  Describes one piece of data sent during clone: the target file name, the
  offset and flags. Serialized layout: 8 byte offset, 4 byte flags,
  4 byte file name length, followed by the file name bytes.
*/
class Descriptor
{
public:
  /** Build a descriptor from its serialized form. */
  Descriptor(const unsigned char *serial, size_t serial_length);
  /** Build and serialize a descriptor for the given file. */
  Descriptor(const std::string &file_name, uint64_t offset, bool is_log);

  /** @return file name and file offset held by the descriptor. */
  std::pair<std::string, uint64_t> get_file_info() const;
  /** @return pointer to the serialized bytes and the number of bytes used. */
  std::pair<const unsigned char *, uint32_t> get_descriptor() const;

  bool is_log() const { return m_is_log; }

  /** Size of the fixed part: offset, flags and name length. */
  static constexpr size_t S_MAX_META_LENGTH= 16;
  static constexpr size_t S_MAX_LENGTH= S_MAX_META_LENGTH + 2 * FN_REFLEN + 1;

  /* Special offset values. */
  static constexpr uint64_t S_OFF_APPEND= std::numeric_limits<uint64_t>::max();
  static constexpr uint64_t S_OFF_NO_DATA= S_OFF_APPEND - 1;

  /** Flag bit marking a redo log file. Shared by all objects, so it is a
  class constant rather than a per-object member; this also keeps the class
  copy assignable. */
  static constexpr uint32_t DESC_FLAG_REDO= 0x01;

private:
  uint64_t m_file_offset= 0;
  /* Part of 4 byte serialized flags. */
  bool m_is_log= false;
  size_t m_file_name_len= 0;
  unsigned char m_serial[S_MAX_LENGTH];
};
/**
  Build a descriptor from its serialized form.
  @param serial         serialized descriptor bytes
  @param serial_length  number of bytes available at serial
*/
Descriptor::Descriptor(const unsigned char *serial, size_t serial_length)
{
  DBUG_ASSERT(serial_length <= S_MAX_LENGTH);
  memset(&m_serial[0], 0, S_MAX_LENGTH);
  const size_t cp_length= std::min<size_t>(serial_length, S_MAX_LENGTH);
  memcpy(&m_serial[0], serial, cp_length);

  unsigned char *ptr= &m_serial[0];
  m_file_offset= uint8korr(ptr);
  ptr+= 8;
  uint32_t flags= uint4korr(ptr);
  ptr+= 4;
  m_file_name_len= uint4korr(ptr);

  /* The length field comes from the serialized input. Never trust it beyond
  the name bytes that were actually copied, otherwise get_file_info() would
  read past the end of m_serial. */
  const size_t name_bytes=
    cp_length > S_MAX_META_LENGTH ? cp_length - S_MAX_META_LENGTH : 0;
  if (m_file_name_len > name_bytes)
    m_file_name_len= name_bytes;

  m_is_log= flags & DESC_FLAG_REDO;
}
/**
  Build and serialize a descriptor.
  @param file_name  file the data belongs to; may be empty
  @param offset     file offset or one of the special S_OFF_* values
  @param is_log     true if the file is a redo log file
*/
Descriptor::Descriptor(const std::string &file_name, uint64_t offset,
                       bool is_log)
{
  m_file_offset= offset;
  m_is_log= is_log;

  /* The name shares the buffer with the fixed meta data. Clamp the stored
  length itself, not only the copy, so that the serialized length field,
  get_descriptor() and get_file_info() never refer to bytes beyond
  m_serial. */
  constexpr size_t max_name_len= S_MAX_LENGTH - S_MAX_META_LENGTH;
  m_file_name_len= std::min<size_t>(file_name.length(), max_name_len);

  unsigned char *ptr= &m_serial[0];
  memset(ptr, 0, S_MAX_LENGTH);
  int8store(ptr, offset);
  ptr+= 8;

  uint32_t flags= 0;
  if (m_is_log)
    flags|= DESC_FLAG_REDO;
  int4store(ptr, flags);
  ptr+= 4;

  int4store(ptr, static_cast<uint32_t>(m_file_name_len));
  ptr+= 4;

  if (m_file_name_len)
    memcpy(ptr, file_name.c_str(), m_file_name_len);
}
/** @return the file name stored after the fixed meta data, and the offset. */
std::pair<std::string, uint64_t> Descriptor::get_file_info() const
{
  const char *name_start=
    reinterpret_cast<const char *>(&m_serial[0]) + S_MAX_META_LENGTH;
  std::string file_name(name_start, m_file_name_len);
  return std::make_pair(file_name, m_file_offset);
}
/**
  @return the serialized descriptor and the number of bytes in use, which is
  the fixed meta data plus the file name.
*/
std::pair<const unsigned char *, uint32_t> Descriptor::get_descriptor() const
{
  const size_t used_bytes= m_file_name_len + S_MAX_META_LENGTH;
  const unsigned char *bytes= &m_serial[0];
  return {bytes, static_cast<uint32_t>(used_bytes)};
}
/**
  Send one buffer through the clone callback.
  @param cbk_ctx    clone callback
  @param data       buffer to send
  @param data_len   number of bytes in data
  @param offset     file offset or special S_OFF_* value for the descriptor
  @param file_name  file name for the descriptor; may be empty
  @param log_file   true if the data belongs to a redo log file
  @return result of the buffer callback
*/
static int send_data(Ha_clone_cbk *cbk_ctx, const unsigned char* data,
                     size_t data_len, uint64_t offset,
                     const std::string &file_name, bool log_file= false)
{
  Descriptor data_desc(file_name, offset, log_file);
  const auto serialized= data_desc.get_descriptor();
  cbk_ctx->set_data_desc(serialized.first, serialized.second);

  cbk_ctx->clear_flags();
  cbk_ctx->set_os_buffer_cache();

  unsigned char *payload= const_cast<unsigned char*>(data);
  return cbk_ctx->buffer_cbk(payload, data_len);
}
/**
  Read from an open file at its current position and send the data in chunks.
  @param file_desc       open file to read from
  @param buf             scratch buffer
  @param buf_size        size of buf in bytes
  @param cbk_ctx         clone callback
  @param fname           file name, used in the descriptor and in messages
  @param tname           table name, used in messages only
  @param[in,out] copy_size  in: number of bytes to send, 0 to send up to end
                            of file; out: number of bytes actually sent
  @param is_log          true if the file is a redo log file
  @param send_file_name  if true, the first chunk carries fname and offset 0;
                         otherwise chunks are sent with S_OFF_APPEND
  @return 0 on success, error code otherwise
*/
static int send_file(File file_desc, uchar *buf, size_t buf_size,
                     Ha_clone_cbk *cbk_ctx, const std::string &fname,
                     const std::string &tname, size_t &copy_size,
                     bool is_log, bool send_file_name= true)
{
  DBUG_ASSERT(file_desc >= 0);
  DBUG_ASSERT(buf_size > 0);
  if (file_desc < 0 || !cbk_ctx || !buf || buf_size == 0)
  {
    copy_size= 0;
    my_error(ER_INTERNAL_ERROR, MYF(ME_ERROR_LOG),
             "ARIA SE: Clone send file invalid data");
    return ER_INTERNAL_ERROR;
  }

  uint64_t offset= send_file_name ? 0 : Descriptor::S_OFF_APPEND;
  bool read_all= (copy_size == 0);
  int err= 0;
  size_t copied_size= 0;
  auto chunk_size= read_all ? buf_size : std::min(buf_size, copy_size);

  while (size_t bytes_read= my_read(file_desc, buf, chunk_size, MY_WME))
  {
    if (bytes_read == size_t(-1))
    {
      my_printf_error(ER_IO_READ_ERROR, "Error: file %s read for table %s",
                      ME_ERROR_LOG, fname.c_str(), tname.c_str());
      /* Report what was sent so far, as every other exit does; callers log
      the value and use it to advance their file offset. */
      copy_size= copied_size;
      return ER_IO_READ_ERROR;
    }
    err= send_data(cbk_ctx, buf, bytes_read, offset,
                   send_file_name ? fname : "", is_log);
    if (err)
      break;

    copied_size+= bytes_read;
    if (!read_all)
    {
      if (copied_size >= copy_size)
      {
        DBUG_ASSERT(copy_size == copied_size);
        break;
      }
      auto size_left= copy_size - copied_size;
      chunk_size= std::min(chunk_size, size_left);
    }
    /* Only the first chunk carries the file name. */
    send_file_name= false;
  }

  /* Nothing was read: still tell the other side about the file. */
  if (!err && copied_size == 0)
    err= send_data(cbk_ctx, buf, 0, Descriptor::S_OFF_NO_DATA, fname,
                   is_log);

  copy_size= copied_size;
  return err;
}
class Table
{
public:
struct Partition
{
std::string m_file_path;
File m_files[2]= {-1, -1};
MY_STAT m_stats[2];
};
static constexpr const char *s_extns[]= {".MAI", ".MAD"};
Table(std::string &db, std::string &table, std::string &frm_name,
const char *file_path);
~Table();
void add_partition(const Table &partition)
{
DBUG_ASSERT(m_partitioned);
m_partitions.push_back(partition.m_partitions[0]);
}
int open(bool no_lock);
int copy(Ha_clone_cbk *cbk_ctx);
void close();
std::string &get_db() { return m_db; }
std::string &get_table() { return m_table; }
std::string &get_version() { return m_version; }
std::string &get_full_name() { return m_full_name; }
bool is_partitioned() const { return m_partitioned; }
bool is_online_backup_safe() const
{
DBUG_ASSERT(is_opened());
return m_cap.online_backup_safe;
}
bool is_stats() const
{
return clone_common::is_stats_table(m_db.c_str(), m_table.c_str());
}
bool is_log() const
{
return clone_common::is_log_table(m_db.c_str(), m_table.c_str());
}
bool is_opened() const
{
return !m_partitions.empty() &&
m_partitions[0].m_files[0] >= 0 &&
m_partitions[0].m_files[1] >= 0;
}
private:
std::string m_db;
std::string m_table;
std::string m_frm_name;
std::string m_version;
std::string m_full_name;
bool m_partitioned= false;
std::vector<Partition> m_partitions;
ARIA_TABLE_CAPABILITIES m_cap;
};
/**
  Create a table object with a single partition entry.
  @param db         database name; moved from
  @param table      table name; moved from
  @param frm_name   name of the FRM file without extension; moved from
  @param file_path  path of one of the table files; the extension is
                    stripped to form the base path of the partition
*/
Table::Table(std::string &db, std::string &table, std::string &frm_name,
             const char *file_path) :
  m_db(std::move(db)), m_table(std::move(table)),
  m_frm_name(std::move(frm_name))
{
  m_full_name.assign("`").append(m_db).append("`.`");
  m_full_name.append(m_table).append("`");

  if (std::strstr(file_path, "#P#"))
    m_partitioned= true;

  Partition partition;
  const char *ext_pos= std::strrchr(file_path, '.');
  /* A dot that is followed by a directory separator belongs to a directory
  component, not to an extension. */
  if (ext_pos && std::strchr(ext_pos, FN_LIBCHAR))
    ext_pos= nullptr;

  /* Without an extension use the path as it is; subtracting from a null
  pointer would be undefined. */
  if (ext_pos)
    partition.m_file_path.assign(file_path, ext_pos - file_path);
  else
    partition.m_file_path.assign(file_path);

  m_partitions.push_back(std::move(partition));
}
/**
Open the index and data file of every partition, read the table
capabilities from the first partition and read the table version from the
FRM file. On any error all files opened so far are closed again.
@param no_lock currently ignored, see the TODO below
@return 0 on success, error code otherwise
*/
int Table::open(bool no_lock)
{
int error= 0;
bool have_capabilities= false;
File frm_file= -1;
bool locked= false;
/* Locking is not implemented yet: the parameter is overridden, which makes
the lock branch below unreachable and keeps "locked" false. */
no_lock= true; // TODO: Remove after implementing lock
if (!no_lock /* && !backup_lock(con, m_full_name.c_str())*/)
{
my_printf_error(ER_INTERNAL_ERROR,
"Error on executing BACKUP LOCK for ARIA table %s", ME_ERROR_LOG,
m_full_name.c_str());
error= ER_INTERNAL_ERROR;
goto exit;
}
else
locked= !no_lock;
for (Partition &partition : m_partitions)
{
/* Index 0 is the ".MAI" file, index 1 the ".MAD" file (see s_extns). */
for (size_t index= 0; index < 2; index++)
{
auto &extn= s_extns[index];
std::string file_path= partition.m_file_path + extn;
partition.m_files[index]= mysql_file_open(0, file_path.c_str(),
O_RDONLY | O_SHARE | O_NOFOLLOW | O_CLOEXEC, MYF(MY_WME));
if (partition.m_files[index] < 0)
{
my_printf_error(ER_CANT_OPEN_FILE,
"Error on file %s open during %s ARIA table copy", ME_ERROR_LOG,
file_path.c_str(), m_full_name.c_str());
error= ER_CANT_OPEN_FILE;
goto exit;
}
/* NOTE(review): the stat is taken by path, not from the descriptor that
was just opened. */
if (!my_stat(file_path.c_str(), &partition.m_stats[index], MYF(0)))
{
my_printf_error(ER_INTERNAL_ERROR,
"Error: failed to get stat info for file %s of table %s",
ME_ERROR_LOG, file_path.c_str(), m_full_name.c_str());
error= ER_INTERNAL_ERROR;
goto exit;
}
}
/* Capabilities are read once, from the first partition's ".MAI" file. */
if (!have_capabilities)
{
/* NOTE(review): on failure the value returned by aria_get_capabilities()
is returned to the caller as is, while the message is raised as
ER_INTERNAL_ERROR. */
if ((error= aria_get_capabilities(partition.m_files[0], &m_cap)))
{
my_printf_error(ER_INTERNAL_ERROR,
"Error: ARIA getting capability: %d", ME_ERROR_LOG, error);
goto exit;
}
have_capabilities= true;
}
}
frm_file= mysql_file_open(key_file_frm, (m_frm_name + ".frm").c_str(),
O_RDONLY | O_SHARE, MYF(0));
if (frm_file < 0)
{
my_printf_error(ER_INTERNAL_ERROR,
"Error on ARIA FRM file open: %s", ME_ERROR_LOG,
(m_frm_name + ".frm").c_str());
error= ER_INTERNAL_ERROR;
}
exit:
/* Common exit path for success and failure. */
if (locked /* TODO: && !backup_unlock(con) */)
{
my_printf_error(ER_INTERNAL_ERROR,
"Error on BACKUP UNLOCK for ARIA table %s",
ME_ERROR_LOG, m_full_name.c_str());
error= ER_INTERNAL_ERROR;
}
/* The FRM file is only needed to read the version id. */
if (frm_file >= 0)
{
m_version= clone_common::read_table_version_id(frm_file);
mysql_file_close(frm_file, MYF(0));
}
if (error) close();
return error;
}
/** Close any file that is still open. */
Table::~Table() { close(); }
/** Close the open files of all partitions and mark them as closed. */
void Table::close()
{
  for (Partition &partition : m_partitions)
  {
    for (File &fd : partition.m_files)
    {
      if (fd >= 0)
        mysql_file_close(fd, MYF(0));
      fd= -1;
    }
  }
}
/**
  Send the index and data file of every partition, block by block. The table
  must have been opened with open().
  @param cbk_ctx  clone callback used to send the data
  @return 0 on success, error code otherwise
*/
int Table::copy(Ha_clone_cbk *cbk_ctx)
{
  auto buf_size= static_cast<size_t>(m_cap.block_size);
  std::unique_ptr<uchar[]> buf(new uchar[buf_size]);
  int err= 0;

  for (const auto &part : m_partitions)
  {
    /* Loop twice: index 0 is the index file, index 1 the data file. */
    for (size_t index= 0; index < 2; index++)
    {
      size_t data_bytes= 0;
      bool name_sent= false;
      auto &extn= s_extns[index];
      std::string file_path= part.m_file_path + extn;

      for (ulonglong block= 0;; block++)
      {
        size_t buf_len= buf_size;
        if (index)
          err= aria_read_data(part.m_files[index], &m_cap, block, buf.get(),
                              &buf_len);
        else
          err= aria_read_index(part.m_files[index], &m_cap, block, buf.get());

        if (err == HA_ERR_END_OF_FILE)
        {
          err= 0;
          break;
        }
        if (err)
        {
          my_printf_error(ER_IO_READ_ERROR, "Error: file %s read for table %s",
                          ME_ERROR_LOG, file_path.c_str(), m_full_name.c_str());
          return ER_IO_READ_ERROR;
        }

        /* Only the first block carries the file name. */
        err= send_data(cbk_ctx, buf.get(), buf_len, Descriptor::S_OFF_APPEND,
                       (block == 0) ? file_path : "");
        if (err)
          return err;
        name_sent= true;
        data_bytes+= buf_len;
      }

      /* A file without any block would otherwise never be announced. Send
      a no-data descriptor with the name, the same way send_file() does for
      an empty file. */
      if (!name_sent)
      {
        err= send_data(cbk_ctx, buf.get(), 0, Descriptor::S_OFF_NO_DATA,
                       file_path);
        if (err)
          return err;
      }

      my_printf_error(ER_CLONE_SERVER_TRACE, "ARIA SE: Copied file %s for "
                      "table %s, %zu bytes", MYF(ME_NOTE | ME_ERROR_LOG_ONLY),
                      file_path.c_str(), m_full_name.c_str(), data_bytes);
    }
  }
  return 0;
}
class Log_Files
{
public:
/** Initialize by checking existing log files on the disk. */
Log_Files(const char *datadir, uint32_t max_log_no, uint32_t min_log_no= 0);
uint32_t first() const { return m_first; }
uint32_t count() const { return m_count; }
uint32_t last() const
{
DBUG_ASSERT(m_count > 0);
return m_first + m_count - 1;
}
void report_found() const
{
if (m_count)
sql_print_information("Found %u aria log files, minimum log number %u, "
"maximum log number %u", m_count, m_first, last());
}
bool check_if_missing(uint32_t logno) const
{
DBUG_ASSERT(logno > 0);
return (!m_count || m_first > logno || last() < logno);
}
static std::string name_by_index(size_t log_num)
{
static constexpr const char *prefix= "aria_log.";
std::string log_file;
{
std::stringstream ss;
ss << std::setw(8) << std::setfill('0') << log_num;
log_file.append(prefix).append(ss.str());
}
return log_file;
}
static std::string name(const char *datadir_path, size_t log_num)
{
std::string log_file(datadir_path);
return log_file.append("/").append(name_by_index(log_num));
}
private:
/** Check to see if a file exists. Takes name of the file to check.
@return true if file exists. */
static bool file_exists(const char *filename)
{
MY_STAT stat_arg;
auto stat= my_stat(filename, &stat_arg, MYF(0));
return (stat != nullptr);
}
/*
Skip all missing log files and find the greatest existing log file, or
Skip all existing log files and find the greatest missing log file.
@param datadir - Search files in this directory
@param start - Start searching from this log number
@param stop - Search up to this point excluding stop
@param kind - true - search for an existing file
false - search for a missing file.
@returns - (stop..start] - the greatest found log file
of the searched kind
- 0 - if no log files of this kind
were found in the range (stop..start].
*/
static uint32_t find_greatest(const char *datadir, uint32_t start,
uint32_t stop, bool kind)
{
for (uint32_t i= start; i > stop; i--)
{
if (file_exists(name(datadir, i).c_str()) == kind)
return i;
}
return stop; // No log files of the searched kind were found
}
static uint32_t find_greatest_existing(const char *datadir, uint32_t start,
uint32_t stop)
{
return find_greatest(datadir, start, stop, true);
}
static uint32_t find_greatest_missing(const char *datadir, uint32_t start,
uint32_t stop)
{
return find_greatest(datadir, start, stop, false);
}
private:
uint32_t m_first= 0;
uint32_t m_count= 0;
};
/**
  Find the contiguous range of log files that ends at the greatest existing
  log number in (min_log_no..max_log_no].
  @param datadir     directory holding the log files
  @param max_log_no  greatest log number to look at
  @param min_log_no  log numbers up to and including this one are ignored
*/
Log_Files::Log_Files(const char *datadir, uint32_t max_log_no,
                     uint32_t min_log_no)
{
  const uint32_t newest=
    find_greatest_existing(datadir, max_log_no, min_log_no);
  DBUG_ASSERT(newest >= min_log_no);
  const uint32_t lowest_candidate= min_log_no + 1;

  if (newest > lowest_candidate)
  {
    // Multiple files may exist: the range starts after the first gap below.
    m_first= find_greatest_missing(datadir, newest - 1, min_log_no) + 1;
    m_count= 1 + newest - m_first;
  }
  else if (newest == lowest_candidate)
  {
    // Only the lowest candidate (aria_log.00000001 when min_log_no= 0) exists.
    m_first= lowest_candidate;
    m_count= 1;
  }
  else
  {
    // No log files were found at all
    DBUG_ASSERT(newest == min_log_no);
    m_first= 0;
    m_count= 0;
  }
}
/**
  Queue of copy jobs shared between the clone threads, protected by a mutex
  and a condition variable.
*/
class Job_Repository
{
public:
  /** A job gets the clone callback, the thread id and the current error. */
  using Job= std::function<int(Ha_clone_cbk *, uint32_t, int)>;

  /** Queue one job and wake up one waiting consumer. */
  void add_one(Job &&job);

  /** Mark a stage as finished, record the first error, wake up everyone. */
  void finish(int err, Ha_clone_stage stage);

  /** Run queued jobs until the stage is finished and the queue is empty. */
  int consume(THD *thd, uint32_t thread_id, Ha_clone_cbk *cbk,
              Ha_clone_stage stage, int err);

  /** Wait until no job is pending any more. */
  int wait_pending(THD *thd);

private:
  std::mutex m_mutex;
  std::condition_variable m_cv;
  std::queue<Job> m_jobs;
  /** Per stage flag set by finish(). */
  bool m_finished[HA_CLONE_STAGE_MAX]= {};
  /** First error recorded. */
  int m_error= 0;
  /** Jobs added but not yet completed. */
  uint32_t n_pending= 0;
};
/**
  Wait until all jobs handed out to consumers have completed.
  @param thd  session, checked for kill between waits
  @return 0 on success, ER_QUERY_INTERRUPTED if the session was killed,
          ER_STATEMENT_TIMEOUT if jobs are still pending after the wait
          rounds are used up; the recorded error otherwise
*/
int Job_Repository::wait_pending(THD *thd)
{
  constexpr uint32_t max_rounds= 300;
  std::unique_lock<std::mutex> guard(m_mutex);

  /* We try consuming first and then come here. No more jobs can be added at
  this point. */
  DBUG_ASSERT(m_jobs.empty());

  for (uint32_t round= 0; n_pending && ++round < max_rounds;)
  {
    /* Each round waits for at most one second. */
    m_cv.wait_for(guard, std::chrono::seconds(1),
                  [this] { return n_pending == 0; });
    if (thd_killed(thd))
    {
      my_error(ER_QUERY_INTERRUPTED, MYF(ME_ERROR_LOG));
      m_error= ER_QUERY_INTERRUPTED;
      return m_error;
    }
  }

  if (n_pending)
  {
    my_printf_error(ER_STATEMENT_TIMEOUT,
      "ARIA SE: Clone Timeout(5 minutes) while waiting for jobs to finish",
      MYF(ME_ERROR_LOG));
    m_error= ER_STATEMENT_TIMEOUT;
  }
  return m_error;
}
/** Queue a job, count it as pending and wake up one waiting consumer. */
void Job_Repository::add_one(Job &&job)
{
  {
    std::lock_guard<std::mutex> guard(m_mutex);
    m_jobs.push(std::move(job));
    ++n_pending;
    DBUG_ASSERT(n_pending >= m_jobs.size());
  }
  /* Notify after the mutex has been released. */
  m_cv.notify_one();
}
/**
  Mark a stage as finished and wake up all waiting threads.
  @param err    error to record if none has been recorded yet; 0 for none
  @param stage  stage to mark; values outside of the valid range are ignored
*/
void Job_Repository::finish(int err, Ha_clone_stage stage)
{
  {
    std::lock_guard<std::mutex> guard(m_mutex);
    if (stage < HA_CLONE_STAGE_MAX)
      m_finished[stage]= true;
    if (!m_error && err)
      m_error= err;
  }
  /* Notify after the mutex has been released. */
  m_cv.notify_all();
}
/**
  Run queued jobs until the given stage is finished and the queue is empty,
  or until an error is seen.
  @param thd        session, checked for kill after every job
  @param thread_id  id passed on to every job
  @param cbk        clone callback passed on to every job
  @param stage      stage whose completion ends the loop
  @param err        error on entry; passed to every job
  @return 0 on success, error code otherwise
*/
int Job_Repository::consume(THD *thd, uint32_t thread_id, Ha_clone_cbk *cbk,
                            Ha_clone_stage stage, int err)
{
  std::unique_lock<std::mutex> lock(m_mutex);
  while (!m_finished[stage] || !m_jobs.empty())
  {
    while (!m_jobs.empty())
    {
      auto job= std::move(m_jobs.front());
      m_jobs.pop();
      /* Run the job without holding the mutex. */
      lock.unlock();
      /* Even after an error, we need to keep consuming all jobs added as jobs
      could hold table object ownership that needs to be freed. The input
      error would ensure we don't actually transfer any data after an error. */
      err= job(cbk, thread_id, err);
      if (thd_killed(thd))
      {
        my_error(ER_QUERY_INTERRUPTED, MYF(0));
        err= ER_QUERY_INTERRUPTED;
      }
      lock.lock();
      /* The pending counter is shared with the other consumers and with
      wait_pending(), so it may only change under the mutex. Wake up a
      possible waiter in wait_pending() once the last job is done. */
      DBUG_ASSERT(n_pending > 0);
      if (--n_pending == 0)
        m_cv.notify_all();
    }
    if (m_error && !err)
    {
      my_error(ER_INTERNAL_ERROR, MYF(ME_ERROR_LOG),
               "ARIA SE: Clone error in concurrent task");
      err= m_error;
      break;
    }
    else if (err && !m_error)
    {
      m_error= err;
      break;
    }
    /* NOTE(review): when both err and m_error are set neither branch above
    leaves the loop, and the predicate below is satisfied at once. */
    m_cv.wait_for(lock, std::chrono::seconds(1), [&]
    {
      return (m_finished[stage] || !m_jobs.empty() || m_error);
    });
  }
  return err;
}
using table_key_t= std::string;

/** @return lookup key of a table: database name, a dot, table name. */
inline table_key_t table_key(const std::string &db, const std::string &table)
{
  table_key_t key(db);
  key+= ".";
  key+= table;
  return key;
}
/** Per task state: the data file and the log file currently open. */
struct Thread_Context
{
  uint32_t m_task_id= 0;
  /** Currently open data file, -1 if none. */
  File m_file= -1;
  /** Currently open log file, -1 if none. */
  File m_log_file= -1;
  /** Name of the data file last opened with open(). */
  std::string m_cur_data_file;

  /** Open a file for writing, creating the directory and file if needed. */
  int open(const std::string &path, const std::string &file, uint64_t offset,
           bool log= false);

  /** Open a file for reading. */
  int open_for_read(const std::string &path, const std::string &file,
                    bool log= false);

  /** Close the data file, if open. */
  void close();

  /** Close the log file, if open. */
  void close_log();
};
int Thread_Context::open_for_read(const std::string &path,
const std::string &file, bool log)
{
log ? close_log() : close();
auto &cur_file= log ? m_log_file : m_file;
char fullpath[FN_REFLEN];
fn_format(fullpath, file.c_str(), path.c_str(), "", MYF(MY_RELATIVE_PATH));
int open_flags= O_RDONLY | O_SHARE;
cur_file= mysql_file_open(0, fullpath, open_flags, MYF(0));
if (cur_file < 0)
{
cur_file= -1;
my_error(ER_CANT_OPEN_FILE, MYF(ME_ERROR_LOG), fullpath, my_errno);
return ER_CANT_OPEN_FILE;
}
return 0;
}
int Thread_Context::open(const std::string &path, const std::string &file,
uint64_t offset, bool log)
{
/* Close previous file if there. */
log ? close_log() : close();
auto &cur_file= log ? m_log_file : m_file;
char fullpath[FN_REFLEN];
fn_format(fullpath, file.c_str(), path.c_str(), "", MYF(MY_RELATIVE_PATH));
size_t dirpath_len= 0;
char dirpath[FN_REFLEN];
dirname_part(dirpath, fullpath, &dirpath_len);
/* Make schema directory path and create file, if needed. */
if (my_mkdir(dirpath, 0777, MYF(0)) >= 0 || my_errno == EEXIST)
{
int open_flags= O_WRONLY | O_BINARY;
if (offset == Descriptor::S_OFF_APPEND)
open_flags|= O_APPEND;
else
DBUG_ASSERT(offset == Descriptor::S_OFF_NO_DATA || !offset);
cur_file= mysql_file_open(0, fullpath, open_flags, MYF(0));
if (cur_file < 0)
{
open_flags|= O_CREAT;
cur_file= mysql_file_open(0, fullpath, open_flags, MYF(0));
}
}
if (cur_file < 0)
{
cur_file= -1;
my_error(ER_CANT_OPEN_FILE, MYF(ME_ERROR_LOG), fullpath, my_errno);
return ER_CANT_OPEN_FILE;
}
if (!log)
m_cur_data_file.assign(file);
return 0;
}
/** Close the log file if one is open and mark the slot as free. */
void Thread_Context::close_log()
{
  if (m_log_file >= 0)
  {
    mysql_file_close(m_log_file, MYF(0));
    m_log_file= -1;
  }
}
/** Close the data file if one is open and mark the slot as free. */
void Thread_Context::close()
{
  if (m_file >= 0)
  {
    mysql_file_close(m_file, MYF(0));
    m_file= -1;
  }
}
/**
  State of one clone operation: its locator, directories, per task contexts,
  the job queue and the position reached in the redo log.
*/
class Clone_Handle
{
public:
  /** Maximum number of tasks that can attach to one clone. */
  static constexpr size_t S_MAX_TASKS= 128;

  /**
    @param is_copy  true for the copy side, false for the apply side
    @param ref_loc  reference locator passed on to the own locator
    @param datadir  data directory; "." is used if null
    @param index    index passed on to the own locator
  */
  Clone_Handle(bool is_copy, const Locator *ref_loc, const char *datadir,
               uint32_t index) :
    m_is_copy(is_copy), m_loc(ref_loc, index, is_copy),
    m_data_dir(datadir ? datadir : "."), m_log_dir(maria_data_root)
  {}

  void set_error(int err);
  int check_error(THD *thd);

  int clone(THD *thd, uint32_t task_id, Ha_clone_stage stage,
            Ha_clone_cbk *cbk);
  int apply(THD *thd, uint32_t task_id, Ha_clone_cbk *cbk);

  /** Register a new task. @return id of the task */
  size_t attach();
  /** Unregister a task. @return true if it was the last one attached */
  bool detach(size_t id);

  Locator &get_locator()
  {
    return m_loc;
  }

  /** @return true if no more tasks can attach. */
  bool max_task_reached() const
  {
    DBUG_ASSERT(m_next_task <= S_MAX_TASKS);
    return m_next_task >= S_MAX_TASKS;
  }

private:
  int scan(bool no_lock);
  int copy_offline_tables(const std::unordered_set<std::string> &exclude_tables,
                          bool no_lock, bool copy_stats);
  int copy_log_tail(THD *thd, Ha_clone_cbk *cbk_ctx, bool finalize);
  int copy_table_job(Table *table_ptr, bool online_only, bool copy_stats,
                     bool no_lock, Ha_clone_cbk *cbk, uint32_t thread_id,
                     int in_error);
  int copy_file_job(std::string *file_name_ptr, bool is_log, Ha_clone_cbk *cbk,
                    uint32_t thread_id, int in_error);
  int copy_partial_tail(Ha_clone_cbk *cbk_ctx);
  int copy_finish_tail(Ha_clone_cbk *cbk_ctx);

private:
  bool m_is_copy= true;
  /** Number of threads attached; Protected by Clone_Sys::mutex_ */
  size_t m_num_threads= 0;
  /** Id handed out to the next task that attaches. */
  size_t m_next_task= 0;
  int m_error= 0;
  Locator m_loc;
  std::string m_data_dir;
  std::string m_log_dir;
  /** Task contexts, indexed by task id. */
  std::array<Thread_Context, S_MAX_TASKS> m_thread_ctxs;
  Job_Repository m_jobs;
  std::mutex m_offline_tables_mutex;
  /** Tables that were skipped so far; protected by m_offline_tables_mutex. */
  std::vector<std::unique_ptr<Table>> m_offline_tables;
  /** Number of the redo log file currently treated as the tail. */
  size_t m_last_log_num= 0;
  /** Number of bytes of the tail log file copied so far. */
  size_t m_last_log_offset= 0;
};
size_t Clone_Handle::attach()
{
/* ID is the index into the m_thread_ctxs vector. */
auto id= m_next_task++;
DBUG_ASSERT(id < S_MAX_TASKS);
auto &ctx= m_thread_ctxs[id];
ctx.m_task_id= id;
DBUG_ASSERT(ctx.m_file == -1);
m_num_threads++;
DBUG_ASSERT(m_thread_ctxs.size() >= m_num_threads);
return id;
}
bool Clone_Handle::detach(size_t id)
{
auto &ctx= m_thread_ctxs[id];
ctx.close();
ctx.close_log();
DBUG_ASSERT(m_num_threads > 0);
return (0 == --m_num_threads);
}
/**
  Job that sends one complete file.
  @param file_name_ptr  heap allocated file name; ownership is taken and the
                        string is freed even if nothing is sent
  @param is_log         true if the file lives in the log directory
  @param cbk            clone callback
  @param in_error       error seen earlier; if set nothing is sent
  @return 0 on success, error code otherwise
*/
int Clone_Handle::copy_file_job(std::string *file_name_ptr, bool is_log,
                                Ha_clone_cbk *cbk, uint32_t, int in_error)
{
  std::unique_ptr<std::string> file_name(file_name_ptr);
  if (in_error)
    return in_error;

  std::string file_path;
  if (is_log)
  {
    file_path.assign(m_log_dir);
    /* back() must not be called on an empty string. */
    if (!file_path.empty() && file_path.back() != FN_LIBCHAR)
      file_path+= FN_LIBCHAR;
  }
  file_path.append(*file_name);

  File file= mysql_file_open(0, file_path.c_str(), O_RDONLY | O_SHARE,
                             MYF(0));
  if (file < 0)
  {
    my_printf_error(ER_CANT_OPEN_FILE, "Error on opening file: %s",
                    MYF(ME_ERROR_LOG), file_name->c_str());
    return ER_CANT_OPEN_FILE;
  }

  /* A copy size of zero asks send_file() to send up to the end of file. */
  size_t copy_size= 0;
  static const size_t buf_size= 10 * 1024 * 1024;
  std::unique_ptr<uchar[]> buf= std::make_unique<uchar[]>(buf_size);

  int err= send_file(file, buf.get(), buf_size, cbk, (*file_name), "",
                     copy_size, is_log);
  mysql_file_close(file, MYF(0));

  /* Report a complete copy only if it really completed. */
  if (!err)
    my_printf_error(ER_CLONE_SERVER_TRACE, "ARIA SE: Copied complete redo log "
                    "file %s of size %zu bytes",
                    MYF(ME_NOTE | ME_ERROR_LOG_ONLY),
                    file_name->c_str(), copy_size);
  return err;
}
/**
  Job that copies one table, or puts it aside for a later stage.
  @param table_ptr    heap allocated table; ownership is taken
  @param online_only  copy the table only if it is online backup safe
  @param copy_stats   copy the table even if it is a statistics table
  @param no_lock      passed on to Table::open()
  @param cbk          clone callback
  @param in_error     error seen earlier; if set the table is only freed
  @return 0 on success, error code otherwise
*/
int Clone_Handle::copy_table_job(Table *table_ptr, bool online_only,
                                 bool copy_stats, bool no_lock,
                                 Ha_clone_cbk *cbk, uint32_t,
                                 int in_error)
{
  std::unique_ptr<Table> table(table_ptr);
  if (in_error)
    return in_error;

  const int open_err= table->open(no_lock);
  if (open_err)
    return open_err;

  const bool online_safe= table->is_online_backup_safe();
  const bool stats_table= table->is_stats();
  const bool online_ok= !online_only || online_safe;
  const bool stats_ok= copy_stats || !stats_table;

  if (!(online_ok && stats_ok))
  {
    /* Not to be copied now: keep the table object for a later stage. */
    table->close();
    std::lock_guard<std::mutex> lock(m_offline_tables_mutex);
    m_offline_tables.push_back(std::move(table));
    return 0;
  }

  const int copy_err= table->copy(cbk);
  table->close();

  /* TODO: Post Copy Hook for DDL */
  // if (!err && m_table_post_copy_hook)
  // m_table_post_copy_hook(table->get_db(), table->get_table(),
  // table->get_version());
  return copy_err;
}
/**
  Queue the copy jobs of the first stage: the log control file, every table
  found in the data directory and every redo log file except the last one.
  The last redo log file is remembered as the tail to be copied later.
  @param no_lock  passed on to the table copy jobs
  @return 0
*/
int Clone_Handle::scan(bool no_lock)
{
  using namespace std::placeholders;

  auto ctrl_file_name= std::make_unique<std::string>("aria_log_control");
  m_jobs.add_one(std::bind(&Clone_Handle::copy_file_job, this,
                           ctrl_file_name.release(), true, _1, _2, _3));

  /* The message talks about needing locks, which is the opposite of
  no_lock. */
  my_printf_error(ER_CLONE_SERVER_TRACE, "ARIA SE: Start scanning engine table"
                  "s, need backup locks: %d",
                  MYF(ME_NOTE | ME_ERROR_LOG_ONLY), !no_lock);

  std::set<std::string> ext_list= {".MAD"};
  std::unordered_map<std::string, std::unique_ptr<Table>> partitioned_tables;

  clone_common::foreach_file_in_dir(m_data_dir,
    [&](const std::filesystem::path& file_path)
    {
      /* TODO: Partial Backup */
      // if (check_if_skip_table(file_path))
      // {
      // my_printf_error(ER_CLONE_SERVER_TRACE, "ARIA SE: Skipping %s.",
      // MYF(ME_NOTE | ME_ERROR_LOG_ONLY), file_path);
      // return;
      // }
      auto db_table_fs=
        clone_common::convert_filepath_to_tablename(file_path.c_str());
      auto table= std::make_unique<Table>(std::get<0>(db_table_fs),
        std::get<1>(db_table_fs), std::get<2>(db_table_fs), file_path.c_str());

      if (table->is_log())
        return;

      if (table->is_partitioned())
      {
        /* Collect all partitions of a table in one object; the job is
        queued once the directory scan is complete. */
        auto table_it= partitioned_tables.find(table->get_full_name());
        if (table_it == partitioned_tables.end())
          partitioned_tables[table->get_full_name()]= std::move(table);
        else
          table_it->second->add_partition(*table);
        return;
      }

      m_jobs.add_one(std::bind(&Clone_Handle::copy_table_job, this,
                               table.release(), true, false, no_lock,
                               _1, _2, _3));
    }, ext_list);

  for (auto &table_it : partitioned_tables)
  {
    m_jobs.add_one(std::bind(&Clone_Handle::copy_table_job, this,
                             table_it.second.release(), true, false, no_lock,
                             _1, _2, _3));
  }

  auto horizon= translog_get_horizon();
  uint32_t last_file_num= LSN_FILE_NO(horizon);
  my_printf_error(ER_CLONE_SERVER_TRACE, "ARIA SE: Start scanning engine redo"
                  " logs, last log number: %u",
                  MYF(ME_NOTE | ME_ERROR_LOG_ONLY), last_file_num);

  Log_Files logs(m_log_dir.c_str(), last_file_num);
  m_last_log_num= 0;
  m_last_log_offset= 0;
  /* last() is only valid for a non-empty range; with no log file at all it
  would wrap around and the loop below would queue billions of jobs. */
  if (logs.count())
  {
    /* The last log file is still being written; it is copied as the tail. */
    for (auto i= logs.first(); i < logs.last(); ++i)
    {
      auto log_file=
        std::make_unique<std::string>(Log_Files::name_by_index(i));
      m_jobs.add_one(std::bind(&Clone_Handle::copy_file_job, this,
                               log_file.release(), true, _1, _2, _3));
    }
    m_last_log_num= logs.last();
  }

  my_printf_error(ER_CLONE_SERVER_TRACE, "ARIA SE: Stop scanning engine "
                  "tables", MYF(ME_NOTE | ME_ERROR_LOG_ONLY));
  return 0;
}
int Clone_Handle::copy_offline_tables(
const std::unordered_set<std::string> &exclude_tables,
bool no_lock, bool copy_stats)
{
std::vector<std::unique_ptr<Table>> ignored_tables;
for(;;)
{
std::unique_lock<std::mutex> lock(m_offline_tables_mutex);
if (m_offline_tables.empty())
break;
auto table= std::move(m_offline_tables.back());
m_offline_tables.pop_back();
lock.unlock();
auto tkey= table_key(table->get_db(), table->get_table());
if ((!exclude_tables.empty() && exclude_tables.count(tkey)) ||
(!copy_stats && table->is_stats()))
{
ignored_tables.push_back(std::move(table));
continue;
}
using namespace std::placeholders;
m_jobs.add_one(std::bind(&Clone_Handle::copy_table_job, this,
table.release(), false, copy_stats, no_lock, _1, _2, _3));
}
if (!ignored_tables.empty())
{
std::lock_guard<std::mutex> lock(m_offline_tables_mutex);
m_offline_tables= std::move(ignored_tables);
}
return 0;
}
/**
  Send everything that is left of the tail redo log file. If the tail has
  not been opened yet the whole file is sent; otherwise the rest is sent
  from the current read position and the file header is sent once more.
  @param cbk_ctx  clone callback
  @return 0 on success, error code otherwise
*/
int Clone_Handle::copy_finish_tail(Ha_clone_cbk *cbk_ctx)
{
  DBUG_ASSERT(m_last_log_num > 0);
  if (m_last_log_num == 0)
    return 0;

  Thread_Context &ctx= m_thread_ctxs[0];
  const std::string tail_name= Log_Files::name_by_index(m_last_log_num);

  /* If the tail log file is not opened yet, send the entire log file. */
  if (ctx.m_log_file == -1)
  {
    /* copy_file_job() takes ownership of the name. */
    auto name_copy= std::make_unique<std::string>(tail_name);
    return copy_file_job(name_copy.release(), true, cbk_ctx, 0, 0);
  }

  static const size_t buf_size= 1024 * 1024;
  std::unique_ptr<uchar[]> buf= std::make_unique<uchar[]>(buf_size);

  /* Send the rest of the log file. */
  size_t copy_size= 0;
  int err= send_file(ctx.m_log_file, buf.get(), buf_size, cbk_ctx, tail_name,
                     "", copy_size, true, false);
  my_printf_error(ER_CLONE_SERVER_TRACE, "ARIA SE: Copied rest of the redo log"
                  " file %s of size %zu bytes from offset %zu bytes",
                  MYF(ME_NOTE | ME_ERROR_LOG_ONLY), tail_name.c_str(),
                  copy_size, m_last_log_offset);
  ctx.close_log();
  m_last_log_num= 0;
  m_last_log_offset= 0;
  if (err)
    return err;

  /* Send the header again to update LSN. */
  err= ctx.open_for_read(m_log_dir, tail_name, true);
  if (err)
    return err;

  copy_size= LOG_HEADER_DATA_SIZE;
  err= send_file(ctx.m_log_file, buf.get(), buf_size, cbk_ctx, tail_name, "",
                 copy_size, true);
  my_printf_error(ER_CLONE_SERVER_TRACE, "ARIA SE: Updated header of redo log"
                  " file %s of size %zu bytes",
                  MYF(ME_NOTE | ME_ERROR_LOG_ONLY), tail_name.c_str(),
                  copy_size);
  ctx.close_log();
  m_last_log_num= 0;
  m_last_log_offset= 0;
  return err;
}
/**
  Round a value down to a multiple of alignment.
  @param value      value to round down
  @param alignment  must be a non-zero power of two
  @return greatest multiple of alignment that is not greater than value
*/
template <typename T>
static T align_down(T value, T alignment)
{
  DBUG_ASSERT(alignment != 0);
  DBUG_ASSERT((alignment & (alignment - 1)) == 0);
  const T low_bits= alignment - 1;
  return value & ~low_bits;
}
/**
  Send the part of the tail redo log file that has been written since the
  last call, except for the last page.
  @param cbk_ctx  clone callback
  @return 0 on success, error code otherwise
*/
int Clone_Handle::copy_partial_tail(Ha_clone_cbk *cbk_ctx)
{
  int err= 0;
  DBUG_ASSERT(m_last_log_num > 0);
  if (m_last_log_num == 0)
    return err;

  auto log_file=
    std::make_unique<std::string>(Log_Files::name_by_index(m_last_log_num));
  auto &ctx= m_thread_ctxs[0];

  /* The file name goes out with the first data sent for this file, which is
  when the file gets opened here. */
  bool send_file_name= false;
  if (ctx.m_log_file < 0)
  {
    send_file_name= true;
    if ((err= ctx.open_for_read(m_log_dir, (*log_file), true)))
      return err;
  }

  /* An open tail file means "the file name has been sent". If the file was
  opened by this call and nothing is sent, close it again; otherwise the
  next call, or copy_finish_tail(), would find it open and send data
  without ever having sent the name. */
  auto nothing_sent= [&]()
  {
    if (send_file_name)
      ctx.close_log();
    return 0;
  };

  MY_STAT stat_info;
  memset(&stat_info, 0, sizeof(MY_STAT));
  if (my_fstat(ctx.m_log_file, &stat_info, MYF(0)))
  {
    my_printf_error(ER_INTERNAL_ERROR, "Error: failed to get stat info for "
                    "ARIA log file %s", ME_ERROR_LOG, log_file->c_str());
    nothing_sent();
    return ER_INTERNAL_ERROR;
  }

  size_t file_size= static_cast<size_t>(stat_info.st_size);
  if (file_size <= m_last_log_offset)
  {
    DBUG_ASSERT(file_size == m_last_log_offset);
    return nothing_sent();
  }

  /* Copy without the last page, which can be rewritten. */
  auto copy_size= static_cast<size_t>(file_size - m_last_log_offset);
  copy_size= align_down(copy_size, static_cast<size_t>(TRANSLOG_PAGE_SIZE));
  if (copy_size <= TRANSLOG_PAGE_SIZE)
    return nothing_sent();
  copy_size-= TRANSLOG_PAGE_SIZE;
  DBUG_ASSERT(copy_size > 0);

  static const size_t buf_size= 1024 * 1024;
  std::unique_ptr<uchar[]> buf= std::make_unique<uchar[]>(buf_size);
  err= send_file(ctx.m_log_file, buf.get(), buf_size, cbk_ctx, (*log_file), "",
                 copy_size, true, send_file_name);
  if (!err)
    my_printf_error(ER_CLONE_SERVER_TRACE, "ARIA SE: Copied partial redo log "
                    "file %s of size %zu bytes from offset %zu bytes",
                    MYF(ME_NOTE | ME_ERROR_LOG_ONLY), log_file->c_str(),
                    copy_size, m_last_log_offset);

  /* send_file() returns the number of bytes sent in copy_size. */
  m_last_log_offset+= copy_size;
  return err;
}
/**
  Copy what has been added to the redo log since the last call.
  @param thd       session, used while waiting for pending jobs
  @param cbk_ctx   clone callback
  @param finalize  if true, wait for pending jobs first and copy the tail
                   log file completely; otherwise copy it partially
  @return 0 on success, error code otherwise
*/
int Clone_Handle::copy_log_tail(THD *thd, Ha_clone_cbk *cbk_ctx, bool finalize)
{
  if (finalize)
  {
    const int wait_err= m_jobs.wait_pending(thd);
    if (wait_err)
      return wait_err;
  }

  /* The tail is either finished or copied partially. */
  auto copy_tail= [&]()
  {
    return finalize ? copy_finish_tail(cbk_ctx) : copy_partial_tail(cbk_ctx);
  };

  /* Check for new log files added. */
  auto horizon= translog_get_horizon();
  uint32_t newest_log= LSN_FILE_NO(horizon);
  Log_Files logs(m_log_dir.c_str(), newest_log, m_last_log_num);

  /* No new log files. */
  if (!logs.count())
    return copy_tail();

  /* There are more log files added. Finish the current one and continue
  with the rest. */
  int err= copy_finish_tail(cbk_ctx);
  if (err)
    return err;

  for (auto log_no= logs.first(); log_no < logs.last(); ++log_no)
  {
    auto name= std::make_unique<std::string>(Log_Files::name_by_index(log_no));
    err= copy_file_job(name.release(), true, cbk_ctx, 0, 0);
    if (err)
      return err;
  }

  /* Set new tail log. */
  m_last_log_num= logs.last();
  m_last_log_offset= 0;
  return copy_tail();
}
/**
  Registry of the active clone handles, kept separately for the copy and
  for the apply side.
*/
class Clone_Sys
{
public:
  /**
    Create a new clone handle, or attach to an existing one, and register a
    task with it.
  */
  int start(bool is_copy, bool attach, Clone_Handle *&clone_hdl, uint32_t &id,
            const Locator *ref_loc= nullptr, const char *data_dir= nullptr);

  /** Unregister a task; the handle is freed with its last task. */
  int stop(bool is_copy, Clone_Handle *&clone_hdl, uint32_t task_id);

  Clone_Handle *find(const Locator *in_loc, bool is_copy);
  Clone_Handle *get(uint32_t index, bool is_copy);

  uint32_t next_id() { return m_next_clone_id++; }

  /** Maximum number of concurrent clones of one kind. */
  static constexpr uint32_t S_MAX_CLONE= 1;

  static std::mutex mutex_;

private:
  std::mutex m_mutex;
  uint32_t m_next_clone_id= 1;
  /* A free slot is recognized by a null pointer, so the arrays have to
  start out zeroed no matter how the object is created. */
  std::array<Clone_Handle*, S_MAX_CLONE> m_copy_clones{};
  std::array<Clone_Handle*, S_MAX_CLONE> m_apply_clones{};
};

inline std::mutex Clone_Sys::mutex_;
int Clone_Sys::start(bool is_copy, bool attach, Clone_Handle *&clone_hdl,
uint32_t &id, const Locator *ref_loc,
const char *data_dir)
{
if (!attach)
{
/* Create a new clone handle. */
auto &clones= is_copy ? m_copy_clones : m_apply_clones;
uint32_t index= 0;
for (auto clone_ : clones)
{
if (clone_ == nullptr)
break;
++index;
}
if (index >= S_MAX_CLONE)
{
/* Too many active clones .*/
my_error(ER_CLONE_TOO_MANY_CONCURRENT_CLONES, MYF(ME_ERROR_LOG),
S_MAX_CLONE);
return ER_CLONE_TOO_MANY_CONCURRENT_CLONES;
}
clones[index]= new(std::nothrow) Clone_Handle(is_copy, ref_loc, data_dir,
index);
clone_hdl= clones[index];
}
if (!clone_hdl)
{
DBUG_ASSERT(attach);
/* Operation has finished already */
my_error(ER_INTERNAL_ERROR, MYF(ME_ERROR_LOG),
"ARIA SE: Clone add task refers non-existing clone");
/* No active clone to attach to. */
return ER_INTERNAL_ERROR;
}
if (clone_hdl->max_task_reached())
{
DBUG_ASSERT(attach);
my_error(ER_INTERNAL_ERROR, MYF(ME_ERROR_LOG),
"ARIA SE: Maximum Tasks reached");
return ER_INTERNAL_ERROR;
}
id= static_cast<uint32_t>(clone_hdl->attach());
return 0;
}
/**
  Detach a task from a clone handle and destroy the handle when the
  detaching task was the last one.

  @param[in]      is_copy    true for the copy side, false for apply
  @param[in,out]  clone_hdl  handle to detach from; reset to nullptr when the
                             handle is destroyed
  @param[in]      task_id    identifier of the detaching task
  @return always 0
*/
int Clone_Sys::stop(bool is_copy, Clone_Handle *&clone_hdl, uint32_t task_id)
{
  /* Other tasks are still attached: nothing more to do. */
  if (!clone_hdl->detach(static_cast<size_t>(task_id)))
    return 0;

  /* Last task gone: free the slot and the handle itself. */
  auto &registry= is_copy ? m_copy_clones : m_apply_clones;
  const auto slot= clone_hdl->get_locator().index();
  DBUG_ASSERT(registry[slot] == clone_hdl);
  registry[slot]= nullptr;

  delete clone_hdl;
  clone_hdl= nullptr;
  return 0;
}
/**
  Look up an active clone handle by locator.

  @param[in]  in_loc   locator to search for, may be nullptr
  @param[in]  is_copy  true to search the copy handles, false for apply
  @return the handle whose locator compares equal to *in_loc, or nullptr
          when in_loc is nullptr or no handle matches
*/
Clone_Handle *Clone_Sys::find(const Locator *in_loc, bool is_copy)
{
  if (in_loc == nullptr)
    return nullptr;

  auto &registry= is_copy ? m_copy_clones : m_apply_clones;
  for (Clone_Handle *candidate : registry)
  {
    /* Empty slots are skipped. */
    if (candidate != nullptr && candidate->get_locator() == *in_loc)
      return candidate;
  }
  return nullptr;
}
/**
  Fetch the clone handle stored in a slot.

  @param[in]  index    slot number, normally taken from a locator
  @param[in]  is_copy  true for the copy handles, false for apply
  @return the handle in that slot, or nullptr when the slot is empty or the
          index is out of range
*/
Clone_Handle *Clone_Sys::get(uint32_t index, bool is_copy)
{
  /* Valid slots are [0, S_MAX_CLONE). The index may come from a serialized
  locator, so index == S_MAX_CLONE must be rejected as well. */
  if (index >= S_MAX_CLONE)
    return nullptr;
  auto &clones= is_copy ? m_copy_clones : m_apply_clones;
  return clones[index];
}
/* The single clone registry of the Aria engine, together with the pointer
through which the rest of this file accesses it. */
static Clone_Sys clone_system;
static Clone_Sys *const clone_sys= &clone_system;
/**
  Build a locator for a clone handle.

  @param[in]  ref_loc      reference locator, may be nullptr
  @param[in]  clone_index  slot of the owning handle
  @param[in]  is_copy      true on the copy side, where a fresh clone
                           identifier is generated; otherwise the identifier
                           of the reference locator (or 0) is used
*/
Locator::Locator(const Locator *ref_loc, uint32_t clone_index, bool is_copy)
{
  m_index= clone_index;

  /* Use the lower of our version and the reference locator's version. */
  if (ref_loc != nullptr && ref_loc->m_version < S_CUR_VERSION)
    m_version= ref_loc->m_version;
  else
    m_version= S_CUR_VERSION;

  if (is_copy)
    m_clone_id= clone_sys->next_id();
  else
    m_clone_id= ref_loc ? ref_loc->m_clone_id : 0;

  serialize();
}
/**
  Return the error recorded for this clone, first recording
  ER_QUERY_INTERRUPTED if the session has been killed.

  @param[in]  thd  server thread handle
  @return the saved error code, 0 when there is none
*/
int Clone_Handle::check_error(THD *thd)
{
  if (thd_killed(thd))
  {
    my_error(ER_QUERY_INTERRUPTED, MYF(ME_ERROR_LOG));
    set_error(ER_QUERY_INTERRUPTED);
  }

  /* m_error is read under the same mutex set_error() uses to write it. */
  int saved_error;
  {
    const std::lock_guard<std::mutex> guard(Clone_Sys::mutex_);
    saved_error= m_error;
  }
  return saved_error;
}
/**
  Record the first error seen by the clone operation. A zero error code and
  any error after the first one are ignored.

  @param[in]  err  error code to record
*/
void Clone_Handle::set_error(int err)
{
  if (err == 0)
    return;

  {
    const std::lock_guard<std::mutex> guard(Clone_Sys::mutex_);
    /* Keep the first error; later ones are dropped. */
    if (m_error)
      return;
    m_error= err;
  }

  /* Called with the mutex released. On the copy side the job queue is
  finished with the error. */
  if (m_is_copy)
    m_jobs.finish(err, HA_CLONE_STAGE_MAX);
}
/**
  Apply one chunk of data received from the donor.

  The descriptor obtained from the callback names the target file (when
  non-empty), the write offset and whether the data belongs to a redo log
  file. The data itself is written by the callback into the file opened
  in the per-task context.

  @param[in]  thd      server thread handle (unused here)
  @param[in]  task_id  index of the task context to use
  @param[in]  cbk      callback providing the descriptor and the data
  @return 0 on success, error code otherwise
*/
int Clone_Handle::apply(THD *thd, uint32_t task_id, Ha_clone_cbk *cbk)
{
  uint32_t desc_len= 0;
  auto desc_buf= cbk->get_data_desc(&desc_len);
  Descriptor clone_desc(desc_buf, desc_len);
  auto &ctx= m_thread_ctxs[task_id];
  auto [file_name, offset]= clone_desc.get_file_info();

  /* Currently the write is append only or over-write */
  DBUG_ASSERT(!offset || offset == Descriptor::S_OFF_APPEND ||
              offset == Descriptor::S_OFF_NO_DATA);

  const bool is_log= clone_desc.is_log();

  /* A non-empty name means the context has to open that file first. */
  if (!file_name.empty())
  {
    const int open_err= ctx.open(m_data_dir, file_name, offset, is_log);
    if (open_err)
      return open_err;
  }

  if (offset == Descriptor::S_OFF_NO_DATA)
  {
    /* No payload follows; the current file is closed. */
    if (is_log)
      ctx.close_log();
    else
      ctx.close();
    return 0;
  }

  auto &cur_file= is_log ? ctx.m_log_file : ctx.m_file;
  DBUG_ASSERT(cur_file >= 0);
  if (cur_file < 0)
  {
    /* Data arrived without a file being open. The reported code must be
    the one we return: no earlier error has been recorded at this point. */
    my_error(ER_INTERNAL_ERROR, MYF(ME_ERROR_LOG),
             "ARIA SE: Cannot apply data- missing file name");
    return ER_INTERNAL_ERROR;
  }

  Ha_clone_file file;
#ifdef _WIN32
  file.type= Ha_clone_file::FILE_HANDLE;
  file.file_handle= static_cast<void *>(my_get_osfhandle(cur_file));
#else
  file.type= Ha_clone_file::FILE_DESC;
  file.file_desc= cur_file;
#endif /* _WIN32 */

  cbk->set_os_buffer_cache();
  return cbk->apply_file_cbk(file);
}
/**
  Execute one stage of the copy for one task.

  Task 0 does the stage specific preparation (scanning or copying offline
  tables) and marks the job queue finished for the stage. Every task then
  consumes jobs from the queue. For stages that prepared work, task 0
  finally copies the redo log tail; the snapshot stage finalizes it.

  @param[in]  thd      server thread handle
  @param[in]  task_id  task identifier, 0 for the first task
  @param[in]  stage    clone stage to execute
  @param[in]  cbk      callback used to send data
  @return 0 on success, error code otherwise
*/
int Clone_Handle::clone(THD *thd, uint32_t task_id, Ha_clone_stage stage,
                        Ha_clone_cbk *cbk)
{
  const bool is_first_task= (task_id == 0);
  const bool is_snapshot= (stage == HA_CLONE_STAGE_SNAPSHOT);
  std::unordered_set<std::string> tables_in_use;
  bool copy_tail= false;
  int err= 0;

  switch (stage)
  {
  case HA_CLONE_STAGE_CONCURRENT:
    if (is_first_task)
    {
      err= scan(false);
      copy_tail= true;
    }
    break;

  case HA_CLONE_STAGE_NT_DML_BLOCKED:
    /* TODO: get_tables_in_use() : "SHOW OPEN TABLES WHERE In_use = 1" */
    if (is_first_task)
    {
      err= copy_offline_tables(tables_in_use, false, false);
      copy_tail= true;
    }
    break;

  case HA_CLONE_STAGE_DDL_BLOCKED:
  case HA_CLONE_STAGE_SNAPSHOT:
    /* Both stages differ only in the last argument. */
    if (is_first_task)
    {
      err= copy_offline_tables(tables_in_use, true, is_snapshot);
      copy_tail= true;
    }
    break;

  case HA_CLONE_STAGE_NT_DML_FINISHED:
  case HA_CLONE_STAGE_END:
    /* Nothing to prepare in these stages. */
    break;

  case HA_CLONE_STAGE_MAX:
    DBUG_ASSERT(false);
    err= ER_INTERNAL_ERROR;
    my_error(err, MYF(ME_ERROR_LOG), "ARIA SE: Invalid Execution Stage");
    break;
  }

  if (is_first_task)
    m_jobs.finish(err, stage);

  err= m_jobs.consume(thd, task_id, cbk, stage, err);
  set_error(err);

  if (err || !copy_tail)
    return err;

  /* copy_tail is only ever set by the first task. */
  DBUG_ASSERT(is_first_task);
  return copy_log_tail(thd, cbk, is_snapshot);
}
} // namespace aria_engine
/**
  Report the clone capabilities of the Aria engine.

  @param[out]  flags  cleared, then set to HA_CLONE_BLOCKING and
                      HA_CLONE_MULTI_TASK
*/
static void clone_get_capability(Ha_clone_flagset &flags)
{
  /* Start from an empty set so that only the flags below are reported. */
  flags.reset();

  flags.set(HA_CLONE_MULTI_TASK);
  flags.set(HA_CLONE_BLOCKING);
}
/**
  Begin a copy operation or attach a task to a running one.

  @param[in,out]  loc      serialized locator; on success replaced by the
                           locator of the clone handle
  @param[in,out]  loc_len  length of the locator
  @param[out]     task_id  identifier of the started task
  @param[in]      mode     start a new clone or add a task; restart is not
                           supported
  @return 0 on success, error code otherwise
*/
static int clone_begin(THD *, const uchar *&loc, uint &loc_len,
                       uint &task_id, Ha_clone_type, Ha_clone_mode mode)
{
  /* Owned by a smart pointer so that it is released on every exit path. */
  std::unique_ptr<aria_engine::Locator> in_loc;
  if (loc)
  {
    in_loc.reset(new(std::nothrow) aria_engine::Locator(loc, loc_len));
    if (!in_loc)
    {
      /* Do not continue as if no locator had been passed. */
      my_error(ER_OUTOFMEMORY, MYF(ME_FATAL | ME_ERROR_LOG),
               static_cast<int>(sizeof(aria_engine::Locator)));
      return ER_OUTOFMEMORY;
    }
  }

  int err= 0;
  const std::lock_guard<std::mutex> lock(aria_engine::Clone_Sys::mutex_);
  auto clone_hdl= aria_engine::clone_sys->find(in_loc.get(), true);

  switch (mode)
  {
  case HA_CLONE_MODE_START:
    err= aria_engine::clone_sys->start(true, false, clone_hdl, task_id,
                                       in_loc.get());
    break;
  case HA_CLONE_MODE_ADD_TASK:
    err= aria_engine::clone_sys->start(true, true, clone_hdl, task_id,
                                       in_loc.get());
    break;
  case HA_CLONE_MODE_RESTART:
    err= ER_NOT_SUPPORTED_YET;
    my_error(ER_NOT_SUPPORTED_YET, MYF(ME_ERROR_LOG),
             "ARIA SE: Clone Restart after network failure");
    break;
  case HA_CLONE_MODE_VERSION:
  case HA_CLONE_MODE_MAX:
    err= ER_INTERNAL_ERROR;
    my_error(err, MYF(ME_ERROR_LOG), "ARIA SE: Clone Begin Invalid Mode");
    DBUG_ASSERT(false);
    break;
  }

  if (!err && clone_hdl)
  {
    /* Hand back the locator owned by the handle. */
    auto &locator= clone_hdl->get_locator();
    std::tie(loc, loc_len)= locator.get_locator();
  }
  return err;
}
/**
  Copy data for one stage on behalf of one task.

  @param[in]  thd      server thread handle
  @param[in]  loc      serialized locator identifying the clone
  @param[in]  loc_len  length of the locator
  @param[in]  task_id  task identifier
  @param[in]  stage    clone stage to execute
  @param[in]  cbk      callback used to send data
  @return 0 when there is no matching clone handle or on success, otherwise
          the error already recorded for the clone or the copy error
*/
static int clone_copy(THD *thd, const uchar *loc, uint loc_len, uint task_id,
                      Ha_clone_stage stage, Ha_clone_cbk *cbk)
{
  DBUG_ASSERT(loc);
  /* The locator is only needed to read the slot index, so it lives on the
  stack: no allocation that could fail and be dereferenced as nullptr. */
  const aria_engine::Locator in_loc(loc, loc_len);

  auto clone_hdl= aria_engine::clone_sys->get(in_loc.index(), true);
  if (!clone_hdl)
    return 0;

  const int err= clone_hdl->check_error(thd);
  if (err != 0)
    return err;

  return clone_hdl->clone(thd, task_id, stage, cbk);
}
/**
  Record an error acknowledged for the copy operation.

  @param[in]  loc      serialized locator identifying the clone
  @param[in]  loc_len  length of the locator
  @param[in]  in_err   error code to record; 0 is ignored by set_error()
  @return always 0
*/
static int clone_ack(THD *, const uchar *loc, uint loc_len,
                     uint, int in_err, Ha_clone_cbk *)
{
  DBUG_ASSERT(loc);
  /* Stack object: nothing to allocate, so nothing that could come back as
  nullptr and be dereferenced. */
  const aria_engine::Locator in_loc(loc, loc_len);

  auto clone_hdl= aria_engine::clone_sys->get(in_loc.index(), true);
  DBUG_ASSERT(clone_hdl);
  if (!clone_hdl)
    return 0;

  clone_hdl->set_error(in_err);
  return 0;
}
/**
  End the copy operation for one task.

  @param[in]  loc      serialized locator identifying the clone
  @param[in]  loc_len  length of the locator
  @param[in]  task_id  identifier of the finishing task
  @param[in]  in_err   error code to record; 0 is ignored by set_error()
  @return 0 when there is no matching clone handle, otherwise the result of
          Clone_Sys::stop()
*/
static int clone_end(THD *, const uchar *loc, uint loc_len, uint task_id,
                     int in_err)
{
  DBUG_ASSERT(loc);
  /* Stack object: nothing to allocate, so nothing that could come back as
  nullptr and be dereferenced. */
  const aria_engine::Locator in_loc(loc, loc_len);

  auto clone_hdl= aria_engine::clone_sys->get(in_loc.index(), true);
  DBUG_ASSERT(clone_hdl);
  if (!clone_hdl)
    return 0;

  /* set_error() locks Clone_Sys::mutex_ itself, so it has to run before the
  guard below is taken. */
  clone_hdl->set_error(in_err);

  const std::lock_guard<std::mutex> lock(aria_engine::Clone_Sys::mutex_);
  return aria_engine::clone_sys->stop(true, clone_hdl, task_id);
}
/**
  Begin an apply operation or attach a task to a running one.

  @param[in,out]  loc       serialized locator; replaced by the locator of
                            the clone handle unless a task is being added
  @param[in,out]  loc_len   length of the locator
  @param[out]     task_id   identifier of the started task; forced to 0 when
                            a new clone is started
  @param[in]      mode      version/start create a new handle, add-task
                            attaches; restart is not supported
  @param[in]      data_dir  target data directory for a new handle
  @return 0 on success, error code otherwise
*/
static int clone_apply_begin(THD *, const uchar *&loc,
                             uint &loc_len, uint &task_id, Ha_clone_mode mode,
                             const char *data_dir)
{
  /* Owned by a smart pointer so that it is released on every exit path. */
  std::unique_ptr<aria_engine::Locator> in_loc;
  if (loc)
  {
    in_loc.reset(new(std::nothrow) aria_engine::Locator(loc, loc_len));
    if (!in_loc)
    {
      /* Do not continue as if no locator had been passed. */
      my_error(ER_OUTOFMEMORY, MYF(ME_FATAL | ME_ERROR_LOG),
               static_cast<int>(sizeof(aria_engine::Locator)));
      return ER_OUTOFMEMORY;
    }
  }

  int err= 0;
  const std::lock_guard<std::mutex> lock(aria_engine::Clone_Sys::mutex_);
  auto clone_hdl= aria_engine::clone_sys->find(in_loc.get(), false);

  switch (mode)
  {
  case HA_CLONE_MODE_VERSION:
  case HA_CLONE_MODE_START:
    DBUG_ASSERT(!clone_hdl);
    err= aria_engine::clone_sys->start(false, false, clone_hdl, task_id,
                                       in_loc.get(), data_dir);
    task_id= 0;
    break;
  case HA_CLONE_MODE_ADD_TASK:
    err= aria_engine::clone_sys->start(false, true, clone_hdl, task_id,
                                       in_loc.get());
    break;
  case HA_CLONE_MODE_RESTART:
    err= ER_NOT_SUPPORTED_YET;
    my_error(ER_NOT_SUPPORTED_YET, MYF(ME_ERROR_LOG),
             "ARIA SE: Clone Restart after network failure");
    break;
  case HA_CLONE_MODE_MAX:
    err= ER_INTERNAL_ERROR;
    my_error(err, MYF(ME_ERROR_LOG), "ARIA SE: Clone Begin Invalid Mode");
    DBUG_ASSERT(false);
    break;
  }

  /* While attaching tasks, don't overwrite the source locator. */
  if (!err && clone_hdl && mode != HA_CLONE_MODE_ADD_TASK)
  {
    auto &locator= clone_hdl->get_locator();
    std::tie(loc, loc_len)= locator.get_locator();
  }
  return err;
}
/**
  Apply one chunk of data, or record an error reported by the caller.

  @param[in]  thd      server thread handle
  @param[in]  loc      serialized locator identifying the clone
  @param[in]  loc_len  length of the locator
  @param[in]  task_id  task identifier
  @param[in]  in_err   non-zero to only record this error on the handle
  @param[in]  cbk      callback providing the data; nullptr is treated like
                       an error notification
  @return 0 when there is no matching clone handle, after recording in_err,
          or on success; otherwise the error already recorded for the clone
          or the apply error
*/
static int clone_apply(THD *thd, const uchar *loc,
                       uint loc_len, uint task_id, int in_err,
                       Ha_clone_cbk *cbk)
{
  DBUG_ASSERT(loc);
  /* This runs for every chunk, so the locator is kept on the stack: no heap
  allocation per call and no nullptr to dereference if one failed. */
  const aria_engine::Locator in_loc(loc, loc_len);

  auto clone_hdl= aria_engine::clone_sys->get(in_loc.index(), false);
  DBUG_ASSERT(in_err != 0 || cbk != nullptr);

  if (clone_hdl && (in_err != 0 || cbk == nullptr))
  {
    /* Error notification only: there is no data to apply. */
    clone_hdl->set_error(in_err);
    my_printf_error(ER_CLONE_CLIENT_TRACE, "ARIA SE: Set Error Code %d",
                    MYF(ME_NOTE | ME_ERROR_LOG_ONLY), in_err);
    return 0;
  }

  if (!clone_hdl)
    return 0;

  int err= clone_hdl->check_error(thd);
  if (err != 0)
    return err;

  err= clone_hdl->apply(thd, task_id, cbk);
  clone_hdl->set_error(err);
  return err;
}
/**
  End the apply operation for one task.

  @param[in]  loc      serialized locator identifying the clone
  @param[in]  loc_len  length of the locator
  @param[in]  task_id  identifier of the finishing task
  @param[in]  in_err   error code to record; 0 is ignored by set_error()
  @return 0 when there is no matching clone handle, otherwise the result of
          Clone_Sys::stop()
*/
static int clone_apply_end(THD *, const uchar *loc, uint loc_len,
                           uint task_id, int in_err)
{
  DBUG_ASSERT(loc);
  /* Stack object: nothing to allocate, so nothing that could come back as
  nullptr and be dereferenced. */
  const aria_engine::Locator in_loc(loc, loc_len);

  auto clone_hdl= aria_engine::clone_sys->get(in_loc.index(), false);
  DBUG_ASSERT(clone_hdl);
  /* The assertion is compiled out of release builds; do not dereference a
  missing handle there (same guard as in clone_end()). */
  if (!clone_hdl)
    return 0;

  /* set_error() locks Clone_Sys::mutex_ itself, so it has to run before the
  guard below is taken. */
  clone_hdl->set_error(in_err);

  const std::lock_guard<std::mutex> lock(aria_engine::Clone_Sys::mutex_);
  return aria_engine::clone_sys->stop(false, clone_hdl, task_id);
}
/**
  Register the Aria clone callbacks in the handlerton.

  @param[in,out]  aria_hton  handlerton whose clone_interface is filled in
*/
void init_maria_clone_interfaces(handlerton *aria_hton)
{
  auto &clone_api= aria_hton->clone_interface;

  clone_api.clone_capability= clone_get_capability;

  /* Copy (donor) side. */
  clone_api.clone_begin= clone_begin;
  clone_api.clone_copy= clone_copy;
  clone_api.clone_ack= clone_ack;
  clone_api.clone_end= clone_end;

  /* Apply (recipient) side. */
  clone_api.clone_apply_begin= clone_apply_begin;
  clone_api.clone_apply= clone_apply;
  clone_api.clone_apply_end= clone_apply_end;
}