/*****************************************************************************

Copyright (c) 2017, 2024, Oracle and/or its affiliates.

This program is free software; you can redistribute it and/or modify it under
the terms of the GNU General Public License, version 2.0, as published by the
Free Software Foundation.

This program is designed to work with certain software (including
but not limited to OpenSSL) that is licensed under separate terms,
as designated in a particular file or component or in included license
documentation. The authors of MySQL hereby grant you an additional
permission to link the program and your derivative works with the
separately licensed software that they have either included with
the program or referenced in the documentation.

This program is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE. See the GNU General Public License, version 2.0,
for more details.

You should have received a copy of the GNU General Public License along with
this program; if not, write to the Free Software Foundation, Inc.,
51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA

*****************************************************************************/

/** @file clone/clone0apply.cc
Innodb apply snapshot data

*******************************************************/

#include <fstream>
#include <sstream>

#include "buf0dump.h"
#include "clone0api.h"
#include "clone0clone.h"
#include "dict0dict.h"
#include "handler.h"

int Clone_Snapshot::get_file_from_desc(const Clone_File_Meta *file_meta,
                                       const char *data_dir, bool desc_create,
                                       bool &desc_exists,
                                       Clone_file_ctx *&file_ctx) {
  int err = 0;

  mysql_mutex_lock(&m_snapshot_mutex);

  auto idx = file_meta->m_file_index;

  ut_ad(m_snapshot_handle_type == CLONE_HDL_APPLY);

  ut_ad(m_snapshot_state == CLONE_SNAPSHOT_FILE_COPY ||
        m_snapshot_state == CLONE_SNAPSHOT_PAGE_COPY ||
        m_snapshot_state == CLONE_SNAPSHOT_REDO_COPY);

  desc_exists = false;

  /* File metadata is already there, possibly sent by another task. */
  file_ctx = get_file_ctx_by_index(idx);

  if (file_ctx != nullptr) {
    desc_exists = true;

  } else if (desc_create) {
    /* Create the descriptor. */
    err = create_desc(data_dir, file_meta, false, file_ctx);
  }

  mysql_mutex_unlock(&m_snapshot_mutex);

  return (err);
}

int Clone_Snapshot::rename_desc(const Clone_File_Meta *file_meta,
                                const char *data_dir,
                                Clone_file_ctx *&file_ctx) {
  /* Create new file context with new name. */
  auto err = create_desc(data_dir, file_meta, true, file_ctx);

  if (err != 0) {
    return err; /* purecov: inspected */
  }

  file_ctx->m_state.store(Clone_file_ctx::State::RENAMED);

  /* Overwrite with the renamed file context. */
  add_file_from_desc(file_ctx, false);

  return 0;
}

int Clone_Snapshot::fix_ddl_extension(const char *data_dir,
                                      Clone_file_ctx *file_ctx) {
  ut_ad(file_ctx->m_extension == Clone_file_ctx::Extension::DDL);

  /* If data directory is being replaced. */
  bool replace_dir = (data_dir == nullptr);

  auto file_meta = file_ctx->get_file_meta();
  bool is_undo_file = srv_is_undo_tablespace(file_meta->m_space_id);
  bool is_redo_file = file_meta->m_space_id == SRV_SPACE_ID_UPPER_BOUND;

  auto extn = Clone_file_ctx::Extension::NONE;
  const std::string file_path(file_meta->m_file_name);

  /* Check if file is already present and extension is needed. */
  auto err = handle_existing_file(replace_dir, is_undo_file, is_redo_file,
                                  file_meta->m_file_index, file_path, extn);
  if (err == 0) {
    file_ctx->m_extension = extn;
  }

  return err;
}

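/* Map a donor system tablespace file to the recipient configuration before it
is created. Illustration with assumed sizes (not taken from this code): if the
recipient has innodb_data_file_path=ibdata1:12M;ibdata2:50M:autoextend, the
donor's second system tablespace node is renamed to the configured ibdata2
path, and its size must match exactly unless it is the auto-extending last
file. The buffer pool dump file is similarly redirected to the recipient's
configured path. */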
int Clone_Snapshot::update_sys_file_name(bool replace,
                                         const Clone_File_Meta *file_meta,
                                         std::string &file_name) {
  /* Currently needed only while replacing data directory. */
  if (!replace) {
    return (0);
  }
  auto space_id = file_meta->m_space_id;

  /* Update buffer pool dump file path for provisioning. */
  if (space_id == UINT32_MAX) {
    ut_ad(0 == strcmp(file_name.c_str(), SRV_BUF_DUMP_FILENAME_DEFAULT));

    char path[OS_FILE_MAX_PATH];
    buf_dump_generate_path(path, sizeof(path));

    file_name.assign(path);
    return (0);
  }

  /* Change name to system configured file when replacing current directory. */
  if (!is_system_tablespace(space_id)) {
    return (0);
  }

  /* Find out the node index of the file within system tablespace. */
  auto loop_index = file_meta->m_file_index;

  if (loop_index >= num_data_files()) {
    /* purecov: begin deadcode */
    int err = ER_CLONE_PROTOCOL;
    my_error(err, MYF(0), "Wrong Clone RPC: Invalid File Index");
    ut_d(ut_error);
    return err;
    /* purecov: end */
  }

  decltype(loop_index) node_index = 0;

  while (loop_index > 0) {
    --loop_index;
    auto file_ctx = get_file_ctx_by_index(loop_index);
    auto cur_desc = file_ctx->get_file_meta();
    /* Loop through all files of current tablespace. */
    if (cur_desc->m_space_id != space_id) {
      break;
    }
    ++node_index;
  }

  auto last_file_index =
      static_cast<decltype(node_index)>(srv_sys_space.m_files.size() - 1);

  /* Check if the file is beyond maximum configured files. */
  if (node_index > last_file_index) {
    std::ostringstream err_strm;
    err_strm << "innodb_data_file_path: Recipient file count: "
             << last_file_index + 1 << " is less than Donor file count.";

    std::string err_str(err_strm.str());

    my_error(ER_CLONE_SYS_CONFIG, MYF(0), err_str.c_str());

    return (ER_CLONE_SYS_CONFIG);
  }

  auto &file = srv_sys_space.m_files[node_index];
  auto page_sz= fil_space_t::physical_size(srv_sys_space.flags());

  auto size_bytes = static_cast<uint64_t>(file.size());
  size_bytes *= page_sz;

  /* Check if the file size matches with configured files. */
  if (file_meta->m_file_size != size_bytes) {
    /* For last file it could mismatch if auto extend is specified. */
    if (node_index != last_file_index ||
        !srv_sys_space.can_auto_extend_last_file()) {
      /* purecov: begin tested */
      std::ostringstream err_strm;

      err_strm << "innodb_data_file_path: Recipient value for " << node_index
               << "th file size: " << size_bytes
               << " doesn't match Donor file size: " << file_meta->m_file_size;

      std::string err_str(err_strm.str());

      my_error(ER_CLONE_SYS_CONFIG, MYF(0), err_str.c_str());

      return (ER_CLONE_SYS_CONFIG);
      /* purecov: end */
    }
  }

  /* Change filename to currently configured name. */
  file_name.assign(file.filepath());
  return (0);
}

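/* Decide how a file that may already exist on the recipient is handled. As
implemented below: a missing file keeps Extension::NONE and, when the data
directory is being replaced, its name is recorded in CLONE_INNODB_NEW_FILES so
a failed clone can be rolled back; an existing file is an error unless the
data directory is being replaced, in which case it gets Extension::REPLACE and
is recorded in CLONE_INNODB_REPLACED_FILES for the swap before recovery. Undo
tablespaces with duplicate names are rejected up front. */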
int Clone_Snapshot::handle_existing_file(bool replace, bool undo_file,
                                         bool redo_file,
                                         uint32_t data_file_index,
                                         const std::string &data_file,
                                         Clone_file_ctx::Extension &extn) {
  extn = Clone_file_ctx::Extension::NONE;
  /* For undo tablespace, check for duplicate file name. Currently it
  is possible to create multiple undo tablespaces of same name under
  different directory. This should not be recommended and in future
  we aim to disallow specifying file name for tablespaces and generate
  it internally based on space ID. Till that time, Clone needs to identify
  and disallow undo tablespaces of same name as Clone creates all undo
  tablespaces under innodb_undo_directory configuration in recipient. */
  if (undo_file) {
    for (auto undo_index : m_undo_file_indexes) {
      auto undo_file_ctx = get_file_ctx_by_index(undo_index);
      if (undo_file_ctx == nullptr || undo_file_ctx->deleted()) {
        continue;
      }
      auto undo_meta = undo_file_ctx->get_file_meta();

      if (0 == strcmp(undo_meta->m_file_name, data_file.c_str())) {
        /* purecov: begin tested */
        std::ostringstream err_strm;
        err_strm << "Found multiple undo files with same name: " << data_file;
        std::string err_str(err_strm.str());
        my_error(ER_CLONE_SYS_CONFIG, MYF(0), err_str.c_str());
        return (ER_CLONE_SYS_CONFIG);
        /* purecov: end */
      }
    }
    m_undo_file_indexes.push_back(data_file_index);
    /* With concurrent DDL support there could be deleted undo file
    indexes here. At the end of every stage, new undo files could be
    added limited by TRX_SYS_MAX_UNDO_SPACES. */
    ut_ad(m_undo_file_indexes.size() <=
          CLONE_MAX_TRANSFER_STAGES * TRX_SYS_MAX_UNDO_SPACES);
  }

  os_file_type_t type= OS_FILE_TYPE_UNKNOWN;
  bool exists= false;

  bool ret= os_file_status(data_file.c_str(), &exists, &type);
  if (ret && !exists)
  {
    int err= 0;
    if (replace) {
      /* Add file to new file list to enable rollback. */
      err= clone_add_to_list_file(CLONE_INNODB_NEW_FILES, data_file.c_str());
    }
    /* Nothing to do if file doesn't exist. */
    extn= Clone_file_ctx::Extension::NONE;
    return err;
  }

  if (!ret || type != OS_FILE_TYPE_FILE) {
    /* purecov: begin inspected */
    /* Either the stat() call failed or the name is a
    directory/block device, or permission error etc. */
    char errbuf[MYSYS_STRERROR_SIZE];
    my_error(ER_ERROR_ON_WRITE, MYF(0), data_file.c_str(), errno,
             my_strerror(errbuf, sizeof(errbuf), errno));
    return ER_ERROR_ON_WRITE;
    /* purecov: end */
  }

  /* For cloning to different data directory, we must ensure that the
  file is not present. This would always fail for local clone. */
  if (!replace) {
    my_error(ER_FILE_EXISTS_ERROR, MYF(0), data_file.c_str());
    return ER_FILE_EXISTS_ERROR;
  }
  std::string clone_file= data_file + CLONE_INNODB_REPLACED_FILE_EXTN;

  /* Check that file with clone extension is not present */
  type= OS_FILE_TYPE_UNKNOWN;
  exists= false;
  ret= os_file_status(clone_file.c_str(), &exists, &type);

  if (ret && exists)
  {
    my_error(ER_FILE_EXISTS_ERROR, MYF(0), clone_file.c_str());
    return ER_FILE_EXISTS_ERROR;
  }
  extn= Clone_file_ctx::Extension::REPLACE;

  /* Add file name to files to be replaced before recovery. */
  return clone_add_to_list_file(CLONE_INNODB_REPLACED_FILES, data_file.c_str());
}

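/* Build the recipient path for a file described by donor metadata. Rough
rules, as coded below: redo data always uses the standard LOG_FILE_NAME;
absolute donor paths are kept as is; otherwise the name goes under the given
data_dir or, when cloning into the current data directory (data_dir ==
nullptr), under srv_log_group_home_dir for redo and srv_undo_dir for undo
tablespaces, with any leading "./" stripped from the donor name. */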
int Clone_Snapshot::build_file_path(const char *data_dir,
                                    const Clone_File_Meta *file_meta,
                                    std::string &built_path)
{
  std::string source;

  bool redo_file= (m_snapshot_state == CLONE_SNAPSHOT_REDO_COPY);
  bool absolute_path= false;

  if (!redo_file)
  {
    source.assign(file_meta->m_file_name);

    bool replace= (data_dir == nullptr);
    auto err= update_sys_file_name(replace, file_meta, source);

    if (err != 0)
      return err;
    absolute_path= is_absolute_path(source.c_str());
  }

  /* For absolute path, copy the name and return. */
  if (absolute_path)
  {
    auto is_hard_path= test_if_hard_path(source.c_str());

    /* Check if the absolute path is not in right format */
    if (is_hard_path == 0)
    {
      my_error(ER_WRONG_VALUE, MYF(0), "file path", source.c_str());
      return ER_WRONG_VALUE;
    }

    built_path.assign(source);
    return 0;
  }
  bool undo_file= srv_is_undo_tablespace(file_meta->m_space_id);

  /* Append appropriate data directory path. */
  if (data_dir != nullptr)
    built_path= std::string{data_dir};
  else if (redo_file)
    /* Use configured path when cloning into current data directory. */
    built_path= std::string{srv_log_group_home_dir};
  else if (undo_file)
    built_path= std::string{srv_undo_dir};
  else
    built_path= std::string{};

  /* Add path separator if required. */
  if (!built_path.empty() && built_path.back() != OS_PATH_SEPARATOR)
    built_path+= OS_PATH_SEPARATOR_STR;

  /* Add file name. For redo file use standard name. */
  if (redo_file)
  {
    /* TODO: Append all data to single redo log file. */
    /* This is redo file. Use standard name. */
    built_path+= LOG_FILE_NAME;
    return 0;
  }
  ut_ad(!source.empty());

  /* Remove dot slash prefix from source, if there. */
  std::string dot_slash= std::string(".") + OS_PATH_SEPARATOR;
  if (std::equal(dot_slash.begin(), dot_slash.end(), source.begin()))
    source.erase(0, 2);

  built_path+= source;
  return 0;
}

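/* The file context and its path share one snapshot heap allocation: a
Clone_file_ctx header immediately followed by the NUL-terminated path, i.e.
[Clone_file_ctx][file_path bytes + '\0']. The context therefore lives until
the snapshot heap is freed and never owns a separate buffer. */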
int Clone_Snapshot::build_file_ctx(Clone_file_ctx::Extension extn,
                                   const Clone_File_Meta *file_meta,
                                   const std::string &file_path,
                                   Clone_file_ctx *&file_ctx) {
  size_t alloc_size = sizeof(Clone_file_ctx) + file_path.length() + 1;

  /* Allocate for file path string. */
  auto path = static_cast<char *>(mem_heap_alloc(m_snapshot_heap, alloc_size));

  if (path == nullptr) {
    /* purecov: begin inspected */
    my_error(ER_OUTOFMEMORY, MYF(0), alloc_size);
    return (ER_OUTOFMEMORY);
    /* purecov: end */
  }

  /* Copy file metadata */
  file_ctx = reinterpret_cast<Clone_file_ctx *>(path);
  file_ctx->init(extn);
  path += sizeof(Clone_file_ctx);

  strcpy(path, file_path.c_str());

  auto ctx_file_meta = file_ctx->get_file_meta();
  *ctx_file_meta = *file_meta;

  ctx_file_meta->m_file_name = static_cast<const char *>(path);

  ctx_file_meta->m_file_name_len = file_path.length() + 1;

  ctx_file_meta->m_file_name_alloc_len = ctx_file_meta->m_file_name_len;

  return 0;
}

/** Add directory path to file
@param[in]  dir   directory
@param[in]  file  file name
@param[out] path  file along with path. */
static void add_directory_path(const char *dir, const char *file,
                               std::string &path) {
  path.clear();
  /* Append directory */
  if (dir != nullptr) {
    path.assign(dir);
    if (path.back() != OS_PATH_SEPARATOR) {
      path.append(OS_PATH_SEPARATOR_STR); /* purecov: inspected */
    }
  }
  /* Append file */
  if (file != nullptr) {
    path.append(file);
  }
}

int Clone_Snapshot::create_desc(const char *data_dir,
                                const Clone_File_Meta *file_meta, bool is_ddl,
                                Clone_file_ctx *&file_ctx) {
  /* Update file path from configuration. */
  std::string file_path;

  auto err = build_file_path(data_dir, file_meta, file_path);

  if (err != 0) {
    return (err);
  }

  auto extn = Clone_file_ctx::Extension::NONE;

  if (is_ddl) {
    extn = Clone_file_ctx::Extension::DDL;

    std::string ddl_list_file;
    add_directory_path(data_dir, CLONE_INNODB_DDL_FILES, ddl_list_file);

    err = clone_add_to_list_file(ddl_list_file.c_str(), file_path.c_str());

  } else {
    /* If data directory is being replaced. */
    bool replace_dir = (data_dir == nullptr);
    bool is_undo_file = srv_is_undo_tablespace(file_meta->m_space_id);
    bool is_redo_file = file_meta->m_space_id == SRV_SPACE_ID_UPPER_BOUND;

    /* Check if file is already present in recipient. */
    err = handle_existing_file(replace_dir, is_undo_file, is_redo_file,
                               file_meta->m_file_index, file_path, extn);
  }

  if (err == 0) {
    /* Build complete path for the new file to be added. */
    err = build_file_ctx(extn, file_meta, file_path, file_ctx);
  }
  return (err);
}

bool Clone_Snapshot::add_file_from_desc(Clone_file_ctx *&file_ctx,
                                        bool ddl_create) {
  mysql_mutex_lock(&m_snapshot_mutex);

  ut_ad(m_snapshot_handle_type == CLONE_HDL_APPLY);
  auto file_meta = file_ctx->get_file_meta();

  if (m_snapshot_state == CLONE_SNAPSHOT_FILE_COPY ||
      m_snapshot_state == CLONE_SNAPSHOT_PAGE_COPY) {
    if (ddl_create) {
      ut_a(file_meta->m_file_index == num_data_files());
      /* Add data file at the end and extend length. */
      m_data_file_vector.push_back(file_ctx);
    } else {
      m_data_file_vector[file_meta->m_file_index] = file_ctx;
    }
  } else {
    ut_ad(m_snapshot_state == CLONE_SNAPSHOT_REDO_COPY);
    m_redo_file_vector[file_meta->m_file_index] = file_ctx;
  }

  mysql_mutex_unlock(&m_snapshot_mutex);

  /* Check if it is the last file. */
  if (file_meta->m_file_index == num_data_files() - 1) {
    return true;
  }

  return (false);
}

int Clone_Handle::apply_task_metadata(Clone_Task *task,
                                      Ha_clone_cbk *callback) {
  ut_ad(m_clone_handle_type == CLONE_HDL_APPLY);
  uint desc_len = 0;
  auto serial_desc = callback->get_data_desc(&desc_len);

  Clone_Desc_Task_Meta task_desc;
  auto success = task_desc.deserialize(serial_desc, desc_len);

  if (!success) {
    int err = ER_CLONE_PROTOCOL;
    my_error(err, MYF(0), "Wrong Clone RPC: Invalid Task Descriptor");
    ut_d(ut_error);
    return err;
  }
  task->m_task_meta = task_desc.m_task_meta;
  return (0);
}

int Clone_Handle::check_space(const Clone_Task *task) {
  /* Do space check only during file copy. */
  auto current_state = m_clone_task_manager.get_state();
  if (!task->m_is_master || current_state != CLONE_SNAPSHOT_FILE_COPY) {
    return (0);
  }
  uint64_t free_space;
  std::string MySQL_datadir_abs_path= mysql_real_data_home;
  auto data_dir =
      (replace_datadir() ? MySQL_datadir_abs_path.c_str() : get_datadir());

  auto db_err = os_get_free_space(data_dir, free_space);
  /* We skip space check if the OS interface returns error. */
  if (db_err != DB_SUCCESS) {
    ib::warn() << "Clone could not validate available free space";
    return (0);
  }

  auto snapshot = m_clone_task_manager.get_snapshot();
  auto bytes_disk = snapshot->get_disk_estimate();

  std::string available_space;
  std::string clone_space;
  ut_format_byte_value(bytes_disk, clone_space);
  ut_format_byte_value(free_space, available_space);

  int err = 0;
  if (bytes_disk > free_space) {
    err = ER_CLONE_DISK_SPACE;
    my_error(err, MYF(0), clone_space.c_str(), available_space.c_str());
  }

  ib::info() << "Clone estimated size: " << clone_space.c_str()
             << " Available space: " << available_space.c_str();
  return (err);
}

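/* State descriptors drive the apply protocol. A start descriptor
(m_is_start) moves the applier to the next snapshot state and triggers the
free space check; an end descriptor closes the active file, marks the state
finished and, on the master task, sends an ACK descriptor back to the donor.
A descriptor with m_is_ack set only keeps the connection alive. */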
int Clone_Handle::apply_state_metadata(Clone_Task *task,
                                       Ha_clone_cbk *callback) {
  int err = 0;
  uint desc_len = 0;
  auto serial_desc = callback->get_data_desc(&desc_len);

  Clone_Desc_State state_desc;
  auto success = state_desc.deserialize(serial_desc, desc_len);

  if (!success) {
    err = ER_CLONE_PROTOCOL;
    my_error(err, MYF(0), "Wrong Clone RPC: Invalid State Descriptor");
    ut_d(ut_error);
    return err;
  }
  if (m_clone_handle_type == CLONE_HDL_COPY) {
    ut_ad(state_desc.m_is_ack);
    m_clone_task_manager.ack_state(&state_desc);
    return (0);
  }

  ut_ad(m_clone_handle_type == CLONE_HDL_APPLY);

  /* ACK descriptor is sent for keeping the connection alive. */
  if (state_desc.m_is_ack) {
    return (0);
  }

  /* Reset current chunk information */
  auto &task_meta = task->m_task_meta;
  task_meta.m_chunk_num = 0;
  task_meta.m_block_num = 0;

  /* Move to the new state */
  if (state_desc.m_is_start) {
#ifdef UNIV_DEBUG
    /* Network failure before moving to new state */
    err = m_clone_task_manager.debug_restart(task, err, 5);
    if (err != 0) {
      return err;
    }
#endif /* UNIV_DEBUG */

    /** Notify state change via callback. */
    notify_state_change(task, callback, &state_desc);

    err = fix_all_renamed(task);

    if (err == 0) {
      err = move_to_next_state(task, nullptr, &state_desc);
    }

#ifdef UNIV_DEBUG
    /* Network failure after moving to new state */
    err = m_clone_task_manager.debug_restart(task, err, 0);
#endif /* UNIV_DEBUG */

    /* Check if enough space available on disk */
    if (err == 0) {
      err = check_space(task);
    }

    return (err);
  }

  /* It is the end of current state. Close active file. */
  err = close_file(task);

#ifdef UNIV_DEBUG
  /* Network failure before finishing state */
  err = m_clone_task_manager.debug_restart(task, err, 2);
#endif /* UNIV_DEBUG */

  if (err != 0) {
    return (err);
  }

  ut_ad(state_desc.m_state == m_clone_task_manager.get_state());

  /* Mark current state finished for the task */
  err = m_clone_task_manager.finish_state(task);

#ifdef UNIV_DEBUG
  /* Network failure before sending ACK */
  err = m_clone_task_manager.debug_restart(task, err, 3);
#endif /* UNIV_DEBUG */

  /* Send acknowledgement back to remote server */
  if (err == 0 && task->m_is_master) {
    if (state_desc.m_state == CLONE_SNAPSHOT_FILE_COPY) {
      DEBUG_SYNC_C("clone_file_copy_end_before_ack");
    }
    err = ack_state_metadata(task, callback, &state_desc);

    if (err != 0) {
      ib::info()
          << "Clone Apply Master ACK finished state: " << state_desc.m_state;
    }
  }

#ifdef UNIV_DEBUG
  /* Network failure after sending ACK */
  err = m_clone_task_manager.debug_restart(task, err, 4);
#endif /* UNIV_DEBUG */

  return (err);
}

void Clone_Handle::notify_state_change(Clone_Task *task, Ha_clone_cbk *callback,
                                       Clone_Desc_State *state_desc) {
  if (!task->m_is_master) {
    return;
  }
  callback->mark_state_change(state_desc->m_estimate);
  callback->buffer_cbk(nullptr, 0);
  callback->clear_flags();
}

int Clone_Handle::ack_state_metadata(Clone_Task *, Ha_clone_cbk *callback,
                                     Clone_Desc_State *state_desc) {
  ut_ad(m_clone_handle_type == CLONE_HDL_APPLY);

  state_desc->m_is_ack = true;

  byte desc_buf[CLONE_DESC_MAX_BASE_LEN];

  auto serial_desc = &desc_buf[0];
  uint desc_len = CLONE_DESC_MAX_BASE_LEN;

  state_desc->serialize(serial_desc, desc_len, nullptr);

  callback->set_data_desc(serial_desc, desc_len);
  callback->clear_flags();

  auto err = callback->buffer_cbk(nullptr, 0);

  return (err);
}

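/* A delete notification must also consume the chunks reserved for the
dropped file, otherwise the task manager would keep waiting for data that
will never arrive. During page copy only the single chunk carried in the new
metadata is reset; otherwise every chunk in [m_begin_chunk, m_end_chunk] of
the old metadata is marked done. */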
int Clone_Handle::apply_file_delete(Clone_Task *task, Clone_file_ctx *file_ctx,
                                    const Clone_File_Meta *new_meta) {
  auto err = close_file(task);
  if (err != 0) {
    return err; /* purecov: inspected */
  }

  auto file_meta = file_ctx->get_file_meta();

  if (task->m_current_file_index != file_meta->m_file_index) {
    task->m_current_file_index = file_meta->m_file_index;
  }

  auto snapshot = m_clone_task_manager.get_snapshot();

  auto begin_chunk = file_meta->m_begin_chunk;
  auto end_chunk = file_meta->m_end_chunk;
  auto block_num = snapshot->get_blocks_per_chunk();
  auto data_size = snapshot->get_chunk_size();

  /* For page copy, we reset one page of the current chunk passed. Chunks
  in file_meta correspond to chunks in file copy. */
  if (snapshot->get_state() == CLONE_SNAPSHOT_PAGE_COPY) {
    begin_chunk = new_meta->m_begin_chunk;
    end_chunk = begin_chunk;
    block_num = 0;
    data_size = UNIV_PAGE_SIZE;
  }

  Clone_Task_Meta new_task_meta = task->m_task_meta;

  /* Consume all chunks of deleted file. */
  for (auto cur_chunk = begin_chunk; cur_chunk <= end_chunk; ++cur_chunk) {
    /* Set current chunk details. */
    new_task_meta.m_chunk_num = cur_chunk;
    new_task_meta.m_block_num = block_num;

    if (m_clone_task_manager.is_chunk_reserved(cur_chunk)) {
      continue;
    }

    m_clone_task_manager.set_chunk(task, &new_task_meta);

    /* Set data size for progress estimation. */
    task->m_data_size = data_size;
  }

  if (!file_ctx->deleted()) {
    file_ctx->m_state.store(Clone_file_ctx::State::DROPPED);
  }

  std::string old_file;
  file_ctx->get_file_name(old_file);

  std::string mesg("FILE : ");
  mesg.append(old_file);
  mesg.append(" Space ID: ");
  mesg.append(std::to_string(file_meta->m_space_id));
  mesg.append(" Chunks : ");
  mesg.append(std::to_string(begin_chunk));
  mesg.append(" - ");
  mesg.append(std::to_string(end_chunk));

  ib::info() << "Clone DDL Invalidate : " << mesg;
  return 0;
}

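/* Handle a DDL notification for an existing file: drop it, update its
compression attributes, or rename it. Renames first go to a temporary name
carrying the DDL extension; the extension is removed later in
fix_all_renamed() once all rename and delete requests have arrived, since the
requests are not received in a conflict-free order. */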
int Clone_Handle::apply_ddl(const Clone_File_Meta *new_meta,
                            Clone_file_ctx *file_ctx) {
  auto snapshot = m_clone_task_manager.get_snapshot();
  ut_ad(snapshot->get_state() == CLONE_SNAPSHOT_FILE_COPY ||
        snapshot->get_state() == CLONE_SNAPSHOT_PAGE_COPY);

  std::string old_file;
  file_ctx->get_file_name(old_file);

  std::string mesg("DELETE FILE : ");

  if (new_meta->is_deleted()) {
    /* Check if we have already deleted the file context. This is possible
    in case of a network error and restart where donor could send the delete
    request again. */
    if (file_ctx->m_state.load() == Clone_file_ctx::State::DROPPED_HANDLED) {
      mesg.append(" IGNORE : ");

    } else {
      /* File needs to be deleted. */
      if (!os_file_delete(innodb_clone_file_key, old_file.c_str())) {
        /* purecov: begin inspected */
        mesg.append("Innodb Clone Apply Failed to delete file: ");
        mesg.append(old_file);
        my_error(ER_INTERNAL_ERROR, MYF(0), mesg.c_str());
        return ER_INTERNAL_ERROR;
        /* purecov: end */
      }
      file_ctx->m_state.store(Clone_file_ctx::State::DROPPED_HANDLED);
    }
    mesg.append(old_file);
    mesg.append(" Space ID: ");
    mesg.append(std::to_string(new_meta->m_space_id));

    ib::info() << "Clone DDL APPLY: " << mesg;
    return 0;
  }

  auto old_meta = file_ctx->get_file_meta();

  /* Check if file needs to be renamed. */
  if (!new_meta->is_renamed()) {
    std::string update_mesg;
    /* Set new encryption and compression type. */
    /* TODO: Handle if encryption is enabled/disabled. */
    if (old_meta->can_compress() != new_meta->can_compress()) {
      old_meta->m_is_compressed = new_meta->m_is_compressed;
      if (new_meta->can_compress())
        update_mesg.assign("UNCOMPRESSED ");
      else
        update_mesg.assign("COMPRESSED ");
    }

    auto err = set_compression(file_ctx);

    std::string mesg("SET FILE ");
    mesg.append(update_mesg);
    mesg.append(": ");
    mesg.append(old_file);
    mesg.append(" Space ID: ");
    mesg.append(std::to_string(new_meta->m_space_id));

    ib::info() << "Clone DDL APPLY: " << mesg;
    return err;
  }

  Clone_file_ctx *new_ctx = nullptr;

  /* Rename file context. */
  auto err = snapshot->rename_desc(new_meta, m_clone_dir, new_ctx);

  if (err != 0) {
    return err; /* purecov: inspected */
  }

  std::string new_file;
  new_ctx->get_file_name(new_file);

  /* Preserve the old file size which could have been extended while applying
  page 0 changes and set it to new descriptor. */
  auto file_meta = new_ctx->get_file_meta();
  auto file_size = file_meta->m_file_size;

  if (file_size < old_meta->m_file_size) {
    file_size = old_meta->m_file_size;
  }
  file_meta->m_file_size = file_size;

  /* Do the actual rename. At this point we rename the files with temp DDL
  extension. After all rename and delete requests are received we rename
  the files again removing the ddl extension. This is required as file rename
  requests are not in the real order and there could be conflicts. */
  ut_ad(new_ctx->m_extension == Clone_file_ctx::Extension::DDL);

  std::string rename_mesg("RENAME FILE WITH EXTN: ");

  if (old_file.compare(new_file) == 0) {
    rename_mesg.append(" IGNORE : ");

  } else {
    bool success =
        os_file_rename(OS_CLONE_DATA_FILE, old_file.c_str(), new_file.c_str());

    if (!success) {
      /* purecov: begin inspected */
      char errbuf[MYSYS_STRERROR_SIZE];
      err = ER_ERROR_ON_RENAME;

      my_error(ER_ERROR_ON_RENAME, MYF(0), old_file.c_str(), new_file.c_str(),
               errno, my_strerror(errbuf, sizeof(errbuf), errno));
      /* purecov: end */
    }
  }

  rename_mesg.append(old_file);
  rename_mesg.append(" to ");
  rename_mesg.append(new_file);
  rename_mesg.append(" Space ID: ");
  rename_mesg.append(std::to_string(new_meta->m_space_id));

  ib::info() << "Clone DDL APPLY: " << rename_mesg;

  if (err == 0) {
    err = set_compression(new_ctx);
  }
  return err;
}

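/* Second phase of the rename scheme above: the master task walks all data
files at the start of the next state, drops the temporary DDL extension
chosen by fix_ddl_extension(), renames the files on disk accordingly and
finally removes the DDL list file. */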
int Clone_Handle::fix_all_renamed(const Clone_Task *task) {
  /* Fix renamed files only during file copy and page copy. */
  auto current_state = m_clone_task_manager.get_state();

  bool fix_needed = current_state == CLONE_SNAPSHOT_FILE_COPY ||
                    current_state == CLONE_SNAPSHOT_PAGE_COPY;

  if (!task->m_is_master || !fix_needed) {
    return 0;
  }

  auto snapshot = m_clone_task_manager.get_snapshot();

  ut_ad(snapshot->get_state() == CLONE_SNAPSHOT_FILE_COPY ||
        snapshot->get_state() == CLONE_SNAPSHOT_PAGE_COPY);

  auto fix_func = [&](Clone_file_ctx *file_ctx) {
    /* Need to handle files with DDL extension. */
    if (file_ctx->deleted() ||
        file_ctx->m_extension != Clone_file_ctx::Extension::DDL) {
      return 0;
    }
    /* Save old file name */
    std::string old_file;
    file_ctx->get_file_name(old_file);

    auto err = snapshot->fix_ddl_extension(m_clone_dir, file_ctx);
    if (err != 0) {
      return err; /* purecov: inspected */
    }
    /* Get new file name. */
    std::string new_file;
    file_ctx->get_file_name(new_file);

    /* Rename file */
    bool success =
        os_file_rename(OS_CLONE_DATA_FILE, old_file.c_str(), new_file.c_str());
    if (!success) {
      /* purecov: begin inspected */
      char errbuf[MYSYS_STRERROR_SIZE];
      err = ER_ERROR_ON_RENAME;

      my_error(ER_ERROR_ON_RENAME, MYF(0), old_file.c_str(), new_file.c_str(),
               errno, my_strerror(errbuf, sizeof(errbuf), errno));
      /* purecov: end */
    }

    std::string mesg("RENAMED FILE REMOVED EXTN : ");
    mesg.append(old_file);
    mesg.append(" to ");
    mesg.append(new_file);
    mesg.append(" Space ID: ");
    auto file_meta = file_ctx->get_file_meta_read();
    mesg.append(std::to_string(file_meta->m_space_id));

    ib::info() << "Clone DDL APPLY: " << mesg;
    return err;
  };

  auto err = snapshot->iterate_data_files(fix_func);

  /* Delete ddl list file. */
  if (err == 0) {
    std::string ddl_list_file;
    add_directory_path(m_clone_dir, CLONE_INNODB_DDL_FILES, ddl_list_file);

    clone_remove_list_file(ddl_list_file.c_str());
  }

  return err;
}

/* Check and set punch hole for compressed page table. */
int Clone_Handle::set_compression(Clone_file_ctx *file_ctx) {
  auto file_meta = file_ctx->get_file_meta();

  if (!file_meta->can_compress() || file_ctx->deleted())
    return 0;

  /* Disable punch hole if donor compression is not effective. */
  auto comp_type= fil_space_t::get_compression_algo(file_meta->m_fsp_flags);
  if (comp_type == PAGE_UNCOMPRESSED ||
      file_meta->m_fsblk_size * 2 > srv_page_size)
  {
    file_meta->m_punch_hole= false;
    return 0;
  }

  os_file_stat_t stat_info;
  std::string file_name;
  file_ctx->get_file_name(file_name);

  os_file_get_status(file_name.c_str(), &stat_info, false, false);

  /* Check and disable punch hole if recipient cannot support it. */
  file_meta->m_punch_hole= (stat_info.block_size * 2 <= srv_page_size);

  /* Old format for compressed and encrypted page is
  dependent on file system block size. */
  if (file_meta->can_encrypt() &&
      file_meta->m_fsblk_size != stat_info.block_size) {
    auto donor_str= std::to_string(file_meta->m_fsblk_size);
    auto recipient_str= std::to_string(stat_info.block_size);

    /* TODO: Check and get rid of this restriction. */
    my_error(ER_CLONE_CONFIG, MYF(0), "FS Block Size", donor_str.c_str(),
             recipient_str.c_str());
    return ER_CLONE_CONFIG;
  }

  return 0;
}

int Clone_Handle::file_create_init(const Clone_file_ctx *file_ctx,
                                   ulint file_type, bool init)
{
  /* Create the file and path. */
  File_init_cbk init_cbk= [&](pfs_os_file_t file)
  {
    if (!init)
      return DB_SUCCESS;

    std::string file_name;
    file_ctx->get_file_name(file_name);

    const auto file_meta= file_ctx->get_file_meta_read();
    bool is_undo_file= srv_is_undo_tablespace(file_meta->m_space_id);

    page_no_t size_in_pages=
        is_undo_file ? SRV_UNDO_TABLESPACE_SIZE_IN_PAGES
                     : FIL_IBD_FILE_INITIAL_SIZE;

    dberr_t db_err= DB_SUCCESS;
    std::string mesg("CREATE NEW FILE : ");

    ut_ad(!file_meta->m_transfer_encryption_key);
    /* TODO: 1. Check if initial pages need to be written.
    2. Write page header encryption information. */
    if (!os_file_set_size(file_name.c_str(), file,
                          size_in_pages << srv_page_size_shift,
                          file_meta->m_punch_hole))
      db_err= DB_OUT_OF_FILE_SPACE;

    mesg.append(file_name);
    mesg.append(" Space ID: ");
    mesg.append(std::to_string(file_meta->m_space_id));

    if (db_err != DB_SUCCESS)
      mesg.append(" FAILED");

    ib::info() << "Clone DDL APPLY: " << mesg;
    return db_err;
  };

  auto err= open_file(nullptr, file_ctx, file_type, true, init_cbk);
  return err;
}

int Clone_Handle::apply_file_metadata(Clone_Task *task,
                                      Ha_clone_cbk *callback) {
  ut_ad(m_clone_handle_type == CLONE_HDL_APPLY);

  uint desc_len = 0;
  auto serial_desc = callback->get_data_desc(&desc_len);

  Clone_Desc_File_MetaData file_desc;
  auto success = file_desc.deserialize(serial_desc, desc_len);

  if (!success) {
    int err = ER_CLONE_PROTOCOL;
    my_error(err, MYF(0), "Wrong Clone RPC: Invalid File Descriptor");
    ut_d(ut_error);
    return err;
  }
  const auto file_desc_meta = &file_desc.m_file_meta;
  auto snapshot = m_clone_task_manager.get_snapshot();

  /* At end of current state DDL file alterations are communicated. */
  bool ddl_desc = (file_desc.m_state == snapshot->get_next_state());

  ut_ad(ddl_desc || snapshot->get_state() == file_desc.m_state);

  bool file_deleted = file_desc_meta->is_deleted();

  bool desc_exists = false;
  Clone_file_ctx *file_ctx = nullptr;

  /* Check file metadata entry based on the descriptor. */
  auto err = snapshot->get_file_from_desc(file_desc_meta, m_clone_dir, false,
                                          desc_exists, file_ctx);
  if (err != 0) {
    return (err);
  }

  if (desc_exists) {
    if (ddl_desc) {
      err = apply_ddl(file_desc_meta, file_ctx);

    } else if (file_deleted) {
      /* File delete notification sent immediately for chunk adjustment. */
      err = apply_file_delete(task, file_ctx, file_desc_meta);
    }
    return err;
  }

  mysql_mutex_lock(m_clone_task_manager.get_mutex());

  /* Create file metadata entry based on the descriptor. */
  err = snapshot->get_file_from_desc(file_desc_meta, m_clone_dir, true,
                                     desc_exists, file_ctx);
  if (err != 0 || desc_exists) {
    mysql_mutex_unlock(m_clone_task_manager.get_mutex());

    /* Save error with file name. */
    if (err != 0) {
      m_clone_task_manager.set_error(err, file_desc_meta->m_file_name);
    }
    return (err);
  }

  auto file_meta = file_ctx->get_file_meta();
  file_meta->m_punch_hole = false;

  bool is_file_copy = snapshot->get_state() == CLONE_SNAPSHOT_FILE_COPY;
  bool is_page_copy = snapshot->get_state() == CLONE_SNAPSHOT_PAGE_COPY;

  if (is_file_copy || is_page_copy) {
    ut_ad(is_file_copy || ddl_desc);

    auto file_type = OS_CLONE_DATA_FILE;

    if (file_meta->m_space_id == UINT32_MAX) {
      file_type = OS_CLONE_LOG_FILE;
    }

    if (file_deleted) {
      /* Mark the newly created descriptor deleted. */
      file_ctx->m_state.store(Clone_file_ctx::State::DROPPED_HANDLED);

      std::string file_name;
      file_ctx->get_file_name(file_name);

      std::string mesg("ADD DELETED FILE : ");
      mesg.append(file_name);
      mesg.append(" Space ID: ");
      mesg.append(std::to_string(file_meta->m_space_id));
      ib::info() << "Clone DDL APPLY: " << mesg;

    } else {
      /* Create the file and write initial pages if created by DDL. */
      err = file_create_init(file_ctx, file_type, ddl_desc);
    }

    /* If last file is received, set all file metadata transferred */
    if (snapshot->add_file_from_desc(file_ctx, ddl_desc)) {
      m_clone_task_manager.set_file_meta_transferred();
    }

    mysql_mutex_unlock(m_clone_task_manager.get_mutex());

    if (err == 0 && file_type == OS_CLONE_DATA_FILE) {
      err = set_compression(file_ctx);
    }
    return err;
  }

  ut_ad(snapshot->get_state() == CLONE_SNAPSHOT_REDO_COPY);
  ut_ad(file_desc.m_state == CLONE_SNAPSHOT_REDO_COPY);
  ut_ad(!ddl_desc);

  /* open and reserve the redo file size */
  File_init_cbk empty_cbk;

  err = open_file(nullptr, file_ctx, OS_CLONE_LOG_FILE, true, empty_cbk);

  snapshot->add_file_from_desc(file_ctx, false);

  mysql_mutex_unlock(m_clone_task_manager.get_mutex());
  return (err);
}

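/* Work out how many bytes of a page actually carry data so the rest can be
punched out. For full_crc32 tablespaces the size comes from
buf_page_full_crc32_size(); for the older format it is read from the
FIL_PAGE_COMP_SIZE field, extended by the compression header and aligned to
the file system block size. Returns true for page-compressed pages. */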
bool Clone_Handle::read_compressed_len(unsigned char *buffer, uint32_t len,
                                       bool crc32, uint32_t block_size,
                                       uint32_t &compressed_len)
{
  bool compressed= false;
  if (crc32)
  {
    ut_a(len >= FIL_PAGE_TYPE + 2);
    compressed_len= buf_page_full_crc32_size(buffer, &compressed, nullptr);
    return compressed;
  }
  uint32_t header_len= FIL_PAGE_DATA;

  switch (fil_page_get_type(buffer))
  {
  case FIL_PAGE_PAGE_COMPRESSED_ENCRYPTED:
    header_len+= FIL_PAGE_ENCRYPT_COMP_METADATA_LEN;
    break;
  case FIL_PAGE_PAGE_COMPRESSED:
    header_len+= FIL_PAGE_COMP_METADATA_LEN;
    break;
  default:
    compressed_len= static_cast<uint32_t>(srv_page_size);
    return false;
  }

  compressed= true;
  ut_a(len >= FIL_PAGE_DATA + FIL_PAGE_COMP_SIZE + 2);
  compressed_len= mach_read_from_2(buffer + FIL_PAGE_DATA + FIL_PAGE_COMP_SIZE);

  compressed_len+= header_len;

  /* Align compressed length. TODO: Check if this is required. */
  compressed_len= ut_calc_align(compressed_len, block_size);
  return true;
}

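/* Write a block of page-compressed data sparsely: for each page only the
compressed prefix is written and the remainder up to the physical page size
is punched out. Illustrative numbers (assumed, not from this code): a 16K
page compressed to 5K on a 4K-block file system is written as 8K followed by
an 8K hole. The first page of the file is always written in full. */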
int Clone_Handle::sparse_file_write(Clone_File_Meta *file_meta,
                                    unsigned char *buffer, uint32_t len,
                                    pfs_os_file_t file, uint64_t start_off) {
  dberr_t err= DB_SUCCESS;
  auto page_len= fil_space_t::physical_size(file_meta->m_fsp_flags);

  /* Loop through all pages in current data block */
  while (len >= page_len) {
    bool full_crc32= fil_space_t::full_crc32(file_meta->m_fsp_flags);
    uint32_t comp_len;
    bool is_compressed= read_compressed_len(
        buffer, len, full_crc32,
        static_cast<uint32_t>(file_meta->m_fsblk_size), comp_len);

    auto write_len= is_compressed ? comp_len : page_len;

    /* Punch hole if needed */
    bool first_page= (start_off == 0);

    /* In rare case during file copy the page could be a torn page
    and the size may not be correct. In such case the page is going to
    be replaced later during page copy. */
    if (first_page || write_len > page_len)
      write_len= page_len;

    /* Write Data Page */
    errno= 0;
    err= os_file_write(IORequestWrite, "Clone data file", file,
                       reinterpret_cast<char *>(buffer), start_off,
                       (start_off == 0) ? page_len : write_len);
    if (err != DB_SUCCESS)
    {
      char errbuf[MYSYS_STRERROR_SIZE];
      my_error(ER_ERROR_ON_WRITE, MYF(0), file_meta->m_file_name, errno,
               my_strerror(errbuf, sizeof(errbuf), errno));

      return ER_ERROR_ON_WRITE;
    }

    os_offset_t offset= start_off + write_len;
    os_offset_t hole_size= page_len - write_len;

    if (file_meta->m_punch_hole && hole_size > 0)
    {
      err= os_file_punch_hole(file.m_file, offset, hole_size);
      if (err != DB_SUCCESS)
      {
        /* Disable for whole file */
        file_meta->m_punch_hole = false;
        ut_ad(err == DB_IO_NO_PUNCH_HOLE);
        ib::info() << "Innodb Clone Apply failed to punch hole: "
                   << file_meta->m_file_name;
      }
    }
    start_off+= page_len;
    buffer+= page_len;
    len-= page_len;
  }

  /* Must have consumed all data. */
  ut_ad(err != DB_SUCCESS || len == 0);
  return 0;
}

int Clone_Handle::modify_and_write(const Clone_Task *task, uint64_t offset,
                                   unsigned char *buffer, uint32_t buf_len) {
  ut_ad(m_clone_handle_type == CLONE_HDL_APPLY);

  auto snapshot = m_clone_task_manager.get_snapshot();
  auto file_meta = snapshot->get_file_by_index(task->m_current_file_index);

  ut_ad(!file_meta->can_encrypt());

  if (file_meta->m_punch_hole) {
    auto err = sparse_file_write(file_meta, buffer, buf_len,
                                 task->m_current_file_des, offset);
    return err;
  }

  /* No more compression/encryption is needed. For redo/undo log files and
  uncompressed tables, directly write to file */
  errno = 0;
  auto db_err =
      os_file_write(IORequestWrite, "Clone data file", task->m_current_file_des,
                    reinterpret_cast<char *>(buffer), offset, buf_len);
  if (db_err != DB_SUCCESS) {
    char errbuf[MYSYS_STRERROR_SIZE];
    my_error(ER_ERROR_ON_WRITE, MYF(0), file_meta->m_file_name, errno,
             my_strerror(errbuf, sizeof(errbuf), errno));

    return (ER_ERROR_ON_WRITE);
  }
  return 0;
}

int Clone_Handle::receive_data(Clone_Task *task, uint64_t offset,
                               uint64_t file_size, uint32_t size,
                               Ha_clone_cbk *callback) {
  ut_ad(m_clone_handle_type == CLONE_HDL_APPLY);

  auto snapshot = m_clone_task_manager.get_snapshot();

  auto file_ctx = snapshot->get_file_ctx_by_index(task->m_current_file_index);
  auto file_meta = file_ctx->get_file_meta();

  std::string file_name;
  file_ctx->get_file_name(file_name);

  /* If the file is deleted, then fetch the data and ignore. */
  if (file_ctx->deleted()) {
    unsigned char *data_buf = nullptr;
    uint32_t data_len = 0;
    callback->apply_buffer_cbk(data_buf, data_len);

    std::string mesg("IGNORE DATA for DELETED FILE: ");
    mesg.append(file_name);
    mesg.append(" Space ID: ");
    mesg.append(std::to_string(file_meta->m_space_id));

    ib::info() << "Clone DDL APPLY: " << mesg;
    return 0;
  }

  bool is_page_copy = (snapshot->get_state() == CLONE_SNAPSHOT_PAGE_COPY);
  bool is_log_file = (snapshot->get_state() == CLONE_SNAPSHOT_REDO_COPY);

  /* During page and redo copy, we encrypt the key in header page. */
  bool key_page = (is_page_copy && offset == 0);
  bool key_log = (is_log_file && file_meta->m_file_index == 0 && offset == 0);

  if (key_page) {
    /* Check and update file size for space header page */
    if (file_meta->m_file_size < file_size) {
      snapshot->update_file_size(task->m_current_file_index, file_size);
    }
  }

  auto file_type = OS_CLONE_DATA_FILE;

  if (is_log_file || is_page_copy ||
      file_meta->m_space_id == UINT32_MAX ||
      file_meta->m_punch_hole) {
    file_type = OS_CLONE_LOG_FILE;
  }

  /* Open destination file for first block. */
  if (task->m_current_file_des.m_file == OS_FILE_CLOSED) {
    ut_ad(file_meta != nullptr);

    File_init_cbk empty_cbk;
    auto err = open_file(task, file_ctx, file_type, true, empty_cbk);

    if (err != 0) {
      /* Save error with file name. */
      /* purecov: begin inspected */
      m_clone_task_manager.set_error(err, file_name.c_str());
      return (err);
      /* purecov: end */
    }
  }

  ut_ad(task->m_current_file_index == file_meta->m_file_index);

  /* Copy data to current destination file using callback. */
  char errbuf[MYSYS_STRERROR_SIZE];

  auto file_hdl = task->m_current_file_des.m_file;
  auto success = os_file_seek(nullptr, file_hdl, offset);

  if (!success) {
    /* purecov: begin inspected */
    my_error(ER_ERROR_ON_READ, MYF(0), file_name.c_str(), errno,
             my_strerror(errbuf, sizeof(errbuf), errno));
    /* Save error with file name. */
    m_clone_task_manager.set_error(ER_ERROR_ON_READ, file_name.c_str());
    return (ER_ERROR_ON_READ);
    /* purecov: end */
  }

  if (task->m_file_cache) {
    callback->set_os_buffer_cache();
    /* For data file recommend zero copy for cached IO. */
    if (!is_log_file) {
      callback->set_zero_copy();
    }
  }

  callback->set_dest_name(file_meta->m_file_name);

  bool modify_buffer = false;

  /* In case of page compression we need to punch hole. */
  if (file_meta->m_punch_hole) {
    ut_ad(!is_log_file);
    modify_buffer = true;
  }

  /* We need to encrypt the tablespace key by master key. */
  if (file_meta->can_encrypt() && (key_page || key_log)) {
    modify_buffer = true;
  }
  auto err = file_callback(callback, task, size, modify_buffer, offset
#ifdef UNIV_PFS_IO
                           , __FILE__, __LINE__
#endif /* UNIV_PFS_IO */
  );

  task->m_data_size += size;

  if (err != 0) {
    /* Save error with file name. */
    /* purecov: begin inspected */
    m_clone_task_manager.set_error(err, file_name.c_str());
    /* purecov: end */
  }
  return (err);
}

int Clone_Handle::apply_data(Clone_Task *task, Ha_clone_cbk *callback) {
  ut_ad(m_clone_handle_type == CLONE_HDL_APPLY);

  /* Extract the data descriptor. */
  uint desc_len = 0;
  auto serial_desc = callback->get_data_desc(&desc_len);

  Clone_Desc_Data data_desc;
  auto success = data_desc.deserialize(serial_desc, desc_len);

  if (!success) {
    int err = ER_CLONE_PROTOCOL;
    my_error(err, MYF(0), "Wrong Clone RPC: Invalid Data Descriptor");
    ut_d(ut_error);
    return err;
  }
  /* Identify the task for the current block of data. */
  int err = 0;
  auto task_meta = &data_desc.m_task_meta;

  /* The data is from a different file. Close the current one. */
  if (task->m_current_file_index != data_desc.m_file_index) {
    err = close_file(task);
    if (err != 0) {
      return (err);
    }
    task->m_current_file_index = data_desc.m_file_index;
  }

  /* Receive data from callback and apply. */
  err = receive_data(task, data_desc.m_file_offset, data_desc.m_file_size,
                     data_desc.m_data_len, callback);

  /* Close file in case of error. */
  if (err != 0) {
    close_file(task);
  } else {
    err = m_clone_task_manager.set_chunk(task, task_meta);
  }

  return (err);
}

int Clone_Handle::apply(THD *, uint task_id, Ha_clone_cbk *callback) {
  int err = 0;
  uint desc_len = 0;

  auto clone_desc = callback->get_data_desc(&desc_len);
  ut_ad(clone_desc != nullptr);

  Clone_Desc_Header header;
  auto success = header.deserialize(clone_desc, desc_len);

  if (!success) {
    err = ER_CLONE_PROTOCOL;
    my_error(err, MYF(0), "Wrong Clone RPC: Invalid Descriptor Header");
    ut_d(ut_error);
    return err;
  }

  /* Check the descriptor type in header and apply */
  auto task = m_clone_task_manager.get_task_by_index(task_id);

  switch (header.m_type) {
    case CLONE_DESC_TASK_METADATA:
      err = apply_task_metadata(task, callback);
      break;

    case CLONE_DESC_STATE:
      err = apply_state_metadata(task, callback);
      break;

    case CLONE_DESC_FILE_METADATA:
      err = apply_file_metadata(task, callback);
      break;

    case CLONE_DESC_DATA:
      err = apply_data(task, callback);
      break;

    default:
      ut_d(ut_error);
      break;
  }

  if (err != 0) {
    close_file(task);
  }

  return (err);
}

int Clone_Handle::restart_apply(THD *, const byte *&loc, uint &loc_len) {
  auto init_loc = m_restart_loc;
  auto init_len = m_restart_loc_len;
  auto alloc_len = m_restart_loc_len;

  /* Get latest locator */
  loc = get_locator(loc_len);

  m_clone_task_manager.reinit_apply_state(loc, loc_len, init_loc, init_len,
                                          alloc_len);

  /* Return the original locator if no state information */
  if (init_loc == nullptr) {
    return (0);
  }

  loc = init_loc;
  loc_len = init_len;

  /* Reset restart loc buffer if newly allocated */
  if (alloc_len > m_restart_loc_len) {
    m_restart_loc = init_loc;
    m_restart_loc_len = alloc_len;
  }

  ut_ad(loc == m_restart_loc);

  auto master_task = m_clone_task_manager.get_task_by_index(0);

  auto err = close_file(master_task);

  return (err);
}

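/* When the space header page reports a larger tablespace size during page
copy, the growth is attributed to the last node of that tablespace: every
intermediate node keeps its recorded size and the remainder is assigned to
the final node. Only the system tablespace has more than one node here. */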
void Clone_Snapshot::update_file_size(uint32_t file_index, uint64_t file_size) {
  /* Update file size when file is extended during page copy */
  ut_ad(m_snapshot_state == CLONE_SNAPSHOT_PAGE_COPY);

  auto cur_file = get_file_by_index(file_index);

  while (file_size > cur_file->m_file_size) {
    ++file_index;

    if (file_index >= num_data_files()) {
      /* Update file size for the last file. */
      cur_file->m_file_size = file_size;
      break;
    }

    auto next_file = get_file_by_index(file_index);

    if (next_file->m_space_id != cur_file->m_space_id) {
      /* Update file size for the last file. */
      cur_file->m_file_size = file_size;
      break;
    }

    /* Only system tablespace can have multiple nodes. */
    ut_ad(cur_file->m_space_id == 0);

    file_size -= cur_file->m_file_size;
    cur_file = next_file;
  }
}

int Clone_Snapshot::init_apply_state(Clone_Desc_State *state_desc) {
  Mysql_mutex_guard guard(&m_snapshot_mutex);

  set_state_info(state_desc);
  int err = 0;

  switch (m_snapshot_state) {
    case CLONE_SNAPSHOT_FILE_COPY:
      ib::info() << "Clone Apply State FILE COPY: ";
      break;

    case CLONE_SNAPSHOT_PAGE_COPY:
      ib::info() << "Clone Apply State PAGE COPY: ";
      break;

    case CLONE_SNAPSHOT_REDO_COPY:
      ib::info() << "Clone Apply State REDO COPY: ";
      break;

    case CLONE_SNAPSHOT_DONE:
      /* Extend and flush data files. */
      ib::info() << "Clone Apply State FLUSH DATA: ";
      err = extend_and_flush_files(false);
      if (err != 0) {
        ib::info() << "Clone Apply FLUSH DATA failed code: " << err;
        break;
      }
      /* Flush redo files. */
      ib::info() << "Clone Apply State FLUSH REDO: ";
      err = extend_and_flush_files(true);
      if (err != 0) {
        ib::info() << "Clone Apply FLUSH REDO failed code: " << err;
        break;
      }
      ib::info() << "Clone Apply State DONE";
      break;

    case CLONE_SNAPSHOT_NONE:
    case CLONE_SNAPSHOT_INIT:
    default:
      err = ER_INTERNAL_ERROR;
      my_error(err, MYF(0), "Innodb Clone Snapshot Invalid state");
      ut_d(ut_error);
      break;
  }
  return (err);
}

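/* Final pass at the DONE state: every surviving data or redo file is opened
and either extended to the size recorded in its metadata, rounded up to a
full extent (physical page size * FSP_EXTENT_SIZE) to sidestep recovery
issues with partially filled extents, or simply flushed to disk. */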
int Clone_Snapshot::extend_and_flush_files(bool flush_redo) {
  auto &file_vector = (flush_redo) ? m_redo_file_vector : m_data_file_vector;

  for (auto file_ctx : file_vector) {
    if (file_ctx->deleted()) {
      ut_ad(file_ctx->m_state.load() == Clone_file_ctx::State::DROPPED_HANDLED);
      continue;
    }
    char errbuf[MYSYS_STRERROR_SIZE];
    bool success = true;
    auto file_meta = file_ctx->get_file_meta();

    std::string file_name;
    file_ctx->get_file_name(file_name);

    auto file = os_file_create(
        innodb_clone_file_key, file_name.c_str(), OS_FILE_OPEN,
        flush_redo ? OS_CLONE_LOG_FILE : OS_CLONE_DATA_FILE, false, &success);

    if (!success) {
      /* purecov: begin inspected */
      my_error(ER_CANT_OPEN_FILE, MYF(0), file_name.c_str(), errno,
               my_strerror(errbuf, sizeof(errbuf), errno));

      return (ER_CANT_OPEN_FILE);
      /* purecov: end */
    }

    auto file_size = os_file_get_size(file);

    size_t aligned_size = 0;
    /* If the file size is not aligned to the extent size, recovery handling
    has some issues. This workaround removes that dependency. */
    if (file_meta->m_fsp_flags != ULINT32_UNDEFINED) {
      auto page_size= fil_space_t::physical_size(file_meta->m_fsp_flags);
      auto extent_size= page_size * FSP_EXTENT_SIZE;
      /* Skip extending files smaller than one extent. */
      if (file_size > extent_size) {
        aligned_size = ut_uint64_align_up(file_size, extent_size);
      }
    }

    if (file_size < file_meta->m_file_size) {
      success = os_file_set_size(file_name.c_str(), file,
                                 file_meta->m_file_size);
    } else if (file_size < aligned_size) {
      success = os_file_set_size(file_name.c_str(), file, aligned_size);
    } else {
      success = os_file_flush(file);
    }

    os_file_close(file);

    if (!success) {
      /* purecov: begin inspected */
      my_error(ER_ERROR_ON_WRITE, MYF(0), file_name.c_str(), errno,
               my_strerror(errbuf, sizeof(errbuf), errno));

      return (ER_ERROR_ON_WRITE);
      /* purecov: end */
    }
  }
  return (0);
}