mariadb/extra/mariabackup/common_engine.cc
Monty 1c55b845e0 MDEV-32932 Port backup features from ES
Added support to BACKUP STAGE to maria-backup

This is a port of the code from ES 10.6
See MDEV-5336 for backup stages description.

The following old options are not supported by the new code:
--rsync             ; This is because rsync will not work on tables
                      that are in use.
--no-backup-locks   ; This is disabled as mariadb-backup will always
                      use backup locks for better performance.
2024-02-27 20:55:54 +02:00

512 lines
15 KiB
C++

#include "common_engine.h"
#include "backup_copy.h"
#include "xtrabackup.h"
#include "common.h"
#include "backup_debug.h"
#include <unordered_map>
#include <atomic>
#include <memory>
#include <chrono>
namespace common_engine {
/**
  One table copied by the common-engine backup code: its database and table
  names, the file-system base name used to locate its .frm file, and the list
  of files that have to be copied for it.
*/
class Table {
public:
  /**
    All three strings are moved out of the arguments, i.e. the caller's
    strings are left in a moved-from state after construction.

    @param db       database name
    @param table    table name
    @param fs_name  file-system name of the table; ".frm" is appended to it
                    when the table definition file is opened
  */
  Table(std::string &db, std::string &table, std::string &fs_name)
    : m_db(std::move(db)),
      m_table(std::move(table)),
      m_fs_name(std::move(fs_name))
  {}

  virtual ~Table() = default;

  /** Register one more file that belongs to this table. */
  void add_file_name(const char *file_name)
  {
    m_fnames.push_back(file_name);
  }

  /** Copy the registered files to the datasink; see the definition. */
  virtual bool copy(ds_ctxt_t *ds, MYSQL *con, bool no_lock,
                    bool finalize, unsigned thread_num);

