From a736a2cbc469da837ddfb3a95368b5e556e12017 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marko=20M=C3=A4kel=C3=A4?= Date: Tue, 3 Mar 2020 14:41:32 +0200 Subject: [PATCH] MDEV-21724: Correctly invoke page_dir_split_slot() In commit 138cbec5f2300bd5b401e83802642c1806264992, we computed an incorrect parameter to page_dir_split_slot(), leading us to splitting the wrong directory slot, or an out-of-bounds access when splitting the supremum slot. This was once caught in the test innodb_gis.kill_server for inserting records to a clustered index root page. page_dir_split_slot(): Take the slot as a pointer, instead of a numeric index. page_apply_insert_redundant(), page_apply_insert_dynamic(): Rename slot to last_slot, and make owner_slot a pointer. --- storage/innobase/page/page0cur.cc | 60 ++++++++++++++++++------------- 1 file changed, 36 insertions(+), 24 deletions(-) diff --git a/storage/innobase/page/page0cur.cc b/storage/innobase/page/page0cur.cc index 9d30a18253a..dab2c09a4cc 100644 --- a/storage/innobase/page/page0cur.cc +++ b/storage/innobase/page/page0cur.cc @@ -799,12 +799,13 @@ static void page_rec_set_n_owned(rec_t *rec, ulint n_owned, bool comp) /** Split a directory slot which owns too many records. @param[in,out] block index page -@param[in] s the slot that needs to be split */ -static void page_dir_split_slot(const buf_block_t &block, ulint s) +@param[in,out] slot the slot that needs to be split */ +static void page_dir_split_slot(const buf_block_t &block, + page_dir_slot_t *slot) { - ut_ad(s); + ut_ad(slot <= &block.frame[srv_page_size - PAGE_EMPTY_DIR_START]); + slot= my_assume_aligned<2>(slot); - page_dir_slot_t *slot= page_dir_get_nth_slot(block.frame, s); const ulint n_owned= PAGE_DIR_SLOT_MAX_N_OWNED + 1; ut_ad(page_dir_slot_get_n_owned(slot) == n_owned); @@ -825,6 +826,7 @@ static void page_dir_split_slot(const buf_block_t &block, ulint s) page_dir_slot_t *last_slot= static_cast (block.frame + srv_page_size - (PAGE_DIR + PAGE_DIR_SLOT_SIZE) - n_slots * PAGE_DIR_SLOT_SIZE); + ut_ad(slot >= last_slot); memmove_aligned<2>(last_slot, last_slot + PAGE_DIR_SLOT_SIZE, slot - last_slot); @@ -1595,7 +1597,10 @@ copied: corresponding directory slot in two. */ if (UNIV_UNLIKELY(n_owned == PAGE_DIR_SLOT_MAX_N_OWNED)) - page_dir_split_slot(*block, page_dir_find_owner_slot(next_rec)); + { + const auto owner= page_dir_find_owner_slot(next_rec); + page_dir_split_slot(*block, page_dir_get_nth_slot(block->frame, owner)); + } rec_offs_make_valid(insert_buf + extra_size, index, page_is_leaf(block->frame), offsets); @@ -2305,14 +2310,14 @@ corrupted: return true; } - const byte *const slot= page_dir_get_nth_slot(block.frame, n_slots - 1); + byte * const last_slot= page_dir_get_nth_slot(block.frame, n_slots - 1); byte * const page_heap_top= my_assume_aligned<2> (PAGE_HEAP_TOP + PAGE_HEADER + block.frame); const byte *const heap_bot= &block.frame[PAGE_OLD_SUPREMUM_END]; byte *heap_top= block.frame + mach_read_from_2(page_heap_top); - if (UNIV_UNLIKELY(heap_bot > heap_top || heap_top > slot)) + if (UNIV_UNLIKELY(heap_bot > heap_top || heap_top > last_slot)) goto corrupted; - if (UNIV_UNLIKELY(mach_read_from_2(slot) != PAGE_OLD_SUPREMUM)) + if (UNIV_UNLIKELY(mach_read_from_2(last_slot) != PAGE_OLD_SUPREMUM)) goto corrupted; if (UNIV_UNLIKELY(mach_read_from_2(page_dir_get_nth_slot(block.frame, 0)) != PAGE_OLD_INFIMUM)) @@ -2370,16 +2375,21 @@ corrupted: goto corrupted; /* Corrupted (cyclic?) next-record list */ } + page_dir_slot_t *owner_slot= last_slot; + if (n_owned > PAGE_DIR_SLOT_MAX_N_OWNED) goto corrupted; + else + { + mach_write_to_2(insert_buf, owner_rec - block.frame); + static_assert(PAGE_DIR_SLOT_SIZE == 2, "compatibility"); + const page_dir_slot_t * const first_slot= + page_dir_get_nth_slot(block.frame, 0); - mach_write_to_2(insert_buf, owner_rec - block.frame); - ulint owner_slot= n_slots; - static_assert(PAGE_DIR_SLOT_SIZE == 2, "compatibility"); - while (memcmp_aligned<2>(slot + 2 * owner_slot, insert_buf, 2)) - if (!owner_slot--) - goto corrupted; - owner_slot= n_slots - 1 - owner_slot; + while (memcmp_aligned<2>(owner_slot, insert_buf, 2)) + if ((owner_slot+= 2) == first_slot) + goto corrupted; + } memcpy(insert_buf, data, extra_size - hdr_c); byte *insert_rec= &insert_buf[extra_size]; @@ -2437,7 +2447,7 @@ corrupted: } else { - if (UNIV_UNLIKELY(heap_top + extra_size + data_size > slot)) + if (UNIV_UNLIKELY(heap_top + extra_size + data_size > last_slot)) goto corrupted; rec_set_bit_field_2(insert_rec, h, REC_OLD_HEAP_NO, REC_HEAP_NO_MASK, REC_HEAP_NO_SHIFT); @@ -2545,14 +2555,14 @@ corrupted: return true; } - const byte *const slot= page_dir_get_nth_slot(block.frame, n_slots - 1); + byte * const last_slot= page_dir_get_nth_slot(block.frame, n_slots - 1); byte * const page_heap_top= my_assume_aligned<2> (PAGE_HEAP_TOP + PAGE_HEADER + block.frame); const byte *const heap_bot= &block.frame[PAGE_NEW_SUPREMUM_END]; byte *heap_top= block.frame + mach_read_from_2(page_heap_top); - if (UNIV_UNLIKELY(heap_bot > heap_top || heap_top > slot)) + if (UNIV_UNLIKELY(heap_bot > heap_top || heap_top > last_slot)) goto corrupted; - if (UNIV_UNLIKELY(mach_read_from_2(slot) != PAGE_NEW_SUPREMUM)) + if (UNIV_UNLIKELY(mach_read_from_2(last_slot) != PAGE_NEW_SUPREMUM)) goto corrupted; if (UNIV_UNLIKELY(mach_read_from_2(page_dir_get_nth_slot(block.frame, 0)) != PAGE_NEW_INFIMUM)) @@ -2589,7 +2599,7 @@ corrupted: goto corrupted; /* Corrupted (cyclic?) next-record list */ } - ulint owner_slot= n_slots; + page_dir_slot_t* owner_slot= last_slot; if (n_owned > PAGE_DIR_SLOT_MAX_N_OWNED) goto corrupted; @@ -2598,10 +2608,12 @@ corrupted: static_assert(PAGE_DIR_SLOT_SIZE == 2, "compatibility"); alignas(2) byte slot_buf[2]; mach_write_to_2(slot_buf, owner_rec - block.frame); - while (memcmp_aligned<2>(slot + 2 * owner_slot, slot_buf, 2)) - if (!owner_slot--) + const page_dir_slot_t * const first_slot= + page_dir_get_nth_slot(block.frame, 0); + + while (memcmp_aligned<2>(owner_slot, slot_buf, 2)) + if ((owner_slot+= 2) == first_slot) goto corrupted; - owner_slot= n_slots - 1 - owner_slot; } const ulint extra_size= REC_N_NEW_EXTRA_BYTES + hdr_c + (enc_hdr_l >> 3); @@ -2643,7 +2655,7 @@ corrupted: } else { - if (UNIV_UNLIKELY(heap_top + extra_size + data_size > slot)) + if (UNIV_UNLIKELY(heap_top + extra_size + data_size > last_slot)) goto corrupted; mach_write_to_2(page_n_heap, h + 1); h&= 0x7fff;