MDEV-35312 page_cur_search_with_match() could avoid rec_get_offsets()

page_cur_search_with_match(): Remove rec_get_offsets(), and instead
determine the start and end of each field while comparing.

page_dir_slot_get_rec(), page_dir_slot_get_rec_validate():
Add a parameter to avoid invoking page_align().

page_cur_dtuple_cmp(): Replaces cmp_dtuple_rec_leaf() for both
leaf and non-leaf pages. In SPATIAL INDEX, non-leaf records are
special, because the child page number may be part of the comparison.

Reviewed by: Vladislav Lesin
This commit is contained in:
Marko Mäkelä 2025-01-10 16:40:55 +02:00
parent 793a2fc8ba
commit 3761a7fec8
3 changed files with 261 additions and 277 deletions

View file

@ -212,23 +212,18 @@ bool page_apply_delete_dynamic(const buf_block_t &block, ulint prev,
size_t hdr_size, size_t data_size); size_t hdr_size, size_t data_size);
MY_ATTRIBUTE((warn_unused_result)) MY_ATTRIBUTE((warn_unused_result))
/****************************************************************//** /** Search the right position for a page cursor.
Searches the right position for a page cursor. */ @param tuple search key
bool @param mode search mode
page_cur_search_with_match( @param iup_fields matched fields in the upper limit record
/*=======================*/ @param ilow_fields matched fields in the low limit record
const dtuple_t* tuple, /*!< in: data tuple */ @param cursor page cursor
page_cur_mode_t mode, /*!< in: PAGE_CUR_L, @param rtr_info R-tree search stack, or nullptr
PAGE_CUR_LE, PAGE_CUR_G, or @return whether the page is corrupted */
PAGE_CUR_GE */ bool page_cur_search_with_match(const dtuple_t *tuple, page_cur_mode_t mode,
uint16_t* iup_matched_fields, uint16_t *iup_fields, uint16_t *ilow_fields,
/*!< in/out: already matched page_cur_t *cursor, rtr_info_t *rtr_info)
fields in upper limit record */ noexcept;
uint16_t* ilow_matched_fields,
/*!< in/out: already matched
fields in lower limit record */
page_cur_t* cursor, /*!< in/out: page cursor */
rtr_info_t* rtr_info);/*!< in/out: rtree search stack */
/** Search the right position for a page cursor. /** Search the right position for a page cursor.
@param tuple search key @param tuple search key
@ -238,7 +233,7 @@ page_cur_search_with_match(
@param cursor page cursor @param cursor page cursor
@param iup_bytes matched bytes after iup_fields @param iup_bytes matched bytes after iup_fields
@param ilow_bytes matched bytes after ilow_fields @param ilow_bytes matched bytes after ilow_fields
@return whether the first partially matched field in the lower limit record, @return whether the first partially matched field is in the lower limit record,
or the page is corrupted */ or the page is corrupted */
bool page_cur_search_with_match_bytes(const dtuple_t &tuple, bool page_cur_search_with_match_bytes(const dtuple_t &tuple,
page_cur_mode_t mode, page_cur_mode_t mode,

View file

@ -636,31 +636,55 @@ page_rec_check(
/** Get the record pointed to by a directory slot. /** Get the record pointed to by a directory slot.
@param[in] slot directory slot @param[in] slot directory slot
@return pointer to record */ @return pointer to record */
inline rec_t *page_dir_slot_get_rec(page_dir_slot_t *slot) inline rec_t *page_dir_slot_get_rec(page_t *page, page_dir_slot_t *slot)
noexcept
{ {
return page_align(slot) + mach_read_from_2(my_assume_aligned<2>(slot)); return page + mach_read_from_2(my_assume_aligned<2>(slot));
} }
inline const rec_t *page_dir_slot_get_rec(const page_dir_slot_t *slot) inline const rec_t *page_dir_slot_get_rec(const page_t *page,
const page_dir_slot_t *slot) noexcept
{
return page_dir_slot_get_rec(const_cast<page_t*>(page),
const_cast<rec_t*>(slot));
}
inline rec_t *page_dir_slot_get_rec(page_dir_slot_t *slot) noexcept
{
return page_dir_slot_get_rec(page_align(slot), slot);
}
inline const rec_t *page_dir_slot_get_rec(const page_dir_slot_t *slot) noexcept
{ {
return page_dir_slot_get_rec(const_cast<rec_t*>(slot)); return page_dir_slot_get_rec(const_cast<rec_t*>(slot));
} }
inline rec_t *page_dir_slot_get_rec_validate(page_dir_slot_t *slot) inline rec_t *page_dir_slot_get_rec_validate(page_t *page,
page_dir_slot_t *slot) noexcept
{ {
const size_t s= mach_read_from_2(my_assume_aligned<2>(slot)); const size_t s= mach_read_from_2(my_assume_aligned<2>(slot));
page_t *page= page_align(slot);
return UNIV_LIKELY(s >= PAGE_NEW_INFIMUM && return UNIV_LIKELY(s >= PAGE_NEW_INFIMUM &&
s <= page_header_get_field(page, PAGE_HEAP_TOP)) s <= page_header_get_field(page, PAGE_HEAP_TOP))
? page + s ? page + s
: nullptr; : nullptr;
} }
inline const rec_t *page_dir_slot_get_rec_validate(const page_t *page,
const page_dir_slot_t *slot)
noexcept
{
return page_dir_slot_get_rec_validate(const_cast<page_t*>(page),
const_cast<page_dir_slot_t*>(slot));
}
inline rec_t *page_dir_slot_get_rec_validate(page_dir_slot_t *slot) noexcept
{
return page_dir_slot_get_rec_validate(page_align(slot), slot);
}
inline const rec_t *page_dir_slot_get_rec_validate(const page_dir_slot_t *slot) inline const rec_t *page_dir_slot_get_rec_validate(const page_dir_slot_t *slot)
noexcept
{ {
return page_dir_slot_get_rec_validate(const_cast<rec_t*>(slot)); return page_dir_slot_get_rec_validate(const_cast<rec_t*>(slot));
} }
/***************************************************************//** /***************************************************************//**
Gets the number of records owned by a directory slot. Gets the number of records owned by a directory slot.
@return number of records */ @return number of records */

View file

@ -69,8 +69,7 @@ static ulint cmp_get_pad_char(const dtype_t &type) noexcept
} }
/** Compare a data tuple to a physical record. /** Compare a data tuple to a physical record.
@param page B-tree index leaf page @param rec B-tree index record
@param rec PAGE_LAST_INSERT record
@param index index B-tree @param index index B-tree
@param tuple search key @param tuple search key
@param match matched fields << 16 | bytes @param match matched fields << 16 | bytes
@ -79,22 +78,23 @@ static ulint cmp_get_pad_char(const dtype_t &type) noexcept
@retval 0 if dtuple is equal to rec @retval 0 if dtuple is equal to rec
@retval negative if dtuple is less than rec @retval negative if dtuple is less than rec
@retval positive if dtuple is greater than rec */ @retval positive if dtuple is greater than rec */
static int cmp_dtuple_rec_bytes(const page_t *page, static int cmp_dtuple_rec_bytes(const rec_t *rec,
const rec_t *rec,
const dict_index_t &index, const dict_index_t &index,
const dtuple_t &tuple, int *match, ulint comp) const dtuple_t &tuple, int *match, ulint comp)
noexcept noexcept
{ {
ut_ad(dtuple_check_typed(&tuple)); ut_ad(dtuple_check_typed(&tuple));
ut_ad(page_is_leaf(page)); ut_ad(page_rec_is_leaf(rec));
ut_ad(!(REC_INFO_MIN_REC_FLAG & dtuple_get_info_bits(&tuple))); ut_ad(!(REC_INFO_MIN_REC_FLAG & dtuple_get_info_bits(&tuple)));
ut_ad(!!comp == index.table->not_redundant()); ut_ad(!!comp == index.table->not_redundant());
if (UNIV_UNLIKELY(REC_INFO_MIN_REC_FLAG & rec_get_info_bits(rec, comp))) if (UNIV_UNLIKELY(REC_INFO_MIN_REC_FLAG & rec_get_info_bits(rec, comp)))
{ {
ut_d(const page_t *page= page_align(rec));
ut_ad(page_rec_is_first(rec, page)); ut_ad(page_rec_is_first(rec, page));
ut_ad(!page_has_prev(page)); ut_ad(!page_has_prev(page));
ut_ad(rec_is_metadata(rec, index)); ut_ad(rec_is_metadata(rec, index));
*match= 0;
return 1; return 1;
} }
@ -110,9 +110,18 @@ static int cmp_dtuple_rec_bytes(const page_t *page,
if (UNIV_LIKELY(comp != 0)) if (UNIV_LIKELY(comp != 0))
{ {
const unsigned n_core_null_bytes= index.n_core_null_bytes;
const byte *nulls= rec - REC_N_NEW_EXTRA_BYTES; const byte *nulls= rec - REC_N_NEW_EXTRA_BYTES;
const byte *lens= --nulls - n_core_null_bytes; const byte *lens;
if (rec_get_status(rec) == REC_STATUS_INSTANT)
{
ulint n_fields= index.n_core_fields + rec_get_n_add_field(nulls) + 1;
ut_ad(n_fields <= index.n_fields);
const ulint n_nullable= index.get_n_nullable(n_fields);
ut_ad(n_nullable <= index.n_nullable);
lens= --nulls - UT_BITS_IN_BYTES(n_nullable);
}
else
lens= --nulls - index.n_core_null_bytes;
byte null_mask= 1; byte null_mask= 1;
size_t i= 0; size_t i= 0;
@ -354,7 +363,7 @@ static bool page_cur_try_search_shortcut_bytes(const page_t *page,
int up= int(*iup_fields << 16 | *iup_bytes); int up= int(*iup_fields << 16 | *iup_bytes);
up= low= std::min(low, up); up= low= std::min(low, up);
const auto comp= page_is_comp(page); const auto comp= page_is_comp(page);
if (cmp_dtuple_rec_bytes(page, rec, index, tuple, &low, comp) < 0) if (cmp_dtuple_rec_bytes(rec, index, tuple, &low, comp) < 0)
return false; return false;
const rec_t *next; const rec_t *next;
if (UNIV_LIKELY(comp != 0)) if (UNIV_LIKELY(comp != 0))
@ -364,7 +373,7 @@ static bool page_cur_try_search_shortcut_bytes(const page_t *page,
if (next != page + PAGE_NEW_SUPREMUM) if (next != page + PAGE_NEW_SUPREMUM)
{ {
cmp_up: cmp_up:
if (cmp_dtuple_rec_bytes(page, rec, index, tuple, &up, comp) >= 0) if (cmp_dtuple_rec_bytes(rec, index, tuple, &up, comp) >= 0)
return false; return false;
*iup_fields= uint16_t(up >> 16); *iup_fields= uint16_t(up >> 16);
*iup_bytes= uint16_t(up); *iup_bytes= uint16_t(up);
@ -383,24 +392,13 @@ static bool page_cur_try_search_shortcut_bytes(const page_t *page,
return true; return true;
} }
/** Search the right position for a page cursor.
@param tuple search key
@param mode search mode
@param iup_fields matched fields in the upper limit record
@param ilow_fields matched fields in the low limit record
@param cursor page cursor
@param iup_bytes matched bytes after iup_fields
@param ilow_bytes matched bytes after ilow_fields
@return whether the first partially matched field in the lower limit record,
or the page is corrupted */
bool page_cur_search_with_match_bytes(const dtuple_t &tuple, bool page_cur_search_with_match_bytes(const dtuple_t &tuple,
page_cur_mode_t mode, page_cur_mode_t mode,
uint16_t *iup_fields, uint16_t *iup_fields,
uint16_t *ilow_fields, uint16_t *ilow_fields,
page_cur_t *cursor, page_cur_t *cursor,
uint16_t *iup_bytes, uint16_t *iup_bytes,
uint16_t *ilow_bytes) uint16_t *ilow_bytes) noexcept
noexcept
{ {
ut_ad(dtuple_validate(&tuple)); ut_ad(dtuple_validate(&tuple));
ut_ad(!(tuple.info_bits & REC_INFO_MIN_REC_FLAG)); ut_ad(!(tuple.info_bits & REC_INFO_MIN_REC_FLAG));
@ -450,11 +448,12 @@ bool page_cur_search_with_match_bytes(const dtuple_t &tuple,
while (up - low > 1) while (up - low > 1)
{ {
const size_t mid= (low + up) / 2; const size_t mid= (low + up) / 2;
mid_rec= page_dir_slot_get_rec_validate(page_dir_get_nth_slot(page, mid)); mid_rec= page_dir_slot_get_rec_validate(page,
page_dir_get_nth_slot(page, mid));
if (UNIV_UNLIKELY(!mid_rec)) if (UNIV_UNLIKELY(!mid_rec))
return true; return true;
int cur= std::min(low_cmp, up_cmp); int cur= std::min(low_cmp, up_cmp);
int cmp= cmp_dtuple_rec_bytes(page, mid_rec, index, tuple, &cur, comp); int cmp= cmp_dtuple_rec_bytes(mid_rec, index, tuple, &cur, comp);
if (cmp > 0) if (cmp > 0)
low_slot_match: low_slot_match:
low= mid, low_cmp= cur; low= mid, low_cmp= cur;
@ -468,9 +467,9 @@ bool page_cur_search_with_match_bytes(const dtuple_t &tuple,
} }
const rec_t *up_rec= const rec_t *up_rec=
page_dir_slot_get_rec_validate(page_dir_get_nth_slot(page, up));; page_dir_slot_get_rec_validate(page, page_dir_get_nth_slot(page, up));
const rec_t *low_rec= const rec_t *low_rec=
page_dir_slot_get_rec_validate(page_dir_get_nth_slot(page, low)); page_dir_slot_get_rec_validate(page, page_dir_get_nth_slot(page, low));
if (UNIV_UNLIKELY(!low_rec || !up_rec)) if (UNIV_UNLIKELY(!low_rec || !up_rec))
return true; return true;
@ -493,12 +492,12 @@ bool page_cur_search_with_match_bytes(const dtuple_t &tuple,
if (UNIV_UNLIKELY(rec_get_info_bits(mid_rec, comp) & if (UNIV_UNLIKELY(rec_get_info_bits(mid_rec, comp) &
REC_INFO_MIN_REC_FLAG)) REC_INFO_MIN_REC_FLAG))
{ {
ut_ad(!page_has_prev(page_align(mid_rec))); ut_ad(!page_has_prev(page));
ut_ad(rec_is_metadata(mid_rec, index)); ut_ad(rec_is_metadata(mid_rec, index));
goto low_rec_match; goto low_rec_match;
} }
cmp= cmp_dtuple_rec_bytes(page, mid_rec, index, tuple, &cur, comp); cmp= cmp_dtuple_rec_bytes(mid_rec, index, tuple, &cur, comp);
if (cmp > 0) if (cmp > 0)
low_rec_match: low_rec_match:
@ -519,6 +518,7 @@ bool page_cur_search_with_match_bytes(const dtuple_t &tuple,
} }
/** Compare a data tuple to a physical record. /** Compare a data tuple to a physical record.
@tparam leaf whether this must be a leaf page
@param page B-tree index page @param page B-tree index page
@param rec B-tree index record @param rec B-tree index record
@param index index B-tree @param index index B-tree
@ -529,7 +529,8 @@ bool page_cur_search_with_match_bytes(const dtuple_t &tuple,
@retval 0 if dtuple is equal to rec @retval 0 if dtuple is equal to rec
@retval negative if dtuple is less than rec @retval negative if dtuple is less than rec
@retval positive if dtuple is greater than rec */ @retval positive if dtuple is greater than rec */
static int cmp_dtuple_rec_leaf(const dtuple_t &dtuple, const rec_t *rec, template<bool leaf= true>
static int page_cur_dtuple_cmp(const dtuple_t &dtuple, const rec_t *rec,
const dict_index_t &index, const dict_index_t &index,
uint16_t *matched_fields, ulint comp) noexcept uint16_t *matched_fields, ulint comp) noexcept
{ {
@ -539,15 +540,17 @@ static int cmp_dtuple_rec_leaf(const dtuple_t &dtuple, const rec_t *rec,
ut_ad(dtuple.n_fields_cmp > 0); ut_ad(dtuple.n_fields_cmp > 0);
ut_ad(dtuple.n_fields_cmp <= index.n_core_fields); ut_ad(dtuple.n_fields_cmp <= index.n_core_fields);
ut_ad(cur_field <= dtuple.n_fields_cmp); ut_ad(cur_field <= dtuple.n_fields_cmp);
ut_ad(page_rec_is_leaf(rec)); ut_ad(leaf == page_rec_is_leaf(rec));
ut_ad(!(rec_get_info_bits(rec, comp) & REC_INFO_MIN_REC_FLAG) || ut_ad(!leaf || !(rec_get_info_bits(rec, comp) & REC_INFO_MIN_REC_FLAG) ||
index.is_instant() || index.is_instant() ||
(index.is_primary() && trx_roll_crash_recv_trx && (index.is_primary() && trx_roll_crash_recv_trx &&
!trx_rollback_is_active)); !trx_rollback_is_active));
ut_ad(!(dtuple.info_bits & REC_INFO_MIN_REC_FLAG) || ut_ad(!leaf || !(dtuple.info_bits & REC_INFO_MIN_REC_FLAG) ||
index.is_instant() || index.is_instant() ||
(index.is_primary() && trx_roll_crash_recv_trx && (index.is_primary() && trx_roll_crash_recv_trx &&
!trx_rollback_is_active)); !trx_rollback_is_active));
ut_ad(leaf || !index.is_spatial() ||
dtuple.n_fields_cmp == DICT_INDEX_SPATIAL_NODEPTR_SIZE + 1);
int ret= 0; int ret= 0;
if (dtuple.info_bits & REC_INFO_MIN_REC_FLAG) if (dtuple.info_bits & REC_INFO_MIN_REC_FLAG)
@ -626,6 +629,17 @@ static int cmp_dtuple_rec_leaf(const dtuple_t &dtuple, const rec_t *rec,
{ {
const dfield_t *df= dtuple_get_nth_field(&dtuple, i); const dfield_t *df= dtuple_get_nth_field(&dtuple, i);
ut_ad(!dfield_is_ext(df)); ut_ad(!dfield_is_ext(df));
if (!leaf && i == DICT_INDEX_SPATIAL_NODEPTR_SIZE &&
index.is_spatial())
{
/* SPATIAL INDEX non-leaf records comprise
MBR (minimum bounding rectangle) and the child page number.
The function rtr_cur_restore_position() includes the
child page number in the search key, because the MBR alone
would not be unique. */
ut_ad(dtuple.fields[DICT_INDEX_SPATIAL_NODEPTR_SIZE].len == 4);
len= 4;
}
ret= cmp_data(df->type.mtype, df->type.prtype, field->descending, ret= cmp_data(df->type.mtype, df->type.prtype, field->descending,
static_cast<const byte*>(df->data), df->len, f, len); static_cast<const byte*>(df->data), df->len, f, len);
if (ret) if (ret)
@ -657,6 +671,16 @@ static int cmp_dtuple_rec_leaf(const dtuple_t &dtuple, const rec_t *rec,
return ret; return ret;
} }
static int page_cur_dtuple_cmp(const dtuple_t &dtuple, const rec_t *rec,
const dict_index_t &index,
uint16_t *matched_fields, ulint comp, bool leaf)
noexcept
{
return leaf
? page_cur_dtuple_cmp<true>(dtuple, rec, index, matched_fields, comp)
: page_cur_dtuple_cmp<false>(dtuple, rec, index, matched_fields, comp);
}
#ifdef BTR_CUR_HASH_ADAPT #ifdef BTR_CUR_HASH_ADAPT
bool btr_cur_t::check_mismatch(const dtuple_t &tuple, bool ge, ulint comp) bool btr_cur_t::check_mismatch(const dtuple_t &tuple, bool ge, ulint comp)
noexcept noexcept
@ -666,7 +690,7 @@ bool btr_cur_t::check_mismatch(const dtuple_t &tuple, bool ge, ulint comp)
const rec_t *rec= page_cur.rec; const rec_t *rec= page_cur.rec;
uint16_t match= 0; uint16_t match= 0;
int cmp= cmp_dtuple_rec_leaf(tuple, rec, *index(), &match, comp); int cmp= page_cur_dtuple_cmp(tuple, rec, *index(), &match, comp);
const auto uniq= dict_index_get_n_unique_in_tree(index()); const auto uniq= dict_index_get_n_unique_in_tree(index());
ut_ad(match <= uniq); ut_ad(match <= uniq);
ut_ad(match <= tuple.n_fields_cmp); ut_ad(match <= tuple.n_fields_cmp);
@ -706,7 +730,7 @@ bool btr_cur_t::check_mismatch(const dtuple_t &tuple, bool ge, ulint comp)
if (uintptr_t(rec - page) == PAGE_OLD_SUPREMUM) if (uintptr_t(rec - page) == PAGE_OLD_SUPREMUM)
goto le_supremum; goto le_supremum;
} }
return cmp_dtuple_rec_leaf(tuple, rec, *index(), &up_match, comp) >= 0; return page_cur_dtuple_cmp(tuple, rec, *index(), &up_match, comp) >= 0;
} }
else else
{ {
@ -728,34 +752,33 @@ bool btr_cur_t::check_mismatch(const dtuple_t &tuple, bool ge, ulint comp)
default: default:
return true; return true;
} }
return cmp_dtuple_rec_leaf(tuple, rec, *index(), &match, comp) <= 0; return page_cur_dtuple_cmp(tuple, rec, *index(), &match, comp) <= 0;
} }
} }
#endif /* BTR_CUR_HASH_ADAPT */ #endif /* BTR_CUR_HASH_ADAPT */
/** Try a search shortcut based on the last insert. /** Try a search shortcut based on the last insert.
@param page index page @param page index page
@param rec PAGE_LAST_INSERT record @param rec PAGE_LAST_INSERT record
@param index index tree @param index index tree
@param tuple search key @param tuple search key
@param iup matched fields in the upper limit record @param iup matched fields in the upper limit record
@param ilow matched fields in the lower limit record @param ilow matched fields in the lower limit record
@param comp nonzero if ROW_FORMAT=REDUNDANT is not being used
@return record @return record
@return nullptr if the tuple was not found */ @return nullptr if the tuple was not found */
static bool page_cur_try_search_shortcut(const page_t *page, const rec_t *rec, static bool page_cur_try_search_shortcut(const page_t *page, const rec_t *rec,
const dict_index_t &index, const dict_index_t &index,
const dtuple_t &tuple, const dtuple_t &tuple,
uint16_t *iup, uint16_t *ilow) uint16_t *iup, uint16_t *ilow,
noexcept ulint comp) noexcept
{ {
ut_ad(dtuple_check_typed(&tuple)); ut_ad(dtuple_check_typed(&tuple));
ut_ad(page_is_leaf(page));
auto comp= page_is_comp(page);
ut_ad(page_rec_is_user_rec(rec)); ut_ad(page_rec_is_user_rec(rec));
uint16_t low= std::min(*ilow, *iup), up= low; uint16_t low= std::min(*ilow, *iup), up= low;
if (cmp_dtuple_rec_leaf(tuple, rec, index, &low, comp) < 0) if (page_cur_dtuple_cmp(tuple, rec, index, &low, comp) < 0)
return false; return false;
if (comp) if (comp)
@ -766,7 +789,7 @@ static bool page_cur_try_search_shortcut(const page_t *page, const rec_t *rec,
if (rec != page + PAGE_NEW_SUPREMUM) if (rec != page + PAGE_NEW_SUPREMUM)
{ {
compare_next: compare_next:
if (cmp_dtuple_rec_leaf(tuple, rec, index, &up, comp) >= 0) if (page_cur_dtuple_cmp(tuple, rec, index, &up, comp) >= 0)
return false; return false;
*iup= up; *iup= up;
} }
@ -784,211 +807,140 @@ static bool page_cur_try_search_shortcut(const page_t *page, const rec_t *rec,
return true; return true;
} }
/****************************************************************//** bool page_cur_search_with_match(const dtuple_t *tuple, page_cur_mode_t mode,
Searches the right position for a page cursor. */ uint16_t *iup_fields, uint16_t *ilow_fields,
bool page_cur_t *cursor, rtr_info_t *rtr_info)
page_cur_search_with_match( noexcept
/*=======================*/
const dtuple_t* tuple, /*!< in: data tuple */
page_cur_mode_t mode, /*!< in: PAGE_CUR_L,
PAGE_CUR_LE, PAGE_CUR_G, or
PAGE_CUR_GE */
uint16_t* iup_matched_fields,
/*!< in/out: already matched
fields in upper limit record */
uint16_t* ilow_matched_fields,
/*!< in/out: already matched
fields in lower limit record */
page_cur_t* cursor, /*!< out: page cursor */
rtr_info_t* rtr_info)/*!< in/out: rtree search stack */
{ {
ulint up; ut_ad(dtuple_validate(tuple));
ulint low; ut_ad(mode == PAGE_CUR_L || mode == PAGE_CUR_LE ||
ulint mid; mode == PAGE_CUR_G || mode == PAGE_CUR_GE ||
const page_t* page; cursor->index->is_spatial());
const rec_t* up_rec; const dict_index_t &index= *cursor->index;
const rec_t* low_rec; const buf_block_t *const block= cursor->block;
const rec_t* mid_rec; const page_t *const page= block->page.frame;
uint16_t up_matched_fields; ut_d(page_check_dir(page));
uint16_t low_matched_fields;
uint16_t cur_matched_fields;
int cmp;
const dict_index_t* const index = cursor->index;
const buf_block_t* const block = cursor->block;
#ifdef UNIV_ZIP_DEBUG #ifdef UNIV_ZIP_DEBUG
const page_zip_des_t* page_zip = buf_block_get_page_zip(block); if (const page_zip_des_t *page_zip= buf_block_get_page_zip(block))
ut_a(page_zip_validate(page_zip, page, &index));
#endif /* UNIV_ZIP_DEBUG */ #endif /* UNIV_ZIP_DEBUG */
mem_heap_t* heap = NULL; const auto comp= page_is_comp(page);
rec_offs offsets_[REC_OFFS_NORMAL_SIZE]; const bool leaf{page_is_leaf(page)};
rec_offs* offsets = offsets_;
rec_offs_init(offsets_);
ut_ad(dtuple_validate(tuple)); /* If the mode is for R-tree indexes, use the special MBR
ut_ad(mode == PAGE_CUR_L || mode == PAGE_CUR_LE related compare functions */
|| mode == PAGE_CUR_G || mode == PAGE_CUR_GE if (mode == PAGE_CUR_RTREE_INSERT && leaf)
|| index->is_spatial()); {
page = buf_block_get_frame(block); /* Leaf level insert uses the traditional compare function */
#ifdef UNIV_ZIP_DEBUG mode= PAGE_CUR_LE;
ut_a(!page_zip || page_zip_validate(page_zip, page, index)); goto check_last_insert;
#endif /* UNIV_ZIP_DEBUG */ }
else if (mode > PAGE_CUR_LE)
return rtr_cur_search_with_match(block,
const_cast<dict_index_t*>(&index),
tuple, mode, cursor, rtr_info);
else if (mode == PAGE_CUR_LE && leaf)
{
check_last_insert:
if (page_get_direction(page) != PAGE_RIGHT ||
(tuple->info_bits & REC_INFO_MIN_REC_FLAG));
else if (uint16_t last= page_header_get_offs(page, PAGE_LAST_INSERT))
{
const rec_t *rec= page + last;
if (page_header_get_field(page, PAGE_N_DIRECTION) > 2 &&
page_cur_try_search_shortcut(page, rec, index, *tuple,
iup_fields, ilow_fields, comp))
{
page_cur_position(rec, block, cursor);
return false;
}
}
}
ut_d(page_check_dir(page)); /* If mode PAGE_CUR_G is specified, we are trying to position the
const ulint n_core = page_is_leaf(page) ? index->n_core_fields : 0; cursor to answer a query of the form "tuple < X", where tuple is the
input parameter, and X denotes an arbitrary physical record on the
page. We want to position the cursor on the first X which satisfies
the condition. */
uint16_t up_fields= *iup_fields, low_fields= *ilow_fields;
/* If the mode is for R-tree indexes, use the special MBR /* Perform binary search. First the search is done through the page
related compare functions */ directory, after that as a linear search in the list of records
if (index->is_spatial()) { owned by the upper limit directory slot. */
/* For leaf level insert, we still use the traditional size_t low= 0, up= ulint{page_dir_get_n_slots(page)} - 1;
compare function for now */ const rec_t *mid_rec;
if (n_core && mode == PAGE_CUR_RTREE_INSERT) {
mode = PAGE_CUR_LE;
} else if (mode > PAGE_CUR_LE) {
return rtr_cur_search_with_match(
block, (dict_index_t*)index, tuple, mode,
cursor, rtr_info);
}
} else if (!n_core || mode != PAGE_CUR_LE || !page_is_leaf(page)
|| page_get_direction(page) != PAGE_RIGHT
|| (tuple->info_bits & REC_INFO_MIN_REC_FLAG)) {
} else if (uint16_t last =
page_header_get_offs(page, PAGE_LAST_INSERT)) {
if (page_header_get_field(page, PAGE_N_DIRECTION) > 2
&& page_cur_try_search_shortcut(page, page + last, *index,
*tuple,
iup_matched_fields,
ilow_matched_fields)) {
page_cur_position(page + last, block, cursor);
return false;
}
}
/* If mode PAGE_CUR_G is specified, we are trying to position the /* Perform binary search until the lower and upper limit directory
cursor to answer a query of the form "tuple < X", where tuple is slots come to the distance 1 of each other */
the input parameter, and X denotes an arbitrary physical record on while (up - low > 1)
the page. We want to position the cursor on the first X which {
satisfies the condition. */ const size_t mid= (low + up) / 2;
mid_rec=
page_dir_slot_get_rec_validate(page, page_dir_get_nth_slot(page, mid));
if (UNIV_UNLIKELY(!mid_rec))
return true;
uint16_t cur= std::min(low_fields, up_fields);
int cmp= page_cur_dtuple_cmp(*tuple, mid_rec, index, &cur, comp, leaf);
if (cmp > 0)
low_slot_match:
low= mid, low_fields= cur;
else if (cmp)
up_slot_match:
up= mid, up_fields= cur;
else if (mode == PAGE_CUR_G || mode == PAGE_CUR_LE)
goto low_slot_match;
else
goto up_slot_match;
}
up_matched_fields = *iup_matched_fields; const rec_t *up_rec=
low_matched_fields = *ilow_matched_fields; page_dir_slot_get_rec_validate(page, page_dir_get_nth_slot(page, up));;
const rec_t *low_rec=
page_dir_slot_get_rec_validate(page, page_dir_get_nth_slot(page, low));
if (UNIV_UNLIKELY(!low_rec || !up_rec))
return true;
/* Perform binary search. First the search is done through the page /* Perform linear search until the upper and lower records come to
directory, after that as a linear search in the list of records distance 1 of each other. */
owned by the upper limit directory slot. */
low = 0; for (;;)
up = ulint(page_dir_get_n_slots(page)) - 1; {
mid_rec= comp
? page_rec_next_get<true>(page, low_rec)
: page_rec_next_get<false>(page, low_rec);
if (!mid_rec)
return true;
if (mid_rec == up_rec)
break;
/* Perform binary search until the lower and upper limit directory uint16_t cur= std::min(low_fields, up_fields);
slots come to the distance 1 of each other */ int cmp= page_cur_dtuple_cmp(*tuple, mid_rec, index, &cur, comp, leaf);
if (cmp > 0)
low_rec_match:
low_rec= mid_rec, low_fields= cur;
else if (cmp)
up_rec_match:
up_rec= mid_rec, up_fields= cur;
else if (mode == PAGE_CUR_G || mode == PAGE_CUR_LE)
{
if (cur == 0)
{
/* A match on 0 fields must be due to REC_INFO_MIN_REC_FLAG */
ut_ad(rec_get_info_bits(mid_rec, comp) & REC_INFO_MIN_REC_FLAG);
ut_ad(!page_has_prev(page));
ut_ad(!leaf || rec_is_metadata(mid_rec, index));
cur= tuple->n_fields_cmp;
}
goto low_rec_match;
}
else
goto up_rec_match;
}
while (up - low > 1) { page_cur_position(mode <= PAGE_CUR_GE ? up_rec : low_rec, block, cursor);
mid = (low + up) / 2; *iup_fields= up_fields;
const page_dir_slot_t* slot = page_dir_get_nth_slot(page, mid); *ilow_fields= low_fields;
if (UNIV_UNLIKELY(!(mid_rec return false;
= page_dir_slot_get_rec_validate(slot)))) {
goto corrupted;
}
cur_matched_fields = std::min(low_matched_fields,
up_matched_fields);
offsets = offsets_;
offsets = rec_get_offsets(
mid_rec, index, offsets, n_core,
dtuple_get_n_fields_cmp(tuple), &heap);
cmp = cmp_dtuple_rec_with_match(
tuple, mid_rec, index, offsets, &cur_matched_fields);
if (cmp > 0) {
low_slot_match:
low = mid;
low_matched_fields = cur_matched_fields;
} else if (cmp) {
up_slot_match:
up = mid;
up_matched_fields = cur_matched_fields;
} else if (mode == PAGE_CUR_G || mode == PAGE_CUR_LE) {
goto low_slot_match;
} else {
goto up_slot_match;
}
}
low_rec = page_dir_slot_get_rec_validate(
page_dir_get_nth_slot(page, low));
up_rec = page_dir_slot_get_rec_validate(
page_dir_get_nth_slot(page, up));
if (UNIV_UNLIKELY(!low_rec || !up_rec)) {
corrupted:
if (UNIV_LIKELY_NULL(heap)) {
mem_heap_free(heap);
}
return true;
}
/* Perform linear search until the upper and lower records come to
distance 1 of each other. */
for (;;) {
if (const rec_t* next = page_rec_get_next_const(low_rec)) {
if (next == up_rec) {
break;
}
mid_rec = next;
} else {
goto corrupted;
}
cur_matched_fields = std::min(low_matched_fields,
up_matched_fields);
offsets = offsets_;
offsets = rec_get_offsets(
mid_rec, index, offsets, n_core,
dtuple_get_n_fields_cmp(tuple), &heap);
cmp = cmp_dtuple_rec_with_match(
tuple, mid_rec, index, offsets, &cur_matched_fields);
if (cmp > 0) {
low_rec_match:
low_rec = mid_rec;
low_matched_fields = cur_matched_fields;
} else if (cmp) {
up_rec_match:
up_rec = mid_rec;
up_matched_fields = cur_matched_fields;
} else if (mode == PAGE_CUR_G || mode == PAGE_CUR_LE) {
if (!cmp && !cur_matched_fields) {
/* We got a match, but cur_matched_fields is
0, it must have REC_INFO_MIN_REC_FLAG */
ut_ad(rec_get_info_bits(mid_rec,
rec_offs_comp(offsets))
& REC_INFO_MIN_REC_FLAG);
ut_ad(!page_has_prev(page));
cur_matched_fields = tuple->n_fields_cmp;
}
goto low_rec_match;
} else {
goto up_rec_match;
}
}
page_cur_position(mode <= PAGE_CUR_GE ? up_rec : low_rec, block,
cursor);
*iup_matched_fields = up_matched_fields;
*ilow_matched_fields = low_matched_fields;
if (UNIV_LIKELY_NULL(heap)) {
mem_heap_free(heap);
}
return false;
} }
/***********************************************************//** /***********************************************************//**
@ -1032,7 +984,8 @@ static bool page_dir_split_slot(const buf_block_t &block,
PAGE_DIR_SLOT_MIN_N_OWNED, "compatibility"); PAGE_DIR_SLOT_MIN_N_OWNED, "compatibility");
/* Find a record approximately in the middle. */ /* Find a record approximately in the middle. */
const rec_t *rec= page_dir_slot_get_rec_validate(slot + PAGE_DIR_SLOT_SIZE); const rec_t *rec= page_dir_slot_get_rec_validate(block.page.frame,
slot + PAGE_DIR_SLOT_SIZE);
for (ulint i= n_owned / 2; i--; ) for (ulint i= n_owned / 2; i--; )
{ {
@ -1065,8 +1018,10 @@ static bool page_dir_split_slot(const buf_block_t &block,
mach_write_to_2(slot, rec - block.page.frame); mach_write_to_2(slot, rec - block.page.frame);
const bool comp= page_is_comp(block.page.frame) != 0; const bool comp= page_is_comp(block.page.frame) != 0;
page_rec_set_n_owned(page_dir_slot_get_rec(slot), half_owned, comp); page_rec_set_n_owned(page_dir_slot_get_rec(block.page.frame, slot),
page_rec_set_n_owned(page_dir_slot_get_rec(slot - PAGE_DIR_SLOT_SIZE), half_owned, comp);
page_rec_set_n_owned(page_dir_slot_get_rec(block.page.frame,
slot - PAGE_DIR_SLOT_SIZE),
n_owned - half_owned, comp); n_owned - half_owned, comp);
return false; return false;
} }
@ -1092,7 +1047,8 @@ static void page_zip_dir_split_slot(buf_block_t *block, ulint s, mtr_t* mtr)
/* 1. We loop to find a record approximately in the middle of the /* 1. We loop to find a record approximately in the middle of the
records owned by the slot. */ records owned by the slot. */
const rec_t *rec= page_dir_slot_get_rec(slot + PAGE_DIR_SLOT_SIZE); const rec_t *rec= page_dir_slot_get_rec(block->page.frame,
slot + PAGE_DIR_SLOT_SIZE);
/* We do not try to prevent crash on corruption here. /* We do not try to prevent crash on corruption here.
For ROW_FORMAT=COMPRESSED pages, the next-record links should For ROW_FORMAT=COMPRESSED pages, the next-record links should
@ -1119,10 +1075,13 @@ static void page_zip_dir_split_slot(buf_block_t *block, ulint s, mtr_t* mtr)
/* Log changes to the compressed page header and the dense page directory. */ /* Log changes to the compressed page header and the dense page directory. */
memcpy_aligned<2>(&block->page.zip.data[n_slots_f], n_slots_p, 2); memcpy_aligned<2>(&block->page.zip.data[n_slots_f], n_slots_p, 2);
mach_write_to_2(slot, rec - block->page.frame); mach_write_to_2(slot, rec - block->page.frame);
page_rec_set_n_owned<true>(block, page_dir_slot_get_rec(slot), half_owned, page_rec_set_n_owned<true>(block,
page_dir_slot_get_rec(block->page.frame, slot),
half_owned,
true, mtr); true, mtr);
page_rec_set_n_owned<true>(block, page_rec_set_n_owned<true>(block,
page_dir_slot_get_rec(slot - PAGE_DIR_SLOT_SIZE), page_dir_slot_get_rec(block->page.frame,
slot - PAGE_DIR_SLOT_SIZE),
n_owned - half_owned, true, mtr); n_owned - half_owned, true, mtr);
} }
@ -1150,12 +1109,15 @@ static void page_zip_dir_balance_slot(buf_block_t *block, ulint s, mtr_t *mtr)
page_dir_slot_t* slot = page_dir_get_nth_slot(block->page.frame, s); page_dir_slot_t* slot = page_dir_get_nth_slot(block->page.frame, s);
rec_t* const up_rec = const_cast<rec_t*> rec_t* const up_rec = const_cast<rec_t*>
(page_dir_slot_get_rec(slot - PAGE_DIR_SLOT_SIZE)); (page_dir_slot_get_rec(block->page.frame,
slot - PAGE_DIR_SLOT_SIZE));
rec_t* const slot_rec = const_cast<rec_t*> rec_t* const slot_rec = const_cast<rec_t*>
(page_dir_slot_get_rec(slot)); (page_dir_slot_get_rec(block->page.frame,
slot));
const ulint up_n_owned = rec_get_n_owned_new(up_rec); const ulint up_n_owned = rec_get_n_owned_new(up_rec);
ut_ad(rec_get_n_owned_new(page_dir_slot_get_rec(slot)) ut_ad(rec_get_n_owned_new(page_dir_slot_get_rec(block->page.frame,
slot))
== PAGE_DIR_SLOT_MIN_N_OWNED - 1); == PAGE_DIR_SLOT_MIN_N_OWNED - 1);
if (up_n_owned <= PAGE_DIR_SLOT_MIN_N_OWNED) { if (up_n_owned <= PAGE_DIR_SLOT_MIN_N_OWNED) {
@ -1219,9 +1181,10 @@ static void page_dir_balance_slot(const buf_block_t &block, ulint s)
page_dir_slot_t* slot = page_dir_get_nth_slot(block.page.frame, s); page_dir_slot_t* slot = page_dir_get_nth_slot(block.page.frame, s);
rec_t* const up_rec = const_cast<rec_t*> rec_t* const up_rec = const_cast<rec_t*>
(page_dir_slot_get_rec(slot - PAGE_DIR_SLOT_SIZE)); (page_dir_slot_get_rec(block.page.frame,
slot - PAGE_DIR_SLOT_SIZE));
rec_t* const slot_rec = const_cast<rec_t*> rec_t* const slot_rec = const_cast<rec_t*>
(page_dir_slot_get_rec(slot)); (page_dir_slot_get_rec(block.page.frame, slot));
const ulint up_n_owned = comp const ulint up_n_owned = comp
? rec_get_n_owned_new(up_rec) ? rec_get_n_owned_new(up_rec)
: rec_get_n_owned_old(up_rec); : rec_get_n_owned_old(up_rec);
@ -2500,7 +2463,8 @@ page_cur_delete_rec(
left at the next record. */ left at the next record. */
rec = const_cast<rec_t*> rec = const_cast<rec_t*>
(page_dir_slot_get_rec(cur_dir_slot + PAGE_DIR_SLOT_SIZE)); (page_dir_slot_get_rec(block->page.frame,
cur_dir_slot + PAGE_DIR_SLOT_SIZE));
/* rec now points to the record of the previous directory slot. Look /* rec now points to the record of the previous directory slot. Look
for the immediate predecessor of current_rec in a loop. */ for the immediate predecessor of current_rec in a loop. */
@ -2529,7 +2493,8 @@ page_cur_delete_rec(
ut_ad(cur_n_owned > 1); ut_ad(cur_n_owned > 1);
rec_t* slot_rec = const_cast<rec_t*> rec_t* slot_rec = const_cast<rec_t*>
(page_dir_slot_get_rec(cur_dir_slot)); (page_dir_slot_get_rec(block->page.frame,
cur_dir_slot));
if (UNIV_LIKELY_NULL(block->page.zip.data)) { if (UNIV_LIKELY_NULL(block->page.zip.data)) {
ut_ad(page_is_comp(block->page.frame)); ut_ad(page_is_comp(block->page.frame));