mirror of
https://github.com/MariaDB/server.git
synced 2025-01-27 09:14:17 +01:00
b9f5793176
Two new variables added: - max_tmp_space_usage : Limits the temporary space allowance per user - max_total_tmp_space_usage: Limits the temporary space allowance for all users. New status variables: tmp_space_used & max_tmp_space_used New field in information_schema.process_list: TMP_SPACE_USED The temporary space is counted for: - All SQL level temporary files. This includes files for filesort, transaction temporary space, analyze, binlog_stmt_cache etc. It does not include engine internal temporary files used for repair, alter table, index pre sorting etc. - All internal on disk temporary tables created as part of resolving a SELECT, multi-source update etc. Special cases: - When doing a commit, the last flush of the binlog_stmt_cache will not cause an error even if the temporary space limit is exceeded. This is to avoid giving errors on commit. This means that a user can temporarily go over the limit with up to binlog_stmt_cache_size. Noteworthy issue: - One has to be careful when using small values for max_tmp_space_usage together with binary logging and with non transactional tables. If the binary log entry for the query is bigger than binlog_stmt_cache_size and one hits the limit of max_tmp_space_usage when flushing the entry to disk, the query will abort and the binary log will not contain the last changes to the table. This will also stop the slave! This is also true for all Aria tables as Aria cannot do rollback (except in case of crashes)! One way to avoid it is to use @@binlog_format=statement for queries that update a lot of rows. Implementation: - All writes to temporary files or internal temporary tables, that increase the file size, are routed through temp_file_size_cb_func() which updates and checks the temp space usage. - Most of the temporary file monitoring is done inside IO_CACHE. Temporary file monitoring is done inside the Aria engine. - MY_TRACK and MY_TRACK_WITH_LIMIT are new flags for init_io_cache(). 
MY_TRACK means that we track the file usage. MY_TRACK_WITH_LIMIT means that we track the file usage and we give an error if the limit is breached. This is used to not give an error on commit when binlog_stmt_cache is flushed. - global_tmp_space_used contains the total tmp space used so far. This is needed to quickly check against max_total_tmp_space_usage. - Temporary space errors are using EE_LOCAL_TMP_SPACE_FULL and handler errors are using HA_ERR_LOCAL_TMP_SPACE_FULL. This is needed until we move general errors to their own error space so that they cannot conflict with system error numbers. - Return value of my_chsize() and mysql_file_chsize() has changed so that -1 is returned in the case my_chsize() could not decrease the file size (very unlikely and will not happen on modern systems). All calls to _chsize() are updated to check for > 0 as the error condition. - At the destruction of THD we check that THD::tmp_file_space == 0 - At server end we check that global_tmp_space_used == 0 - As a precaution against errors in the tmp_space_used code, one can set max_tmp_space_usage and max_total_tmp_space_usage to 0 to disable the tmp space quota errors. - truncate_io_cache() function added. - Aria tables using static or dynamic row length are registered in 8K increments to avoid some calls to update_tmp_file_size(). Other things: - Ensure that all handler errors are registered. Before, some engine errors could be printed as "Unknown error". - Fixed bug in filesort() that caused an assert if there was an error when writing to the temporary file. - Fixed that compute_window_func() now takes into account write errors. - In case of parallel replication, rpl_group_info::cleanup_context() could call trans_rollback() with thd->error set, which would cause an assert. Fixed by resetting the error before calling trans_rollback(). 
- Fixed bug in subselect3.inc which caused the following test to use heap tables with a low value for max_heap_table_size - Fixed bug in sql_expression_cache where it did not overflow heap table to Aria table. - Added Max_tmp_disk_space_used to slow query log. - Fixed some bugs in log_slow_innodb.test
630 lines
19 KiB
C++
630 lines
19 KiB
C++
/*
|
|
Copyright (c) 2016, Facebook, Inc.
|
|
|
|
This program is free software; you can redistribute it and/or modify
|
|
it under the terms of the GNU General Public License as published by
|
|
the Free Software Foundation; version 2 of the License.
|
|
|
|
This program is distributed in the hope that it will be useful,
|
|
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
GNU General Public License for more details.
|
|
|
|
You should have received a copy of the GNU General Public License
|
|
along with this program; if not, write to the Free Software
|
|
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111-1301 USA */
|
|
|
|
#include <my_global.h>
|
|
|
|
/* This C++ file's header file */
|
|
#include "./rdb_index_merge.h"
|
|
|
|
/* MySQL header files */
|
|
#include "../sql/sql_class.h"
|
|
|
|
/* MyRocks header files */
|
|
#include "./ha_rocksdb.h"
|
|
#include "./rdb_datadic.h"
|
|
|
|
namespace myrocks {
|
|
|
|
/**
  Record the merge configuration. No file or buffer is created here; that
  happens in init().

  @param tmpfile_path                  Directory for the merge file. Only
                                       consulted by merge_file_create() when
                                       MARIAROCKS_NOT_YET is defined.
  @param merge_buf_size                Size in bytes of each sort buffer, and
                                       of each chunk written to the merge
                                       file.
  @param merge_combine_read_size       Read budget that merge_heap_prepare()
                                       divides between the on-disk chunks.
  @param merge_tmp_file_removal_delay  When > 0, the destructor truncates the
                                       merge file step by step and sleeps for
                                       (delay * 1000) my_sleep() units between
                                       steps.
  @param cf                            Column family whose comparator orders
                                       the records.
*/
Rdb_index_merge::Rdb_index_merge(const char *const tmpfile_path,
                                 const ulonglong merge_buf_size,
                                 const ulonglong merge_combine_read_size,
                                 const ulonglong merge_tmp_file_removal_delay,
                                 rocksdb::ColumnFamilyHandle *cf)
    : m_tmpfile_path(tmpfile_path),
      m_merge_buf_size(merge_buf_size),
      m_merge_combine_read_size(merge_combine_read_size),
      m_merge_tmp_file_removal_delay(merge_tmp_file_removal_delay),
      m_cf_handle(cf),
      m_rec_buf_unsorted(),
      m_output_buf() {
  /* Both buffers start out empty (null); init() allocates them. */
}
|
|
|
|
/**
  Release the temporary merge file.

  If merge_tmp_file_removal_delay is set, the file is shrunk one sort buffer
  at a time, sleeping and syncing between steps. This helps mitigate
  potential trim stalls on flash when large files are being deleted too
  quickly.
*/
Rdb_index_merge::~Rdb_index_merge() {
  /*
    merge_file_create() expects the descriptor to be -1 before the file is
    created, so -1 here means there is nothing to truncate or close.
  */
  if (m_merge_file.m_fd == -1) {
    return;
  }

  if (m_merge_tmp_file_removal_delay > 0) {
    uint64 curr_size = m_merge_buf_size * m_merge_file.m_num_sort_buffers;

    /*
      Every chunk on disk is exactly m_merge_buf_size bytes. Shrink first and
      then truncate, so that each step really releases one chunk and the last
      step leaves the file empty.
    */
    for (auto remaining = m_merge_file.m_num_sort_buffers; remaining > 0;
         --remaining) {
      curr_size -= m_merge_buf_size;

      /* my_chsize() reports failure with a value > 0 */
      if (my_chsize(m_merge_file.m_fd, curr_size, 0, MYF(MY_WME)) > 0) {
        // NO_LINT_DEBUG
        sql_print_error("Error truncating file during fast index creation.");
      }

      my_sleep(m_merge_tmp_file_removal_delay * 1000);

      // Not aborting on fsync error since the tmp file is not used anymore
      if (mysql_file_sync(m_merge_file.m_fd, MYF(MY_WME))) {
        // NO_LINT_DEBUG
        sql_print_error("Error flushing truncated MyRocks merge buffer.");
      }
    }
  }

  /*
    Close file descriptor, we don't need to worry about deletion,
    mysql handles it.
  */
  my_close(m_merge_file.m_fd, MYF(MY_WME));
}
|
|
|
|
int Rdb_index_merge::init() {
|
|
/*
|
|
Create a temporary merge file on disk to store sorted chunks during
|
|
inplace index creation.
|
|
*/
|
|
if (merge_file_create()) {
|
|
return HA_ERR_ROCKSDB_MERGE_FILE_ERR;
|
|
}
|
|
|
|
/*
|
|
Then, allocate buffer to store unsorted records before they are written
|
|
to disk. They will be written to disk sorted. A sorted tree is used to
|
|
keep track of the offset of each record within the unsorted buffer.
|
|
*/
|
|
m_rec_buf_unsorted =
|
|
std::shared_ptr<merge_buf_info>(new merge_buf_info(m_merge_buf_size));
|
|
|
|
/*
|
|
Allocate output buffer that will contain sorted block that is written to
|
|
disk.
|
|
*/
|
|
m_output_buf =
|
|
std::shared_ptr<merge_buf_info>(new merge_buf_info(m_merge_buf_size));
|
|
|
|
return HA_EXIT_SUCCESS;
|
|
}
|
|
|
|
/**
|
|
Create a merge file in the given location.
|
|
*/
|
|
int Rdb_index_merge::merge_file_create() {
|
|
DBUG_ASSERT(m_merge_file.m_fd == -1);
|
|
|
|
int fd;
|
|
#ifdef MARIAROCKS_NOT_YET // mysql_tmpfile_path use
|
|
/* If no path set for tmpfile, use mysql_tmpdir by default */
|
|
if (m_tmpfile_path == nullptr) {
|
|
fd = mysql_tmpfile("myrocks");
|
|
} else {
|
|
fd = mysql_tmpfile_path(m_tmpfile_path, "myrocks");
|
|
}
|
|
#else
|
|
fd = mysql_tmpfile("myrocks");
|
|
#endif
|
|
if (fd < 0) {
|
|
// NO_LINT_DEBUG
|
|
sql_print_error("Failed to create temp file during fast index creation.");
|
|
return HA_ERR_ROCKSDB_MERGE_FILE_ERR;
|
|
}
|
|
|
|
m_merge_file.m_fd = fd;
|
|
m_merge_file.m_num_sort_buffers = 0;
|
|
|
|
return HA_EXIT_SUCCESS;
|
|
}
|
|
|
|
/**
|
|
Add record to offset tree (and unsorted merge buffer) in preparation for
|
|
writing out to disk in sorted chunks.
|
|
|
|
If buffer in memory is full, write the buffer out to disk sorted using the
|
|
offset tree, and clear the tree. (Happens in merge_buf_write)
|
|
*/
|
|
int Rdb_index_merge::add(const rocksdb::Slice &key, const rocksdb::Slice &val) {
|
|
/* Adding a record after heap is already created results in error */
|
|
DBUG_ASSERT(m_merge_min_heap.empty());
|
|
|
|
/*
|
|
Check if sort buffer is going to be out of space, if so write it
|
|
out to disk in sorted order using offset tree.
|
|
*/
|
|
const uint total_offset = RDB_MERGE_CHUNK_LEN +
|
|
m_rec_buf_unsorted->m_curr_offset +
|
|
RDB_MERGE_KEY_DELIMITER + RDB_MERGE_VAL_DELIMITER +
|
|
key.size() + val.size();
|
|
if (total_offset >= m_rec_buf_unsorted->m_total_size) {
|
|
/*
|
|
If the offset tree is empty here, that means that the proposed key to
|
|
add is too large for the buffer.
|
|
*/
|
|
if (m_offset_tree.empty()) {
|
|
// NO_LINT_DEBUG
|
|
sql_print_error(
|
|
"Sort buffer size is too small to process merge. "
|
|
"Please set merge buffer size to a higher value.");
|
|
return HA_ERR_ROCKSDB_MERGE_FILE_ERR;
|
|
}
|
|
|
|
if (merge_buf_write()) {
|
|
// NO_LINT_DEBUG
|
|
sql_print_error("Error writing sort buffer to disk.");
|
|
return HA_ERR_ROCKSDB_MERGE_FILE_ERR;
|
|
}
|
|
}
|
|
|
|
const ulonglong rec_offset = m_rec_buf_unsorted->m_curr_offset;
|
|
|
|
/*
|
|
Store key and value in temporary unsorted in memory buffer pointed to by
|
|
offset tree.
|
|
*/
|
|
m_rec_buf_unsorted->store_key_value(key, val);
|
|
|
|
/* Find sort order of the new record */
|
|
auto res =
|
|
m_offset_tree.emplace(m_rec_buf_unsorted->m_block.get() + rec_offset,
|
|
m_cf_handle->GetComparator());
|
|
if (!res.second) {
|
|
my_printf_error(ER_DUP_ENTRY,
|
|
"Failed to insert the record: the key already exists",
|
|
MYF(0));
|
|
return ER_DUP_ENTRY;
|
|
}
|
|
|
|
return HA_EXIT_SUCCESS;
|
|
}
|
|
|
|
/**
|
|
Sort + write merge buffer chunk out to disk.
|
|
*/
|
|
int Rdb_index_merge::merge_buf_write() {
|
|
DBUG_ASSERT(m_merge_file.m_fd != -1);
|
|
DBUG_ASSERT(m_rec_buf_unsorted != nullptr);
|
|
DBUG_ASSERT(m_output_buf != nullptr);
|
|
DBUG_ASSERT(!m_offset_tree.empty());
|
|
|
|
/* Write actual chunk size to first 8 bytes of the merge buffer */
|
|
merge_store_uint64(m_output_buf->m_block.get(),
|
|
m_rec_buf_unsorted->m_curr_offset + RDB_MERGE_CHUNK_LEN);
|
|
m_output_buf->m_curr_offset += RDB_MERGE_CHUNK_LEN;
|
|
|
|
/*
|
|
Iterate through the offset tree. Should be ordered by the secondary key
|
|
at this point.
|
|
*/
|
|
for (const auto &rec : m_offset_tree) {
|
|
DBUG_ASSERT(m_output_buf->m_curr_offset <= m_merge_buf_size);
|
|
|
|
/* Read record from offset (should never fail) */
|
|
rocksdb::Slice key;
|
|
rocksdb::Slice val;
|
|
merge_read_rec(rec.m_block, &key, &val);
|
|
|
|
/* Store key and value into sorted output buffer */
|
|
m_output_buf->store_key_value(key, val);
|
|
}
|
|
|
|
DBUG_ASSERT(m_output_buf->m_curr_offset <= m_output_buf->m_total_size);
|
|
|
|
/*
|
|
Write output buffer to disk.
|
|
|
|
Need to position cursor to the chunk it needs to be at on filesystem
|
|
then write into the respective merge buffer.
|
|
*/
|
|
if (my_seek(m_merge_file.m_fd,
|
|
m_merge_file.m_num_sort_buffers * m_merge_buf_size, SEEK_SET,
|
|
MYF(0)) == MY_FILEPOS_ERROR) {
|
|
// NO_LINT_DEBUG
|
|
sql_print_error("Error seeking to location in merge file on disk.");
|
|
return HA_ERR_ROCKSDB_MERGE_FILE_ERR;
|
|
}
|
|
|
|
/*
|
|
Add a file sync call here to flush the data out. Otherwise, the filesystem
|
|
cache can flush out all of the files at the same time, causing a write
|
|
burst.
|
|
*/
|
|
if (my_write(m_merge_file.m_fd, m_output_buf->m_block.get(),
|
|
m_output_buf->m_total_size, MYF(MY_WME | MY_NABP)) ||
|
|
mysql_file_sync(m_merge_file.m_fd, MYF(MY_WME))) {
|
|
// NO_LINT_DEBUG
|
|
sql_print_error("Error writing sorted merge buffer to disk.");
|
|
return HA_ERR_ROCKSDB_MERGE_FILE_ERR;
|
|
}
|
|
|
|
/* Increment merge file offset to track number of merge buffers written */
|
|
m_merge_file.m_num_sort_buffers += 1;
|
|
|
|
/* Reset everything for next run */
|
|
merge_reset();
|
|
|
|
return HA_EXIT_SUCCESS;
|
|
}
|
|
|
|
/**
|
|
Prepare n-way merge of n sorted buffers on disk, using a heap sorted by
|
|
secondary key records.
|
|
*/
|
|
int Rdb_index_merge::merge_heap_prepare() {
|
|
DBUG_ASSERT(m_merge_min_heap.empty());
|
|
|
|
/*
|
|
If the offset tree is not empty, there are still some records that need to
|
|
be written to disk. Write them out now.
|
|
*/
|
|
if (!m_offset_tree.empty() && merge_buf_write()) {
|
|
return HA_ERR_ROCKSDB_MERGE_FILE_ERR;
|
|
}
|
|
|
|
DBUG_ASSERT(m_merge_file.m_num_sort_buffers > 0);
|
|
|
|
/*
|
|
For an n-way merge, we need to read chunks of each merge file
|
|
simultaneously.
|
|
*/
|
|
ulonglong chunk_size =
|
|
m_merge_combine_read_size / m_merge_file.m_num_sort_buffers;
|
|
if (chunk_size >= m_merge_buf_size) {
|
|
chunk_size = m_merge_buf_size;
|
|
}
|
|
|
|
/* Allocate buffers for each chunk */
|
|
for (ulonglong i = 0; i < m_merge_file.m_num_sort_buffers; i++) {
|
|
const auto entry =
|
|
std::make_shared<merge_heap_entry>(m_cf_handle->GetComparator());
|
|
|
|
/*
|
|
Read chunk_size bytes from each chunk on disk, and place inside
|
|
respective chunk buffer.
|
|
*/
|
|
const size_t total_size =
|
|
entry->prepare(m_merge_file.m_fd, i * m_merge_buf_size, chunk_size);
|
|
|
|
if (total_size == (size_t)-1) {
|
|
return HA_ERR_ROCKSDB_MERGE_FILE_ERR;
|
|
}
|
|
|
|
/* Can reach this condition if an index was added on table w/ no rows */
|
|
if (total_size - RDB_MERGE_CHUNK_LEN == 0) {
|
|
break;
|
|
}
|
|
|
|
/* Read the first record from each buffer to initially populate the heap */
|
|
if (entry->read_rec(&entry->m_key, &entry->m_val)) {
|
|
// NO_LINT_DEBUG
|
|
sql_print_error("Chunk size is too small to process merge.");
|
|
return HA_ERR_ROCKSDB_MERGE_FILE_ERR;
|
|
}
|
|
|
|
m_merge_min_heap.push(std::move(entry));
|
|
}
|
|
|
|
return HA_EXIT_SUCCESS;
|
|
}
|
|
|
|
/**
|
|
Create and/or iterate through keys in the merge heap.
|
|
*/
|
|
int Rdb_index_merge::next(rocksdb::Slice *const key,
|
|
rocksdb::Slice *const val) {
|
|
/*
|
|
If table fits in one sort buffer, we can optimize by writing
|
|
the sort buffer directly through to the sstfilewriter instead of
|
|
needing to create tmp files/heap to merge the sort buffers.
|
|
|
|
If there are no sort buffer records (alters on empty tables),
|
|
also exit here.
|
|
*/
|
|
if (m_merge_file.m_num_sort_buffers == 0) {
|
|
if (m_offset_tree.empty()) {
|
|
return -1;
|
|
}
|
|
|
|
const auto rec = m_offset_tree.begin();
|
|
|
|
/* Read record from offset */
|
|
merge_read_rec(rec->m_block, key, val);
|
|
|
|
m_offset_tree.erase(rec);
|
|
return HA_EXIT_SUCCESS;
|
|
}
|
|
|
|
int res;
|
|
|
|
/*
|
|
If heap and heap chunk info are empty, we must be beginning the merge phase
|
|
of the external sort. Populate the heap with initial values from each
|
|
disk chunk.
|
|
*/
|
|
if (m_merge_min_heap.empty()) {
|
|
if ((res = merge_heap_prepare())) {
|
|
// NO_LINT_DEBUG
|
|
sql_print_error("Error during preparation of heap.");
|
|
return res;
|
|
}
|
|
|
|
/*
|
|
Return the first top record without popping, as we haven't put this
|
|
inside the SST file yet.
|
|
*/
|
|
merge_heap_top(key, val);
|
|
return HA_EXIT_SUCCESS;
|
|
}
|
|
|
|
DBUG_ASSERT(!m_merge_min_heap.empty());
|
|
return merge_heap_pop_and_get_next(key, val);
|
|
}
|
|
|
|
/**
|
|
Get current top record from the heap.
|
|
*/
|
|
void Rdb_index_merge::merge_heap_top(rocksdb::Slice *const key,
|
|
rocksdb::Slice *const val) {
|
|
DBUG_ASSERT(!m_merge_min_heap.empty());
|
|
|
|
const std::shared_ptr<merge_heap_entry> &entry = m_merge_min_heap.top();
|
|
*key = entry->m_key;
|
|
*val = entry->m_val;
|
|
}
|
|
|
|
/**
|
|
Pops the top record, and uses it to read next record from the
|
|
corresponding sort buffer and push onto the heap.
|
|
|
|
Returns -1 when there are no more records in the heap.
|
|
*/
|
|
int Rdb_index_merge::merge_heap_pop_and_get_next(rocksdb::Slice *const key,
|
|
rocksdb::Slice *const val) {
|
|
/*
|
|
Make a new reference to shared ptr so it doesn't get destroyed
|
|
during pop(). We are going to push this entry back onto the heap.
|
|
*/
|
|
const std::shared_ptr<merge_heap_entry> entry = m_merge_min_heap.top();
|
|
m_merge_min_heap.pop();
|
|
|
|
/*
|
|
We are finished w/ current chunk if:
|
|
current_offset + disk_offset == m_total_size
|
|
|
|
Return without adding entry back onto heap.
|
|
If heap is also empty, we must be finished with merge.
|
|
*/
|
|
if (entry->m_chunk_info->is_chunk_finished()) {
|
|
if (m_merge_min_heap.empty()) {
|
|
return -1;
|
|
}
|
|
|
|
merge_heap_top(key, val);
|
|
return HA_EXIT_SUCCESS;
|
|
}
|
|
|
|
/*
|
|
Make sure we haven't reached the end of the chunk.
|
|
*/
|
|
DBUG_ASSERT(!entry->m_chunk_info->is_chunk_finished());
|
|
|
|
/*
|
|
If merge_read_rec fails, it means the either the chunk was cut off
|
|
or we've reached the end of the respective chunk.
|
|
*/
|
|
if (entry->read_rec(&entry->m_key, &entry->m_val)) {
|
|
if (entry->read_next_chunk_from_disk(m_merge_file.m_fd)) {
|
|
return HA_ERR_ROCKSDB_MERGE_FILE_ERR;
|
|
}
|
|
|
|
/* Try reading record again, should never fail. */
|
|
if (entry->read_rec(&entry->m_key, &entry->m_val)) {
|
|
return HA_ERR_ROCKSDB_MERGE_FILE_ERR;
|
|
}
|
|
}
|
|
|
|
/* Push entry back on to the heap w/ updated buffer + offset ptr */
|
|
m_merge_min_heap.push(std::move(entry));
|
|
|
|
/* Return the current top record on heap */
|
|
merge_heap_top(key, val);
|
|
return HA_EXIT_SUCCESS;
|
|
}
|
|
|
|
/**
  Load the next part of this entry's chunk from disk and point the read
  cursor at the start of the refreshed buffer.

  @param fd  Descriptor of the merge file.

  @return HA_EXIT_SUCCESS or HA_EXIT_FAILURE.
*/
int Rdb_index_merge::merge_heap_entry::read_next_chunk_from_disk(File fd) {
  const int rc = m_chunk_info->read_next_chunk_from_disk(fd);
  if (rc == 0) {
    /* The buffer was overwritten, so reading restarts at its beginning */
    m_block = m_chunk_info->m_block.get();
    return HA_EXIT_SUCCESS;
  }

  return HA_EXIT_FAILURE;
}
|
|
|
|
/**
  Advance the on-disk position by the number of bytes already consumed from
  the in-memory block, then refill the block from that position.

  @param fd  Descriptor of the merge file.

  @return HA_EXIT_SUCCESS, or HA_EXIT_FAILURE if the seek or read fails.
*/
int Rdb_index_merge::merge_buf_info::read_next_chunk_from_disk(File fd) {
  /* Everything before m_curr_offset has been handed out already */
  m_disk_curr_offset += m_curr_offset;

  const auto seek_result = my_seek(fd, m_disk_curr_offset, SEEK_SET, MYF(0));
  if (seek_result == MY_FILEPOS_ERROR) {
    // NO_LINT_DEBUG
    sql_print_error("Error seeking to location in merge file on disk.");
    return HA_EXIT_FAILURE;
  }

  /* Overwrite the old block */
  const size_t read_result =
      my_read(fd, m_block.get(), m_block_len, MYF(MY_WME));
  if (read_result == static_cast<size_t>(-1)) {
    // NO_LINT_DEBUG
    sql_print_error("Error reading merge file from disk.");
    return HA_EXIT_FAILURE;
  }

  /* Consumption of the refreshed block starts from its first byte */
  m_curr_offset = 0;
  return HA_EXIT_SUCCESS;
}
|
|
|
|
/**
|
|
Get records from offset within sort buffer and compare them.
|
|
Sort by least to greatest.
|
|
*/
|
|
int Rdb_index_merge::merge_record_compare(
|
|
const uchar *const a_block, const uchar *const b_block,
|
|
const rocksdb::Comparator *const comparator) {
|
|
return comparator->Compare(as_slice(a_block), as_slice(b_block));
|
|
}
|
|
|
|
/**
|
|
Given an offset in a merge sort buffer, read out the keys + values.
|
|
After this, block will point to the next record in the buffer.
|
|
**/
|
|
void Rdb_index_merge::merge_read_rec(const uchar *const block,
|
|
rocksdb::Slice *const key,
|
|
rocksdb::Slice *const val) {
|
|
/* Read key at block offset into key slice and the value into value slice*/
|
|
read_slice(key, block);
|
|
read_slice(val, block + RDB_MERGE_REC_DELIMITER + key->size());
|
|
}
|
|
|
|
/**
  Decode one length-prefixed slice.

  @param[out] slice  Receives a slice that refers to the bytes following the
                     length prefix (no copy is made).
  @param block_ptr   Position of the length prefix.
*/
void Rdb_index_merge::read_slice(rocksdb::Slice *slice,
                                 const uchar *block_ptr) {
  /* merge_read_uint64 leaves the cursor at the first data byte */
  const uchar *cursor = block_ptr;
  uint64 len;
  merge_read_uint64(&cursor, &len);

  *slice = rocksdb::Slice(reinterpret_cast<const char *>(cursor), len);
}
|
|
|
|
/**
  Read the next key/value record from this entry's in-memory block and
  advance the read position past it.

  If the record is not completely available, the read position is left
  exactly where it was on entry.

  @param[out] key  Receives the key slice.
  @param[out] val  Receives the value slice.

  @return HA_EXIT_SUCCESS, or HA_EXIT_FAILURE if the key or value does not
          fit in what remains of the block.
*/
int Rdb_index_merge::merge_heap_entry::read_rec(rocksdb::Slice *const key,
                                                rocksdb::Slice *const val) {
  /* Snapshot of the position, needed if the value turns out to be cut off */
  const auto saved_offset = m_chunk_info->m_curr_offset;
  const auto saved_block = m_block;

  const uchar *cursor = m_block;

  /* Key first; nothing has been modified yet if this fails */
  if (read_slice(key, &cursor) != 0) {
    return HA_EXIT_FAILURE;
  }

  /*
    Account for the key before reading the value: read_slice() checks the
    remaining space against m_curr_offset.
  */
  const auto key_bytes =
      reinterpret_cast<uintptr_t>(cursor) - reinterpret_cast<uintptr_t>(m_block);
  m_chunk_info->m_curr_offset += key_bytes;
  m_block += key_bytes;

  if (read_slice(val, &cursor) != 0) {
    /* Undo the key advance so the whole record can be retried later */
    m_chunk_info->m_curr_offset = saved_offset;
    m_block = saved_block;
    return HA_EXIT_FAILURE;
  }

  const auto val_bytes =
      reinterpret_cast<uintptr_t>(cursor) - reinterpret_cast<uintptr_t>(m_block);
  m_chunk_info->m_curr_offset += val_bytes;
  m_block += val_bytes;

  return HA_EXIT_SUCCESS;
}
|
|
|
|
/**
  Decode one length-prefixed slice from the in-memory block, provided both
  the prefix and the data are within the space the chunk still has.

  @param[out] slice        Receives a slice referring to the data bytes.
  @param[in,out] block_ptr On success it is moved past the prefix and the
                           data. If only the prefix could be read it is left
                           just past the prefix.

  @return HA_EXIT_SUCCESS, or HA_EXIT_FAILURE if the prefix or the data does
          not fit.
*/
int Rdb_index_merge::merge_heap_entry::read_slice(rocksdb::Slice *const slice,
                                                  const uchar **block_ptr) {
  if (m_chunk_info->has_space(RDB_MERGE_REC_DELIMITER)) {
    /* The length prefix is available: decode it */
    uint64 len;
    merge_read_uint64(block_ptr, &len);

    if (m_chunk_info->has_space(RDB_MERGE_REC_DELIMITER + len)) {
      *slice = rocksdb::Slice(reinterpret_cast<const char *>(*block_ptr), len);
      *block_ptr += len;
      return HA_EXIT_SUCCESS;
    }
  }

  return HA_EXIT_FAILURE;
}
|
|
|
|
/**
  Allocate this entry's chunk buffer and load the beginning of its chunk
  from the merge file.

  @param fd          Descriptor of the merge file.
  @param f_offset    File offset at which the chunk starts.
  @param chunk_size  Size of the in-memory buffer to allocate.

  @return The value returned by merge_buf_info::prepare(): the chunk's total
          size, or (size_t)-1 on error.
*/
size_t Rdb_index_merge::merge_heap_entry::prepare(File fd, ulonglong f_offset,
                                                  ulonglong chunk_size) {
  m_chunk_info = std::make_shared<merge_buf_info>(chunk_size);

  const size_t chunk_total = m_chunk_info->prepare(fd, f_offset);
  if (chunk_total == static_cast<size_t>(-1)) {
    return chunk_total;
  }

  /* Records start right after the chunk-length header */
  m_block = m_chunk_info->m_block.get() + RDB_MERGE_CHUNK_LEN;
  return chunk_total;
}
|
|
|
|
/**
  Load the beginning of a chunk from the merge file into this buffer and
  decode the chunk header.

  @param fd        Descriptor of the merge file.
  @param f_offset  File offset at which the chunk starts.

  @return The chunk's total size as stored in its header, or (size_t)-1 if
          seeking or reading fails or the header could not be read in full.
*/
size_t Rdb_index_merge::merge_buf_info::prepare(File fd, ulonglong f_offset) {
  m_disk_start_offset = f_offset;
  m_disk_curr_offset = f_offset;

  /*
    Need to position cursor to the chunk it needs to be at on filesystem
    then read 'chunk_size' bytes into the respective chunk buffer.
  */
  if (my_seek(fd, f_offset, SEEK_SET, MYF(0)) == MY_FILEPOS_ERROR) {
    // NO_LINT_DEBUG
    sql_print_error("Error seeking to location in merge file on disk.");
    return (size_t)-1;
  }

  /*
    A result shorter than the chunk header is treated like a failed read:
    decoding the header would otherwise use bytes that were never read.
  */
  const size_t bytes_read =
      my_read(fd, m_block.get(), m_total_size, MYF(MY_WME));
  if (bytes_read == (size_t)-1 ||
      bytes_read < static_cast<size_t>(RDB_MERGE_CHUNK_LEN)) {
    // NO_LINT_DEBUG
    sql_print_error("Error reading merge file from disk.");
    return (size_t)-1;
  }

  /*
    Read the first 8 bytes of each chunk, this gives us the actual
    size of each chunk. From here on m_total_size holds that value instead of
    the buffer size.
  */
  const uchar *block_ptr = m_block.get();
  merge_read_uint64(&block_ptr, &m_total_size);

  /* The header counts as consumed */
  m_curr_offset += RDB_MERGE_CHUNK_LEN;
  return m_total_size;
}
|
|
|
|
/* Store key and value w/ their respective delimiters at the given offset */
|
|
void Rdb_index_merge::merge_buf_info::store_key_value(
|
|
const rocksdb::Slice &key, const rocksdb::Slice &val) {
|
|
store_slice(key);
|
|
store_slice(val);
|
|
}
|
|
|
|
/**
  Append one length-prefixed slice at the current offset and advance the
  offset past it.

  @param slice  Bytes to append.
*/
void Rdb_index_merge::merge_buf_info::store_slice(const rocksdb::Slice &slice) {
  const size_t data_len = slice.size();
  auto *const dest = &m_block[m_curr_offset];

  /* Length prefix first, then the payload right behind it */
  merge_store_uint64(dest, data_len);
  memcpy(dest + RDB_MERGE_REC_DELIMITER, slice.data(), data_len);

  m_curr_offset += data_len + RDB_MERGE_REC_DELIMITER;
}
|
|
|
|
void Rdb_index_merge::merge_reset() {
|
|
/*
|
|
Either error, or all values in the sort buffer have been written to disk,
|
|
so we need to clear the offset tree.
|
|
*/
|
|
m_offset_tree.clear();
|
|
|
|
/* Reset sort buffer block */
|
|
if (m_rec_buf_unsorted && m_rec_buf_unsorted->m_block) {
|
|
m_rec_buf_unsorted->m_curr_offset = 0;
|
|
}
|
|
|
|
/* Reset output buf */
|
|
if (m_output_buf && m_output_buf->m_block) {
|
|
m_output_buf->m_curr_offset = 0;
|
|
}
|
|
}
|
|
|
|
} // namespace myrocks
|