mirror of
https://github.com/MariaDB/server.git
synced 2025-01-27 01:04:19 +01:00
1c55b845e0
Added support for BACKUP STAGE to mariadb-backup. This is a port of the code from ES 10.6. See MDEV-5336 for a description of the backup stages. The following old options are not supported by the new code: --rsync ; This is because rsync will not work on tables that are in use. --no-backup-locks ; This is disabled as mariadb-backup will always use backup locks for better performance.
512 lines
15 KiB
C++
512 lines
15 KiB
C++
#include "common_engine.h"
|
|
#include "backup_copy.h"
|
|
#include "xtrabackup.h"
|
|
#include "common.h"
|
|
#include "backup_debug.h"
|
|
|
|
#include <unordered_map>
|
|
#include <atomic>
|
|
#include <memory>
|
|
#include <chrono>
|
|
|
|
namespace common_engine {
|
|
|
|
class Table {
|
|
public:
|
|
Table(std::string &db, std::string &table, std::string &fs_name) :
|
|
m_db(std::move(db)), m_table(std::move(table)),
|
|
m_fs_name(std::move(fs_name)) {}
|
|
virtual ~Table() {}
|
|
void add_file_name(const char *file_name) { m_fnames.push_back(file_name); }
|
|
virtual bool copy(ds_ctxt_t *ds, MYSQL *con, bool no_lock,
|
|
bool finalize, unsigned thread_num);
|
|
std::string &get_db() { return m_db; }
|
|
std::string &get_table() { return m_table; }
|
|
std::string &get_version() { return m_version; }
|
|
|
|
protected:
|
|
std::string m_db;
|
|
std::string m_table;
|
|
std::string m_fs_name;
|
|
std::string m_version;
|
|
std::vector<std::string> m_fnames;
|
|
};
|
|
|
|
bool
|
|
Table::copy(ds_ctxt_t *ds, MYSQL *con, bool no_lock, bool, unsigned thread_num) {
|
|
static const size_t buf_size = 10 * 1024 * 1024;
|
|
std::unique_ptr<uchar[]> buf;
|
|
bool result = false;
|
|
File frm_file = -1;
|
|
std::vector<File> files;
|
|
bool locked = false;
|
|
std::string full_tname("`");
|
|
full_tname.append(m_db).append("`.`").append(m_table).append("`");
|
|
|
|
if (!no_lock && !backup_lock(con, full_tname.c_str())) {
|
|
msg(thread_num, "Error on executing BACKUP LOCK for table %s",
|
|
full_tname.c_str());
|
|
goto exit;
|
|
}
|
|
else
|
|
locked = !no_lock;
|
|
|
|
if ((frm_file = mysql_file_open(key_file_frm, (m_fs_name + ".frm").c_str(),
|
|
O_RDONLY | O_SHARE, MYF(0))) < 0 && !m_fnames.empty() &&
|
|
!ends_with(m_fnames[0].c_str(), ".ARZ") &&
|
|
!ends_with(m_fnames[0].c_str(), ".ARM")) {
|
|
// Don't treat it as error, as the table can be dropped after it
|
|
// was added to queue for copying
|
|
result = true;
|
|
goto exit;
|
|
}
|
|
|
|
for (const auto &fname : m_fnames) {
|
|
File file = mysql_file_open(0, fname.c_str(),O_RDONLY | O_SHARE, MYF(0));
|
|
if (file < 0) {
|
|
msg(thread_num, "Error on file %s open during %s table copy",
|
|
fname.c_str(), full_tname.c_str());
|
|
goto exit;
|
|
}
|
|
files.push_back(file);
|
|
}
|
|
|
|
if (locked && !backup_unlock(con)) {
|
|
msg(thread_num, "Error on BACKUP UNLOCK for table %s", full_tname.c_str());
|
|
locked = false;
|
|
goto exit;
|
|
}
|
|
|
|
locked = false;
|
|
|
|
buf.reset(new uchar[buf_size]);
|
|
|
|
for (size_t i = 0; i < m_fnames.size(); ++i) {
|
|
ds_file_t *dst_file = nullptr;
|
|
size_t bytes_read;
|
|
size_t copied_size = 0;
|
|
MY_STAT stat_info;
|
|
|
|
if (my_fstat(files[i], &stat_info, MYF(0))) {
|
|
msg(thread_num, "error: failed to get stat info for file %s of "
|
|
"table %s", m_fnames[i].c_str(), full_tname.c_str());
|
|
goto exit;
|
|
}
|
|
|
|
const char *dst_path =
|
|
(xtrabackup_copy_back || xtrabackup_move_back) ?
|
|
m_fnames[i].c_str() : trim_dotslash(m_fnames[i].c_str());
|
|
|
|
dst_file = ds_open(ds, dst_path, &stat_info, false);
|
|
if (!dst_file) {
|
|
msg(thread_num, "error: cannot open destination stream for %s, table %s",
|
|
dst_path, full_tname.c_str());
|
|
goto exit;
|
|
}
|
|
|
|
while ((bytes_read = my_read(files[i], buf.get(), buf_size, MY_WME))) {
|
|
if (bytes_read == size_t(-1)) {
|
|
msg(thread_num, "error: file %s read for table %s",
|
|
m_fnames[i].c_str(), full_tname.c_str());
|
|
ds_close(dst_file);
|
|
goto exit;
|
|
}
|
|
xtrabackup_io_throttling();
|
|
if (ds_write(dst_file, buf.get(), bytes_read)) {
|
|
msg(thread_num, "error: file %s write for table %s",
|
|
dst_path, full_tname.c_str());
|
|
ds_close(dst_file);
|
|
goto exit;
|
|
}
|
|
copied_size += bytes_read;
|
|
}
|
|
mysql_file_close(files[i], MYF(MY_WME));
|
|
files[i] = -1;
|
|
ds_close(dst_file);
|
|
msg(thread_num, "Copied file %s for table %s, %zu bytes",
|
|
m_fnames[i].c_str(), full_tname.c_str(), copied_size);
|
|
}
|
|
|
|
result = true;
|
|
|
|
#ifndef DBUG_OFF
|
|
{
|
|
std::string sql_name(m_db);
|
|
sql_name.append("/").append(m_table);
|
|
DBUG_MARIABACKUP_EVENT_LOCK("after_ce_table_copy", fil_space_t::name_type(sql_name.data(), sql_name.size()));
|
|
}
|
|
#endif // DBUG_OFF
|
|
exit:
|
|
if (frm_file >= 0) {
|
|
m_version = ::read_table_version_id(frm_file);
|
|
mysql_file_close(frm_file, MYF(MY_WME));
|
|
}
|
|
if (locked && !backup_unlock(con)) {
|
|
msg(thread_num, "Error on BACKUP UNLOCK for table %s", full_tname.c_str());
|
|
result = false;
|
|
}
|
|
for (auto file : files)
|
|
if (file >= 0)
|
|
mysql_file_close(file, MYF(MY_WME));
|
|
return result;
|
|
}
|
|
|
|
// Append-only tables
|
|
class LogTable : public Table {
|
|
public:
|
|
LogTable(std::string &db, std::string &table, std::string &fs_name) :
|
|
Table(db, table, fs_name) {}
|
|
|
|
virtual ~LogTable() { (void)close(); }
|
|
bool
|
|
copy(ds_ctxt_t *ds, MYSQL *con, bool no_lock, bool finalize,
|
|
unsigned thread_num) override;
|
|
bool close();
|
|
private:
|
|
bool open(ds_ctxt_t *ds, unsigned thread_num);
|
|
std::vector<File> m_src;
|
|
std::vector<ds_file_t *> m_dst;
|
|
};
|
|
|
|
bool
|
|
LogTable::open(ds_ctxt_t *ds, unsigned thread_num) {
|
|
DBUG_ASSERT(m_src.empty());
|
|
DBUG_ASSERT(m_dst.empty());
|
|
|
|
std::string full_tname("`");
|
|
full_tname.append(m_db).append("`.`").append(m_table).append("`");
|
|
|
|
for (const auto &fname : m_fnames) {
|
|
File file = mysql_file_open(0, fname.c_str(),O_RDONLY | O_SHARE, MYF(0));
|
|
if (file < 0) {
|
|
msg(thread_num, "Error on file %s open during %s log table copy",
|
|
fname.c_str(), full_tname.c_str());
|
|
return false;
|
|
}
|
|
m_src.push_back(file);
|
|
|
|
MY_STAT stat_info;
|
|
if (my_fstat(file, &stat_info, MYF(0))) {
|
|
msg(thread_num, "error: failed to get stat info for file %s of "
|
|
"log table %s", fname.c_str(), full_tname.c_str());
|
|
return false;
|
|
}
|
|
const char *dst_path =
|
|
(xtrabackup_copy_back || xtrabackup_move_back) ?
|
|
fname.c_str() : trim_dotslash(fname.c_str());
|
|
ds_file_t *dst_file = ds_open(ds, dst_path, &stat_info, false);
|
|
if (!dst_file) {
|
|
msg(thread_num, "error: cannot open destination stream for %s, "
|
|
"log table %s", dst_path, full_tname.c_str());
|
|
return false;
|
|
}
|
|
m_dst.push_back(dst_file);
|
|
}
|
|
|
|
File frm_file;
|
|
if ((frm_file = mysql_file_open(key_file_frm, (m_fs_name + ".frm").c_str(),
|
|
O_RDONLY | O_SHARE, MYF(0))) < 0 && !m_fnames.empty() &&
|
|
!ends_with(m_fnames[0].c_str(), ".ARZ") &&
|
|
!ends_with(m_fnames[0].c_str(), ".ARM")) {
|
|
msg(thread_num, "Error on .frm file open for log table %s",
|
|
full_tname.c_str());
|
|
return false;
|
|
}
|
|
|
|
m_version = ::read_table_version_id(frm_file);
|
|
mysql_file_close(frm_file, MYF(MY_WME));
|
|
|
|
return true;
|
|
}
|
|
|
|
bool LogTable::close() {
|
|
while (!m_src.empty()) {
|
|
auto f = m_src.back();
|
|
m_src.pop_back();
|
|
mysql_file_close(f, MYF(MY_WME));
|
|
}
|
|
while (!m_dst.empty()) {
|
|
auto f = m_dst.back();
|
|
m_dst.pop_back();
|
|
ds_close(f);
|
|
}
|
|
return true;
|
|
}
|
|
|
|
bool
|
|
LogTable::copy(ds_ctxt_t *ds, MYSQL *con, bool no_lock, bool finalize,
|
|
unsigned thread_num) {
|
|
static const size_t buf_size = 10 * 1024 * 1024;
|
|
DBUG_ASSERT(ds);
|
|
DBUG_ASSERT(con);
|
|
if (m_src.empty() && !open(ds, thread_num)) {
|
|
close();
|
|
return false;
|
|
}
|
|
DBUG_ASSERT(m_src.size() == m_dst.size());
|
|
|
|
std::unique_ptr<uchar[]> buf(new uchar[buf_size]);
|
|
for (size_t i = 0; i < m_src.size(); ++i) {
|
|
// .CSM can be rewritten (see write_meta_file() usage in ha_tina.cc)
|
|
if (!finalize && ends_with(m_fnames[i].c_str(), ".CSM"))
|
|
continue;
|
|
size_t bytes_read;
|
|
size_t copied_size = 0;
|
|
while ((bytes_read = my_read(m_src[i], buf.get(), buf_size, MY_WME))) {
|
|
if (bytes_read == size_t(-1)) {
|
|
msg(thread_num, "error: file %s read for log table %s",
|
|
m_fnames[i].c_str(),
|
|
std::string("`").append(m_db).append("`.`").
|
|
append(m_table).append("`").c_str());
|
|
close();
|
|
return false;
|
|
}
|
|
xtrabackup_io_throttling();
|
|
if (ds_write(m_dst[i], buf.get(), bytes_read)) {
|
|
msg(thread_num, "error: file %s write for log table %s",
|
|
m_fnames[i].c_str(), std::string("`").append(m_db).append("`.`").
|
|
append(m_table).append("`").c_str());
|
|
close();
|
|
return false;
|
|
}
|
|
copied_size += bytes_read;
|
|
}
|
|
msg(thread_num, "Copied file %s for log table %s, %zu bytes",
|
|
m_fnames[i].c_str(), std::string("`").append(m_db).append("`.`").
|
|
append(m_table).append("`").c_str(), copied_size);
|
|
}
|
|
|
|
return true;
|
|
}
|
|
|
|
class BackupImpl {
|
|
public:
|
|
BackupImpl(
|
|
const char *datadir_path, ds_ctxt_t *datasink,
|
|
std::vector<MYSQL *> &con_pool, ThreadPool &thread_pool) :
|
|
m_datadir_path(datadir_path), m_ds(datasink), m_con_pool(con_pool),
|
|
m_process_table_jobs(thread_pool) {}
|
|
~BackupImpl() { }
|
|
bool scan(
|
|
const std::unordered_set<std::string> &exclude_tables,
|
|
std::unordered_set<std::string> *out_processed_tables,
|
|
bool no_lock, bool collect_log_and_stats);
|
|
void set_post_copy_table_hook(const post_copy_table_hook_t &hook) {
|
|
m_table_post_copy_hook = hook;
|
|
}
|
|
bool copy_log_tables(bool finalize);
|
|
bool copy_stats_tables();
|
|
bool wait_for_finish();
|
|
bool close_log_tables();
|
|
private:
|
|
|
|
void process_table_job(Table *table, bool no_lock, bool delete_table,
|
|
bool finalize, unsigned thread_num);
|
|
|
|
const char *m_datadir_path;
|
|
ds_ctxt_t *m_ds;
|
|
std::vector<MYSQL *> &m_con_pool;
|
|
TasksGroup m_process_table_jobs;
|
|
|
|
post_copy_table_hook_t m_table_post_copy_hook;
|
|
std::unordered_map<table_key_t, std::unique_ptr<LogTable>> m_log_tables;
|
|
std::unordered_map<table_key_t, std::unique_ptr<Table>> m_stats_tables;
|
|
};
|
|
|
|
/**
  Task body executed for one table.

  The table is copied only if the task group has not already recorded a
  failure. After a successful copy the post-copy hook, if set, is called
  with the table's database, name and version.

  @param table         table to copy
  @param no_lock       passed through to Table::copy()
  @param delete_table  when true, the table object is deleted here
  @param finalize      passed through to Table::copy()
  @param thread_num    worker number; also selects the connection
*/
void BackupImpl::process_table_job(Table *table, bool no_lock,
	bool delete_table, bool finalize, unsigned thread_num) {
	// Reported to the task group: 1 on success, 0 otherwise.
	int status = 0;

	if (m_process_table_jobs.get_result() &&
		table->copy(m_ds, m_con_pool[thread_num], no_lock, finalize,
			thread_num)) {
		if (m_table_post_copy_hook) {
			m_table_post_copy_hook(table->get_db(), table->get_table(),
				table->get_version());
		}
		status = 1;
	}

	if (delete_table) {
		delete table;
	}
	m_process_table_jobs.finish_task(status);
}
|
|
|
|
bool BackupImpl::scan(const std::unordered_set<table_key_t> &exclude_tables,
|
|
std::unordered_set<table_key_t> *out_processed_tables, bool no_lock,
|
|
bool collect_log_and_stats) {
|
|
|
|
msg("Start scanning common engine tables, need backup locks: %d, "
|
|
"collect log and stat tables: %d", no_lock, collect_log_and_stats);
|
|
|
|
std::unordered_map<table_key_t, std::unique_ptr<Table>> found_tables;
|
|
|
|
foreach_file_in_db_dirs(m_datadir_path,
|
|
[&](const char *file_path)->bool {
|
|
|
|
static const char *ext_list[] =
|
|
{".MYD", ".MYI", ".MRG", ".ARM", ".ARZ", ".CSM", ".CSV", NULL};
|
|
|
|
bool is_aria = ends_with(file_path, ".MAD") || ends_with(file_path, ".MAI");
|
|
|
|
if (!collect_log_and_stats && is_aria)
|
|
return true;
|
|
|
|
if (!is_aria && !filename_matches(file_path, ext_list))
|
|
return true;
|
|
|
|
if (check_if_skip_table(file_path)) {
|
|
msg("Skipping %s.", file_path);
|
|
return true;
|
|
}
|
|
|
|
auto db_table_fs = convert_filepath_to_tablename(file_path);
|
|
auto tk =
|
|
table_key(std::get<0>(db_table_fs), std::get<1>(db_table_fs));
|
|
|
|
// log and stats tables are only collected in this function,
|
|
// so there is no need to filter out them with exclude_tables.
|
|
if (collect_log_and_stats) {
|
|
if (is_log_table(std::get<0>(db_table_fs).c_str(),
|
|
std::get<1>(db_table_fs).c_str())) {
|
|
auto table_it = m_log_tables.find(tk);
|
|
if (table_it == m_log_tables.end()) {
|
|
msg("Log table found: %s", tk.c_str());
|
|
table_it = m_log_tables.emplace(tk,
|
|
std::unique_ptr<LogTable>(new LogTable(std::get<0>(db_table_fs),
|
|
std::get<1>(db_table_fs), std::get<2>(db_table_fs)))).first;
|
|
}
|
|
msg("Collect log table file: %s", file_path);
|
|
table_it->second->add_file_name(file_path);
|
|
return true;
|
|
}
|
|
// Aria can handle statistics tables
|
|
else if (is_stats_table(std::get<0>(db_table_fs).c_str(),
|
|
std::get<1>(db_table_fs).c_str()) && !is_aria) {
|
|
auto table_it = m_stats_tables.find(tk);
|
|
if (table_it == m_stats_tables.end()) {
|
|
msg("Stats table found: %s", tk.c_str());
|
|
table_it = m_stats_tables.emplace(tk,
|
|
std::unique_ptr<Table>(new Table(std::get<0>(db_table_fs),
|
|
std::get<1>(db_table_fs), std::get<2>(db_table_fs)))).first;
|
|
}
|
|
msg("Collect stats table file: %s", file_path);
|
|
table_it->second->add_file_name(file_path);
|
|
return true;
|
|
}
|
|
} else if (is_log_table(std::get<0>(db_table_fs).c_str(),
|
|
std::get<1>(db_table_fs).c_str()) ||
|
|
is_stats_table(std::get<0>(db_table_fs).c_str(),
|
|
std::get<1>(db_table_fs).c_str()))
|
|
return true;
|
|
|
|
if (is_aria)
|
|
return true;
|
|
|
|
if (exclude_tables.count(tk)) {
|
|
msg("Skip table %s at it is in exclude list", tk.c_str());
|
|
return true;
|
|
}
|
|
|
|
auto table_it = found_tables.find(tk);
|
|
if (table_it == found_tables.end()) {
|
|
table_it = found_tables.emplace(tk,
|
|
std::unique_ptr<Table>(new Table(std::get<0>(db_table_fs),
|
|
std::get<1>(db_table_fs), std::get<2>(db_table_fs)))).first;
|
|
}
|
|
|
|
table_it->second->add_file_name(file_path);
|
|
|
|
return true;
|
|
});
|
|
|
|
for (auto &table_it : found_tables) {
|
|
m_process_table_jobs.push_task(
|
|
std::bind(&BackupImpl::process_table_job, this, table_it.second.release(),
|
|
no_lock, true, false, std::placeholders::_1));
|
|
if (out_processed_tables)
|
|
out_processed_tables->insert(table_it.first);
|
|
}
|
|
|
|
msg("Stop scanning common engine tables");
|
|
return true;
|
|
}
|
|
|
|
bool BackupImpl::copy_log_tables(bool finalize) {
|
|
for (auto &table_it : m_log_tables) {
|
|
// Do not execute BACKUP LOCK for log tables as it's supposed
|
|
// that they must be copied on BLOCK_DDL and BLOCK_COMMIT locks.
|
|
m_process_table_jobs.push_task(
|
|
std::bind(&BackupImpl::process_table_job, this, table_it.second.get(),
|
|
true, false, finalize, std::placeholders::_1));
|
|
}
|
|
return true;
|
|
}
|
|
|
|
bool BackupImpl::copy_stats_tables() {
|
|
for (auto &table_it : m_stats_tables) {
|
|
// Do not execute BACKUP LOCK for stats tables as it's supposed
|
|
// that they must be copied on BLOCK_DDL and BLOCK_COMMIT locks.
|
|
// Delete stats table object after copy (see process_table_job())
|
|
m_process_table_jobs.push_task(
|
|
std::bind(&BackupImpl::process_table_job, this, table_it.second.release(),
|
|
true, true, false, std::placeholders::_1));
|
|
}
|
|
m_stats_tables.clear();
|
|
return true;
|
|
}
|
|
|
|
bool BackupImpl::wait_for_finish() {
|
|
/* Wait for threads to exit */
|
|
return m_process_table_jobs.wait_for_finish();
|
|
}
|
|
|
|
bool BackupImpl::close_log_tables() {
|
|
bool result = wait_for_finish();
|
|
for (auto &table_it : m_log_tables)
|
|
table_it.second->close();
|
|
return result;
|
|
}
|
|
|
|
/** Create the implementation object; all arguments are forwarded to it. */
Backup::Backup(const char *datadir_path, ds_ctxt_t *datasink,
	std::vector<MYSQL *> &con_pool, ThreadPool &thread_pool)
	: m_backup_impl(new BackupImpl(datadir_path, datasink, con_pool,
		thread_pool))
{
}
|
|
|
|
/** Destroy the implementation object created by the constructor. */
Backup::~Backup()
{
	delete this->m_backup_impl;
}
|
|
|
|
bool Backup::scan(
|
|
const std::unordered_set<table_key_t> &exclude_tables,
|
|
std::unordered_set<table_key_t> *out_processed_tables,
|
|
bool no_lock, bool collect_log_and_stats) {
|
|
return m_backup_impl->scan(exclude_tables, out_processed_tables, no_lock,
|
|
collect_log_and_stats);
|
|
}
|
|
|
|
bool Backup::copy_log_tables(bool finalize) {
|
|
return m_backup_impl->copy_log_tables(finalize);
|
|
}
|
|
|
|
bool Backup::copy_stats_tables() {
|
|
return m_backup_impl->copy_stats_tables();
|
|
}
|
|
|
|
bool Backup::wait_for_finish() {
|
|
return m_backup_impl->wait_for_finish();
|
|
}
|
|
|
|
bool Backup::close_log_tables() {
|
|
return m_backup_impl->close_log_tables();
|
|
}
|
|
|
|
void Backup::set_post_copy_table_hook(const post_copy_table_hook_t &hook) {
|
|
m_backup_impl->set_post_copy_table_hook(hook);
|
|
}
|
|
|
|
} // namespace common_engine
|