diff --git a/storage/innobase/include/page0cur.h b/storage/innobase/include/page0cur.h index fdc76bdb17e..14a3ce9ba03 100644 --- a/storage/innobase/include/page0cur.h +++ b/storage/innobase/include/page0cur.h @@ -212,23 +212,18 @@ bool page_apply_delete_dynamic(const buf_block_t &block, ulint prev, size_t hdr_size, size_t data_size); MY_ATTRIBUTE((warn_unused_result)) -/****************************************************************//** -Searches the right position for a page cursor. */ -bool -page_cur_search_with_match( -/*=======================*/ - const dtuple_t* tuple, /*!< in: data tuple */ - page_cur_mode_t mode, /*!< in: PAGE_CUR_L, - PAGE_CUR_LE, PAGE_CUR_G, or - PAGE_CUR_GE */ - uint16_t* iup_matched_fields, - /*!< in/out: already matched - fields in upper limit record */ - uint16_t* ilow_matched_fields, - /*!< in/out: already matched - fields in lower limit record */ - page_cur_t* cursor, /*!< in/out: page cursor */ - rtr_info_t* rtr_info);/*!< in/out: rtree search stack */ +/** Search the right position for a page cursor. +@param tuple search key +@param mode search mode +@param iup_fields matched fields in the upper limit record +@param ilow_fields matched fields in the low limit record +@param cursor page cursor +@param rtr_info R-tree search stack, or nullptr +@return whether the page is corrupted */ +bool page_cur_search_with_match(const dtuple_t *tuple, page_cur_mode_t mode, + uint16_t *iup_fields, uint16_t *ilow_fields, + page_cur_t *cursor, rtr_info_t *rtr_info) + noexcept; /** Search the right position for a page cursor. @param tuple search key @@ -238,7 +233,7 @@ page_cur_search_with_match( @param cursor page cursor @param iup_bytes matched bytes after iup_fields @param ilow_bytes matched bytes after ilow_fields -@return whether the first partially matched field in the lower limit record, +@return whether the first partially matched field is in the lower limit record, or the page is corrupted */ bool page_cur_search_with_match_bytes(const dtuple_t &tuple, page_cur_mode_t mode, diff --git a/storage/innobase/include/page0page.h b/storage/innobase/include/page0page.h index 945414dac01..c8e819134d0 100644 --- a/storage/innobase/include/page0page.h +++ b/storage/innobase/include/page0page.h @@ -636,31 +636,55 @@ page_rec_check( /** Get the record pointed to by a directory slot. @param[in] slot directory slot @return pointer to record */ -inline rec_t *page_dir_slot_get_rec(page_dir_slot_t *slot) +inline rec_t *page_dir_slot_get_rec(page_t *page, page_dir_slot_t *slot) + noexcept { - return page_align(slot) + mach_read_from_2(my_assume_aligned<2>(slot)); + return page + mach_read_from_2(my_assume_aligned<2>(slot)); } -inline const rec_t *page_dir_slot_get_rec(const page_dir_slot_t *slot) +inline const rec_t *page_dir_slot_get_rec(const page_t *page, + const page_dir_slot_t *slot) noexcept +{ + return page_dir_slot_get_rec(const_cast(page), + const_cast(slot)); +} + +inline rec_t *page_dir_slot_get_rec(page_dir_slot_t *slot) noexcept +{ + return page_dir_slot_get_rec(page_align(slot), slot); +} +inline const rec_t *page_dir_slot_get_rec(const page_dir_slot_t *slot) noexcept { return page_dir_slot_get_rec(const_cast(slot)); } -inline rec_t *page_dir_slot_get_rec_validate(page_dir_slot_t *slot) +inline rec_t *page_dir_slot_get_rec_validate(page_t *page, + page_dir_slot_t *slot) noexcept { const size_t s= mach_read_from_2(my_assume_aligned<2>(slot)); - page_t *page= page_align(slot); - return UNIV_LIKELY(s >= PAGE_NEW_INFIMUM && s <= page_header_get_field(page, PAGE_HEAP_TOP)) ? page + s : nullptr; } + +inline const rec_t *page_dir_slot_get_rec_validate(const page_t *page, + const page_dir_slot_t *slot) + noexcept +{ + return page_dir_slot_get_rec_validate(const_cast(page), + const_cast(slot)); +} + +inline rec_t *page_dir_slot_get_rec_validate(page_dir_slot_t *slot) noexcept +{ + return page_dir_slot_get_rec_validate(page_align(slot), slot); +} inline const rec_t *page_dir_slot_get_rec_validate(const page_dir_slot_t *slot) + noexcept { return page_dir_slot_get_rec_validate(const_cast(slot)); } - /***************************************************************//** Gets the number of records owned by a directory slot. @return number of records */ diff --git a/storage/innobase/page/page0cur.cc b/storage/innobase/page/page0cur.cc index 88f8c3c9204..00e29caeeb7 100644 --- a/storage/innobase/page/page0cur.cc +++ b/storage/innobase/page/page0cur.cc @@ -69,8 +69,7 @@ static ulint cmp_get_pad_char(const dtype_t &type) noexcept } /** Compare a data tuple to a physical record. -@param page B-tree index leaf page -@param rec PAGE_LAST_INSERT record +@param rec B-tree index record @param index index B-tree @param tuple search key @param match matched fields << 16 | bytes @@ -79,22 +78,23 @@ static ulint cmp_get_pad_char(const dtype_t &type) noexcept @retval 0 if dtuple is equal to rec @retval negative if dtuple is less than rec @retval positive if dtuple is greater than rec */ -static int cmp_dtuple_rec_bytes(const page_t *page, - const rec_t *rec, +static int cmp_dtuple_rec_bytes(const rec_t *rec, const dict_index_t &index, const dtuple_t &tuple, int *match, ulint comp) noexcept { ut_ad(dtuple_check_typed(&tuple)); - ut_ad(page_is_leaf(page)); + ut_ad(page_rec_is_leaf(rec)); ut_ad(!(REC_INFO_MIN_REC_FLAG & dtuple_get_info_bits(&tuple))); ut_ad(!!comp == index.table->not_redundant()); if (UNIV_UNLIKELY(REC_INFO_MIN_REC_FLAG & rec_get_info_bits(rec, comp))) { + ut_d(const page_t *page= page_align(rec)); ut_ad(page_rec_is_first(rec, page)); ut_ad(!page_has_prev(page)); ut_ad(rec_is_metadata(rec, index)); + *match= 0; return 1; } @@ -110,9 +110,18 @@ static int cmp_dtuple_rec_bytes(const page_t *page, if (UNIV_LIKELY(comp != 0)) { - const unsigned n_core_null_bytes= index.n_core_null_bytes; const byte *nulls= rec - REC_N_NEW_EXTRA_BYTES; - const byte *lens= --nulls - n_core_null_bytes; + const byte *lens; + if (rec_get_status(rec) == REC_STATUS_INSTANT) + { + ulint n_fields= index.n_core_fields + rec_get_n_add_field(nulls) + 1; + ut_ad(n_fields <= index.n_fields); + const ulint n_nullable= index.get_n_nullable(n_fields); + ut_ad(n_nullable <= index.n_nullable); + lens= --nulls - UT_BITS_IN_BYTES(n_nullable); + } + else + lens= --nulls - index.n_core_null_bytes; byte null_mask= 1; size_t i= 0; @@ -354,7 +363,7 @@ static bool page_cur_try_search_shortcut_bytes(const page_t *page, int up= int(*iup_fields << 16 | *iup_bytes); up= low= std::min(low, up); const auto comp= page_is_comp(page); - if (cmp_dtuple_rec_bytes(page, rec, index, tuple, &low, comp) < 0) + if (cmp_dtuple_rec_bytes(rec, index, tuple, &low, comp) < 0) return false; const rec_t *next; if (UNIV_LIKELY(comp != 0)) @@ -364,7 +373,7 @@ static bool page_cur_try_search_shortcut_bytes(const page_t *page, if (next != page + PAGE_NEW_SUPREMUM) { cmp_up: - if (cmp_dtuple_rec_bytes(page, rec, index, tuple, &up, comp) >= 0) + if (cmp_dtuple_rec_bytes(rec, index, tuple, &up, comp) >= 0) return false; *iup_fields= uint16_t(up >> 16); *iup_bytes= uint16_t(up); @@ -383,24 +392,13 @@ static bool page_cur_try_search_shortcut_bytes(const page_t *page, return true; } -/** Search the right position for a page cursor. -@param tuple search key -@param mode search mode -@param iup_fields matched fields in the upper limit record -@param ilow_fields matched fields in the low limit record -@param cursor page cursor -@param iup_bytes matched bytes after iup_fields -@param ilow_bytes matched bytes after ilow_fields -@return whether the first partially matched field in the lower limit record, -or the page is corrupted */ bool page_cur_search_with_match_bytes(const dtuple_t &tuple, page_cur_mode_t mode, uint16_t *iup_fields, uint16_t *ilow_fields, page_cur_t *cursor, uint16_t *iup_bytes, - uint16_t *ilow_bytes) - noexcept + uint16_t *ilow_bytes) noexcept { ut_ad(dtuple_validate(&tuple)); ut_ad(!(tuple.info_bits & REC_INFO_MIN_REC_FLAG)); @@ -450,11 +448,12 @@ bool page_cur_search_with_match_bytes(const dtuple_t &tuple, while (up - low > 1) { const size_t mid= (low + up) / 2; - mid_rec= page_dir_slot_get_rec_validate(page_dir_get_nth_slot(page, mid)); + mid_rec= page_dir_slot_get_rec_validate(page, + page_dir_get_nth_slot(page, mid)); if (UNIV_UNLIKELY(!mid_rec)) return true; int cur= std::min(low_cmp, up_cmp); - int cmp= cmp_dtuple_rec_bytes(page, mid_rec, index, tuple, &cur, comp); + int cmp= cmp_dtuple_rec_bytes(mid_rec, index, tuple, &cur, comp); if (cmp > 0) low_slot_match: low= mid, low_cmp= cur; @@ -468,9 +467,9 @@ bool page_cur_search_with_match_bytes(const dtuple_t &tuple, } const rec_t *up_rec= - page_dir_slot_get_rec_validate(page_dir_get_nth_slot(page, up));; + page_dir_slot_get_rec_validate(page, page_dir_get_nth_slot(page, up)); const rec_t *low_rec= - page_dir_slot_get_rec_validate(page_dir_get_nth_slot(page, low)); + page_dir_slot_get_rec_validate(page, page_dir_get_nth_slot(page, low)); if (UNIV_UNLIKELY(!low_rec || !up_rec)) return true; @@ -493,12 +492,12 @@ bool page_cur_search_with_match_bytes(const dtuple_t &tuple, if (UNIV_UNLIKELY(rec_get_info_bits(mid_rec, comp) & REC_INFO_MIN_REC_FLAG)) { - ut_ad(!page_has_prev(page_align(mid_rec))); + ut_ad(!page_has_prev(page)); ut_ad(rec_is_metadata(mid_rec, index)); goto low_rec_match; } - cmp= cmp_dtuple_rec_bytes(page, mid_rec, index, tuple, &cur, comp); + cmp= cmp_dtuple_rec_bytes(mid_rec, index, tuple, &cur, comp); if (cmp > 0) low_rec_match: @@ -519,6 +518,7 @@ bool page_cur_search_with_match_bytes(const dtuple_t &tuple, } /** Compare a data tuple to a physical record. +@tparam leaf whether this must be a leaf page @param page B-tree index page @param rec B-tree index record @param index index B-tree @@ -529,7 +529,8 @@ bool page_cur_search_with_match_bytes(const dtuple_t &tuple, @retval 0 if dtuple is equal to rec @retval negative if dtuple is less than rec @retval positive if dtuple is greater than rec */ -static int cmp_dtuple_rec_leaf(const dtuple_t &dtuple, const rec_t *rec, +template +static int page_cur_dtuple_cmp(const dtuple_t &dtuple, const rec_t *rec, const dict_index_t &index, uint16_t *matched_fields, ulint comp) noexcept { @@ -539,15 +540,17 @@ static int cmp_dtuple_rec_leaf(const dtuple_t &dtuple, const rec_t *rec, ut_ad(dtuple.n_fields_cmp > 0); ut_ad(dtuple.n_fields_cmp <= index.n_core_fields); ut_ad(cur_field <= dtuple.n_fields_cmp); - ut_ad(page_rec_is_leaf(rec)); - ut_ad(!(rec_get_info_bits(rec, comp) & REC_INFO_MIN_REC_FLAG) || + ut_ad(leaf == page_rec_is_leaf(rec)); + ut_ad(!leaf || !(rec_get_info_bits(rec, comp) & REC_INFO_MIN_REC_FLAG) || index.is_instant() || (index.is_primary() && trx_roll_crash_recv_trx && !trx_rollback_is_active)); - ut_ad(!(dtuple.info_bits & REC_INFO_MIN_REC_FLAG) || + ut_ad(!leaf || !(dtuple.info_bits & REC_INFO_MIN_REC_FLAG) || index.is_instant() || (index.is_primary() && trx_roll_crash_recv_trx && !trx_rollback_is_active)); + ut_ad(leaf || !index.is_spatial() || + dtuple.n_fields_cmp == DICT_INDEX_SPATIAL_NODEPTR_SIZE + 1); int ret= 0; if (dtuple.info_bits & REC_INFO_MIN_REC_FLAG) @@ -626,6 +629,17 @@ static int cmp_dtuple_rec_leaf(const dtuple_t &dtuple, const rec_t *rec, { const dfield_t *df= dtuple_get_nth_field(&dtuple, i); ut_ad(!dfield_is_ext(df)); + if (!leaf && i == DICT_INDEX_SPATIAL_NODEPTR_SIZE && + index.is_spatial()) + { + /* SPATIAL INDEX non-leaf records comprise + MBR (minimum bounding rectangle) and the child page number. + The function rtr_cur_restore_position() includes the + child page number in the search key, because the MBR alone + would not be unique. */ + ut_ad(dtuple.fields[DICT_INDEX_SPATIAL_NODEPTR_SIZE].len == 4); + len= 4; + } ret= cmp_data(df->type.mtype, df->type.prtype, field->descending, static_cast(df->data), df->len, f, len); if (ret) @@ -657,6 +671,16 @@ static int cmp_dtuple_rec_leaf(const dtuple_t &dtuple, const rec_t *rec, return ret; } +static int page_cur_dtuple_cmp(const dtuple_t &dtuple, const rec_t *rec, + const dict_index_t &index, + uint16_t *matched_fields, ulint comp, bool leaf) + noexcept +{ + return leaf + ? page_cur_dtuple_cmp(dtuple, rec, index, matched_fields, comp) + : page_cur_dtuple_cmp(dtuple, rec, index, matched_fields, comp); +} + #ifdef BTR_CUR_HASH_ADAPT bool btr_cur_t::check_mismatch(const dtuple_t &tuple, bool ge, ulint comp) noexcept @@ -666,7 +690,7 @@ bool btr_cur_t::check_mismatch(const dtuple_t &tuple, bool ge, ulint comp) const rec_t *rec= page_cur.rec; uint16_t match= 0; - int cmp= cmp_dtuple_rec_leaf(tuple, rec, *index(), &match, comp); + int cmp= page_cur_dtuple_cmp(tuple, rec, *index(), &match, comp); const auto uniq= dict_index_get_n_unique_in_tree(index()); ut_ad(match <= uniq); ut_ad(match <= tuple.n_fields_cmp); @@ -706,7 +730,7 @@ bool btr_cur_t::check_mismatch(const dtuple_t &tuple, bool ge, ulint comp) if (uintptr_t(rec - page) == PAGE_OLD_SUPREMUM) goto le_supremum; } - return cmp_dtuple_rec_leaf(tuple, rec, *index(), &up_match, comp) >= 0; + return page_cur_dtuple_cmp(tuple, rec, *index(), &up_match, comp) >= 0; } else { @@ -728,34 +752,33 @@ bool btr_cur_t::check_mismatch(const dtuple_t &tuple, bool ge, ulint comp) default: return true; } - return cmp_dtuple_rec_leaf(tuple, rec, *index(), &match, comp) <= 0; + return page_cur_dtuple_cmp(tuple, rec, *index(), &match, comp) <= 0; } } #endif /* BTR_CUR_HASH_ADAPT */ /** Try a search shortcut based on the last insert. -@param page index page -@param rec PAGE_LAST_INSERT record -@param index index tree -@param tuple search key -@param iup matched fields in the upper limit record -@param ilow matched fields in the lower limit record +@param page index page +@param rec PAGE_LAST_INSERT record +@param index index tree +@param tuple search key +@param iup matched fields in the upper limit record +@param ilow matched fields in the lower limit record +@param comp nonzero if ROW_FORMAT=REDUNDANT is not being used @return record @return nullptr if the tuple was not found */ static bool page_cur_try_search_shortcut(const page_t *page, const rec_t *rec, const dict_index_t &index, const dtuple_t &tuple, - uint16_t *iup, uint16_t *ilow) - noexcept + uint16_t *iup, uint16_t *ilow, + ulint comp) noexcept { ut_ad(dtuple_check_typed(&tuple)); - ut_ad(page_is_leaf(page)); - auto comp= page_is_comp(page); ut_ad(page_rec_is_user_rec(rec)); uint16_t low= std::min(*ilow, *iup), up= low; - if (cmp_dtuple_rec_leaf(tuple, rec, index, &low, comp) < 0) + if (page_cur_dtuple_cmp(tuple, rec, index, &low, comp) < 0) return false; if (comp) @@ -766,7 +789,7 @@ static bool page_cur_try_search_shortcut(const page_t *page, const rec_t *rec, if (rec != page + PAGE_NEW_SUPREMUM) { compare_next: - if (cmp_dtuple_rec_leaf(tuple, rec, index, &up, comp) >= 0) + if (page_cur_dtuple_cmp(tuple, rec, index, &up, comp) >= 0) return false; *iup= up; } @@ -784,211 +807,140 @@ static bool page_cur_try_search_shortcut(const page_t *page, const rec_t *rec, return true; } -/****************************************************************//** -Searches the right position for a page cursor. */ -bool -page_cur_search_with_match( -/*=======================*/ - const dtuple_t* tuple, /*!< in: data tuple */ - page_cur_mode_t mode, /*!< in: PAGE_CUR_L, - PAGE_CUR_LE, PAGE_CUR_G, or - PAGE_CUR_GE */ - uint16_t* iup_matched_fields, - /*!< in/out: already matched - fields in upper limit record */ - uint16_t* ilow_matched_fields, - /*!< in/out: already matched - fields in lower limit record */ - page_cur_t* cursor, /*!< out: page cursor */ - rtr_info_t* rtr_info)/*!< in/out: rtree search stack */ +bool page_cur_search_with_match(const dtuple_t *tuple, page_cur_mode_t mode, + uint16_t *iup_fields, uint16_t *ilow_fields, + page_cur_t *cursor, rtr_info_t *rtr_info) + noexcept { - ulint up; - ulint low; - ulint mid; - const page_t* page; - const rec_t* up_rec; - const rec_t* low_rec; - const rec_t* mid_rec; - uint16_t up_matched_fields; - uint16_t low_matched_fields; - uint16_t cur_matched_fields; - int cmp; - const dict_index_t* const index = cursor->index; - const buf_block_t* const block = cursor->block; + ut_ad(dtuple_validate(tuple)); + ut_ad(mode == PAGE_CUR_L || mode == PAGE_CUR_LE || + mode == PAGE_CUR_G || mode == PAGE_CUR_GE || + cursor->index->is_spatial()); + const dict_index_t &index= *cursor->index; + const buf_block_t *const block= cursor->block; + const page_t *const page= block->page.frame; + ut_d(page_check_dir(page)); #ifdef UNIV_ZIP_DEBUG - const page_zip_des_t* page_zip = buf_block_get_page_zip(block); + if (const page_zip_des_t *page_zip= buf_block_get_page_zip(block)) + ut_a(page_zip_validate(page_zip, page, &index)); #endif /* UNIV_ZIP_DEBUG */ - mem_heap_t* heap = NULL; - rec_offs offsets_[REC_OFFS_NORMAL_SIZE]; - rec_offs* offsets = offsets_; - rec_offs_init(offsets_); + const auto comp= page_is_comp(page); + const bool leaf{page_is_leaf(page)}; - ut_ad(dtuple_validate(tuple)); - ut_ad(mode == PAGE_CUR_L || mode == PAGE_CUR_LE - || mode == PAGE_CUR_G || mode == PAGE_CUR_GE - || index->is_spatial()); - page = buf_block_get_frame(block); -#ifdef UNIV_ZIP_DEBUG - ut_a(!page_zip || page_zip_validate(page_zip, page, index)); -#endif /* UNIV_ZIP_DEBUG */ + /* If the mode is for R-tree indexes, use the special MBR + related compare functions */ + if (mode == PAGE_CUR_RTREE_INSERT && leaf) + { + /* Leaf level insert uses the traditional compare function */ + mode= PAGE_CUR_LE; + goto check_last_insert; + } + else if (mode > PAGE_CUR_LE) + return rtr_cur_search_with_match(block, + const_cast(&index), + tuple, mode, cursor, rtr_info); + else if (mode == PAGE_CUR_LE && leaf) + { + check_last_insert: + if (page_get_direction(page) != PAGE_RIGHT || + (tuple->info_bits & REC_INFO_MIN_REC_FLAG)); + else if (uint16_t last= page_header_get_offs(page, PAGE_LAST_INSERT)) + { + const rec_t *rec= page + last; + if (page_header_get_field(page, PAGE_N_DIRECTION) > 2 && + page_cur_try_search_shortcut(page, rec, index, *tuple, + iup_fields, ilow_fields, comp)) + { + page_cur_position(rec, block, cursor); + return false; + } + } + } - ut_d(page_check_dir(page)); - const ulint n_core = page_is_leaf(page) ? index->n_core_fields : 0; + /* If mode PAGE_CUR_G is specified, we are trying to position the + cursor to answer a query of the form "tuple < X", where tuple is the + input parameter, and X denotes an arbitrary physical record on the + page. We want to position the cursor on the first X which satisfies + the condition. */ + uint16_t up_fields= *iup_fields, low_fields= *ilow_fields; - /* If the mode is for R-tree indexes, use the special MBR - related compare functions */ - if (index->is_spatial()) { - /* For leaf level insert, we still use the traditional - compare function for now */ - if (n_core && mode == PAGE_CUR_RTREE_INSERT) { - mode = PAGE_CUR_LE; - } else if (mode > PAGE_CUR_LE) { - return rtr_cur_search_with_match( - block, (dict_index_t*)index, tuple, mode, - cursor, rtr_info); - } - } else if (!n_core || mode != PAGE_CUR_LE || !page_is_leaf(page) - || page_get_direction(page) != PAGE_RIGHT - || (tuple->info_bits & REC_INFO_MIN_REC_FLAG)) { - } else if (uint16_t last = - page_header_get_offs(page, PAGE_LAST_INSERT)) { - if (page_header_get_field(page, PAGE_N_DIRECTION) > 2 - && page_cur_try_search_shortcut(page, page + last, *index, - *tuple, - iup_matched_fields, - ilow_matched_fields)) { - page_cur_position(page + last, block, cursor); - return false; - } - } + /* Perform binary search. First the search is done through the page + directory, after that as a linear search in the list of records + owned by the upper limit directory slot. */ + size_t low= 0, up= ulint{page_dir_get_n_slots(page)} - 1; + const rec_t *mid_rec; - /* If mode PAGE_CUR_G is specified, we are trying to position the - cursor to answer a query of the form "tuple < X", where tuple is - the input parameter, and X denotes an arbitrary physical record on - the page. We want to position the cursor on the first X which - satisfies the condition. */ + /* Perform binary search until the lower and upper limit directory + slots come to the distance 1 of each other */ + while (up - low > 1) + { + const size_t mid= (low + up) / 2; + mid_rec= + page_dir_slot_get_rec_validate(page, page_dir_get_nth_slot(page, mid)); + if (UNIV_UNLIKELY(!mid_rec)) + return true; + uint16_t cur= std::min(low_fields, up_fields); + int cmp= page_cur_dtuple_cmp(*tuple, mid_rec, index, &cur, comp, leaf); + if (cmp > 0) + low_slot_match: + low= mid, low_fields= cur; + else if (cmp) + up_slot_match: + up= mid, up_fields= cur; + else if (mode == PAGE_CUR_G || mode == PAGE_CUR_LE) + goto low_slot_match; + else + goto up_slot_match; + } - up_matched_fields = *iup_matched_fields; - low_matched_fields = *ilow_matched_fields; + const rec_t *up_rec= + page_dir_slot_get_rec_validate(page, page_dir_get_nth_slot(page, up));; + const rec_t *low_rec= + page_dir_slot_get_rec_validate(page, page_dir_get_nth_slot(page, low)); + if (UNIV_UNLIKELY(!low_rec || !up_rec)) + return true; - /* Perform binary search. First the search is done through the page - directory, after that as a linear search in the list of records - owned by the upper limit directory slot. */ + /* Perform linear search until the upper and lower records come to + distance 1 of each other. */ - low = 0; - up = ulint(page_dir_get_n_slots(page)) - 1; + for (;;) + { + mid_rec= comp + ? page_rec_next_get(page, low_rec) + : page_rec_next_get(page, low_rec); + if (!mid_rec) + return true; + if (mid_rec == up_rec) + break; - /* Perform binary search until the lower and upper limit directory - slots come to the distance 1 of each other */ + uint16_t cur= std::min(low_fields, up_fields); + int cmp= page_cur_dtuple_cmp(*tuple, mid_rec, index, &cur, comp, leaf); + if (cmp > 0) + low_rec_match: + low_rec= mid_rec, low_fields= cur; + else if (cmp) + up_rec_match: + up_rec= mid_rec, up_fields= cur; + else if (mode == PAGE_CUR_G || mode == PAGE_CUR_LE) + { + if (cur == 0) + { + /* A match on 0 fields must be due to REC_INFO_MIN_REC_FLAG */ + ut_ad(rec_get_info_bits(mid_rec, comp) & REC_INFO_MIN_REC_FLAG); + ut_ad(!page_has_prev(page)); + ut_ad(!leaf || rec_is_metadata(mid_rec, index)); + cur= tuple->n_fields_cmp; + } + goto low_rec_match; + } + else + goto up_rec_match; + } - while (up - low > 1) { - mid = (low + up) / 2; - const page_dir_slot_t* slot = page_dir_get_nth_slot(page, mid); - if (UNIV_UNLIKELY(!(mid_rec - = page_dir_slot_get_rec_validate(slot)))) { - goto corrupted; - } - cur_matched_fields = std::min(low_matched_fields, - up_matched_fields); - - offsets = offsets_; - offsets = rec_get_offsets( - mid_rec, index, offsets, n_core, - dtuple_get_n_fields_cmp(tuple), &heap); - - cmp = cmp_dtuple_rec_with_match( - tuple, mid_rec, index, offsets, &cur_matched_fields); - - if (cmp > 0) { -low_slot_match: - low = mid; - low_matched_fields = cur_matched_fields; - - } else if (cmp) { -up_slot_match: - up = mid; - up_matched_fields = cur_matched_fields; - - } else if (mode == PAGE_CUR_G || mode == PAGE_CUR_LE) { - goto low_slot_match; - } else { - - goto up_slot_match; - } - } - - low_rec = page_dir_slot_get_rec_validate( - page_dir_get_nth_slot(page, low)); - up_rec = page_dir_slot_get_rec_validate( - page_dir_get_nth_slot(page, up)); - if (UNIV_UNLIKELY(!low_rec || !up_rec)) { -corrupted: - if (UNIV_LIKELY_NULL(heap)) { - mem_heap_free(heap); - } - return true; - } - - /* Perform linear search until the upper and lower records come to - distance 1 of each other. */ - - for (;;) { - if (const rec_t* next = page_rec_get_next_const(low_rec)) { - if (next == up_rec) { - break; - } - mid_rec = next; - } else { - goto corrupted; - } - cur_matched_fields = std::min(low_matched_fields, - up_matched_fields); - - offsets = offsets_; - offsets = rec_get_offsets( - mid_rec, index, offsets, n_core, - dtuple_get_n_fields_cmp(tuple), &heap); - - cmp = cmp_dtuple_rec_with_match( - tuple, mid_rec, index, offsets, &cur_matched_fields); - - if (cmp > 0) { -low_rec_match: - low_rec = mid_rec; - low_matched_fields = cur_matched_fields; - - } else if (cmp) { -up_rec_match: - up_rec = mid_rec; - up_matched_fields = cur_matched_fields; - } else if (mode == PAGE_CUR_G || mode == PAGE_CUR_LE) { - if (!cmp && !cur_matched_fields) { - /* We got a match, but cur_matched_fields is - 0, it must have REC_INFO_MIN_REC_FLAG */ - ut_ad(rec_get_info_bits(mid_rec, - rec_offs_comp(offsets)) - & REC_INFO_MIN_REC_FLAG); - ut_ad(!page_has_prev(page)); - cur_matched_fields = tuple->n_fields_cmp; - } - - goto low_rec_match; - } else { - - goto up_rec_match; - } - } - - page_cur_position(mode <= PAGE_CUR_GE ? up_rec : low_rec, block, - cursor); - - *iup_matched_fields = up_matched_fields; - *ilow_matched_fields = low_matched_fields; - if (UNIV_LIKELY_NULL(heap)) { - mem_heap_free(heap); - } - - return false; + page_cur_position(mode <= PAGE_CUR_GE ? up_rec : low_rec, block, cursor); + *iup_fields= up_fields; + *ilow_fields= low_fields; + return false; } /***********************************************************//** @@ -1032,7 +984,8 @@ static bool page_dir_split_slot(const buf_block_t &block, PAGE_DIR_SLOT_MIN_N_OWNED, "compatibility"); /* Find a record approximately in the middle. */ - const rec_t *rec= page_dir_slot_get_rec_validate(slot + PAGE_DIR_SLOT_SIZE); + const rec_t *rec= page_dir_slot_get_rec_validate(block.page.frame, + slot + PAGE_DIR_SLOT_SIZE); for (ulint i= n_owned / 2; i--; ) { @@ -1065,8 +1018,10 @@ static bool page_dir_split_slot(const buf_block_t &block, mach_write_to_2(slot, rec - block.page.frame); const bool comp= page_is_comp(block.page.frame) != 0; - page_rec_set_n_owned(page_dir_slot_get_rec(slot), half_owned, comp); - page_rec_set_n_owned(page_dir_slot_get_rec(slot - PAGE_DIR_SLOT_SIZE), + page_rec_set_n_owned(page_dir_slot_get_rec(block.page.frame, slot), + half_owned, comp); + page_rec_set_n_owned(page_dir_slot_get_rec(block.page.frame, + slot - PAGE_DIR_SLOT_SIZE), n_owned - half_owned, comp); return false; } @@ -1092,7 +1047,8 @@ static void page_zip_dir_split_slot(buf_block_t *block, ulint s, mtr_t* mtr) /* 1. We loop to find a record approximately in the middle of the records owned by the slot. */ - const rec_t *rec= page_dir_slot_get_rec(slot + PAGE_DIR_SLOT_SIZE); + const rec_t *rec= page_dir_slot_get_rec(block->page.frame, + slot + PAGE_DIR_SLOT_SIZE); /* We do not try to prevent crash on corruption here. For ROW_FORMAT=COMPRESSED pages, the next-record links should @@ -1119,10 +1075,13 @@ static void page_zip_dir_split_slot(buf_block_t *block, ulint s, mtr_t* mtr) /* Log changes to the compressed page header and the dense page directory. */ memcpy_aligned<2>(&block->page.zip.data[n_slots_f], n_slots_p, 2); mach_write_to_2(slot, rec - block->page.frame); - page_rec_set_n_owned(block, page_dir_slot_get_rec(slot), half_owned, + page_rec_set_n_owned(block, + page_dir_slot_get_rec(block->page.frame, slot), + half_owned, true, mtr); page_rec_set_n_owned(block, - page_dir_slot_get_rec(slot - PAGE_DIR_SLOT_SIZE), + page_dir_slot_get_rec(block->page.frame, + slot - PAGE_DIR_SLOT_SIZE), n_owned - half_owned, true, mtr); } @@ -1150,12 +1109,15 @@ static void page_zip_dir_balance_slot(buf_block_t *block, ulint s, mtr_t *mtr) page_dir_slot_t* slot = page_dir_get_nth_slot(block->page.frame, s); rec_t* const up_rec = const_cast - (page_dir_slot_get_rec(slot - PAGE_DIR_SLOT_SIZE)); + (page_dir_slot_get_rec(block->page.frame, + slot - PAGE_DIR_SLOT_SIZE)); rec_t* const slot_rec = const_cast - (page_dir_slot_get_rec(slot)); + (page_dir_slot_get_rec(block->page.frame, + slot)); const ulint up_n_owned = rec_get_n_owned_new(up_rec); - ut_ad(rec_get_n_owned_new(page_dir_slot_get_rec(slot)) + ut_ad(rec_get_n_owned_new(page_dir_slot_get_rec(block->page.frame, + slot)) == PAGE_DIR_SLOT_MIN_N_OWNED - 1); if (up_n_owned <= PAGE_DIR_SLOT_MIN_N_OWNED) { @@ -1219,9 +1181,10 @@ static void page_dir_balance_slot(const buf_block_t &block, ulint s) page_dir_slot_t* slot = page_dir_get_nth_slot(block.page.frame, s); rec_t* const up_rec = const_cast - (page_dir_slot_get_rec(slot - PAGE_DIR_SLOT_SIZE)); + (page_dir_slot_get_rec(block.page.frame, + slot - PAGE_DIR_SLOT_SIZE)); rec_t* const slot_rec = const_cast - (page_dir_slot_get_rec(slot)); + (page_dir_slot_get_rec(block.page.frame, slot)); const ulint up_n_owned = comp ? rec_get_n_owned_new(up_rec) : rec_get_n_owned_old(up_rec); @@ -2500,7 +2463,8 @@ page_cur_delete_rec( left at the next record. */ rec = const_cast - (page_dir_slot_get_rec(cur_dir_slot + PAGE_DIR_SLOT_SIZE)); + (page_dir_slot_get_rec(block->page.frame, + cur_dir_slot + PAGE_DIR_SLOT_SIZE)); /* rec now points to the record of the previous directory slot. Look for the immediate predecessor of current_rec in a loop. */ @@ -2529,7 +2493,8 @@ page_cur_delete_rec( ut_ad(cur_n_owned > 1); rec_t* slot_rec = const_cast - (page_dir_slot_get_rec(cur_dir_slot)); + (page_dir_slot_get_rec(block->page.frame, + cur_dir_slot)); if (UNIV_LIKELY_NULL(block->page.zip.data)) { ut_ad(page_is_comp(block->page.frame));