/*****************************************************************************

Copyright (c) 2017, 2024, Oracle and/or its affiliates.

This program is free software; you can redistribute it and/or modify it under
the terms of the GNU General Public License, version 2.0, as published by the
Free Software Foundation.

This program is designed to work with certain software (including
but not limited to OpenSSL) that is licensed under separate terms,
as designated in a particular file or component or in included license
documentation. The authors of MySQL hereby grant you an additional
permission to link the program and your derivative works with the
separately licensed software that they have either included with
the program or referenced in the documentation.

This program is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE. See the GNU General Public License, version 2.0,
for more details.

You should have received a copy of the GNU General Public License along with
this program; if not, write to the Free Software Foundation, Inc.,
51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA

*****************************************************************************/

/** @file arch/arch0page.cc
Innodb implementation for page archive

*******************************************************/

#include "arch0page.h"
#include "arch0recv.h"
#include "clone0clone.h"
#include "log0log.h"
#include "srv0start.h"
#include "srv0mon.h"

#include "log.h"
#include "sql_class.h"

#ifdef UNIV_DEBUG
/** Archived page file default size in number of blocks. */
uint ARCH_PAGE_FILE_CAPACITY=
  (ARCH_PAGE_BLK_SIZE - ARCH_PAGE_BLK_HEADER_LENGTH) / ARCH_BLK_PAGE_ID_SIZE;

/** Archived page data file size (without header) in number of blocks. */
uint ARCH_PAGE_FILE_DATA_CAPACITY=
  ARCH_PAGE_FILE_CAPACITY - ARCH_PAGE_FILE_NUM_RESET_PAGE;
#endif

void Arch_Reset_File::init()
{
  m_file_index= 0;
  m_lsn= LSN_MAX;
  m_start_point.clear();
}

Arch_File_Ctx Arch_Group::s_dblwr_file_ctx;

Arch_Group::~Arch_Group()
{
  ut_ad(!m_is_active);

  m_file_ctx.close();

  if (m_active_file.m_file != OS_FILE_CLOSED)
    os_file_close(m_active_file);

  if (m_durable_file.m_file != OS_FILE_CLOSED)
    os_file_close(m_durable_file);

  if (m_active_file_name != nullptr)
    ut_free(m_active_file_name);

  if (m_durable_file_name != nullptr)
    ut_free(m_durable_file_name);

  if (!is_durable())
    m_file_ctx.delete_files(m_begin_lsn);
}

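/** Stage a block in the page-archive doublewrite file before it is written
to the actual archive file, so that a torn write of a reset, partial-flush
or full block can be recovered from this copy. The block type selects a
fixed offset within the doublewrite file. */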
dberr_t Arch_Group::write_to_doublewrite_file(Arch_File_Ctx *from_file,
                                              byte *from_buffer,
                                              uint write_size,
                                              Arch_Page_Dblwr_Offset offset)
{
  dberr_t err= DB_SUCCESS;

  ut_ad(!s_dblwr_file_ctx.is_closed());

  switch (offset)
  {
  case ARCH_PAGE_DBLWR_RESET_PAGE:
    DBUG_EXECUTE_IF("crash_before_reset_block_dblwr_flush", DBUG_SUICIDE(););
    break;

  case ARCH_PAGE_DBLWR_PARTIAL_FLUSH_PAGE:
    DBUG_EXECUTE_IF("crash_before_partial_block_dblwr_flush",
                    DBUG_SUICIDE(););
    break;

  case ARCH_PAGE_DBLWR_FULL_FLUSH_PAGE:
    DBUG_EXECUTE_IF("crash_before_full_block_dblwr_flush", DBUG_SUICIDE(););
    break;
  }

  err= s_dblwr_file_ctx.write(from_file, from_buffer,
                              offset * ARCH_PAGE_BLK_SIZE, write_size);
  if (err != DB_SUCCESS)
    return (err);

  s_dblwr_file_ctx.flush();

  return err;
}

dberr_t Arch_Group::init_dblwr_file_ctx(const char *path, const char *base_file,
                                        uint num_files, uint64_t file_size)
{
  auto err= s_dblwr_file_ctx.init(ARCH_DIR, path, base_file, num_files);

  if (err != DB_SUCCESS)
  {
    ut_ad(s_dblwr_file_ctx.get_phy_size() == file_size);
    return (err);
  }

  err= s_dblwr_file_ctx.open(false, LSN_MAX, 0, 0, 0);

  if (err != DB_SUCCESS)
    return (err);

  return s_dblwr_file_ctx.resize_and_overwrite_with_zeros(file_size);
}

dberr_t Arch_Group::build_active_file_name()
{
  char dir_name[MAX_ARCH_DIR_NAME_LEN];
  auto length = MAX_ARCH_DIR_NAME_LEN + 1 + MAX_ARCH_PAGE_FILE_NAME_LEN + 1;

  if (m_active_file_name != nullptr)
    return DB_SUCCESS;

  m_active_file_name= static_cast<char *>(ut_malloc(length, mem_key_archive));

  if (m_active_file_name == nullptr)
    return DB_OUT_OF_MEMORY;

  get_dir_name(dir_name, MAX_ARCH_DIR_NAME_LEN);

  snprintf(m_active_file_name, length, "%s%c%s", dir_name, OS_PATH_SEPARATOR,
           ARCH_PAGE_GROUP_ACTIVE_FILE_NAME);

  return DB_SUCCESS;
}

dberr_t Arch_Group::build_durable_file_name()
{
  char dir_name[MAX_ARCH_DIR_NAME_LEN];
  auto length = MAX_ARCH_DIR_NAME_LEN + 1 + MAX_ARCH_PAGE_FILE_NAME_LEN + 1;

  if (m_durable_file_name != nullptr)
    return DB_SUCCESS;

  m_durable_file_name= static_cast<char *>(ut_malloc(length, mem_key_archive));

  if (m_durable_file_name == nullptr)
    return DB_OUT_OF_MEMORY;

  get_dir_name(dir_name, MAX_ARCH_DIR_NAME_LEN);

  snprintf(m_durable_file_name, length, "%s%c%s", dir_name, OS_PATH_SEPARATOR,
           ARCH_PAGE_GROUP_DURABLE_FILE_NAME);

  return DB_SUCCESS;
}

int Arch_Group::mark_active()
{
  dberr_t db_err= build_active_file_name();

  if (db_err != DB_SUCCESS)
    return ER_OUTOFMEMORY;

  os_file_create_t option;
  os_file_type_t type;

  bool success;
  bool exists;

  success= os_file_status(m_active_file_name, &exists, &type);

  if (!success)
    return ER_CANT_OPEN_FILE;

  ut_ad(!exists);
  option= OS_FILE_CREATE;

  ut_ad(m_active_file.m_file == OS_FILE_CLOSED);

  /* In case of a failure, we would use the error from os_file_create. */
  std::ignore= os_file_create_subdirs_if_needed(m_active_file_name);

  m_active_file= os_file_create(innodb_arch_file_key, m_active_file_name,
                                option, OS_CLONE_LOG_FILE, false, &success);

  int err= (success ? 0 : ER_CANT_OPEN_FILE);
  return err;
}

int Arch_Group::mark_durable()
{
  dberr_t db_err= build_durable_file_name();

  if (db_err != DB_SUCCESS)
    return ER_OUTOFMEMORY;

  os_file_create_t option;
  os_file_type_t type;

  bool success;
  bool exists;

  success= os_file_status(m_durable_file_name, &exists, &type);

  if (exists)
    return 0;

  if (!success)
    return ER_CANT_OPEN_FILE;

  option= OS_FILE_CREATE;

  /* In case of a failure, we would use the error from os_file_create. */
  std::ignore= os_file_create_subdirs_if_needed(m_durable_file_name);

  ut_ad(m_durable_file.m_file == OS_FILE_CLOSED);

  m_durable_file= os_file_create(innodb_arch_file_key, m_durable_file_name,
                                 option, OS_CLONE_LOG_FILE, false, &success);

  int err= (success ? 0 : ER_CANT_OPEN_FILE);
  return err;
}

int Arch_Group::mark_inactive()
{
  os_file_type_t type;

  bool success;
  bool exists;

  dberr_t db_err;

  db_err= build_active_file_name();

  if (db_err != DB_SUCCESS)
    return ER_OUTOFMEMORY;

  success= os_file_status(m_active_file_name, &exists, &type);

  if (!success)
    return ER_CANT_OPEN_FILE;

  if (!exists)
    return 0;

  if (m_active_file.m_file != OS_FILE_CLOSED)
  {
    os_file_close(m_active_file);
    m_active_file.m_file = OS_FILE_CLOSED;
  }

  success= os_file_delete(innodb_arch_file_key, m_active_file_name);

  int err= (success ? 0 : ER_CANT_OPEN_FILE);
  return err;
}

dberr_t Arch_Group::write_file_header(byte *from_buffer, uint length)
{
  dberr_t err;

  ut_ad(!m_file_ctx.is_closed());

  /* Write to the doublewrite buffer before writing to the actual file */
  Arch_Group::write_to_doublewrite_file(nullptr, from_buffer, length,
                                        ARCH_PAGE_DBLWR_RESET_PAGE);

  DBUG_EXECUTE_IF("crash_after_reset_block_dblwr_flush", DBUG_SUICIDE(););

  err= m_file_ctx.write(nullptr, from_buffer, 0, length);

  if (err == DB_SUCCESS)
    /* Flush the file to make sure the changes are made persistent as there
    would be no way to recover the data otherwise in case of a crash. */
    m_file_ctx.flush();

  return err;
}

dberr_t Arch_Group::open_file(Arch_Page_Pos write_pos, bool create_new)
{
  dberr_t err;

  ut_d(auto count = get_file_count());
  ut_ad(count > 0);

  ut_a(m_file_size == ARCH_PAGE_BLK_SIZE * ARCH_PAGE_FILE_CAPACITY);

  if (!create_new)
  {
    auto block_num= write_pos.m_block_num;
    uint file_index= Arch_Block::get_file_index(block_num, ARCH_DATA_BLOCK);
    auto offset= static_cast<uint>(
      Arch_Block::get_file_offset(block_num, ARCH_DATA_BLOCK));

    ut_ad(file_index + 1 == count);

    err= m_file_ctx.open(false, m_begin_lsn, file_index, offset, m_file_size);
  }
  else
    err= m_file_ctx.open_new(m_begin_lsn, m_file_size, m_header_len);

  return err;
}

void Arch_File_Ctx::update_stop_point(uint file_index, lsn_t stop_lsn)
{
  auto last_point_index= m_stop_points.size() - 1;

  if (!m_stop_points.size() || last_point_index != file_index)
    m_stop_points.push_back(stop_lsn);
  else
    m_stop_points[last_point_index] = stop_lsn;
}

void Arch_File_Ctx::save_reset_point_in_mem(lsn_t lsn, Arch_Page_Pos pos)
{
  uint current_file_index=
    Arch_Block::get_file_index(pos.m_block_num, ARCH_DATA_BLOCK);

  Arch_Point reset_point;
  reset_point.lsn= lsn;
  reset_point.pos= pos;

  Arch_Reset_File reset_file;

  if (m_reset.size())
  {
    reset_file= m_reset.back();

    if (reset_file.m_file_index == current_file_index)
    {
      reset_file.m_start_point.push_back(reset_point);
      m_reset[m_reset.size() - 1]= reset_file;
      return;
    }
  }
  /* Reset info maintained in a new file. */
  reset_file.init();
  reset_file.m_file_index= current_file_index;
  reset_file.m_lsn= lsn;
  reset_file.m_start_point.push_back(reset_point);
  m_reset.push_back(reset_file);
}

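/** Locate the reset point with the largest LSN that is <= check_lsn. A
binary search over the per-file reset list picks the file; a second binary
search over that file's start points picks the point. Returns false when no
reset info exists or check_lsn precedes the first reset. */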
bool Arch_File_Ctx::find_reset_point(lsn_t check_lsn, Arch_Point &reset_point)
{
  if (!m_reset.size())
    return false;

  Arch_Reset_File file_reset_compare;
  file_reset_compare.m_lsn= check_lsn;

  /* Finds the file which has the element that is >= to check_lsn */
  auto reset_it = std::lower_bound(
    m_reset.begin(), m_reset.end(), file_reset_compare,
    [](const Arch_Reset_File &lhs, const Arch_Reset_File &rhs)
    {
      return (lhs.m_lsn < rhs.m_lsn);
    });

  if (reset_it != m_reset.end() && reset_it->m_lsn == check_lsn)
  {
    reset_point= reset_it->m_start_point.front();
    return true;
  }

  if (reset_it == m_reset.begin())
    return false;

  /* The element that is less than check_lsn, which we're interested in,
  will be in the previous position. */
  --reset_it;
  ut_ad(reset_it->m_lsn < check_lsn);

  auto reset_file= *reset_it;
  auto reset_start_point= reset_file.m_start_point;

  Arch_Point reset_point_compare;
  reset_point_compare.lsn= check_lsn;

  /* Find the first start point whose lsn is >= to check_lsn. */
  auto reset_point_it= std::lower_bound(
    reset_start_point.begin(), reset_start_point.end(), reset_point_compare,
    [](const Arch_Point &lhs, const Arch_Point &rhs)
    {
      return (lhs.lsn < rhs.lsn);
    });

  if (reset_point_it == reset_start_point.end() ||
      reset_point_it->lsn != check_lsn)
  {
    ut_ad(reset_point_it != reset_start_point.begin());
    --reset_point_it;
  }

  reset_point= *reset_point_it;
  return true;
}

dberr_t Arch_File_Ctx::write(Arch_File_Ctx *from_file, byte *from_buffer,
                             uint offset, uint size)
{
  dberr_t err;

  ut_ad(offset + size <= m_size);
  ut_ad(!is_closed());

  if (from_buffer == nullptr)
  {
    ut_ad(offset + size <= from_file->get_size());
    ut_ad(!from_file->is_closed());

    err= os_file_copy(from_file->m_file, offset, m_file, offset, size);
  } else
    err= os_file_write(IORequestWrite, "Page Track File", m_file, from_buffer,
                       offset, size);
  return err;
}

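/** Find the first block whose stop LSN is >= check_lsn. The per-file stop
points narrow the search to a single file; a binary search over that file's
data block headers, read from disk, then locates the block to stop at. */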
bool Arch_File_Ctx::find_stop_point(Arch_Group *group, lsn_t check_lsn,
                                    Arch_Point &stop_point,
                                    Arch_Page_Pos last_pos)
{
  stop_point.lsn= LSN_MAX;
  stop_point.pos.init();
  auto arch_page_sys= arch_sys->page_sys();

  arch_page_sys->arch_oper_mutex_enter();

  if (!m_stop_points.size())
  {
    arch_page_sys->arch_oper_mutex_exit();
    return false;
  }
  ut_ad(m_stop_points.back() <= arch_page_sys->get_latest_stop_lsn());

  /* 1. Find the file where the block we need to stop at is present */
  uint file_index= 0;

  for (uint i = 0; i < m_stop_points.size(); ++i)
  {
    file_index= i;

    if (m_stop_points[i] >= check_lsn)
      break;
  }
  ut_ad((m_stop_points[file_index] >= check_lsn &&
         (file_index == 0 || m_stop_points[file_index - 1] < check_lsn)));

  arch_page_sys->arch_oper_mutex_exit();

  /* 2. Find the block in the file where to stop. */
  byte header_buf[ARCH_PAGE_BLK_HEADER_LENGTH];

  Arch_Page_Pos left_pos;
  left_pos.m_block_num= ARCH_PAGE_FILE_DATA_CAPACITY * file_index;

  Arch_Page_Pos right_pos;

  if (file_index < m_stop_points.size() - 1)
    right_pos.m_block_num=
      left_pos.m_block_num + ARCH_PAGE_FILE_DATA_CAPACITY - 1;
  else
    right_pos.m_block_num = last_pos.m_block_num;

  lsn_t block_stop_lsn;
  int err;

  while (left_pos.m_block_num <= right_pos.m_block_num)
  {
    Arch_Page_Pos middle_pos;
    middle_pos.init();
    middle_pos.m_offset = 0;

    middle_pos.m_block_num= left_pos.m_block_num +
      (right_pos.m_block_num - left_pos.m_block_num) / 2;

    /* Read the block header for data length and stop lsn info. */
    err= group->read_data(middle_pos, header_buf, ARCH_PAGE_BLK_HEADER_LENGTH);

    if (err != 0)
      return false;

    block_stop_lsn= Arch_Block::get_stop_lsn(header_buf);
    auto data_len= Arch_Block::get_data_len(header_buf);

    middle_pos.m_offset= data_len + ARCH_PAGE_BLK_HEADER_LENGTH;

    if (block_stop_lsn >= check_lsn)
    {
      stop_point.lsn= block_stop_lsn;
      stop_point.pos= middle_pos;
    }

    if (left_pos.m_block_num == right_pos.m_block_num ||
        block_stop_lsn == check_lsn)
      break;

    if (block_stop_lsn > check_lsn)
      right_pos.m_block_num= middle_pos.m_block_num - 1;
    else
      left_pos.m_block_num= middle_pos.m_block_num + 1;
  }

  ut_ad(stop_point.lsn != LSN_MAX);
  return true;
}

#ifdef UNIV_DEBUG

bool Arch_File_Ctx::validate_stop_point_in_file(Arch_Group *group,
                                                pfs_os_file_t file,
                                                uint file_index)
{
  lsn_t stop_lsn= LSN_MAX;
  bool last_file= file_index + 1 == m_count;

  if (last_file && group->is_active() && group->get_end_lsn() == LSN_MAX)
    /* Just return true if this is the case as the block might not have been
    flushed to disk yet */
    return true;

  if (file_index >= m_stop_points.size())
    ut_error;

  /* Read from file to the user buffer. */
  uint64_t offset;

  if (!last_file)
    offset= ARCH_PAGE_FILE_DATA_CAPACITY * ARCH_PAGE_BLK_SIZE;
  else
    offset= Arch_Block::get_file_offset(group->get_stop_pos().m_block_num,
                                        ARCH_DATA_BLOCK);

  byte buf[ARCH_PAGE_BLK_SIZE];

  /* Read the entire reset block. */
  dberr_t err=
    os_file_read(IORequestRead, file, buf, offset, ARCH_PAGE_BLK_SIZE,
                 nullptr);

  if (err != DB_SUCCESS)
    return false;

  stop_lsn = Arch_Block::get_stop_lsn(buf);

  if (stop_lsn != m_stop_points[file_index])
    ut_error;

  DBUG_PRINT("page_archiver", ("File stop point: %" PRIu64 "", stop_lsn));
  return true;
}

bool Arch_File_Ctx::validate_reset_block_in_file(pfs_os_file_t file,
                                                 uint file_index,
                                                 uint &reset_count)
{
  /* Read from file to the user buffer. */
  byte buf[ARCH_PAGE_BLK_SIZE];

  /* Read the entire reset block. */
  dberr_t err=
    os_file_read(IORequestRead, file, buf, 0, ARCH_PAGE_BLK_SIZE, nullptr);

  if (err != DB_SUCCESS)
    return false;

  auto data_length= Arch_Block::get_data_len(buf);

  if (data_length == 0)
    /* No reset, move to the next file. */
    return true;

  ut_ad(data_length >= ARCH_PAGE_FILE_HEADER_RESET_LSN_SIZE +
                       ARCH_PAGE_FILE_HEADER_RESET_POS_SIZE);

  Arch_Reset_File reset_file;

  if (!m_reset.size() || reset_count >= m_reset.size())
    ut_error;

  reset_file= m_reset.at(reset_count);

  if (reset_file.m_file_index != file_index)
    ut_error;

  byte *block_data= buf + ARCH_PAGE_BLK_HEADER_LENGTH;

  lsn_t file_reset_lsn= mach_read_from_8(block_data);
  uint length= ARCH_PAGE_FILE_HEADER_RESET_LSN_SIZE;

  if (reset_file.m_lsn != file_reset_lsn)
    ut_error;

  DBUG_PRINT("page_archiver", ("File lsn : %" PRIu64 "", file_reset_lsn));

  uint index= 0;
  Arch_Point start_point;

  while (length < data_length)
  {
    if (index >= reset_file.m_start_point.size())
      ut_error;

    start_point= reset_file.m_start_point.at(index);

    uint64_t block_num= mach_read_from_2(block_data + length);
    length+= ARCH_PAGE_FILE_HEADER_RESET_BLOCK_NUM_SIZE;

    uint64_t block_offset= mach_read_from_2(block_data + length);
    length+= ARCH_PAGE_FILE_HEADER_RESET_BLOCK_OFFSET_SIZE;

    if (block_num != start_point.pos.m_block_num ||
        block_offset != start_point.pos.m_offset)
      ut_error;

    DBUG_PRINT("page_archiver",
               ("Reset point %u : %" PRIu64 ", %" PRIu64 ", %" PRIu64 "", index,
                start_point.lsn, block_num, block_offset));
    ++index;
  }

  ut_ad(length == data_length);

  if (reset_file.m_start_point.size() != index)
    ut_error;

  ++reset_count;
  return true;
}

bool Arch_Group::validate_info_in_files()
{
  uint reset_count= 0;
  uint file_count= m_file_ctx.get_count();
  bool success= true;

  DBUG_PRINT("page_archiver", ("RESET PAGE"));

  for (uint file_index= 0; file_index < file_count; ++file_index)
  {
    bool last_file= file_index + 1 == file_count;

    if (last_file && m_file_ctx.get_phy_size() == 0)
    {
      success= false;
      break;
    }

    success= m_file_ctx.validate(this, file_index, m_begin_lsn, reset_count);
    if (!success)
      break;
  }

  DBUG_PRINT("page_archiver", ("\n"));
  return success;
}

bool Arch_File_Ctx::validate(Arch_Group *group, uint file_index,
                             lsn_t start_lsn, uint &reset_count)
{
  char file_name[MAX_ARCH_PAGE_FILE_NAME_LEN];

  build_name(file_index, start_lsn, file_name, MAX_ARCH_PAGE_FILE_NAME_LEN);

  os_file_type_t type;
  bool exists= false;
  bool ret;

  ret= os_file_status(file_name, &exists, &type);

  if (!ret || !exists)
    /* Could be the case if files are purged. */
    return true;

  bool success;
  pfs_os_file_t file;

  file= os_file_create(innodb_arch_file_key, file_name, OS_FILE_OPEN,
                       OS_CLONE_LOG_FILE, true, &success);

  if (!success)
    return false;

  DBUG_PRINT("page_archiver", ("File : %u", file_index));
  success= validate_reset_block_in_file(file, file_index, reset_count);

  ut_ad(success);
  if (!success)
  {
    if (file.m_file != OS_FILE_CLOSED)
      os_file_close(file);
    return false;
  }
  success= validate_stop_point_in_file(group, file, file_index);

  if (file.m_file != OS_FILE_CLOSED)
    os_file_close(file);

  if (!success ||
      (file_index + 1 == m_count && reset_count != m_reset.size()))
    ut_error;
  return true;
}
#endif /* UNIV_DEBUG */

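/** Purge archived files based on the latest reset point that is <= purge_lsn.
Files preceding the file that holds this reset point are deleted. Returns the
LSN up to which files were purged (the reset LSN of the first retained file),
or LSN_MAX if nothing could be purged. */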
lsn_t Arch_File_Ctx::purge(lsn_t begin_lsn, lsn_t end_lsn, lsn_t purge_lsn)
{
  Arch_Point reset_point;

  /* Find reset lsn which is <= purge_lsn. */
  auto success= find_reset_point(purge_lsn, reset_point);

  if (!success || reset_point.lsn == begin_lsn)
  {
    const char* mesg= my_get_err_msg(ER_IB_MSG_PAGE_ARCH_NO_RESET_POINTS);
    sql_print_information("%s", mesg);
    return LSN_MAX;
  }

  ut_ad(begin_lsn < reset_point.lsn && reset_point.lsn <= end_lsn);

  Arch_Reset_File file_reset_compare;
  file_reset_compare.m_lsn= reset_point.lsn;

  /* Finds the file which has the element that is >= to reset_point.lsn. */
  auto reset_file_it= std::lower_bound(
    m_reset.begin(), m_reset.end(), file_reset_compare,
    [](const Arch_Reset_File &lhs, const Arch_Reset_File &rhs)
    {
      return (lhs.m_lsn < rhs.m_lsn);
    });

  /* The element that is less than check_lsn, which we're interested in,
  will be in the previous position. */
  if (reset_file_it != m_reset.begin() &&
      (reset_file_it == m_reset.end() || reset_file_it->m_lsn != purge_lsn))
    --reset_file_it;

  if (reset_file_it == m_reset.begin())
    return LSN_MAX;

  lsn_t purged_lsn= reset_file_it->m_lsn;

  for (auto it= m_reset.begin(); it != reset_file_it;)
  {
    bool success= delete_file(it->m_file_index, begin_lsn);

    if (success)
      /** Removes the deleted file from reset info, thereby incrementing the
      iterator. */
      it= m_reset.erase(it);
    else
    {
      purged_lsn= it->m_lsn;
      reset_file_it= it;
      ut_d(ut_error);
      break;
    }
  }

  /** Only files which have a reset would be purged in the above loop. We want
  to purge all the files preceding reset_file_it regardless of whether it has
  a reset or not. */
  for (uint file_index= 0; file_index < reset_file_it->m_file_index;
       ++file_index)
    delete_file(file_index, begin_lsn);

  return purged_lsn;
}

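/** Purge a group's archived files up to purge_lsn. On success returns 0 and
sets group_purged_lsn to the LSN up to which the group was purged (LSN_MAX if
nothing was purged); returns an error if non-durable clients are still
attached to the group. */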
uint Arch_Group::purge(lsn_t purge_lsn, lsn_t &group_purged_lsn)
{
  mysql_mutex_assert_owner(m_arch_mutex);

  if (m_begin_lsn > purge_lsn)
  {
    group_purged_lsn= LSN_MAX;
    return 0;
  }

  /** For a group (active or non-active) if there are any non-durable clients
  attached then we don't purge the group at all. */
  if (m_ref_count > 0)
  {
    group_purged_lsn= LSN_MAX;
    return ER_PAGE_TRACKING_CANNOT_PURGE;
  }

  if (!m_is_active && m_end_lsn <= purge_lsn)
  {
    m_file_ctx.delete_files(m_begin_lsn);
    group_purged_lsn= m_end_lsn;
    return 0;
  }

  lsn_t purged_lsn= m_file_ctx.purge(m_begin_lsn, m_end_lsn, purge_lsn);
  group_purged_lsn= purged_lsn;
  return 0;
}

#ifdef UNIV_DEBUG
|
|
void Page_Arch_Client_Ctx::print()
|
|
{
|
|
DBUG_PRINT("page_archiver", ("CLIENT INFO"));
|
|
DBUG_PRINT("page_archiver", ("Transient Client - %u", !m_is_durable));
|
|
DBUG_PRINT("page_archiver", ("Start LSN - %" PRIu64 "", m_start_lsn));
|
|
DBUG_PRINT("page_archiver", ("Stop LSN - %" PRIu64 "", m_stop_lsn));
|
|
DBUG_PRINT("page_archiver",
|
|
("Last Reset LSN - %" PRIu64 "", m_last_reset_lsn));
|
|
DBUG_PRINT("page_archiver", ("Start pos - %" PRIu64 ", %u",
|
|
m_start_pos.m_block_num, m_start_pos.m_offset));
|
|
DBUG_PRINT("page_archiver", ("Stop pos - %" PRIu64 ", %u\n",
|
|
m_stop_pos.m_block_num, m_stop_pos.m_offset));
|
|
}
|
|
#endif
|
|
|
|
int Page_Arch_Client_Ctx::start(bool recovery, uint64_t *start_id)
|
|
{
|
|
bool reset= false;
|
|
int err= 0;
|
|
|
|
arch_client_mutex_enter();
|
|
|
|
switch (m_state)
|
|
{
|
|
case ARCH_CLIENT_STATE_STOPPED:
|
|
if (!m_is_durable)
|
|
{
|
|
arch_client_mutex_exit();
|
|
return ER_PAGE_TRACKING_NOT_STARTED;
|
|
}
|
|
DBUG_PRINT("page_archiver", ("Archiver in progress"));
|
|
DBUG_PRINT("page_archiver", ("[->] Starting page archiving."));
|
|
break;
|
|
|
|
case ARCH_CLIENT_STATE_INIT:
|
|
DBUG_PRINT("page_archiver", ("Archiver in progress"));
|
|
DBUG_PRINT("page_archiver", ("[->] Starting page archiving."));
|
|
break;
|
|
|
|
case ARCH_CLIENT_STATE_STARTED:
|
|
DBUG_PRINT("page_archiver", ("[->] Resetting page archiving."));
|
|
ut_ad(m_group != nullptr);
|
|
reset= true;
|
|
break;
|
|
|
|
default:
|
|
ut_d(ut_error);
|
|
}
|
|
|
|
/* Start archiving. */
|
|
err= arch_sys->page_sys()->start(&m_group, &m_last_reset_lsn, &m_start_pos,
|
|
m_is_durable, reset, recovery);
|
|
if (err != 0)
|
|
{
|
|
arch_client_mutex_exit();
|
|
return err;
|
|
}
|
|
|
|
if (!reset)
|
|
m_start_lsn= m_last_reset_lsn;
|
|
|
|
if (start_id != nullptr)
|
|
*start_id= m_last_reset_lsn;
|
|
|
|
if (!is_active())
|
|
m_state= ARCH_CLIENT_STATE_STARTED;
|
|
|
|
arch_client_mutex_exit();
|
|
|
|
if (!m_is_durable)
|
|
{
|
|
/* Update DD table buffer to get rid of recovery dependency for auto INC */
|
|
/* Auto INC is persisted differently in MariaDB page_set_autoinc(). */
|
|
// dict_persist_to_dd_table_buffer();
|
|
|
|
/* Make sure all written pages are synced to disk. */
|
|
fil_flush_file_spaces();
|
|
|
|
ib::info() << "Clone Start PAGE ARCH : start LSN : "
|
|
<< m_start_lsn << ", checkpoint LSN : "
|
|
<< log_sys.last_checkpoint_lsn.load();
|
|
}
|
|
return err;
|
|
}
|
|
|
|
int Page_Arch_Client_Ctx::init_during_recovery(Arch_Group *group,
|
|
lsn_t last_lsn)
|
|
{
|
|
/* Initialise the sys client */
|
|
m_state= ARCH_CLIENT_STATE_STARTED;
|
|
m_group= group;
|
|
m_start_lsn= group->get_begin_lsn();
|
|
m_last_reset_lsn= last_lsn;
|
|
m_start_pos.init();
|
|
|
|
/* Start page archiving. */
|
|
int error= start(true, nullptr);
|
|
|
|
ut_d(print());
|
|
return error;
|
|
}
|
|
|
|
int Page_Arch_Client_Ctx::stop(lsn_t *stop_id)
|
|
{
|
|
arch_client_mutex_enter();
|
|
|
|
if (!is_active())
|
|
{
|
|
arch_client_mutex_exit();
|
|
const char* mesg= my_get_err_msg(ER_PAGE_TRACKING_NOT_STARTED);
|
|
sql_print_error("%s", mesg);
|
|
return ER_PAGE_TRACKING_NOT_STARTED;
|
|
}
|
|
|
|
ut_ad(m_group != nullptr);
|
|
|
|
/* Stop archiving. */
|
|
auto err= arch_sys->page_sys()->stop(m_group, &m_stop_lsn, &m_stop_pos,
|
|
m_is_durable);
|
|
ut_d(print());
|
|
|
|
/* We stop the client even in cases of an error. */
|
|
m_state= ARCH_CLIENT_STATE_STOPPED;
|
|
|
|
if (stop_id != nullptr) {
|
|
*stop_id= m_stop_lsn;
|
|
}
|
|
|
|
arch_client_mutex_exit();
|
|
|
|
ib::info() << "Clone Stop PAGE ARCH : end LSN : " << m_stop_lsn
|
|
<< ", log sys LSN : " << log_sys.get_lsn();
|
|
|
|
return err;
|
|
}
|
|
|
|
int Page_Arch_Client_Ctx::get_pages(Page_Arch_Cbk *cbk_func, void *cbk_ctx,
|
|
byte *buff, uint buf_len)
|
|
{
|
|
int err= 0;
|
|
uint num_pages;
|
|
uint read_len;
|
|
|
|
arch_client_mutex_enter();
|
|
|
|
ut_ad(m_state == ARCH_CLIENT_STATE_STOPPED);
|
|
|
|
auto cur_pos= m_start_pos;
|
|
|
|
while (true)
|
|
{
|
|
ut_ad(cur_pos.m_block_num <= m_stop_pos.m_block_num);
|
|
|
|
/* Check if last block */
|
|
if (cur_pos.m_block_num >= m_stop_pos.m_block_num)
|
|
{
|
|
if (cur_pos.m_offset > m_stop_pos.m_offset)
|
|
{
|
|
my_error(ER_INTERNAL_ERROR, MYF(0), "Wrong Archiver page offset");
|
|
err= ER_INTERNAL_ERROR;
|
|
ut_d(ut_error);
|
|
break;
|
|
}
|
|
read_len= m_stop_pos.m_offset - cur_pos.m_offset;
|
|
|
|
if (read_len == 0)
|
|
break;
|
|
|
|
}
|
|
else
|
|
{
|
|
if (cur_pos.m_offset > ARCH_PAGE_BLK_SIZE)
|
|
{
|
|
my_error(ER_INTERNAL_ERROR, MYF(0), "Wrong Archiver page offset");
|
|
err= ER_INTERNAL_ERROR;
|
|
ut_d(ut_error);
|
|
break;
|
|
}
|
|
read_len= ARCH_PAGE_BLK_SIZE - cur_pos.m_offset;
|
|
|
|
/* Move to next block. */
|
|
if (read_len == 0)
|
|
{
|
|
cur_pos.set_next();
|
|
continue;
|
|
}
|
|
}
|
|
|
|
if (read_len > buf_len)
|
|
read_len = buf_len;
|
|
|
|
err= m_group->read_data(cur_pos, buff, read_len);
|
|
if (err != 0)
|
|
break;
|
|
|
|
cur_pos.m_offset+= read_len;
|
|
num_pages= read_len / ARCH_BLK_PAGE_ID_SIZE;
|
|
|
|
err= cbk_func(cbk_ctx, buff, num_pages);
|
|
if (err != 0)
|
|
break;
|
|
}
|
|
arch_client_mutex_exit();
|
|
return err;
|
|
}
|
|
|
|
void Page_Arch_Client_Ctx::release()
|
|
{
|
|
arch_client_mutex_enter();
|
|
|
|
switch (m_state)
|
|
{
|
|
case ARCH_CLIENT_STATE_INIT:
|
|
arch_client_mutex_exit();
|
|
return;
|
|
|
|
case ARCH_CLIENT_STATE_STARTED:
|
|
arch_client_mutex_exit();
|
|
stop(nullptr);
|
|
break;
|
|
|
|
case ARCH_CLIENT_STATE_STOPPED:
|
|
break;
|
|
|
|
default:
|
|
ut_d(ut_error);
|
|
}
|
|
|
|
ut_ad(m_group != nullptr);
|
|
arch_sys->page_sys()->release(m_group, m_is_durable, m_start_pos);
|
|
|
|
m_state= ARCH_CLIENT_STATE_INIT;
|
|
m_group= nullptr;
|
|
m_start_lsn= LSN_MAX;
|
|
m_stop_lsn= LSN_MAX;
|
|
m_last_reset_lsn= LSN_MAX;
|
|
m_start_pos.init();
|
|
m_stop_pos.init();
|
|
|
|
arch_client_mutex_exit();
|
|
}
|
|
|
|
bool wait_flush_archiver(Page_Wait_Flush_Archiver_Cbk cbk_func)
|
|
{
|
|
auto arch_page_sys= arch_sys->page_sys();
|
|
mysql_mutex_assert_owner(arch_page_sys->get_oper_mutex());
|
|
|
|
while (cbk_func())
|
|
{
|
|
/* Need to wait for flush. We don't expect it
|
|
to happen normally. With no duplicate page ID
|
|
dirty page growth should be very slow. */
|
|
arch_sys->signal_archiver();
|
|
bool is_timeout= false;
|
|
int alert_count= 0;
|
|
auto thd= current_thd;
|
|
|
|
auto err= Clone_Sys::wait_default(
|
|
[&](bool alert, bool &result)
|
|
{
|
|
mysql_mutex_assert_owner(arch_page_sys->get_oper_mutex());
|
|
result= cbk_func();
|
|
|
|
int err2= 0;
|
|
if (srv_shutdown_state.load() == SRV_SHUTDOWN_LAST_PHASE ||
|
|
srv_shutdown_state.load() == SRV_SHUTDOWN_EXIT_THREADS ||
|
|
arch_page_sys->is_abort() ||
|
|
(thd && thd_killed(thd)))
|
|
{
|
|
if (thd) my_error(ER_QUERY_INTERRUPTED, MYF(0));
|
|
err2= ER_QUERY_INTERRUPTED;
|
|
}
|
|
else if (result)
|
|
{
|
|
arch_sys->signal_archiver();
|
|
if (alert && ++alert_count == 12)
|
|
{
|
|
alert_count= 0;
|
|
sql_print_information(
|
|
"Clone Page Tracking: waiting for block to flush");
|
|
}
|
|
}
|
|
return err2;
|
|
},
|
|
arch_page_sys->get_oper_mutex(), is_timeout);
|
|
|
|
if (err != 0)
|
|
return false;
|
|
else if (is_timeout)
|
|
{
|
|
sql_print_warning("Clone Page Tracking: wait for block flush timed out");
|
|
ut_d(ut_error);
|
|
return false;
|
|
}
|
|
}
|
|
return true;
|
|
}
|
|
|
|
uint Arch_Block::get_file_index(uint64_t block_num, Arch_Blk_Type type)
|
|
{
|
|
size_t file_index= std::numeric_limits<size_t>::max();
|
|
|
|
switch (type)
|
|
{
|
|
case ARCH_RESET_BLOCK:
|
|
file_index= block_num;
|
|
break;
|
|
|
|
case ARCH_DATA_BLOCK:
|
|
file_index= block_num / ARCH_PAGE_FILE_DATA_CAPACITY;
|
|
break;
|
|
|
|
default:
|
|
ut_d(ut_error);
|
|
}
|
|
return static_cast<uint>(file_index);
|
|
}
|
|
|
|
bool Arch_Block::is_zeros(const void *start, size_t number_of_bytes)
|
|
{
|
|
auto *first_byte= reinterpret_cast<const char *>(start);
|
|
return number_of_bytes == 0 || (*first_byte == 0 &&
|
|
std::memcmp(first_byte, first_byte + 1, number_of_bytes - 1) == 0);
|
|
}
|
|
|
|
Arch_Blk_Type Arch_Block::get_type(byte *block)
|
|
{
|
|
return static_cast<Arch_Blk_Type>(
|
|
mach_read_from_1(block + ARCH_PAGE_BLK_HEADER_TYPE_OFFSET));
|
|
}
|
|
|
|
uint Arch_Block::get_data_len(byte *block)
|
|
{
|
|
return (mach_read_from_2(block + ARCH_PAGE_BLK_HEADER_DATA_LEN_OFFSET));
|
|
}
|
|
|
|
lsn_t Arch_Block::get_stop_lsn(byte *block)
|
|
{
|
|
return (mach_read_from_8(block + ARCH_PAGE_BLK_HEADER_STOP_LSN_OFFSET));
|
|
}
|
|
|
|
uint64_t Arch_Block::get_block_number(byte *block)
|
|
{
|
|
return (mach_read_from_8(block + ARCH_PAGE_BLK_HEADER_NUMBER_OFFSET));
|
|
}
|
|
|
|
lsn_t Arch_Block::get_reset_lsn(byte *block)
|
|
{
|
|
return (mach_read_from_8(block + ARCH_PAGE_BLK_HEADER_RESET_LSN_OFFSET));
|
|
}
|
|
|
|
uint32_t Arch_Block::get_checksum(byte *block)
|
|
{
|
|
return (mach_read_from_4(block + ARCH_PAGE_BLK_HEADER_CHECKSUM_OFFSET));
|
|
}
|
|
|
|
uint64_t Arch_Block::get_file_offset(uint64_t block_num, Arch_Blk_Type type)
|
|
{
|
|
uint64_t offset= 0;
|
|
|
|
switch (type)
|
|
{
|
|
case ARCH_RESET_BLOCK:
|
|
offset= 0;
|
|
break;
|
|
|
|
case ARCH_DATA_BLOCK:
|
|
offset= block_num % ARCH_PAGE_FILE_DATA_CAPACITY;
|
|
offset+= ARCH_PAGE_FILE_NUM_RESET_PAGE;
|
|
offset*= ARCH_PAGE_BLK_SIZE;
|
|
break;
|
|
|
|
default:
|
|
ut_d(ut_error);
|
|
}
|
|
return offset;
|
|
}
|
|
|
|
bool Arch_Block::validate(byte *block)
|
|
{
|
|
auto data_length= Arch_Block::get_data_len(block);
|
|
auto block_checksum= Arch_Block::get_checksum(block);
|
|
auto checksum= my_crc32c(0, block + ARCH_PAGE_BLK_HEADER_LENGTH, data_length);
|
|
|
|
if (checksum != block_checksum)
|
|
{
|
|
const char* format= my_get_err_msg(
|
|
ER_IB_ERR_PAGE_ARCH_INVALID_DOUBLE_WRITE_BUF);
|
|
|
|
my_printf_error(ER_IB_ERR_PAGE_ARCH_INVALID_DOUBLE_WRITE_BUF, format,
|
|
MYF(ME_ERROR_LOG_ONLY|ME_WARNING),
|
|
Arch_Block::get_block_number(block));
|
|
ut_d(ut_error);
|
|
return false;
|
|
}
|
|
else if (Arch_Block::is_zeros(block, ARCH_PAGE_BLK_SIZE))
|
|
return false;
|
|
|
|
return true;
|
|
}
|
|
|
|
void Arch_Block::update_block_header(lsn_t stop_lsn, lsn_t reset_lsn)
|
|
{
|
|
mach_write_to_2(m_data + ARCH_PAGE_BLK_HEADER_DATA_LEN_OFFSET, m_data_len);
|
|
|
|
if (stop_lsn != LSN_MAX)
|
|
{
|
|
m_stop_lsn= stop_lsn;
|
|
mach_write_to_8(m_data + ARCH_PAGE_BLK_HEADER_STOP_LSN_OFFSET, m_stop_lsn);
|
|
}
|
|
|
|
if (reset_lsn != LSN_MAX)
|
|
{
|
|
m_reset_lsn= reset_lsn;
|
|
mach_write_to_8(m_data + ARCH_PAGE_BLK_HEADER_RESET_LSN_OFFSET,
|
|
m_reset_lsn);
|
|
}
|
|
}
|
|
|
|
/** Set the block ready to begin writing page ID
|
|
@param[in] pos position to initiate block number */
|
|
void Arch_Block::begin_write(Arch_Page_Pos pos)
|
|
{
|
|
m_data_len= 0;
|
|
m_state= ARCH_BLOCK_ACTIVE;
|
|
|
|
m_number=
|
|
(m_type == ARCH_DATA_BLOCK
|
|
? pos.m_block_num
|
|
: Arch_Block::get_file_index(pos.m_block_num, ARCH_DATA_BLOCK));
|
|
|
|
m_oldest_lsn= LSN_MAX;
|
|
m_reset_lsn= LSN_MAX;
|
|
|
|
if (m_type == ARCH_DATA_BLOCK)
|
|
arch_sys->page_sys()->update_stop_info(this);
|
|
}
|
|
|
|
/** End writing to a block.
|
|
Change state to #ARCH_BLOCK_READY_TO_FLUSH */
|
|
void Arch_Block::end_write() { m_state = ARCH_BLOCK_READY_TO_FLUSH; }
|
|
|
|
/** Add page ID to current block
|
|
@param[in] page page from buffer pool
|
|
@param[in] pos Archiver current position
|
|
@return true, if successful
|
|
false, if no more space in current block */
|
|
bool Arch_Block::add_page(buf_page_t *page, Arch_Page_Pos *pos)
|
|
{
|
|
space_id_t space_id;
|
|
page_no_t page_num;
|
|
byte *data_ptr;
|
|
|
|
ut_ad(pos->m_offset <= ARCH_PAGE_BLK_SIZE);
|
|
ut_ad(m_type == ARCH_DATA_BLOCK);
|
|
ut_ad(pos->m_offset == m_data_len + ARCH_PAGE_BLK_HEADER_LENGTH);
|
|
|
|
if ((pos->m_offset + ARCH_BLK_PAGE_ID_SIZE) > ARCH_PAGE_BLK_SIZE)
|
|
{
|
|
ut_ad(pos->m_offset == ARCH_PAGE_BLK_SIZE);
|
|
return false;
|
|
}
|
|
|
|
data_ptr= m_data + pos->m_offset;
|
|
|
|
/* Write serialized page ID: tablespace ID and offset */
|
|
space_id= page->id().space();
|
|
page_num= page->id().page_no();
|
|
|
|
mach_write_to_4(data_ptr + ARCH_BLK_SPCE_ID_OFFSET, space_id);
|
|
mach_write_to_4(data_ptr + ARCH_BLK_PAGE_NO_OFFSET, page_num);
|
|
|
|
/* Update position. */
|
|
pos->m_offset+= ARCH_BLK_PAGE_ID_SIZE;
|
|
m_data_len+= ARCH_BLK_PAGE_ID_SIZE;
|
|
|
|
/* Update oldest LSN from page. */
|
|
if (arch_sys->page_sys()->get_latest_stop_lsn() > m_oldest_lsn ||
|
|
m_oldest_lsn > page->oldest_modification())
|
|
m_oldest_lsn = page->oldest_modification();
|
|
|
|
return true;
|
|
}
|
|
|
|
bool Arch_Block::get_data(Arch_Page_Pos *read_pos, uint read_len,
|
|
byte *read_buff)
|
|
{
|
|
ut_ad(read_pos->m_offset + read_len <= m_size);
|
|
|
|
if (m_state == ARCH_BLOCK_INIT || m_number != read_pos->m_block_num)
|
|
/* The block is already overwritten. */
|
|
return false;
|
|
|
|
byte *src= m_data + read_pos->m_offset;
|
|
memcpy(read_buff, src, read_len);
|
|
|
|
return true;
|
|
}
|
|
|
|
bool Arch_Block::set_data(uint read_len, byte *read_buff, uint read_offset)
|
|
{
|
|
ut_ad(m_state != ARCH_BLOCK_INIT);
|
|
ut_ad(read_offset + read_len <= m_size);
|
|
|
|
byte *dest= m_data + read_offset;
|
|
memcpy(dest, read_buff, read_len);
|
|
|
|
set_reset_lsn(Arch_Block::get_reset_lsn(m_data));
|
|
return true;
|
|
}
|
|
|
|
/** Flush this block to the file group.
|
|
@param[in] file_group current archive group
|
|
@param[in] type flush type
|
|
@return error code. */
|
|
dberr_t Arch_Block::flush(Arch_Group *file_group, Arch_Blk_Flush_Type type)
|
|
{
|
|
dberr_t err= DB_SUCCESS;
|
|
uint32_t checksum= my_crc32c(0, m_data + ARCH_PAGE_BLK_HEADER_LENGTH,
|
|
m_data_len);
|
|
/* Update block's header. */
|
|
mach_write_to_1(m_data + ARCH_PAGE_BLK_HEADER_VERSION_OFFSET,
|
|
ARCH_PAGE_FILE_VERSION);
|
|
mach_write_to_1(m_data + ARCH_PAGE_BLK_HEADER_TYPE_OFFSET, m_type);
|
|
mach_write_to_2(m_data + ARCH_PAGE_BLK_HEADER_DATA_LEN_OFFSET, m_data_len);
|
|
mach_write_to_4(m_data + ARCH_PAGE_BLK_HEADER_CHECKSUM_OFFSET, checksum);
|
|
mach_write_to_8(m_data + ARCH_PAGE_BLK_HEADER_STOP_LSN_OFFSET, m_stop_lsn);
|
|
mach_write_to_8(m_data + ARCH_PAGE_BLK_HEADER_RESET_LSN_OFFSET, m_reset_lsn);
|
|
mach_write_to_8(m_data + ARCH_PAGE_BLK_HEADER_NUMBER_OFFSET, m_number);
|
|
|
|
switch (m_type)
|
|
{
|
|
case ARCH_RESET_BLOCK:
|
|
err= file_group->write_file_header(m_data, m_size);
|
|
break;
|
|
|
|
case ARCH_DATA_BLOCK:
|
|
{
|
|
bool is_partial_flush= (type == ARCH_FLUSH_PARTIAL);
|
|
|
|
/* We allow partial flush to happen even if there were no pages added
|
|
since the last partial flush as the block's header might contain some
|
|
useful info required during recovery. */
|
|
err= file_group->write_to_file(nullptr, m_data, m_size, is_partial_flush,
|
|
true);
|
|
break;
|
|
}
|
|
|
|
default:
|
|
ut_d(ut_error);
|
|
}
|
|
return err;
|
|
}
|
|
|
|
void Arch_Block::add_reset(lsn_t reset_lsn, Arch_Page_Pos reset_pos)
|
|
{
|
|
ut_ad(m_type == ARCH_RESET_BLOCK);
|
|
ut_ad(m_data_len <= ARCH_PAGE_BLK_SIZE);
|
|
ut_ad(m_data_len + ARCH_PAGE_FILE_HEADER_RESET_POS_SIZE <=
|
|
ARCH_PAGE_BLK_SIZE);
|
|
|
|
byte *buf= m_data + ARCH_PAGE_BLK_HEADER_LENGTH;
|
|
|
|
if (m_data_len == 0)
|
|
{
|
|
/* Write file lsn. */
|
|
mach_write_to_8(buf, reset_lsn);
|
|
m_data_len+= ARCH_PAGE_FILE_HEADER_RESET_LSN_SIZE;
|
|
}
|
|
|
|
ut_ad(m_data_len >= ARCH_PAGE_FILE_HEADER_RESET_LSN_SIZE);
|
|
|
|
mach_write_to_2(buf + m_data_len, reset_pos.m_block_num);
|
|
m_data_len+= ARCH_PAGE_FILE_HEADER_RESET_BLOCK_NUM_SIZE;
|
|
|
|
mach_write_to_2(buf + m_data_len, reset_pos.m_offset);
|
|
m_data_len+= ARCH_PAGE_FILE_HEADER_RESET_BLOCK_OFFSET_SIZE;
|
|
}
|
|
|
|
void Arch_Block::copy_data(const Arch_Block *block)
|
|
{
|
|
m_data_len= block->m_data_len;
|
|
m_size= block->m_size;
|
|
m_state= block->m_state;
|
|
m_number= block->m_number;
|
|
m_type= block->m_type;
|
|
m_stop_lsn= block->m_stop_lsn;
|
|
m_reset_lsn= block->m_reset_lsn;
|
|
m_oldest_lsn= block->m_oldest_lsn;
|
|
set_data(m_size, block->m_data, 0);
|
|
}
|
|
|
|
/** Initialize a position */
|
|
void Arch_Page_Pos::init()
|
|
{
|
|
m_block_num= 0;
|
|
m_offset= ARCH_PAGE_BLK_HEADER_LENGTH;
|
|
}
|
|
|
|
/** Position in the beginning of next block */
|
|
void Arch_Page_Pos::set_next()
|
|
{
|
|
m_block_num++;
|
|
m_offset= ARCH_PAGE_BLK_HEADER_LENGTH;
|
|
}
|
|
|
|
/** Allocate buffer and initialize blocks
|
|
@return true, if successful */
|
|
bool ArchPageData::init()
|
|
{
|
|
uint alloc_size;
|
|
uint index;
|
|
byte *mem_ptr;
|
|
|
|
ut_ad(m_buffer == nullptr);
|
|
|
|
m_block_size= ARCH_PAGE_BLK_SIZE;
|
|
m_num_data_blocks= ARCH_PAGE_NUM_BLKS;
|
|
|
|
/* block size and number must be in power of 2 */
|
|
ut_ad(ut_is_2pow(m_block_size));
|
|
ut_ad(ut_is_2pow(m_num_data_blocks));
|
|
|
|
alloc_size= m_block_size * m_num_data_blocks;
|
|
|
|
/* For reset block. */
|
|
alloc_size+= m_block_size;
|
|
|
|
/* For partial flush block. */
|
|
alloc_size+= m_block_size;
|
|
|
|
/* For alignment */
|
|
alloc_size+= m_block_size;
|
|
|
|
/* Allocate buffer for memory blocks. */
|
|
m_buffer= static_cast<byte *>(ut_zalloc(alloc_size, mem_key_archive));
|
|
|
|
if (m_buffer == nullptr)
|
|
return false;
|
|
|
|
mem_ptr = static_cast<byte *>(
|
|
ut_align_down(m_buffer + m_block_size, m_block_size));
|
|
|
|
Arch_Block *cur_blk;
|
|
|
|
/* Create memory blocks. */
|
|
for (index= 0; index < m_num_data_blocks; index++)
|
|
{
|
|
cur_blk= UT_NEW(Arch_Block(mem_ptr, m_block_size, ARCH_DATA_BLOCK),
|
|
mem_key_archive);
|
|
if (cur_blk == nullptr)
|
|
return false;
|
|
|
|
m_data_blocks.push_back(cur_blk);
|
|
mem_ptr+= m_block_size;
|
|
}
|
|
m_reset_block= UT_NEW(Arch_Block(mem_ptr, m_block_size, ARCH_RESET_BLOCK),
|
|
mem_key_archive);
|
|
if (m_reset_block == nullptr)
|
|
return false;
|
|
|
|
mem_ptr+= m_block_size;
|
|
|
|
m_partial_flush_block=
|
|
UT_NEW(Arch_Block(mem_ptr, m_block_size, ARCH_DATA_BLOCK),
|
|
mem_key_archive);
|
|
if (m_partial_flush_block == nullptr)
|
|
return false;
|
|
|
|
return true;
|
|
}
|
|
|
|
/** Delete blocks and buffer */
|
|
void ArchPageData::clean()
|
|
{
|
|
for (auto &block : m_data_blocks)
|
|
{
|
|
UT_DELETE(block);
|
|
block= nullptr;
|
|
}
|
|
|
|
if (m_reset_block != nullptr)
|
|
{
|
|
UT_DELETE(m_reset_block);
|
|
m_reset_block= nullptr;
|
|
}
|
|
|
|
if (m_partial_flush_block != nullptr)
|
|
{
|
|
UT_DELETE(m_partial_flush_block);
|
|
m_partial_flush_block= nullptr;
|
|
}
|
|
ut_free(m_buffer);
|
|
}
|
|
|
|
/** Get the block for a position
|
|
@param[in] pos position in page archive sys
|
|
@param[in] type block type
|
|
@return page archive in memory block */
|
|
Arch_Block *ArchPageData::get_block(Arch_Page_Pos *pos, Arch_Blk_Type type)
|
|
{
|
|
switch (type)
|
|
{
|
|
case ARCH_DATA_BLOCK:
|
|
{
|
|
/* index = block_num % m_num_blocks */
|
|
ut_ad(ut_is_2pow(m_num_data_blocks));
|
|
|
|
auto index= pos->m_block_num & (m_num_data_blocks - 1);
|
|
return m_data_blocks[index];
|
|
}
|
|
case ARCH_RESET_BLOCK:
|
|
return m_reset_block;
|
|
|
|
default:
|
|
ut_d(ut_error);
|
|
}
|
|
return nullptr;
|
|
}
|
|
|
|
Arch_Page_Sys::Arch_Page_Sys()
|
|
{
|
|
mysql_mutex_init(0, &m_mutex, nullptr);
|
|
mysql_mutex_init(0, &m_oper_mutex, nullptr);
|
|
|
|
m_ctx= UT_NEW(Page_Arch_Client_Ctx(true), mem_key_archive);
|
|
|
|
DBUG_EXECUTE_IF("page_archiver_simulate_more_archived_files",
|
|
ARCH_PAGE_FILE_CAPACITY = 8;
|
|
ARCH_PAGE_FILE_DATA_CAPACITY =
|
|
ARCH_PAGE_FILE_CAPACITY - ARCH_PAGE_FILE_NUM_RESET_PAGE;);
|
|
}
|
|
|
|
Arch_Page_Sys::~Arch_Page_Sys()
|
|
{
|
|
ut_ad(m_state == ARCH_STATE_INIT || m_state == ARCH_STATE_ABORT ||
|
|
m_state == ARCH_STATE_READ_ONLY);
|
|
ut_ad(m_current_group == nullptr);
|
|
|
|
for (auto group : m_group_list)
|
|
UT_DELETE(group);
|
|
|
|
Arch_Group::shutdown();
|
|
m_data.clean();
|
|
|
|
UT_DELETE(m_ctx);
|
|
mysql_mutex_destroy(&m_mutex);
|
|
mysql_mutex_destroy(&m_oper_mutex);
|
|
}
|
|
|
|
void Arch_Page_Sys::post_recovery_init()
|
|
{
|
|
if (!is_active())
|
|
return;
|
|
|
|
arch_oper_mutex_enter();
|
|
m_latest_stop_lsn=
|
|
log_sys.last_checkpoint_lsn.load(std::memory_order_seq_cst);
|
|
auto cur_block= m_data.get_block(&m_write_pos, ARCH_DATA_BLOCK);
|
|
update_stop_info(cur_block);
|
|
arch_oper_mutex_exit();
|
|
}
|
|
|
|
void Arch_Page_Sys::flush_at_checkpoint(lsn_t checkpoint_lsn)
|
|
{
|
|
arch_oper_mutex_enter();
|
|
|
|
if (!is_active())
|
|
{
|
|
arch_oper_mutex_exit();
|
|
return;
|
|
}
|
|
|
|
lsn_t end_lsn= m_current_group->get_end_lsn();
|
|
|
|
if (m_write_pos.m_offset == ARCH_PAGE_BLK_HEADER_LENGTH)
|
|
{
|
|
arch_oper_mutex_exit();
|
|
return;
|
|
}
|
|
|
|
Arch_Page_Pos request_flush_pos;
|
|
|
|
if (end_lsn == LSN_MAX)
|
|
{
|
|
Arch_Block *cur_block = m_data.get_block(&m_write_pos, ARCH_DATA_BLOCK);
|
|
|
|
ut_ad(cur_block->get_state() == ARCH_BLOCK_ACTIVE);
|
|
|
|
m_latest_stop_lsn= checkpoint_lsn;
|
|
update_stop_info(cur_block);
|
|
|
|
if (cur_block->get_oldest_lsn() != LSN_MAX &&
|
|
cur_block->get_oldest_lsn() <= checkpoint_lsn)
|
|
/* If the oldest modified page in the block added since the last
|
|
checkpoint was modified before the checkpoint_lsn then the block needs to
|
|
be flushed*/
|
|
request_flush_pos = m_write_pos;
|
|
else
|
|
{
|
|
/* Wait for blocks that are not active to be flushed. */
|
|
|
|
if (m_write_pos.m_block_num == 0)
|
|
{
|
|
arch_oper_mutex_exit();
|
|
return;
|
|
}
|
|
request_flush_pos.init();
|
|
request_flush_pos.m_block_num= m_write_pos.m_block_num - 1;
|
|
}
|
|
|
|
if (request_flush_pos < m_flush_pos)
|
|
{
|
|
arch_oper_mutex_exit();
|
|
return;
|
|
}
|
|
|
|
if (m_request_flush_pos < request_flush_pos)
|
|
m_request_flush_pos= request_flush_pos;
|
|
}
|
|
else
|
|
{
|
|
request_flush_pos= m_current_group->get_stop_pos();
|
|
m_request_flush_pos= request_flush_pos;
|
|
}
|
|
|
|
if (request_flush_pos.m_block_num == m_write_pos.m_block_num)
|
|
MONITOR_INC(MONITOR_PAGE_TRACK_CHECKPOINT_PARTIAL_FLUSH_REQUEST);
|
|
|
|
/* We need to ensure that blocks are flushed until request_flush_pos */
|
|
auto cbk = [&] { return (request_flush_pos < m_flush_pos ? false : true); };
|
|
|
|
if (!wait_flush_archiver(cbk))
|
|
{
|
|
const char* mesg= my_get_err_msg(ER_IB_WRN_PAGE_ARCH_FLUSH_DATA);
|
|
sql_print_warning("%s", mesg);
|
|
}
|
|
arch_oper_mutex_exit();
|
|
}
|
|
|
|
void Arch_Page_Sys::track_page(buf_page_t *bpage, lsn_t track_lsn,
|
|
lsn_t oldest_lsn, bool track_mark)
|
|
{
|
|
Arch_Block *cur_blk;
|
|
uint count= 0;
|
|
|
|
if (oldest_lsn > track_lsn && !track_mark)
|
|
/* If the LSN is bigger than track LSN and track mark is not set, it
is already added to tracking list. */
|
|
return;
|
|
|
|
/* We need to track this page. */
|
|
arch_oper_mutex_enter();
|
|
|
|
while (true)
|
|
{
|
|
if (m_state != ARCH_STATE_ACTIVE)
|
|
break;
|
|
|
|
/* Can possibly loop only two times. */
|
|
if (count >= 2)
|
|
{
|
|
if (srv_shutdown_state.load() >= SRV_SHUTDOWN_CLEANUP)
|
|
{
|
|
arch_oper_mutex_exit();
|
|
return;
|
|
}
|
|
ib::warn() << "Fail to add page for tracking."
|
|
<< " Space ID: " << bpage->id().space();
|
|
|
|
m_state= ARCH_STATE_ABORT;
|
|
arch_oper_mutex_exit();
|
|
ut_d(ut_error);
|
|
return;
|
|
}
|
|
|
|
cur_blk= m_data.get_block(&m_write_pos, ARCH_DATA_BLOCK);
|
|
|
|
if (cur_blk->get_state() == ARCH_BLOCK_ACTIVE)
|
|
{
|
|
if (cur_blk->add_page(bpage, &m_write_pos))
|
|
/* page added successfully. */
|
|
break;
|
|
|
|
/* Current block is full. Move to next block. */
|
|
cur_blk->end_write();
|
|
m_write_pos.set_next();
|
|
|
|
/* Writing to a new file so move to the next reset block. */
|
|
if (m_write_pos.m_block_num % ARCH_PAGE_FILE_DATA_CAPACITY == 0)
|
|
{
|
|
Arch_Block *reset_block=
|
|
m_data.get_block(&m_reset_pos, ARCH_RESET_BLOCK);
|
|
reset_block->end_write();
|
|
m_reset_pos.set_next();
|
|
}
|
|
arch_sys->signal_archiver();
|
|
|
|
++count;
|
|
continue;
|
|
|
|
}
|
|
else if (cur_blk->get_state() == ARCH_BLOCK_INIT ||
|
|
cur_blk->get_state() == ARCH_BLOCK_FLUSHED)
|
|
{
|
|
ut_ad(m_write_pos.m_offset == ARCH_PAGE_BLK_HEADER_LENGTH);
|
|
cur_blk->begin_write(m_write_pos);
|
|
|
|
if (!cur_blk->add_page(bpage, &m_write_pos)) {
|
|
/* Should always succeed. */
|
|
ut_d(ut_error);
|
|
}
|
|
/* page added successfully. */
|
|
break;
|
|
|
|
}
|
|
else
|
|
{
|
|
ut_a(cur_blk->get_state() == ARCH_BLOCK_READY_TO_FLUSH);
|
|
auto cbk= std::bind(&Arch_Block::is_flushable, *cur_blk);
|
|
|
|
/* Might release operation mutex temporarily. Need to
|
|
loop again verifying the state. */
|
|
bool success= wait_flush_archiver(cbk);
|
|
count= success ? 0 : 2;
|
|
continue;
|
|
}
|
|
}
|
|
arch_oper_mutex_exit();
|
|
}
|
|
|
|
/** Get page IDs from a specific position.
|
|
Caller must ensure that read_len doesn't exceed the block.
|
|
@param[in] group group whose pages we're interested in
|
|
@param[in] read_pos position in archived data
|
|
@param[in] read_len amount of data to read
|
|
@param[out] read_buff buffer to return the page IDs.
|
|
@note Caller must allocate the buffer.
|
|
@return true if we could successfully read the block. */
|
|
bool Arch_Page_Sys::get_pages(Arch_Group *group, Arch_Page_Pos *read_pos,
|
|
uint read_len, byte *read_buff)
|
|
{
|
|
arch_oper_mutex_enter();
|
|
|
|
if (group != m_current_group)
|
|
{
|
|
arch_oper_mutex_exit();
|
|
return false;
|
|
}
|
|
|
|
/* Get the block to read from. */
|
|
auto read_blk= m_data.get_block(read_pos, ARCH_DATA_BLOCK);
|
|
read_blk->update_block_header(LSN_MAX, LSN_MAX);
|
|
|
|
/* Read from the block. */
|
|
bool success= read_blk->get_data(read_pos, read_len, read_buff);
|
|
|
|
arch_oper_mutex_exit();
|
|
return success;
|
|
}
|
|
|
|
int Arch_Page_Sys::get_pages(MYSQL_THD thd, Page_Track_Callback cbk_func,
|
|
void *cbk_ctx, lsn_t &start_id, lsn_t &stop_id,
|
|
byte *buf, uint buf_len)
|
|
{
|
|
DBUG_PRINT("page_archiver", ("Fetch pages"));
|
|
arch_mutex_enter();
|
|
|
|
if (m_state == ARCH_STATE_READ_ONLY)
|
|
{
|
|
arch_mutex_exit();
|
|
return 0;
|
|
}
|
|
/** 1. Get appropriate LSN range. */
|
|
Arch_Group *group= nullptr;
|
|
|
|
int error= fetch_group_within_lsn_range(start_id, stop_id, &group);
|
|
DBUG_PRINT("page_archiver", ("Start id: %" PRIu64 ", stop id: %" PRIu64 "",
|
|
start_id, stop_id));
|
|
if (error != 0)
|
|
{
|
|
arch_mutex_exit();
|
|
return error;
|
|
}
|
|
ut_ad(group != nullptr);
|
|
|
|
/** 2. Get block position from where to start. */
|
|
Arch_Point reset_point;
|
|
|
|
auto success= group->find_reset_point(start_id, reset_point);
|
|
Arch_Page_Pos start_pos = reset_point.pos;
|
|
start_id= reset_point.lsn;
|
|
|
|
if (!success)
|
|
{
|
|
arch_mutex_exit();
|
|
DBUG_PRINT("page_archiver",
|
|
("Can't fetch pages - No matching reset point."));
|
|
return ER_PAGE_TRACKING_RANGE_NOT_TRACKED;
|
|
}
|
|
|
|
/* 3. Fetch tracked pages. */
|
|
DBUG_PRINT("page_archiver",
|
|
("Trying to get pages between %" PRIu64 " to %" PRIu64 "",
|
|
start_id, stop_id));
|
|
|
|
byte header_buf[ARCH_PAGE_BLK_HEADER_LENGTH];
|
|
|
|
int err= 0;
|
|
auto cur_pos= start_pos;
|
|
Arch_Page_Pos temp_pos;
|
|
uint num_pages;
|
|
bool new_block= true;
|
|
bool last_block= false;
|
|
lsn_t block_stop_lsn= LSN_MAX;
|
|
uint read_len= 0;
|
|
uint bytes_left= 0;
|
|
|
|
arch_oper_mutex_enter();
|
|
auto end_lsn= group->get_end_lsn();
|
|
|
|
Arch_Page_Pos last_pos=
|
|
(end_lsn == LSN_MAX) ? m_write_pos : group->get_stop_pos();
|
|
arch_oper_mutex_exit();
|
|
|
|
while (true)
|
|
{
|
|
if (new_block)
|
|
{
|
|
temp_pos.m_block_num= cur_pos.m_block_num;
|
|
temp_pos.m_offset= 0;
|
|
|
|
/* Read the block header for data length and stop lsn info. */
|
|
err= group->read_data(temp_pos, header_buf, ARCH_PAGE_BLK_HEADER_LENGTH);
|
|
if (err != 0)
|
|
break;
|
|
|
|
block_stop_lsn= Arch_Block::get_stop_lsn(header_buf);
|
|
auto data_len= Arch_Block::get_data_len(header_buf);
|
|
bytes_left= data_len + ARCH_PAGE_BLK_HEADER_LENGTH;
|
|
|
|
ut_ad(bytes_left <= ARCH_PAGE_BLK_SIZE);
|
|
ut_ad(block_stop_lsn != LSN_MAX);
|
|
|
|
bytes_left-= cur_pos.m_offset;
|
|
|
|
if (data_len == 0 || cur_pos.m_block_num == last_pos.m_block_num ||
|
|
block_stop_lsn > stop_id)
|
|
{
|
|
ut_ad(block_stop_lsn >= stop_id);
|
|
stop_id= block_stop_lsn;
|
|
last_block= true;
|
|
}
|
|
|
|
DBUG_PRINT("page_archiver",
|
|
("%" PRIu64 " -> length : %u, stop lsn : %" PRIu64
|
|
", last block : %u",
|
|
cur_pos.m_block_num, data_len, block_stop_lsn, last_block));
|
|
}
|
|
|
|
ut_ad(cur_pos.m_offset <= ARCH_PAGE_BLK_SIZE);
|
|
|
|
/* Read how much ever is left to be read in the block. */
|
|
read_len= bytes_left;
|
|
|
|
if (last_block && read_len == 0)
|
|
/* There is nothing to read. */
|
|
break;
|
|
|
|
if (read_len > buf_len)
|
|
read_len= buf_len;
|
|
|
|
/* Read the block for list of pages */
|
|
err= group->read_data(cur_pos, buf, read_len);
|
|
if (err != 0)
|
|
break;
|
|
|
|
cur_pos.m_offset+= read_len;
|
|
bytes_left-= read_len;
|
|
num_pages= read_len / ARCH_BLK_PAGE_ID_SIZE;
|
|
|
|
err= cbk_func(thd, buf, buf_len, num_pages, cbk_ctx);
|
|
if (err != 0)
|
|
break;
|
|
|
|
if (bytes_left == 0)
|
|
{
|
|
/* We have read all the pages in the block. */
|
|
|
|
if (last_block)
|
|
break;
|
|
else
|
|
{
|
|
new_block= true;
|
|
bytes_left= 0;
|
|
read_len= 0;
|
|
cur_pos.set_next();
|
|
continue;
|
|
}
|
|
}
|
|
else
|
|
/* We still have some bytes to read from the current block. */
|
|
new_block = false;
|
|
}
|
|
arch_mutex_exit();
|
|
return 0;
|
|
}
|
|
|
|
bool Arch_Page_Sys::get_num_pages(Arch_Page_Pos start_pos,
|
|
Arch_Page_Pos stop_pos, uint64_t &num_pages)
|
|
{
|
|
if (start_pos.m_block_num > stop_pos.m_block_num ||
|
|
((start_pos.m_block_num == stop_pos.m_block_num) &&
|
|
(start_pos.m_offset >= stop_pos.m_offset)))
|
|
return false;
|
|
|
|
uint64_t length= 0;
|
|
|
|
if (start_pos.m_block_num != stop_pos.m_block_num)
|
|
{
|
|
length = ARCH_PAGE_BLK_SIZE - start_pos.m_offset;
|
|
length += stop_pos.m_offset - ARCH_PAGE_BLK_HEADER_LENGTH;
|
|
|
|
auto num_blocks = stop_pos.m_block_num - start_pos.m_block_num - 1;
|
|
length += num_blocks * (ARCH_PAGE_BLK_SIZE - ARCH_PAGE_BLK_HEADER_LENGTH);
|
|
|
|
}
|
|
else
|
|
length = stop_pos.m_offset - start_pos.m_offset;
|
|
|
|
num_pages= length / ARCH_BLK_PAGE_ID_SIZE;
|
|
return true;
|
|
}
|
|
|
|
int Arch_Page_Sys::get_num_pages(lsn_t &start_id, lsn_t &stop_id,
|
|
uint64_t *num_pages)
|
|
{
|
|
DBUG_PRINT("page_archiver", ("Fetch num pages"));
|
|
|
|
arch_mutex_enter();
|
|
/** 1. Get appropriate LSN range. */
|
|
Arch_Group *group= nullptr;
|
|
int error= fetch_group_within_lsn_range(start_id, stop_id, &group);
|
|
|
|
#ifdef UNIV_DEBUG
|
|
arch_oper_mutex_enter();
|
|
DBUG_PRINT("page_archiver", ("Start id: %" PRIu64 ", stop id: %" PRIu64 "",
|
|
start_id, stop_id));
|
|
if (is_active())
|
|
DBUG_PRINT("page_archiver",
|
|
("Write_pos : %" PRIu64 ", %u", m_write_pos.m_block_num,
|
|
m_write_pos.m_offset));
|
|
DBUG_PRINT("page_archiver",
|
|
("Latest stop lsn : %" PRIu64 "", m_latest_stop_lsn));
|
|
|
|
arch_oper_mutex_exit();
|
|
#endif
|
|
if (error != 0)
|
|
{
|
|
arch_mutex_exit();
|
|
return error;
|
|
}
|
|
ut_ad(group != nullptr);
|
|
|
|
/** 2. Get block position from where to start. */
|
|
Arch_Point start_point;
|
|
bool success = group->find_reset_point(start_id, start_point);
|
|
|
|
if (!success)
|
|
{
|
|
DBUG_PRINT("page_archiver",
|
|
("Can't fetch pages - No matching reset point."));
|
|
arch_mutex_exit();
|
|
return ER_PAGE_TRACKING_RANGE_NOT_TRACKED;
|
|
}
|
|
|
|
DBUG_PRINT(
|
|
"page_archiver",
|
|
("Start point - lsn : %" PRIu64 " \tpos : %" PRIu64 ", %u",
|
|
start_point.lsn, start_point.pos.m_block_num, start_point.pos.m_offset));
|
|
|
|
Arch_Page_Pos start_pos = start_point.pos;
|
|
start_id= start_point.lsn;
|
|
|
|
/** 3. Get block position where to stop */
|
|
Arch_Point stop_point;
|
|
|
|
success= group->find_stop_point(stop_id, stop_point, m_write_pos);
|
|
ut_ad(success);
|
|
|
|
DBUG_PRINT(
|
|
"page_archiver",
|
|
("Stop point - lsn : %" PRIu64 " \tpos : %" PRIu64 ", %u", stop_point.lsn,
|
|
stop_point.pos.m_block_num, stop_point.pos.m_offset));
|
|
|
|
arch_mutex_exit();
|
|
|
|
Arch_Page_Pos stop_pos= stop_point.pos;
|
|
stop_id= stop_point.lsn;
|
|
|
|
/** 4. Fetch number of pages tracked. */
|
|
ut_ad(start_point.lsn <= stop_point.lsn);
|
|
ut_ad(start_point.pos.m_block_num <= stop_point.pos.m_block_num);
|
|
|
|
success= get_num_pages(start_pos, stop_pos, *num_pages);
|
|
|
|
if (!success)
|
|
num_pages = nullptr;
|
|
|
|
DBUG_PRINT("page_archiver",
|
|
("Number of pages tracked : %" PRIu64 "", *num_pages));
|
|
return 0;
|
|
}
|
|
|
|
/** Wait for archive system to come out of #ARCH_STATE_PREPARE_IDLE.
If the system is preparing to idle, #start needs to wait
for it to come to idle state.
@return true, if successful
        false, if needs to abort */
bool Arch_Page_Sys::wait_idle()
{
  mysql_mutex_assert_owner(&m_mutex);

  if (m_state == ARCH_STATE_PREPARE_IDLE)
  {
    arch_sys->signal_archiver();
    bool is_timeout= false;
    int alert_count= 0;
    auto thd= current_thd;

    auto err= Clone_Sys::wait_default(
        [&](bool alert, bool &result)
        {
          mysql_mutex_assert_owner(&m_mutex);
          result= (m_state == ARCH_STATE_PREPARE_IDLE);

          if (srv_shutdown_state.load() >= SRV_SHUTDOWN_CLEANUP ||
              (thd && thd_killed(thd)))
          {
            if (thd) my_error(ER_QUERY_INTERRUPTED, MYF(0));
            return ER_QUERY_INTERRUPTED;
          }
          if (result)
          {
            arch_sys->signal_archiver();
            /* Print messages every 1 minute - default is 5 seconds. */
            if (alert && ++alert_count == 12)
            {
              alert_count= 0;
              sql_print_information(
                  "Page Tracking start: waiting for idle state.");
            }
          }
          return 0;
        },
        &m_mutex, is_timeout);

    if (err == 0 && is_timeout)
    {
      err= ER_INTERNAL_ERROR;
      sql_print_information(
          "Page Tracking start: wait for idle state timed out");
      ut_d(ut_error);
    }
    if (err != 0)
      return false;
  }
  return true;
}

/** Check if the gap from the last reset is small.
If not many page IDs have been added since the last reset, we avoid taking
a new reset point.
@return true, if the gap is small. */
bool Arch_Page_Sys::is_gap_small()
{
  ut_ad(m_last_pos.m_block_num <= m_write_pos.m_block_num);

  if (m_last_pos.m_block_num == m_write_pos.m_block_num)
    return true;

  auto next_block_num= m_last_pos.m_block_num + 1;
  auto length= ARCH_PAGE_BLK_SIZE - m_last_pos.m_offset;

  if (next_block_num != m_write_pos.m_block_num)
    return false;

  length+= m_write_pos.m_offset - ARCH_PAGE_BLK_HEADER_LENGTH;

  /* Pages added since last reset. */
  auto num_pages= length / ARCH_BLK_PAGE_ID_SIZE;
  return num_pages < ARCH_PAGE_RESET_THRESHOLD;
}

/** Track pages for which IO is already started. */
void Arch_Page_Sys::track_initial_pages()
{
  /* Page tracking must already be active. */
  ut_ad(buf_pool.is_tracking().first);

  mysql_mutex_lock(&buf_pool.flush_list_mutex);
  buf_page_t *bpage= UT_LIST_GET_LAST(buf_pool.flush_list);

  /* Add all pages for which IO is already started. */
  while (bpage != nullptr)
  {
    if (fsp_is_system_temporary(bpage->id().space()))
    {
      bpage= UT_LIST_GET_PREV(list, bpage);
      continue;
    }
    /* Check if we could finish traversing flush list earlier. */
    if (buf_pool.is_lsn_more_than_max_io_lsn(bpage->oldest_modification()))
    {
      /* All pages with oldest_modification smaller than
      bpage->oldest_modification have already been traversed. */
      break;
    }
    if (bpage->is_write_fixed())
      /* IO has already started. Must add the page */
      track_page(bpage, LSN_MAX, LSN_MAX, true);
    bpage= UT_LIST_GET_PREV(list, bpage);
  }
  mysql_mutex_unlock(&buf_pool.flush_list_mutex);
}

/** Enable tracking pages in the buffer pool.
@param[in]  tracking_lsn  track pages from this LSN */
void Arch_Page_Sys::set_tracking_buf_pool(lsn_t tracking_lsn)
{
  mysql_mutex_lock(&buf_pool.mutex);
  buf_pool.set_tracking(tracking_lsn);
  mysql_mutex_unlock(&buf_pool.mutex);
}

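/** Load the page archiver state from the information recovered from the
archived files and restart archiving from where it stopped before the crash.
@param[in]  info  recovery information parsed from the archived files
@return error code, 0 on success */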
int Arch_Page_Sys::recovery_load_and_start(const Arch_Recv_Group_Info &info)
{
  /* Initialise the page archiver with the info parsed from the files. */

  m_current_group= info.m_group;

  m_write_pos= info.m_write_pos;
  m_reset_pos= info.m_reset_pos;
  m_flush_pos= m_write_pos;

  Arch_Reset_File last_reset_file= info.m_last_reset_file;
  ut_ad(last_reset_file.m_start_point.size() > 0);
  Arch_Point reset_point= last_reset_file.m_start_point.back();

  m_last_pos= reset_point.pos;
  m_last_lsn= reset_point.lsn;
  m_last_reset_file_index= last_reset_file.m_file_index;

  ut_ad(m_last_lsn != LSN_MAX);

  auto err= m_ctx->init_during_recovery(m_current_group, m_last_lsn);
  if (err != 0)
    return err;

  if (info.m_new_empty_file)
  {
    m_flush_pos.set_next();
    m_write_pos.set_next();
    m_reset_pos.set_next();
    m_last_reset_file_index= m_reset_pos.m_block_num;
  }

  /* Reload both reset block and write block active at the time of a crash. */
  auto cur_blk= m_data.get_block(&m_write_pos, ARCH_DATA_BLOCK);
  auto reset_block= m_data.get_block(&m_reset_pos, ARCH_RESET_BLOCK);

  arch_mutex_enter();
  arch_oper_mutex_enter();

  cur_blk->begin_write(m_write_pos);
  reset_block->begin_write(m_write_pos);

  if (!info.m_new_empty_file)
  {
    cur_blk->set_data_len(m_write_pos.m_offset - ARCH_PAGE_BLK_HEADER_LENGTH);
    cur_blk->set_data(ARCH_PAGE_BLK_SIZE, info.m_last_data_block, 0);

    reset_block->set_data_len(m_reset_pos.m_offset -
                              ARCH_PAGE_BLK_HEADER_LENGTH);
    reset_block->set_data(ARCH_PAGE_BLK_SIZE, info.m_last_reset_block, 0);
  }

  ut_d(print());

  arch_oper_mutex_exit();
  arch_mutex_exit();
  return err;
}

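/** Start tracking pages for a client. Creates a new archive group if none
is active, or attaches to the current group when the gap since the last
reset is small. Starts the archiver background task if needed.
@param[out]  group      archive group the client is attached to
@param[out]  start_lsn  LSN from which pages are tracked for the client
@param[out]  start_pos  position in the archived data where tracking starts
@param[in]   is_durable true, if the client needs durable archiving
@param[in]   restart    true, if restarting a previously started client
@param[in]   recovery   true, if called while recovering the archiver state
@return error code, 0 on success

An illustrative sketch of a client call sequence (the actual callers live in
the clone/backup code paths and may differ in detail):

  Arch_Group *group= nullptr;
  lsn_t start_lsn= LSN_MAX;
  Arch_Page_Pos start_pos;

  int err= arch_sys->page_sys()->start(&group, &start_lsn, &start_pos,
                                       false, false, false);
  ...
  lsn_t stop_lsn= LSN_MAX;
  Arch_Page_Pos stop_pos;
  err= arch_sys->page_sys()->stop(group, &stop_lsn, &stop_pos, false);
  arch_sys->page_sys()->release(group, false, start_pos);
*/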
int Arch_Page_Sys::start(Arch_Group **group, lsn_t *start_lsn,
                         Arch_Page_Pos *start_pos, bool is_durable,
                         bool restart, bool recovery)
{
  /* Check if archiver task needs to be started. */
  arch_mutex_enter();

  if (m_state == ARCH_STATE_READ_ONLY)
  {
    arch_mutex_exit();
    return 0;
  }

  bool start_archiver= true;
  bool attach_to_current= false;
  bool acquired_oper_mutex= false;

  lsn_t log_sys_lsn= LSN_MAX;

  start_archiver= is_init();

  /* Wait for idle state, if preparing to idle. */
  if (!wait_idle())
  {
    int err= 0;

    if (srv_shutdown_state.load() >= SRV_SHUTDOWN_CLEANUP)
    {
      err= ER_QUERY_INTERRUPTED;
      my_error(err, MYF(0));
    }
    else
    {
      err= ER_INTERNAL_ERROR;
      my_error(err, MYF(0), "Page Archiver wait too long");
    }

    arch_mutex_exit();
    return err;
  }

  switch (m_state)
  {
  case ARCH_STATE_ABORT:
    arch_mutex_exit();
    my_error(ER_QUERY_INTERRUPTED, MYF(0));
    return ER_QUERY_INTERRUPTED;

  case ARCH_STATE_INIT:
  case ARCH_STATE_IDLE:
    [[fallthrough]];

  case ARCH_STATE_ACTIVE:

    if (m_current_group != nullptr)
    {
      /* If gap is small, just attach to current group */
      attach_to_current= (recovery ? false : is_gap_small());

      if (attach_to_current)
        DBUG_PRINT("page_archiver",
                   ("Gap is small - last pos : %" PRIu64
                    " %u, write_pos : %" PRIu64 " %u",
                    m_last_pos.m_block_num, m_last_pos.m_offset,
                    m_write_pos.m_block_num, m_write_pos.m_offset));
    }

    if (!attach_to_current)
    {
      log_sys.latch.wr_lock(SRW_LOCK_CALL);

      if (!recovery)
        MONITOR_INC(MONITOR_PAGE_TRACK_RESETS);

      log_sys_lsn= recovery ? m_last_lsn : log_sys.get_lsn();

      /* Enable/Reset buffer pool page tracking. */
      set_tracking_buf_pool(log_sys_lsn);

      /* Take operation mutex before releasing log_sys to
      ensure that all pages modified after log_sys_lsn are
      tracked. */
      arch_oper_mutex_enter();
      acquired_oper_mutex= true;

      log_sys.latch.wr_unlock();
    }
    else
    {
      arch_oper_mutex_enter();
      acquired_oper_mutex= true;
    }
    break;

  case ARCH_STATE_PREPARE_IDLE:
  default:
    ut_d(ut_error);
  }

  if (is_init() && !m_data.init())
  {
    ut_ad(!attach_to_current);
    acquired_oper_mutex= false;
    arch_oper_mutex_exit();
    arch_mutex_exit();

    my_error(ER_OUTOFMEMORY, MYF(0), ARCH_PAGE_BLK_SIZE);
    return ER_OUTOFMEMORY;
  }

  /* Start archiver background task. */
  if (start_archiver)
  {
    ut_ad(!attach_to_current);
    auto err= arch_sys->start_archiver();

    if (err != 0)
    {
      acquired_oper_mutex= false;
      arch_oper_mutex_exit();
      arch_mutex_exit();

      sql_print_error("Could not start Page Archiver background task");
      return err;
    }
  }

  /* Create a new archive group. */
  if (m_current_group == nullptr)
  {
    ut_ad(!attach_to_current);

    m_last_pos.init();
    m_flush_pos.init();
    m_write_pos.init();
    m_reset_pos.init();
    m_request_flush_pos.init();
    m_request_blk_num_with_lsn= std::numeric_limits<uint64_t>::max();
    m_flush_blk_num_with_lsn= std::numeric_limits<uint64_t>::max();

    m_last_lsn= log_sys_lsn;
    m_last_reset_file_index= 0;

    m_current_group=
        UT_NEW(Arch_Group(log_sys_lsn, ARCH_PAGE_FILE_HDR_SIZE, &m_mutex),
               mem_key_archive);
    if (m_current_group == nullptr)
    {
      acquired_oper_mutex= false;
      arch_oper_mutex_exit();
      arch_mutex_exit();

      my_error(ER_OUTOFMEMORY, MYF(0), sizeof(Arch_Group));
      return ER_OUTOFMEMORY;
    }

    const uint64_t new_file_size=
        static_cast<uint64_t>(ARCH_PAGE_BLK_SIZE) * ARCH_PAGE_FILE_CAPACITY;

    /* Initialize archiver file context. */
    auto db_err= m_current_group->init_file_ctx(
        ARCH_DIR, ARCH_PAGE_DIR, ARCH_PAGE_FILE, 0, new_file_size, 0);

    if (db_err != DB_SUCCESS)
    {
      arch_oper_mutex_exit();
      arch_mutex_exit();

      my_error(ER_OUTOFMEMORY, MYF(0), sizeof(Arch_File_Ctx));
      return ER_OUTOFMEMORY;
    }

    m_group_list.push_back(m_current_group);
    Arch_Block *reset_block= m_data.get_block(&m_reset_pos, ARCH_RESET_BLOCK);
    reset_block->begin_write(m_write_pos);

    DBUG_PRINT("page_archiver", ("Creating a new archived group."));

  }
  else if (!attach_to_current && !recovery)
  {
    /* It's a reset. */
    m_last_lsn= log_sys_lsn;
    m_last_pos= m_write_pos;
    DBUG_PRINT("page_archiver", ("It's a reset."));
  }

  m_state= ARCH_STATE_ACTIVE;
  *start_lsn= m_last_lsn;

  bool wait_for_block_flush= false;

  if (!recovery)
  {
    if (!attach_to_current)
      wait_for_block_flush= save_reset_point(is_durable);

    else if (is_durable && !m_current_group->is_durable())
    {
      /* If this is the first durable archiving of the group and the gap is
      too small for a reset, set the flag below and wait for the reset info
      to be flushed before returning to the caller. */

      wait_for_block_flush= true;
      m_request_blk_num_with_lsn= m_last_pos.m_block_num;
    }
  }
  acquired_oper_mutex= false;
  arch_oper_mutex_exit();

  ut_ad(m_last_lsn != LSN_MAX);
  ut_ad(m_current_group != nullptr);

  if (!restart)
  {
    /* Add pages to tracking for which IO has already started. */
    track_initial_pages();

    *group= m_current_group;
    *start_pos= m_last_pos;

    arch_oper_mutex_enter();
    acquired_oper_mutex= true;

    /* Attach to the group. */
    m_current_group->attach(is_durable);

  }
  else if (recovery)
  {
    arch_oper_mutex_enter();
    acquired_oper_mutex= true;

    /* Attach to the group. */
    m_current_group->attach(is_durable);
  }

  ut_ad(*group == m_current_group);

  if (acquired_oper_mutex)
    arch_oper_mutex_exit();

  arch_mutex_exit();

  if (wait_for_block_flush)
  {
    bool success= wait_for_reset_info_flush(m_request_blk_num_with_lsn);

    if (!success)
    {
      const char* mesg= my_get_err_msg(ER_IB_WRN_PAGE_ARCH_FLUSH_DATA);
      sql_print_warning("%s", mesg);
    }
    ut_ad(m_current_group->get_file_count());
  }

  if (!recovery)
  {
    if (is_durable && !restart)
    {
      m_current_group->mark_active();
      m_current_group->mark_durable();
    }
    /* Request checkpoint */
    log_make_checkpoint();
  }
  return 0;
}

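/** Stop tracking pages for a client. If this is the last active client, the
system moves to ARCH_STATE_PREPARE_IDLE and the current block is handed over
to the flush archiver.
@param[in]   group      archive group the client is attached to
@param[out]  stop_lsn   LSN until which pages are tracked for the client
@param[out]  stop_pos   position in the archived data where tracking stops
@param[in]   is_durable true, if the client had requested durable archiving
@return error code, 0 on success */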
int Arch_Page_Sys::stop(Arch_Group *group, lsn_t *stop_lsn,
                        Arch_Page_Pos *stop_pos, bool is_durable)
{
  Arch_Block *cur_blk;
  arch_mutex_enter();

  if (m_state == ARCH_STATE_READ_ONLY)
  {
    arch_mutex_exit();
    return 0;
  }
  ut_ad(group == m_current_group);
  ut_ad(m_state == ARCH_STATE_ACTIVE);

  arch_oper_mutex_enter();

  *stop_lsn= m_latest_stop_lsn;
  cur_blk= m_data.get_block(&m_write_pos, ARCH_DATA_BLOCK);
  update_stop_info(cur_blk);

  auto count= group->detach(*stop_lsn, &m_write_pos);
  arch_oper_mutex_exit();

  int err= 0;
  bool wait_for_block_flush= false;

  /* If no other active client, let the system get into idle state. */
  if (count == 0 && m_state != ARCH_STATE_ABORT)
  {
    set_tracking_buf_pool(LSN_MAX);
    arch_oper_mutex_enter();

    m_state= ARCH_STATE_PREPARE_IDLE;
    *stop_pos= m_write_pos;

    cur_blk->end_write();
    m_request_flush_pos= m_write_pos;
    m_write_pos.set_next();

    arch_sys->signal_archiver();
    wait_for_block_flush= m_current_group->is_durable();

  }
  else
  {
    if (m_state != ARCH_STATE_ABORT && is_durable &&
        !m_current_group->is_durable_client_active())
      /* In case the non-durable clients are still active but there are no
      active durable clients, we need to mark the group inactive so that
      recovery knows that no durable clients were active. */
      err= m_current_group->mark_inactive();

    arch_oper_mutex_enter();
    *stop_pos= m_write_pos;
  }

  if (m_state == ARCH_STATE_ABORT)
  {
    my_error(ER_QUERY_INTERRUPTED, MYF(0));
    err= ER_QUERY_INTERRUPTED;
  }

  arch_oper_mutex_exit();
  arch_mutex_exit();

  if (wait_for_block_flush)
  {
    /* Wait for flush archiver to flush the blocks. */
    auto cbk= [&]()
    {
      return m_flush_pos.m_block_num <= m_request_flush_pos.m_block_num;
    };
    arch_oper_mutex_enter();

    if (!wait_flush_archiver(cbk))
    {
      const char* mesg= my_get_err_msg(ER_IB_WRN_PAGE_ARCH_FLUSH_DATA);
      sql_print_warning("%s", mesg);
    }
    arch_oper_mutex_exit();
    ut_ad(group->validate_info_in_files());
  }
  return err;
}

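/** Release a client reference from an archive group and delete the group
if it is neither active nor referenced any more.
@param[in]  group      archive group to release
@param[in]  is_durable true, if the client had requested durable archiving
@param[in]  start_pos  start position of the client (unused) */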
void Arch_Page_Sys::release(Arch_Group *group, bool is_durable,
                            Arch_Page_Pos start_pos [[maybe_unused]])
{
  arch_mutex_enter();
  arch_oper_mutex_enter();

  group->release(is_durable);
  arch_oper_mutex_exit();

  if (group->is_active())
  {
    arch_mutex_exit();
    return;
  }

  ut_ad(group != m_current_group);

  if (!group->is_referenced())
  {
    m_group_list.remove(group);
    UT_DELETE(group);
  }
  arch_mutex_exit();
}

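/** Flush all completed blocks before the current write position.
@param[in,out]  cur_pos  position to start flushing from; advanced past the
                         blocks that were flushed
@param[in]      end_pos  flush up to (but not including) this block
@return DB_SUCCESS, or error code returned by the block flush */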
dberr_t Arch_Page_Sys::flush_inactive_blocks(Arch_Page_Pos &cur_pos,
                                             Arch_Page_Pos end_pos)
{
  dberr_t err= DB_SUCCESS;
  Arch_Block *cur_blk;

  /* Write all blocks that are ready for flushing. */
  while (cur_pos.m_block_num < end_pos.m_block_num)
  {
    cur_blk= m_data.get_block(&cur_pos, ARCH_DATA_BLOCK);

    err= cur_blk->flush(m_current_group, ARCH_FLUSH_NORMAL);
    if (err != DB_SUCCESS)
      break;

    MONITOR_INC(MONITOR_PAGE_TRACK_FULL_BLOCK_WRITES);

    arch_oper_mutex_enter();

    m_flush_blk_num_with_lsn= cur_pos.m_block_num;
    cur_pos.set_next();
    cur_blk->set_flushed();
    m_flush_pos.set_next();

    arch_oper_mutex_exit();
  }
  return err;
}

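/** Do a partial flush of the block that is currently being written to.
The block data is copied first so that the operation mutex can be released
while the IO is done.
@param[in]  cur_pos                   position of the active block
@param[in]  partial_reset_block_flush true, if the reset block also needs
                                      to be flushed
@return DB_SUCCESS, or error code returned by the block flush */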
dberr_t Arch_Page_Sys::flush_active_block(Arch_Page_Pos cur_pos,
                                          bool partial_reset_block_flush)
{
  Arch_Block *cur_blk;
  cur_blk= m_data.get_block(&cur_pos, ARCH_DATA_BLOCK);

  arch_oper_mutex_enter();

  if (!cur_blk->is_active())
  {
    arch_oper_mutex_exit();
    return DB_SUCCESS;
  }

  /* Copy block data so that we can release the arch_oper_mutex soon. */
  Arch_Block *flush_blk= m_data.get_partial_flush_block();
  flush_blk->copy_data(cur_blk);

  arch_oper_mutex_exit();

  dberr_t err= flush_blk->flush(m_current_group, ARCH_FLUSH_PARTIAL);
  if (err != DB_SUCCESS)
    return err;

  MONITOR_INC(MONITOR_PAGE_TRACK_PARTIAL_BLOCK_WRITES);

  if (partial_reset_block_flush)
  {
    arch_oper_mutex_enter();
    Arch_Block *reset_block= m_data.get_block(&m_reset_pos, ARCH_RESET_BLOCK);

    arch_oper_mutex_exit();

    err= reset_block->flush(m_current_group, ARCH_FLUSH_NORMAL);
    if (err != DB_SUCCESS)
      return err;
  }

  arch_oper_mutex_enter();

  m_flush_pos.m_offset=
      flush_blk->get_data_len() + ARCH_PAGE_BLK_HEADER_LENGTH;

  arch_oper_mutex_exit();
  return err;
}

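/** Flush the archived data blocks. Full blocks are always flushed; the
active data block and the reset block are flushed only on explicit request.
@param[out]  wait  true, if the caller needs to wait/sleep because there was
                   nothing to flush
@return DB_SUCCESS, or error code returned by the block flush */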
dberr_t Arch_Page_Sys::flush_blocks(bool *wait)
{
  arch_oper_mutex_enter();

  auto request_flush_pos= m_request_flush_pos;
  auto cur_pos= m_flush_pos;
  auto end_pos= m_write_pos;
  auto request_blk_num_with_lsn= m_request_blk_num_with_lsn;
  auto flush_blk_num_with_lsn= m_flush_blk_num_with_lsn;

  arch_oper_mutex_exit();

  uint64_t ARCH_UNKNOWN_BLOCK= std::numeric_limits<uint64_t>::max();

  ut_ad(cur_pos.m_block_num <= end_pos.m_block_num);

  /* Caller needs to wait/sleep, if nothing to flush. */
  *wait= (cur_pos.m_block_num == end_pos.m_block_num);

  auto err= flush_inactive_blocks(cur_pos, end_pos);
  if (err != DB_SUCCESS)
    return err;

  if (cur_pos.m_block_num == end_pos.m_block_num)
  {
    /* Partial Flush */
    bool data_block_flush=
        request_flush_pos.m_block_num == cur_pos.m_block_num &&
        request_flush_pos.m_offset > cur_pos.m_offset;
    bool reset_block_flush=
        request_blk_num_with_lsn != ARCH_UNKNOWN_BLOCK &&
        (flush_blk_num_with_lsn == ARCH_UNKNOWN_BLOCK ||
         request_blk_num_with_lsn > flush_blk_num_with_lsn);

    /* We do partial flush only if we're explicitly requested to flush. */
    if (data_block_flush || reset_block_flush)
    {
      err= flush_active_block(cur_pos, reset_block_flush);
      if (err != DB_SUCCESS)
        return err;
    }
    arch_oper_mutex_enter();

    if (request_blk_num_with_lsn != ARCH_UNKNOWN_BLOCK &&
        (flush_blk_num_with_lsn == ARCH_UNKNOWN_BLOCK ||
         request_blk_num_with_lsn > flush_blk_num_with_lsn))
      m_flush_blk_num_with_lsn= request_blk_num_with_lsn;

    arch_oper_mutex_exit();
  }
  return err;
}

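/** Archive tracked page IDs. Flushes the blocks that are ready and moves
the system to idle or abort state when required. Called by the archiver
background task.
@param[out]  wait  true, if the caller needs to wait/sleep before the next
                   invocation
@return true, if archiving is aborted */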
bool Arch_Page_Sys::archive(bool *wait)
{
  dberr_t db_err;

  auto is_abort= (srv_shutdown_state.load() == SRV_SHUTDOWN_LAST_PHASE ||
                  srv_shutdown_state.load() == SRV_SHUTDOWN_EXIT_THREADS ||
                  m_state == ARCH_STATE_ABORT);

  arch_oper_mutex_enter();

  /* Check if archiving state is inactive. */
  if (m_state == ARCH_STATE_IDLE || m_state == ARCH_STATE_INIT)
  {
    *wait= true;

    if (is_abort)
    {
      m_state= ARCH_STATE_ABORT;
      arch_oper_mutex_exit();
      return true;
    }
    arch_oper_mutex_exit();
    return false;
  }

  /* ARCH_STATE_ABORT is set for flush timeout which is asserted in debug. */
  ut_ad(m_state == ARCH_STATE_ACTIVE || m_state == ARCH_STATE_PREPARE_IDLE);

  auto set_idle= (m_state == ARCH_STATE_PREPARE_IDLE);
  arch_oper_mutex_exit();

  db_err= flush_blocks(wait);

  if (db_err != DB_SUCCESS)
    is_abort= true;

  /* Move to idle state or abort, if needed. */
  if (set_idle || is_abort)
  {
    arch_mutex_enter();
    arch_oper_mutex_enter();

    m_current_group->disable(LSN_MAX);
    m_current_group->close_file_ctxs();

    int err= 0;

    if (!is_abort && m_current_group->is_durable())
    {
      err= m_current_group->mark_inactive();

      Arch_Group::init_dblwr_file_ctx(
          ARCH_DBLWR_DIR, ARCH_DBLWR_FILE, ARCH_DBLWR_NUM_FILES,
          static_cast<uint64_t>(ARCH_PAGE_BLK_SIZE) * ARCH_DBLWR_FILE_CAPACITY);

      ut_ad(m_current_group->validate_info_in_files());
    }
    if (err != 0)
      is_abort= true;

    /* Cleanup group, if no reference. */
    if (!m_current_group->is_referenced())
    {
      m_group_list.remove(m_current_group);
      UT_DELETE(m_current_group);
    }

    m_current_group= nullptr;
    m_state= is_abort ? ARCH_STATE_ABORT : ARCH_STATE_IDLE;

    arch_oper_mutex_exit();
    arch_mutex_exit();
  }
  return is_abort;
}

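/** Read archived page IDs directly from an archived file.
@param[in]   read_pos  position to read from
@param[in]   read_len  length of data to read, in bytes
@param[out]  read_buff buffer to read the data into
@return error code, 0 on success */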
int Arch_Group::read_from_file(Arch_Page_Pos *read_pos, uint read_len,
                               byte *read_buff)
{
  char errbuf[MYSYS_STRERROR_SIZE];
  char file_name[MAX_ARCH_PAGE_FILE_NAME_LEN];

  /* Build file name */
  auto file_index= static_cast<uint>(
      Arch_Block::get_file_index(read_pos->m_block_num, ARCH_DATA_BLOCK));

  get_file_name(file_index, file_name, MAX_ARCH_PAGE_FILE_NAME_LEN);

  /* Find offset to read from. */
  os_offset_t offset=
      Arch_Block::get_file_offset(read_pos->m_block_num, ARCH_DATA_BLOCK);
  offset+= read_pos->m_offset;

  bool success;
  /* Open file in read only mode. */
  pfs_os_file_t file=
      os_file_create(innodb_arch_file_key, file_name, OS_FILE_OPEN,
                     OS_CLONE_LOG_FILE, true, &success);

  if (!success)
  {
    my_error(ER_CANT_OPEN_FILE, MYF(0), file_name, errno,
             my_strerror(errbuf, sizeof(errbuf), errno));
    return ER_CANT_OPEN_FILE;
  }

  /* Read from file to the user buffer. */
  auto db_err=
      os_file_read(IORequestRead, file, read_buff, offset, read_len, nullptr);

  os_file_close(file);

  if (db_err != DB_SUCCESS)
  {
    my_error(ER_ERROR_ON_READ, MYF(0), file_name, errno,
             my_strerror(errbuf, sizeof(errbuf), errno));
    return ER_ERROR_ON_READ;
  }
  return 0;
}

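/** Read archived page IDs for a position, preferring the in-memory buffer
and falling back to the archived file if the buffer has been overwritten.
@param[in]   cur_pos   position to read from
@param[out]  buff      buffer to read the data into
@param[in]   buff_len  length of data to read, in bytes
@return error code, 0 on success */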
int Arch_Group::read_data(Arch_Page_Pos cur_pos, byte *buff, uint buff_len)
{
  int err= 0;

  /* Attempt to read from in memory buffer. */
  auto success= arch_sys->page_sys()->get_pages(this, &cur_pos, buff_len,
                                                buff);
  if (!success)
    /* The buffer is overwritten. Read from file. */
    err= read_from_file(&cur_pos, buff_len, buff);

  return err;
}

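/** Save a new reset point: add the reset information to the reset block,
remember it in the current group and request the flush archiver to persist
it.
@param[in]  is_durable  true, if the client needs durable archiving
@return true, if the caller needs to wait for the reset info to be flushed */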
bool Arch_Page_Sys::save_reset_point(bool is_durable)
{
  /* 1. Add the reset info to the reset block */
  uint current_file_index=
      Arch_Block::get_file_index(m_last_pos.m_block_num, ARCH_DATA_BLOCK);

  auto reset_block= m_data.get_block(&m_reset_pos, ARCH_RESET_BLOCK);

  /* If the reset info should belong to a new file then re-initialize the
  block, as the block from now on will contain reset information belonging
  to the new file */
  if (m_last_reset_file_index != current_file_index)
  {
    ut_ad(current_file_index > m_last_reset_file_index);
    reset_block->begin_write(m_last_pos);
  }
  m_last_reset_file_index= current_file_index;
  reset_block->add_reset(m_last_lsn, m_last_pos);
  m_current_group->save_reset_point_in_mem(m_last_lsn, m_last_pos);

  auto cur_block= m_data.get_block(&m_last_pos, ARCH_DATA_BLOCK);

  if (cur_block->get_state() == ARCH_BLOCK_INIT ||
      cur_block->get_state() == ARCH_BLOCK_FLUSHED)
    cur_block->begin_write(m_last_pos);

  m_latest_stop_lsn=
      log_sys.last_checkpoint_lsn.load(std::memory_order_seq_cst);
  update_stop_info(cur_block);

  /* 2. Add the reset lsn to the current write_pos block header and request the
  flush archiver to flush the data block and reset block */
  cur_block->update_block_header(LSN_MAX, m_last_lsn);
  ut_d(auto ARCH_UNKNOWN_BLOCK= std::numeric_limits<uint64_t>::max());

  /* Reset LSN for a block can be updated only once. */
  ut_ad(m_flush_blk_num_with_lsn == ARCH_UNKNOWN_BLOCK ||
        m_flush_blk_num_with_lsn < cur_block->get_number());
  ut_ad(m_request_blk_num_with_lsn == ARCH_UNKNOWN_BLOCK ||
        m_request_blk_num_with_lsn < cur_block->get_number());

  uint64_t request_blk_num_with_lsn= cur_block->get_number();
  m_request_blk_num_with_lsn= request_blk_num_with_lsn;

  DBUG_PRINT("page_archiver",
             ("Saved reset point at %u - %" PRIu64 ", %" PRIu64 ", %u\n",
              m_last_reset_file_index, m_last_lsn, m_last_pos.m_block_num,
              m_last_pos.m_offset));

  return is_durable;
}

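/** Wait for the flush archiver to flush the block containing the latest
reset information.
@param[in]  request_blk  block number that needs to be flushed
@return true, if the block was flushed successfully */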
bool Arch_Page_Sys::wait_for_reset_info_flush(uint64_t request_blk)
{
  auto ARCH_UNKNOWN_BLOCK= std::numeric_limits<uint64_t>::max();

  auto cbk= [&]()
  {
    return (m_flush_blk_num_with_lsn == ARCH_UNKNOWN_BLOCK ||
            request_blk > m_flush_blk_num_with_lsn);
  };

  arch_oper_mutex_enter();
  bool success= wait_flush_archiver(cbk);
  arch_oper_mutex_exit();

  return success;
}

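/** Find the archive group that covers the given LSN range. A zero value for
start_id / stop_id is replaced by the latest reset LSN and the latest stop
LSN of the current group respectively.
@param[in,out]  start_id  start LSN of the range
@param[in,out]  stop_id   stop LSN of the range
@param[out]     group     group that covers the range, if any
@return 0 on success, ER_PAGE_TRACKING_RANGE_NOT_TRACKED otherwise */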
int Arch_Page_Sys::fetch_group_within_lsn_range(lsn_t &start_id, lsn_t &stop_id,
                                                Arch_Group **group)
{
  mysql_mutex_assert_owner(&m_mutex);

  if (start_id != 0 && stop_id != 0 && start_id >= stop_id)
    return ER_PAGE_TRACKING_RANGE_NOT_TRACKED;

  arch_oper_mutex_enter();
  auto latest_stop_lsn= m_latest_stop_lsn;
  arch_oper_mutex_exit();

  ut_ad(latest_stop_lsn != LSN_MAX);

  if (start_id == 0 || stop_id == 0)
  {
    if (m_current_group == nullptr || !m_current_group->is_active())
      return ER_PAGE_TRACKING_RANGE_NOT_TRACKED;

    *group= m_current_group;
    ut_ad(m_last_lsn != LSN_MAX);

    start_id= (start_id == 0) ? m_last_lsn : start_id;
    stop_id= (stop_id == 0) ? latest_stop_lsn : stop_id;
  }

  if (start_id >= stop_id || start_id == LSN_MAX || stop_id == LSN_MAX)
    return ER_PAGE_TRACKING_RANGE_NOT_TRACKED;

  if (*group == nullptr)
  {
    for (auto it : m_group_list)
    {
      *group= it;
      if (start_id < (*group)->get_begin_lsn() ||
          (!(*group)->is_active() && stop_id > (*group)->get_end_lsn()) ||
          ((*group)->is_active() && stop_id > latest_stop_lsn))
      {
        *group= nullptr;
        continue;
      }
      break;
    }
  }
  return (*group) ? 0 : ER_PAGE_TRACKING_RANGE_NOT_TRACKED;
}

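/** Purge archived files up to the given LSN and delete groups that are
no longer needed.
@param[in,out]  purge_lsn  LSN up to which to purge; 0 means the last
                           checkpoint LSN; set to the LSN actually purged
@return error code returned by the group purge, 0 on success */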
uint Arch_Page_Sys::purge(lsn_t *purge_lsn)
{
  lsn_t purged_lsn= LSN_MAX;
  uint err= 0;

  if (*purge_lsn == 0)
    *purge_lsn= log_sys.last_checkpoint_lsn.load(std::memory_order_seq_cst);

  DBUG_PRINT("page_archiver", ("Purging of files - %" PRIu64 "", *purge_lsn));
  arch_mutex_enter();

  for (auto it= m_group_list.begin(); it != m_group_list.end();)
  {
    lsn_t group_purged_lsn= LSN_MAX;
    auto group= *it;
    DBUG_PRINT("page_archiver",
               ("End lsn - %" PRIu64 "", group->get_end_lsn()));

    err= group->purge(*purge_lsn, group_purged_lsn);

    if (group_purged_lsn == LSN_MAX)
      break;

    DBUG_PRINT("page_archiver",
               ("Group purged lsn - %" PRIu64 "", group_purged_lsn));

    if (purged_lsn == LSN_MAX || group_purged_lsn > purged_lsn)
      purged_lsn= group_purged_lsn;

    if (!group->is_active() && group->get_end_lsn() <= group_purged_lsn)
    {
      it= m_group_list.erase(it);
      UT_DELETE(group);

      DBUG_PRINT("page_archiver", ("Purged entire group."));
      continue;
    }
    ++it;
  }
  DBUG_PRINT("page_archiver",
             ("Purged archived file until : %" PRIu64 "", purged_lsn));
  *purge_lsn= purged_lsn;

  if (purged_lsn == LSN_MAX)
  {
    arch_mutex_exit();
    return err;
  }
  m_latest_purged_lsn= purged_lsn;
  arch_mutex_exit();

  return err;
}

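/** Update the stop LSN in the block header of the given block and in the
stop point information of the current group.
@param[in,out]  cur_blk  current data block, or nullptr */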
void Arch_Page_Sys::update_stop_info(Arch_Block *cur_blk)
{
  mysql_mutex_assert_owner(&m_oper_mutex);

  if (cur_blk != nullptr)
    cur_blk->update_block_header(m_latest_stop_lsn, LSN_MAX);

  if (m_current_group != nullptr)
    m_current_group->update_stop_point(m_write_pos, m_latest_stop_lsn);
}

#ifdef UNIV_DEBUG
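/** Print the page archiver state for debugging. */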
void Arch_Page_Sys::print()
{
  DBUG_PRINT("page_archiver", ("State : %u", m_state));
  DBUG_PRINT("page_archiver", ("Last pos : %" PRIu64 ", %u",
                               m_last_pos.m_block_num, m_last_pos.m_offset));
  DBUG_PRINT("page_archiver", ("Last lsn : %" PRIu64 "", m_last_lsn));
  DBUG_PRINT("page_archiver",
             ("Latest stop lsn : %" PRIu64 "", m_latest_stop_lsn));
  DBUG_PRINT("page_archiver", ("Flush pos : %" PRIu64 ", %u",
                               m_flush_pos.m_block_num, m_flush_pos.m_offset));
  DBUG_PRINT("page_archiver", ("Write pos : %" PRIu64 ", %u",
                               m_write_pos.m_block_num, m_write_pos.m_offset));
  DBUG_PRINT("page_archiver", ("Reset pos : %" PRIu64 ", %u",
                               m_reset_pos.m_block_num, m_reset_pos.m_offset));
  DBUG_PRINT("page_archiver",
             ("Last reset file index : %u", m_last_reset_file_index));

  DBUG_PRINT("page_archiver",
             ("Latest reset block data len: %u",
              (m_data.get_block(&m_reset_pos, ARCH_RESET_BLOCK))->get_data_len()));

  DBUG_PRINT("page_archiver",
             ("Latest data block data len: %u",
              (m_data.get_block(&m_write_pos, ARCH_DATA_BLOCK))->get_data_len()));
}
#endif