mariadb/storage/maria/ma_page.c
2022-06-09 14:11:43 +03:00

635 lines
20 KiB
C

/* Copyright (C) 2006 MySQL AB & MySQL Finland AB & TCX DataKonsult AB
Copyright (c) 2020, MariaDB Corporation.
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; version 2 of the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA */
/*
Read and write key blocks
The basic structure of a key block is as follows:
LSN 7 (LSN_STORE_SIZE); Log number for last change;
Only for transactional pages
PACK_TRANSID 6 (TRANSID_SIZE); Relative transid to pack page transid's
Only for transactional pages
KEYNR 1 (KEYPAGE_KEYID_SIZE) Which index this page belongs to
FLAG 1 (KEYPAGE_FLAG_SIZE) Flags for page
PAGE_SIZE 2 (KEYPAGE_USED_SIZE) How much of the page is used.
high-byte-first
The flag is a combination of the following values:
KEYPAGE_FLAG_ISNOD Page is a node
KEYPAGE_FLAG_HAS_TRANSID There may be a transid on the page.
After this we store key data, either packed or not packed, directly
after each other. If the page is a node flag, there is a pointer to
the next key page at page start and after each key.
At end of page the last KEYPAGE_CHECKSUM_SIZE bytes are reserved for a
page checksum.
*/
#include "maria_def.h"
#include "trnman.h"
#include "ma_key_recover.h"
/**
Fill MARIA_PAGE structure for usage with _ma_write_keypage
*/
void _ma_page_setup(MARIA_PAGE *page, MARIA_HA *info,
const MARIA_KEYDEF *keyinfo, my_off_t pos,
uchar *buff)
{
MARIA_SHARE *share= info->s;
page->info= info;
page->keyinfo= keyinfo;
page->buff= buff;
page->pos= pos;
page->size= _ma_get_page_used(share, buff);
page->org_size= page->size;
page->flag= _ma_get_keypage_flag(share, buff);
page->node= ((page->flag & KEYPAGE_FLAG_ISNOD) ?
share->base.key_reflength : 0);
}
#ifdef IDENTICAL_PAGES_AFTER_RECOVERY
void page_cleanup(MARIA_SHARE *share, MARIA_PAGE *page)
{
uint length= page->size;
DBUG_ASSERT(length <= share->max_index_block_size);
bzero(page->buff + length, share->block_size - length);
}
#endif
/**
Fetch a key-page in memory
@fn _ma_fetch_keypage()
@param page Fill this struct with information about read page
@param info Maria handler
@param keyinfo Key definition for used key
@param pos Position for page (in bytes)
@param lock Lock type for page
@param level Importance of page; Priority for page cache
@param buff Buffer to use for page
@param return_buffer Set to 1 if we want to force useage of buff
@return
@retval 0 ok
@retval 1 error
*/
my_bool _ma_fetch_keypage(MARIA_PAGE *page, MARIA_HA *info,
const MARIA_KEYDEF *keyinfo,
my_off_t pos, enum pagecache_page_lock lock,
int level, uchar *buff,
my_bool return_buffer __attribute__ ((unused)))
{
uchar *tmp;
MARIA_PINNED_PAGE page_link;
MARIA_SHARE *share= info->s;
uint block_size= share->block_size;
DBUG_ENTER("_ma_fetch_keypage");
DBUG_PRINT("enter",("page: %lu", (ulong) (pos / block_size)));
tmp= pagecache_read(share->pagecache, &share->kfile,
(pgcache_page_no_t) (pos / block_size), level, buff,
share->page_type, lock, &page_link.link);
if (lock != PAGECACHE_LOCK_LEFT_UNLOCKED)
{
DBUG_ASSERT(lock == PAGECACHE_LOCK_WRITE || lock == PAGECACHE_LOCK_READ);
page_link.unlock= (lock == PAGECACHE_LOCK_WRITE ?
PAGECACHE_LOCK_WRITE_UNLOCK :
PAGECACHE_LOCK_READ_UNLOCK);
page_link.changed= 0;
push_dynamic(&info->pinned_pages, (void*) &page_link);
page->link_offset= (uint)info->pinned_pages.elements-1;
}
if (tmp == info->buff)
info->keyread_buff_used=1;
else if (!tmp)
{
DBUG_PRINT("error",("Got errno: %d from pagecache_read",my_errno));
info->last_keypage=HA_OFFSET_ERROR;
_ma_set_fatal_error(info, my_errno);
DBUG_RETURN(1);
}
info->last_keypage= pos;
/*
Setup page structure to make pages easy to use
This is same as page_fill_info, but here inlined as this si used
so often.
*/
page->info= info;
page->keyinfo= keyinfo;
page->buff= tmp;
page->pos= pos;
page->size= _ma_get_page_used(share, tmp);
page->org_size= page->size; /* For debugging */
page->flag= _ma_get_keypage_flag(share, tmp);
page->node= ((page->flag & KEYPAGE_FLAG_ISNOD) ?
share->base.key_reflength : 0);
#ifdef EXTRA_DEBUG
{
uint page_size= page->size;
if (page_size < 4 || page_size > share->max_index_block_size ||
_ma_get_keynr(share, tmp) != keyinfo->key_nr)
{
DBUG_PRINT("error",("page %lu had wrong page length: %u page_header: %u keynr: %u",
(ulong) (pos / block_size), page_size,
share->keypage_header,
_ma_get_keynr(share, tmp)));
DBUG_DUMP("page", tmp, page_size);
info->last_keypage = HA_OFFSET_ERROR;
_ma_set_fatal_error(info, HA_ERR_CRASHED);
DBUG_RETURN(1);
}
}
#endif
DBUG_RETURN(0);
} /* _ma_fetch_keypage */
/* Write a key-page on disk */
my_bool _ma_write_keypage(MARIA_PAGE *page, enum pagecache_page_lock lock,
int level)
{
MARIA_SHARE *share= page->info->s;
uint block_size= share->block_size;
uchar *buff= page->buff;
my_bool res;
MARIA_PINNED_PAGE page_link;
DBUG_ENTER("_ma_write_keypage");
/*
The following ensures that for transactional tables we have logged
all changes that changes the page size (as the logging code sets
page->org_size)
*/
DBUG_ASSERT(!share->now_transactional || page->size == page->org_size);
#ifdef EXTRA_DEBUG /* Safety check */
{
uint page_length, nod_flag;
page_length= _ma_get_page_used(share, buff);
nod_flag= _ma_test_if_nod(share, buff);
DBUG_ASSERT(page->size == page_length);
DBUG_ASSERT(page->size <= share->max_index_block_size);
DBUG_ASSERT(page->flag == _ma_get_keypage_flag(share, buff));
if (page->pos < share->base.keystart ||
page->pos+block_size > share->state.state.key_file_length ||
(page->pos & (maria_block_size-1)))
{
DBUG_PRINT("error",("Trying to write inside key status region: "
"key_start: %lu length: %lu page_pos: %lu",
(long) share->base.keystart,
(long) share->state.state.key_file_length,
(long) page->pos));
my_errno=EINVAL;
DBUG_ASSERT(0);
DBUG_RETURN(1);
}
DBUG_PRINT("page",("write page at: %lu",(ulong) (page->pos / block_size)));
DBUG_DUMP("buff", buff, page_length);
DBUG_ASSERT(page_length >= share->keypage_header + nod_flag +
page->keyinfo->minlength || maria_in_recovery);
}
#endif
/* Verify that keynr is correct */
DBUG_ASSERT(_ma_get_keynr(share, buff) == page->keyinfo->key_nr);
#if defined(EXTRA_DEBUG) && defined(HAVE_valgrind) && defined(WHEN_DEBUGGING)
MEM_CHECK_DEFINED(buff, block_size);
#endif
page_cleanup(share, page);
{
PAGECACHE_BLOCK_LINK **link;
enum pagecache_page_pin pin;
if (lock == PAGECACHE_LOCK_LEFT_WRITELOCKED)
{
pin= PAGECACHE_PIN_LEFT_PINNED;
link= &page_link.link;
}
else if (lock == PAGECACHE_LOCK_WRITE_UNLOCK)
{
pin= PAGECACHE_UNPIN;
/*
We unlock this page so link should be 0 to prevent it usage
even accidentally
*/
link= NULL;
}
else
{
pin= PAGECACHE_PIN;
link= &page_link.link;
}
res= pagecache_write(share->pagecache,
&share->kfile,
(pgcache_page_no_t) (page->pos / block_size),
level, buff, share->page_type,
lock, pin, PAGECACHE_WRITE_DELAY, link,
LSN_IMPOSSIBLE);
}
if (lock == PAGECACHE_LOCK_WRITE)
{
/* It was not locked before, we have to unlock it when we unpin pages */
page_link.unlock= PAGECACHE_LOCK_WRITE_UNLOCK;
page_link.changed= 1;
push_dynamic(&page->info->pinned_pages, (void*) &page_link);
}
DBUG_RETURN(res);
}
/**
@brief Put page in free list
@fn _ma_dispose()
@param info Maria handle
@param pos Address to page
@param page_not_read 1 if page has not yet been read
@note
The page at 'pos' must have been read with a write lock.
This function does logging (unlike _ma_new()).
@return
@retval 0 ok
@retval 1 error
*/
int _ma_dispose(register MARIA_HA *info, my_off_t pos, my_bool page_not_read)
{
my_off_t old_link;
uchar buff[MAX_KEYPAGE_HEADER_SIZE+ 8 + 2];
ulonglong page_no;
MARIA_SHARE *share= info->s;
MARIA_PINNED_PAGE page_link;
uint block_size= share->block_size;
int result= 0;
enum pagecache_page_lock lock_method;
enum pagecache_page_pin pin_method;
DBUG_ENTER("_ma_dispose");
DBUG_PRINT("enter",("page: %lu", (ulong) (pos / block_size)));
DBUG_ASSERT(pos % block_size == 0);
(void) _ma_lock_key_del(info, 0);
old_link= share->key_del_current;
share->key_del_current= pos;
page_no= pos / block_size;
bzero(buff, share->keypage_header);
_ma_store_keynr(share, buff, (uchar) MARIA_DELETE_KEY_NR);
_ma_store_page_used(share, buff, share->keypage_header + 8);
mi_sizestore(buff + share->keypage_header, old_link);
share->state.changed|= STATE_NOT_SORTED_PAGES;
if (share->now_transactional)
{
LSN lsn;
uchar log_data[FILEID_STORE_SIZE + PAGE_STORE_SIZE * 2];
LEX_CUSTRING log_array[TRANSLOG_INTERNAL_PARTS + 1];
my_off_t page;
/* Store address of deleted page */
page_store(log_data + FILEID_STORE_SIZE, page_no);
/* Store link to next unused page (the link that is written to page) */
page= (old_link == HA_OFFSET_ERROR ? IMPOSSIBLE_PAGE_NO :
old_link / block_size);
page_store(log_data + FILEID_STORE_SIZE + PAGE_STORE_SIZE, page);
log_array[TRANSLOG_INTERNAL_PARTS + 0].str= log_data;
log_array[TRANSLOG_INTERNAL_PARTS + 0].length= sizeof(log_data);
if (translog_write_record(&lsn, LOGREC_REDO_INDEX_FREE_PAGE,
info->trn, info,
(translog_size_t) sizeof(log_data),
TRANSLOG_INTERNAL_PARTS + 1, log_array,
log_data, NULL))
result= 1;
}
if (page_not_read)
{
lock_method= PAGECACHE_LOCK_WRITE;
pin_method= PAGECACHE_PIN;
}
else
{
lock_method= PAGECACHE_LOCK_LEFT_WRITELOCKED;
pin_method= PAGECACHE_PIN_LEFT_PINNED;
}
if (pagecache_write_part(share->pagecache,
&share->kfile, (pgcache_page_no_t) page_no,
PAGECACHE_PRIORITY_LOW, buff,
share->page_type,
lock_method, pin_method,
PAGECACHE_WRITE_DELAY, &page_link.link,
LSN_IMPOSSIBLE,
0, share->keypage_header + 8))
result= 1;
#ifdef IDENTICAL_PAGES_AFTER_RECOVERY
{
uchar *page_buff= pagecache_block_link_to_buffer(page_link.link);
bzero(page_buff + share->keypage_header + 8,
block_size - share->keypage_header - 8 - KEYPAGE_CHECKSUM_SIZE);
}
#endif
if (page_not_read)
{
/* It was not locked before, we have to unlock it when we unpin pages */
page_link.unlock= PAGECACHE_LOCK_WRITE_UNLOCK;
page_link.changed= 1;
push_dynamic(&info->pinned_pages, (void*) &page_link);
}
DBUG_RETURN(result);
} /* _ma_dispose */
/**
@brief Get address for free page to use
@fn _ma_new()
@param info Maria handle
@param level Type of key block (caching priority for pagecache)
@param page_link Pointer to page in page cache if read. One can
check if this is used by checking if
page_link->changed != 0
@note Logging of this is left to the caller (so that the "new"ing and the
first changes done to this new page can be logged as one single entry - one
single _ma_log_new()) call).
@return
HA_OFFSET_ERROR File is full or page read error
# Page address to use
*/
my_off_t _ma_new(register MARIA_HA *info, int level,
MARIA_PINNED_PAGE **page_link)
{
my_off_t pos;
MARIA_SHARE *share= info->s;
uint block_size= share->block_size;
DBUG_ENTER("_ma_new");
if (_ma_lock_key_del(info, 1))
{
mysql_mutex_lock(&share->intern_lock);
pos= share->state.state.key_file_length;
if (pos >= share->base.max_key_file_length - block_size)
{
my_errno=HA_ERR_INDEX_FILE_FULL;
mysql_mutex_unlock(&share->intern_lock);
DBUG_RETURN(HA_OFFSET_ERROR);
}
share->state.state.key_file_length+= block_size;
/* Following is for not transactional tables */
info->state->key_file_length= share->state.state.key_file_length;
mysql_mutex_unlock(&share->intern_lock);
(*page_link)->changed= 0;
(*page_link)->write_lock= PAGECACHE_LOCK_WRITE;
}
else
{
uchar *buff;
pos= share->key_del_current; /* Protected */
DBUG_ASSERT(share->pagecache->block_size == block_size);
if (!(buff= pagecache_read(share->pagecache,
&share->kfile,
(pgcache_page_no_t) (pos / block_size), level,
0, share->page_type,
PAGECACHE_LOCK_WRITE, &(*page_link)->link)))
{
pos= HA_OFFSET_ERROR;
_ma_set_fatal_error(info, my_errno);
}
else
{
/*
Next deleted page's number is in the header of the present page
(single linked list):
*/
#ifdef DBUG_ASSERT_EXISTS
my_off_t key_del_current;
#endif
share->key_del_current= mi_sizekorr(buff+share->keypage_header);
#ifdef DBUG_ASSERT_EXISTS
key_del_current= share->key_del_current;
DBUG_ASSERT((key_del_current != 0) &&
((key_del_current == HA_OFFSET_ERROR) ||
(key_del_current <=
(share->state.state.key_file_length - block_size))));
#endif
}
(*page_link)->unlock= PAGECACHE_LOCK_WRITE_UNLOCK;
(*page_link)->write_lock= PAGECACHE_LOCK_WRITE;
/*
We have to mark it changed as _ma_flush_pending_blocks() uses
'changed' to know if we used the page cache or not
*/
(*page_link)->changed= 1;
push_dynamic(&info->pinned_pages, (void*) *page_link);
*page_link= dynamic_element(&info->pinned_pages,
info->pinned_pages.elements-1,
MARIA_PINNED_PAGE *);
}
share->state.changed|= STATE_NOT_SORTED_PAGES;
DBUG_PRINT("exit",("Pos: %ld",(long) pos));
DBUG_RETURN(pos);
} /* _ma_new */
/**
Log compactation of a index page
*/
static my_bool _ma_log_compact_keypage(MARIA_PAGE *ma_page,
TrID min_read_from)
{
LSN lsn;
uchar log_data[FILEID_STORE_SIZE + PAGE_STORE_SIZE + 1 + 7 + TRANSID_SIZE];
uchar *log_pos;
LEX_CUSTRING log_array[TRANSLOG_INTERNAL_PARTS + 1];
MARIA_HA *info= ma_page->info;
MARIA_SHARE *share= info->s;
uint translog_parts, extra_length;
my_off_t page= ma_page->pos;
DBUG_ENTER("_ma_log_compact_keypage");
DBUG_PRINT("enter", ("page: %lu", (ulong) (page / share->block_size)));
/* Store address of new root page */
page/= share->block_size;
page_store(log_data + FILEID_STORE_SIZE, page);
log_pos= log_data + FILEID_STORE_SIZE + PAGE_STORE_SIZE;
log_pos[0]= KEY_OP_COMPACT_PAGE;
transid_store(log_pos + 1, min_read_from);
log_pos+= 1 + TRANSID_SIZE;
log_array[TRANSLOG_INTERNAL_PARTS + 0].str= log_data;
log_array[TRANSLOG_INTERNAL_PARTS + 0].length= (uint) (log_pos -
log_data);
translog_parts= 1;
extra_length= 0;
_ma_log_key_changes(ma_page,
log_array + TRANSLOG_INTERNAL_PARTS + translog_parts,
log_pos, &extra_length, &translog_parts);
/* Remember new page length for future log entires for same page */
ma_page->org_size= ma_page->size;
if (translog_write_record(&lsn, LOGREC_REDO_INDEX,
info->trn, info,
(translog_size_t)(log_array[TRANSLOG_INTERNAL_PARTS +
0].length + extra_length),
TRANSLOG_INTERNAL_PARTS + translog_parts,
log_array, log_data, NULL))
DBUG_RETURN(1);
DBUG_RETURN(0);
}
/**
Remove all transaction id's less than given one from a key page
@fn _ma_compact_keypage()
@param keyinfo Key handler
@param page_pos Page position on disk
@param page Buffer for page
@param min_read_from Remove all trids from page less than this
@retval 0 Ok
®retval 1 Error; my_errno contains the error
*/
my_bool _ma_compact_keypage(MARIA_PAGE *ma_page, TrID min_read_from)
{
MARIA_HA *info= ma_page->info;
MARIA_SHARE *share= info->s;
MARIA_KEY key;
uchar *page, *endpos, *start_of_empty_space;
uint page_flag, nod_flag, saved_space;
my_bool page_has_transid;
DBUG_ENTER("_ma_compact_keypage");
page_flag= ma_page->flag;
if (!(page_flag & KEYPAGE_FLAG_HAS_TRANSID))
DBUG_RETURN(0); /* No transaction id on page */
nod_flag= ma_page->node;
page= ma_page->buff;
endpos= page + ma_page->size;
key.data= info->lastkey_buff;
key.keyinfo= (MARIA_KEYDEF*) ma_page->keyinfo;
page_has_transid= 0;
page+= share->keypage_header + nod_flag;
key.data[0]= 0; /* safety */
start_of_empty_space= 0;
saved_space= 0;
do
{
if (!(page= (*ma_page->keyinfo->skip_key)(&key, 0, 0, page)))
{
DBUG_PRINT("error",("Couldn't find last key: page_pos: %p",
page));
_ma_set_fatal_error(info, HA_ERR_CRASHED);
DBUG_RETURN(1);
}
if (key_has_transid(page-1))
{
uint transid_length;
transid_length= transid_packed_length(page);
if (min_read_from == ~(TrID) 0 ||
min_read_from < transid_get_packed(share, page))
{
page[-1]&= 254; /* Remove transid marker */
transid_length= transid_packed_length(page);
if (start_of_empty_space)
{
/* Move block before the transid up in page */
uint copy_length= (uint) (page - start_of_empty_space) - saved_space;
memmove(start_of_empty_space, start_of_empty_space + saved_space,
copy_length);
start_of_empty_space+= copy_length;
}
else
start_of_empty_space= page;
saved_space+= transid_length;
}
else
page_has_transid= 1; /* At least one id left */
page+= transid_length;
}
page+= nod_flag;
} while (page < endpos);
DBUG_ASSERT(page == endpos);
if (start_of_empty_space)
{
/*
Move last block down
This is always true if any transid was removed
*/
uint copy_length= (uint) (endpos - start_of_empty_space) - saved_space;
if (copy_length)
memmove(start_of_empty_space, start_of_empty_space + saved_space,
copy_length);
ma_page->size= (uint) (start_of_empty_space + copy_length - ma_page->buff);
page_store_size(share, ma_page);
}
if (!page_has_transid)
{
ma_page->flag&= ~KEYPAGE_FLAG_HAS_TRANSID;
_ma_store_keypage_flag(share, ma_page->buff, ma_page->flag);
/* Clear packed transid (in case of zerofill) */
bzero(ma_page->buff + LSN_STORE_SIZE, TRANSID_SIZE);
}
if (share->now_transactional)
{
if (_ma_log_compact_keypage(ma_page, min_read_from))
DBUG_RETURN(1);
}
DBUG_RETURN(0);
}