  std::string &get_db() { return m_db; }
  std::string &get_table() { return m_table; }
  /** Version id read from the .frm file; set by copy(). */
  std::string &get_version() { return m_version; }

protected:
  std::string m_db;
  std::string m_table;
  std::string m_fs_name;
  std::string m_version;
  /* Files to copy, in the order they were registered. */
  std::vector<std::string> m_fnames;
};
/**
  Copy every registered file of the table to the datasink.

  Unless no_lock is set, BACKUP LOCK is taken on the table while the .frm
  file and all table files are opened, and released again before any data
  is read, so the copy itself runs from already opened descriptors.

  A missing .frm file is not an error (the table may have been dropped after
  it was queued): the function then returns true without copying anything.
  The exception is when the first registered file ends with ".ARZ" or
  ".ARM"; in that case the copy proceeds without a .frm file.

  If the .frm file was opened, m_version is set from it on every exit path.

  @param ds          datasink the files are written to
  @param con         connection used for BACKUP LOCK / BACKUP UNLOCK
  @param no_lock     true to skip BACKUP LOCK for this table
  @param thread_num  number of the calling thread, used for log messages
  (the unnamed bool parameter, "finalize" in the declaration, is unused here)

  @return true on success, false if locking, opening, reading, writing or
          closing the destination failed
*/
bool
Table::copy(ds_ctxt_t *ds, MYSQL *con, bool no_lock, bool, unsigned thread_num) {
  static const size_t buf_size = 10 * 1024 * 1024;
  std::unique_ptr<uchar[]> buf;
  bool result = false;
  File frm_file = -1;
  std::vector<File> files;
  bool locked = false;
  std::string full_tname("`");
  full_tname.append(m_db).append("`.`").append(m_table).append("`");

  if (!no_lock && !backup_lock(con, full_tname.c_str())) {
    msg(thread_num, "Error on executing BACKUP LOCK for table %s",
      full_tname.c_str());
    goto exit;
  }
  else
    locked = !no_lock;

  if ((frm_file = mysql_file_open(key_file_frm, (m_fs_name + ".frm").c_str(),
    O_RDONLY | O_SHARE, MYF(0))) < 0 && !m_fnames.empty() &&
    !ends_with(m_fnames[0].c_str(), ".ARZ") &&
    !ends_with(m_fnames[0].c_str(), ".ARM")) {
    // Don't treat it as error, as the table can be dropped after it
    // was added to queue for copying
    result = true;
    goto exit;
  }

  // Open all source files while the lock (if any) is still held.
  for (const auto &fname : m_fnames) {
    File file = mysql_file_open(0, fname.c_str(),O_RDONLY | O_SHARE, MYF(0));
    if (file < 0) {
      msg(thread_num, "Error on file %s open during %s table copy",
        fname.c_str(), full_tname.c_str());
      goto exit;
    }
    files.push_back(file);
  }

  if (locked && !backup_unlock(con)) {
    msg(thread_num, "Error on BACKUP UNLOCK for table %s", full_tname.c_str());
    // Do not try to unlock a second time in the cleanup code.
    locked = false;
    goto exit;
  }
  locked = false;

  buf.reset(new uchar[buf_size]);

  for (size_t i = 0; i < m_fnames.size(); ++i) {
    ds_file_t *dst_file = nullptr;
    size_t bytes_read;
    size_t copied_size = 0;
    MY_STAT stat_info;

    if (my_fstat(files[i], &stat_info, MYF(0))) {
      msg(thread_num, "error: failed to get stat info for file %s of "
        "table %s", m_fnames[i].c_str(), full_tname.c_str());
      goto exit;
    }

    const char *dst_path =
      (xtrabackup_copy_back || xtrabackup_move_back) ?
      m_fnames[i].c_str() : trim_dotslash(m_fnames[i].c_str());

    dst_file = ds_open(ds, dst_path, &stat_info, false);
    if (!dst_file) {
      msg(thread_num, "error: cannot open destination stream for %s, table %s",
        dst_path, full_tname.c_str());
      goto exit;
    }

    while ((bytes_read = my_read(files[i], buf.get(), buf_size, MY_WME))) {
      if (bytes_read == size_t(-1)) {
        msg(thread_num, "error: file %s read for table %s",
          m_fnames[i].c_str(), full_tname.c_str());
        ds_close(dst_file);
        goto exit;
      }
      xtrabackup_io_throttling();
      if (ds_write(dst_file, buf.get(), bytes_read)) {
        msg(thread_num, "error: file %s write for table %s",
          dst_path, full_tname.c_str());
        ds_close(dst_file);
        goto exit;
      }
      copied_size += bytes_read;
    }

    // The source is fully read: close it now and mark the slot so that the
    // cleanup code below does not close it a second time.
    mysql_file_close(files[i], MYF(MY_WME));
    files[i] = -1;

    // ds_close() reports failure through a non-zero return value; a file
    // whose destination could not be closed must not be reported as copied.
    if (ds_close(dst_file)) {
      msg(thread_num, "error: cannot close destination stream for %s, "
        "table %s", dst_path, full_tname.c_str());
      goto exit;
    }

    msg(thread_num, "Copied file %s for table %s, %zu bytes",
      m_fnames[i].c_str(), full_tname.c_str(), copied_size);
  }

  result = true;

#ifndef DBUG_OFF
  {
    std::string sql_name(m_db);
    sql_name.append("/").append(m_table);
    DBUG_MARIABACKUP_EVENT_LOCK("after_ce_table_copy", fil_space_t::name_type(sql_name.data(), sql_name.size()));
  }
#endif // DBUG_OFF

exit:
  // Common cleanup for both the success and every failure path.
  if (frm_file >= 0) {
    m_version = ::read_table_version_id(frm_file);
    mysql_file_close(frm_file, MYF(MY_WME));
  }
  if (locked && !backup_unlock(con)) {
    msg(thread_num, "Error on BACKUP UNLOCK for table %s", full_tname.c_str());
    result = false;
  }
  // Close whatever source files are still open (slots set to -1 are done).
  for (auto file : files)
    if (file >= 0)
      mysql_file_close(file, MYF(MY_WME));
  return result;
}
/**
  Append-only tables.

  Unlike Table, the source descriptors and destination streams stay open
  between calls of copy(), so that each call continues reading from where the
  previous one stopped. They are released by close() or by the destructor.
*/
class LogTable : public Table {
public:
  LogTable(std::string &db, std::string &table, std::string &fs_name)
    : Table(db, table, fs_name)
  {}

  virtual ~LogTable()
  {
    (void)close();
  }

  bool copy(ds_ctxt_t *ds, MYSQL *con, bool no_lock, bool finalize,
            unsigned thread_num) override;

  /** Close all source descriptors and destination streams. */
  bool close();

private:
  /** Open sources and destinations for every registered file. */
  bool open(ds_ctxt_t *ds, unsigned thread_num);

