mariadb/storage/maria/ma_delete.c
unknown ebf7ab7bce Added error HA_ERR_FILE_TOO_SHORT to be used when files are shorter than expected (by my_read/my_pread)
Added debugger hook _my_dbug_put_break_here() that is called if we get a CRC that matches --debug-crc-break (my_crc_dbug_break)
Fixed REDO_REPAIR to use all repair modes (repair, repair_by_sort, repair_paralell
REDO_REPAIR now also logs used key map
Fixed some bugs in REDO logging of key pages
Better error messages from maria_read_log
Added my_readwrite_flags to init_pagecache() to be able to get better error messages and simplify code.
Don't allow pagecaches with less than 8 blocks (Causes strange crashes)
Added EXTRA_DEBUG_KEY_CHANGES. When this is defined some REDO_INDEX entries contains page checksums (these are calculated and checked in DBUG mode, ignored otherwise)
Fixed bug in ma_pagecache unit tests that caused program to sometimes fail
Added some missing calls to MY_INIT() that caused some unit tests to fail
Fixed that TRUNCATE works properly on temporary MyISAM files
Updates some result files to new table checksums results (checksum when NULL fields are ignored)
perl test-insert can be replayed with maria_read_log!


sql/share/Makefile.am:
  Change mode to -rw-rw-r--
BitKeeper/etc/ignore:
  added storage/maria/unittest/page_cache_test_file_1 storage/maria/unittest/pagecache_debug.log
include/maria.h:
  Added maria_tmpdir
include/my_base.h:
  Added error HA_ERR_FILE_TOO_SHORT
include/my_sys.h:
  Added variable my_crc_dbug_check
  Added function my_dbug_put_break_here()
include/myisamchk.h:
  Added org_key_map (Needed for writing REDO record for REPAIR)
mysql-test/r/innodb.result:
  Updated to new checksum algorithm (NULL ignored)
mysql-test/r/mix2_myisam.result:
  Updated to new checksum algorithm (NULL ignored)
mysql-test/r/myisam.result:
  Updated to new checksum algorithm (NULL ignored)
mysql-test/t/myisam.test:
  Added used table
mysys/checksum.c:
  Added DBUG for checksum results
  Added debugger hook so that _my_dbug_put_break_here() is called if we get matching CRC
mysys/lf_alloc-pin.c:
  Fixed compiler warning
mysys/my_handler.c:
  Added new error message
mysys/my_init.c:
  If my_progname is not given, use 'unknown' form my_progname_short
  Added debugger function my_debug_put_break_here()
mysys/my_pread.c:
  In case of too short file when MY_NABP or MY_FNABP is specified, give error HA_ERR_FILE_TO_SHORT
mysys/my_read.c:
  In case of too short file when MY_NABP or MY_FNABP is specified, give error HA_ERR_FILE_TO_SHORT
sql/mysqld.cc:
  Added debug option --debug-crc-break
sql/sql_parse.cc:
  Trivial optimization
storage/maria/ha_maria.cc:
  Renamed variable to be more logical
  Ensure that param.testflag is correct when calling repair
  Added extra argument to init_pagecache
  Set default value for maria_tempdir
storage/maria/ma_blockrec.c:
  Test for HA_ERR_FILE_TOO_SHORT instead for -1
storage/maria/ma_cache.c:
  Test for HA_ERR_FILE_TOO_SHORT instead for -1
storage/maria/ma_check.c:
  Set param->testflag to match how repair is run (needed for REDO logging)
  Simple optimization
  Moved flag if page is node from pagelength to keypage-flag byte
  Log used key map in REDO log.
storage/maria/ma_delete.c:
  Remember previous UNDO entry when writing undo (for future CLR records)
  Moved flag if page is node from pagelength to keypage-flag byte
  Fixed some bugs in redo logging
  Added CRC for some translog REDO_INDEX entries
storage/maria/ma_dynrec.c:
  Test for HA_ERR_FILE_TOO_SHORT instead for -1
storage/maria/ma_ft_update.c:
  Fixed call to _ma_store_page_used()
storage/maria/ma_key_recover.c:
  Added CRC for some translog REDO_INDEX entries
  Removed not needed pagecache_write() in _ma_apply_redo_index()
storage/maria/ma_locking.c:
  Test for HA_ERR_FILE_TOO_SHORT instead for -1
storage/maria/ma_loghandler.c:
  Added used key map to REDO_REPAIR_TABLE
storage/maria/ma_loghandler.h:
  Added operation for checksum of key pages
storage/maria/ma_open.c:
  Allocate storage for undo lsn pointers
storage/maria/ma_pagecache.c:
  Remove not needed include file
  Change logging to use fd: for file descritors as other code
  Added my_readwrite_flags to init_pagecache() to be able to get better error messages for maria_chk/maria_read_log
  Don't allow pagecaches with less than 8 blocks
  Remove wrong DBUG_ASSERT()
storage/maria/ma_pagecache.h:
  Added readwrite_flags
storage/maria/ma_recovery.c:
  Better error messages for maria_read_log:
  - Added eprint() for printing error messages
  - Print extra \n before error message if we are printing %0 %10 ...
  
  Added used key_map to REDO_REPAIR log entry
  More DBUG
  Call same repair method that was used by mysqld
storage/maria/ma_rt_index.c:
  Moved flag if page is node from pagelength to keypage-flag byte
storage/maria/ma_rt_key.c:
  Fixed call to _ma_store_page_used()
storage/maria/ma_rt_split.c:
  Moved flag if page is node from pagelength to keypage-flag byte
storage/maria/ma_static.c:
  Added maria_tmpdir
storage/maria/ma_test1.c:
  Updated call to init_pagecache()
storage/maria/ma_test2.c:
  Updated call to init_pagecache()
storage/maria/ma_test3.c:
  Updated call to init_pagecache()
storage/maria/ma_write.c:
  Removed #ifdef NOT_YET
  Moved flag if page is node from pagelength to keypage-flag byte
  Fixed bug in  _ma_log_del_prefix()
storage/maria/maria_chk.c:
  Fixed wrong min limit for page_buffer_size
  Updated call to init_pagecache()
storage/maria/maria_def.h:
  Added EXTRA_DEBUG_KEY_CHANGES. When this is defined some REDO_INDEX entries contains page checksums
  Moved flag if page is node from pagelength to keypage-flag byte
storage/maria/maria_ftdump.c:
  Updated call to init_pagecache()
storage/maria/maria_pack.c:
  Updated call to init_pagecache()
  Reset share->state.create_rename_lsn & share->state.is_of_horizon
storage/maria/maria_read_log.c:
  Better error messages
  Added --tmpdir option (needed to set temporary directory for REDO_REPAIR)
  Added --start-from-lsn
  Changed option for --display-only to 'd' (wanted to use -o for 'offset')
storage/maria/unittest/lockman2-t.c:
  Added missing call to MY_INIT()
storage/maria/unittest/ma_pagecache_consist.c:
  Updated call to init_pagecache()
storage/maria/unittest/ma_pagecache_single.c:
  Fixed bug that caused program to sometimes fail
  Added some DBUG_ASSERTS()
  Changed some calls to malloc()/free() to my_malloc()/my_free()
  Create extra file to expose original hard-to-find bug
storage/maria/unittest/ma_test_loghandler-t.c:
  Updated call to init_pagecache()
storage/maria/unittest/ma_test_loghandler_first_lsn-t.c:
  Updated call to init_pagecache()
storage/maria/unittest/ma_test_loghandler_max_lsn-t.c:
  Updated call to init_pagecache()
storage/maria/unittest/ma_test_loghandler_multigroup-t.c:
  Updated call to init_pagecache()
storage/maria/unittest/ma_test_loghandler_multithread-t.c:
  Updated call to init_pagecache()
storage/maria/unittest/ma_test_loghandler_noflush-t.c:
  Updated call to init_pagecache()
storage/maria/unittest/ma_test_loghandler_pagecache-t.c:
  Updated call to init_pagecache()
storage/maria/unittest/ma_test_loghandler_purge-t.c:
  Updated call to init_pagecache()
storage/maria/unittest/test_file.c:
  Changed malloc()/free() to my_malloc()/my_free()
  Fixed memory leak
  Changd logic a bit while trying to find bug in reset_file()
storage/maria/unittest/trnman-t.c:
  Added missing call to MY_INIT()
storage/myisam/mi_cache.c:
  Test for HA_ERR_FILE_TOO_SHORT instead for -1
storage/myisam/mi_create.c:
  Removed O_EXCL to get TRUNCATE to work for temporary files
storage/myisam/mi_dynrec.c:
  Test for HA_ERR_FILE_TOO_SHORT instead for -1
storage/myisam/mi_locking.c:
  Test for HA_ERR_FILE_TOO_SHORT instead for -1
mysql-test/r/old-mode.result:
  New BitKeeper file ``mysql-test/r/old-mode.result''
mysql-test/t/old-mode-master.opt:
  New BitKeeper file ``mysql-test/t/old-mode-master.opt''
mysql-test/t/old-mode.test:
  New BitKeeper file ``mysql-test/t/old-mode.test''
2007-12-04 23:23:42 +02:00

1384 lines
48 KiB
C

/* Copyright (C) 2006 MySQL AB & MySQL Finland AB & TCX DataKonsult AB
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; version 2 of the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
#include "ma_fulltext.h"
#include "ma_rt_index.h"
#include "trnman.h"
#include "ma_key_recover.h"
static int d_search(MARIA_HA *info,MARIA_KEYDEF *keyinfo,uint comp_flag,
uchar *key, uint key_length,
my_off_t page, uchar *anc_buff,
MARIA_PINNED_PAGE *anc_page_link);
static int del(MARIA_HA *info,MARIA_KEYDEF *keyinfo, uchar *key,
my_off_t anc_page, uchar *anc_buff, my_off_t leaf_page,
uchar *leaf_buff, MARIA_PINNED_PAGE *leaf_page_link,
uchar *keypos, my_off_t next_block, uchar *ret_key);
static int underflow(MARIA_HA *info,MARIA_KEYDEF *keyinfo,
my_off_t anc_page, uchar *anc_buff,
my_off_t leaf_page, uchar *leaf_buff,
MARIA_PINNED_PAGE *leaf_page_link, uchar *keypos);
static uint remove_key(MARIA_KEYDEF *keyinfo,uint nod_flag,uchar *keypos,
uchar *lastkey,uchar *page_end,
my_off_t *next_block, MARIA_KEY_PARAM *s_temp);
static my_bool _ma_log_delete(MARIA_HA *info, my_off_t page, uchar *buff,
uchar *key_pos, uint changed_length,
uint move_length);
/* @breif Remove a row from a MARIA table */
int maria_delete(MARIA_HA *info,const uchar *record)
{
uint i;
uchar *old_key;
int save_errno;
char lastpos[8];
MARIA_SHARE *share=info->s;
DBUG_ENTER("maria_delete");
/* Test if record is in datafile */
DBUG_EXECUTE_IF("maria_pretend_crashed_table_on_usage",
maria_print_error(share, HA_ERR_CRASHED);
DBUG_RETURN(my_errno= HA_ERR_CRASHED););
DBUG_EXECUTE_IF("my_error_test_undefined_error",
maria_print_error(share, INT_MAX);
DBUG_RETURN(my_errno= INT_MAX););
if (!(info->update & HA_STATE_AKTIV))
{
DBUG_RETURN(my_errno=HA_ERR_KEY_NOT_FOUND); /* No database read */
}
if (share->options & HA_OPTION_READ_ONLY_DATA)
{
DBUG_RETURN(my_errno=EACCES);
}
if (_ma_readinfo(info,F_WRLCK,1))
DBUG_RETURN(my_errno);
if ((*share->compare_record)(info,record))
goto err; /* Error on read-check */
if (_ma_mark_file_changed(info))
goto err;
/* Remove all keys from the index file */
old_key= info->lastkey2;
for (i=0 ; i < share->base.keys ; i++ )
{
if (maria_is_key_active(share->state.key_map, i))
{
share->keyinfo[i].version++;
if (share->keyinfo[i].flag & HA_FULLTEXT )
{
if (_ma_ft_del(info, i, old_key, record, info->cur_row.lastpos))
goto err;
}
else
{
if (share->keyinfo[i].ck_delete(info,i,old_key,
_ma_make_key(info, i, old_key,
record,
info->cur_row.lastpos)))
goto err;
}
/* The above changed info->lastkey2. Inform maria_rnext_same(). */
info->update&= ~HA_STATE_RNEXT_SAME;
}
}
if (share->calc_checksum)
{
/*
We can't use the row based checksum as this doesn't have enough
precision.
*/
info->cur_row.checksum= (*share->calc_checksum)(info, record);
}
if ((*share->delete_record)(info, record))
goto err; /* Remove record from database */
info->state->checksum+= - !share->now_transactional *
info->cur_row.checksum;
info->update= HA_STATE_CHANGED+HA_STATE_DELETED+HA_STATE_ROW_CHANGED;
info->state->records-= !share->now_transactional;
share->state.changed|= STATE_NOT_OPTIMIZED_ROWS;
mi_sizestore(lastpos, info->cur_row.lastpos);
VOID(_ma_writeinfo(info,WRITEINFO_UPDATE_KEYFILE));
allow_break(); /* Allow SIGHUP & SIGINT */
if (info->invalidator != 0)
{
DBUG_PRINT("info", ("invalidator... '%s' (delete)", share->open_file_name));
(*info->invalidator)(share->open_file_name);
info->invalidator=0;
}
DBUG_RETURN(0);
err:
save_errno=my_errno;
mi_sizestore(lastpos, info->cur_row.lastpos);
if (save_errno != HA_ERR_RECORD_CHANGED)
{
maria_print_error(share, HA_ERR_CRASHED);
maria_mark_crashed(info); /* mark table crashed */
}
VOID(_ma_writeinfo(info,WRITEINFO_UPDATE_KEYFILE));
info->update|=HA_STATE_WRITTEN; /* Buffer changed */
allow_break(); /* Allow SIGHUP & SIGINT */
my_errno=save_errno;
if (save_errno == HA_ERR_KEY_NOT_FOUND)
{
maria_print_error(share, HA_ERR_CRASHED);
my_errno=HA_ERR_CRASHED;
}
DBUG_RETURN(my_errno);
} /* maria_delete */
/* Remove a key from the btree index */
int _ma_ck_delete(register MARIA_HA *info, uint keynr, uchar *key,
uint key_length)
{
int res;
LSN lsn= LSN_IMPOSSIBLE;
my_off_t new_root= info->s->state.key_root[keynr];
uchar key_buff[HA_MAX_KEY_BUFF];
DBUG_ENTER("_ma_ck_delete");
if (info->s->now_transactional)
{
/* Save original value as the key may change */
memcpy(key_buff, key, key_length + info->s->rec_reflength);
}
res= _ma_ck_real_delete(info, info->s->keyinfo+keynr, key, key_length,
&new_root);
if (!res && info->s->now_transactional)
{
uchar log_data[LSN_STORE_SIZE + FILEID_STORE_SIZE +
KEY_NR_STORE_SIZE + PAGE_STORE_SIZE], *log_pos;
LEX_STRING log_array[TRANSLOG_INTERNAL_PARTS + 2];
struct st_msg_to_write_hook_for_undo_key msg;
enum translog_record_type log_type= LOGREC_UNDO_KEY_DELETE;
info->key_delete_undo_lsn[keynr]= info->trn->undo_lsn;
lsn_store(log_data, info->trn->undo_lsn);
key_nr_store(log_data + LSN_STORE_SIZE + FILEID_STORE_SIZE, keynr);
log_pos= log_data + LSN_STORE_SIZE + FILEID_STORE_SIZE + KEY_NR_STORE_SIZE;
if (new_root != info->s->state.key_root[keynr])
{
my_off_t page;
page= ((new_root == HA_OFFSET_ERROR) ? IMPOSSIBLE_PAGE_NO :
new_root / info->s->block_size);
page_store(log_pos, page);
log_pos+= PAGE_STORE_SIZE;
log_type= LOGREC_UNDO_KEY_DELETE_WITH_ROOT;
}
key_length+= info->s->rec_reflength;
log_array[TRANSLOG_INTERNAL_PARTS + 0].str= (char*) log_data;
log_array[TRANSLOG_INTERNAL_PARTS + 0].length= (uint) (log_pos - log_data);
log_array[TRANSLOG_INTERNAL_PARTS + 1].str= (char*) key_buff;
log_array[TRANSLOG_INTERNAL_PARTS + 1].length= key_length;
msg.root= &info->s->state.key_root[keynr];
msg.value= new_root;
if (translog_write_record(&lsn, log_type,
info->trn, info,
log_array[TRANSLOG_INTERNAL_PARTS + 0].length +
key_length,
TRANSLOG_INTERNAL_PARTS + 2, log_array,
log_data + LSN_STORE_SIZE, &msg))
res= -1;
}
else
{
info->s->state.key_root[keynr]= new_root;
_ma_fast_unlock_key_del(info);
}
_ma_unpin_all_pages_and_finalize_row(info, lsn);
DBUG_RETURN(res);
} /* _ma_ck_delete */
int _ma_ck_real_delete(register MARIA_HA *info, MARIA_KEYDEF *keyinfo,
uchar *key, uint key_length, my_off_t *root)
{
int error;
uint nod_flag;
my_off_t old_root;
uchar *root_buff;
MARIA_PINNED_PAGE *page_link;
DBUG_ENTER("_ma_ck_real_delete");
if ((old_root=*root) == HA_OFFSET_ERROR)
{
maria_print_error(info->s, HA_ERR_CRASHED);
DBUG_RETURN(my_errno=HA_ERR_CRASHED);
}
if (!(root_buff= (uchar*) my_alloca((uint) keyinfo->block_length+
HA_MAX_KEY_BUFF*2)))
{
DBUG_PRINT("error",("Couldn't allocate memory"));
DBUG_RETURN(my_errno=ENOMEM);
}
DBUG_PRINT("info",("root_page: %ld", (long) old_root));
if (!_ma_fetch_keypage(info, keyinfo, old_root,
PAGECACHE_LOCK_WRITE, DFLT_INIT_HITS, root_buff, 0,
&page_link))
{
error= -1;
goto err;
}
if ((error=d_search(info,keyinfo,
(keyinfo->flag & HA_FULLTEXT ?
SEARCH_FIND | SEARCH_UPDATE : SEARCH_SAME),
key, key_length, old_root, root_buff, page_link)) >0)
{
if (error == 2)
{
DBUG_PRINT("test",("Enlarging of root when deleting"));
error= _ma_enlarge_root(info,keyinfo,key,root);
}
else /* error == 1 */
{
uint used_length;
_ma_get_used_and_nod(info, root_buff, used_length, nod_flag);
page_link->changed= 1;
if (used_length <= nod_flag + info->s->keypage_header + 1)
{
error=0;
if (nod_flag)
*root= _ma_kpos(nod_flag, root_buff +info->s->keypage_header +
nod_flag);
else
*root=HA_OFFSET_ERROR;
if (_ma_dispose(info, old_root, 0))
error= -1;
}
else
error= _ma_write_keypage(info,keyinfo, old_root,
PAGECACHE_LOCK_LEFT_WRITELOCKED,
DFLT_INIT_HITS, root_buff);
}
}
err:
my_afree((uchar*) root_buff);
DBUG_PRINT("exit",("Return: %d",error));
DBUG_RETURN(error);
} /* _ma_ck_real_delete */
/**
@brief Remove key below key root
@param key Key to delete. Will contain new key if block was enlarged
@return
@retval 0 ok (anc_page is not changed)
@retval 1 If data on page is too small; In this case anc_buff is not saved
@retval 2 If data on page is too big
@retval -1 On errors
*/
static int d_search(register MARIA_HA *info, register MARIA_KEYDEF *keyinfo,
uint comp_flag, uchar *key, uint key_length,
my_off_t anc_page, uchar *anc_buff,
MARIA_PINNED_PAGE *anc_page_link)
{
int flag,ret_value,save_flag;
uint length,nod_flag,search_key_length;
my_bool last_key;
uchar *leaf_buff,*keypos;
my_off_t leaf_page,next_block;
uchar lastkey[HA_MAX_KEY_BUFF];
MARIA_PINNED_PAGE *leaf_page_link;
MARIA_KEY_PARAM s_temp;
DBUG_ENTER("d_search");
DBUG_DUMP("page",anc_buff,_ma_get_page_used(info, anc_buff));
search_key_length= (comp_flag & SEARCH_FIND) ? key_length : USE_WHOLE_KEY;
flag=(*keyinfo->bin_search)(info,keyinfo,anc_buff,key, search_key_length,
comp_flag, &keypos, lastkey, &last_key);
if (flag == MARIA_FOUND_WRONG_KEY)
{
DBUG_PRINT("error",("Found wrong key"));
DBUG_RETURN(-1);
}
nod_flag=_ma_test_if_nod(info, anc_buff);
if (!flag && keyinfo->flag & HA_FULLTEXT)
{
uint off;
int subkeys;
get_key_full_length_rdonly(off, lastkey);
subkeys=ft_sintXkorr(lastkey+off);
DBUG_ASSERT(info->ft1_to_ft2==0 || subkeys >=0);
comp_flag=SEARCH_SAME;
if (subkeys >= 0)
{
/* normal word, one-level tree structure */
if (info->ft1_to_ft2)
{
/* we're in ft1->ft2 conversion mode. Saving key data */
insert_dynamic(info->ft1_to_ft2, (lastkey+off));
}
else
{
/* we need exact match only if not in ft1->ft2 conversion mode */
flag=(*keyinfo->bin_search)(info,keyinfo,anc_buff,key,USE_WHOLE_KEY,
comp_flag, &keypos, lastkey, &last_key);
}
/* fall through to normal delete */
}
else
{
/* popular word. two-level tree. going down */
uint tmp_key_length;
my_off_t root;
uchar *kpos=keypos;
if (!(tmp_key_length=(*keyinfo->get_key)(keyinfo,nod_flag,&kpos,
lastkey)))
{
maria_print_error(info->s, HA_ERR_CRASHED);
my_errno= HA_ERR_CRASHED;
DBUG_RETURN(-1);
}
root= _ma_dpos(info,nod_flag,kpos);
if (subkeys == -1)
{
/* the last entry in sub-tree */
if (_ma_dispose(info, root, 1))
DBUG_RETURN(-1);
/* fall through to normal delete */
}
else
{
keyinfo=&info->s->ft2_keyinfo;
/* we'll modify key entry 'in vivo' */
kpos-=keyinfo->keylength+nod_flag;
get_key_full_length_rdonly(off, key);
key+=off;
ret_value= _ma_ck_real_delete(info, &info->s->ft2_keyinfo,
key, HA_FT_WLEN, &root);
_ma_dpointer(info, kpos+HA_FT_WLEN, root);
subkeys++;
ft_intXstore(kpos, subkeys);
if (!ret_value)
{
anc_page_link->changed= 1;
ret_value= _ma_write_keypage(info, keyinfo, anc_page,
PAGECACHE_LOCK_LEFT_WRITELOCKED,
DFLT_INIT_HITS, anc_buff);
}
DBUG_PRINT("exit",("Return: %d",ret_value));
DBUG_RETURN(ret_value);
}
}
}
leaf_buff=0;
LINT_INIT(leaf_page);
if (nod_flag)
{
leaf_page= _ma_kpos(nod_flag,keypos);
if (!(leaf_buff= (uchar*) my_alloca((uint) keyinfo->block_length+
HA_MAX_KEY_BUFF*2)))
{
DBUG_PRINT("error", ("Couldn't allocate memory"));
my_errno=ENOMEM;
DBUG_RETURN(-1);
}
if (!_ma_fetch_keypage(info,keyinfo,leaf_page,
PAGECACHE_LOCK_WRITE, DFLT_INIT_HITS, leaf_buff,
0, &leaf_page_link))
goto err;
}
if (flag != 0)
{
if (!nod_flag)
{
DBUG_PRINT("error",("Didn't find key"));
maria_print_error(info->s, HA_ERR_CRASHED);
my_errno=HA_ERR_CRASHED; /* This should newer happend */
goto err;
}
save_flag=0;
ret_value=d_search(info, keyinfo, comp_flag, key, key_length,
leaf_page, leaf_buff, leaf_page_link);
}
else
{ /* Found key */
uint tmp;
uint anc_buff_length= _ma_get_page_used(info, anc_buff);
if (!(tmp= remove_key(keyinfo,nod_flag,keypos,lastkey,
anc_buff + anc_buff_length,
&next_block, &s_temp)))
goto err;
anc_page_link->changed= 1;
anc_buff_length-= tmp;
_ma_store_page_used(info, anc_buff, anc_buff_length);
/*
Log initial changes on pages
If there is an underflow, there will be more changes logged to the
page
*/
if (info->s->now_transactional &&
_ma_log_delete(info, anc_page, anc_buff, s_temp.key_pos,
s_temp.changed_length, s_temp.move_length))
DBUG_RETURN(-1);
if (!nod_flag)
{ /* On leaf page */
if (test(anc_buff_length <= (info->quick_mode ?
MARIA_MIN_KEYBLOCK_LENGTH :
(uint) keyinfo->underflow_block_length)))
{
/* Page will be written by caller if we return 1 */
DBUG_RETURN(1);
}
if (_ma_write_keypage(info, keyinfo, anc_page,
PAGECACHE_LOCK_LEFT_WRITELOCKED, DFLT_INIT_HITS,
anc_buff))
DBUG_RETURN(-1);
DBUG_RETURN(0);
}
save_flag=1; /* Mark that anc_buff is changed */
ret_value= del(info, keyinfo, key, anc_page, anc_buff,
leaf_page, leaf_buff, leaf_page_link,
keypos, next_block, lastkey);
}
if (ret_value >0)
{
save_flag=1;
if (ret_value == 1)
ret_value= underflow(info, keyinfo, anc_page, anc_buff,
leaf_page, leaf_buff, leaf_page_link, keypos);
else
{ /* This happens only with packed keys */
DBUG_PRINT("test",("Enlarging of key when deleting"));
if (!_ma_get_last_key(info,keyinfo,anc_buff,lastkey,keypos,&length))
goto err;
ret_value= _ma_insert(info, keyinfo, key, anc_buff, keypos, anc_page,
lastkey, (my_off_t) 0, (uchar*) 0,
(MARIA_PINNED_PAGE*) 0, (uchar*) 0, (my_bool) 0);
}
}
if (ret_value == 0 && _ma_get_page_used(info, anc_buff) >
(uint) (keyinfo->block_length - KEYPAGE_CHECKSUM_SIZE))
{
/* parent buffer got too big ; We have to split the page */
save_flag=1;
ret_value= _ma_split_page(info, keyinfo, key, anc_page, anc_buff,
(uint) (keyinfo->block_length -
KEYPAGE_CHECKSUM_SIZE),
(uchar*) 0, 0, 0, lastkey, 0) | 2;
}
if (save_flag && ret_value != 1)
{
anc_page_link->changed= 1;
ret_value|= _ma_write_keypage(info, keyinfo, anc_page,
PAGECACHE_LOCK_LEFT_WRITELOCKED,
DFLT_INIT_HITS, anc_buff);
}
else
{
DBUG_DUMP("page", anc_buff, _ma_get_page_used(info, anc_buff));
}
my_afree(leaf_buff);
DBUG_PRINT("exit",("Return: %d",ret_value));
DBUG_RETURN(ret_value);
err:
my_afree(leaf_buff);
DBUG_PRINT("exit",("Error: %d",my_errno));
DBUG_RETURN (-1);
} /* d_search */
/**
@brief Remove a key that has a page-reference
@param info Maria handler
@param key Buffer for key to be inserted at upper level
@param anc_page Page address for page where deleted key was
@param anc_buff Page buffer (nod) where deleted key was
@param leaf_page Page address for nod before the deleted key
@param leaf_buff Buffer for leaf_page
@param leaf_buff_link Pinned page link for leaf_buff
@param keypos Pos to where deleted key was on anc_buff
@param next_block Page adress for nod after deleted key
@param ret_key Key before keypos in anc_buff
@notes
leaf_buff is written to disk
anc_buff is not updated on disk. Caller should do this
@return
@retval < 0 Error
@retval 0 OK
@retval 1 key contains key to upper level (from balance page)
@retval 2 key contains key to upper level (from split space)
*/
static int del(register MARIA_HA *info, MARIA_KEYDEF *keyinfo,
uchar *key, my_off_t anc_page, uchar *anc_buff,
my_off_t leaf_page, uchar *leaf_buff,
MARIA_PINNED_PAGE *leaf_page_link,
uchar *keypos, my_off_t next_block, uchar *ret_key)
{
int ret_value,length;
uint a_length, nod_flag, leaf_length, tmp;
my_off_t next_page;
uchar keybuff[HA_MAX_KEY_BUFF],*endpos,*next_buff,*key_start, *prev_key;
MARIA_SHARE *share=info->s;
MARIA_KEY_PARAM s_temp;
MARIA_PINNED_PAGE *next_page_link;
DBUG_ENTER("del");
DBUG_PRINT("enter",("leaf_page: %ld keypos: 0x%lx", (long) leaf_page,
(ulong) keypos));
_ma_get_used_and_nod(info, leaf_buff, leaf_length, nod_flag);
DBUG_DUMP("leaf_buff", leaf_buff, leaf_length);
endpos= leaf_buff + leaf_length;
if (!(key_start= _ma_get_last_key(info,keyinfo,leaf_buff,keybuff,endpos,
&tmp)))
DBUG_RETURN(-1);
if (nod_flag)
{
next_page= _ma_kpos(nod_flag,endpos);
if (!(next_buff= (uchar*) my_alloca((uint) keyinfo->block_length+
HA_MAX_KEY_BUFF*2)))
DBUG_RETURN(-1);
if (!_ma_fetch_keypage(info, keyinfo, next_page, PAGECACHE_LOCK_WRITE,
DFLT_INIT_HITS, next_buff, 0, &next_page_link))
ret_value= -1;
else
{
DBUG_DUMP("next_page", next_buff, _ma_get_page_used(info, next_buff));
if ((ret_value= del(info, keyinfo, key, anc_page, anc_buff, next_page,
next_buff, next_page_link, keypos, next_block,
ret_key)) >0)
{
/* Get new length after key was deleted */
endpos=leaf_buff+_ma_get_page_used(info, leaf_buff);
if (ret_value == 1)
{
ret_value= underflow(info, keyinfo, leaf_page, leaf_buff, next_page,
next_buff, next_page_link, endpos);
if (ret_value == 0 &&
_ma_get_page_used(info, leaf_buff) >
(uint) (keyinfo->block_length - KEYPAGE_CHECKSUM_SIZE))
{
ret_value= (_ma_split_page(info, keyinfo, key,
leaf_page, leaf_buff,
(uint) (keyinfo->block_length -
KEYPAGE_CHECKSUM_SIZE),
(uchar*) 0, 0, 0,
ret_key, 0) | 2);
}
}
else
{
DBUG_PRINT("test",("Inserting of key when deleting"));
if (!_ma_get_last_key(info,keyinfo,leaf_buff,keybuff,endpos,
&tmp))
goto err;
ret_value= _ma_insert(info, keyinfo, key, leaf_buff, endpos,
leaf_page, keybuff, (my_off_t) 0, (uchar*) 0,
(MARIA_PINNED_PAGE *) 0, (uchar*) 0, 0);
}
}
leaf_page_link->changed= 1;
if (_ma_write_keypage(info, keyinfo, leaf_page,
PAGECACHE_LOCK_LEFT_WRITELOCKED,
DFLT_INIT_HITS, leaf_buff))
goto err;
}
my_afree(next_buff);
DBUG_RETURN(ret_value);
}
/* Remove last key from leaf page */
_ma_store_page_used(info, leaf_buff, key_start-leaf_buff);
if (info->s->now_transactional &&
_ma_log_suffix(info, leaf_page, leaf_buff, leaf_length,
(uint) (key_start - leaf_buff)))
goto err;
leaf_page_link->changed= 1;
if (_ma_write_keypage(info, keyinfo, leaf_page,
PAGECACHE_LOCK_LEFT_WRITELOCKED, DFLT_INIT_HITS,
leaf_buff))
goto err;
/* Place last key in ancestor page on deleted key position */
a_length= _ma_get_page_used(info, anc_buff);
endpos=anc_buff+a_length;
if (keypos != anc_buff+info->s->keypage_header + share->base.key_reflength &&
!_ma_get_last_key(info,keyinfo,anc_buff,ret_key,keypos,&tmp))
goto err;
prev_key= (keypos == anc_buff + info->s->keypage_header +
share->base.key_reflength ? 0 : ret_key);
length=(*keyinfo->pack_key)(keyinfo,share->base.key_reflength,
keypos == endpos ? (uchar*) 0 : keypos,
prev_key, prev_key,
keybuff,&s_temp);
if (length > 0)
bmove_upp(endpos+length,endpos,(uint) (endpos-keypos));
else
bmove(keypos,keypos-length, (int) (endpos-keypos)+length);
(*keyinfo->store_key)(keyinfo,keypos,&s_temp);
key_start= keypos;
/* Save pointer to next leaf */
if (!(*keyinfo->get_key)(keyinfo,share->base.key_reflength,&keypos,ret_key))
goto err;
_ma_kpointer(info,keypos - share->base.key_reflength,next_block);
_ma_store_page_used(info, anc_buff, a_length + length);
if (info->s->now_transactional &&
_ma_log_add(info, anc_page, anc_buff, a_length,
key_start, s_temp.changed_length, s_temp.move_length, 1))
goto err;
DBUG_RETURN(_ma_get_page_used(info, leaf_buff) <=
(info->quick_mode ? MARIA_MIN_KEYBLOCK_LENGTH :
(uint) keyinfo->underflow_block_length));
err:
DBUG_RETURN(-1);
} /* del */
/**
@brief Balances adjacent pages if underflow occours
@fn underflow()
@param anc_buff Anchestor page data
@param leaf_page Page number of leaf page
@param leaf_buff Leaf page (page that underflowed)
@param leaf_page_link Pointer to pin information about leaf page
@param keypos Position after current key in anc_buff
@note
This function writes redo entries for all changes
Caller must save anc_buff
@return
@retval 0 ok
@retval 1 ok, but anc_buff did underflow
@retval -1 error
*/
static int underflow(register MARIA_HA *info, register MARIA_KEYDEF *keyinfo,
my_off_t anc_page, uchar *anc_buff,
my_off_t leaf_page, uchar *leaf_buff,
MARIA_PINNED_PAGE *leaf_page_link,
uchar *keypos)
{
int t_length;
uint length,anc_length,buff_length,leaf_length,p_length,s_length,nod_flag;
uint next_buff_length, new_buff_length, key_reflength, key_length;
uint unchanged_leaf_length, new_leaf_length, new_anc_length;
my_off_t next_page;
uchar anc_key[HA_MAX_KEY_BUFF],leaf_key[HA_MAX_KEY_BUFF];
uchar *buff,*endpos,*next_keypos,*anc_pos,*half_pos,*prev_key;
uchar *after_key, *anc_end_pos;
MARIA_KEY_PARAM key_deleted, key_inserted;
MARIA_SHARE *share=info->s;
MARIA_PINNED_PAGE *next_page_link;
my_bool first_key;
DBUG_ENTER("underflow");
DBUG_PRINT("enter",("leaf_page: %ld keypos: 0x%lx",(long) leaf_page,
(ulong) keypos));
DBUG_DUMP("anc_buff", anc_buff, _ma_get_page_used(info, anc_buff));
DBUG_DUMP("leaf_buff", leaf_buff, _ma_get_page_used(info, leaf_buff));
buff=info->buff;
info->keyread_buff_used=1;
next_keypos=keypos;
nod_flag= _ma_test_if_nod(info, leaf_buff);
p_length= nod_flag+info->s->keypage_header;
anc_length= _ma_get_page_used(info, anc_buff);
leaf_length= _ma_get_page_used(info, leaf_buff);
key_reflength=share->base.key_reflength;
if (info->s->keyinfo+info->lastinx == keyinfo)
info->page_changed=1;
first_key= keypos == anc_buff + info->s->keypage_header + key_reflength;
if ((keypos < anc_buff + anc_length && (info->state->records & 1)) ||
first_key)
{
/* Use page right of anc-page */
DBUG_PRINT("test",("use right page"));
if (keyinfo->flag & HA_BINARY_PACK_KEY)
{
if (!(next_keypos= _ma_get_key(info, keyinfo,
anc_buff, buff, keypos, &length)))
goto err;
}
else
{
/* Got to end of found key */
buff[0]=buff[1]=0; /* Avoid length error check if packed key */
if (!(*keyinfo->get_key)(keyinfo,key_reflength,&next_keypos,
buff))
goto err;
}
next_page= _ma_kpos(key_reflength,next_keypos);
if (!_ma_fetch_keypage(info,keyinfo, next_page, PAGECACHE_LOCK_WRITE,
DFLT_INIT_HITS, buff, 0, &next_page_link))
goto err;
next_buff_length= _ma_get_page_used(info, buff);
DBUG_DUMP("next", buff, next_buff_length);
/* find keys to make a big key-page */
bmove(next_keypos-key_reflength, buff + info->s->keypage_header,
key_reflength);
if (!_ma_get_last_key(info,keyinfo,anc_buff,anc_key,next_keypos,&length) ||
!_ma_get_last_key(info,keyinfo,leaf_buff,leaf_key,
leaf_buff+leaf_length,&length))
goto err;
/* merge pages and put parting key from anc_buff between */
prev_key=(leaf_length == p_length ? (uchar*) 0 : leaf_key);
t_length=(*keyinfo->pack_key)(keyinfo,nod_flag,buff+p_length,
prev_key, prev_key,
anc_key, &key_inserted);
length= next_buff_length - p_length;
endpos=buff+length+leaf_length+t_length;
/* buff will always be larger than before !*/
bmove_upp(endpos, buff + next_buff_length, length);
memcpy(buff, leaf_buff,(size_t) leaf_length);
(*keyinfo->store_key)(keyinfo, buff+leaf_length, &key_inserted);
buff_length= (uint) (endpos-buff);
_ma_store_page_used(info, buff, buff_length);
/* remove key from anc_buff */
if (!(s_length=remove_key(keyinfo,key_reflength,keypos,anc_key,
anc_buff+anc_length,(my_off_t *) 0,
&key_deleted)))
goto err;
new_anc_length= anc_length - s_length;
_ma_store_page_used(info, anc_buff, new_anc_length);
if (buff_length <= (uint) (keyinfo->block_length - KEYPAGE_CHECKSUM_SIZE))
{
/* All keys fitted into one page */
next_page_link->changed= 1;
if (_ma_dispose(info, next_page, 0))
goto err;
memcpy(leaf_buff, buff, (size_t) buff_length);
if (info->s->now_transactional)
{
/* Log changes to parent page */
if (_ma_log_delete(info, anc_page, anc_buff, key_deleted.key_pos,
key_deleted.changed_length,
key_deleted.move_length))
goto err;
/*
Log changes to leaf page. Data for leaf page is in buff
which contains original leaf_buff, parting key and next_buff
*/
if (_ma_log_suffix(info, leaf_page, leaf_buff,
leaf_length, buff_length))
goto err;
}
}
else
{
/*
Balancing didn't free a page, so we have to split 'buff' into two
pages:
- Find key in middle of buffer
- Store everything before key in 'leaf_buff'
- Pack key into anc_buff at position of deleted key
Note that anc_buff may overflow! (is handled by caller)
- Store remaining keys in next_page (buff)
*/
MARIA_KEY_PARAM anc_key_inserted;
anc_end_pos= anc_buff + new_anc_length;
DBUG_PRINT("test",("anc_buff: 0x%lx anc_end_pos: 0x%lx",
(long) anc_buff, (long) anc_end_pos));
if (!first_key &&
!_ma_get_last_key(info,keyinfo,anc_buff,anc_key,keypos,&length))
goto err;
if (!(half_pos= _ma_find_half_pos(info, nod_flag, keyinfo, buff,
leaf_key, &key_length, &after_key)))
goto err;
new_leaf_length= (uint) (half_pos-buff);
memcpy(leaf_buff, buff, (size_t) new_leaf_length);
_ma_store_page_used(info, leaf_buff, new_leaf_length);
/* Correct new keypointer to leaf_page */
half_pos=after_key;
_ma_kpointer(info,leaf_key+key_length,next_page);
/* Save key in anc_buff */
prev_key= (first_key ? (uchar*) 0 : anc_key);
t_length=(*keyinfo->pack_key)(keyinfo,key_reflength,
(keypos == anc_end_pos ? (uchar*) 0 :
keypos),
prev_key, prev_key,
leaf_key, &anc_key_inserted);
if (t_length >= 0)
bmove_upp(anc_end_pos+t_length, anc_end_pos,
(uint) (anc_end_pos - keypos));
else
bmove(keypos,keypos-t_length,(uint) (anc_end_pos-keypos)+t_length);
(*keyinfo->store_key)(keyinfo,keypos, &anc_key_inserted);
new_anc_length+= t_length;
_ma_store_page_used(info, anc_buff, new_anc_length);
/* Store key first in new page */
if (nod_flag)
bmove(buff+info->s->keypage_header, half_pos-nod_flag,
(size_t) nod_flag);
if (!(*keyinfo->get_key)(keyinfo,nod_flag,&half_pos,leaf_key))
goto err;
t_length=(int) (*keyinfo->pack_key)(keyinfo, nod_flag, (uchar*) 0,
(uchar*) 0, (uchar*) 0,
leaf_key, &key_inserted);
/* t_length will always be > 0 for a new page !*/
length= (uint) ((buff + buff_length) - half_pos);
bmove(buff+p_length+t_length, half_pos, (size_t) length);
(*keyinfo->store_key)(keyinfo,buff+p_length, &key_inserted);
new_buff_length= length + t_length + p_length;
_ma_store_page_used(info, buff, new_buff_length);
if (info->s->now_transactional)
{
/*
Log changes to parent page
This has one key deleted from it and one key inserted to it at
keypos
ma_log_add ensures that we don't log changes that is outside of
key block size, as the REDO code can't handle that
*/
if (_ma_log_add(info, anc_page, anc_buff, anc_length,
keypos,
anc_key_inserted.move_length +
max(anc_key_inserted.changed_length -
anc_key_inserted.move_length,
key_deleted.changed_length),
anc_key_inserted.move_length -
key_deleted.move_length, 1))
goto err;
/*
Log changes to leaf page.
This contains original data with new data added at end
*/
DBUG_ASSERT(leaf_length <= new_leaf_length);
if (_ma_log_suffix(info, leaf_page, leaf_buff, leaf_length,
new_leaf_length))
goto err;
/*
Log changes to next page
This contains original data with some prefix data deleted and
some compressed data at start possible extended
Data in buff was originally:
org_leaf_buff [leaf_length]
separator_key [buff_key_inserted.move_length]
next_key_changes [buff_key_inserted.changed_length -move_length]
next_page_data [next_buff_length - p_length -
(buff_key_inserted.changed_length -move_length)]
After changes it's now:
unpacked_key [key_inserted.changed_length]
next_suffix [next_buff_length - key_inserted.changed_length]
*/
DBUG_ASSERT(new_buff_length <= next_buff_length);
if (_ma_log_prefix(info, next_page, buff,
key_inserted.changed_length,
(int) (new_buff_length - next_buff_length)))
goto err;
}
next_page_link->changed= 1;
if (_ma_write_keypage(info, keyinfo, next_page,
PAGECACHE_LOCK_LEFT_WRITELOCKED, DFLT_INIT_HITS,
buff))
goto err;
}
leaf_page_link->changed= 1;
if (_ma_write_keypage(info, keyinfo, leaf_page,
PAGECACHE_LOCK_LEFT_WRITELOCKED, DFLT_INIT_HITS,
leaf_buff))
goto err;
DBUG_RETURN(new_anc_length <= ((info->quick_mode ? MARIA_MIN_BLOCK_LENGTH :
(uint) keyinfo->underflow_block_length)));
}
DBUG_PRINT("test",("use left page"));
keypos= _ma_get_last_key(info,keyinfo,anc_buff,anc_key,keypos,&length);
if (!keypos)
goto err;
next_page= _ma_kpos(key_reflength,keypos);
if (!_ma_fetch_keypage(info, keyinfo, next_page, PAGECACHE_LOCK_WRITE,
DFLT_INIT_HITS, buff, 0, &next_page_link))
goto err;
buff_length= _ma_get_page_used(info, buff);
endpos= buff + buff_length;
DBUG_DUMP("prev",buff,buff_length);
/* find keys to make a big key-page */
bmove(next_keypos - key_reflength, leaf_buff + info->s->keypage_header,
key_reflength);
next_keypos=keypos;
if (!(*keyinfo->get_key)(keyinfo,key_reflength,&next_keypos,
anc_key))
goto err;
if (!_ma_get_last_key(info,keyinfo,buff,leaf_key,endpos,&length))
goto err;
/* merge pages and put parting key from anc_buff between */
prev_key=(leaf_length == p_length ? (uchar*) 0 : leaf_key);
t_length=(*keyinfo->pack_key)(keyinfo,nod_flag,
(leaf_length == p_length ?
(uchar*) 0 : leaf_buff+p_length),
prev_key, prev_key,
anc_key, &key_inserted);
if (t_length >= 0)
bmove(endpos+t_length, leaf_buff+p_length,
(size_t) (leaf_length-p_length));
else /* We gained space */
bmove(endpos,leaf_buff+((int) p_length-t_length),
(size_t) (leaf_length-p_length+t_length));
(*keyinfo->store_key)(keyinfo,endpos, &key_inserted);
/* Remember for logging how many bytes of leaf_buff that are not changed */
DBUG_ASSERT((int) key_inserted.changed_length >= key_inserted.move_length);
unchanged_leaf_length= leaf_length - (key_inserted.changed_length -
key_inserted.move_length);
new_buff_length= buff_length + leaf_length - p_length + t_length;
_ma_store_page_used(info, buff, new_buff_length);
/* remove key from anc_buff */
if (!(s_length= remove_key(keyinfo,key_reflength,keypos,anc_key,
anc_buff+anc_length,(my_off_t *) 0,
&key_deleted)))
goto err;
new_anc_length= anc_length - s_length;
_ma_store_page_used(info, anc_buff, new_anc_length);
if (new_buff_length <= (uint) (keyinfo->block_length -
KEYPAGE_CHECKSUM_SIZE))
{
/* All keys fitted into one page */
leaf_page_link->changed= 1;
if (_ma_dispose(info, leaf_page, 0))
goto err;
if (info->s->now_transactional)
{
/* Log changes to parent page */
if (_ma_log_delete(info, anc_page, anc_buff, key_deleted.key_pos,
key_deleted.changed_length, key_deleted.move_length))
goto err;
/*
Log changes to next page. Data for leaf page is in buff
that contains original leaf_buff, parting key and next_buff
*/
if (_ma_log_suffix(info, next_page, buff,
buff_length, new_buff_length))
goto err;
}
}
else
{
/*
Balancing didn't free a page, so we have to split 'buff' into two
pages
- Find key in middle of buffer (buff)
- Pack key at half_buff into anc_buff at position of deleted key
Note that anc_buff may overflow! (is handled by caller)
- Move everything after middlekey to 'leaf_buff'
- Shorten buff at 'endpos'
*/
MARIA_KEY_PARAM anc_key_inserted;
if (first_key)
anc_pos= 0; /* First key */
else if (!_ma_get_last_key(info,keyinfo,anc_buff,anc_pos=anc_key,keypos,
&length))
goto err;
if (!(endpos= _ma_find_half_pos(info, nod_flag, keyinfo, buff, leaf_key,
&key_length, &half_pos)))
goto err;
/* Correct new keypointer to leaf_page */
_ma_kpointer(info,leaf_key+key_length,leaf_page);
/* Save key in anc_buff */
DBUG_DUMP("anc_buff", anc_buff, new_anc_length);
DBUG_DUMP("key_to_anc",leaf_key,key_length);
anc_end_pos= anc_buff + new_anc_length;
t_length=(*keyinfo->pack_key)(keyinfo,key_reflength,
keypos == anc_end_pos ? (uchar*) 0
: keypos,
anc_pos, anc_pos,
leaf_key, &anc_key_inserted);
if (t_length >= 0)
bmove_upp(anc_end_pos+t_length, anc_end_pos,
(uint) (anc_end_pos-keypos));
else
bmove(keypos,keypos-t_length,(uint) (anc_end_pos-keypos)+t_length);
(*keyinfo->store_key)(keyinfo,keypos, &anc_key_inserted);
new_anc_length+= t_length;
_ma_store_page_used(info, anc_buff, new_anc_length);
/* Store first key on new page */
if (nod_flag)
bmove(leaf_buff + info->s->keypage_header, half_pos-nod_flag,
(size_t) nod_flag);
if (!(length=(*keyinfo->get_key)(keyinfo,nod_flag,&half_pos,leaf_key)))
goto err;
DBUG_DUMP("key_to_leaf",leaf_key,length);
t_length=(*keyinfo->pack_key)(keyinfo,nod_flag, (uchar*) 0,
(uchar*) 0, (uchar*) 0, leaf_key,
&key_inserted);
/* t_length will always be > 0 for a new page !*/
length= (uint) ((buff + new_buff_length) - half_pos);
DBUG_PRINT("info",("t_length: %d length: %d",t_length,(int) length));
bmove(leaf_buff+p_length+t_length, half_pos, (size_t) length);
(*keyinfo->store_key)(keyinfo,leaf_buff+p_length, &key_inserted);
new_leaf_length= length + t_length + p_length;
_ma_store_page_used(info, leaf_buff, new_leaf_length);
new_buff_length= (uint) (endpos - buff);
_ma_store_page_used(info, buff, new_buff_length);
if (info->s->now_transactional)
{
/*
Log changes to parent page
This has one key deleted from it and one key inserted to it at
keypos
ma_log_add() ensures that we don't log changes that is outside of
key block size, as the REDO code can't handle that
*/
if (_ma_log_add(info, anc_page, anc_buff, anc_length,
keypos,
anc_key_inserted.move_length +
max(anc_key_inserted.changed_length -
anc_key_inserted.move_length,
key_deleted.changed_length),
anc_key_inserted.move_length -
key_deleted.move_length, 1))
goto err;
/*
Log changes to leaf page.
This contains original data with new data added first
*/
DBUG_ASSERT(leaf_length <= new_leaf_length);
if (_ma_log_prefix(info, leaf_page, leaf_buff,
new_leaf_length - unchanged_leaf_length,
(int) (new_leaf_length - leaf_length)))
goto err;
/*
Log changes to next page
This contains original data with some suffix data deleted
*/
DBUG_ASSERT(new_buff_length <= buff_length);
if (_ma_log_suffix(info, next_page, buff,
buff_length, new_buff_length))
goto err;
}
leaf_page_link->changed= 1;
if (_ma_write_keypage(info, keyinfo, leaf_page,
PAGECACHE_LOCK_LEFT_WRITELOCKED, DFLT_INIT_HITS,
leaf_buff))
goto err;
}
next_page_link->changed= 1;
if (_ma_write_keypage(info, keyinfo, next_page,
PAGECACHE_LOCK_LEFT_WRITELOCKED, DFLT_INIT_HITS, buff))
goto err;
DBUG_RETURN(new_anc_length <= (uint)
(keyinfo->block_length - KEYPAGE_CHECKSUM_SIZE)/2);
err:
DBUG_RETURN(-1);
} /* underflow */
/**
@brief Remove a key from page
@fn remove_key()
keyinfo Key handle
keypos Where on page key starts
lastkey Unpacked version of key to be removed
page_end Pointer to end of page
next_block If <> 0 and node-page, this is set to address of
next page
s_temp Information about what changes was done one the page:
s_temp.key_pos Start of key
s_temp.move_length Number of bytes removed at keypos
s_temp.changed_length Number of bytes changed at keypos
@todo
The current code doesn't handle the case that the next key may be
packed better against the previous key if there is a case difference
@return
@retval 0 error
@retval # How many chars was removed
*/
static uint remove_key(MARIA_KEYDEF *keyinfo, uint nod_flag,
uchar *keypos, uchar *lastkey,
uchar *page_end, my_off_t *next_block,
MARIA_KEY_PARAM *s_temp)
{
int s_length;
uchar *start;
DBUG_ENTER("remove_key");
DBUG_PRINT("enter", ("keypos: 0x%lx page_end: 0x%lx",
(long) keypos, (long) page_end));
start= s_temp->key_pos= keypos;
s_temp->changed_length= 0;
if (!(keyinfo->flag &
(HA_PACK_KEY | HA_SPACE_PACK_USED | HA_VAR_LENGTH_KEY |
HA_BINARY_PACK_KEY)))
{
s_length=(int) (keyinfo->keylength+nod_flag);
if (next_block && nod_flag)
*next_block= _ma_kpos(nod_flag,keypos+s_length);
}
else
{ /* Let keypos point at next key */
/* Calculate length of key */
if (!(*keyinfo->get_key)(keyinfo,nod_flag,&keypos,lastkey))
DBUG_RETURN(0); /* Error */
if (next_block && nod_flag)
*next_block= _ma_kpos(nod_flag,keypos);
s_length=(int) (keypos-start);
if (keypos != page_end)
{
if (keyinfo->flag & HA_BINARY_PACK_KEY)
{
uchar *old_key= start;
uint next_length,prev_length,prev_pack_length;
/* keypos points here on start of next key */
get_key_length(next_length,keypos);
get_key_pack_length(prev_length,prev_pack_length,old_key);
if (next_length > prev_length)
{
uint diff= (next_length-prev_length);
/* We have to copy data from the current key to the next key */
keypos-= diff + prev_pack_length;
store_key_length(keypos, prev_length);
bmove(keypos + prev_pack_length, lastkey + prev_length, diff);
s_length=(int) (keypos-start);
s_temp->changed_length= diff + prev_pack_length;
}
}
else
{
/* Check if a variable length first key part */
if ((keyinfo->seg->flag & HA_PACK_KEY) && *keypos & 128)
{
/* Next key is packed against the current one */
uint next_length,prev_length,prev_pack_length,lastkey_length,
rest_length;
if (keyinfo->seg[0].length >= 127)
{
if (!(prev_length=mi_uint2korr(start) & 32767))
goto end;
next_length=mi_uint2korr(keypos) & 32767;
keypos+=2;
prev_pack_length=2;
}
else
{
if (!(prev_length= *start & 127))
goto end; /* Same key as previous*/
next_length= *keypos & 127;
keypos++;
prev_pack_length=1;
}
if (!(*start & 128))
prev_length=0; /* prev key not packed */
if (keyinfo->seg[0].flag & HA_NULL_PART)
lastkey++; /* Skip null marker */
get_key_length(lastkey_length,lastkey);
if (!next_length) /* Same key after */
{
next_length=lastkey_length;
rest_length=0;
}
else
get_key_length(rest_length,keypos);
if (next_length >= prev_length)
{
/* Next key is based on deleted key */
uint pack_length;
uint diff= (next_length-prev_length);
/* keypos points to data of next key (after key length) */
bmove(keypos - diff, lastkey + prev_length, diff);
rest_length+= diff;
pack_length= prev_length ? get_pack_length(rest_length): 0;
keypos-= diff + pack_length + prev_pack_length;
s_length=(int) (keypos-start);
if (prev_length) /* Pack against prev key */
{
*keypos++= start[0];
if (prev_pack_length == 2)
*keypos++= start[1];
store_key_length(keypos,rest_length);
}
else
{
/* Next key is not packed anymore */
if (keyinfo->seg[0].flag & HA_NULL_PART)
{
rest_length++; /* Mark not null */
}
if (prev_pack_length == 2)
{
mi_int2store(keypos,rest_length);
}
else
*keypos= rest_length;
}
s_temp->changed_length= diff + pack_length + prev_pack_length;
}
}
}
}
}
end:
bmove(start, start+s_length, (uint) (page_end-start-s_length));
s_temp->move_length= s_length;
DBUG_RETURN((uint) s_length);
} /* remove_key */
/****************************************************************************
Logging of redos
****************************************************************************/
/**
@brief log entry where some parts are deleted and some things are changed
@fn _ma_log_delete()
@param info Maria handler
@param page Pageaddress for changed page
@param buff Page buffer
@param key_pos Start of change area
@param changed_length How many bytes where changed at key_pos
@param move_length How many bytes where deleted at key_pos
*/
static my_bool _ma_log_delete(MARIA_HA *info, my_off_t page, uchar *buff,
uchar *key_pos, uint changed_length,
uint move_length)
{
LSN lsn;
uchar log_data[FILEID_STORE_SIZE + PAGE_STORE_SIZE + 9 + 7], *log_pos;
LEX_STRING log_array[TRANSLOG_INTERNAL_PARTS + 3];
uint translog_parts;
uint offset= (uint) (key_pos - buff);
DBUG_ENTER("_ma_log_delete");
DBUG_PRINT("enter", ("page: %lu changed_length: %u move_length: %d",
(ulong) page, changed_length, move_length));
DBUG_ASSERT(info->s->now_transactional && move_length);
DBUG_ASSERT(offset + changed_length <= _ma_get_page_used(info, buff));
/* Store address of new root page */
page/= info->s->block_size;
page_store(log_data + FILEID_STORE_SIZE, page);
log_pos= log_data+ FILEID_STORE_SIZE + PAGE_STORE_SIZE;
log_pos[0]= KEY_OP_OFFSET;
int2store(log_pos+1, offset);
log_pos[3]= KEY_OP_SHIFT;
int2store(log_pos+4, -(int) move_length);
log_pos+= 6;
translog_parts= 1;
if (changed_length)
{
log_pos[0]= KEY_OP_CHANGE;
int2store(log_pos+1, changed_length);
log_pos+= 3;
translog_parts= 2;
log_array[TRANSLOG_INTERNAL_PARTS + 1].str= buff + offset;
log_array[TRANSLOG_INTERNAL_PARTS + 1].length= changed_length;
}
#ifdef EXTRA_DEBUG_KEY_CHANGES
{
int page_length= _ma_get_page_used(info, buff);
ha_checksum crc;
crc= my_checksum(0, buff + LSN_STORE_SIZE, page_length - LSN_STORE_SIZE);
log_pos[0]= KEY_OP_CHECK;
int2store(log_pos+1, page_length);
int4store(log_pos+3, crc);
log_array[TRANSLOG_INTERNAL_PARTS + translog_parts].str= log_pos;
log_array[TRANSLOG_INTERNAL_PARTS + translog_parts].length= 7;
changed_length+= 7;
translog_parts++;
}
#endif
log_array[TRANSLOG_INTERNAL_PARTS + 0].str= (char*) log_data;
log_array[TRANSLOG_INTERNAL_PARTS + 0].length= (uint) (log_pos - log_data);
if (translog_write_record(&lsn, LOGREC_REDO_INDEX,
info->trn, info,
log_array[TRANSLOG_INTERNAL_PARTS + 0].length +
changed_length,
TRANSLOG_INTERNAL_PARTS + translog_parts,
log_array, log_data, NULL))
DBUG_RETURN(1);
DBUG_RETURN(0);
}