mariadb/storage/xtradb/page/page0cur.c
2012-06-15 14:54:23 +02:00

2004 lines
54 KiB
C

/*****************************************************************************
Copyright (c) 1994, 2012, Oracle and/or its affiliates. All Rights Reserved.
This program is free software; you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by the Free Software
Foundation; version 2 of the License.
This program is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License along with
this program; if not, write to the Free Software Foundation, Inc.,
51 Franklin Street, Suite 500, Boston, MA 02110-1335 USA
*****************************************************************************/
/********************************************************************//**
@file page/page0cur.c
The page cursor
Created 10/4/1994 Heikki Tuuri
*************************************************************************/
#include "page0cur.h"
#ifdef UNIV_NONINL
#include "page0cur.ic"
#endif
#include "page0zip.h"
#include "mtr0log.h"
#include "log0recv.h"
#include "ut0ut.h"
#ifndef UNIV_HOTBACKUP
#include "rem0cmp.h"
#ifdef PAGE_CUR_ADAPT
# ifdef UNIV_SEARCH_PERF_STAT
static ulint page_cur_short_succ = 0;
# endif /* UNIV_SEARCH_PERF_STAT */
/*******************************************************************//**
This is a linear congruential generator PRNG. Returns a pseudo random
number between 0 and 2^64-1 inclusive. The formula and the constants
being used are:
X[n+1] = (a * X[n] + c) mod m
where:
X[0] = ut_time_us(NULL)
a = 1103515245 (3^5 * 5 * 7 * 129749)
c = 12345 (3 * 5 * 823)
m = 18446744073709551616 (2^64)
@return number between 0 and 2^64-1 */
static
ib_uint64_t
page_cur_lcg_prng(void)
/*===================*/
{
#define LCG_a 1103515245
#define LCG_c 12345
static ib_uint64_t lcg_current = 0;
static ibool initialized = FALSE;
if (!initialized) {
lcg_current = (ib_uint64_t) ut_time_us(NULL);
initialized = TRUE;
}
/* no need to "% 2^64" explicitly because lcg_current is
64 bit and this will be done anyway */
lcg_current = LCG_a * lcg_current + LCG_c;
return(lcg_current);
}
/****************************************************************//**
Tries a search shortcut based on the last insert.
@return TRUE on success */
UNIV_INLINE
ibool
page_cur_try_search_shortcut(
/*=========================*/
const buf_block_t* block, /*!< in: index page */
const dict_index_t* index, /*!< in: record descriptor */
const dtuple_t* tuple, /*!< in: data tuple */
ulint* iup_matched_fields,
/*!< in/out: already matched
fields in upper limit record */
ulint* iup_matched_bytes,
/*!< in/out: already matched
bytes in a field not yet
completely matched */
ulint* ilow_matched_fields,
/*!< in/out: already matched
fields in lower limit record */
ulint* ilow_matched_bytes,
/*!< in/out: already matched
bytes in a field not yet
completely matched */
page_cur_t* cursor) /*!< out: page cursor */
{
const rec_t* rec;
const rec_t* next_rec;
ulint low_match;
ulint low_bytes;
ulint up_match;
ulint up_bytes;
#ifdef UNIV_SEARCH_DEBUG
page_cur_t cursor2;
#endif
ibool success = FALSE;
const page_t* page = buf_block_get_frame(block);
mem_heap_t* heap = NULL;
ulint offsets_[REC_OFFS_NORMAL_SIZE];
ulint* offsets = offsets_;
rec_offs_init(offsets_);
ut_ad(dtuple_check_typed(tuple));
rec = page_header_get_ptr(page, PAGE_LAST_INSERT);
offsets = rec_get_offsets(rec, index, offsets,
dtuple_get_n_fields(tuple), &heap);
ut_ad(rec);
ut_ad(page_rec_is_user_rec(rec));
ut_pair_min(&low_match, &low_bytes,
*ilow_matched_fields, *ilow_matched_bytes,
*iup_matched_fields, *iup_matched_bytes);
up_match = low_match;
up_bytes = low_bytes;
if (page_cmp_dtuple_rec_with_match(tuple, rec, offsets,
&low_match, &low_bytes) < 0) {
goto exit_func;
}
next_rec = page_rec_get_next_const(rec);
offsets = rec_get_offsets(next_rec, index, offsets,
dtuple_get_n_fields(tuple), &heap);
if (page_cmp_dtuple_rec_with_match(tuple, next_rec, offsets,
&up_match, &up_bytes) >= 0) {
goto exit_func;
}
page_cur_position(rec, block, cursor);
#ifdef UNIV_SEARCH_DEBUG
page_cur_search_with_match(block, index, tuple, PAGE_CUR_DBG,
iup_matched_fields,
iup_matched_bytes,
ilow_matched_fields,
ilow_matched_bytes,
&cursor2);
ut_a(cursor2.rec == cursor->rec);
if (!page_rec_is_supremum(next_rec)) {
ut_a(*iup_matched_fields == up_match);
ut_a(*iup_matched_bytes == up_bytes);
}
ut_a(*ilow_matched_fields == low_match);
ut_a(*ilow_matched_bytes == low_bytes);
#endif
if (!page_rec_is_supremum(next_rec)) {
*iup_matched_fields = up_match;
*iup_matched_bytes = up_bytes;
}
*ilow_matched_fields = low_match;
*ilow_matched_bytes = low_bytes;
#ifdef UNIV_SEARCH_PERF_STAT
page_cur_short_succ++;
#endif
success = TRUE;
exit_func:
if (UNIV_LIKELY_NULL(heap)) {
mem_heap_free(heap);
}
return(success);
}
#endif
#ifdef PAGE_CUR_LE_OR_EXTENDS
/****************************************************************//**
Checks if the nth field in a record is a character type field which extends
the nth field in tuple, i.e., the field is longer or equal in length and has
common first characters.
@return TRUE if rec field extends tuple field */
static
ibool
page_cur_rec_field_extends(
/*=======================*/
const dtuple_t* tuple, /*!< in: data tuple */
const rec_t* rec, /*!< in: record */
const ulint* offsets,/*!< in: array returned by rec_get_offsets() */
ulint n) /*!< in: compare nth field */
{
const dtype_t* type;
const dfield_t* dfield;
const byte* rec_f;
ulint rec_f_len;
ut_ad(rec_offs_validate(rec, NULL, offsets));
dfield = dtuple_get_nth_field(tuple, n);
type = dfield_get_type(dfield);
rec_f = rec_get_nth_field(rec, offsets, n, &rec_f_len);
if (type->mtype == DATA_VARCHAR
|| type->mtype == DATA_CHAR
|| type->mtype == DATA_FIXBINARY
|| type->mtype == DATA_BINARY
|| type->mtype == DATA_BLOB
|| type->mtype == DATA_VARMYSQL
|| type->mtype == DATA_MYSQL) {
if (dfield_get_len(dfield) != UNIV_SQL_NULL
&& rec_f_len != UNIV_SQL_NULL
&& rec_f_len >= dfield_get_len(dfield)
&& !cmp_data_data_slow(type->mtype, type->prtype,
dfield_get_data(dfield),
dfield_get_len(dfield),
rec_f, dfield_get_len(dfield))) {
return(TRUE);
}
}
return(FALSE);
}
#endif /* PAGE_CUR_LE_OR_EXTENDS */
/****************************************************************//**
Searches the right position for a page cursor. */
UNIV_INTERN
void
page_cur_search_with_match(
/*=======================*/
const buf_block_t* block, /*!< in: buffer block */
const dict_index_t* index, /*!< in: record descriptor */
const dtuple_t* tuple, /*!< in: data tuple */
ulint mode, /*!< in: PAGE_CUR_L,
PAGE_CUR_LE, PAGE_CUR_G, or
PAGE_CUR_GE */
ulint* iup_matched_fields,
/*!< in/out: already matched
fields in upper limit record */
ulint* iup_matched_bytes,
/*!< in/out: already matched
bytes in a field not yet
completely matched */
ulint* ilow_matched_fields,
/*!< in/out: already matched
fields in lower limit record */
ulint* ilow_matched_bytes,
/*!< in/out: already matched
bytes in a field not yet
completely matched */
page_cur_t* cursor) /*!< out: page cursor */
{
ulint up;
ulint low;
ulint mid;
const page_t* page;
const page_dir_slot_t* slot;
const rec_t* up_rec;
const rec_t* low_rec;
const rec_t* mid_rec;
ulint up_matched_fields;
ulint up_matched_bytes;
ulint low_matched_fields;
ulint low_matched_bytes;
ulint cur_matched_fields;
ulint cur_matched_bytes;
int cmp;
#ifdef UNIV_SEARCH_DEBUG
int dbg_cmp;
ulint dbg_matched_fields;
ulint dbg_matched_bytes;
#endif
#ifdef UNIV_ZIP_DEBUG
const page_zip_des_t* page_zip = buf_block_get_page_zip(block);
#endif /* UNIV_ZIP_DEBUG */
mem_heap_t* heap = NULL;
ulint offsets_[REC_OFFS_NORMAL_SIZE];
ulint* offsets = offsets_;
rec_offs_init(offsets_);
ut_ad(block && tuple && iup_matched_fields && iup_matched_bytes
&& ilow_matched_fields && ilow_matched_bytes && cursor);
ut_ad(dtuple_validate(tuple));
#ifdef UNIV_DEBUG
# ifdef PAGE_CUR_DBG
if (mode != PAGE_CUR_DBG)
# endif /* PAGE_CUR_DBG */
# ifdef PAGE_CUR_LE_OR_EXTENDS
if (mode != PAGE_CUR_LE_OR_EXTENDS)
# endif /* PAGE_CUR_LE_OR_EXTENDS */
ut_ad(mode == PAGE_CUR_L || mode == PAGE_CUR_LE
|| mode == PAGE_CUR_G || mode == PAGE_CUR_GE);
#endif /* UNIV_DEBUG */
page = buf_block_get_frame(block);
#ifdef UNIV_ZIP_DEBUG
ut_a(!page_zip || page_zip_validate(page_zip, page));
#endif /* UNIV_ZIP_DEBUG */
page_check_dir(page);
#ifdef PAGE_CUR_ADAPT
if (page_is_leaf(page)
&& (mode == PAGE_CUR_LE)
&& (page_header_get_field(page, PAGE_N_DIRECTION) > 3)
&& (page_header_get_ptr(page, PAGE_LAST_INSERT))
&& (page_header_get_field(page, PAGE_DIRECTION) == PAGE_RIGHT)) {
if (page_cur_try_search_shortcut(
block, index, tuple,
iup_matched_fields, iup_matched_bytes,
ilow_matched_fields, ilow_matched_bytes,
cursor)) {
return;
}
}
# ifdef PAGE_CUR_DBG
if (mode == PAGE_CUR_DBG) {
mode = PAGE_CUR_LE;
}
# endif
#endif
/* The following flag does not work for non-latin1 char sets because
cmp_full_field does not tell how many bytes matched */
#ifdef PAGE_CUR_LE_OR_EXTENDS
ut_a(mode != PAGE_CUR_LE_OR_EXTENDS);
#endif /* PAGE_CUR_LE_OR_EXTENDS */
/* If mode PAGE_CUR_G is specified, we are trying to position the
cursor to answer a query of the form "tuple < X", where tuple is
the input parameter, and X denotes an arbitrary physical record on
the page. We want to position the cursor on the first X which
satisfies the condition. */
up_matched_fields = *iup_matched_fields;
up_matched_bytes = *iup_matched_bytes;
low_matched_fields = *ilow_matched_fields;
low_matched_bytes = *ilow_matched_bytes;
/* Perform binary search. First the search is done through the page
directory, after that as a linear search in the list of records
owned by the upper limit directory slot. */
low = 0;
up = page_dir_get_n_slots(page) - 1;
/* Perform binary search until the lower and upper limit directory
slots come to the distance 1 of each other */
while (up - low > 1) {
mid = (low + up) / 2;
slot = page_dir_get_nth_slot(page, mid);
mid_rec = page_dir_slot_get_rec(slot);
ut_pair_min(&cur_matched_fields, &cur_matched_bytes,
low_matched_fields, low_matched_bytes,
up_matched_fields, up_matched_bytes);
offsets = rec_get_offsets(mid_rec, index, offsets,
dtuple_get_n_fields_cmp(tuple),
&heap);
cmp = cmp_dtuple_rec_with_match(tuple, mid_rec, offsets,
&cur_matched_fields,
&cur_matched_bytes);
if (UNIV_LIKELY(cmp > 0)) {
low_slot_match:
low = mid;
low_matched_fields = cur_matched_fields;
low_matched_bytes = cur_matched_bytes;
} else if (UNIV_EXPECT(cmp, -1)) {
#ifdef PAGE_CUR_LE_OR_EXTENDS
if (mode == PAGE_CUR_LE_OR_EXTENDS
&& page_cur_rec_field_extends(
tuple, mid_rec, offsets,
cur_matched_fields)) {
goto low_slot_match;
}
#endif /* PAGE_CUR_LE_OR_EXTENDS */
up_slot_match:
up = mid;
up_matched_fields = cur_matched_fields;
up_matched_bytes = cur_matched_bytes;
} else if (mode == PAGE_CUR_G || mode == PAGE_CUR_LE
#ifdef PAGE_CUR_LE_OR_EXTENDS
|| mode == PAGE_CUR_LE_OR_EXTENDS
#endif /* PAGE_CUR_LE_OR_EXTENDS */
) {
goto low_slot_match;
} else {
goto up_slot_match;
}
}
slot = page_dir_get_nth_slot(page, low);
low_rec = page_dir_slot_get_rec(slot);
slot = page_dir_get_nth_slot(page, up);
up_rec = page_dir_slot_get_rec(slot);
/* Perform linear search until the upper and lower records come to
distance 1 of each other. */
while (page_rec_get_next_const(low_rec) != up_rec) {
mid_rec = page_rec_get_next_const(low_rec);
ut_pair_min(&cur_matched_fields, &cur_matched_bytes,
low_matched_fields, low_matched_bytes,
up_matched_fields, up_matched_bytes);
offsets = rec_get_offsets(mid_rec, index, offsets,
dtuple_get_n_fields_cmp(tuple),
&heap);
cmp = cmp_dtuple_rec_with_match(tuple, mid_rec, offsets,
&cur_matched_fields,
&cur_matched_bytes);
if (UNIV_LIKELY(cmp > 0)) {
low_rec_match:
low_rec = mid_rec;
low_matched_fields = cur_matched_fields;
low_matched_bytes = cur_matched_bytes;
} else if (UNIV_EXPECT(cmp, -1)) {
#ifdef PAGE_CUR_LE_OR_EXTENDS
if (mode == PAGE_CUR_LE_OR_EXTENDS
&& page_cur_rec_field_extends(
tuple, mid_rec, offsets,
cur_matched_fields)) {
goto low_rec_match;
}
#endif /* PAGE_CUR_LE_OR_EXTENDS */
up_rec_match:
up_rec = mid_rec;
up_matched_fields = cur_matched_fields;
up_matched_bytes = cur_matched_bytes;
} else if (mode == PAGE_CUR_G || mode == PAGE_CUR_LE
#ifdef PAGE_CUR_LE_OR_EXTENDS
|| mode == PAGE_CUR_LE_OR_EXTENDS
#endif /* PAGE_CUR_LE_OR_EXTENDS */
) {
goto low_rec_match;
} else {
goto up_rec_match;
}
}
#ifdef UNIV_SEARCH_DEBUG
/* Check that the lower and upper limit records have the
right alphabetical order compared to tuple. */
dbg_matched_fields = 0;
dbg_matched_bytes = 0;
offsets = rec_get_offsets(low_rec, index, offsets,
ULINT_UNDEFINED, &heap);
dbg_cmp = page_cmp_dtuple_rec_with_match(tuple, low_rec, offsets,
&dbg_matched_fields,
&dbg_matched_bytes);
if (mode == PAGE_CUR_G) {
ut_a(dbg_cmp >= 0);
} else if (mode == PAGE_CUR_GE) {
ut_a(dbg_cmp == 1);
} else if (mode == PAGE_CUR_L) {
ut_a(dbg_cmp == 1);
} else if (mode == PAGE_CUR_LE) {
ut_a(dbg_cmp >= 0);
}
if (!page_rec_is_infimum(low_rec)) {
ut_a(low_matched_fields == dbg_matched_fields);
ut_a(low_matched_bytes == dbg_matched_bytes);
}
dbg_matched_fields = 0;
dbg_matched_bytes = 0;
offsets = rec_get_offsets(up_rec, index, offsets,
ULINT_UNDEFINED, &heap);
dbg_cmp = page_cmp_dtuple_rec_with_match(tuple, up_rec, offsets,
&dbg_matched_fields,
&dbg_matched_bytes);
if (mode == PAGE_CUR_G) {
ut_a(dbg_cmp == -1);
} else if (mode == PAGE_CUR_GE) {
ut_a(dbg_cmp <= 0);
} else if (mode == PAGE_CUR_L) {
ut_a(dbg_cmp <= 0);
} else if (mode == PAGE_CUR_LE) {
ut_a(dbg_cmp == -1);
}
if (!page_rec_is_supremum(up_rec)) {
ut_a(up_matched_fields == dbg_matched_fields);
ut_a(up_matched_bytes == dbg_matched_bytes);
}
#endif
if (mode <= PAGE_CUR_GE) {
page_cur_position(up_rec, block, cursor);
} else {
page_cur_position(low_rec, block, cursor);
}
*iup_matched_fields = up_matched_fields;
*iup_matched_bytes = up_matched_bytes;
*ilow_matched_fields = low_matched_fields;
*ilow_matched_bytes = low_matched_bytes;
if (UNIV_LIKELY_NULL(heap)) {
mem_heap_free(heap);
}
}
/***********************************************************//**
Positions a page cursor on a randomly chosen user record on a page. If there
are no user records, sets the cursor on the infimum record. */
UNIV_INTERN
void
page_cur_open_on_rnd_user_rec(
/*==========================*/
buf_block_t* block, /*!< in: page */
page_cur_t* cursor) /*!< out: page cursor */
{
ulint rnd;
ulint n_recs = page_get_n_recs(buf_block_get_frame(block));
page_cur_set_before_first(block, cursor);
if (UNIV_UNLIKELY(n_recs == 0)) {
return;
}
rnd = (ulint) (page_cur_lcg_prng() % n_recs);
do {
page_cur_move_to_next(cursor);
} while (rnd--);
}
/***********************************************************//**
Writes the log record of a record insert on a page. */
static
void
page_cur_insert_rec_write_log(
/*==========================*/
rec_t* insert_rec, /*!< in: inserted physical record */
ulint rec_size, /*!< in: insert_rec size */
rec_t* cursor_rec, /*!< in: record the
cursor is pointing to */
dict_index_t* index, /*!< in: record descriptor */
mtr_t* mtr) /*!< in: mini-transaction handle */
{
ulint cur_rec_size;
ulint extra_size;
ulint cur_extra_size;
const byte* ins_ptr;
byte* log_ptr;
const byte* log_end;
ulint i;
ut_a(rec_size < UNIV_PAGE_SIZE);
ut_ad(page_align(insert_rec) == page_align(cursor_rec));
ut_ad(!page_rec_is_comp(insert_rec)
== !dict_table_is_comp(index->table));
{
mem_heap_t* heap = NULL;
ulint cur_offs_[REC_OFFS_NORMAL_SIZE];
ulint ins_offs_[REC_OFFS_NORMAL_SIZE];
ulint* cur_offs;
ulint* ins_offs;
rec_offs_init(cur_offs_);
rec_offs_init(ins_offs_);
cur_offs = rec_get_offsets(cursor_rec, index, cur_offs_,
ULINT_UNDEFINED, &heap);
ins_offs = rec_get_offsets(insert_rec, index, ins_offs_,
ULINT_UNDEFINED, &heap);
extra_size = rec_offs_extra_size(ins_offs);
cur_extra_size = rec_offs_extra_size(cur_offs);
ut_ad(rec_size == rec_offs_size(ins_offs));
cur_rec_size = rec_offs_size(cur_offs);
if (UNIV_LIKELY_NULL(heap)) {
mem_heap_free(heap);
}
}
ins_ptr = insert_rec - extra_size;
i = 0;
if (cur_extra_size == extra_size) {
ulint min_rec_size = ut_min(cur_rec_size, rec_size);
const byte* cur_ptr = cursor_rec - cur_extra_size;
/* Find out the first byte in insert_rec which differs from
cursor_rec; skip the bytes in the record info */
do {
if (*ins_ptr == *cur_ptr) {
i++;
ins_ptr++;
cur_ptr++;
} else if ((i < extra_size)
&& (i >= extra_size
- page_rec_get_base_extra_size
(insert_rec))) {
i = extra_size;
ins_ptr = insert_rec;
cur_ptr = cursor_rec;
} else {
break;
}
} while (i < min_rec_size);
}
if (mtr_get_log_mode(mtr) != MTR_LOG_SHORT_INSERTS) {
if (page_rec_is_comp(insert_rec)) {
log_ptr = mlog_open_and_write_index(
mtr, insert_rec, index, MLOG_COMP_REC_INSERT,
2 + 5 + 1 + 5 + 5 + MLOG_BUF_MARGIN);
if (UNIV_UNLIKELY(!log_ptr)) {
/* Logging in mtr is switched off
during crash recovery: in that case
mlog_open returns NULL */
return;
}
} else {
log_ptr = mlog_open(mtr, 11
+ 2 + 5 + 1 + 5 + 5
+ MLOG_BUF_MARGIN);
if (UNIV_UNLIKELY(!log_ptr)) {
/* Logging in mtr is switched off
during crash recovery: in that case
mlog_open returns NULL */
return;
}
log_ptr = mlog_write_initial_log_record_fast(
insert_rec, MLOG_REC_INSERT, log_ptr, mtr);
}
log_end = &log_ptr[2 + 5 + 1 + 5 + 5 + MLOG_BUF_MARGIN];
/* Write the cursor rec offset as a 2-byte ulint */
mach_write_to_2(log_ptr, page_offset(cursor_rec));
log_ptr += 2;
} else {
log_ptr = mlog_open(mtr, 5 + 1 + 5 + 5 + MLOG_BUF_MARGIN);
if (!log_ptr) {
/* Logging in mtr is switched off during crash
recovery: in that case mlog_open returns NULL */
return;
}
log_end = &log_ptr[5 + 1 + 5 + 5 + MLOG_BUF_MARGIN];
}
if (page_rec_is_comp(insert_rec)) {
if (UNIV_UNLIKELY
(rec_get_info_and_status_bits(insert_rec, TRUE)
!= rec_get_info_and_status_bits(cursor_rec, TRUE))) {
goto need_extra_info;
}
} else {
if (UNIV_UNLIKELY
(rec_get_info_and_status_bits(insert_rec, FALSE)
!= rec_get_info_and_status_bits(cursor_rec, FALSE))) {
goto need_extra_info;
}
}
if (extra_size != cur_extra_size || rec_size != cur_rec_size) {
need_extra_info:
/* Write the record end segment length
and the extra info storage flag */
log_ptr += mach_write_compressed(log_ptr,
2 * (rec_size - i) + 1);
/* Write the info bits */
mach_write_to_1(log_ptr,
rec_get_info_and_status_bits(
insert_rec,
page_rec_is_comp(insert_rec)));
log_ptr++;
/* Write the record origin offset */
log_ptr += mach_write_compressed(log_ptr, extra_size);
/* Write the mismatch index */
log_ptr += mach_write_compressed(log_ptr, i);
ut_a(i < UNIV_PAGE_SIZE);
ut_a(extra_size < UNIV_PAGE_SIZE);
} else {
/* Write the record end segment length
and the extra info storage flag */
log_ptr += mach_write_compressed(log_ptr, 2 * (rec_size - i));
}
/* Write to the log the inserted index record end segment which
differs from the cursor record */
rec_size -= i;
if (log_ptr + rec_size <= log_end) {
memcpy(log_ptr, ins_ptr, rec_size);
mlog_close(mtr, log_ptr + rec_size);
} else {
mlog_close(mtr, log_ptr);
ut_a(rec_size < UNIV_PAGE_SIZE);
mlog_catenate_string(mtr, ins_ptr, rec_size);
}
}
#else /* !UNIV_HOTBACKUP */
# define page_cur_insert_rec_write_log(ins_rec,size,cur,index,mtr) ((void) 0)
#endif /* !UNIV_HOTBACKUP */
/***********************************************************//**
Parses a log record of a record insert on a page.
@return end of log record or NULL */
UNIV_INTERN
byte*
page_cur_parse_insert_rec(
/*======================*/
ibool is_short,/*!< in: TRUE if short inserts */
byte* ptr, /*!< in: buffer */
byte* end_ptr,/*!< in: buffer end */
buf_block_t* block, /*!< in: page or NULL */
dict_index_t* index, /*!< in: record descriptor */
mtr_t* mtr) /*!< in: mtr or NULL */
{
ulint origin_offset;
ulint end_seg_len;
ulint mismatch_index;
page_t* page;
rec_t* cursor_rec;
byte buf1[1024];
byte* buf;
byte* ptr2 = ptr;
ulint info_and_status_bits = 0; /* remove warning */
page_cur_t cursor;
mem_heap_t* heap = NULL;
ulint offsets_[REC_OFFS_NORMAL_SIZE];
ulint* offsets = offsets_;
rec_offs_init(offsets_);
page = block ? buf_block_get_frame(block) : NULL;
if (is_short) {
cursor_rec = page_rec_get_prev(page_get_supremum_rec(page));
} else {
ulint offset;
/* Read the cursor rec offset as a 2-byte ulint */
if (UNIV_UNLIKELY(end_ptr < ptr + 2)) {
return(NULL);
}
offset = mach_read_from_2(ptr);
ptr += 2;
cursor_rec = page + offset;
if (UNIV_UNLIKELY(offset >= UNIV_PAGE_SIZE)) {
recv_sys->found_corrupt_log = TRUE;
return(NULL);
}
}
ptr = mach_parse_compressed(ptr, end_ptr, &end_seg_len);
if (ptr == NULL) {
return(NULL);
}
if (UNIV_UNLIKELY(end_seg_len >= UNIV_PAGE_SIZE << 1)) {
recv_sys->found_corrupt_log = TRUE;
return(NULL);
}
if (end_seg_len & 0x1UL) {
/* Read the info bits */
if (end_ptr < ptr + 1) {
return(NULL);
}
info_and_status_bits = mach_read_from_1(ptr);
ptr++;
ptr = mach_parse_compressed(ptr, end_ptr, &origin_offset);
if (ptr == NULL) {
return(NULL);
}
ut_a(origin_offset < UNIV_PAGE_SIZE);
ptr = mach_parse_compressed(ptr, end_ptr, &mismatch_index);
if (ptr == NULL) {
return(NULL);
}
ut_a(mismatch_index < UNIV_PAGE_SIZE);
}
if (UNIV_UNLIKELY(end_ptr < ptr + (end_seg_len >> 1))) {
return(NULL);
}
if (!block) {
return(ptr + (end_seg_len >> 1));
}
ut_ad(!!page_is_comp(page) == dict_table_is_comp(index->table));
ut_ad(!buf_block_get_page_zip(block) || page_is_comp(page));
/* Read from the log the inserted index record end segment which
differs from the cursor record */
offsets = rec_get_offsets(cursor_rec, index, offsets,
ULINT_UNDEFINED, &heap);
if (!(end_seg_len & 0x1UL)) {
info_and_status_bits = rec_get_info_and_status_bits(
cursor_rec, page_is_comp(page));
origin_offset = rec_offs_extra_size(offsets);
mismatch_index = rec_offs_size(offsets) - (end_seg_len >> 1);
}
end_seg_len >>= 1;
if (mismatch_index + end_seg_len < sizeof buf1) {
buf = buf1;
} else {
buf = mem_alloc(mismatch_index + end_seg_len);
}
/* Build the inserted record to buf */
if (UNIV_UNLIKELY(mismatch_index >= UNIV_PAGE_SIZE)) {
fprintf(stderr,
"Is short %lu, info_and_status_bits %lu, offset %lu, "
"o_offset %lu\n"
"mismatch index %lu, end_seg_len %lu\n"
"parsed len %lu\n",
(ulong) is_short, (ulong) info_and_status_bits,
(ulong) page_offset(cursor_rec),
(ulong) origin_offset,
(ulong) mismatch_index, (ulong) end_seg_len,
(ulong) (ptr - ptr2));
fputs("Dump of 300 bytes of log:\n", stderr);
ut_print_buf(stderr, ptr2, 300);
putc('\n', stderr);
buf_page_print(page, 0, 0);
ut_error;
}
ut_memcpy(buf, rec_get_start(cursor_rec, offsets), mismatch_index);
ut_memcpy(buf + mismatch_index, ptr, end_seg_len);
if (page_is_comp(page)) {
rec_set_info_and_status_bits(buf + origin_offset,
info_and_status_bits);
} else {
rec_set_info_bits_old(buf + origin_offset,
info_and_status_bits);
}
page_cur_position(cursor_rec, block, &cursor);
offsets = rec_get_offsets(buf + origin_offset, index, offsets,
ULINT_UNDEFINED, &heap);
if (UNIV_UNLIKELY(!page_cur_rec_insert(&cursor,
buf + origin_offset,
index, offsets, mtr))) {
/* The redo log record should only have been written
after the write was successful. */
ut_error;
}
if (buf != buf1) {
mem_free(buf);
}
if (UNIV_LIKELY_NULL(heap)) {
mem_heap_free(heap);
}
return(ptr + end_seg_len);
}
/***********************************************************//**
Inserts a record next to page cursor on an uncompressed page.
Returns pointer to inserted record if succeed, i.e., enough
space available, NULL otherwise. The cursor stays at the same position.
@return pointer to record if succeed, NULL otherwise */
UNIV_INTERN
rec_t*
page_cur_insert_rec_low(
/*====================*/
rec_t* current_rec,/*!< in: pointer to current record after
which the new record is inserted */
dict_index_t* index, /*!< in: record descriptor */
const rec_t* rec, /*!< in: pointer to a physical record */
ulint* offsets,/*!< in/out: rec_get_offsets(rec, index) */
mtr_t* mtr) /*!< in: mini-transaction handle, or NULL */
{
byte* insert_buf;
ulint rec_size;
page_t* page; /*!< the relevant page */
rec_t* last_insert; /*!< cursor position at previous
insert */
rec_t* free_rec; /*!< a free record that was reused,
or NULL */
rec_t* insert_rec; /*!< inserted record */
ulint heap_no; /*!< heap number of the inserted
record */
ut_ad(rec_offs_validate(rec, index, offsets));
page = page_align(current_rec);
ut_ad(dict_table_is_comp(index->table)
== (ibool) !!page_is_comp(page));
ut_ad(!page_rec_is_supremum(current_rec));
/* 1. Get the size of the physical record in the page */
rec_size = rec_offs_size(offsets);
#ifdef UNIV_DEBUG_VALGRIND
{
const void* rec_start
= rec - rec_offs_extra_size(offsets);
ulint extra_size
= rec_offs_extra_size(offsets)
- (rec_offs_comp(offsets)
? REC_N_NEW_EXTRA_BYTES
: REC_N_OLD_EXTRA_BYTES);
/* All data bytes of the record must be valid. */
UNIV_MEM_ASSERT_RW(rec, rec_offs_data_size(offsets));
/* The variable-length header must be valid. */
UNIV_MEM_ASSERT_RW(rec_start, extra_size);
}
#endif /* UNIV_DEBUG_VALGRIND */
/* 2. Try to find suitable space from page memory management */
free_rec = page_header_get_ptr(page, PAGE_FREE);
if (UNIV_LIKELY_NULL(free_rec)) {
/* Try to allocate from the head of the free list. */
ulint foffsets_[REC_OFFS_NORMAL_SIZE];
ulint* foffsets = foffsets_;
mem_heap_t* heap = NULL;
rec_offs_init(foffsets_);
foffsets = rec_get_offsets(free_rec, index, foffsets,
ULINT_UNDEFINED, &heap);
if (rec_offs_size(foffsets) < rec_size) {
if (UNIV_LIKELY_NULL(heap)) {
mem_heap_free(heap);
}
goto use_heap;
}
insert_buf = free_rec - rec_offs_extra_size(foffsets);
if (page_is_comp(page)) {
heap_no = rec_get_heap_no_new(free_rec);
page_mem_alloc_free(page, NULL,
rec_get_next_ptr(free_rec, TRUE),
rec_size);
} else {
heap_no = rec_get_heap_no_old(free_rec);
page_mem_alloc_free(page, NULL,
rec_get_next_ptr(free_rec, FALSE),
rec_size);
}
if (UNIV_LIKELY_NULL(heap)) {
mem_heap_free(heap);
}
} else {
use_heap:
free_rec = NULL;
insert_buf = page_mem_alloc_heap(page, NULL,
rec_size, &heap_no);
if (UNIV_UNLIKELY(insert_buf == NULL)) {
return(NULL);
}
}
/* 3. Create the record */
insert_rec = rec_copy(insert_buf, rec, offsets);
rec_offs_make_valid(insert_rec, index, offsets);
/* 4. Insert the record in the linked list of records */
ut_ad(current_rec != insert_rec);
{
/* next record after current before the insertion */
rec_t* next_rec = page_rec_get_next(current_rec);
#ifdef UNIV_DEBUG
if (page_is_comp(page)) {
ut_ad(rec_get_status(current_rec)
<= REC_STATUS_INFIMUM);
ut_ad(rec_get_status(insert_rec) < REC_STATUS_INFIMUM);
ut_ad(rec_get_status(next_rec) != REC_STATUS_INFIMUM);
}
#endif
page_rec_set_next(insert_rec, next_rec);
page_rec_set_next(current_rec, insert_rec);
}
page_header_set_field(page, NULL, PAGE_N_RECS,
1 + page_get_n_recs(page));
/* 5. Set the n_owned field in the inserted record to zero,
and set the heap_no field */
if (page_is_comp(page)) {
rec_set_n_owned_new(insert_rec, NULL, 0);
rec_set_heap_no_new(insert_rec, heap_no);
} else {
rec_set_n_owned_old(insert_rec, 0);
rec_set_heap_no_old(insert_rec, heap_no);
}
UNIV_MEM_ASSERT_RW(rec_get_start(insert_rec, offsets),
rec_offs_size(offsets));
/* 6. Update the last insertion info in page header */
last_insert = page_header_get_ptr(page, PAGE_LAST_INSERT);
ut_ad(!last_insert || !page_is_comp(page)
|| rec_get_node_ptr_flag(last_insert)
== rec_get_node_ptr_flag(insert_rec));
if (UNIV_UNLIKELY(last_insert == NULL)) {
page_header_set_field(page, NULL, PAGE_DIRECTION,
PAGE_NO_DIRECTION);
page_header_set_field(page, NULL, PAGE_N_DIRECTION, 0);
} else if ((last_insert == current_rec)
&& (page_header_get_field(page, PAGE_DIRECTION)
!= PAGE_LEFT)) {
page_header_set_field(page, NULL, PAGE_DIRECTION,
PAGE_RIGHT);
page_header_set_field(page, NULL, PAGE_N_DIRECTION,
page_header_get_field(
page, PAGE_N_DIRECTION) + 1);
} else if ((page_rec_get_next(insert_rec) == last_insert)
&& (page_header_get_field(page, PAGE_DIRECTION)
!= PAGE_RIGHT)) {
page_header_set_field(page, NULL, PAGE_DIRECTION,
PAGE_LEFT);
page_header_set_field(page, NULL, PAGE_N_DIRECTION,
page_header_get_field(
page, PAGE_N_DIRECTION) + 1);
} else {
page_header_set_field(page, NULL, PAGE_DIRECTION,
PAGE_NO_DIRECTION);
page_header_set_field(page, NULL, PAGE_N_DIRECTION, 0);
}
page_header_set_ptr(page, NULL, PAGE_LAST_INSERT, insert_rec);
/* 7. It remains to update the owner record. */
{
rec_t* owner_rec = page_rec_find_owner_rec(insert_rec);
ulint n_owned;
if (page_is_comp(page)) {
n_owned = rec_get_n_owned_new(owner_rec);
rec_set_n_owned_new(owner_rec, NULL, n_owned + 1);
} else {
n_owned = rec_get_n_owned_old(owner_rec);
rec_set_n_owned_old(owner_rec, n_owned + 1);
}
/* 8. Now we have incremented the n_owned field of the owner
record. If the number exceeds PAGE_DIR_SLOT_MAX_N_OWNED,
we have to split the corresponding directory slot in two. */
if (UNIV_UNLIKELY(n_owned == PAGE_DIR_SLOT_MAX_N_OWNED)) {
page_dir_split_slot(
page, NULL,
page_dir_find_owner_slot(owner_rec));
}
}
/* 9. Write log record of the insert */
if (UNIV_LIKELY(mtr != NULL)) {
page_cur_insert_rec_write_log(insert_rec, rec_size,
current_rec, index, mtr);
}
btr_blob_dbg_add_rec(insert_rec, index, offsets, "insert");
return(insert_rec);
}
/***********************************************************//**
Compresses or reorganizes a page after an optimistic insert.
@return rec if succeed, NULL otherwise */
static
rec_t*
page_cur_insert_rec_zip_reorg(
/*==========================*/
rec_t** current_rec,/*!< in/out: pointer to current record after
which the new record is inserted */
buf_block_t* block, /*!< in: buffer block */
dict_index_t* index, /*!< in: record descriptor */
rec_t* rec, /*!< in: inserted record */
page_t* page, /*!< in: uncompressed page */
page_zip_des_t* page_zip,/*!< in: compressed page */
mtr_t* mtr) /*!< in: mini-transaction, or NULL */
{
ulint pos;
/* Recompress or reorganize and recompress the page. */
if (UNIV_LIKELY(page_zip_compress(page_zip, page, index, mtr))) {
return(rec);
}
/* Before trying to reorganize the page,
store the number of preceding records on the page. */
pos = page_rec_get_n_recs_before(rec);
ut_ad(pos > 0);
if (page_zip_reorganize(block, index, mtr)) {
/* The page was reorganized: Find rec by seeking to pos,
and update *current_rec. */
if (pos > 1) {
rec = page_rec_get_nth(page, pos - 1);
} else {
rec = page + PAGE_NEW_INFIMUM;
}
*current_rec = rec;
rec = page + rec_get_next_offs(rec, TRUE);
return(rec);
}
/* Out of space: restore the page */
btr_blob_dbg_remove(page, index, "insert_zip_fail");
if (!page_zip_decompress(page_zip, page, FALSE)) {
ut_error; /* Memory corrupted? */
}
ut_ad(page_validate(page, index));
btr_blob_dbg_add(page, index, "insert_zip_fail");
return(NULL);
}
/***********************************************************//**
Inserts a record next to page cursor on a compressed and uncompressed
page. Returns pointer to inserted record if succeed, i.e.,
enough space available, NULL otherwise.
The cursor stays at the same position.
@return pointer to record if succeed, NULL otherwise */
UNIV_INTERN
rec_t*
page_cur_insert_rec_zip(
/*====================*/
rec_t** current_rec,/*!< in/out: pointer to current record after
which the new record is inserted */
buf_block_t* block, /*!< in: buffer block of *current_rec */
dict_index_t* index, /*!< in: record descriptor */
const rec_t* rec, /*!< in: pointer to a physical record */
ulint* offsets,/*!< in/out: rec_get_offsets(rec, index) */
mtr_t* mtr) /*!< in: mini-transaction handle, or NULL */
{
byte* insert_buf;
ulint rec_size;
page_t* page; /*!< the relevant page */
rec_t* last_insert; /*!< cursor position at previous
insert */
rec_t* free_rec; /*!< a free record that was reused,
or NULL */
rec_t* insert_rec; /*!< inserted record */
ulint heap_no; /*!< heap number of the inserted
record */
page_zip_des_t* page_zip;
page_zip = buf_block_get_page_zip(block);
ut_ad(page_zip);
ut_ad(rec_offs_validate(rec, index, offsets));
page = page_align(*current_rec);
ut_ad(dict_table_is_comp(index->table));
ut_ad(page_is_comp(page));
ut_ad(!page_rec_is_supremum(*current_rec));
#ifdef UNIV_ZIP_DEBUG
ut_a(page_zip_validate(page_zip, page));
#endif /* UNIV_ZIP_DEBUG */
/* 1. Get the size of the physical record in the page */
rec_size = rec_offs_size(offsets);
#ifdef UNIV_DEBUG_VALGRIND
{
const void* rec_start
= rec - rec_offs_extra_size(offsets);
ulint extra_size
= rec_offs_extra_size(offsets)
- (rec_offs_comp(offsets)
? REC_N_NEW_EXTRA_BYTES
: REC_N_OLD_EXTRA_BYTES);
/* All data bytes of the record must be valid. */
UNIV_MEM_ASSERT_RW(rec, rec_offs_data_size(offsets));
/* The variable-length header must be valid. */
UNIV_MEM_ASSERT_RW(rec_start, extra_size);
}
#endif /* UNIV_DEBUG_VALGRIND */
/* 2. Try to find suitable space from page memory management */
if (!page_zip_available(page_zip, dict_index_is_clust(index),
rec_size, 1)) {
/* Try compressing the whole page afterwards. */
insert_rec = page_cur_insert_rec_low(*current_rec,
index, rec, offsets,
NULL);
if (UNIV_LIKELY(insert_rec != NULL)) {
insert_rec = page_cur_insert_rec_zip_reorg(
current_rec, block, index, insert_rec,
page, page_zip, mtr);
#ifdef UNIV_DEBUG
if (insert_rec) {
rec_offs_make_valid(
insert_rec, index, offsets);
}
#endif /* UNIV_DEBUG */
}
return(insert_rec);
}
free_rec = page_header_get_ptr(page, PAGE_FREE);
if (UNIV_LIKELY_NULL(free_rec)) {
/* Try to allocate from the head of the free list. */
lint extra_size_diff;
ulint foffsets_[REC_OFFS_NORMAL_SIZE];
ulint* foffsets = foffsets_;
mem_heap_t* heap = NULL;
rec_offs_init(foffsets_);
foffsets = rec_get_offsets(free_rec, index, foffsets,
ULINT_UNDEFINED, &heap);
if (rec_offs_size(foffsets) < rec_size) {
too_small:
if (UNIV_LIKELY_NULL(heap)) {
mem_heap_free(heap);
}
goto use_heap;
}
insert_buf = free_rec - rec_offs_extra_size(foffsets);
/* On compressed pages, do not relocate records from
the free list. If extra_size would grow, use the heap. */
extra_size_diff
= rec_offs_extra_size(offsets)
- rec_offs_extra_size(foffsets);
if (UNIV_UNLIKELY(extra_size_diff < 0)) {
/* Add an offset to the extra_size. */
if (rec_offs_size(foffsets)
< rec_size - extra_size_diff) {
goto too_small;
}
insert_buf -= extra_size_diff;
} else if (UNIV_UNLIKELY(extra_size_diff)) {
/* Do not allow extra_size to grow */
goto too_small;
}
heap_no = rec_get_heap_no_new(free_rec);
page_mem_alloc_free(page, page_zip,
rec_get_next_ptr(free_rec, TRUE),
rec_size);
if (!page_is_leaf(page)) {
/* Zero out the node pointer of free_rec,
in case it will not be overwritten by
insert_rec. */
ut_ad(rec_size > REC_NODE_PTR_SIZE);
if (rec_offs_extra_size(foffsets)
+ rec_offs_data_size(foffsets) > rec_size) {
memset(rec_get_end(free_rec, foffsets)
- REC_NODE_PTR_SIZE, 0,
REC_NODE_PTR_SIZE);
}
} else if (dict_index_is_clust(index)) {
/* Zero out the DB_TRX_ID and DB_ROLL_PTR
columns of free_rec, in case it will not be
overwritten by insert_rec. */
ulint trx_id_col;
ulint trx_id_offs;
ulint len;
trx_id_col = dict_index_get_sys_col_pos(index,
DATA_TRX_ID);
ut_ad(trx_id_col > 0);
ut_ad(trx_id_col != ULINT_UNDEFINED);
trx_id_offs = rec_get_nth_field_offs(foffsets,
trx_id_col, &len);
ut_ad(len == DATA_TRX_ID_LEN);
if (DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN + trx_id_offs
+ rec_offs_extra_size(foffsets) > rec_size) {
/* We will have to zero out the
DB_TRX_ID and DB_ROLL_PTR, because
they will not be fully overwritten by
insert_rec. */
memset(free_rec + trx_id_offs, 0,
DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN);
}
ut_ad(free_rec + trx_id_offs + DATA_TRX_ID_LEN
== rec_get_nth_field(free_rec, foffsets,
trx_id_col + 1, &len));
ut_ad(len == DATA_ROLL_PTR_LEN);
}
if (UNIV_LIKELY_NULL(heap)) {
mem_heap_free(heap);
}
} else {
use_heap:
free_rec = NULL;
insert_buf = page_mem_alloc_heap(page, page_zip,
rec_size, &heap_no);
if (UNIV_UNLIKELY(insert_buf == NULL)) {
return(NULL);
}
page_zip_dir_add_slot(page_zip, dict_index_is_clust(index));
}
/* 3. Create the record */
insert_rec = rec_copy(insert_buf, rec, offsets);
rec_offs_make_valid(insert_rec, index, offsets);
/* 4. Insert the record in the linked list of records */
ut_ad(*current_rec != insert_rec);
{
/* next record after current before the insertion */
rec_t* next_rec = page_rec_get_next(*current_rec);
ut_ad(rec_get_status(*current_rec)
<= REC_STATUS_INFIMUM);
ut_ad(rec_get_status(insert_rec) < REC_STATUS_INFIMUM);
ut_ad(rec_get_status(next_rec) != REC_STATUS_INFIMUM);
page_rec_set_next(insert_rec, next_rec);
page_rec_set_next(*current_rec, insert_rec);
}
page_header_set_field(page, page_zip, PAGE_N_RECS,
1 + page_get_n_recs(page));
/* 5. Set the n_owned field in the inserted record to zero,
and set the heap_no field */
rec_set_n_owned_new(insert_rec, NULL, 0);
rec_set_heap_no_new(insert_rec, heap_no);
UNIV_MEM_ASSERT_RW(rec_get_start(insert_rec, offsets),
rec_offs_size(offsets));
page_zip_dir_insert(page_zip, *current_rec, free_rec, insert_rec);
/* 6. Update the last insertion info in page header */
last_insert = page_header_get_ptr(page, PAGE_LAST_INSERT);
ut_ad(!last_insert
|| rec_get_node_ptr_flag(last_insert)
== rec_get_node_ptr_flag(insert_rec));
if (UNIV_UNLIKELY(last_insert == NULL)) {
page_header_set_field(page, page_zip, PAGE_DIRECTION,
PAGE_NO_DIRECTION);
page_header_set_field(page, page_zip, PAGE_N_DIRECTION, 0);
} else if ((last_insert == *current_rec)
&& (page_header_get_field(page, PAGE_DIRECTION)
!= PAGE_LEFT)) {
page_header_set_field(page, page_zip, PAGE_DIRECTION,
PAGE_RIGHT);
page_header_set_field(page, page_zip, PAGE_N_DIRECTION,
page_header_get_field(
page, PAGE_N_DIRECTION) + 1);
} else if ((page_rec_get_next(insert_rec) == last_insert)
&& (page_header_get_field(page, PAGE_DIRECTION)
!= PAGE_RIGHT)) {
page_header_set_field(page, page_zip, PAGE_DIRECTION,
PAGE_LEFT);
page_header_set_field(page, page_zip, PAGE_N_DIRECTION,
page_header_get_field(
page, PAGE_N_DIRECTION) + 1);
} else {
page_header_set_field(page, page_zip, PAGE_DIRECTION,
PAGE_NO_DIRECTION);
page_header_set_field(page, page_zip, PAGE_N_DIRECTION, 0);
}
page_header_set_ptr(page, page_zip, PAGE_LAST_INSERT, insert_rec);
/* 7. It remains to update the owner record. */
{
rec_t* owner_rec = page_rec_find_owner_rec(insert_rec);
ulint n_owned;
n_owned = rec_get_n_owned_new(owner_rec);
rec_set_n_owned_new(owner_rec, page_zip, n_owned + 1);
/* 8. Now we have incremented the n_owned field of the owner
record. If the number exceeds PAGE_DIR_SLOT_MAX_N_OWNED,
we have to split the corresponding directory slot in two. */
if (UNIV_UNLIKELY(n_owned == PAGE_DIR_SLOT_MAX_N_OWNED)) {
page_dir_split_slot(
page, page_zip,
page_dir_find_owner_slot(owner_rec));
}
}
page_zip_write_rec(page_zip, insert_rec, index, offsets, 1);
btr_blob_dbg_add_rec(insert_rec, index, offsets, "insert_zip_ok");
/* 9. Write log record of the insert */
if (UNIV_LIKELY(mtr != NULL)) {
page_cur_insert_rec_write_log(insert_rec, rec_size,
*current_rec, index, mtr);
}
return(insert_rec);
}
#ifndef UNIV_HOTBACKUP
/**********************************************************//**
Writes a log record of copying a record list end to a new created page.
@return 4-byte field where to write the log data length, or NULL if
logging is disabled */
UNIV_INLINE
byte*
page_copy_rec_list_to_created_page_write_log(
/*=========================================*/
page_t* page, /*!< in: index page */
dict_index_t* index, /*!< in: record descriptor */
mtr_t* mtr) /*!< in: mtr */
{
byte* log_ptr;
ut_ad(!!page_is_comp(page) == dict_table_is_comp(index->table));
log_ptr = mlog_open_and_write_index(mtr, page, index,
page_is_comp(page)
? MLOG_COMP_LIST_END_COPY_CREATED
: MLOG_LIST_END_COPY_CREATED, 4);
if (UNIV_LIKELY(log_ptr != NULL)) {
mlog_close(mtr, log_ptr + 4);
}
return(log_ptr);
}
#endif /* !UNIV_HOTBACKUP */
/**********************************************************//**
Parses a log record of copying a record list end to a new created page.
@return end of log record or NULL */
UNIV_INTERN
byte*
page_parse_copy_rec_list_to_created_page(
/*=====================================*/
byte* ptr, /*!< in: buffer */
byte* end_ptr,/*!< in: buffer end */
buf_block_t* block, /*!< in: page or NULL */
dict_index_t* index, /*!< in: record descriptor */
mtr_t* mtr) /*!< in: mtr or NULL */
{
byte* rec_end;
ulint log_data_len;
page_t* page;
page_zip_des_t* page_zip;
if (ptr + 4 > end_ptr) {
return(NULL);
}
log_data_len = mach_read_from_4(ptr);
ptr += 4;
rec_end = ptr + log_data_len;
if (rec_end > end_ptr) {
return(NULL);
}
if (!block) {
return(rec_end);
}
while (ptr < rec_end) {
ptr = page_cur_parse_insert_rec(TRUE, ptr, end_ptr,
block, index, mtr);
}
ut_a(ptr == rec_end);
page = buf_block_get_frame(block);
page_zip = buf_block_get_page_zip(block);
page_header_set_ptr(page, page_zip, PAGE_LAST_INSERT, NULL);
page_header_set_field(page, page_zip, PAGE_DIRECTION,
PAGE_NO_DIRECTION);
page_header_set_field(page, page_zip, PAGE_N_DIRECTION, 0);
return(rec_end);
}
#ifndef UNIV_HOTBACKUP
/*************************************************************//**
Copies records from page to a newly created page, from a given record onward,
including that record. Infimum and supremum records are not copied. */
UNIV_INTERN
void
page_copy_rec_list_end_to_created_page(
/*===================================*/
page_t* new_page, /*!< in/out: index page to copy to */
rec_t* rec, /*!< in: first record to copy */
dict_index_t* index, /*!< in: record descriptor */
mtr_t* mtr) /*!< in: mtr */
{
page_dir_slot_t* slot = 0; /* remove warning */
byte* heap_top;
rec_t* insert_rec = 0; /* remove warning */
rec_t* prev_rec;
ulint count;
ulint n_recs;
ulint slot_index;
ulint rec_size;
ulint log_mode;
byte* log_ptr;
ulint log_data_len;
mem_heap_t* heap = NULL;
ulint offsets_[REC_OFFS_NORMAL_SIZE];
ulint* offsets = offsets_;
rec_offs_init(offsets_);
ut_ad(page_dir_get_n_heap(new_page) == PAGE_HEAP_NO_USER_LOW);
ut_ad(page_align(rec) != new_page);
ut_ad(page_rec_is_comp(rec) == page_is_comp(new_page));
if (page_rec_is_infimum(rec)) {
rec = page_rec_get_next(rec);
}
if (page_rec_is_supremum(rec)) {
return;
}
#ifdef UNIV_DEBUG
/* To pass the debug tests we have to set these dummy values
in the debug version */
page_dir_set_n_slots(new_page, NULL, UNIV_PAGE_SIZE / 2);
page_header_set_ptr(new_page, NULL, PAGE_HEAP_TOP,
new_page + UNIV_PAGE_SIZE - 1);
#endif
log_ptr = page_copy_rec_list_to_created_page_write_log(new_page,
index, mtr);
log_data_len = dyn_array_get_data_size(&(mtr->log));
/* Individual inserts are logged in a shorter form */
log_mode = mtr_set_log_mode(mtr, MTR_LOG_SHORT_INSERTS);
prev_rec = page_get_infimum_rec(new_page);
if (page_is_comp(new_page)) {
heap_top = new_page + PAGE_NEW_SUPREMUM_END;
} else {
heap_top = new_page + PAGE_OLD_SUPREMUM_END;
}
count = 0;
slot_index = 0;
n_recs = 0;
do {
offsets = rec_get_offsets(rec, index, offsets,
ULINT_UNDEFINED, &heap);
insert_rec = rec_copy(heap_top, rec, offsets);
if (page_is_comp(new_page)) {
rec_set_next_offs_new(prev_rec,
page_offset(insert_rec));
rec_set_n_owned_new(insert_rec, NULL, 0);
rec_set_heap_no_new(insert_rec,
PAGE_HEAP_NO_USER_LOW + n_recs);
} else {
rec_set_next_offs_old(prev_rec,
page_offset(insert_rec));
rec_set_n_owned_old(insert_rec, 0);
rec_set_heap_no_old(insert_rec,
PAGE_HEAP_NO_USER_LOW + n_recs);
}
count++;
n_recs++;
if (UNIV_UNLIKELY
(count == (PAGE_DIR_SLOT_MAX_N_OWNED + 1) / 2)) {
slot_index++;
slot = page_dir_get_nth_slot(new_page, slot_index);
page_dir_slot_set_rec(slot, insert_rec);
page_dir_slot_set_n_owned(slot, NULL, count);
count = 0;
}
rec_size = rec_offs_size(offsets);
ut_ad(heap_top < new_page + UNIV_PAGE_SIZE);
heap_top += rec_size;
rec_offs_make_valid(insert_rec, index, offsets);
btr_blob_dbg_add_rec(insert_rec, index, offsets, "copy_end");
page_cur_insert_rec_write_log(insert_rec, rec_size, prev_rec,
index, mtr);
prev_rec = insert_rec;
rec = page_rec_get_next(rec);
} while (!page_rec_is_supremum(rec));
if ((slot_index > 0) && (count + 1
+ (PAGE_DIR_SLOT_MAX_N_OWNED + 1) / 2
<= PAGE_DIR_SLOT_MAX_N_OWNED)) {
/* We can merge the two last dir slots. This operation is
here to make this function imitate exactly the equivalent
task made using page_cur_insert_rec, which we use in database
recovery to reproduce the task performed by this function.
To be able to check the correctness of recovery, it is good
that it imitates exactly. */
count += (PAGE_DIR_SLOT_MAX_N_OWNED + 1) / 2;
page_dir_slot_set_n_owned(slot, NULL, 0);
slot_index--;
}
if (UNIV_LIKELY_NULL(heap)) {
mem_heap_free(heap);
}
log_data_len = dyn_array_get_data_size(&(mtr->log)) - log_data_len;
ut_a(log_data_len < 100 * UNIV_PAGE_SIZE);
if (UNIV_LIKELY(log_ptr != NULL)) {
mach_write_to_4(log_ptr, log_data_len);
}
if (page_is_comp(new_page)) {
rec_set_next_offs_new(insert_rec, PAGE_NEW_SUPREMUM);
} else {
rec_set_next_offs_old(insert_rec, PAGE_OLD_SUPREMUM);
}
slot = page_dir_get_nth_slot(new_page, 1 + slot_index);
page_dir_slot_set_rec(slot, page_get_supremum_rec(new_page));
page_dir_slot_set_n_owned(slot, NULL, count + 1);
page_dir_set_n_slots(new_page, NULL, 2 + slot_index);
page_header_set_ptr(new_page, NULL, PAGE_HEAP_TOP, heap_top);
page_dir_set_n_heap(new_page, NULL, PAGE_HEAP_NO_USER_LOW + n_recs);
page_header_set_field(new_page, NULL, PAGE_N_RECS, n_recs);
page_header_set_ptr(new_page, NULL, PAGE_LAST_INSERT, NULL);
page_header_set_field(new_page, NULL, PAGE_DIRECTION,
PAGE_NO_DIRECTION);
page_header_set_field(new_page, NULL, PAGE_N_DIRECTION, 0);
/* Restore the log mode */
mtr_set_log_mode(mtr, log_mode);
}
/***********************************************************//**
Writes log record of a record delete on a page. */
UNIV_INLINE
void
page_cur_delete_rec_write_log(
/*==========================*/
rec_t* rec, /*!< in: record to be deleted */
dict_index_t* index, /*!< in: record descriptor */
mtr_t* mtr) /*!< in: mini-transaction handle */
{
byte* log_ptr;
ut_ad(!!page_rec_is_comp(rec) == dict_table_is_comp(index->table));
log_ptr = mlog_open_and_write_index(mtr, rec, index,
page_rec_is_comp(rec)
? MLOG_COMP_REC_DELETE
: MLOG_REC_DELETE, 2);
if (!log_ptr) {
/* Logging in mtr is switched off during crash recovery:
in that case mlog_open returns NULL */
return;
}
/* Write the cursor rec offset as a 2-byte ulint */
mach_write_to_2(log_ptr, page_offset(rec));
mlog_close(mtr, log_ptr + 2);
}
#else /* !UNIV_HOTBACKUP */
# define page_cur_delete_rec_write_log(rec,index,mtr) ((void) 0)
#endif /* !UNIV_HOTBACKUP */
/***********************************************************//**
Parses log record of a record delete on a page.
@return pointer to record end or NULL */
UNIV_INTERN
byte*
page_cur_parse_delete_rec(
/*======================*/
byte* ptr, /*!< in: buffer */
byte* end_ptr,/*!< in: buffer end */
buf_block_t* block, /*!< in: page or NULL */
dict_index_t* index, /*!< in: record descriptor */
mtr_t* mtr) /*!< in: mtr or NULL */
{
ulint offset;
page_cur_t cursor;
if (end_ptr < ptr + 2) {
return(NULL);
}
/* Read the cursor rec offset as a 2-byte ulint */
offset = mach_read_from_2(ptr);
ptr += 2;
ut_a(offset <= UNIV_PAGE_SIZE);
if (block) {
page_t* page = buf_block_get_frame(block);
mem_heap_t* heap = NULL;
ulint offsets_[REC_OFFS_NORMAL_SIZE];
rec_t* rec = page + offset;
rec_offs_init(offsets_);
page_cur_position(rec, block, &cursor);
ut_ad(!buf_block_get_page_zip(block) || page_is_comp(page));
page_cur_delete_rec(&cursor, index,
rec_get_offsets(rec, index, offsets_,
ULINT_UNDEFINED, &heap),
mtr);
if (UNIV_LIKELY_NULL(heap)) {
mem_heap_free(heap);
}
}
return(ptr);
}
/***********************************************************//**
Deletes a record at the page cursor. The cursor is moved to the next
record after the deleted one. */
UNIV_INTERN
void
page_cur_delete_rec(
/*================*/
page_cur_t* cursor, /*!< in/out: a page cursor */
dict_index_t* index, /*!< in: record descriptor */
const ulint* offsets,/*!< in: rec_get_offsets(cursor->rec, index) */
mtr_t* mtr) /*!< in: mini-transaction handle */
{
page_dir_slot_t* cur_dir_slot;
page_dir_slot_t* prev_slot;
page_t* page;
page_zip_des_t* page_zip;
rec_t* current_rec;
rec_t* prev_rec = NULL;
rec_t* next_rec;
ulint cur_slot_no;
ulint cur_n_owned;
rec_t* rec;
ut_ad(cursor && mtr);
page = page_cur_get_page(cursor);
page_zip = page_cur_get_page_zip(cursor);
/* page_zip_validate() will fail here when
btr_cur_pessimistic_delete() invokes btr_set_min_rec_mark().
Then, both "page_zip" and "page" would have the min-rec-mark
set on the smallest user record, but "page" would additionally
have it set on the smallest-but-one record. Because sloppy
page_zip_validate_low() only ignores min-rec-flag differences
in the smallest user record, it cannot be used here either. */
current_rec = cursor->rec;
ut_ad(rec_offs_validate(current_rec, index, offsets));
ut_ad(!!page_is_comp(page) == dict_table_is_comp(index->table));
/* The record must not be the supremum or infimum record. */
ut_ad(page_rec_is_user_rec(current_rec));
/* Save to local variables some data associated with current_rec */
cur_slot_no = page_dir_find_owner_slot(current_rec);
cur_dir_slot = page_dir_get_nth_slot(page, cur_slot_no);
cur_n_owned = page_dir_slot_get_n_owned(cur_dir_slot);
/* 0. Write the log record */
page_cur_delete_rec_write_log(current_rec, index, mtr);
/* 1. Reset the last insert info in the page header and increment
the modify clock for the frame */
page_header_set_ptr(page, page_zip, PAGE_LAST_INSERT, NULL);
/* The page gets invalid for optimistic searches: increment the
frame modify clock */
buf_block_modify_clock_inc(page_cur_get_block(cursor));
/* 2. Find the next and the previous record. Note that the cursor is
left at the next record. */
ut_ad(cur_slot_no > 0);
prev_slot = page_dir_get_nth_slot(page, cur_slot_no - 1);
rec = (rec_t*) page_dir_slot_get_rec(prev_slot);
/* rec now points to the record of the previous directory slot. Look
for the immediate predecessor of current_rec in a loop. */
while(current_rec != rec) {
prev_rec = rec;
rec = page_rec_get_next(rec);
}
page_cur_move_to_next(cursor);
next_rec = cursor->rec;
/* 3. Remove the record from the linked list of records */
page_rec_set_next(prev_rec, next_rec);
/* 4. If the deleted record is pointed to by a dir slot, update the
record pointer in slot. In the following if-clause we assume that
prev_rec is owned by the same slot, i.e., PAGE_DIR_SLOT_MIN_N_OWNED
>= 2. */
#if PAGE_DIR_SLOT_MIN_N_OWNED < 2
# error "PAGE_DIR_SLOT_MIN_N_OWNED < 2"
#endif
ut_ad(cur_n_owned > 1);
if (current_rec == page_dir_slot_get_rec(cur_dir_slot)) {
page_dir_slot_set_rec(cur_dir_slot, prev_rec);
}
/* 5. Update the number of owned records of the slot */
page_dir_slot_set_n_owned(cur_dir_slot, page_zip, cur_n_owned - 1);
/* 6. Free the memory occupied by the record */
btr_blob_dbg_remove_rec(current_rec, index, offsets, "delete");
page_mem_free(page, page_zip, current_rec, index, offsets);
/* 7. Now we have decremented the number of owned records of the slot.
If the number drops below PAGE_DIR_SLOT_MIN_N_OWNED, we balance the
slots. */
if (UNIV_UNLIKELY(cur_n_owned <= PAGE_DIR_SLOT_MIN_N_OWNED)) {
page_dir_balance_slot(page, page_zip, cur_slot_no);
}
#ifdef UNIV_ZIP_DEBUG
ut_a(!page_zip || page_zip_validate(page_zip, page));
#endif /* UNIV_ZIP_DEBUG */
}
#ifdef UNIV_COMPILE_TEST_FUNCS
/*******************************************************************//**
Print the first n numbers, generated by page_cur_lcg_prng() to make sure
(visually) that it works properly. */
void
test_page_cur_lcg_prng(
/*===================*/
int n) /*!< in: print first n numbers */
{
int i;
unsigned long long rnd;
for (i = 0; i < n; i++) {
rnd = page_cur_lcg_prng();
printf("%llu\t%%2=%llu %%3=%llu %%5=%llu %%7=%llu %%11=%llu\n",
rnd,
rnd % 2,
rnd % 3,
rnd % 5,
rnd % 7,
rnd % 11);
}
}
#endif /* UNIV_COMPILE_TEST_FUNCS */