mariadb/storage/maria/ma_delete.c
2019-05-13 17:54:04 +03:00

1662 lines
56 KiB
C

/* Copyright (C) 2006 MySQL AB & MySQL Finland AB & TCX DataKonsult AB
Copyright (C) 2009-2010 Monty Program Ab
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; version 2 of the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA */
#include "ma_fulltext.h"
#include "ma_rt_index.h"
#include "trnman.h"
#include "ma_key_recover.h"
static int d_search(MARIA_HA *info, MARIA_KEY *key, uint32 comp_flag,
MARIA_PAGE *page);
static int del(MARIA_HA *info, MARIA_KEY *key,
MARIA_PAGE *anc_page, MARIA_PAGE *leaf_page,
uchar *keypos, my_off_t next_block, uchar *ret_key_buff);
static int underflow(MARIA_HA *info, MARIA_KEYDEF *keyinfo,
MARIA_PAGE *anc_page, MARIA_PAGE *leaf_page,
uchar *keypos);
static uint remove_key(MARIA_KEYDEF *keyinfo, uint page_flag, uint nod_flag,
uchar *keypos, uchar *lastkey, uchar *page_end,
my_off_t *next_block, MARIA_KEY_PARAM *s_temp);
/* @breif Remove a row from a MARIA table */
int maria_delete(MARIA_HA *info,const uchar *record)
{
uint i;
uchar *old_key;
int save_errno;
char lastpos[8];
MARIA_SHARE *share= info->s;
MARIA_KEYDEF *keyinfo;
DBUG_ENTER("maria_delete");
/* Test if record is in datafile */
DBUG_EXECUTE_IF("maria_pretend_crashed_table_on_usage",
maria_print_error(share, HA_ERR_CRASHED);
DBUG_RETURN(my_errno= HA_ERR_CRASHED););
DBUG_EXECUTE_IF("my_error_test_undefined_error",
maria_print_error(share, INT_MAX);
DBUG_RETURN(my_errno= INT_MAX););
if (!(info->update & HA_STATE_AKTIV))
{
DBUG_RETURN(my_errno=HA_ERR_KEY_NOT_FOUND); /* No database read */
}
if (share->options & HA_OPTION_READ_ONLY_DATA)
{
DBUG_RETURN(my_errno=EACCES);
}
if (_ma_readinfo(info,F_WRLCK,1))
DBUG_RETURN(my_errno);
if ((*share->compare_record)(info,record))
goto err; /* Error on read-check */
if (_ma_mark_file_changed(share))
goto err;
/* Ensure we don't change the autoincrement value */
info->last_auto_increment= ~(ulonglong) 0;
/* Remove all keys from the index file */
old_key= info->lastkey_buff2;
for (i=0, keyinfo= share->keyinfo ; i < share->base.keys ; i++, keyinfo++)
{
if (maria_is_key_active(share->state.key_map, i))
{
keyinfo->version++;
if (keyinfo->flag & HA_FULLTEXT)
{
if (_ma_ft_del(info, i, old_key, record, info->cur_row.lastpos))
goto err;
}
else
{
MARIA_KEY key;
if (keyinfo->ck_delete(info,
(*keyinfo->make_key)(info, &key, i, old_key,
record,
info->cur_row.lastpos,
info->cur_row.trid)))
goto err;
}
/* The above changed info->lastkey2. Inform maria_rnext_same(). */
info->update&= ~HA_STATE_RNEXT_SAME;
}
}
if (share->calc_checksum)
{
/*
We can't use the row based checksum as this doesn't have enough
precision.
*/
info->cur_row.checksum= (*share->calc_checksum)(info, record);
}
if ((*share->delete_record)(info, record))
goto err; /* Remove record from database */
info->state->checksum-= info->cur_row.checksum;
info->state->records--;
info->update= HA_STATE_CHANGED+HA_STATE_DELETED+HA_STATE_ROW_CHANGED;
info->row_changes++;
share->state.changed|= (STATE_NOT_OPTIMIZED_ROWS | STATE_NOT_MOVABLE |
STATE_NOT_ZEROFILLED);
info->state->changed=1;
mi_sizestore(lastpos, info->cur_row.lastpos);
_ma_writeinfo(info, WRITEINFO_UPDATE_KEYFILE);
if (info->invalidator != 0)
{
DBUG_PRINT("info", ("invalidator... '%s' (delete)",
share->open_file_name.str));
(*info->invalidator)(share->open_file_name.str);
info->invalidator=0;
}
DBUG_RETURN(0);
err:
save_errno= my_errno;
DBUG_ASSERT(save_errno);
if (!save_errno)
save_errno= HA_ERR_INTERNAL_ERROR; /* Should never happen */
mi_sizestore(lastpos, info->cur_row.lastpos);
(void) _ma_writeinfo(info,WRITEINFO_UPDATE_KEYFILE);
info->update|=HA_STATE_WRITTEN; /* Buffer changed */
if (save_errno != HA_ERR_RECORD_CHANGED)
{
_ma_set_fatal_error(share, HA_ERR_CRASHED);
save_errno= HA_ERR_CRASHED;
}
DBUG_RETURN(my_errno= save_errno);
} /* maria_delete */
/*
Remove a key from the btree index
TODO:
Change ma_ck_real_delete() to use another buffer for changed keys instead
of key->data. This would allows us to remove the copying of the key here.
*/
my_bool _ma_ck_delete(MARIA_HA *info, MARIA_KEY *key)
{
MARIA_SHARE *share= info->s;
int res;
LSN lsn= LSN_IMPOSSIBLE;
my_off_t new_root= share->state.key_root[key->keyinfo->key_nr];
uchar key_buff[MARIA_MAX_KEY_BUFF], *save_key_data;
MARIA_KEY org_key;
DBUG_ENTER("_ma_ck_delete");
LINT_INIT_STRUCT(org_key);
save_key_data= key->data;
if (share->now_transactional)
{
/* Save original value as the key may change */
memcpy(key_buff, key->data, key->data_length + key->ref_length);
org_key= *key;
key->data= key_buff;
}
if ((res= _ma_ck_real_delete(info, key, &new_root)))
{
/* We have to mark the table crashed before unpin_all_pages() */
maria_mark_crashed(info);
}
key->data= save_key_data;
if (!res && share->now_transactional)
res= _ma_write_undo_key_delete(info, &org_key, new_root, &lsn);
else
{
share->state.key_root[key->keyinfo->key_nr]= new_root;
_ma_fast_unlock_key_del(info);
}
_ma_unpin_all_pages_and_finalize_row(info, lsn);
DBUG_RETURN(res != 0);
} /* _ma_ck_delete */
my_bool _ma_ck_real_delete(register MARIA_HA *info, MARIA_KEY *key,
my_off_t *root)
{
int error;
my_bool result= 0;
my_off_t old_root;
uchar *root_buff;
MARIA_KEYDEF *keyinfo= key->keyinfo;
MARIA_PAGE page;
DBUG_ENTER("_ma_ck_real_delete");
if ((old_root=*root) == HA_OFFSET_ERROR)
{
_ma_set_fatal_error(info->s, HA_ERR_CRASHED);
DBUG_RETURN(1);
}
if (!(root_buff= (uchar*) my_alloca((uint) keyinfo->block_length+
MARIA_MAX_KEY_BUFF*2)))
{
DBUG_PRINT("error",("Couldn't allocate memory"));
my_errno=ENOMEM;
DBUG_RETURN(1);
}
DBUG_PRINT("info",("root_page: %lu",
(ulong) (old_root / keyinfo->block_length)));
if (_ma_fetch_keypage(&page, info, keyinfo, old_root,
PAGECACHE_LOCK_WRITE, DFLT_INIT_HITS, root_buff, 0))
{
result= 1;
goto err;
}
if ((error= d_search(info, key, (keyinfo->flag & HA_FULLTEXT ?
SEARCH_FIND | SEARCH_UPDATE | SEARCH_INSERT:
SEARCH_SAME),
&page)))
{
if (error < 0)
result= 1;
else if (error == 2)
{
DBUG_PRINT("test",("Enlarging of root when deleting"));
if (_ma_enlarge_root(info, key, root))
result= 1;
}
else /* error == 1 */
{
MARIA_SHARE *share= info->s;
page_mark_changed(info, &page);
if (page.size <= page.node + share->keypage_header + 1)
{
DBUG_ASSERT(page.size == page.node + share->keypage_header);
if (page.node)
*root= _ma_kpos(page.node, root_buff +share->keypage_header +
page.node);
else
*root=HA_OFFSET_ERROR;
if (_ma_dispose(info, old_root, 0))
result= 1;
}
else if (_ma_write_keypage(&page, PAGECACHE_LOCK_LEFT_WRITELOCKED,
DFLT_INIT_HITS))
result= 1;
}
}
err:
my_afree(root_buff);
DBUG_PRINT("exit",("Return: %d",result));
DBUG_RETURN(result);
} /* _ma_ck_real_delete */
/**
@brief Remove key below key root
@param key Key to delete. Will contain new key if block was enlarged
@return
@retval 0 ok (anc_page is not changed)
@retval 1 If data on page is too small; In this case anc_buff is not saved
@retval 2 If data on page is too big
@retval -1 On errors
*/
static int d_search(MARIA_HA *info, MARIA_KEY *key, uint32 comp_flag,
MARIA_PAGE *anc_page)
{
int flag,ret_value,save_flag;
uint nod_flag, page_flag;
my_bool last_key;
uchar *leaf_buff,*keypos;
uchar lastkey[MARIA_MAX_KEY_BUFF];
MARIA_KEY_PARAM s_temp;
MARIA_SHARE *share= info->s;
MARIA_KEYDEF *keyinfo= key->keyinfo;
MARIA_PAGE leaf_page;
DBUG_ENTER("d_search");
DBUG_DUMP("page", anc_page->buff, anc_page->size);
flag=(*keyinfo->bin_search)(key, anc_page, comp_flag, &keypos, lastkey,
&last_key);
if (flag == MARIA_FOUND_WRONG_KEY)
{
DBUG_PRINT("error",("Found wrong key"));
DBUG_RETURN(-1);
}
page_flag= anc_page->flag;
nod_flag= anc_page->node;
if (!flag && (keyinfo->flag & HA_FULLTEXT))
{
uint off;
int subkeys;
get_key_full_length_rdonly(off, lastkey);
subkeys=ft_sintXkorr(lastkey+off);
DBUG_ASSERT(info->ft1_to_ft2==0 || subkeys >=0);
comp_flag=SEARCH_SAME;
if (subkeys >= 0)
{
/* normal word, one-level tree structure */
if (info->ft1_to_ft2)
{
/* we're in ft1->ft2 conversion mode. Saving key data */
insert_dynamic(info->ft1_to_ft2, (lastkey+off));
}
else
{
/* we need exact match only if not in ft1->ft2 conversion mode */
flag=(*keyinfo->bin_search)(key, anc_page, comp_flag, &keypos,
lastkey, &last_key);
}
/* fall through to normal delete */
}
else
{
/* popular word. two-level tree. going down */
uint tmp_key_length;
my_off_t root;
uchar *kpos=keypos;
MARIA_KEY tmp_key;
tmp_key.data= lastkey;
tmp_key.keyinfo= keyinfo;
if (!(tmp_key_length=(*keyinfo->get_key)(&tmp_key, page_flag, nod_flag,
&kpos)))
{
_ma_set_fatal_error(share, HA_ERR_CRASHED);
DBUG_RETURN(-1);
}
root= _ma_row_pos_from_key(&tmp_key);
if (subkeys == -1)
{
/* the last entry in sub-tree */
if (_ma_dispose(info, root, 1))
DBUG_RETURN(-1);
/* fall through to normal delete */
}
else
{
MARIA_KEY word_key;
keyinfo=&share->ft2_keyinfo;
/* we'll modify key entry 'in vivo' */
kpos-=keyinfo->keylength+nod_flag;
get_key_full_length_rdonly(off, key->data);
word_key.data= key->data + off;
word_key.keyinfo= &share->ft2_keyinfo;
word_key.data_length= HA_FT_WLEN;
word_key.ref_length= 0;
word_key.flag= 0;
ret_value= _ma_ck_real_delete(info, &word_key, &root);
_ma_dpointer(share, kpos+HA_FT_WLEN, root);
subkeys++;
ft_intXstore(kpos, subkeys);
if (!ret_value)
{
page_mark_changed(info, anc_page);
ret_value= _ma_write_keypage(anc_page,
PAGECACHE_LOCK_LEFT_WRITELOCKED,
DFLT_INIT_HITS);
}
DBUG_PRINT("exit",("Return: %d",ret_value));
DBUG_RETURN(ret_value);
}
}
}
leaf_buff=0;
if (nod_flag)
{
/* Read left child page */
leaf_page.pos= _ma_kpos(nod_flag,keypos);
if (!(leaf_buff= (uchar*) my_alloca((uint) keyinfo->block_length+
MARIA_MAX_KEY_BUFF*2)))
{
DBUG_PRINT("error", ("Couldn't allocate memory"));
my_errno=ENOMEM;
DBUG_RETURN(-1);
}
if (_ma_fetch_keypage(&leaf_page, info,keyinfo, leaf_page.pos,
PAGECACHE_LOCK_WRITE, DFLT_INIT_HITS, leaf_buff,
0))
goto err;
}
if (flag != 0)
{
if (!nod_flag)
{
/* This should newer happend */
DBUG_PRINT("error",("Didn't find key"));
_ma_set_fatal_error(share, HA_ERR_CRASHED);
goto err;
}
save_flag=0;
ret_value= d_search(info, key, comp_flag, &leaf_page);
}
else
{ /* Found key */
uint tmp;
uint anc_buff_length= anc_page->size;
uint anc_page_flag= anc_page->flag;
my_off_t next_block;
if (!(tmp= remove_key(keyinfo, anc_page_flag, nod_flag, keypos, lastkey,
anc_page->buff + anc_buff_length,
&next_block, &s_temp)))
goto err;
page_mark_changed(info, anc_page);
anc_buff_length-= tmp;
anc_page->size= anc_buff_length;
page_store_size(share, anc_page);
/*
Log initial changes on pages
If there is an underflow, there will be more changes logged to the
page
*/
if (share->now_transactional &&
_ma_log_delete(anc_page, s_temp.key_pos,
s_temp.changed_length, s_temp.move_length,
0, KEY_OP_DEBUG_LOG_DEL_CHANGE_1))
DBUG_RETURN(-1);
if (!nod_flag)
{ /* On leaf page */
if (anc_buff_length <= (info->quick_mode ?
MARIA_MIN_KEYBLOCK_LENGTH :
(uint) keyinfo->underflow_block_length))
{
/* Page will be written by caller if we return 1 */
DBUG_RETURN(1);
}
if (_ma_write_keypage(anc_page,
PAGECACHE_LOCK_LEFT_WRITELOCKED, DFLT_INIT_HITS))
DBUG_RETURN(-1);
DBUG_RETURN(0);
}
save_flag=1; /* Mark that anc_buff is changed */
ret_value= del(info, key, anc_page, &leaf_page,
keypos, next_block, lastkey);
}
if (ret_value >0)
{
save_flag= 2;
if (ret_value == 1)
ret_value= underflow(info, keyinfo, anc_page, &leaf_page, keypos);
else
{
/* This can only happen with variable length keys */
MARIA_KEY last_key;
DBUG_PRINT("test",("Enlarging of key when deleting"));
last_key.data= lastkey;
last_key.keyinfo= keyinfo;
if (!_ma_get_last_key(&last_key, anc_page, keypos))
goto err;
ret_value= _ma_insert(info, key, anc_page, keypos,
last_key.data,
(MARIA_PAGE*) 0, (uchar*) 0, (my_bool) 0);
if (_ma_write_keypage(&leaf_page, PAGECACHE_LOCK_LEFT_WRITELOCKED,
DFLT_INIT_HITS))
ret_value= -1;
}
}
if (ret_value == 0 && anc_page->size > share->max_index_block_size)
{
/*
parent buffer got too big ; We have to split the page.
The | 2 is there to force write of anc page below
*/
save_flag= 3;
ret_value= _ma_split_page(info, key, anc_page,
share->max_index_block_size,
(uchar*) 0, 0, 0, lastkey, 0) | 2;
DBUG_ASSERT(anc_page->org_size == anc_page->size);
}
if (save_flag && ret_value != 1)
{
page_mark_changed(info, anc_page);
if (_ma_write_keypage(anc_page, PAGECACHE_LOCK_LEFT_WRITELOCKED,
DFLT_INIT_HITS))
ret_value= -1;
}
else
{
DBUG_DUMP("page", anc_page->buff, anc_page->size);
}
my_afree(leaf_buff);
DBUG_PRINT("exit",("Return: %d",ret_value));
DBUG_RETURN(ret_value);
err:
my_afree(leaf_buff);
DBUG_PRINT("exit",("Error: %d",my_errno));
DBUG_RETURN (-1);
} /* d_search */
/**
@brief Remove a key that has a page-reference
@param info Maria handler
@param key Buffer for key to be inserted at upper level
@param anc_page Page address for page where deleted key was
@param anc_buff Page buffer (nod) where deleted key was
@param leaf_page Page address for nod before the deleted key
@param leaf_buff Buffer for leaf_page
@param leaf_buff_link Pinned page link for leaf_buff
@param keypos Pos to where deleted key was on anc_buff
@param next_block Page adress for nod after deleted key
@param ret_key_buff Key before keypos in anc_buff
@notes
leaf_page must be written to disk if retval > 0
anc_page is not updated on disk. Caller should do this
@return
@retval < 0 Error
@retval 0 OK. leaf_buff is written to disk
@retval 1 key contains key to upper level (from balance page)
leaf_buff has underflow
@retval 2 key contains key to upper level (from split space)
*/
static int del(MARIA_HA *info, MARIA_KEY *key,
MARIA_PAGE *anc_page, MARIA_PAGE *leaf_page,
uchar *keypos, my_off_t next_block, uchar *ret_key_buff)
{
int ret_value,length;
uint a_length, page_flag, nod_flag, leaf_length, new_leaf_length;
uchar keybuff[MARIA_MAX_KEY_BUFF],*endpos,*next_buff,*key_start, *prev_key;
uchar *anc_buff;
MARIA_KEY_PARAM s_temp;
MARIA_KEY tmp_key;
MARIA_SHARE *share= info->s;
MARIA_KEYDEF *keyinfo= key->keyinfo;
MARIA_KEY ret_key;
MARIA_PAGE next_page;
DBUG_ENTER("del");
DBUG_PRINT("enter",("leaf_page: %lu keypos: %p",
(ulong) (leaf_page->pos / share->block_size),
keypos));
DBUG_DUMP("leaf_buff", leaf_page->buff, leaf_page->size);
page_flag= leaf_page->flag;
leaf_length= leaf_page->size;
nod_flag= leaf_page->node;
endpos= leaf_page->buff + leaf_length;
tmp_key.keyinfo= keyinfo;
tmp_key.data= keybuff;
next_buff= 0;
if (!(key_start= _ma_get_last_key(&tmp_key, leaf_page, endpos)))
DBUG_RETURN(-1);
if (nod_flag)
{
next_page.pos= _ma_kpos(nod_flag,endpos);
if (!(next_buff= (uchar*) my_alloca((uint) keyinfo->block_length+
MARIA_MAX_KEY_BUFF*2)))
DBUG_RETURN(-1);
if (_ma_fetch_keypage(&next_page, info, keyinfo, next_page.pos,
PAGECACHE_LOCK_WRITE, DFLT_INIT_HITS, next_buff, 0))
ret_value= -1;
else
{
DBUG_DUMP("next_page", next_page.buff, next_page.size);
if ((ret_value= del(info, key, anc_page, &next_page,
keypos, next_block, ret_key_buff)) >0)
{
/* Get new length after key was deleted */
endpos= leaf_page->buff+ leaf_page->size;
if (ret_value == 1)
{
/* underflow writes "next_page" to disk */
ret_value= underflow(info, keyinfo, leaf_page, &next_page,
endpos);
if (ret_value < 0)
goto err;
if (leaf_page->size > share->max_index_block_size)
{
DBUG_ASSERT(ret_value == 0);
ret_value= (_ma_split_page(info, key, leaf_page,
share->max_index_block_size,
(uchar*) 0, 0, 0,
ret_key_buff, 0) | 2);
}
}
else
{
if (_ma_write_keypage(&next_page, PAGECACHE_LOCK_LEFT_WRITELOCKED,
DFLT_INIT_HITS))
goto err;
DBUG_PRINT("test",("Inserting of key when deleting"));
if (!_ma_get_last_key(&tmp_key, leaf_page, endpos))
goto err;
ret_value= _ma_insert(info, key, leaf_page, endpos,
tmp_key.data, (MARIA_PAGE *) 0, (uchar*) 0,
0);
}
}
page_mark_changed(info, leaf_page);
/*
If ret_value <> 0, then leaf_page underflowed and caller will have
to handle underflow and write leaf_page to disk.
We can't write it here, as if leaf_page is empty we get an assert
in _ma_write_keypage.
*/
if (ret_value == 0 && _ma_write_keypage(leaf_page,
PAGECACHE_LOCK_LEFT_WRITELOCKED,
DFLT_INIT_HITS))
goto err;
}
my_afree(next_buff);
DBUG_ASSERT(leaf_page->size <= share->max_index_block_size);
DBUG_RETURN(ret_value);
}
/*
Remove last key from leaf page
Note that leaf_page page may only have had one key (can normally only
happen in quick mode), in which ase it will now temporary have 0 keys
on it. This will be corrected by the caller as we will return 0.
*/
new_leaf_length= (uint) (key_start - leaf_page->buff);
leaf_page->size= new_leaf_length;
page_store_size(share, leaf_page);
if (share->now_transactional &&
_ma_log_suffix(leaf_page, leaf_length, new_leaf_length))
goto err;
page_mark_changed(info, leaf_page); /* Safety */
if (new_leaf_length <= (info->quick_mode ? MARIA_MIN_KEYBLOCK_LENGTH :
(uint) keyinfo->underflow_block_length))
{
/* Underflow, leaf_page will be written by caller */
ret_value= 1;
}
else
{
ret_value= 0;
if (_ma_write_keypage(leaf_page, PAGECACHE_LOCK_LEFT_WRITELOCKED,
DFLT_INIT_HITS))
goto err;
}
/* Place last key in ancestor page on deleted key position */
a_length= anc_page->size;
anc_buff= anc_page->buff;
endpos= anc_buff + a_length;
ret_key.keyinfo= keyinfo;
ret_key.data= ret_key_buff;
prev_key= 0;
if (keypos != anc_buff+share->keypage_header + share->base.key_reflength)
{
if (!_ma_get_last_key(&ret_key, anc_page, keypos))
goto err;
prev_key= ret_key.data;
}
length= (*keyinfo->pack_key)(&tmp_key, share->base.key_reflength,
keypos == endpos ? (uchar*) 0 : keypos,
prev_key, prev_key,
&s_temp);
if (length > 0)
bmove_upp(endpos+length,endpos,(uint) (endpos-keypos));
else
bmove(keypos,keypos-length, (int) (endpos-keypos)+length);
(*keyinfo->store_key)(keyinfo,keypos,&s_temp);
key_start= keypos;
if (tmp_key.flag & (SEARCH_USER_KEY_HAS_TRANSID |
SEARCH_PAGE_KEY_HAS_TRANSID))
_ma_mark_page_with_transid(share, anc_page);
/* Save pointer to next leaf on parent page */
if (!(*keyinfo->get_key)(&ret_key, page_flag, share->base.key_reflength,
&keypos))
goto err;
_ma_kpointer(info,keypos - share->base.key_reflength,next_block);
anc_page->size= a_length + length;
page_store_size(share, anc_page);
if (share->now_transactional &&
_ma_log_add(anc_page, a_length,
key_start, s_temp.changed_length, s_temp.move_length, 1,
KEY_OP_DEBUG_LOG_ADD_2))
goto err;
DBUG_ASSERT(leaf_page->size <= share->max_index_block_size);
DBUG_RETURN(new_leaf_length <=
(info->quick_mode ? MARIA_MIN_KEYBLOCK_LENGTH :
(uint) keyinfo->underflow_block_length));
err:
if (next_buff)
my_afree(next_buff);
DBUG_RETURN(-1);
} /* del */
/**
@brief Balances adjacent pages if underflow occours
@fn underflow()
@param anc_buff Anchestor page data
@param leaf_page Leaf page (page that underflowed)
@param leaf_page_link Pointer to pin information about leaf page
@param keypos Position after current key in anc_buff
@note
This function writes redo entries for all changes
leaf_page is saved to disk
Caller must save anc_buff
For the algoritm to work, we have to ensure for packed keys that
key_length + (underflow_length + max_block_length + key_length) / 2
<= block_length.
From which follows that underflow_length <= block_length - key_length *3
For not packed keys we have:
(underflow_length + max_block_length + key_length) / 2 <= block_length
From which follows that underflow_length < block_length - key_length
This is ensured by setting of underflow_block_length.
@return
@retval 0 ok
@retval 1 ok, but anc_page did underflow
@retval -1 error
*/
static int underflow(MARIA_HA *info, MARIA_KEYDEF *keyinfo,
MARIA_PAGE *anc_page, MARIA_PAGE *leaf_page,
uchar *keypos)
{
int t_length;
uint anc_length,buff_length,leaf_length,p_length,s_length,nod_flag;
uint next_buff_length, new_buff_length, key_reflength;
uint unchanged_leaf_length, new_leaf_length, new_anc_length;
uint anc_page_flag, page_flag;
uchar anc_key_buff[MARIA_MAX_KEY_BUFF], leaf_key_buff[MARIA_MAX_KEY_BUFF];
uchar *endpos, *next_keypos, *anc_pos, *half_pos, *prev_key;
uchar *anc_buff, *leaf_buff;
uchar *after_key, *anc_end_pos;
MARIA_KEY_PARAM key_deleted, key_inserted;
MARIA_SHARE *share= info->s;
my_bool first_key;
MARIA_KEY tmp_key, anc_key, leaf_key;
MARIA_PAGE next_page;
DBUG_ENTER("underflow");
DBUG_PRINT("enter",("leaf_page: %lu keypos: %p",
(ulong) (leaf_page->pos / share->block_size),
keypos));
DBUG_DUMP("anc_buff", anc_page->buff, anc_page->size);
DBUG_DUMP("leaf_buff", leaf_page->buff, leaf_page->size);
anc_page_flag= anc_page->flag;
anc_buff= anc_page->buff;
leaf_buff= leaf_page->buff;
info->keyread_buff_used=1;
next_keypos=keypos;
nod_flag= leaf_page->node;
p_length= nod_flag+share->keypage_header;
anc_length= anc_page->size;
leaf_length= leaf_page->size;
key_reflength= share->base.key_reflength;
if (share->keyinfo+info->lastinx == keyinfo)
info->page_changed=1;
first_key= keypos == anc_buff + share->keypage_header + key_reflength;
tmp_key.data= info->buff;
anc_key.data= anc_key_buff;
leaf_key.data= leaf_key_buff;
tmp_key.keyinfo= leaf_key.keyinfo= anc_key.keyinfo= keyinfo;
if ((keypos < anc_buff + anc_length && (info->state->records & 1)) ||
first_key)
{
size_t tmp_length;
uint next_page_flag;
/* Use page right of anc-page */
DBUG_PRINT("test",("use right page"));
/*
Calculate position after the current key. Note that keydata itself is
not used
*/
if (keyinfo->flag & HA_BINARY_PACK_KEY)
{
if (!(next_keypos= _ma_get_key(&tmp_key, anc_page, keypos)))
goto err;
}
else
{
/* Avoid length error check if packed key */
tmp_key.data[0]= tmp_key.data[1]= 0;
/* Got to end of found key */
if (!(*keyinfo->get_key)(&tmp_key, anc_page_flag, key_reflength,
&next_keypos))
goto err;
}
next_page.pos= _ma_kpos(key_reflength, next_keypos);
if (_ma_fetch_keypage(&next_page, info, keyinfo, next_page.pos,
PAGECACHE_LOCK_WRITE, DFLT_INIT_HITS, info->buff, 0))
goto err;
next_buff_length= next_page.size;
next_page_flag= next_page.flag;
DBUG_DUMP("next", next_page.buff, next_page.size);
/* find keys to make a big key-page */
bmove(next_keypos-key_reflength, next_page.buff + share->keypage_header,
key_reflength);
if (!_ma_get_last_key(&anc_key, anc_page, next_keypos) ||
!_ma_get_last_key(&leaf_key, leaf_page, leaf_buff+leaf_length))
goto err;
/* merge pages and put parting key from anc_page between */
prev_key= (leaf_length == p_length ? (uchar*) 0 : leaf_key.data);
t_length= (*keyinfo->pack_key)(&anc_key, nod_flag, next_page.buff+p_length,
prev_key, prev_key, &key_inserted);
tmp_length= next_buff_length - p_length;
endpos= next_page.buff + tmp_length + leaf_length + t_length;
/* next_page.buff will always be larger than before !*/
bmove_upp(endpos, next_page.buff + next_buff_length, tmp_length);
memcpy(next_page.buff, leaf_buff,(size_t) leaf_length);
(*keyinfo->store_key)(keyinfo, next_page.buff+leaf_length, &key_inserted);
buff_length= (uint) (endpos - next_page.buff);
/* Set page flag from combination of both key pages and parting key */
page_flag= next_page_flag | leaf_page->flag;
if (anc_key.flag & (SEARCH_USER_KEY_HAS_TRANSID |
SEARCH_PAGE_KEY_HAS_TRANSID))
page_flag|= KEYPAGE_FLAG_HAS_TRANSID;
next_page.size= buff_length;
next_page.flag= page_flag;
page_store_info(share, &next_page);
/* remove key from anc_page */
if (!(s_length=remove_key(keyinfo, anc_page_flag, key_reflength, keypos,
anc_key_buff, anc_buff+anc_length,
(my_off_t *) 0, &key_deleted)))
goto err;
new_anc_length= anc_length - s_length;
anc_page->size= new_anc_length;
page_store_size(share, anc_page);
if (buff_length <= share->max_index_block_size)
{
/* All keys fitted into one page */
page_mark_changed(info, &next_page);
if (_ma_dispose(info, next_page.pos, 0))
goto err;
memcpy(leaf_buff, next_page.buff, (size_t) buff_length);
leaf_page->size= next_page.size;
leaf_page->flag= next_page.flag;
if (share->now_transactional)
{
/*
Log changes to parent page. Note that this page may have been
temporarily bigger than block_size.
*/
if (_ma_log_delete(anc_page, key_deleted.key_pos,
key_deleted.changed_length,
key_deleted.move_length,
anc_length - anc_page->org_size,
KEY_OP_DEBUG_LOG_DEL_CHANGE_2))
goto err;
/*
Log changes to leaf page. Data for leaf page is in leaf_buff
which contains original leaf_buff, parting key and next_buff
*/
if (_ma_log_suffix(leaf_page, leaf_length, buff_length))
goto err;
}
}
else
{
/*
Balancing didn't free a page, so we have to split 'buff' into two
pages:
- Find key in middle of buffer
- Store everything before key in 'leaf_page'
- Pack key into anc_page at position of deleted key
Note that anc_page may overflow! (is handled by caller)
- Store remaining keys in next_page (buff)
*/
MARIA_KEY_PARAM anc_key_inserted;
anc_end_pos= anc_buff + new_anc_length;
DBUG_PRINT("test",("anc_buff:%p anc_end_pos:%p",
anc_buff, anc_end_pos));
if (!first_key && !_ma_get_last_key(&anc_key, anc_page, keypos))
goto err;
if (!(half_pos= _ma_find_half_pos(&leaf_key, &next_page, &after_key)))
goto err;
new_leaf_length= (uint) (half_pos - next_page.buff);
memcpy(leaf_buff, next_page.buff, (size_t) new_leaf_length);
leaf_page->size= new_leaf_length;
leaf_page->flag= page_flag;
page_store_info(share, leaf_page);
/* Correct new keypointer to leaf_page */
half_pos=after_key;
_ma_kpointer(info,
leaf_key.data + leaf_key.data_length + leaf_key.ref_length,
next_page.pos);
/* Save key in anc_page */
prev_key= (first_key ? (uchar*) 0 : anc_key.data);
t_length= (*keyinfo->pack_key)(&leaf_key, key_reflength,
(keypos == anc_end_pos ? (uchar*) 0 :
keypos),
prev_key, prev_key, &anc_key_inserted);
if (t_length >= 0)
bmove_upp(anc_end_pos+t_length, anc_end_pos,
(uint) (anc_end_pos - keypos));
else
bmove(keypos,keypos-t_length,(uint) (anc_end_pos-keypos)+t_length);
(*keyinfo->store_key)(keyinfo,keypos, &anc_key_inserted);
new_anc_length+= t_length;
anc_page->size= new_anc_length;
page_store_size(share, anc_page);
if (leaf_key.flag & (SEARCH_USER_KEY_HAS_TRANSID |
SEARCH_PAGE_KEY_HAS_TRANSID))
_ma_mark_page_with_transid(share, anc_page);
/* Store key first in new page */
if (nod_flag)
bmove(next_page.buff + share->keypage_header, half_pos-nod_flag,
(size_t) nod_flag);
if (!(*keyinfo->get_key)(&leaf_key, page_flag, nod_flag, &half_pos))
goto err;
t_length=(int) (*keyinfo->pack_key)(&leaf_key, nod_flag, (uchar*) 0,
(uchar*) 0, (uchar*) 0,
&key_inserted);
/* t_length will always be > 0 for a new page !*/
tmp_length= (size_t) ((next_page.buff + buff_length) - half_pos);
bmove(next_page.buff + p_length + t_length, half_pos, tmp_length);
(*keyinfo->store_key)(keyinfo, next_page.buff + p_length, &key_inserted);
new_buff_length= tmp_length + t_length + p_length;
next_page.size= new_buff_length;
page_store_size(share, &next_page);
/* keypage flag is already up to date */
if (share->now_transactional)
{
/*
Log changes to parent page
This has one key deleted from it and one key inserted to it at
keypos
ma_log_add ensures that we don't log changes that is outside of
key block size, as the REDO code can't handle that
*/
if (_ma_log_add(anc_page, anc_length, keypos,
anc_key_inserted.move_length +
MY_MAX(anc_key_inserted.changed_length -
anc_key_inserted.move_length,
key_deleted.changed_length),
anc_key_inserted.move_length -
key_deleted.move_length, 1,
KEY_OP_DEBUG_LOG_ADD_3))
goto err;
/*
Log changes to leaf page.
This contains original data with new data added at end
*/
DBUG_ASSERT(leaf_length <= new_leaf_length);
if (_ma_log_suffix(leaf_page, leaf_length, new_leaf_length))
goto err;
/*
Log changes to next page
This contains original data with some prefix data deleted and
some compressed data at start possible extended
Data in buff was originally:
org_leaf_buff [leaf_length]
separator_key [buff_key_inserted.move_length]
next_key_changes [buff_key_inserted.changed_length -move_length]
next_page_data [next_buff_length - p_length -
(buff_key_inserted.changed_length -move_length)]
After changes it's now:
unpacked_key [key_inserted.changed_length]
next_suffix [next_buff_length - key_inserted.changed_length]
*/
DBUG_ASSERT(new_buff_length <= next_buff_length);
if (_ma_log_prefix(&next_page, key_inserted.changed_length,
(int) (new_buff_length - next_buff_length),
KEY_OP_DEBUG_LOG_PREFIX_1))
goto err;
}
page_mark_changed(info, &next_page);
if (_ma_write_keypage(&next_page,
PAGECACHE_LOCK_LEFT_WRITELOCKED, DFLT_INIT_HITS))
goto err;
}
page_mark_changed(info, leaf_page);
if (_ma_write_keypage(leaf_page,
PAGECACHE_LOCK_LEFT_WRITELOCKED, DFLT_INIT_HITS))
goto err;
DBUG_RETURN(new_anc_length <=
((info->quick_mode ? MARIA_MIN_KEYBLOCK_LENGTH :
(uint) keyinfo->underflow_block_length)));
}
DBUG_PRINT("test",("use left page"));
keypos= _ma_get_last_key(&anc_key, anc_page, keypos);
if (!keypos)
goto err;
next_page.pos= _ma_kpos(key_reflength,keypos);
if (_ma_fetch_keypage(&next_page, info, keyinfo, next_page.pos,
PAGECACHE_LOCK_WRITE, DFLT_INIT_HITS, info->buff, 0))
goto err;
buff_length= next_page.size;
endpos= next_page.buff + buff_length;
DBUG_DUMP("prev", next_page.buff, next_page.size);
/* find keys to make a big key-page */
bmove(next_keypos - key_reflength, leaf_buff + share->keypage_header,
key_reflength);
next_keypos=keypos;
if (!(*keyinfo->get_key)(&anc_key, anc_page_flag, key_reflength,
&next_keypos))
goto err;
if (!_ma_get_last_key(&leaf_key, &next_page, endpos))
goto err;
/* merge pages and put parting key from anc_page between */
prev_key= (leaf_length == p_length ? (uchar*) 0 : leaf_key.data);
t_length=(*keyinfo->pack_key)(&anc_key, nod_flag,
(leaf_length == p_length ?
(uchar*) 0 : leaf_buff+p_length),
prev_key, prev_key,
&key_inserted);
if (t_length >= 0)
bmove(endpos+t_length, leaf_buff+p_length,
(size_t) (leaf_length-p_length));
else /* We gained space */
bmove(endpos,leaf_buff+((int) p_length-t_length),
(size_t) (leaf_length-p_length+t_length));
(*keyinfo->store_key)(keyinfo,endpos, &key_inserted);
/* Remember for logging how many bytes of leaf_buff that are not changed */
DBUG_ASSERT((int) key_inserted.changed_length >= key_inserted.move_length);
unchanged_leaf_length= (leaf_length - p_length -
(key_inserted.changed_length -
key_inserted.move_length));
new_buff_length= buff_length + leaf_length - p_length + t_length;
#ifdef EXTRA_DEBUG
/* Ensure that unchanged_leaf_length is correct */
DBUG_ASSERT(bcmp(next_page.buff + new_buff_length - unchanged_leaf_length,
leaf_buff + leaf_length - unchanged_leaf_length,
unchanged_leaf_length) == 0);
#endif
page_flag= next_page.flag | leaf_page->flag;
if (anc_key.flag & (SEARCH_USER_KEY_HAS_TRANSID |
SEARCH_PAGE_KEY_HAS_TRANSID))
page_flag|= KEYPAGE_FLAG_HAS_TRANSID;
next_page.size= new_buff_length;
next_page.flag= page_flag;
page_store_info(share, &next_page);
/* remove key from anc_page */
if (!(s_length= remove_key(keyinfo, anc_page_flag, key_reflength, keypos,
anc_key_buff,
anc_buff+anc_length, (my_off_t *) 0,
&key_deleted)))
goto err;
new_anc_length= anc_length - s_length;
anc_page->size= new_anc_length;
page_store_size(share, anc_page);
if (new_buff_length <= share->max_index_block_size)
{
/* All keys fitted into one page */
page_mark_changed(info, leaf_page);
if (_ma_dispose(info, leaf_page->pos, 0))
goto err;
if (share->now_transactional)
{
/*
Log changes to parent page. Note that this page may have been
temporarily bigger than block_size.
*/
if (_ma_log_delete(anc_page, key_deleted.key_pos,
key_deleted.changed_length, key_deleted.move_length,
anc_length - anc_page->org_size,
KEY_OP_DEBUG_LOG_DEL_CHANGE_3))
goto err;
/*
Log changes to next page. Data for leaf page is in buff
that contains original leaf_buff, parting key and next_buff
*/
if (_ma_log_suffix(&next_page, buff_length, new_buff_length))
goto err;
}
}
else
{
/*
Balancing didn't free a page, so we have to split 'next_page' into two
pages
- Find key in middle of buffer (buff)
- Pack key at half_buff into anc_page at position of deleted key
Note that anc_page may overflow! (is handled by caller)
- Move everything after middlekey to 'leaf_buff'
- Shorten buff at 'endpos'
*/
MARIA_KEY_PARAM anc_key_inserted;
size_t tmp_length;
if (keypos == anc_buff + share->keypage_header + key_reflength)
anc_pos= 0; /* First key */
else
{
if (!_ma_get_last_key(&anc_key, anc_page, keypos))
goto err;
anc_pos= anc_key.data;
}
if (!(endpos= _ma_find_half_pos(&leaf_key, &next_page, &half_pos)))
goto err;
/* Correct new keypointer to leaf_page */
_ma_kpointer(info,leaf_key.data + leaf_key.data_length +
leaf_key.ref_length, leaf_page->pos);
/* Save parting key found by _ma_find_half_pos() in anc_page */
DBUG_DUMP("anc_buff", anc_buff, new_anc_length);
DBUG_DUMP_KEY("key_to_anc", &leaf_key);
anc_end_pos= anc_buff + new_anc_length;
t_length=(*keyinfo->pack_key)(&leaf_key, key_reflength,
keypos == anc_end_pos ? (uchar*) 0
: keypos,
anc_pos, anc_pos,
&anc_key_inserted);
if (t_length >= 0)
bmove_upp(anc_end_pos+t_length, anc_end_pos,
(uint) (anc_end_pos-keypos));
else
bmove(keypos,keypos-t_length,(uint) (anc_end_pos-keypos)+t_length);
(*keyinfo->store_key)(keyinfo,keypos, &anc_key_inserted);
new_anc_length+= t_length;
anc_page->size= new_anc_length;
page_store_size(share, anc_page);
if (leaf_key.flag & (SEARCH_USER_KEY_HAS_TRANSID |
SEARCH_PAGE_KEY_HAS_TRANSID))
_ma_mark_page_with_transid(share, anc_page);
/* Store first key on new page */
if (nod_flag)
bmove(leaf_buff + share->keypage_header, half_pos-nod_flag,
(size_t) nod_flag);
if (!(*keyinfo->get_key)(&leaf_key, page_flag, nod_flag, &half_pos))
goto err;
DBUG_DUMP_KEY("key_to_leaf", &leaf_key);
t_length=(*keyinfo->pack_key)(&leaf_key, nod_flag, (uchar*) 0,
(uchar*) 0, (uchar*) 0, &key_inserted);
/* t_length will always be > 0 for a new page !*/
tmp_length= (size_t) ((next_page.buff + new_buff_length) - half_pos);
DBUG_PRINT("info",("t_length: %d length: %d",t_length, (int) tmp_length));
bmove(leaf_buff+p_length+t_length, half_pos, tmp_length);
(*keyinfo->store_key)(keyinfo,leaf_buff+p_length, &key_inserted);
new_leaf_length= tmp_length + t_length + p_length;
DBUG_ASSERT(new_leaf_length <= share->max_index_block_size);
leaf_page->size= new_leaf_length;
leaf_page->flag= page_flag;
page_store_info(share, leaf_page);
new_buff_length= (uint) (endpos - next_page.buff);
next_page.size= new_buff_length;
page_store_size(share, &next_page);
if (share->now_transactional)
{
/*
Log changes to parent page
This has one key deleted from it and one key inserted to it at
keypos
ma_log_add() ensures that we don't log changes that is outside of
key block size, as the REDO code can't handle that
*/
if (_ma_log_add(anc_page, anc_length, keypos,
anc_key_inserted.move_length +
MY_MAX(anc_key_inserted.changed_length -
anc_key_inserted.move_length,
key_deleted.changed_length),
anc_key_inserted.move_length -
key_deleted.move_length, 1,KEY_OP_DEBUG_LOG_ADD_4))
goto err;
/*
Log changes to leaf page.
This contains original data with new data added first
*/
DBUG_ASSERT(leaf_length <= new_leaf_length);
DBUG_ASSERT(new_leaf_length >= unchanged_leaf_length);
if (_ma_log_prefix(leaf_page, new_leaf_length - unchanged_leaf_length,
(int) (new_leaf_length - leaf_length),
KEY_OP_DEBUG_LOG_PREFIX_2))
goto err;
/*
Log changes to next page
This contains original data with some suffix data deleted
*/
DBUG_ASSERT(new_buff_length <= buff_length);
if (_ma_log_suffix(&next_page, buff_length, new_buff_length))
goto err;
}
page_mark_changed(info, leaf_page);
if (_ma_write_keypage(leaf_page,
PAGECACHE_LOCK_LEFT_WRITELOCKED, DFLT_INIT_HITS))
goto err;
}
page_mark_changed(info, &next_page);
if (_ma_write_keypage(&next_page,
PAGECACHE_LOCK_LEFT_WRITELOCKED, DFLT_INIT_HITS))
goto err;
DBUG_RETURN(new_anc_length <=
((info->quick_mode ? MARIA_MIN_KEYBLOCK_LENGTH :
(uint) keyinfo->underflow_block_length)));
err:
DBUG_RETURN(-1);
} /* underflow */
/**
@brief Remove a key from page
@fn remove_key()
keyinfo Key handle
nod_flag Length of node ptr
keypos Where on page key starts
lastkey Buffer for storing keys to be removed
page_end Pointer to end of page
next_block If <> 0 and node-page, this is set to address of
next page
s_temp Information about what changes was done one the page:
s_temp.key_pos Start of key
s_temp.move_length Number of bytes removed at keypos
s_temp.changed_length Number of bytes changed at keypos
@todo
The current code doesn't handle the case that the next key may be
packed better against the previous key if there is a case difference
@return
@retval 0 error
@retval # How many chars was removed
*/
static uint remove_key(MARIA_KEYDEF *keyinfo, uint page_flag, uint nod_flag,
uchar *keypos, uchar *lastkey,
uchar *page_end, my_off_t *next_block,
MARIA_KEY_PARAM *s_temp)
{
int s_length;
uchar *start;
DBUG_ENTER("remove_key");
DBUG_PRINT("enter", ("keypos:%p page_end: %p",
keypos, page_end));
start= s_temp->key_pos= keypos;
s_temp->changed_length= 0;
if (!(keyinfo->flag &
(HA_PACK_KEY | HA_SPACE_PACK_USED | HA_VAR_LENGTH_KEY |
HA_BINARY_PACK_KEY)) &&
!(page_flag & KEYPAGE_FLAG_HAS_TRANSID))
{
/* Static length key */
s_length=(int) (keyinfo->keylength+nod_flag);
if (next_block && nod_flag)
*next_block= _ma_kpos(nod_flag,keypos+s_length);
}
else
{
/* Let keypos point at next key */
MARIA_KEY key;
/* Calculate length of key */
key.keyinfo= keyinfo;
key.data= lastkey;
if (!(*keyinfo->get_key)(&key, page_flag, nod_flag, &keypos))
DBUG_RETURN(0); /* Error */
if (next_block && nod_flag)
*next_block= _ma_kpos(nod_flag,keypos);
s_length=(int) (keypos-start);
if (keypos != page_end)
{
if (keyinfo->flag & HA_BINARY_PACK_KEY)
{
uchar *old_key= start;
uint next_length,prev_length,prev_pack_length;
/* keypos points here on start of next key */
get_key_length(next_length,keypos);
get_key_pack_length(prev_length,prev_pack_length,old_key);
if (next_length > prev_length)
{
uint diff= (next_length-prev_length);
/* We have to copy data from the current key to the next key */
keypos-= diff + prev_pack_length;
store_key_length(keypos, prev_length);
bmove(keypos + prev_pack_length, lastkey + prev_length, diff);
s_length=(int) (keypos-start);
s_temp->changed_length= diff + prev_pack_length;
}
}
else
{
/* Check if a variable length first key part */
if ((keyinfo->seg->flag & HA_PACK_KEY) && *keypos & 128)
{
/* Next key is packed against the current one */
uint next_length,prev_length,prev_pack_length,lastkey_length,
rest_length;
if (keyinfo->seg[0].length >= 127)
{
if (!(prev_length=mi_uint2korr(start) & 32767))
goto end;
next_length=mi_uint2korr(keypos) & 32767;
keypos+=2;
prev_pack_length=2;
}
else
{
if (!(prev_length= *start & 127))
goto end; /* Same key as previous*/
next_length= *keypos & 127;
keypos++;
prev_pack_length=1;
}
if (!(*start & 128))
prev_length=0; /* prev key not packed */
if (keyinfo->seg[0].flag & HA_NULL_PART)
lastkey++; /* Skip null marker */
get_key_length(lastkey_length,lastkey);
if (!next_length) /* Same key after */
{
next_length=lastkey_length;
rest_length=0;
}
else
get_key_length(rest_length,keypos);
if (next_length >= prev_length)
{
/* Next key is based on deleted key */
uint pack_length;
uint diff= (next_length-prev_length);
/* keypos points to data of next key (after key length) */
bmove(keypos - diff, lastkey + prev_length, diff);
rest_length+= diff;
pack_length= prev_length ? get_pack_length(rest_length): 0;
keypos-= diff + pack_length + prev_pack_length;
s_length=(int) (keypos-start);
if (prev_length) /* Pack against prev key */
{
*keypos++= start[0];
if (prev_pack_length == 2)
*keypos++= start[1];
store_key_length(keypos,rest_length);
}
else
{
/* Next key is not packed anymore */
if (keyinfo->seg[0].flag & HA_NULL_PART)
{
rest_length++; /* Mark not null */
}
if (prev_pack_length == 2)
{
mi_int2store(keypos,rest_length);
}
else
*keypos= rest_length;
}
s_temp->changed_length= diff + pack_length + prev_pack_length;
}
}
}
}
}
end:
bmove(start, start+s_length, (uint) (page_end-start-s_length));
s_temp->move_length= s_length;
DBUG_RETURN((uint) s_length);
} /* remove_key */
/****************************************************************************
Logging of redos
****************************************************************************/
/**
@brief
log entry where some parts are deleted and some things are changed
and some data could be added last.
@fn _ma_log_delete()
@param info Maria handler
@param page Pageaddress for changed page
@param buff Page buffer
@param key_pos Start of change area
@param changed_length How many bytes where changed at key_pos
@param move_length How many bytes where deleted at key_pos
@param append_length Length of data added last
This is taken from end of ma_page->buff
This is mainly used when a key is deleted. The append happens
when we delete a key from a page with data > block_size kept in
memory and we have to add back the data that was stored > block_size
*/
my_bool _ma_log_delete(MARIA_PAGE *ma_page, const uchar *key_pos,
uint changed_length, uint move_length,
uint append_length __attribute__((unused)),
enum en_key_debug debug_marker __attribute__((unused)))
{
LSN lsn;
uchar log_data[FILEID_STORE_SIZE + PAGE_STORE_SIZE + 2 + 5+ 2 + 3 + 3 + 6 + 3 + 7];
uchar *log_pos;
LEX_CUSTRING log_array[TRANSLOG_INTERNAL_PARTS + 7];
uint translog_parts, current_size, extra_length;
uint offset= (uint) (key_pos - ma_page->buff);
MARIA_HA *info= ma_page->info;
MARIA_SHARE *share= info->s;
my_off_t page= ma_page->pos / share->block_size;
DBUG_ENTER("_ma_log_delete");
DBUG_PRINT("enter", ("page: %lu offset: %u changed_length: %u move_length: %u append_length: %u page_size: %u",
(ulong) page, offset, changed_length, move_length,
append_length, ma_page->size));
DBUG_ASSERT(share->now_transactional && move_length);
DBUG_ASSERT(offset + changed_length <= ma_page->size);
DBUG_ASSERT(ma_page->org_size - move_length + append_length == ma_page->size);
DBUG_ASSERT(move_length <= ma_page->org_size - share->keypage_header);
/* Store address of new root page */
page_store(log_data + FILEID_STORE_SIZE, page);
log_pos= log_data+ FILEID_STORE_SIZE + PAGE_STORE_SIZE;
current_size= ma_page->org_size;
#ifdef EXTRA_DEBUG_KEY_CHANGES
*log_pos++= KEY_OP_DEBUG;
*log_pos++= debug_marker;
*log_pos++= KEY_OP_DEBUG_2;
int2store(log_pos, ma_page->org_size);
int2store(log_pos+2, ma_page->size);
log_pos+=4;
#endif
/* Store keypage_flag */
*log_pos++= KEY_OP_SET_PAGEFLAG;
*log_pos++= _ma_get_keypage_flag(info->s, ma_page->buff);
log_pos[0]= KEY_OP_OFFSET;
int2store(log_pos+1, offset);
log_pos+= 3;
translog_parts= TRANSLOG_INTERNAL_PARTS + 1;
extra_length= 0;
if (changed_length)
{
if (offset + changed_length >= share->max_index_block_size)
{
changed_length= share->max_index_block_size - offset;
move_length= 0; /* Nothing to move */
current_size= share->max_index_block_size;
}
log_pos[0]= KEY_OP_CHANGE;
int2store(log_pos+1, changed_length);
log_pos+= 3;
log_array[translog_parts].str= ma_page->buff + offset;
log_array[translog_parts].length= changed_length;
translog_parts++;
/* We only have to move things after offset+changed_length */
offset+= changed_length;
}
log_array[TRANSLOG_INTERNAL_PARTS + 0].str= log_data;
log_array[TRANSLOG_INTERNAL_PARTS + 0].length= (uint) (log_pos - log_data);
if (move_length)
{
uint log_length;
if (offset + move_length < share->max_index_block_size)
{
/*
Move down things that is on page.
page_offset in apply_redo_inxed() will be at original offset
+ changed_length.
*/
log_pos[0]= KEY_OP_SHIFT;
int2store(log_pos+1, - (int) move_length);
log_length= 3;
current_size-= move_length;
}
else
{
/* Delete to end of page */
uint tmp= current_size - offset;
current_size= offset;
log_pos[0]= KEY_OP_DEL_SUFFIX;
int2store(log_pos+1, tmp);
log_length= 3;
}
log_array[translog_parts].str= log_pos;
log_array[translog_parts].length= log_length;
translog_parts++;
log_pos+= log_length;
extra_length+= log_length;
}
if (current_size != ma_page->size &&
current_size != share->max_index_block_size)
{
/* Append data that didn't fit on the page before */
uint length= (MY_MIN(ma_page->size, share->max_index_block_size) -
current_size);
uchar *data= ma_page->buff + current_size;
DBUG_ASSERT(length <= append_length);
log_pos[0]= KEY_OP_ADD_SUFFIX;
int2store(log_pos+1, length);
log_array[translog_parts].str= log_pos;
log_array[translog_parts].length= 3;
log_array[translog_parts + 1].str= data;
log_array[translog_parts + 1].length= length;
log_pos+= 3;
translog_parts+= 2;
current_size+= length;
extra_length+= 3 + length;
}
_ma_log_key_changes(ma_page,
log_array + translog_parts,
log_pos, &extra_length, &translog_parts);
/* Remember new page length for future log entires for same page */
ma_page->org_size= current_size;
if (translog_write_record(&lsn, LOGREC_REDO_INDEX,
info->trn, info,
(translog_size_t)
log_array[TRANSLOG_INTERNAL_PARTS].length +
changed_length + extra_length, translog_parts,
log_array, log_data, NULL))
DBUG_RETURN(1);
DBUG_RETURN(0);
}
/****************************************************************************
Logging of undos
****************************************************************************/
my_bool _ma_write_undo_key_delete(MARIA_HA *info, const MARIA_KEY *key,
my_off_t new_root, LSN *res_lsn)
{
MARIA_SHARE *share= info->s;
uchar log_data[LSN_STORE_SIZE + FILEID_STORE_SIZE +
KEY_NR_STORE_SIZE + PAGE_STORE_SIZE], *log_pos;
LEX_CUSTRING log_array[TRANSLOG_INTERNAL_PARTS + 2];
struct st_msg_to_write_hook_for_undo_key msg;
enum translog_record_type log_type= LOGREC_UNDO_KEY_DELETE;
uint keynr= key->keyinfo->key_nr;
lsn_store(log_data, info->trn->undo_lsn);
key_nr_store(log_data + LSN_STORE_SIZE + FILEID_STORE_SIZE, keynr);
log_pos= log_data + LSN_STORE_SIZE + FILEID_STORE_SIZE + KEY_NR_STORE_SIZE;
/**
@todo BUG if we had concurrent insert/deletes, reading state's key_root
like this would be unsafe.
*/
if (new_root != share->state.key_root[keynr])
{
my_off_t page;
page= ((new_root == HA_OFFSET_ERROR) ? IMPOSSIBLE_PAGE_NO :
new_root / share->block_size);
page_store(log_pos, page);
log_pos+= PAGE_STORE_SIZE;
log_type= LOGREC_UNDO_KEY_DELETE_WITH_ROOT;
}
log_array[TRANSLOG_INTERNAL_PARTS + 0].str= log_data;
log_array[TRANSLOG_INTERNAL_PARTS + 0].length= (uint) (log_pos - log_data);
log_array[TRANSLOG_INTERNAL_PARTS + 1].str= key->data;
log_array[TRANSLOG_INTERNAL_PARTS + 1].length= (key->data_length +
key->ref_length);
msg.root= &share->state.key_root[keynr];
msg.value= new_root;
/*
set autoincrement to 1 if this is an auto_increment key
This is only used if we are now in a rollback of a duplicate key
*/
msg.auto_increment= share->base.auto_key == keynr + 1;
return translog_write_record(res_lsn, log_type,
info->trn, info,
(translog_size_t)
(log_array[TRANSLOG_INTERNAL_PARTS + 0].length +
log_array[TRANSLOG_INTERNAL_PARTS + 1].length),
TRANSLOG_INTERNAL_PARTS + 2, log_array,
log_data + LSN_STORE_SIZE, &msg) ? -1 : 0;
}