mariadb/storage/innobase/clone/clone0copy.cc
mariadb-DebarunBanerjee 2ab1b0fd7f Clone - Key flow part-I
2025-02-19 15:12:52 +05:30

1648 lines
48 KiB
C++

/*****************************************************************************
Copyright (c) 2017, 2024, Oracle and/or its affiliates.
This program is free software; you can redistribute it and/or modify it under
the terms of the GNU General Public License, version 2.0, as published by the
Free Software Foundation.
This program is designed to work with certain software (including
but not limited to OpenSSL) that is licensed under separate terms,
as designated in a particular file or component or in included license
documentation. The authors of MySQL hereby grant you an additional
permission to link the program and your derivative works with the
separately licensed software that they have either included with
the program or referenced in the documentation.
This program is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE. See the GNU General Public License, version 2.0,
for more details.
You should have received a copy of the GNU General Public License along with
this program; if not, write to the Free Software Foundation, Inc.,
51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
*****************************************************************************/
/** @file clone/clone0copy.cc
Innodb copy snapshot data
*******************************************************/
#include "buf0dump.h"
#include "clone0clone.h"
#include "dict0dict.h"
#include "fsp0sysspace.h"
#include "log.h"
#include "clone_handler.h"
#include "handler.h"
#include "mysqld.h"
#include "srv0start.h"
#include "trx0sys.h"
/** Callback to add an archived redo file to current snapshot
@param[in] file_name file name
@param[in] file_size file size in bytes
@param[in] file_offset start offset in bytes
@param[in] context snapshot
@return error code */
static int add_redo_file_callback(char *file_name, uint64_t file_size,
uint64_t file_offset, void *context) {
auto snapshot= static_cast<Clone_Snapshot *>(context);
auto err= snapshot->add_redo_file(file_name, file_size, file_offset);
return err;
}
/** Callback to add tracked page IDs to current snapshot
@param[in] context snapshot
@param[in] buff buffer having page IDs
@param[in] num_pages number of tracked pages
@return error code */
static int add_page_callback(void *context, byte *buff, uint num_pages) {
uint index;
Clone_Snapshot *snapshot;
space_id_t space_id;
uint32_t page_num;
snapshot = static_cast<Clone_Snapshot *>(context);
/* Extract the page Ids from the buffer. */
for (index = 0; index < num_pages; index++) {
space_id = mach_read_from_4(buff);
buff += 4;
page_num = mach_read_from_4(buff);
buff += 4;
auto err = snapshot->add_page(space_id, page_num);
if (err != 0) {
return (err);
}
}
return (0);
}
int Clone_Snapshot::add_buf_pool_file() {
char path[OS_FILE_MAX_PATH];
/* Generate the file name. */
buf_dump_generate_path(path, sizeof(path));
os_file_type_t type;
bool exists= false;
bool ret= os_file_status(path, &exists, &type);
if (!ret || !exists)
return 0;
auto file_size = os_file_get_size(path);
auto size_bytes = file_size.m_total_size;
/* Check for error */
if (size_bytes == static_cast<os_offset_t>(~0)) {
char errbuf[MYSYS_STRERROR_SIZE];
my_error(ER_CANT_OPEN_FILE, MYF(0), path, errno,
my_strerror(errbuf, sizeof(errbuf), errno));
return ER_CANT_OPEN_FILE;
}
/* Always the first file in list */
ut_ad(num_data_files() == 0);
m_data_bytes_disk += size_bytes;
m_monitor.add_estimate(size_bytes);
return add_file(path, size_bytes, size_bytes, nullptr, false);
}
/** Allocate the redo header/trailer buffer and start redo log archiving.
Must not be called for a blocking clone.
@return 0 on success, ER_INTERNAL_ERROR for a blocking clone or an
unexpectedly small archived file size, ER_OUTOFMEMORY if the buffer cannot
be allocated, otherwise the error returned when starting the archiver */
int Clone_Snapshot::init_redo_archiving() {
  ut_ad(m_snapshot_type != HA_CLONE_BLOCKING);

  if (m_snapshot_type == HA_CLONE_BLOCKING) {
    /* Redo archiving is never started in blocking mode. */
    m_redo_trailer_size = 0;
    m_redo_header_size = 0;
    m_redo_file_size = 0;
    return ER_INTERNAL_ERROR; /* purecov: inspected */
  }

  /* Ask the archiver how much room the header and trailer need. */
  m_redo_ctx.get_header_size(m_redo_header_size, m_redo_trailer_size);

  /* One extra log block is allocated so the buffer can be aligned below. */
  m_redo_header = static_cast<byte *>(mem_heap_zalloc(
      m_snapshot_heap,
      m_redo_header_size + m_redo_trailer_size + OS_FILE_LOG_BLOCK_SIZE));

  if (m_redo_header == nullptr) {
    /* purecov: begin inspected */
    my_error(ER_OUTOFMEMORY, MYF(0), m_redo_header_size + m_redo_trailer_size);
    return ER_OUTOFMEMORY;
    /* purecov: end */
  }

  /* Align the header to the log block size; the trailer follows it. */
  m_redo_header =
      static_cast<byte *>(ut_align(m_redo_header, OS_FILE_LOG_BLOCK_SIZE));
  m_redo_trailer = m_redo_header + m_redo_header_size;

  /* Start Redo Archiving */
  const int start_err = m_redo_ctx.start(m_redo_header, m_redo_header_size);
  if (start_err != 0) {
    m_redo_file_size = 0;
    return start_err; /* purecov: inspected */
  }

  m_redo_file_size = uint64_t{m_redo_ctx.get_archived_file_size()};
  ut_ad(m_redo_file_size >= LOG_FILE_MIN_SIZE);

  if (m_redo_file_size >= LOG_FILE_MIN_SIZE) {
    return 0;
  }

  my_error(ER_INTERNAL_ERROR, MYF(0));
  return ER_INTERNAL_ERROR; /* purecov: inspected */
}
#ifdef UNIV_DEBUG
/** Debug-only hook used during state transition: temporarily gives up the
snapshot mutex around the "clone_state_transit_file_copy" sync point and
re-acquires it before returning. The caller must own m_snapshot_mutex. */
void Clone_Snapshot::debug_wait_state_transit() {
mysql_mutex_assert_owner(&m_snapshot_mutex);
/* Allow DDL to enter and check. */
mysql_mutex_unlock(&m_snapshot_mutex);
DEBUG_SYNC_C("clone_state_transit_file_copy");
/* Take the mutex back so the caller's ownership assumption still holds. */
mysql_mutex_lock(&m_snapshot_mutex);
}
#endif /* UNIV_DEBUG */
/** Move the snapshot into the FILE COPY state and build the list of files
to copy: the buffer pool dump file first, then every tablespace file.
Depending on the clone type, redo archiving or page ID archiving is started
before the files are collected.
@param[in]	new_state	state to transit to
@return 0 on success, otherwise an error code */
int Clone_Snapshot::init_file_copy(Snapshot_State new_state) {
  ut_ad(m_snapshot_handle_type == CLONE_HDL_COPY);

  State_transit transit_guard(this, new_state);

  int rc = transit_guard.get_error();
  if (rc != 0) {
    return rc; /* purecov: inspected */
  }

  ut_d(debug_wait_state_transit());

  m_monitor.init_state(srv_stage_clone_file_copy.m_key, m_enable_pfs);

  switch (m_snapshot_type) {
    case HA_CLONE_BLOCKING:
      /* In HA_CLONE_BLOCKING mode we treat redo files as usual files.
      We need to clear these special treatment not to count them twice. */
      m_redo_trailer_size = 0;
      m_redo_header_size = 0;
      m_redo_file_size = 0;
      break;

    case HA_CLONE_REDO:
      rc = init_redo_archiving();
      break;

    case HA_CLONE_HYBRID:
    case HA_CLONE_PAGE:
      /* Start modified Page ID Archiving */
      rc = m_page_ctx.start(false, nullptr);
      break;

    default:
      /* Unknown clone type. */
      ut_ad(m_snapshot_type == HA_CLONE_BLOCKING);
      rc = ER_INTERNAL_ERROR;
      break;
  }

  if (rc != 0) {
    return rc; /* purecov: inspected */
  }

  /* Initialize estimation about on disk bytes. */
  init_disk_estimate();

  /* Add buffer pool dump file. Always the first one in the list. */
  rc = add_buf_pool_file();
  if (rc != 0) {
    return rc; /* purecov: inspected */
  }

  /* Iterate all tablespace files and add persistent data files. */
  const auto iter_err = Fil_iterator::for_each_file(
      [&](fil_node_t *file) { return (add_node(file, false)); });

  if (iter_err != DB_SUCCESS) {
    return ER_INTERNAL_ERROR; /* purecov: inspected */
  }

  const auto chunk_mb = (chunk_size() * UNIV_PAGE_SIZE) / (1024 * 1024);

  ib::info() << "Clone State FILE COPY : " << m_num_current_chunks
             << " chunks, "
             << " chunk size : " << chunk_mb << " M";

  m_monitor.change_phase();
  return 0;
}
/** Move the snapshot into the PAGE COPY state: start redo archiving (hybrid
clone), stop modified page ID archiving, pick up data files added since the
previous state and collect the page IDs tracked by the page archiver.
@param[in]	new_state	state to transit to
@param[in]	page_buffer	buffer handed to the page archiver for
reading page IDs
@param[in]	page_buffer_len	length of page_buffer
@return 0 on success, otherwise an error code */
int Clone_Snapshot::init_page_copy(Snapshot_State new_state, byte *page_buffer,
uint page_buffer_len) {
ut_ad(m_snapshot_handle_type == CLONE_HDL_COPY);
State_transit transit_guard(this, new_state);
int err = transit_guard.get_error();
if (err != 0) {
return err; /* purecov: inspected */
}
m_monitor.init_state(srv_stage_clone_page_copy.m_key, m_enable_pfs);
if (m_snapshot_type == HA_CLONE_HYBRID) {
/* Start Redo Archiving */
err = init_redo_archiving();
} else if (m_snapshot_type == HA_CLONE_PAGE) {
/* Start COW for all modified pages - Not implemented. */
ut_d(ut_error);
} else {
/* No other clone type is expected to reach the page copy state. */
ut_d(ut_error);
}
if (err != 0) {
/* purecov: begin inspected */
m_page_ctx.release();
return err;
/* purecov: end */
}
/* Stop modified page archiving. */
err = m_page_ctx.stop(nullptr);
DEBUG_SYNC_C("clone_stop_page_archiving_without_releasing");
if (err != 0) {
/* purecov: begin inspected */
m_page_ctx.release();
return err;
/* purecov: end */
}
/* Iterate all tablespace files and add new data files created. */
auto error = Fil_iterator::for_each_file(
[&](fil_node_t *file) { return add_node(file, true); });
/* NOTE(review): unlike the error paths above, this return does not call
m_page_ctx.release() - confirm the context is released elsewhere. */
if (error != DB_SUCCESS) {
return ER_INTERNAL_ERROR; /* purecov: inspected */
}
/* Collect modified page Ids from Page Archiver. */
void *context;
uint aligned_size;
context = static_cast<void *>(this);
/* Check pages added for encryption. These were inserted into the page set
by add_node() before any archived page ID is read. */
auto num_pages_encryption = m_page_set.size();
if (num_pages_encryption > 0) {
m_monitor.add_estimate(num_pages_encryption * UNIV_PAGE_SIZE);
}
/* add_page_callback() feeds every page ID into add_page() on this
snapshot. NOTE(review): an error here is only returned at the end; the
page vector and chunk count are still built from what was collected. */
err = m_page_ctx.get_pages(add_page_callback, context, page_buffer,
page_buffer_len);
/* Copy the de-duplicated page set into a vector. */
m_page_vector.assign(m_page_set.begin(), m_page_set.end());
/* Round the page count up to whole chunks and convert to a chunk count. */
aligned_size = ut_calc_align(m_num_pages, chunk_size());
m_num_current_chunks = aligned_size >> m_chunk_size_pow2;
ib::info()
<< "Clone State PAGE COPY : " << m_num_pages << " pages, "
<< m_num_duplicate_pages << " duplicate pages, " << m_num_current_chunks
<< " chunks, "
<< " chunk size : " << (chunk_size() * UNIV_PAGE_SIZE) / (1024 * 1024)
<< " M";
m_page_ctx.release();
m_monitor.change_phase();
return err;
}
/** Move the snapshot into the REDO COPY state: block external XA
operations, stop redo archiving, pick up data files added since the previous
state and collect the archived redo files.
@param[in]	new_state	state to transit to
@param[in]	cbk		alert callback; not used by the visible code
because the binlog/GTID synchronization call is commented out
@return 0 on success, otherwise an error code */
int Clone_Snapshot::init_redo_copy(Snapshot_State new_state,
Clone_Alert_Func cbk) {
ut_ad(m_snapshot_handle_type == CLONE_HDL_COPY);
ut_ad(m_snapshot_type != HA_CLONE_BLOCKING);
/* Block external XA operations. XA prepare commit and rollback operations
are first logged to binlog and added to global gtid_executed before doing
operation in SE. Without blocking, we might persist such GTIDs from global
gtid_executed before the operations are persisted in Innodb. */
int binlog_error= 0;
auto thd= current_thd;
Clone_handler::XA_Block xa_block_guard(thd);
if (xa_block_guard.failed()) {
if (thd_killed(thd)) {
my_error(ER_QUERY_INTERRUPTED, MYF(0));
binlog_error = ER_QUERY_INTERRUPTED;
} else {
my_error(ER_INTERNAL_ERROR, MYF(0),
"Clone wait for XA operation timed out.");
binlog_error = ER_INTERNAL_ERROR;
ut_d(ut_error);
}
}
/* Before stopping redo log archiving synchronize with binlog and GTID. At
this point a transaction can commit only in the order they are written to
binary log. We have ensured this by forcing ordered commit and waiting for
all unordered transactions to finish. */
/* TODO: Binary log position needs to be synchronized across all SEs and we
need to do it in SE agnostic way after acquiring commit lock. Skip porting
binary log synchronization in Innodb. */
// if (binlog_error == 0) {
// binlog_error = synchronize_binlog_gtid(cbk);
// }
/* Save all dynamic metadata to DD buffer table and let all future operations
to save data immediately after generation. This makes clone recovery
independent of dynamic metadata stored in redo log and avoids generating
redo log during recovery. */
/* Dynamic metadata not there for MariaDB */
// dict_persist_t::Enable_immediate dyn_metadata_guard(dict_persist);
/* Use it only for local clone. For remote clone, donor session is different
from the sessions created within mtr test case. */
DEBUG_SYNC_C("clone_donor_after_saving_dynamic_metadata");
/* Start transition to next state. */
State_transit transit_guard(this, new_state);
/* Stop redo archiving even on error. */
auto redo_error = m_redo_ctx.stop(m_redo_trailer, m_redo_trailer_size,
m_redo_trailer_offset);
/* Errors are reported in this order: XA blocking failure, archiver stop
failure, then state transition failure. */
if (binlog_error != 0) {
return binlog_error; /* purecov: inspected */
}
if (redo_error != 0) {
return redo_error; /* purecov: inspected */
}
int transit_error = transit_guard.get_error();
if (transit_error != 0) {
return transit_error; /* purecov: inspected */
}
m_monitor.init_state(srv_stage_clone_redo_copy.m_key, m_enable_pfs);
/* Iterate all tablespace files and add new data files created. */
auto error = Fil_iterator::for_each_file(
[&](fil_node_t *file) { return add_node(file, true); });
if (error != DB_SUCCESS) {
return ER_INTERNAL_ERROR; /* purecov: inspected */
}
/* Collect archived redo log files from Log Archiver. Each file is passed
to add_redo_file() on this snapshot through add_redo_file_callback(). */
auto context = static_cast<void *>(this);
redo_error = m_redo_ctx.get_files(add_redo_file_callback, context);
/* Add another chunk for the redo log header. */
++m_num_redo_chunks;
m_monitor.add_estimate(m_redo_header_size);
/* Add another chunk for the redo log trailer. The chunk is counted even
when the trailer size is zero; only the estimate is skipped. */
++m_num_redo_chunks;
if (m_redo_trailer_size != 0) {
m_monitor.add_estimate(m_redo_trailer_size);
}
m_num_current_chunks = m_num_redo_chunks;
ib::info()
<< "Clone State REDO COPY : " << m_num_current_chunks << " chunks, "
<< " chunk size : " << (chunk_size() * UNIV_PAGE_SIZE) / (1024 * 1024)
<< " M";
m_monitor.change_phase();
/* Report the result of collecting the archived files. */
return redo_error;
}
/** Store a file name in a file descriptor, reusing the name buffer already
held by the descriptor when it is large enough and allocating a new one from
the snapshot heap otherwise.
@param[in,out]	file_meta	file descriptor to update
@param[in]	file_name	NUL terminated file name to store
@return true on success, false if the name is too long or memory
allocation failed (the error is already reported via my_error) */
bool Clone_Snapshot::build_file_name(Clone_File_Meta *file_meta,
                                     const char *file_name) {
  /* Length including the terminating NUL. */
  const size_t name_len = strlen(file_name) + 1;

  if (name_len > file_meta->m_file_name_alloc_len) {
    /* The current buffer is too small: a new one is required. */
    if (name_len > FN_REFLEN_SE) {
      /* purecov: begin deadcode */
      my_error(ER_PATH_LENGTH, MYF(0), "CLONE FILE NAME");
      ut_d(ut_error);
      return false;
      /* purecov: end */
    }

    size_t buf_len = name_len;

    /* For reallocation, allocate in multiple of base size to avoid frequent
    allocation by rename DDL. */
    if (file_meta->m_file_name_alloc_len > 0) {
      buf_len = ut_calc_align(name_len, S_FILE_NAME_BASE_LEN);
    }

    auto buffer = static_cast<char *>(mem_heap_zalloc(m_snapshot_heap, buf_len));

    if (buffer == nullptr) {
      /* purecov: begin inspected */
      my_error(ER_OUTOFMEMORY, MYF(0), static_cast<int>(buf_len));
      return false;
      /* purecov: end */
    }

    strcpy(buffer, file_name);
    file_meta->m_file_name = buffer;
    file_meta->m_file_name_len = name_len;
    file_meta->m_file_name_alloc_len = buf_len;
    return true;
  }

  /* The name fits into the buffer the descriptor already owns. */
  strcpy(const_cast<char *>(file_meta->m_file_name), file_name);
  file_meta->m_file_name_len = name_len;
  return true;
}
/** Allocate a file context from the snapshot heap and fill in its size,
chunk range and name.
@param[in]	file_name	file name; nullptr builds a one-chunk dummy
entry (redo file with no data)
@param[in]	file_size	file size in bytes
@param[in]	file_offset	start offset in bytes, excluded from chunking
@param[out]	num_chunks	number of chunks the file occupies
@return the new file context, or nullptr on failure (the error is already
reported via my_error) */
Clone_file_ctx *Clone_Snapshot::build_file(const char *file_name,
                                           uint64_t file_size,
                                           uint64_t file_offset,
                                           uint &num_chunks) {
  /* Allocate for file metadata from snapshot heap. */
  const uint64_t ctx_bytes = sizeof(Clone_file_ctx);

  auto *file_ctx = static_cast<Clone_file_ctx *>(
      mem_heap_alloc(m_snapshot_heap, ctx_bytes));

  if (file_ctx == nullptr) {
    /* purecov: begin inspected */
    my_error(ER_OUTOFMEMORY, MYF(0), static_cast<int>(ctx_bytes));
    return nullptr;
    /* purecov: end */
  }

  file_ctx->init(Clone_file_ctx::Extension::NONE);
  auto *file_meta = file_ctx->get_file_meta();

  if (file_name == nullptr) {
    /* For redo file with no data, add dummy entry. */
    file_meta->m_begin_chunk = 1;
    file_meta->m_end_chunk = 1;
    num_chunks = 1;
    return file_ctx;
  }

  file_meta->m_file_size = file_size;

  /* Only the part after the start offset is divided into chunks. */
  ut_ad(file_size >= file_offset);
  const uint64_t copy_bytes = file_size - file_offset;

  /* Bytes -> whole pages -> whole chunks, rounding up at each step. */
  uint64_t page_count = ut_uint64_align_up(copy_bytes, UNIV_PAGE_SIZE);
  page_count /= UNIV_PAGE_SIZE;

  const uint64_t aligned_pages = ut_uint64_align_up(page_count, chunk_size());
  num_chunks = static_cast<uint>(aligned_pages >> m_chunk_size_pow2);

  file_meta->m_begin_chunk = m_num_current_chunks + 1;
  file_meta->m_end_chunk = m_num_current_chunks + num_chunks;

  if (!build_file_name(file_meta, file_name)) {
    return nullptr;
  }
  return file_ctx;
}
/** Check whether the file behind a tablespace node is new to the snapshot
or has changed in a way that requires its descriptor to be (re)sent.
@param[in]	node		tablespace file node
@param[out]	file_ctx	existing file context for the tablespace, or
nullptr when none is found
@return true if the file is new or changed, false otherwise */
bool Clone_Snapshot::file_ctx_changed(const fil_node_t *node,
                                      Clone_file_ctx *&file_ctx) {
  file_ctx = nullptr;

  const auto space = node->space;
  const auto map_entry = m_data_file_map.find(space->id);

  /* This is a new file after clone has started. */
  if (map_entry == m_data_file_map.end()) {
    return true;
  }

  /* The map stores index + 1; zero is not a valid entry. */
  auto slot = map_entry->second;

  if (slot == 0) {
    /* purecov: begin deadcode */
    ut_d(ut_error);
    return true;
    /* purecov: end */
  }

  /* File descriptor already exists. */
  --slot;

  const auto file_count = m_data_file_vector.size();
  ut_ad(slot < file_count);

  if (slot < file_count) {
    file_ctx = m_data_file_vector[slot];
  }

  if (file_ctx == nullptr) {
    /* purecov: begin deadcode */
    ut_d(ut_error);
    return false;
    /* purecov: end */
  }

  ut_ad(!file_ctx->modifying());

  /* Consider file modification only from previous state. */
  const bool left_created_state =
      file_ctx->m_state.load() != Clone_file_ctx::State::CREATED;

  if (left_created_state && file_ctx->by_ddl(get_state())) {
    return true;
  }

  /* The file is not modified in previous state. Next we consider
  encryption or compression type changes. The information is not
  useful for "redo copy" state. Such changes would be applied
  during redo recovery. */
  if (get_state() == CLONE_SNAPSHOT_REDO_COPY) {
    return false;
  }

  const auto file_meta = file_ctx->get_file_meta_read();

  /* Changed if either the encryption or the compression property differs. */
  return file_meta->can_encrypt() != space->is_encrypted() ||
         file_meta->can_compress() != space->is_compressed();
}
/** Add a file to the snapshot's data file list, or refresh the descriptor
of a file that was touched by DDL.
@param[in]	name		file name
@param[in]	size_bytes	file size in bytes
@param[in]	alloc_bytes	allocated size, stored in the descriptor
@param[in]	node		tablespace file node; nullptr for a file that
is not a tablespace file, like the buffer pool dump file
@param[in]	by_ddl		true when called while re-scanning files for
DDL changes; false for the initial file scan
@return 0 on success, ER_OUTOFMEMORY if the descriptor or its file name
could not be built (the precise error is already reported via my_error) */
int Clone_Snapshot::add_file(const char *name, uint64_t size_bytes,
                             uint64_t alloc_bytes, fil_node_t *node,
                             bool by_ddl) {
  ut_ad(m_snapshot_handle_type == CLONE_HDL_COPY);

  Clone_file_ctx *file_ctx = nullptr;

  /* Check if ddl has modified this space. */
  if (by_ddl && !file_ctx_changed(node, file_ctx)) {
    return 0;
  }

  if (file_ctx == nullptr) {
    uint32_t num_chunks = 0;

    /* Build file metadata entry and add to data file vector. */
    file_ctx = build_file(name, size_bytes, 0, num_chunks);

    if (file_ctx == nullptr) {
      return (ER_OUTOFMEMORY); /* purecov: inspected */
    }

    auto file_meta = file_ctx->get_file_meta();

    file_meta->m_alloc_size = alloc_bytes;
    file_meta->m_file_index = static_cast<uint32_t>(num_data_files());

    m_data_file_vector.push_back(file_ctx);

    if (!by_ddl) {
      /* Update total number of chunks. */
      m_num_data_chunks += num_chunks;
      m_num_current_chunks = m_num_data_chunks;
    }
  }

  /* All done if not a space file node like buffer pool dump file. */
  if (node == nullptr) {
    return 0;
  }

  auto file_meta = file_ctx->get_file_meta();

  ut_ad(file_meta->m_deleted == file_ctx->deleted());
  ut_ad(file_meta->m_renamed == file_ctx->renamed());

  if (by_ddl) {
    file_ctx->set_ddl(get_state());

    /* Rebuild file name for renamed descriptor. A failure must not be
    ignored: the descriptor would keep the old name. build_file_name()
    has already reported the error. */
    if (file_ctx->renamed() && !build_file_name(file_meta, name)) {
      return (ER_OUTOFMEMORY); /* purecov: inspected */
    }
  }

  /* Update maximum file name length in snapshot. Done after the rename
  handling above so that a longer new name is taken into account. */
  if (file_meta->m_file_name_len > m_max_file_name_len) {
    m_max_file_name_len = static_cast<uint32_t>(file_meta->m_file_name_len);
  }

  /* Set space ID, compression and encryption attribute */
  auto space = node->space;
  file_meta->m_space_id = space->id;
  file_meta->m_is_compressed= space->is_compressed();
  file_meta->m_fsp_flags = static_cast<uint32_t>(space->flags);
  file_meta->m_punch_hole = node->punch_hole;
  file_meta->m_fsblk_size = node->block_size;
  file_meta->m_is_encrypted= space->is_encrypted();

  /* TODO: File metadata: Encryption information */
  // file_meta->m_encryption_metadata = space->m_encryption_metadata;

  /* Modify file meta encryption flag if space encryption or decryption
  already started. This would allow clone to send pages accordingly
  during page copy and persist the flag. */
  /* TODO: Handle Encrypt/Un-Encrypt DDL via notification infrastructure. */
  // if (space->encryption_op_in_progress == Encryption::Progress::DECRYPTION) {
  //   fsp_flags_unset_encryption(file_meta->m_fsp_flags);
  // } else if (space->encryption_op_in_progress ==
  //            Encryption::Progress::ENCRYPTION) {
  //   fsp_flags_set_encryption(file_meta->m_fsp_flags);
  // }

  bool is_redo_copy = (get_state() == CLONE_SNAPSHOT_REDO_COPY);

  if (file_meta->can_encrypt()) {
    /* All encrypted files created during PAGE_COPY state have their keys redo
    logged. The recovered keys from redo log are encrypted by donor master key
    and cannot be used in recipient. We send the keys explicitly in this case
    to be encrypted by recipient with its own master key.
    NOTE: encrypted files created during FILE_COPY state don't have this issue
    as page-0 is always sent during PAGE_COPY with unencrypted data file key.
    We always check for SSL connection before sending keys for encrypted tables
    and error out for security. */
    if (is_redo_copy && by_ddl) {
      file_meta->m_transfer_encryption_key = true;
    }
  }

  /* Add to hash map only for first node of the tablespace. The map stores
  file index + 1 so that zero means "no entry yet". */
  auto space_id = file_meta->m_space_id;

  if (m_data_file_map[space_id] == 0) {
    m_data_file_map[space_id] = file_meta->m_file_index + 1;
  }
  return 0;
}
/** Add one tablespace file node to the snapshot.
@param[in]	node	tablespace file node
@param[in]	by_ddl	true when called while re-scanning files for DDL
changes; false for the initial file scan
@return DB_SUCCESS, or DB_ERROR if the file size cannot be read or the file
cannot be added */
dberr_t Clone_Snapshot::add_node(fil_node_t *node, bool by_ddl) {
  ut_ad(m_snapshot_handle_type == CLONE_HDL_COPY);

  auto space = node->space;
  bool is_page_copy = (get_state() == CLONE_SNAPSHOT_PAGE_COPY);

  if (by_ddl && is_page_copy && space->is_encrypted()) {
    /* Add page 0 always for encrypted tablespace. */
    Clone_Page page_zero;
    page_zero.m_space_id = space->id;
    page_zero.m_page_no = 0;

    /* Count the page only when it is actually new to the set, the same way
    add_page() does. This function runs once per file node, so a tablespace
    with several files would otherwise be counted more than once and
    m_num_pages would no longer match the number of pages in the set. */
    if (m_page_set.insert(page_zero).second) {
      ++m_num_pages;
    }
  }

  /* For compressed pages the file size doesn't match
  physical page size multiplied by number of pages. It is
  because we use UNIV_PAGE_SIZE while creating the node
  and tablespace. */
  auto file_size = os_file_get_size(node->name);
  auto size_bytes = file_size.m_total_size;
  auto alloc_size = file_size.m_alloc_size;

  /* Check for error: an all-ones size signals that the query failed. */
  if (size_bytes == static_cast<os_offset_t>(~0)) {
    char errbuf[MYSYS_STRERROR_SIZE];
    my_error(ER_CANT_OPEN_FILE, MYF(0), node->name, errno,
             my_strerror(errbuf, sizeof(errbuf), errno));
    return (DB_ERROR);
  }

  /* Update estimation. Only the initial scan contributes. */
  if (!by_ddl) {
    m_data_bytes_disk += alloc_size;
    m_monitor.add_estimate(size_bytes);
  }

  /* Add file to snapshot. */
  auto err = add_file(node->name, size_bytes, alloc_size, node, by_ddl);

  return (err != 0 ? DB_ERROR : DB_SUCCESS);
}
/** Add a page ID to the set of pages to be copied.
@param[in]	space_id	tablespace ID of the page
@param[in]	page_num	page number within the tablespace
@return always 0 */
int Clone_Snapshot::add_page(space_id_t space_id, uint32_t page_num) {
  /* Skip pages belonging to tablespace not included for clone. This could
  be some left over pages from drop or truncate in buffer pool which
  would eventually get removed. Or it may be a page for an undo tablespace
  that was deleted with BUF_REMOVE_NONE. */
  if (m_data_file_map.count(space_id) == 0) {
    return 0;
  }

  Clone_Page page_id;
  page_id.m_space_id = space_id;
  page_id.m_page_no = page_num;

  const bool is_new = m_page_set.insert(page_id).second;

  if (!is_new) {
    /* Already tracked: only keep statistics. */
    m_num_duplicate_pages++;
    return 0;
  }

  m_num_pages++;
  m_monitor.add_estimate(UNIV_PAGE_SIZE);
  return 0;
}
int Clone_Snapshot::add_redo_file(char *file_name, uint64_t file_size,
uint64_t file_offset) {
ut_ad(m_snapshot_handle_type == CLONE_HDL_COPY);
uint num_chunks;
/* Build redo file metadata and add to redo vector. */
auto file_ctx = build_file(file_name, file_size, file_offset, num_chunks);
if (file_ctx == nullptr) {
return (ER_OUTOFMEMORY);
}
auto file_meta = file_ctx->get_file_meta();
m_monitor.add_estimate(file_meta->m_file_size - file_offset);
/* Set the start offset for first redo file. This could happen
if redo archiving was already in progress, possibly by another
concurrent snapshot. */
if (num_redo_files() == 0) {
m_redo_start_offset = file_offset;
} else {
ut_ad(file_offset == 0);
}
file_meta->m_alloc_size = 0;
file_meta->m_space_id= SRV_SPACE_ID_UPPER_BOUND;
file_meta->m_is_compressed= false;
/* TOD0: File metadata: Encryption information */
// file_meta->m_encryption_metadata = log_sys->m_encryption_metadata;
file_meta->m_fsp_flags = ULINT32_UNDEFINED;
file_meta->m_punch_hole = false;
file_meta->m_fsblk_size = 0;
file_meta->m_file_index = static_cast<uint32_t>(num_redo_files());
m_redo_file_vector.push_back(file_ctx);
m_num_redo_chunks += num_chunks;
m_num_current_chunks = m_num_redo_chunks;
/* In rare case of small redo file, large concurrent DMLs and
slow data transfer. Currently we support maximum 1k redo files. */
if (num_redo_files() > LOG_FILE_MAX_NUM) {
my_error(ER_INTERNAL_ERROR, MYF(0),
"More than %zu archived redo files. Please retry clone.",
LOG_FILE_MAX_NUM);
return ER_INTERNAL_ERROR;
}
return (0);
}
/** Serialize the task metadata into the task's descriptor buffer and send
it through the callback with the ACK flag set.
@param[in]	task		task whose metadata is sent
@param[in]	callback	callback interface used for sending
@return error code from the callback */
int Clone_Handle::send_task_metadata(Clone_Task *task, Ha_clone_cbk *callback) {
  ut_ad(m_clone_handle_type == CLONE_HDL_COPY);

  /* Build task descriptor with metadata */
  Clone_Desc_Task_Meta descriptor;
  descriptor.init_header(get_version());
  descriptor.m_task_meta = task->m_task_meta;

  /* serialize() receives the buffer capacity and updates the length. */
  auto serial_len = task->m_alloc_len;
  descriptor.serialize(task->m_serial_desc, serial_len, nullptr);

  callback->set_data_desc(task->m_serial_desc, serial_len);
  callback->clear_flags();
  callback->set_ack();

  /* Descriptor only, no data payload. */
  return callback->buffer_cbk(nullptr, 0);
}
/** Send the current state descriptor, marked as an ACK, through the
callback without any data payload.
@param[in]	task		task sending the descriptor
@param[in]	callback	callback interface used for sending
@return error code from the callback */
int Clone_Handle::send_keep_alive(Clone_Task *task, Ha_clone_cbk *callback) {
  ut_ad(m_clone_handle_type == CLONE_HDL_COPY);

  Clone_Desc_State keep_alive_desc;
  keep_alive_desc.init_header(get_version());

  /* Build state descriptor from snapshot and task */
  auto snapshot = m_clone_task_manager.get_snapshot();
  snapshot->get_state_info(false, &keep_alive_desc);

  keep_alive_desc.m_is_ack = true;
  keep_alive_desc.m_task_index = task->m_task_meta.m_task_index;

  auto serial_len = task->m_alloc_len;
  keep_alive_desc.serialize(task->m_serial_desc, serial_len, nullptr);

  callback->set_data_desc(task->m_serial_desc, serial_len);
  callback->clear_flags();

  return callback->buffer_cbk(nullptr, 0);
}
/** Send the state descriptor at the start or at the end of a snapshot state.
At the start, pending DDL metadata is sent first and all file metadata is
sent afterwards. At the end, the call waits for the ACK.
@param[in]	task		task sending the descriptor
@param[in]	callback	callback interface used for sending
@param[in]	is_start	true when starting a state, false when
finishing it
@return 0 on success, otherwise an error code */
int Clone_Handle::send_state_metadata(Clone_Task *task, Ha_clone_cbk *callback,
                                      bool is_start) {
  ut_ad(m_clone_handle_type == CLONE_HDL_COPY);

  /* Before starting state, check and send any new metadata added by DDL. */
  if (is_start) {
    const auto ddl_err = send_all_ddl_metadata(task, callback);

    if (ddl_err != 0) {
      return ddl_err; /* purecov: inspected */
    }
  }

  Clone_Desc_State state_desc;
  state_desc.init_header(get_version());

  /* Build state descriptor from snapshot and task */
  auto snapshot = m_clone_task_manager.get_snapshot();

  /* Master needs to send estimation while beginning state */
  const bool want_estimate = task->m_is_master && is_start;
  snapshot->get_state_info(want_estimate, &state_desc);

  /* Tell the receiver whether the state is starting or ending. */
  state_desc.m_is_start = is_start;

  /* Check if remote has already acknowledged state transfer */
  const bool master_finishing = !is_start && task->m_is_master;

  if (master_finishing && !m_clone_task_manager.check_ack(&state_desc)) {
    ut_ad(task->m_is_master);
    ut_ad(m_clone_task_manager.is_restarted());

    ib::info() << "CLONE COPY: Skip ACK after restart for state "
               << state_desc.m_state;
    return 0;
  }

  state_desc.m_task_index = task->m_task_meta.m_task_index;

  auto serial_len = task->m_alloc_len;
  state_desc.serialize(task->m_serial_desc, serial_len, nullptr);

  callback->set_data_desc(task->m_serial_desc, serial_len);
  callback->clear_flags();
  callback->set_ack();

  const auto send_err = callback->buffer_cbk(nullptr, 0);

  if (send_err != 0) {
    return send_err;
  }

  if (!is_start) {
    /* Wait for ACK while finishing state */
    return m_clone_task_manager.wait_ack(this, task, callback);
  }

  /* Send all file metadata while starting state */
  return send_all_file_metadata(task, callback);
}
/** Send the metadata of every file of the current snapshot state.
Only the master task sends; other tasks return immediately.
@param[in]	task		task sending the metadata
@param[in]	callback	callback interface used for sending
@return 0 on success, otherwise an error code */
int Clone_Handle::send_all_file_metadata(Clone_Task *task,
                                         Ha_clone_cbk *callback) {
  ut_ad(m_clone_handle_type == CLONE_HDL_COPY);

  if (!task->m_is_master) {
    return 0;
  }

  DEBUG_SYNC_C("clone_before_init_meta");

  auto snapshot = m_clone_task_manager.get_snapshot();
  const bool is_redo = (snapshot->get_state() == CLONE_SNAPSHOT_REDO_COPY);

  /* Send all file metadata for data/redo files */
  return snapshot->iterate_files([&](Clone_file_ctx *file_ctx) {
    /* While sending initial metadata, reset the DDL state so that
    recipient could create the files as new file. The reset is done on a
    local copy; the snapshot's own descriptor is left untouched. */
    Clone_File_Meta meta_copy = *file_ctx->get_file_meta();
    meta_copy.reset_ddl();

    return send_file_metadata(task, &meta_copy, is_redo, callback);
  });
}
/** Serialize one file descriptor and send it through the callback.
The file name placed in the descriptor is adjusted by file kind: dropped
for redo files, replaced by the default name for the buffer pool dump file,
and stripped of its directory for system and undo tablespaces.
@param[in]	task		task sending the metadata
@param[in]	file_meta	file metadata to send
@param[in]	is_redo		true if the file is a redo log file
@param[in]	callback	callback interface used for sending
@return error code from the callback */
int Clone_Handle::send_file_metadata(Clone_Task *task,
                                     const Clone_File_Meta *file_meta,
                                     bool is_redo, Ha_clone_cbk *callback) {
  ut_ad(m_clone_handle_type == CLONE_HDL_COPY);

  auto snapshot = m_clone_task_manager.get_snapshot();

  Clone_Desc_File_MetaData file_desc;
  file_desc.m_file_meta = *file_meta;
  file_desc.m_state = snapshot->get_state();

  /* The copy inside the descriptor is what goes on the wire. */
  auto &wire_meta = file_desc.m_file_meta;

  if (is_redo) {
    /* For Redo log always send the fixed redo file size. */
    wire_meta.m_file_size = snapshot->get_redo_file_size();
    wire_meta.m_file_name = nullptr;
    wire_meta.m_file_name_len = 0;
    wire_meta.m_file_name_alloc_len = 0;

  } else if (file_meta->m_space_id == UINT32_MAX) {
    /* Server buffer dump file ib_buffer_pool. */
    ut_ad(file_desc.m_state == CLONE_SNAPSHOT_FILE_COPY);
    ut_ad(file_meta->m_file_index == 0);

    wire_meta.m_file_name = SRV_BUF_DUMP_FILENAME_DEFAULT;
    wire_meta.m_file_name_len =
        static_cast<uint32_t>(strlen(SRV_BUF_DUMP_FILENAME_DEFAULT)) + 1;
    wire_meta.m_file_name_alloc_len = 0;

  } else if (file_meta->m_space_id == TRX_SYS_SPACE ||
             srv_is_undo_tablespace(file_meta->m_space_id)) {
    /* For system tablespace, remove path. */
    auto base_name = strrchr(file_meta->m_file_name, OS_PATH_SEPARATOR);

    if (base_name != nullptr) {
      /* Step over the separator itself. */
      ++base_name;
      wire_meta.m_file_name = base_name;
      wire_meta.m_file_name_len =
          static_cast<uint32_t>(strlen(base_name)) + 1;
    }
    wire_meta.m_file_name_alloc_len = 0;
  }

  file_desc.init_header(get_version());

  auto serial_len = task->m_alloc_len;
  file_desc.serialize(task->m_serial_desc, serial_len, nullptr);

  callback->set_data_desc(task->m_serial_desc, serial_len);
  callback->clear_flags();

  /* Check for secure transfer for encrypted table. */
  const bool need_secure =
      file_meta->can_encrypt() || srv_encrypt_tables || srv_encrypt_log;

  if (need_secure) {
    callback->set_secure();
  }

  return callback->buffer_cbk(nullptr, 0);
}
/** Send one block of data, either from a memory buffer or directly from the
source file, preceded by a data descriptor.
@param[in]	task		task sending the data
@param[in]	file_ctx	file context of the file the data belongs to
@param[in]	offset		offset of the data within the file
@param[in]	buffer		data to send; nullptr to send from the file
@param[in]	size		number of bytes to send
@param[in]	new_file_size	file size to report when it is larger than
the size in the file metadata
@param[in]	callback	callback interface used for sending
@return 0 on success, otherwise an error code */
int Clone_Handle::send_data(Clone_Task *task, const Clone_file_ctx *file_ctx,
uint64_t offset, byte *buffer, uint32_t size,
uint64_t new_file_size, Ha_clone_cbk *callback) {
ut_ad(m_clone_handle_type == CLONE_HDL_COPY);
auto snapshot = m_clone_task_manager.get_snapshot();
auto file_meta = file_ctx->get_file_meta_read();
/* Build data descriptor */
Clone_Desc_Data data_desc;
data_desc.init_header(get_version());
data_desc.m_state = snapshot->get_state();
data_desc.m_task_meta = task->m_task_meta;
data_desc.m_file_index = file_meta->m_file_index;
data_desc.m_data_len = size;
data_desc.m_file_offset = offset;
data_desc.m_file_size = file_meta->m_file_size;
/* Adjust file size to extend automatically while copying page 0. */
if (new_file_size > data_desc.m_file_size) {
ut_ad(snapshot->get_state() == CLONE_SNAPSHOT_PAGE_COPY);
ut_ad(offset == 0);
data_desc.m_file_size = new_file_size;
}
/* Serialize data descriptor and set in callback */
auto desc_len = task->m_alloc_len;
data_desc.serialize(task->m_serial_desc, desc_len, nullptr);
callback->set_data_desc(task->m_serial_desc, desc_len);
callback->clear_flags();
/* Everything sent in redo copy state, and the file with space ID
UINT32_MAX, is opened with the log file type. */
auto file_type = OS_CLONE_DATA_FILE;
bool is_log_file = (data_desc.m_state == CLONE_SNAPSHOT_REDO_COPY);
if (is_log_file || file_meta->m_space_id == UINT32_MAX) {
file_type = OS_CLONE_LOG_FILE;
}
int err = 0;
if (buffer != nullptr) {
/* Send data from buffer. */
err = callback->buffer_cbk(buffer, size);
} else {
/* Send data from file. Open it first if the task has no open file. */
if (task->m_current_file_des.m_file == OS_FILE_CLOSED) {
File_init_cbk empty_cbk;
err = open_file(task, file_ctx, file_type, false, empty_cbk);
if (err != 0) {
return (err);
}
}
ut_ad(task->m_current_file_index == file_meta->m_file_index);
os_file_t file_hdl;
char errbuf[MYSYS_STRERROR_SIZE];
file_hdl = task->m_current_file_des.m_file;
/* Position the file at the requested offset before sending. */
auto success = os_file_seek(nullptr, file_hdl, offset);
if (!success) {
my_error(ER_ERROR_ON_READ, MYF(0), file_meta->m_file_name, errno,
my_strerror(errbuf, sizeof(errbuf), errno));
return (ER_ERROR_ON_READ);
}
if (task->m_file_cache) {
callback->set_os_buffer_cache();
/* For data file recommend zero copy for cached IO. */
if (!is_log_file) {
callback->set_zero_copy();
}
}
callback->set_source_name(file_meta->m_file_name);
err = file_callback(callback, task, size, false, offset
#ifdef UNIV_PFS_IO
, __FILE__, __LINE__
#endif /* UNIV_PFS_IO */
);
}
/* The size is added to the task's total even when the send failed. */
task->m_data_size += size;
return (err);
}
/** Log the progress of the current stage as a percentage of chunks done.
A message is written when progress advanced by at least 20 percentage
points since the last message, or when more than 5 seconds have passed and
the percentage has increased at all.
@param[in]	cur_chunk	current chunk number
@param[in]	max_chunk	total number of chunks in the stage
@param[in,out]	percent_done	percentage shown by the last message
@param[in,out]	disp_time	time of the last message */
void Clone_Handle::display_progress(
    uint32_t cur_chunk, uint32_t max_chunk, uint32_t &percent_done,
    std::chrono::steady_clock::time_point &disp_time) {
  /* Without chunks there is no percentage to compute; this also avoids a
  division by zero. */
  if (max_chunk == 0) {
    return;
  }

  const auto current_time = std::chrono::steady_clock::now();

  /* Multiply in 64 bits: cur_chunk * 100 can overflow 32 bits. */
  const uint64_t current_percent = (uint64_t{cur_chunk} * 100) / max_chunk;
  const uint64_t last_percent = percent_done;

  const bool big_step = current_percent >= last_percent + 20;
  const bool timed_step =
      current_time - disp_time > std::chrono::seconds{5} &&
      current_percent > last_percent;

  if (big_step || timed_step) {
    percent_done = static_cast<uint32_t>(current_percent);
    disp_time = current_time;

    ib::info()
        << "Stage progress: " << percent_done << "% completed.";
  }
}
/** Run the copy loop for one task. After sending the task metadata (and,
on restart, the state or file metadata), chunks are reserved and sent one by
one. When a state has no more chunks the task moves to the next state. The
loop ends when the snapshot reaches CLONE_SNAPSHOT_DONE or on first error.
@param[in]	task_id		index used to look up the task
@param[in]	callback	callback interface used for sending
@return error code, 0 on success */
int Clone_Handle::copy(uint task_id, Ha_clone_cbk *callback) {
  ut_ad(m_clone_handle_type == CLONE_HDL_COPY);

  /* Look up the task and allocate its buffer. */
  auto task = m_clone_task_manager.get_task_by_index(task_id);

  auto err = m_clone_task_manager.alloc_buffer(task);
  if (err != 0) {
    return err;
  }

  /* From this point on a restart is permitted. */
  m_allow_restart = true;

  /* Task metadata goes first. */
  err = send_task_metadata(task, callback);
  if (err != 0) {
    return err;
  }

  const auto resend_state = m_clone_task_manager.is_restart_metadata(task);

  if (resend_state) {
    /* Restart: the state metadata is sent to the remote again. */
    ut_ad(task->m_is_master);
    ut_ad(m_clone_task_manager.is_restarted());

    err = send_state_metadata(task, callback, true);

  } else if (task->m_is_master) {
    /* Restart in file copy state with the file metadata not yet
    transferred: send all of it. */
    const bool in_file_copy =
        (m_clone_task_manager.get_state() == CLONE_SNAPSHOT_FILE_COPY);

    if (in_file_copy && !m_clone_task_manager.is_file_metadata_transferred()) {
      ut_ad(m_clone_task_manager.is_restarted());
      err = send_all_file_metadata(task, callback);
    }
  }

  if (err != 0) {
    return err;
  }

  /* The block size follows the size of the client buffer. */
  auto snapshot = m_clone_task_manager.get_snapshot();
  snapshot->update_block_size(callback->get_client_buffer_size());

  /* Book keeping for the stage progress messages. */
  auto total_chunks = snapshot->get_num_chunks();
  auto last_report = std::chrono::steady_clock::now();
  uint32_t percent_done = 0;

  /* Keep going until the snapshot reports DONE. */
  while (m_clone_task_manager.get_state() != CLONE_SNAPSHOT_DONE) {
    uint32_t chunk = 0;
    uint32_t block = 0;

    /* Ask for the next chunk of the current state. */
    err = m_clone_task_manager.reserve_next_chunk(task, chunk, block);
    if (err != 0) {
      break;
    }

    if (chunk == 0) {
      /* Current state has no chunk left: switch to the next state.
      The file that is still open is closed first. */
      err = close_and_unpin_file(task);
      if (err != 0) {
        break;
      }

      /* Tell the remote that data transfer for this state has ended.
      The state transfer completes only after the remote sends back
      its ACK. */
      err = send_state_metadata(task, callback, false);
      if (err != 0) {
        break;
      }

      /* For copy the snapshot decides which state comes next. */
      err = move_to_next_state(task, callback, nullptr);
      ut_d(task->m_ignore_sync = false);
      if (err != 0) {
        break;
      }

      /* Restart progress tracking for the new state. */
      total_chunks = snapshot->get_num_chunks();
      percent_done = 0;
      last_report = std::chrono::steady_clock::now();

      /* State metadata precedes the chunks of the new state. */
      err = send_state_metadata(task, callback, true);

    } else {
      /* Send the blocks that belong to the reserved chunk. */
      err = process_chunk(task, chunk, block, callback);

      /* Only the master task reports stage progress. */
      if (task->m_is_master) {
        display_progress(chunk, total_chunks, percent_done, last_report);
      }
    }

    if (err != 0) {
      break;
    }
  }

  /* Close whatever file is still open; an earlier error takes precedence
  over a failure to close. */
  const auto close_err = close_and_unpin_file(task);

  return (err != 0) ? err : close_err;
}
/** Send the blocks of one reserved chunk through the callback.
For every block the owning file is pinned (releasing any pin held on another
file), chunks of deleted files are reported or skipped, the next block is
fetched from the snapshot and passed to send_data(). On failure the error and
the current file name are stored in the task manager.
@param[in,out]	task		clone task doing the transfer
@param[in]	chunk_num	chunk to process
@param[in]	block_num	block number within the chunk; it is passed on to
				the snapshot and a value of 0 afterwards means
				there are no more blocks
@param[in]	callback	callback interface used for sending
@return error code, 0 on success */
int Clone_Handle::process_chunk(Clone_Task *task, uint32_t chunk_num,
uint32_t block_num, Ha_clone_cbk *callback) {
ut_ad(m_clone_handle_type == CLONE_HDL_COPY);
auto &task_meta = task->m_task_meta;
/* If chunks are in increasing order, optimize file
search by index */
uint32_t file_index_hint = 0;
if (task_meta.m_chunk_num <= chunk_num) {
file_index_hint = task->m_current_file_index;
}
auto state = m_clone_task_manager.get_state();
bool is_page_copy = (state == CLONE_SNAPSHOT_PAGE_COPY);
bool is_redo_copy = (state == CLONE_SNAPSHOT_REDO_COPY);
/* Except for page copy, file remains same for all blocks of a chunk. */
auto snapshot = m_clone_task_manager.get_snapshot();
const auto *file_ctx =
snapshot->get_file_ctx(chunk_num, block_num, file_index_hint);
const Clone_File_Meta *file_meta = nullptr;
/* For page copy, file context is null if current chunk is over. */
ut_ad(is_page_copy || file_ctx != nullptr);
if (file_ctx != nullptr) {
file_meta = file_ctx->get_file_meta_read();
}
/* Loop over all the blocks of current chunk and send data. */
int err = 0;
/* Iterations since the pin was last released; compared with unpin_count
below to decide when the pin should be dropped. */
uint32_t pin_loop_count = 0;
uint32_t unpin_count = snapshot->get_max_blocks_pin();
while (err == 0) {
/* For page copy same chunk might have pages from different files. */
if (is_page_copy) {
file_ctx = snapshot->get_file_ctx(chunk_num, block_num, 0);
if (file_ctx == nullptr) {
/* Already handled all pages. */
break;
}
file_meta = file_ctx->get_file_meta_read();
}
bool pins_other;
bool pins_current;
std::tie(pins_current, pins_other) = pins_file(task, file_ctx);
/* Let any waiting DDL file operation to proceed after handling a
set of data blocks. */
bool allow_ddl =
snapshot->blocks_clone(file_ctx) && (pin_loop_count >= unpin_count);
/* Drop the pin if it is on a different file, or if it is on this file and
it is time to let DDL through. The file is pinned again right below. */
if (pins_other || (pins_current && allow_ddl)) {
err = close_and_unpin_file(task);
if (err != 0) {
break; /* purecov: inspected */
}
pin_loop_count = 0;
}
bool delete_action = false;
/* check_and_pin_file() takes a non-const context, hence the cast. */
auto mutable_ctx = const_cast<Clone_file_ctx *>(file_ctx);
err = check_and_pin_file(task, mutable_ctx, delete_action);
if (err != 0) {
break; /* purecov: inspected */
}
++pin_loop_count;
/* Check if file is already deleted. */
if (file_ctx->deleted()) {
/* One task get to handle the deleted file chunks. Other tasks can ignore
the chunks of a deleted file */
if (delete_action) {
/* Work on a copy so that the chunk can be marked as deleted in the
metadata that is sent. */
auto delete_file_meta = *file_meta;
delete_file_meta.set_deleted_chunk(chunk_num);
err = send_file_metadata(task, &delete_file_meta, false, callback);
}
/* The file is unpinned even if sending failed; the first error wins. */
auto err2 = close_and_unpin_file(task);
if (err == 0) {
err = err2;
}
if (err != 0) {
break; /* purecov: inspected */
}
/* Send deleted block, to check recipient can handle it. It otherwise is
hit only in rare concurrency case. */
DBUG_EXECUTE_IF("clone_send_deleted_block", {
if (delete_action && is_page_copy) {
auto data_buf = task->m_current_buffer;
send_data(task, file_ctx, 0, data_buf, UNIV_PAGE_SIZE, 0, callback);
}
});
snapshot->skip_deleted_blocks(chunk_num, block_num);
/* No more blocks in current chunk. */
if (block_num == 0) {
break;
}
/* Only in page copy state we can have more blocks belonging to
another tablespace. */
ut_ad(is_page_copy);
continue;
}
/* Get next block from snapshot */
auto data_buf = task->m_current_buffer;
auto data_size = task->m_buffer_alloc_len;
uint64_t data_offset = 0;
uint64_t file_size = 0;
err = snapshot->get_next_block(chunk_num, block_num, file_ctx, data_offset,
data_buf, data_size, file_size);
/* NOTE(review): metadata is read again after fetching the block;
presumably get_next_block() may leave it changed - confirm. */
file_meta = file_ctx->get_file_meta_read();
/* '0' block number indicates no more blocks. */
if (err != 0 || block_num == 0) {
break;
}
/* Check for error from other tasks and DDL */
err = m_clone_task_manager.handle_error_other_task(task->m_has_thd);
if (err != 0) {
break;
}
/* Record the position reached by this task. */
task->m_task_meta.m_block_num = block_num;
task->m_task_meta.m_chunk_num = chunk_num;
/* During redo copy, worker could be ahead of master and needs to
send the metadata */
if (is_redo_copy && !pins_current) {
err = send_file_metadata(task, file_meta, true, callback);
if (err != 0) {
break;
}
}
/* For remote clone, donor clone threads cannot be controlled
by debug sync and hence need to release PIN after every page so as
to fire concurrent DDLs after blocking clone in apply. In file copy
state we cannot release the PIN as we could be reading the file
while sending data after this point. */
DBUG_EXECUTE_IF("remote_release_clone_file_pin", {
if (is_page_copy) {
close_and_unpin_file(task);
}
};);
/* Nothing is sent when the block carries no data. */
if (data_size != 0) {
err = send_data(task, file_ctx, data_offset, data_buf, data_size,
file_size, callback);
}
}
/* Save current error and file name. */
/* NOTE(review): file_meta is dereferenced without a null check; this relies
on it having been set on every path that ends with an error. */
if (err != 0) {
m_clone_task_manager.set_error(err, file_meta->m_file_name);
}
return (err);
}
/** Restart copy on an existing clone handle. The caller must own the clone
system mutex. If the handle is not idle yet, wait up to 30 seconds for it to
become idle, then re-initialise the copy state and mark the handle active.
@param[in]	thd	server thread handle, checked for kill during wait
@param[in]	loc	locator handed to the task manager
@param[in]	loc_len	locator length
@return error code, 0 on success */
int Clone_Handle::restart_copy(THD *thd, const byte *loc, uint loc_len) {
  mysql_mutex_assert_owner(clone_sys->get_mutex());

  /* An aborted clone cannot be restarted. */
  if (is_abort()) {
    my_error(ER_INTERNAL_ERROR, MYF(0),
             "Innodb Clone Restart failed, existing clone aborted");
    return ER_INTERNAL_ERROR;
  }

  if (!is_idle()) {
    /* Poll once per second. */
    Clone_Msec poll_interval(Clone_Sec(1));

    /* Write an alert message every 5 seconds. */
    Clone_Sec alert_interval(5);

    /* Give up after 30 seconds. */
    Clone_Sec wait_limit(30);

    bool timed_out = false;

    auto wait_err = Clone_Sys::wait(
        poll_interval, wait_limit, alert_interval,
        [&](bool alert, bool &result) {
          mysql_mutex_assert_owner(clone_sys->get_mutex());

          /* Keep waiting as long as the handle is not idle. */
          result = !is_idle();

          if (thd_killed(thd)) {
            my_error(ER_QUERY_INTERRUPTED, MYF(0));
            return ER_QUERY_INTERRUPTED;
          }

          if (is_abort()) {
            my_error(ER_INTERNAL_ERROR, MYF(0),
                     "Innodb Clone Restart failed, existing clone aborted");
            return ER_INTERNAL_ERROR;
          }

          if (Clone_Sys::s_clone_sys_state == CLONE_SYS_ABORT) {
            my_error(ER_CLONE_DDL_IN_PROGRESS, MYF(0));
            return ER_CLONE_DDL_IN_PROGRESS;
          }

          if (result && alert) {
            ib::info() << "Clone Master Restart "
                          "wait for idle state";
          }
          return 0;
        },
        clone_sys->get_mutex(), timed_out);

    if (wait_err != 0) {
      return wait_err;
    }

    if (timed_out) {
      ib::info()
          << "Clone Master restart wait for idle timed out";

      my_error(ER_INTERNAL_ERROR, MYF(0),
               "Clone restart wait for idle state timed out");
      return ER_INTERNAL_ERROR;
    }
  }

  ut_ad(is_idle());

  m_clone_task_manager.reinit_copy_state(loc, loc_len);
  set_state(CLONE_STATE_ACTIVE);

  return 0;
}
/** Make sure the task holds a pin on the given file. A pin on another file
is released first. When the pin is newly taken, the task remembers the file
index.
@param[in,out]	task		clone task
@param[in,out]	file_ctx	file context to pin
@param[out]	handle_deleted	reset to false, then set by the snapshot
				while pinning
@return error code, 0 on success */
int Clone_Handle::check_and_pin_file(Clone_Task *task, Clone_file_ctx *file_ctx,
                                     bool &handle_deleted) {
  const auto pin_state = pins_file(task, file_ctx);
  const bool holds_this_file = std::get<0>(pin_state);
  const bool holds_other_file = std::get<1>(pin_state);

  handle_deleted = false;

  /* The requested file is pinned already: done. */
  if (holds_this_file) {
    ut_ad(task->m_pinned_file);
    return 0;
  }

  /* A different file is pinned: let go of it first. */
  if (holds_other_file) {
    /* purecov: begin inspected */
    const auto unpin_err = close_and_unpin_file(task);

    if (unpin_err != 0) {
      return unpin_err;
    }
    /* purecov: end */
  }

  auto snapshot = m_clone_task_manager.get_snapshot();

  const auto pin_err = snapshot->pin_file(file_ctx, handle_deleted);
  if (pin_err != 0) {
    return pin_err;
  }

  /* Remember which file the task has pinned. */
  task->m_pinned_file = true;
  task->m_current_file_index = file_ctx->get_file_meta_read()->m_file_index;

  return 0;
}
/** Close the file currently open for the task and release the pin it holds,
if any.
@param[in,out]	task	clone task
@return error code: the result of closing the file, or ER_INTERNAL_ERROR
when the pinned file cannot be found in the snapshot */
int Clone_Handle::close_and_unpin_file(Clone_Task *task) {
  /* The open file, if any, is closed in every case. */
  const auto close_err = close_file(task);

  /* No pin held: closing was all there is to do. */
  if (!task->m_pinned_file) {
    return close_err;
  }

  auto snapshot = m_clone_task_manager.get_snapshot();
  auto pinned_ctx =
      snapshot->get_file_ctx_by_index(task->m_current_file_index);

  if (pinned_ctx != nullptr) {
    snapshot->unpin_file(pinned_ctx);

    task->m_pinned_file = false;
    task->m_current_file_index = 0;

    return close_err;
  }

  /* purecov: begin deadcode */
  my_error(ER_INTERNAL_ERROR, MYF(0), "Clone file missing before unpin");
  ut_d(ut_error);
  return ER_INTERNAL_ERROR;
  /* purecov: end */
}
/** Tell whether the task holds a pin, and if so on which file.
@param[in]	task		clone task
@param[in]	file_ctx	file context to compare with
@return tuple <pins the given file, pins some other file>; both are false
when the task holds no pin */
std::tuple<bool, bool> Clone_Handle::pins_file(const Clone_Task *task,
                                               const Clone_file_ctx *file_ctx) {
  /* Without a pin the file context is not looked at. */
  if (!task->m_pinned_file) {
    return std::make_tuple(false, false);
  }

  /* A pin is held: it is either on the given file or on another one. */
  const bool same_file = (task->m_current_file_index ==
                          file_ctx->get_file_meta_read()->m_file_index);

  return std::make_tuple(same_file, !same_file);
}
/** Send the metadata of files that were changed by DDL. Only the master task
does this, and only in page copy or redo copy state.
@param[in]	task		clone task
@param[in]	callback	callback interface used for sending
@return error code, 0 on success or when there is nothing to send */
int Clone_Handle::send_all_ddl_metadata(Clone_Task *task,
                                        Ha_clone_cbk *callback) {
  ut_ad(m_clone_handle_type == CLONE_HDL_COPY);

  /* Other tasks have nothing to do here. */
  if (!task->m_is_master) {
    return 0;
  }

  auto state = m_clone_task_manager.get_state();

  /* DDL metadata added during 'file copy' and 'page copy' is sent at the
  start of the stage that follows. */
  const bool follows_ddl_stage =
      (state == CLONE_SNAPSHOT_PAGE_COPY || state == CLONE_SNAPSHOT_REDO_COPY);

  if (!follows_ddl_stage) {
    return 0;
  }

  ut_d(m_clone_task_manager.debug_wait_ddl_meta());

  auto snapshot = m_clone_task_manager.get_snapshot();

  /* Walk over the data files and send metadata for those that DDL modified
  in the previous state. */
  return snapshot->iterate_data_files([&](Clone_file_ctx *file_ctx) {
    if (file_ctx->by_ddl(state)) {
      return send_file_metadata(task, file_ctx->get_file_meta(), false,
                                callback);
    }
    return 0;
  });
}