  /* Open source descriptors, one per opened entry of m_fnames. */
  std::vector<File> m_src;
  /* Open destination streams, one per opened entry of m_fnames. */
  std::vector<ds_file_t *> m_dst;
};
/**
  Open a source descriptor and a destination stream for every registered
  file, then read the table version id from the .frm file.

  Must only be called while nothing is open (m_src and m_dst are empty).
  On failure the descriptors and streams opened so far stay in m_src/m_dst;
  the caller is expected to release them with close().

  A .frm file that cannot be opened is an error, unless there are no
  registered files or the first one ends with ".ARZ" or ".ARM". In those
  cases the function succeeds and m_version is left untouched.

  @param ds          datasink the destination streams are opened in
  @param thread_num  number of the calling thread, used for log messages

  @return true on success, false otherwise
*/
bool
LogTable::open(ds_ctxt_t *ds, unsigned thread_num) {
  DBUG_ASSERT(m_src.empty());
  DBUG_ASSERT(m_dst.empty());

  std::string full_tname("`");
  full_tname.append(m_db).append("`.`").append(m_table).append("`");

  for (const auto &fname : m_fnames) {
    File file = mysql_file_open(0, fname.c_str(),O_RDONLY | O_SHARE, MYF(0));
    if (file < 0) {
      msg(thread_num, "Error on file %s open during %s log table copy",
        fname.c_str(), full_tname.c_str());
      return false;
    }
    // Hand the descriptor over to m_src first, so that close() releases it
    // even if one of the following steps fails.
    m_src.push_back(file);

    MY_STAT stat_info;
    if (my_fstat(file, &stat_info, MYF(0))) {
      msg(thread_num, "error: failed to get stat info for file %s of "
        "log table %s", fname.c_str(), full_tname.c_str());
      return false;
    }

    const char *dst_path =
      (xtrabackup_copy_back || xtrabackup_move_back) ?
      fname.c_str() : trim_dotslash(fname.c_str());

    ds_file_t *dst_file = ds_open(ds, dst_path, &stat_info, false);
    if (!dst_file) {
      msg(thread_num, "error: cannot open destination stream for %s, "
        "log table %s", dst_path, full_tname.c_str());
      return false;
    }
    m_dst.push_back(dst_file);
  }

  File frm_file = mysql_file_open(key_file_frm, (m_fs_name + ".frm").c_str(),
    O_RDONLY | O_SHARE, MYF(0));
  if (frm_file < 0) {
    if (!m_fnames.empty() &&
        !ends_with(m_fnames[0].c_str(), ".ARZ") &&
        !ends_with(m_fnames[0].c_str(), ".ARM")) {
      msg(thread_num, "Error on .frm file open for log table %s",
        full_tname.c_str());
      return false;
    }
    // No .frm file and that is tolerated: there is no valid descriptor to
    // read the version from or to close, so stop here.
    return true;
  }

  m_version = ::read_table_version_id(frm_file);
  mysql_file_close(frm_file, MYF(MY_WME));
  return true;
}
bool LogTable::close() {
while (!m_src.empty()) {
auto f = m_src.back();
m_src.pop_back();
mysql_file_close(f, MYF(MY_WME));
}
while (!m_dst.empty()) {
auto f = m_dst.back();
m_dst.pop_back();
ds_close(f);
}
return true;
}
/**
  Copy whatever can currently be read from the table files to their
  destination streams.

  Sources and destinations are opened on the first call and kept open, so
  each call continues from the read position reached by the previous one.
  On any error everything is closed and false is returned.

  @param ds          datasink, used when the files have to be opened
  @param con         connection; only checked by an assertion here
  @param no_lock     not used by this implementation
  @param finalize    ".CSM" files are copied only when this is true
  @param thread_num  number of the calling thread, used for log messages

  @return true on success, false otherwise
*/
bool
LogTable::copy(ds_ctxt_t *ds, MYSQL *con, bool no_lock, bool finalize,
  unsigned thread_num) {
  static const size_t buf_size = 10 * 1024 * 1024;
  DBUG_ASSERT(ds);
  DBUG_ASSERT(con);

  if (m_src.empty() && !open(ds, thread_num)) {
    close();
    return false;
  }
  DBUG_ASSERT(m_src.size() == m_dst.size());

  // Builds the quoted `db`.`table` name on demand for log messages.
  const auto quoted_name = [this]() -> std::string {
    std::string name("`");
    name.append(m_db).append("`.`").append(m_table).append("`");
    return name;
  };

  std::unique_ptr<uchar[]> buf(new uchar[buf_size]);

  for (size_t idx = 0; idx < m_src.size(); ++idx) {
    const char *src_name = m_fnames[idx].c_str();

    // .CSM can be rewritten (see write_meta_file() usage in ha_tina.cc)
    if (!finalize && ends_with(src_name, ".CSM"))
      continue;

    size_t copied_size = 0;
    for (;;) {
      const size_t chunk = my_read(m_src[idx], buf.get(), buf_size, MY_WME);
      if (!chunk)
        break;
      if (chunk == size_t(-1)) {
        msg(thread_num, "error: file %s read for log table %s",
          src_name, quoted_name().c_str());
        close();
        return false;
      }
      xtrabackup_io_throttling();
      if (ds_write(m_dst[idx], buf.get(), chunk)) {
        msg(thread_num, "error: file %s write for log table %s",
          src_name, quoted_name().c_str());
        close();
        return false;
      }
      copied_size += chunk;
    }

    msg(thread_num, "Copied file %s for log table %s, %zu bytes",
      src_name, quoted_name().c_str(), copied_size);
  }

  return true;
}
/**
  Implementation behind common_engine::Backup: finds the tables to copy in
  the data directory and schedules their copying as tasks of a task group.
*/
class BackupImpl {
public:
  /**
    @param datadir_path  directory whose database subdirectories are scanned
    @param datasink      datasink the table files are copied to
    @param con_pool      connections, indexed by the thread number that is
                         passed to the copy tasks; only a reference is stored
    @param thread_pool   thread pool the task group is created on
  */
  BackupImpl(const char *datadir_path, ds_ctxt_t *datasink,
             std::vector<MYSQL *> &con_pool, ThreadPool &thread_pool)
    : m_datadir_path(datadir_path),
      m_ds(datasink),
      m_con_pool(con_pool),
      m_process_table_jobs(thread_pool)
  {}

