mariadb/storage/innobase/mtr/mtr0log.cc
Marko Mäkelä 49019dde65 MDEV-17138 follow-up: Optimize index page creation
btr_create(), btr_root_raise_and_insert(): Write a MLOG_MEMSET record
to set FIL_PAGE_PREV,FIL_PAGE_NEXT to FIL_NULL, instead of writing
two MLOG_4BYTES records.

For ROW_FORMAT=COMPRESSED pages, we will not use MLOG_MEMSET
because we want the crash-downgrade to earlier 10.4 releases to succeed.

mlog_parse_nbytes(): Relax the too strict assertion. There is no problem
with MLOG_MEMSET records that affect the uncompressed header of
ROW_FORMAT=COMPRESSED index pages.
2019-11-13 18:35:04 +02:00

729 lines
18 KiB
C++

/*****************************************************************************
Copyright (c) 1995, 2016, Oracle and/or its affiliates. All Rights Reserved.
Copyright (c) 2017, 2019, MariaDB Corporation.
This program is free software; you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by the Free Software
Foundation; version 2 of the License.
This program is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License along with
this program; if not, write to the Free Software Foundation, Inc.,
51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA
*****************************************************************************/
/**************************************************//**
@file mtr/mtr0log.cc
Mini-transaction log routines
Created 12/7/1995 Heikki Tuuri
*******************************************************/
#include "mtr0log.h"
#include "buf0buf.h"
#include "dict0dict.h"
#include "log0recv.h"
#include "page0page.h"
#include "buf0dblwr.h"
#include "dict0boot.h"
/** Append a run of bytes to the log of a mini-transaction.
Nothing is appended when the log mode of the mini-transaction
is MTR_LOG_NONE.
@param[in,out]	mtr	mini-transaction
@param[in]	str	bytes to append
@param[in]	len	number of bytes to append from str */
void
mlog_catenate_string(mtr_t* mtr, const byte* str, ulint len)
{
	if (mtr_get_log_mode(mtr) != MTR_LOG_NONE) {
		mtr->get_log()->push(str, ib_uint32_t(len));
	}
}
/********************************************************//**
Writes the initial part of a log record consisting of one-byte item
type and four-byte space and page numbers. Also pushes info
to the mtr memo that a buffer page has been modified. */
void
mlog_write_initial_log_record(
/*==========================*/
const byte* ptr, /*!< in: pointer to (inside) a buffer
frame holding the file page where
modification is made */
mlog_id_t type, /*!< in: log item type: MLOG_1BYTE, ... */
mtr_t* mtr) /*!< in: mini-transaction handle */
{
byte* log_ptr;
ut_ad(type <= MLOG_BIGGEST_TYPE || EXTRA_CHECK_MLOG_NUMBER(type));
ut_ad(type > MLOG_8BYTES);
log_ptr = mlog_open(mtr, 11);
/* If no logging is requested, we may return now */
if (log_ptr == NULL) {
return;
}
log_ptr = mlog_write_initial_log_record_fast(ptr, type, log_ptr, mtr);
mlog_close(mtr, log_ptr);
}
/** Parse the initial part of a log record: the type byte (with
MLOG_SINGLE_REC_FLAG masked off) followed by the tablespace id and the
page number, both stored in compressed form.
@param[in]	ptr	start of the record in the log buffer
@param[in]	end_ptr	end of the log buffer
@param[out]	type	log record type
@param[out]	space	tablespace id
@param[out]	page_no	page number; not assigned if the space id
			could not be parsed completely
@return pointer past the parsed initial part
@retval NULL if the record is incomplete, or if the type is invalid
(in which case recv_sys.found_corrupt_log is set as well) */
byte*
mlog_parse_initial_log_record(
	const byte*	ptr,
	const byte*	end_ptr,
	mlog_id_t*	type,
	ulint*		space,
	ulint*		page_no)
{
	/* The type byte must be available. */
	if (ptr >= end_ptr) {
		return NULL;
	}

	*type = mlog_id_t(*ptr & ~MLOG_SINGLE_REC_FLAG);

	if (UNIV_UNLIKELY(*type > MLOG_BIGGEST_TYPE
			  && !EXTRA_CHECK_MLOG_NUMBER(*type))) {
		recv_sys.found_corrupt_log = true;
		return NULL;
	}

	++ptr;

	/* The space id and the page number take at least
	one byte each. */
	if (end_ptr - ptr < 2) {
		return NULL;
	}

	*space = mach_parse_compressed(&ptr, end_ptr);

	if (ptr == NULL) {
		/* The space id was cut off by the end of the buffer. */
		return NULL;
	}

	*page_no = mach_parse_compressed(&ptr, end_ptr);

	return const_cast<byte*>(ptr);
}
/** Parse, and optionally apply, a log record that was written by
mlog_write_ulint(), mlog_write_ull() or mlog_memset().

The record body starts with a 2-byte page offset. For MLOG_MEMSET it
is followed by a 2-byte length and the fill byte; for the other types
it is followed by the value in compressed form.

A record whose target range [offset, offset + width) does not fit in
the page is treated as corrupted, so that applying it can never write
past the end of the page frame.

@param[in]	type	MLOG_1BYTE, MLOG_2BYTES, MLOG_4BYTES, MLOG_8BYTES
			or MLOG_MEMSET
@param[in]	ptr	start of the record body in the log buffer
@param[in]	end_ptr	end of the log buffer
@param[in,out]	page	page frame where to apply the record, or NULL
@param[in,out]	page_zip compressed page (a page_zip_des_t*), or NULL
@return pointer past the end of the parsed record
@retval NULL if the record is incomplete, or if it is corrupted
(in which case recv_sys.found_corrupt_log is set as well) */
byte*
mlog_parse_nbytes(
	mlog_id_t	type,
	const byte*	ptr,
	const byte*	end_ptr,
	byte*		page,
	void*		page_zip)
{
	ulint		offset;
	ulint		val;
	ib_uint64_t	dval;

	ut_ad(type <= MLOG_8BYTES || type == MLOG_MEMSET);
	ut_a(!page || !page_zip
	     || type == MLOG_MEMSET
	     || !fil_page_index_page_check(page));

	if (end_ptr < ptr + 2) {
		return NULL;
	}

	offset = mach_read_from_2(ptr);
	ptr += 2;

	if (UNIV_UNLIKELY(offset >= srv_page_size)) {
		goto corrupt;
	}

	switch (type) {
	case MLOG_MEMSET:
		/* 2 bytes of length and 1 fill byte must follow. */
		if (end_ptr < ptr + 3) {
			return NULL;
		}
		val = mach_read_from_2(ptr);
		ptr += 2;
		if (UNIV_UNLIKELY(offset + val > srv_page_size)) {
			goto corrupt;
		}
		if (page) {
			memset(page + offset, *ptr, val);
			if (page_zip) {
				ut_ad(offset + val <= PAGE_DATA
				      || !fil_page_index_page_check(page));
				memset(static_cast<page_zip_des_t*>(page_zip)
				       ->data + offset, *ptr, val);
			}
		}
		/* Skip the fill byte. */
		return const_cast<byte*>(++ptr);
	case MLOG_8BYTES:
		dval = mach_u64_parse_compressed(&ptr, end_ptr);
		if (ptr == NULL) {
			return NULL;
		}
		/* All 8 bytes must fit in the page. */
		if (UNIV_UNLIKELY(offset + 8 > srv_page_size)) {
			goto corrupt;
		}
		if (page) {
			if (page_zip) {
				mach_write_to_8
					(((page_zip_des_t*) page_zip)->data
					 + offset, dval);
			}
			mach_write_to_8(page + offset, dval);
		}
		return const_cast<byte*>(ptr);
	default:
		val = mach_parse_compressed(&ptr, end_ptr);
	}

	if (ptr == NULL) {
		return NULL;
	}

	switch (type) {
	case MLOG_1BYTE:
		/* offset < srv_page_size was already checked above,
		which is sufficient for a single byte. */
		if (val > 0xFFUL) {
			goto corrupt;
		}
		if (page) {
			if (page_zip) {
				mach_write_to_1
					(((page_zip_des_t*) page_zip)->data
					 + offset, val);
			}
			mach_write_to_1(page + offset, val);
		}
		break;
	case MLOG_2BYTES:
		if (val > 0xFFFFUL) {
			goto corrupt;
		}
		/* Both bytes must fit in the page. */
		if (UNIV_UNLIKELY(offset + 2 > srv_page_size)) {
			goto corrupt;
		}
		if (page) {
			if (page_zip) {
				mach_write_to_2
					(((page_zip_des_t*) page_zip)->data
					 + offset, val);
			}
			mach_write_to_2(page + offset, val);
		}
		break;
	case MLOG_4BYTES:
		/* All 4 bytes must fit in the page. */
		if (UNIV_UNLIKELY(offset + 4 > srv_page_size)) {
			goto corrupt;
		}
		if (page) {
			if (page_zip) {
				mach_write_to_4
					(((page_zip_des_t*) page_zip)->data
					 + offset, val);
			}
			mach_write_to_4(page + offset, val);
		}
		break;
	default:
	corrupt:
		recv_sys.found_corrupt_log = true;
		ptr = NULL;
	}

	return const_cast<byte*>(ptr);
}
/** Write a 1, 2 or 4 byte value to a file page and, if a
mini-transaction is given, log the write.
The log record consists of the initial log record for the page,
the 2-byte page offset of ptr, and the value in compressed form.
Any type other than the three listed below ends in ut_error.
@param[in,out]	ptr	address in the page where to write
@param[in]	val	value to write
@param[in]	type	MLOG_1BYTE, MLOG_2BYTES or MLOG_4BYTES
@param[in,out]	mtr	mini-transaction, or NULL to skip logging */
void
mlog_write_ulint(byte* ptr, ulint val, mlog_id_t type, mtr_t* mtr)
{
	/* The page itself is always updated first. */
	switch (type) {
	case MLOG_4BYTES:
		mach_write_to_4(ptr, val);
		break;
	case MLOG_2BYTES:
		mach_write_to_2(ptr, val);
		break;
	case MLOG_1BYTE:
		mach_write_to_1(ptr, val);
		break;
	default:
		ut_error;
	}

	if (mtr == 0) {
		return;
	}

	byte*	log_ptr = mlog_open(mtr, 11 + 2 + 5);

	if (log_ptr == 0) {
		/* No logging is requested. */
		return;
	}

	log_ptr = mlog_write_initial_log_record_fast(ptr, type, log_ptr, mtr);

	mach_write_to_2(log_ptr, page_offset(ptr));
	log_ptr += 2;

	log_ptr += mach_write_compressed(log_ptr, val);

	mlog_close(mtr, log_ptr);
}
/** Write an 8-byte value to a file page and, if a mini-transaction
is given, log the write as a MLOG_8BYTES record.
The log record consists of the initial log record for the page,
the 2-byte page offset of ptr, and the value in compressed form.
@param[in,out]	ptr	address in the page where to write
@param[in]	val	value to write
@param[in,out]	mtr	mini-transaction, or NULL to skip logging */
void
mlog_write_ull(byte* ptr, ib_uint64_t val, mtr_t* mtr)
{
	mach_write_to_8(ptr, val);

	if (mtr == 0) {
		return;
	}

	byte*	log_ptr = mlog_open(mtr, 11 + 2 + 9);

	if (log_ptr == 0) {
		/* No logging is requested. */
		return;
	}

	log_ptr = mlog_write_initial_log_record_fast(
		ptr, MLOG_8BYTES, log_ptr, mtr);

	mach_write_to_2(log_ptr, page_offset(ptr));
	log_ptr += 2;

	log_ptr += mach_u64_write_compressed(log_ptr, val);

	mlog_close(mtr, log_ptr);
}
/** Copy a string of bytes to a file page and log the write by
calling mlog_log_string().
@param[in,out]	ptr	address in the page where to write
@param[in]	str	bytes to write
@param[in]	len	number of bytes to write; must be less than
			srv_page_size
@param[in,out]	mtr	mini-transaction */
void
mlog_write_string(byte* ptr, const byte* str, ulint len, mtr_t* mtr)
{
	ut_ad(ptr && mtr);
	ut_a(len < srv_page_size);

	/* Update the page first, then log what was written. */
	memcpy(ptr, str, len);

	mlog_log_string(ptr, len, mtr);
}
/** Log a string of bytes that has already been written to a file page.
The MLOG_WRITE_STRING record consists of the initial log record for
the page, the 2-byte page offset of ptr, the 2-byte length, and then
the len bytes found at ptr.
@param[in]	ptr	address in the page that was written to
@param[in]	len	number of bytes written
@param[in,out]	mtr	mini-transaction */
void
mlog_log_string(byte* ptr, ulint len, mtr_t* mtr)
{
	ut_ad(ptr && mtr);
	ut_ad(len <= srv_page_size);

	byte*	header = mlog_open(mtr, 30);

	if (header == NULL) {
		/* No logging is requested. */
		return;
	}

	header = mlog_write_initial_log_record_fast(
		ptr, MLOG_WRITE_STRING, header, mtr);

	/* Page offset and length, 2 bytes each. */
	mach_write_to_2(header, page_offset(ptr));
	mach_write_to_2(header + 2, len);

	mlog_close(mtr, header + 4);

	/* The payload is appended after the header has been closed. */
	mlog_catenate_string(mtr, ptr, len);
}
/** Parse, and optionally apply, a log record written by
mlog_write_string() or mlog_log_string().
The record body is a 2-byte page offset, a 2-byte length, and
that many payload bytes.
@param[in]	ptr	start of the record body in the log buffer
@param[in]	end_ptr	end of the log buffer
@param[in,out]	page	page frame where to apply the record, or NULL
@param[in,out]	page_zip compressed page (a page_zip_des_t*), or NULL
@return pointer past the end of the parsed record
@retval NULL if the record is incomplete, or if the offset and length
do not fit in a page (in which case recv_sys.found_corrupt_log is set) */
byte*
mlog_parse_string(byte* ptr, byte* end_ptr, byte* page, void* page_zip)
{
	ut_a(!page || !page_zip
	     || (fil_page_get_type(page) != FIL_PAGE_INDEX
		 && fil_page_get_type(page) != FIL_PAGE_RTREE));

	/* The offset and the length take 2 bytes each. */
	if (end_ptr < ptr + 4) {
		return(NULL);
	}

	const ulint	offset = mach_read_from_2(ptr);
	const ulint	len = mach_read_from_2(ptr + 2);
	byte* const	payload = ptr + 4;

	if (offset >= srv_page_size || len + offset > srv_page_size) {
		recv_sys.found_corrupt_log = TRUE;
		return(NULL);
	}

	if (end_ptr < payload + len) {
		return(NULL);
	}

	if (page != NULL) {
		if (page_zip != NULL) {
			memcpy(static_cast<page_zip_des_t*>(page_zip)->data
			       + offset, payload, len);
		}

		memcpy(page + offset, payload, len);
	}

	return(payload + len);
}
/** Fill a range of bytes in a buffer page with one value, and log it.
The page frame is always modified and the mini-transaction is always
marked as modified. A MLOG_MEMSET record (initial log record, 2-byte
offset, 2-byte length, fill byte) is written unless the log mode is
MTR_LOG_NONE or MTR_LOG_NO_REDO.
@param[in,out]	b	buffer page
@param[in]	ofs	byte offset from b->frame
@param[in]	len	number of bytes to fill; must be nonzero
@param[in]	val	the byte value to fill with
@param[in,out]	mtr	mini-transaction */
void
mlog_memset(buf_block_t* b, ulint ofs, ulint len, byte val, mtr_t* mtr)
{
	ut_ad(len);
	ut_ad(ofs <= ulint(srv_page_size));
	ut_ad(ofs + len <= ulint(srv_page_size));

	memset(ofs + b->frame, val, len);

	mtr->set_modified();

	switch (mtr->get_log_mode()) {
	case MTR_LOG_ALL:
		break;
	case MTR_LOG_SHORT_INSERTS:
		/* Not expected here; debug builds assert. */
		ut_ad(0);
		break;
	case MTR_LOG_NONE:
	case MTR_LOG_NO_REDO:
		return;
	}

	byte*	log_ptr = mtr->get_log()->open(11 + 2 + 2 + 1);

	log_ptr = mlog_write_initial_log_record_low(
		MLOG_MEMSET, b->page.id.space(), b->page.id.page_no(),
		log_ptr, mtr);

	mach_write_to_2(log_ptr, ofs);
	log_ptr += 2;
	mach_write_to_2(log_ptr, len);
	log_ptr += 2;
	*log_ptr++ = val;

	mlog_close(mtr, log_ptr);
}
/** Fill a range of bytes in a file page with one value, and log it.
The bytes are always written and the mini-transaction is always
marked as modified. A MLOG_MEMSET record (initial log record, 2-byte
page offset of b, 2-byte length, fill byte) is written unless the log
mode is MTR_LOG_NONE or MTR_LOG_NO_REDO.
@param[in,out]	b	address of the first byte to fill
@param[in]	len	number of bytes to fill; must be nonzero
@param[in]	val	the byte value to fill with
@param[in,out]	mtr	mini-transaction */
void mlog_memset(byte* b, ulint len, byte val, mtr_t* mtr)
{
	ut_ad(len);
	ut_ad(page_offset(b) + len <= ulint(srv_page_size));

	memset(b, val, len);

	mtr->set_modified();

	switch (mtr->get_log_mode()) {
	case MTR_LOG_ALL:
		break;
	case MTR_LOG_SHORT_INSERTS:
		/* Not expected here; debug builds assert. */
		ut_ad(0);
		break;
	case MTR_LOG_NONE:
	case MTR_LOG_NO_REDO:
		return;
	}

	byte*	log_ptr = mtr->get_log()->open(11 + 2 + 2 + 1);

	log_ptr = mlog_write_initial_log_record_fast(
		b, MLOG_MEMSET, log_ptr, mtr);

	mach_write_to_2(log_ptr, page_offset(b));
	log_ptr += 2;
	mach_write_to_2(log_ptr, len);
	log_ptr += 2;
	*log_ptr++ = val;

	mlog_close(mtr, log_ptr);
}
/** Open a buffer in the mini-transaction log, write the initial log
record for the page that contains rec and, if that page is in the
compact format, a description of the index fields.

The field description consists of 2-byte values:
- the number of fields; for an instant index the 0x8000 flag is set
  and index->n_core_fields follows as another 2-byte value
- the number of unique fields in the tree (leaf or non-leaf variant)
- one value per field: the fixed length, or 0x7fff for a
  variable-length DATA_BIG_COL column, with the 0x8000 flag set if
  the column is NOT NULL

The mini-transaction is marked as modified even when nothing is logged.

@param[in,out]	mtr	mini-transaction
@param[in]	rec	index record or page
@param[in]	index	record descriptor
@param[in]	type	log record type
@param[in]	size	requested buffer size in bytes; if 0, the log
			is closed with mlog_close() and NULL is returned
@return open log buffer with room for size bytes (mlog_close() has
not been called on it yet)
@retval NULL if the log mode is MTR_LOG_NONE or MTR_LOG_NO_REDO,
or if size is 0 */
byte*
mlog_open_and_write_index(
	mtr_t*			mtr,
	const byte*		rec,
	const dict_index_t*	index,
	mlog_id_t		type,
	ulint			size)
{
	byte*		log_ptr;
	const byte*	log_start;
	const byte*	log_end;

	ut_ad(!!page_rec_is_comp(rec) == dict_table_is_comp(index->table));

	mtr->set_modified();
	switch (mtr->get_log_mode()) {
	case MTR_LOG_NONE:
	case MTR_LOG_NO_REDO:
		return NULL;
	case MTR_LOG_SHORT_INSERTS:
		ut_ad(0);
		/* fall through */
	case MTR_LOG_ALL:
		break;
	}

	if (!page_rec_is_comp(rec)) {
		log_start = log_ptr = mtr->get_log()->open(11 + size);
		/* Compute the end of the opened buffer before log_ptr is
		advanced past the initial log record, so that log_end
		really is the end of the buffer, as in the branch below. */
		log_end = log_ptr + 11 + size;
		log_ptr = mlog_write_initial_log_record_fast(rec, type,
							     log_ptr, mtr);
	} else {
		ulint	i;
		bool	is_instant = index->is_instant();
		ulint	n = dict_index_get_n_fields(index);
		/* Total number of bytes that may have to be written;
		it is opened in pieces of at most MAX_DATA_SIZE bytes. */
		ulint	total = 11 + (is_instant ? 2 : 0) + size + (n + 2) * 2;
		ulint	alloc = std::min(total,
					 ulint(mtr_buf_t::MAX_DATA_SIZE));

		const bool	is_leaf = page_is_leaf(page_align(rec));

		/* For spatial index, on non-leaf page, we just keep
		2 fields, MBR and page no. */
		if (!is_leaf && dict_index_is_spatial(index)) {
			n = DICT_INDEX_SPATIAL_NODEPTR_SIZE;
		}

		log_start = log_ptr = mtr->get_log()->open(alloc);
		log_end = log_ptr + alloc;

		log_ptr = mlog_write_initial_log_record_fast(
			rec, type, log_ptr, mtr);

		if (is_instant) {
			// marked as instant index
			mach_write_to_2(log_ptr, n | 0x8000);

			log_ptr += 2;

			// record the n_core_fields
			mach_write_to_2(log_ptr, index->n_core_fields);
		} else {
			mach_write_to_2(log_ptr, n);
		}

		log_ptr += 2;

		mach_write_to_2(
			log_ptr, is_leaf
			? dict_index_get_n_unique_in_tree(index)
			: dict_index_get_n_unique_in_tree_nonleaf(index));

		log_ptr += 2;

		for (i = 0; i < n; i++) {
			dict_field_t*		field;
			const dict_col_t*	col;
			ulint			len;

			field = dict_index_get_nth_field(index, i);
			col = dict_field_get_col(field);
			len = field->fixed_len;
			ut_ad(len < 0x7fff);
			if (len == 0
			    && (DATA_BIG_COL(col))) {
				/* variable-length field
				with maximum length > 255 */
				len = 0x7fff;
			}
			if (col->prtype & DATA_NOT_NULL) {
				len |= 0x8000;
			}
			if (log_ptr + 2 > log_end) {
				/* The current piece is full: close it and
				open the next one for what remains. */
				mlog_close(mtr, log_ptr);
				ut_a(total > ulint(log_ptr - log_start));
				total -= ulint(log_ptr - log_start);
				alloc = std::min(
					total,
					ulint(mtr_buf_t::MAX_DATA_SIZE));
				log_start = log_ptr = mtr->get_log()->open(
					alloc);
				log_end = log_ptr + alloc;
			}
			mach_write_to_2(log_ptr, len);
			log_ptr += 2;
		}
	}

	if (size == 0) {
		mlog_close(mtr, log_ptr);
		log_ptr = NULL;
	} else if (log_ptr + size > log_end) {
		/* Not enough room is left in the current piece for the
		caller's size bytes: close it and open a fresh buffer. */
		mlog_close(mtr, log_ptr);
		log_ptr = mlog_open(mtr, size);
	}

	return(log_ptr);
}
/** Parse the index description written by mlog_open_and_write_index()
and build a dummy table and index ("LOG_DUMMY") from it.

For the compact format the description is: the 2-byte number of
fields (followed by the 2-byte number of core fields if the 0x8000
flag is set), the 2-byte number of unique fields, and one 2-byte
length/flag value per field. For the redundant format nothing is
read and a single-field index is created.

@param[in]	ptr	start of the index description in the log buffer
@param[in]	end_ptr	end of the log buffer
@param[in]	comp	TRUE=compact row format
@param[out]	index	own: dummy index; assigned only on success
@return pointer past the end of the index description
@retval NULL if the description is incomplete, or if the number of
core fields is invalid (in which case recv_sys.found_corrupt_log
is set as well) */
byte*
mlog_parse_index(
	byte*		ptr,
	const byte*	end_ptr,
	ibool		comp,
	dict_index_t**	index)
{
	ulint	n;
	ulint	n_uniq;
	ulint	n_core_fields = 0;

	ut_ad(comp == FALSE || comp == TRUE);

	if (!comp) {
		n = n_uniq = 1;
	} else {
		if (end_ptr < ptr + 4) {
			return(NULL);
		}

		n = mach_read_from_2(ptr);
		ptr += 2;

		if (n & 0x8000) {
			/* record after instant ADD COLUMN */
			n &= 0x7FFF;

			n_core_fields = mach_read_from_2(ptr);

			if (!n_core_fields || n_core_fields > n) {
				recv_sys.found_corrupt_log = TRUE;
				return(NULL);
			}

			ptr += 2;

			if (end_ptr < ptr + 2) {
				return(NULL);
			}
		}

		n_uniq = mach_read_from_2(ptr);
		ptr += 2;
		ut_ad(n_uniq <= n);

		/* One 2-byte value per field must follow. */
		if (end_ptr < ptr + n * 2) {
			return(NULL);
		}
	}

	dict_table_t* const	table = dict_mem_table_create(
		"LOG_DUMMY", NULL, n, 0,
		comp ? DICT_TF_COMPACT : 0, 0);
	dict_index_t* const	ind = dict_mem_index_create(
		table, "LOG_DUMMY", 0, n);

	/* When not every field is part of the unique prefix, the index
	is treated as a clustered index that carries DB_TRX_ID and
	DB_ROLL_PTR right after the unique fields. */
	const bool	has_sys_fields = n_uniq != n;

	ind->n_uniq = static_cast<unsigned int>(n_uniq);

	if (has_sys_fields) {
		ut_a(n_uniq + DATA_ROLL_PTR <= n);
		ind->type = DICT_CLUSTERED;
	}

	if (comp) {
		for (ulint i = 0; i < n; i++) {
			const ulint	len = mach_read_from_2(ptr);
			ptr += 2;

			/* The high-order bit of len is the NOT NULL flag;
			the rest is 0 or 0x7fff for variable-length fields,
			and 1..0x7ffe for fixed-length fields. */
			const ulint	fixed_len = len & 0x7fff;
			const bool	is_variable = fixed_len == 0
				|| fixed_len == 0x7fff;

			dict_mem_table_add_col(
				table, NULL, NULL,
				is_variable ? DATA_BINARY : DATA_FIXBINARY,
				len & 0x8000 ? DATA_NOT_NULL : 0,
				fixed_len);

			dict_index_add_col(ind, table,
					   dict_table_get_nth_col(table, i),
					   0);
		}

		dict_table_add_system_columns(table, table->heap);

		if (has_sys_fields) {
			/* Identify DB_TRX_ID and DB_ROLL_PTR in the index. */
			ut_a(DATA_TRX_ID_LEN
			     == dict_index_get_nth_col(ind, DATA_TRX_ID - 1
						       + n_uniq)->len);
			ut_a(DATA_ROLL_PTR_LEN
			     == dict_index_get_nth_col(ind, DATA_ROLL_PTR - 1
						       + n_uniq)->len);

			ind->fields[DATA_TRX_ID - 1 + n_uniq].col
				= &table->cols[n + DATA_TRX_ID];
			ind->fields[DATA_ROLL_PTR - 1 + n_uniq].col
				= &table->cols[n + DATA_ROLL_PTR];
		}

		ut_ad(table->n_cols == table->n_def);

		if (n_core_fields == 0) {
			ind->n_core_null_bytes = UT_BITS_IN_BYTES(
				unsigned(ind->n_nullable));
			ind->n_core_fields = ind->n_fields;
		} else {
			/* Fields beyond the core fields get a NULL
			default value. */
			for (ulint i = n_core_fields; i < n; i++) {
				ind->fields[i].col->def_val.len
					= UNIV_SQL_NULL;
			}

			ind->n_core_fields = n_core_fields;
			ind->n_core_null_bytes = UT_BITS_IN_BYTES(
				ind->get_n_nullable(n_core_fields));
		}
	}

	/* avoid ut_ad(index->cached) in dict_index_get_n_unique_in_tree */
	ind->cached = TRUE;
	ut_d(ind->is_dummy = true);

	*index = ind;

	return(ptr);
}