mirror of
https://github.com/MariaDB/server.git
synced 2025-01-17 20:42:30 +01:00
838 lines
29 KiB
C++
838 lines
29 KiB
C++
/*
|
|
Copyright (c) 2015, Facebook, Inc.
|
|
|
|
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
|
|
the Free Software Foundation; version 2 of the License.
|
|
|
|
This program is distributed in the hope that it will be useful,
|
|
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
GNU General Public License for more details.
|
|
|
|
You should have received a copy of the GNU General Public License
|
|
along with this program; if not, write to the Free Software
|
|
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
|
|
|
|
#include <my_global.h>
|
|
|
|
/* This C++ file's header file */
|
|
#include "./rdb_converter.h"
|
|
|
|
/* Standard C++ header files */
|
|
#include <algorithm>
|
|
#include <map>
|
|
#include <string>
|
|
#include <vector>
|
|
|
|
/* MySQL header files */
|
|
#include "./field.h"
|
|
#include "./key.h"
|
|
#include "./m_ctype.h"
|
|
#include "./my_bit.h"
|
|
#include "./my_bitmap.h"
|
|
#include "./sql_table.h"
|
|
|
|
|
|
/* MyRocks header files */
|
|
#include "./ha_rocksdb.h"
|
|
#include "./ha_rocksdb_proto.h"
|
|
#include "./my_stacktrace.h"
|
|
#include "./rdb_cf_manager.h"
|
|
#include "./rdb_psi.h"
|
|
#include "./rdb_utils.h"
|
|
|
|
|
|
namespace myrocks {
|
|
|
|
/*
  Debug helper: rewrite an on-disk key image in place.

  The first Rdb_key_def::INDEX_NUMBER_SIZE bytes of the key are kept and
  everything after them is replaced with a fixed 9-byte mem-comparable image
  of a varchar(8) value.

  @param    on_disk_rec   IN/OUT   key image to rewrite
*/
void dbug_modify_key_varchar8(String *on_disk_rec) {
  // Copy the index-number prefix out first: the String is truncated below.
  std::string rebuilt(on_disk_rec->ptr(), Rdb_key_def::INDEX_NUMBER_SIZE);

  // Mem-comparable form of a varchar(8) value; the explicit length keeps the
  // embedded NUL bytes.
  rebuilt.append("ABCDE\0\0\0\xFC", 9);

  on_disk_rec->length(0);
  on_disk_rec->append(rebuilt.data(), rebuilt.size());
}
|
|
|
|
/*
|
|
Convert field from rocksdb storage format into Mysql Record format
|
|
@param buf OUT start memory to fill converted data
|
|
@param offset IN/OUT decoded data is stored in buf + offset
|
|
@param table IN current table
|
|
@param field IN current field
|
|
@param reader IN rocksdb value slice reader
|
|
@param decode IN whether to decode current field
|
|
@return
|
|
0 OK
|
|
other HA_ERR error code (can be SE-specific)
|
|
*/
|
|
int Rdb_convert_to_record_value_decoder::decode(uchar *const buf, uint *offset,
|
|
TABLE *table,
|
|
my_core::Field *field,
|
|
Rdb_field_encoder *field_dec,
|
|
Rdb_string_reader *reader,
|
|
bool decode, bool is_null) {
|
|
int err = HA_EXIT_SUCCESS;
|
|
|
|
uint field_offset = field->ptr - table->record[0];
|
|
*offset = field_offset;
|
|
uint null_offset = field->null_offset();
|
|
bool maybe_null = field->real_maybe_null();
|
|
field->move_field(buf + field_offset,
|
|
maybe_null ? buf + null_offset : nullptr, field->null_bit);
|
|
|
|
if (is_null) {
|
|
if (decode) {
|
|
// This sets the NULL-bit of this record
|
|
field->set_null();
|
|
/*
|
|
Besides that, set the field value to default value. CHECKSUM TABLE
|
|
depends on this.
|
|
*/
|
|
memcpy(field->ptr, table->s->default_values + field_offset,
|
|
field->pack_length());
|
|
}
|
|
} else {
|
|
if (decode) {
|
|
// sets non-null bits for this record
|
|
field->set_notnull();
|
|
}
|
|
|
|
if (field_dec->m_field_type == MYSQL_TYPE_BLOB) {
|
|
err = decode_blob(table, field, reader, decode);
|
|
} else if (field_dec->m_field_type == MYSQL_TYPE_VARCHAR) {
|
|
err = decode_varchar(field, reader, decode);
|
|
} else {
|
|
err = decode_fixed_length_field(field, field_dec, reader, decode);
|
|
}
|
|
}
|
|
|
|
// Restore field->ptr and field->null_ptr
|
|
field->move_field(table->record[0] + field_offset,
|
|
maybe_null ? table->record[0] + null_offset : nullptr,
|
|
field->null_bit);
|
|
|
|
return err;
|
|
}
|
|
|
|
/*
|
|
Convert blob from rocksdb storage format into Mysql Record format
|
|
@param table IN current table
|
|
@param field IN current field
|
|
@param reader IN rocksdb value slice reader
|
|
@param decode IN whether to decode current field
|
|
@return
|
|
0 OK
|
|
other HA_ERR error code (can be SE-specific)
|
|
*/
|
|
int Rdb_convert_to_record_value_decoder::decode_blob(TABLE *table, Field *field,
|
|
Rdb_string_reader *reader,
|
|
bool decode) {
|
|
my_core::Field_blob *blob = (my_core::Field_blob *)field;
|
|
|
|
// Get the number of bytes needed to store length
|
|
const uint length_bytes = blob->pack_length() - portable_sizeof_char_ptr;
|
|
|
|
const char *data_len_str;
|
|
if (!(data_len_str = reader->read(length_bytes))) {
|
|
return HA_ERR_ROCKSDB_CORRUPT_DATA;
|
|
}
|
|
|
|
memcpy(blob->ptr, data_len_str, length_bytes);
|
|
uint32 data_len =
|
|
blob->get_length(reinterpret_cast<const uchar *>(data_len_str),
|
|
length_bytes);
|
|
const char *blob_ptr;
|
|
if (!(blob_ptr = reader->read(data_len))) {
|
|
return HA_ERR_ROCKSDB_CORRUPT_DATA;
|
|
}
|
|
|
|
if (decode) {
|
|
// set 8-byte pointer to 0, like innodb does (relevant for 32-bit
|
|
// platforms)
|
|
memset(blob->ptr + length_bytes, 0, 8);
|
|
memcpy(blob->ptr + length_bytes, &blob_ptr, sizeof(uchar **));
|
|
}
|
|
|
|
return HA_EXIT_SUCCESS;
|
|
}
|
|
|
|
/*
|
|
Convert fixed length field from rocksdb storage format into Mysql Record
|
|
format
|
|
@param field IN current field
|
|
@param field_dec IN data structure conttain field encoding data
|
|
@param reader IN rocksdb value slice reader
|
|
@param decode IN whether to decode current field
|
|
@return
|
|
0 OK
|
|
other HA_ERR error code (can be SE-specific)
|
|
*/
|
|
int Rdb_convert_to_record_value_decoder::decode_fixed_length_field(
|
|
my_core::Field *const field, Rdb_field_encoder *field_dec,
|
|
Rdb_string_reader *const reader, bool decode) {
|
|
uint len = field_dec->m_pack_length_in_rec;
|
|
if (len > 0) {
|
|
const char *data_bytes;
|
|
if ((data_bytes = reader->read(len)) == nullptr) {
|
|
return HA_ERR_ROCKSDB_CORRUPT_DATA;
|
|
}
|
|
|
|
if (decode) {
|
|
memcpy(field->ptr, data_bytes, len);
|
|
}
|
|
}
|
|
|
|
return HA_EXIT_SUCCESS;
|
|
}
|
|
|
|
/*
|
|
Convert varchar field from rocksdb storage format into Mysql Record format
|
|
@param field IN current field
|
|
@param field_dec IN data structure conttain field encoding data
|
|
@param reader IN rocksdb value slice reader
|
|
@param decode IN whether to decode current field
|
|
@return
|
|
0 OK
|
|
other HA_ERR error code (can be SE-specific)
|
|
*/
|
|
int Rdb_convert_to_record_value_decoder::decode_varchar(
|
|
Field *field, Rdb_string_reader *const reader, bool decode) {
|
|
my_core::Field_varstring *const field_var = (my_core::Field_varstring *)field;
|
|
|
|
const char *data_len_str;
|
|
if (!(data_len_str = reader->read(field_var->length_bytes))) {
|
|
return HA_ERR_ROCKSDB_CORRUPT_DATA;
|
|
}
|
|
|
|
uint data_len;
|
|
// field_var->length_bytes is 1 or 2
|
|
if (field_var->length_bytes == 1) {
|
|
data_len = (uchar)data_len_str[0];
|
|
} else {
|
|
DBUG_ASSERT(field_var->length_bytes == 2);
|
|
data_len = uint2korr(data_len_str);
|
|
}
|
|
|
|
if (data_len > field_var->field_length) {
|
|
// The data on disk is longer than table DDL allows?
|
|
return HA_ERR_ROCKSDB_CORRUPT_DATA;
|
|
}
|
|
|
|
if (!reader->read(data_len)) {
|
|
return HA_ERR_ROCKSDB_CORRUPT_DATA;
|
|
}
|
|
|
|
if (decode) {
|
|
memcpy(field_var->ptr, data_len_str, field_var->length_bytes + data_len);
|
|
}
|
|
|
|
return HA_EXIT_SUCCESS;
|
|
}
|
|
|
|
template <typename value_field_decoder>
|
|
Rdb_value_field_iterator<value_field_decoder>::Rdb_value_field_iterator(
|
|
TABLE *table, Rdb_string_reader *value_slice_reader,
|
|
const Rdb_converter *rdb_converter, uchar *const buf)
|
|
: m_buf(buf) {
|
|
DBUG_ASSERT(table != nullptr);
|
|
DBUG_ASSERT(buf != nullptr);
|
|
|
|
m_table = table;
|
|
m_value_slice_reader = value_slice_reader;
|
|
auto fields = rdb_converter->get_decode_fields();
|
|
m_field_iter = fields->begin();
|
|
m_field_end = fields->end();
|
|
m_null_bytes = rdb_converter->get_null_bytes();
|
|
m_offset = 0;
|
|
}
|
|
|
|
// Iterate each requested field and decode one by one
|
|
template <typename value_field_decoder>
|
|
int Rdb_value_field_iterator<value_field_decoder>::next() {
|
|
int err = HA_EXIT_SUCCESS;
|
|
while (m_field_iter != m_field_end) {
|
|
m_field_dec = m_field_iter->m_field_enc;
|
|
bool decode = m_field_iter->m_decode;
|
|
bool maybe_null = m_field_dec->maybe_null();
|
|
// This is_null value is bind to how stroage format store its value
|
|
m_is_null = maybe_null && ((m_null_bytes[m_field_dec->m_null_offset] &
|
|
m_field_dec->m_null_mask) != 0);
|
|
|
|
// Skip the bytes we need to skip
|
|
int skip = m_field_iter->m_skip;
|
|
if (skip && !m_value_slice_reader->read(skip)) {
|
|
return HA_ERR_ROCKSDB_CORRUPT_DATA;
|
|
}
|
|
|
|
m_field = m_table->field[m_field_dec->m_field_index];
|
|
// Decode each field
|
|
err = value_field_decoder::decode(m_buf, &m_offset, m_table, m_field,
|
|
m_field_dec, m_value_slice_reader, decode,
|
|
m_is_null);
|
|
if (err != HA_EXIT_SUCCESS) {
|
|
return err;
|
|
}
|
|
m_field_iter++;
|
|
// Only break for the field that are actually decoding rather than skipping
|
|
if (decode) {
|
|
break;
|
|
}
|
|
}
|
|
return err;
|
|
}
|
|
|
|
template <typename value_field_decoder>
|
|
bool Rdb_value_field_iterator<value_field_decoder>::end_of_fields() const {
|
|
return m_field_iter == m_field_end;
|
|
}
|
|
|
|
template <typename value_field_decoder>
|
|
Field *Rdb_value_field_iterator<value_field_decoder>::get_field() const {
|
|
DBUG_ASSERT(m_field != nullptr);
|
|
return m_field;
|
|
}
|
|
|
|
template <typename value_field_decoder>
|
|
void *Rdb_value_field_iterator<value_field_decoder>::get_dst() const {
|
|
DBUG_ASSERT(m_buf != nullptr);
|
|
return m_buf + m_offset;
|
|
}
|
|
|
|
template <typename value_field_decoder>
|
|
int Rdb_value_field_iterator<value_field_decoder>::get_field_index() const {
|
|
DBUG_ASSERT(m_field_dec != nullptr);
|
|
return m_field_dec->m_field_index;
|
|
}
|
|
|
|
template <typename value_field_decoder>
|
|
enum_field_types Rdb_value_field_iterator<value_field_decoder>::get_field_type()
|
|
const {
|
|
DBUG_ASSERT(m_field_dec != nullptr);
|
|
return m_field_dec->m_field_type;
|
|
}
|
|
|
|
template <typename value_field_decoder>
|
|
bool Rdb_value_field_iterator<value_field_decoder>::is_null() const {
|
|
DBUG_ASSERT(m_field != nullptr);
|
|
return m_is_null;
|
|
}
|
|
|
|
/*
|
|
Initialize Rdb_converter with table data
|
|
@param thd IN Thread context
|
|
@param tbl_def IN MyRocks table definition
|
|
@param table IN Current open table
|
|
*/
|
|
Rdb_converter::Rdb_converter(const THD *thd, const Rdb_tbl_def *tbl_def,
|
|
TABLE *table)
|
|
: m_thd(thd), m_tbl_def(tbl_def), m_table(table) {
|
|
DBUG_ASSERT(thd != nullptr);
|
|
DBUG_ASSERT(tbl_def != nullptr);
|
|
DBUG_ASSERT(table != nullptr);
|
|
|
|
m_key_requested = false;
|
|
m_verify_row_debug_checksums = false;
|
|
m_maybe_unpack_info = false;
|
|
m_row_checksums_checked = 0;
|
|
m_null_bytes = nullptr;
|
|
setup_field_encoders();
|
|
}
|
|
|
|
// Release the encoder array allocated by setup_field_encoders() and the
// buffer used to build value slices.
Rdb_converter::~Rdb_converter() {
  my_free(m_encoder_arr);
  m_encoder_arr = nullptr;

  // Explicitly freeing the record buffer is needed to suppress valgrind
  // errors in rocksdb.partition
  m_storage_record.free();
}
|
|
|
|
/*
|
|
Decide storage type for each encoder
|
|
*/
|
|
void Rdb_converter::get_storage_type(Rdb_field_encoder *const encoder,
|
|
const uint kp) {
|
|
auto pk_descr =
|
|
m_tbl_def->m_key_descr_arr[ha_rocksdb::pk_index(m_table, m_tbl_def)];
|
|
// STORE_SOME uses unpack_info.
|
|
if (pk_descr->has_unpack_info(kp)) {
|
|
DBUG_ASSERT(pk_descr->can_unpack(kp));
|
|
encoder->m_storage_type = Rdb_field_encoder::STORE_SOME;
|
|
m_maybe_unpack_info = true;
|
|
} else if (pk_descr->can_unpack(kp)) {
|
|
encoder->m_storage_type = Rdb_field_encoder::STORE_NONE;
|
|
}
|
|
}
|
|
|
|
/*
|
|
@brief
|
|
Setup which fields will be unpacked when reading rows
|
|
|
|
@detail
|
|
Three special cases when we still unpack all fields:
|
|
- When client requires decode_all_fields, such as this table is being
|
|
updated (m_lock_rows==RDB_LOCK_WRITE).
|
|
- When @@rocksdb_verify_row_debug_checksums is ON (In this mode, we need to
|
|
read all fields to find whether there is a row checksum at the end. We could
|
|
skip the fields instead of decoding them, but currently we do decoding.)
|
|
- On index merge as bitmap is cleared during that operation
|
|
|
|
@seealso
|
|
Rdb_converter::setup_field_encoders()
|
|
Rdb_converter::convert_record_from_storage_format()
|
|
*/
|
|
void Rdb_converter::setup_field_decoders(const MY_BITMAP *field_map,
|
|
bool decode_all_fields) {
|
|
m_key_requested = false;
|
|
m_decoders_vect.clear();
|
|
int last_useful = 0;
|
|
int skip_size = 0;
|
|
|
|
for (uint i = 0; i < m_table->s->fields; i++) {
|
|
// bitmap is cleared on index merge, but it still needs to decode columns
|
|
bool field_requested =
|
|
decode_all_fields || m_verify_row_debug_checksums ||
|
|
bitmap_is_clear_all(field_map) ||
|
|
bitmap_is_set(field_map, m_table->field[i]->field_index);
|
|
|
|
// We only need the decoder if the whole record is stored.
|
|
if (m_encoder_arr[i].m_storage_type != Rdb_field_encoder::STORE_ALL) {
|
|
// the field potentially needs unpacking
|
|
if (field_requested) {
|
|
// the field is in the read set
|
|
m_key_requested = true;
|
|
}
|
|
continue;
|
|
}
|
|
|
|
if (field_requested) {
|
|
// We will need to decode this field
|
|
m_decoders_vect.push_back({&m_encoder_arr[i], true, skip_size});
|
|
last_useful = m_decoders_vect.size();
|
|
skip_size = 0;
|
|
} else {
|
|
if (m_encoder_arr[i].uses_variable_len_encoding() ||
|
|
m_encoder_arr[i].maybe_null()) {
|
|
// For variable-length field, we need to read the data and skip it
|
|
m_decoders_vect.push_back({&m_encoder_arr[i], false, skip_size});
|
|
skip_size = 0;
|
|
} else {
|
|
// Fixed-width field can be skipped without looking at it.
|
|
// Add appropriate skip_size to the next field.
|
|
skip_size += m_encoder_arr[i].m_pack_length_in_rec;
|
|
}
|
|
}
|
|
}
|
|
|
|
// It could be that the last few elements are varchars that just do
|
|
// skipping. Remove them.
|
|
m_decoders_vect.erase(m_decoders_vect.begin() + last_useful,
|
|
m_decoders_vect.end());
|
|
}
|
|
|
|
void Rdb_converter::setup_field_encoders() {
|
|
uint null_bytes_length = 0;
|
|
uchar cur_null_mask = 0x1;
|
|
|
|
m_encoder_arr = static_cast<Rdb_field_encoder *>(
|
|
my_malloc(m_table->s->fields * sizeof(Rdb_field_encoder), MYF(0)));
|
|
if (m_encoder_arr == nullptr) {
|
|
return;
|
|
}
|
|
|
|
for (uint i = 0; i < m_table->s->fields; i++) {
|
|
Field *const field = m_table->field[i];
|
|
m_encoder_arr[i].m_storage_type = Rdb_field_encoder::STORE_ALL;
|
|
|
|
/*
|
|
Check if this field is
|
|
- a part of primary key, and
|
|
- it can be decoded back from its key image.
|
|
If both hold, we don't need to store this field in the value part of
|
|
RocksDB's key-value pair.
|
|
|
|
If hidden pk exists, we skip this check since the field will never be
|
|
part of the hidden pk.
|
|
*/
|
|
if (!Rdb_key_def::table_has_hidden_pk(m_table)) {
|
|
KEY *const pk_info = &m_table->key_info[m_table->s->primary_key];
|
|
for (uint kp = 0; kp < pk_info->user_defined_key_parts; kp++) {
|
|
// key_part->fieldnr is counted from 1
|
|
if (field->field_index + 1 == pk_info->key_part[kp].fieldnr) {
|
|
get_storage_type(&m_encoder_arr[i], kp);
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
|
|
m_encoder_arr[i].m_field_type = field->real_type();
|
|
m_encoder_arr[i].m_field_index = i;
|
|
m_encoder_arr[i].m_pack_length_in_rec = field->pack_length_in_rec();
|
|
|
|
if (field->real_maybe_null()) {
|
|
m_encoder_arr[i].m_null_mask = cur_null_mask;
|
|
m_encoder_arr[i].m_null_offset = null_bytes_length;
|
|
if (cur_null_mask == 0x80) {
|
|
cur_null_mask = 0x1;
|
|
null_bytes_length++;
|
|
} else {
|
|
cur_null_mask = cur_null_mask << 1;
|
|
}
|
|
} else {
|
|
m_encoder_arr[i].m_null_mask = 0;
|
|
}
|
|
}
|
|
|
|
// Count the last, unfinished NULL-bits byte
|
|
if (cur_null_mask != 0x1) {
|
|
null_bytes_length++;
|
|
}
|
|
|
|
m_null_bytes_length_in_record = null_bytes_length;
|
|
}
|
|
|
|
/*
|
|
EntryPoint for Decode:
|
|
Decode key slice(if requested) and value slice using built-in field
|
|
decoders
|
|
@param key_def IN key definition to decode
|
|
@param dst OUT Mysql buffer to fill decoded content
|
|
@param key_slice IN RocksDB key slice to decode
|
|
@param value_slice IN RocksDB value slice to decode
|
|
@return
|
|
0 OK
|
|
other HA_ERR error code (can be SE-specific)
|
|
*/
|
|
int Rdb_converter::decode(const std::shared_ptr<Rdb_key_def> &key_def,
|
|
uchar *dst, // address to fill data
|
|
const rocksdb::Slice *key_slice,
|
|
const rocksdb::Slice *value_slice) {
|
|
// Currently only support decode primary key, Will add decode secondary later
|
|
DBUG_ASSERT(key_def->m_index_type == Rdb_key_def::INDEX_TYPE_PRIMARY ||
|
|
key_def->m_index_type == Rdb_key_def::INDEX_TYPE_HIDDEN_PRIMARY);
|
|
|
|
const rocksdb::Slice *updated_key_slice = key_slice;
|
|
#ifndef DBUG_OFF
|
|
String last_rowkey;
|
|
last_rowkey.copy(key_slice->data(), key_slice->size(), &my_charset_bin);
|
|
DBUG_EXECUTE_IF("myrocks_simulate_bad_pk_read1",
|
|
{ dbug_modify_key_varchar8(&last_rowkey); });
|
|
rocksdb::Slice rowkey_slice(last_rowkey.ptr(), last_rowkey.length());
|
|
updated_key_slice = &rowkey_slice;
|
|
#endif
|
|
return convert_record_from_storage_format(key_def, updated_key_slice,
|
|
value_slice, dst);
|
|
}
|
|
|
|
/*
|
|
Decode value slice header
|
|
@param reader IN value slice reader
|
|
@param pk_def IN key definition to decode
|
|
@param unpack_slice OUT unpack info slice
|
|
@return
|
|
0 OK
|
|
other HA_ERR error code (can be SE-specific)
|
|
*/
|
|
int Rdb_converter::decode_value_header(
|
|
Rdb_string_reader *reader, const std::shared_ptr<Rdb_key_def> &pk_def,
|
|
rocksdb::Slice *unpack_slice) {
|
|
/* If it's a TTL record, skip the 8 byte TTL value */
|
|
if (pk_def->has_ttl()) {
|
|
const char *ttl_bytes;
|
|
if ((ttl_bytes = reader->read(ROCKSDB_SIZEOF_TTL_RECORD))) {
|
|
memcpy(m_ttl_bytes, ttl_bytes, ROCKSDB_SIZEOF_TTL_RECORD);
|
|
} else {
|
|
return HA_ERR_ROCKSDB_CORRUPT_DATA;
|
|
}
|
|
}
|
|
|
|
/* Other fields are decoded from the value */
|
|
if (m_null_bytes_length_in_record &&
|
|
!(m_null_bytes = reader->read(m_null_bytes_length_in_record))) {
|
|
return HA_ERR_ROCKSDB_CORRUPT_DATA;
|
|
}
|
|
|
|
if (m_maybe_unpack_info) {
|
|
const char *unpack_info = reader->get_current_ptr();
|
|
if (!unpack_info || !Rdb_key_def::is_unpack_data_tag(unpack_info[0]) ||
|
|
!reader->read(Rdb_key_def::get_unpack_header_size(unpack_info[0]))) {
|
|
return HA_ERR_ROCKSDB_CORRUPT_DATA;
|
|
}
|
|
|
|
uint16 unpack_info_len =
|
|
rdb_netbuf_to_uint16(reinterpret_cast<const uchar *>(unpack_info + 1));
|
|
*unpack_slice = rocksdb::Slice(unpack_info, unpack_info_len);
|
|
|
|
reader->read(unpack_info_len -
|
|
Rdb_key_def::get_unpack_header_size(unpack_info[0]));
|
|
}
|
|
|
|
return HA_EXIT_SUCCESS;
|
|
}
|
|
|
|
/*
|
|
Convert RocksDb key slice and value slice to Mysql format
|
|
@param key_def IN key definition to decode
|
|
@param key_slice IN RocksDB key slice
|
|
@param value_slice IN RocksDB value slice
|
|
@param dst OUT MySql format address
|
|
@return
|
|
0 OK
|
|
other HA_ERR error code (can be SE-specific)
|
|
*/
|
|
int Rdb_converter::convert_record_from_storage_format(
|
|
const std::shared_ptr<Rdb_key_def> &pk_def,
|
|
const rocksdb::Slice *const key_slice,
|
|
const rocksdb::Slice *const value_slice, uchar *const dst) {
|
|
int err = HA_EXIT_SUCCESS;
|
|
|
|
Rdb_string_reader value_slice_reader(value_slice);
|
|
rocksdb::Slice unpack_slice;
|
|
err = decode_value_header(&value_slice_reader, pk_def, &unpack_slice);
|
|
if (err != HA_EXIT_SUCCESS) {
|
|
return err;
|
|
}
|
|
|
|
/*
|
|
Decode PK fields from the key
|
|
*/
|
|
if (m_key_requested) {
|
|
err = pk_def->unpack_record(m_table, dst, key_slice,
|
|
!unpack_slice.empty() ? &unpack_slice : nullptr,
|
|
false /* verify_checksum */);
|
|
}
|
|
if (err != HA_EXIT_SUCCESS) {
|
|
return err;
|
|
}
|
|
|
|
Rdb_value_field_iterator<Rdb_convert_to_record_value_decoder>
|
|
value_field_iterator(m_table, &value_slice_reader, this, dst);
|
|
|
|
// Decode value slices
|
|
while (!value_field_iterator.end_of_fields()) {
|
|
err = value_field_iterator.next();
|
|
|
|
if (err != HA_EXIT_SUCCESS) {
|
|
return err;
|
|
}
|
|
}
|
|
|
|
if (m_verify_row_debug_checksums) {
|
|
return verify_row_debug_checksum(pk_def, &value_slice_reader, key_slice,
|
|
value_slice);
|
|
}
|
|
return HA_EXIT_SUCCESS;
|
|
}
|
|
|
|
/*
|
|
Verify checksum for row
|
|
@param pk_def IN key def
|
|
@param reader IN RocksDB value slice reader
|
|
@param key IN RocksDB key slice
|
|
@param value IN RocksDB value slice
|
|
@return
|
|
0 OK
|
|
other HA_ERR error code (can be SE-specific)
|
|
*/
|
|
int Rdb_converter::verify_row_debug_checksum(
|
|
const std::shared_ptr<Rdb_key_def> &pk_def, Rdb_string_reader *reader,
|
|
const rocksdb::Slice *key, const rocksdb::Slice *value) {
|
|
if (reader->remaining_bytes() == RDB_CHECKSUM_CHUNK_SIZE &&
|
|
reader->read(1)[0] == RDB_CHECKSUM_DATA_TAG) {
|
|
uint32_t stored_key_chksum =
|
|
rdb_netbuf_to_uint32((const uchar *)reader->read(RDB_CHECKSUM_SIZE));
|
|
uint32_t stored_val_chksum =
|
|
rdb_netbuf_to_uint32((const uchar *)reader->read(RDB_CHECKSUM_SIZE));
|
|
|
|
const uint32_t computed_key_chksum =
|
|
my_core::crc32(0, rdb_slice_to_uchar_ptr(key), key->size());
|
|
const uint32_t computed_val_chksum =
|
|
my_core::crc32(0, rdb_slice_to_uchar_ptr(value),
|
|
value->size() - RDB_CHECKSUM_CHUNK_SIZE);
|
|
|
|
DBUG_EXECUTE_IF("myrocks_simulate_bad_pk_checksum1", stored_key_chksum++;);
|
|
|
|
if (stored_key_chksum != computed_key_chksum) {
|
|
pk_def->report_checksum_mismatch(true, key->data(), key->size());
|
|
return HA_ERR_ROCKSDB_CHECKSUM_MISMATCH;
|
|
}
|
|
|
|
DBUG_EXECUTE_IF("myrocks_simulate_bad_pk_checksum2", stored_val_chksum++;);
|
|
if (stored_val_chksum != computed_val_chksum) {
|
|
pk_def->report_checksum_mismatch(false, value->data(), value->size());
|
|
return HA_ERR_ROCKSDB_CHECKSUM_MISMATCH;
|
|
}
|
|
|
|
m_row_checksums_checked++;
|
|
}
|
|
if (reader->remaining_bytes()) {
|
|
return HA_ERR_ROCKSDB_CORRUPT_DATA;
|
|
}
|
|
return HA_EXIT_SUCCESS;
|
|
}
|
|
|
|
/**
|
|
Convert record from table->record[0] form into a form that can be written
|
|
into rocksdb.
|
|
|
|
@param pk_def IN Current key def
|
|
@pk_unpack_info IN Unpack info generated during key pack
|
|
@is_update_row IN Whether it is update row
|
|
@store_row_debug_checksums IN Whether to store checksums
|
|
@param ttl_bytes IN/OUT Old ttl value from previous record and
|
|
ttl value during current encode
|
|
@is_ttl_bytes_updated OUT Whether ttl bytes is updated
|
|
@param value_slice OUT Data slice with record data.
|
|
*/
|
|
int Rdb_converter::encode_value_slice(
|
|
const std::shared_ptr<Rdb_key_def> &pk_def,
|
|
const rocksdb::Slice &pk_packed_slice, Rdb_string_writer *pk_unpack_info,
|
|
bool is_update_row, bool store_row_debug_checksums, char *ttl_bytes,
|
|
bool *is_ttl_bytes_updated, rocksdb::Slice *const value_slice) {
|
|
DBUG_ASSERT(pk_def != nullptr);
|
|
// Currently only primary key will store value slice
|
|
DBUG_ASSERT(pk_def->m_index_type == Rdb_key_def::INDEX_TYPE_PRIMARY ||
|
|
pk_def->m_index_type == Rdb_key_def::INDEX_TYPE_HIDDEN_PRIMARY);
|
|
DBUG_ASSERT_IMP(m_maybe_unpack_info, pk_unpack_info);
|
|
|
|
bool has_ttl = pk_def->has_ttl();
|
|
bool has_ttl_column = !pk_def->m_ttl_column.empty();
|
|
|
|
m_storage_record.length(0);
|
|
|
|
if (has_ttl) {
|
|
/* If it's a TTL record, reserve space for 8 byte TTL value in front. */
|
|
m_storage_record.fill(
|
|
ROCKSDB_SIZEOF_TTL_RECORD + m_null_bytes_length_in_record, 0);
|
|
// NOTE: is_ttl_bytes_updated is only used for update case
|
|
// During update, skip update sk key/values slice iff none of sk fields
|
|
// have changed and ttl bytes isn't changed. see
|
|
// ha_rocksdb::update_write_sk() for more info
|
|
*is_ttl_bytes_updated = false;
|
|
char *const data = const_cast<char *>(m_storage_record.ptr());
|
|
if (has_ttl_column) {
|
|
DBUG_ASSERT(pk_def->get_ttl_field_index() != UINT_MAX);
|
|
Field *const field = m_table->field[pk_def->get_ttl_field_index()];
|
|
DBUG_ASSERT(field->pack_length_in_rec() == ROCKSDB_SIZEOF_TTL_RECORD);
|
|
DBUG_ASSERT(field->real_type() == MYSQL_TYPE_LONGLONG);
|
|
|
|
uint64 ts = uint8korr(field->ptr);
|
|
#ifndef DBUG_OFF
|
|
ts += rdb_dbug_set_ttl_rec_ts();
|
|
#endif
|
|
rdb_netbuf_store_uint64(reinterpret_cast<uchar *>(data), ts);
|
|
if (is_update_row) {
|
|
*is_ttl_bytes_updated =
|
|
memcmp(ttl_bytes, data, ROCKSDB_SIZEOF_TTL_RECORD);
|
|
}
|
|
// Also store in m_ttl_bytes to propagate to update_write_sk
|
|
memcpy(ttl_bytes, data, ROCKSDB_SIZEOF_TTL_RECORD);
|
|
} else {
|
|
/*
|
|
For implicitly generated TTL records we need to copy over the old
|
|
TTL value from the old record in the event of an update. It was stored
|
|
in m_ttl_bytes.
|
|
|
|
Otherwise, generate a timestamp using the current time.
|
|
*/
|
|
if (is_update_row) {
|
|
memcpy(data, ttl_bytes, sizeof(uint64));
|
|
} else {
|
|
uint64 ts = static_cast<uint64>(std::time(nullptr));
|
|
#ifndef DBUG_OFF
|
|
ts += rdb_dbug_set_ttl_rec_ts();
|
|
#endif
|
|
rdb_netbuf_store_uint64(reinterpret_cast<uchar *>(data), ts);
|
|
// Also store in m_ttl_bytes to propagate to update_write_sk
|
|
memcpy(ttl_bytes, data, ROCKSDB_SIZEOF_TTL_RECORD);
|
|
}
|
|
}
|
|
} else {
|
|
/* All NULL bits are initially 0 */
|
|
m_storage_record.fill(m_null_bytes_length_in_record, 0);
|
|
}
|
|
|
|
// If a primary key may have non-empty unpack_info for certain values,
|
|
// (m_maybe_unpack_info=TRUE), we write the unpack_info block. The block
|
|
// itself was prepared in Rdb_key_def::pack_record.
|
|
if (m_maybe_unpack_info) {
|
|
m_storage_record.append(reinterpret_cast<char *>(pk_unpack_info->ptr()),
|
|
pk_unpack_info->get_current_pos());
|
|
}
|
|
for (uint i = 0; i < m_table->s->fields; i++) {
|
|
Rdb_field_encoder &encoder = m_encoder_arr[i];
|
|
/* Don't pack decodable PK key parts */
|
|
if (encoder.m_storage_type != Rdb_field_encoder::STORE_ALL) {
|
|
continue;
|
|
}
|
|
|
|
Field *const field = m_table->field[i];
|
|
if (encoder.maybe_null()) {
|
|
char *data = const_cast<char *>(m_storage_record.ptr());
|
|
if (has_ttl) {
|
|
data += ROCKSDB_SIZEOF_TTL_RECORD;
|
|
}
|
|
|
|
if (field->is_null()) {
|
|
data[encoder.m_null_offset] |= encoder.m_null_mask;
|
|
/* Don't write anything for NULL values */
|
|
continue;
|
|
}
|
|
}
|
|
|
|
if (encoder.m_field_type == MYSQL_TYPE_BLOB) {
|
|
my_core::Field_blob *blob =
|
|
reinterpret_cast<my_core::Field_blob *>(field);
|
|
/* Get the number of bytes needed to store length*/
|
|
const uint length_bytes = blob->pack_length() - portable_sizeof_char_ptr;
|
|
|
|
/* Store the length of the value */
|
|
m_storage_record.append(reinterpret_cast<char *>(blob->ptr),
|
|
length_bytes);
|
|
|
|
/* Store the blob value itself */
|
|
char *data_ptr;
|
|
memcpy(&data_ptr, blob->ptr + length_bytes, sizeof(uchar **));
|
|
m_storage_record.append(data_ptr, blob->get_length());
|
|
} else if (encoder.m_field_type == MYSQL_TYPE_VARCHAR) {
|
|
Field_varstring *const field_var =
|
|
reinterpret_cast<Field_varstring *>(field);
|
|
uint data_len;
|
|
/* field_var->length_bytes is 1 or 2 */
|
|
if (field_var->length_bytes == 1) {
|
|
data_len = field_var->ptr[0];
|
|
} else {
|
|
DBUG_ASSERT(field_var->length_bytes == 2);
|
|
data_len = uint2korr(field_var->ptr);
|
|
}
|
|
m_storage_record.append(reinterpret_cast<char *>(field_var->ptr),
|
|
field_var->length_bytes + data_len);
|
|
} else {
|
|
/* Copy the field data */
|
|
const uint len = field->pack_length_in_rec();
|
|
m_storage_record.append(reinterpret_cast<char *>(field->ptr), len);
|
|
}
|
|
}
|
|
|
|
if (store_row_debug_checksums) {
|
|
const uint32_t key_crc32 = my_core::crc32(
|
|
0, rdb_slice_to_uchar_ptr(&pk_packed_slice), pk_packed_slice.size());
|
|
const uint32_t val_crc32 =
|
|
my_core::crc32(0, rdb_mysql_str_to_uchar_str(&m_storage_record),
|
|
m_storage_record.length());
|
|
uchar key_crc_buf[RDB_CHECKSUM_SIZE];
|
|
uchar val_crc_buf[RDB_CHECKSUM_SIZE];
|
|
rdb_netbuf_store_uint32(key_crc_buf, key_crc32);
|
|
rdb_netbuf_store_uint32(val_crc_buf, val_crc32);
|
|
m_storage_record.append((const char *)&RDB_CHECKSUM_DATA_TAG, 1);
|
|
m_storage_record.append((const char *)key_crc_buf, RDB_CHECKSUM_SIZE);
|
|
m_storage_record.append((const char *)val_crc_buf, RDB_CHECKSUM_SIZE);
|
|
}
|
|
|
|
*value_slice =
|
|
rocksdb::Slice(m_storage_record.ptr(), m_storage_record.length());
|
|
|
|
return HA_EXIT_SUCCESS;
|
|
}
|
|
} // namespace myrocks
|