  ~BackupImpl() = default;

  /** Scan the data directory and queue copy tasks; see the definition. */
  bool scan(
    const std::unordered_set<std::string> &exclude_tables,
    std::unordered_set<std::string> *out_processed_tables,
    bool no_lock, bool collect_log_and_stats);

  /** Set the callback invoked after each successfully copied table. */
  void set_post_copy_table_hook(const post_copy_table_hook_t &hook)
  {
    m_table_post_copy_hook = hook;
  }

  bool copy_log_tables(bool finalize);
  bool copy_stats_tables();
  bool wait_for_finish();
  bool close_log_tables();

private:
  /** Body of one copy task; see the definition. */
  void process_table_job(Table *table, bool no_lock, bool delete_table,
                         bool finalize, unsigned thread_num);

  const char *m_datadir_path;
  ds_ctxt_t *m_ds;
  std::vector<MYSQL *> &m_con_pool;
  TasksGroup m_process_table_jobs;
  post_copy_table_hook_t m_table_post_copy_hook;
  /* Log tables found by scan(); they stay owned by this map. */
  std::unordered_map<table_key_t, std::unique_ptr<LogTable>> m_log_tables;
  /* Stats tables found by scan(); handed over by copy_stats_tables(). */
  std::unordered_map<table_key_t, std::unique_ptr<Table>> m_stats_tables;
};
/**
  Body of one copy task: copy a single table and report the outcome to the
  task group.

  The table is copied only if the task group's get_result() is still true
  (presumably meaning that no earlier task has failed). After a successful
  copy the post-copy hook, if set, is called with the table's database name,
  table name and version.

  @param table         table to copy
  @param no_lock       passed through to Table::copy()
  @param delete_table  true if this task owns the table object and has to
                       delete it when done
  @param finalize      passed through to Table::copy()
  @param thread_num    number of the executing thread; also selects the
                       connection from the connection pool
*/
void BackupImpl::process_table_job(Table *table, bool no_lock,
  bool delete_table, bool finalize, unsigned thread_num) {
  // Short-circuit: neither the connection lookup nor the copy happens when
  // get_result() is false.
  const bool copied =
    m_process_table_jobs.get_result() &&
    table->copy(m_ds, m_con_pool[thread_num], no_lock, finalize, thread_num);

  if (copied && m_table_post_copy_hook) {
    m_table_post_copy_hook(table->get_db(), table->get_table(),
      table->get_version());
  }

  if (delete_table)
    delete table;

  m_process_table_jobs.finish_task(copied ? 1 : 0);
}
/**
  Walk the database directories below the data directory, group the files
  found there by table and queue one copy task per regular table.

  Files are considered when they end with .MYD, .MYI, .MRG, .ARM, .ARZ, .CSM
  or .CSV. Aria files (.MAD/.MAI) are looked at only when
  collect_log_and_stats is set, and then only to pick up log tables.

  With collect_log_and_stats set, files of log tables are collected into
  m_log_tables and files of non-Aria stats tables into m_stats_tables; no
  copy task is queued for them here. Without it, log and stats tables are
  skipped altogether.

  @param exclude_tables        keys of tables that must not be copied
  @param out_processed_tables  if not null, receives the key of every table a
                               copy task was queued for
  @param no_lock               true if the copy tasks must not take
                               BACKUP LOCK on their table
  @param collect_log_and_stats see above

  @return always true
*/
bool BackupImpl::scan(const std::unordered_set<table_key_t> &exclude_tables,
  std::unordered_set<table_key_t> *out_processed_tables, bool no_lock,
  bool collect_log_and_stats) {

  // no_lock == true means the tables are copied WITHOUT backup locks, so
  // the "need backup locks" value is its negation.
  msg("Start scanning common engine tables, need backup locks: %d, "
    "collect log and stat tables: %d", !no_lock, collect_log_and_stats);

  std::unordered_map<table_key_t, std::unique_ptr<Table>> found_tables;

  foreach_file_in_db_dirs(m_datadir_path,
    [&](const char *file_path)->bool {

    static const char *ext_list[] =
      {".MYD", ".MYI", ".MRG", ".ARM", ".ARZ", ".CSM", ".CSV", NULL};

    bool is_aria = ends_with(file_path, ".MAD") || ends_with(file_path, ".MAI");

    if (!collect_log_and_stats && is_aria)
      return true;

    if (!is_aria && !filename_matches(file_path, ext_list))
      return true;

    if (check_if_skip_table(file_path)) {
      msg("Skipping %s.", file_path);
      return true;
    }

    auto db_table_fs = convert_filepath_to_tablename(file_path);
    // Named views of the three parts. The Table/LogTable constructors move
    // the strings out, so they must not be read after a "new" below; the
    // key is therefore built first.
    auto &db_name = std::get<0>(db_table_fs);
    auto &table_name = std::get<1>(db_table_fs);
    auto &fs_name = std::get<2>(db_table_fs);
    auto tk = table_key(db_name, table_name);

    // log and stats tables are only collected in this function,
    // so there is no need to filter out them with exclude_tables.
    if (collect_log_and_stats) {
      if (is_log_table(db_name.c_str(), table_name.c_str())) {
        auto table_it = m_log_tables.find(tk);
        if (table_it == m_log_tables.end()) {
          msg("Log table found: %s", tk.c_str());
          table_it = m_log_tables.emplace(tk,
            std::unique_ptr<LogTable>(
              new LogTable(db_name, table_name, fs_name))).first;
        }
        msg("Collect log table file: %s", file_path);
        table_it->second->add_file_name(file_path);
        return true;
      }
      // Aria can handle statistics tables
      else if (is_stats_table(db_name.c_str(), table_name.c_str()) &&
               !is_aria) {
        auto table_it = m_stats_tables.find(tk);
        if (table_it == m_stats_tables.end()) {
          msg("Stats table found: %s", tk.c_str());
          table_it = m_stats_tables.emplace(tk,
            std::unique_ptr<Table>(
              new Table(db_name, table_name, fs_name))).first;
        }
        msg("Collect stats table file: %s", file_path);
        table_it->second->add_file_name(file_path);
        return true;
      }
    } else if (is_log_table(db_name.c_str(), table_name.c_str()) ||
               is_stats_table(db_name.c_str(), table_name.c_str()))
      return true;

    // Aria files were only of interest for the log table check above.
    if (is_aria)
      return true;

    if (exclude_tables.count(tk)) {
      msg("Skip table %s at it is in exclude list", tk.c_str());
      return true;
    }

    auto table_it = found_tables.find(tk);
    if (table_it == found_tables.end()) {
      table_it = found_tables.emplace(tk,
        std::unique_ptr<Table>(
          new Table(db_name, table_name, fs_name))).first;
    }

    table_it->second->add_file_name(file_path);

    return true;
  });

  for (auto &table_it : found_tables) {
    // Ownership of the Table object moves to the task, which deletes it
    // (delete_table == true) once the copy is done.
    m_process_table_jobs.push_task(
      std::bind(&BackupImpl::process_table_job, this, table_it.second.release(),
        no_lock, true, false, std::placeholders::_1));
    if (out_processed_tables)
      out_processed_tables->insert(table_it.first);
  }

  msg("Stop scanning common engine tables");
  return true;
}
bool BackupImpl::copy_log_tables(bool finalize) {
for (auto &table_it : m_log_tables) {
// Do not execute BACKUP LOCK for log tables as it's supposed
// that they must be copied on BLOCK_DDL and BLOCK_COMMIT locks.
m_process_table_jobs.push_task(
std::bind(&BackupImpl::process_table_job, this, table_it.second.get(),
true, false, finalize, std::placeholders::_1));
}
return true;
}
bool BackupImpl::copy_stats_tables() {
for (auto &table_it : m_stats_tables) {
// Do not execute BACKUP LOCK for stats tables as it's supposed
// that they must be copied on BLOCK_DDL and BLOCK_COMMIT locks.
// Delete stats table object after copy (see process_table_job())
m_process_table_jobs.push_task(
std::bind(&BackupImpl::process_table_job, this, table_it.second.release(),
true, true, false, std::placeholders::_1));
}
m_stats_tables.clear();
return true;
}
/**
  Wait until all queued copy tasks are done.

  @return the result reported by the task group
*/
bool BackupImpl::wait_for_finish() {
  /* Wait for threads to exit */
  const bool all_ok = m_process_table_jobs.wait_for_finish();
  return all_ok;
}
/**
  Wait for all queued copy tasks, then close the files of every log table.

  Every log table is closed even if the tasks or an earlier close failed.

  @return true if the tasks finished successfully and every log table was
          closed successfully, false otherwise
*/
bool BackupImpl::close_log_tables() {
  bool result = wait_for_finish();
  for (auto &table_it : m_log_tables) {
    // Do not drop the status of close(): it is the last chance to notice
    // that a log table's files could not be closed.
    if (!table_it.second->close())
      result = false;
  }
  return result;
}
/**
  Create the implementation object; all arguments are passed on unchanged to
  the BackupImpl constructor.
*/
Backup::Backup(const char *datadir_path, ds_ctxt_t *datasink,
               std::vector<MYSQL *> &con_pool, ThreadPool &thread_pool)
  : m_backup_impl(new BackupImpl(datadir_path, datasink, con_pool,
                                 thread_pool))
{}
/** Destroy the implementation object created by the constructor. */
Backup::~Backup()
{
  delete m_backup_impl;
}
/** Forward to BackupImpl::scan() with the same arguments. */
bool Backup::scan(const std::unordered_set<table_key_t> &exclude_tables,
                  std::unordered_set<table_key_t> *out_processed_tables,
                  bool no_lock, bool collect_log_and_stats)
{
  return m_backup_impl->scan(exclude_tables, out_processed_tables,
                             no_lock, collect_log_and_stats);
}
/** Forward to BackupImpl::copy_log_tables(). */
bool Backup::copy_log_tables(bool finalize)
{
  return m_backup_impl->copy_log_tables(finalize);
}
/** Forward to BackupImpl::copy_stats_tables(). */
bool Backup::copy_stats_tables()
{
  return m_backup_impl->copy_stats_tables();
}
/** Forward to BackupImpl::wait_for_finish(). */
bool Backup::wait_for_finish()
{
  return m_backup_impl->wait_for_finish();
}
/** Forward to BackupImpl::close_log_tables(). */
bool Backup::close_log_tables()
{
  return m_backup_impl->close_log_tables();
}
/** Forward to BackupImpl::set_post_copy_table_hook(). */
void Backup::set_post_copy_table_hook(const post_copy_table_hook_t &hook)
{
  m_backup_impl->set_post_copy_table_hook(hook);
}
} // namespace common_engine