mariadb/storage/maria/ma_rt_split.c
unknown c9a825810d WL#3072 - Maria Recovery
Recovery of R-tree and fulltext indices.
Fix for BUG#35551 "Maria: crash in REPAIR TABLE/ENABLE KEYS if using
repair-with-keycache method".
Fix for bug (see ma_rt_index.c) where we could have a wrong
page_link pointer causing wrong memory access during some R-tree
index insert/delete.
Making ma_rt_test work again (it had been neglected over time) and
adding options (record type etc) to prepare it for integration into
ma_test_all-t (but there is BUG#36321 about "ma_rt_test -M" crash)


mysql-test/r/maria.result:
  correct result
mysql-test/t/maria.test:
  now we get no error
storage/maria/ma_blockrec.c:
  delete_dir_entry() and delete_head_or_tail() don't use info->keyread_buff.
  ma_get_length() does not change **packet, marking it with 'const' to
  remove some casts in callers of this function. The
  (const uchar**)&header casts will be removed when Monty changes 'header'
  to const uchar*.
  _ma_apply_redo_purge_row_head_or_tail() sets 'buff' from pagecache_read()
  so its initialization was superfluous.
storage/maria/ma_check.c:
  Fix for BUG#35551 "Maria: crash in REPAIR TABLE/ENABLE KEYS if using repair-with-keycache method"
  (see comment in code)
storage/maria/ma_create.c:
  FULLTEXT and SPATIAL indices have logging now, they are recoverable.
storage/maria/ma_delete.c:
  Logging done by _ma_ck_delete() is moved to a function
  (_ma_write_undo_key_delete()), for reuse by R-tree logging.
  _ma_log_delete() is made non-static for same
  reason, and some of its parameters are made pointers to const.
  Removed wrong comment ("Note that for delete key" etc, contradicted by
  code and comment "Log also position to row" a few lines above)
storage/maria/ma_ft_update.c:
  unneeded cast, comment for future
storage/maria/ma_key_recover.c:
  Comment about possible deadlock.
  Write bad page to DBUG trace if KEY_OP_CHECK finds a bad CRC.
  Support operation KEY_OP_MULTI_COPY.
  When we execute, in UNDO phase, UNDO_KEY_DELETE|INSERT, we must call
  the proper key insertion|deletion depending on if this is R-tree
  or B-tree.
  Explanation of how _ma_[un]lock_key_del() works, maybe useful for
  mortals like me.
storage/maria/ma_key_recover.h:
  change of prototypes
storage/maria/ma_loghandler.h:
  New operation which can be stored in REDO_INDEX log records: KEY_OP_MULTI_COPY
storage/maria/ma_page.c:
  Comments
storage/maria/ma_pagecache.c:
  typo
storage/maria/ma_rt_index.c:
  Fix for bug: the page_link pointer in maria_rtree_insert_req()
  could be wrong when we set its 'changed' member; for the solution
  see ma_key_recover.h. It is needed only in cases when we manipulate
  several pages.
  Logging of changes done to pages by key insert/delete.
  maria_rtree_delete()'s main work is moved to a new function
  maria_rtree_real_delete(), which is used by maria_rtree_delete()
  and by applying of UNDO_KEY_INSERT.
storage/maria/ma_rt_index.h:
  new prototypes and macros for ma_rt_index.c
storage/maria/ma_rt_key.c:
  Logging of maria_rtree_add_key() and maria_rtree_delete_key().
  When inserting, split is necessary if there is not enough room for key:
  take checksum's occupied space in this calculation.
storage/maria/ma_rt_key.h:
  new prototypes (those functions need to know the page's id
  because they do logging)
storage/maria/ma_rt_mbr.c:
  Comments about what the functions change.
storage/maria/ma_rt_split.c:
  maria_rtree_split_page() needs to know the page's id, because
  it does logging.
  Logging of what a split operation does to the split page (see
  comment of _ma_log_rt_split(): moves of keys inside the page,
  sometimes insertion of the new key, and shrinking of the page)
  and to the new page (receives some keys from split page, and
  sometimes the new key).
storage/maria/ma_rt_test.c:
  ma_rt_test had been forgotten when maria_rkey() was changed some months ago
  (0->HA_WHOLE_KEY change), and when calls to maria_rnd(,,HA_OFFSET_ERROR)
  were rewritten to maria_scan() calls (which implies maria_scan_init()).
  The 'max_i' change is to adapt to the fact that maria_scan() does
  not return deleted records for BLOCK_RECORD but does so for other formats;
  the initial code assumed a certain number of deleted records would be
  returned, we change it to rather count only non-deleted ones.
  We also add more features to this test, like ma_test1 (the plan
  is to run ma_rt_test in ma_test_all-t):
  options to choose records' format, table checksum, transactions,
  checkpoints, end at specific stages, abort without committing,
  and debug trace.
storage/maria/ma_test1.c:
  MY_INIT() does my_init().
storage/maria/ma_write.c:
  Logging done by _ma_ck_write_btree_with_log() is moved to a function
  (_ma_write_undo_key_insert()), for reuse by R-tree logging.
  _ma_log_new() and _ma_log_change() are made non-static for same
  reason. Some parameters of logging functions are made pointers to const.
  If EXTRA_DEBUG_KEY_CHANGES, we now log CRC in _ma_log_change() too
  (better checks, bigger record).
storage/maria/maria_read_log.c:
  Program takes no arguments, bail out if any, instead of silently discarding them
storage/myisam/rt_test.c:
  rt_test had been forgotten when mi_rkey() was changed some months ago
  (0->HA_WHOLE_KEY change).
  The 'max_i' change is to make it symmetric with ma_rt_test.c
mysql-test/r/maria-gis-rtree-dynamic.result:
  correct result
mysql-test/r/maria-gis-rtree-trans.result:
  correct result
mysql-test/r/maria-recovery-rtree-ft.result:
  almost correct result (hitting BUG# in the end)
mysql-test/t/maria-gis-rtree-dynamic.test:
  test R-tree & dynamic row format
mysql-test/t/maria-gis-rtree-trans.test:
  Test R-tree and page row format and transactional
mysql-test/t/maria-recovery-rtree-ft-master.opt:
  usual options for recovery testing
mysql-test/t/maria-recovery-rtree-ft.test:
  test of recovery of R-tree and fulltext indices.
2008-04-24 17:22:51 +02:00


/* Copyright (C) 2006 MySQL AB & Alexey Botchkov & MySQL Finland AB
& TCX DataKonsult AB
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; version 2 of the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
#include "maria_def.h"
#include "trnman.h"
#include "ma_key_recover.h"
#ifdef HAVE_RTREE_KEYS
#include "ma_rt_index.h"
#include "ma_rt_key.h"
#include "ma_rt_mbr.h"
typedef struct
{
double square;
int n_node;
const uchar *key;
double *coords;
} SplitStruct;
inline static double *reserve_coords(double **d_buffer, int n_dim)
{
double *coords= *d_buffer;
(*d_buffer)+= n_dim * 2;
return coords;
}
static void mbr_join(double *a, const double *b, int n_dim)
{
double *end= a + n_dim * 2;
do
{
if (a[0] > b[0])
a[0]= b[0];
if (a[1] < b[1])
a[1]= b[1];
a+= 2;
b+= 2;
} while (a != end);
}
/*
Computes the area ('square') of the MBR which is the join of a and b
*/
static double mbr_join_square(const double *a, const double *b, int n_dim)
{
const double *end= a + n_dim * 2;
double square= 1.0;
do
{
square *=
((a[1] < b[1]) ? b[1] : a[1]) - ((a[0] > b[0]) ? b[0] : a[0]);
a+= 2;
b+= 2;
} while (a != end);
return square;
}
static double count_square(const double *a, int n_dim)
{
const double *end= a + n_dim * 2;
double square= 1.0;
do
{
square *= a[1] - a[0];
a+= 2;
} while (a != end);
return square;
}
inline static void copy_coords(double *dst, const double *src, int n_dim)
{
memcpy(dst, src, sizeof(double) * (n_dim * 2));
}
/**
Selects the two seed nodes around which the two groups are built: the pair
whose joined MBR wastes the most area.
Note that this function uses 'double' arithmetic, so it may behave differently
on different platforms/builds. The same holds for other functions in this file.
*/
static void pick_seeds(SplitStruct *node, int n_entries,
SplitStruct **seed_a, SplitStruct **seed_b, int n_dim)
{
SplitStruct *cur1;
SplitStruct *lim1= node + (n_entries - 1);
SplitStruct *cur2;
SplitStruct *lim2= node + n_entries;
double max_d= -DBL_MAX;
double d;
for (cur1= node; cur1 < lim1; cur1++)
{
for (cur2=cur1 + 1; cur2 < lim2; cur2++)
{
d= mbr_join_square(cur1->coords, cur2->coords, n_dim) - cur1->square -
cur2->square;
if (d > max_d)
{
max_d= d;
*seed_a= cur1;
*seed_b= cur2;
}
}
}
}
/*
Selects the next node to assign and the group to add it to
*/
static void pick_next(SplitStruct *node, int n_entries, double *g1, double *g2,
SplitStruct **choice, int *n_group, int n_dim)
{
SplitStruct *cur= node;
SplitStruct *end= node + n_entries;
double max_diff= -DBL_MAX;
for (; cur < end; cur++)
{
double diff;
double abs_diff;
if (cur->n_node)
{
continue;
}
diff= mbr_join_square(g1, cur->coords, n_dim) -
mbr_join_square(g2, cur->coords, n_dim);
abs_diff= fabs(diff);
if (abs_diff > max_diff)
{
max_diff= abs_diff;
*n_group= 1 + (diff > 0);
*choice= cur;
}
}
}
/*
Mark not-in-group entries as n_group
*/
static void mark_all_entries(SplitStruct *node, int n_entries, int n_group)
{
SplitStruct *cur= node;
SplitStruct *end= node + n_entries;
for (; cur < end; cur++)
{
if (cur->n_node)
{
continue;
}
cur->n_node= n_group;
}
}
static int split_maria_rtree_node(SplitStruct *node, int n_entries,
int all_size, /* Total key's size */
int key_size,
int min_size, /* Minimal group size */
int size1, int size2 /* initial group sizes */,
double **d_buffer, int n_dim)
{
SplitStruct *cur;
SplitStruct *a;
SplitStruct *b;
double *g1= reserve_coords(d_buffer, n_dim);
double *g2= reserve_coords(d_buffer, n_dim);
SplitStruct *next;
int next_node;
int i;
SplitStruct *end= node + n_entries;
LINT_INIT(a);
LINT_INIT(b);
LINT_INIT(next);
LINT_INIT(next_node);
if (all_size < min_size * 2)
{
return 1;
}
cur= node;
for (; cur < end; cur++)
{
cur->square= count_square(cur->coords, n_dim);
cur->n_node= 0;
}
pick_seeds(node, n_entries, &a, &b, n_dim);
a->n_node= 1;
b->n_node= 2;
copy_coords(g1, a->coords, n_dim);
size1+= key_size;
copy_coords(g2, b->coords, n_dim);
size2+= key_size;
for (i=n_entries - 2; i>0; --i)
{
if (all_size - (size2 + key_size) < min_size) /* Can't write into group 2 */
{
mark_all_entries(node, n_entries, 1);
break;
}
if (all_size - (size1 + key_size) < min_size) /* Can't write into group 1 */
{
mark_all_entries(node, n_entries, 2);
break;
}
pick_next(node, n_entries, g1, g2, &next, &next_node, n_dim);
if (next_node == 1)
{
size1+= key_size;
mbr_join(g1, next->coords, n_dim);
}
else
{
size2+= key_size;
mbr_join(g2, next->coords, n_dim);
}
next->n_node= next_node;
}
return 0;
}
/**
Logs key reorganization done in a split page (new page is logged elsewhere).
The effect of a split on the split page is three changes:
- some pieces of the page move to different places inside this page (we are
not interested here in the pieces which move to the new page)
- the key is inserted into the page or not (could be in the new page)
- page is shrunk
All this is uniquely determined by a few parameters:
- the key (starting at 'key-nod_flag', for 'full_length' bytes;
maria_rtree_split_page() seems to depend only on its parameters key&key_length
but in fact it reads more: nod_flag to the left, and up to full_length to
the right)
- the binary content of the page
- some variables in the share
- double arithmetic, which is unpredictable from machine to machine and
from build to build (see pick_seeds() above: it has a comparison between
double-s 'if (d > max_d)' so the comparison can go differently from machine
to machine or build to build, it has happened in real life).
If one day we use precision-math instead of double-math, in GIS, then the
last parameter would become constant across machines and builds and we
could do some cheap logging: just log the few parameters above.
Until then, we log the list of memcpy() operations (fortunately, we often do
not have to log the source bytes, as they can be found in the page before
applying the REDO; the only source bytes to log are the key), the key if it
was inserted into this page, and the shrinking.
@param info table
@param page page's offset in the file
@param buff content of the page (post-split)
@param key_with_nod_flag pointer to key-nod_flag
@param full_length length of (key + (nod_flag (if node) or rowid (if
leaf)))
@param log_internal_copy encoded list of memcpy() operations done on
split page, having their source in the page
@param log_internal_copy_length length of above list, in bytes
@param log_key_copy operation describing the key's copy, or NULL if the
inserted key was not put into the page (was put in
new page, so does not have to be logged here)
@param length_diff by how much the page has shrunk during split
*/
static my_bool _ma_log_rt_split(MARIA_HA *info,
my_off_t page, const uchar *buff,
const uchar *key_with_nod_flag,
uint full_length,
const uchar *log_internal_copy,
uint log_internal_copy_length,
const uchar *log_key_copy,
uint length_diff)
{
MARIA_SHARE *share= info->s;
LSN lsn;
uchar log_data[FILEID_STORE_SIZE + PAGE_STORE_SIZE + 1 + 2 + 1 + 2 + 2 + 7],
*log_pos;
LEX_CUSTRING log_array[TRANSLOG_INTERNAL_PARTS + 5];
uint translog_parts, extra_length= 0;
DBUG_ENTER("_ma_log_rt_split");
DBUG_PRINT("enter", ("page: %lu", (ulong) page));
DBUG_ASSERT(share->now_transactional);
page/= share->block_size;
page_store(log_data + FILEID_STORE_SIZE, page);
log_pos= log_data+ FILEID_STORE_SIZE + PAGE_STORE_SIZE;
log_pos[0]= KEY_OP_DEL_SUFFIX;
log_pos++;
DBUG_ASSERT((int)length_diff > 0);
int2store(log_pos, length_diff);
log_pos+= 2;
log_pos[0]= KEY_OP_MULTI_COPY;
log_pos++;
int2store(log_pos, full_length);
log_pos+= 2;
int2store(log_pos, log_internal_copy_length);
/*
Always advance past the field above, so that (log_pos - log_data) below
matches the length of the first log part in all builds.
*/
log_pos+= 2;
log_array[TRANSLOG_INTERNAL_PARTS + 0].str= log_data;
log_array[TRANSLOG_INTERNAL_PARTS + 0].length= sizeof(log_data) - 7;
log_array[TRANSLOG_INTERNAL_PARTS + 1].str= log_internal_copy;
log_array[TRANSLOG_INTERNAL_PARTS + 1].length= log_internal_copy_length;
translog_parts= 2;
if (log_key_copy != NULL) /* need to store key into record */
{
log_array[TRANSLOG_INTERNAL_PARTS + 2].str= log_key_copy;
log_array[TRANSLOG_INTERNAL_PARTS + 2].length= 1 + 2 + 1 + 2;
log_array[TRANSLOG_INTERNAL_PARTS + 3].str= key_with_nod_flag;
log_array[TRANSLOG_INTERNAL_PARTS + 3].length= full_length;
extra_length= 1 + 2 + 1 + 2 + full_length;
translog_parts+= 2;
}
#ifdef EXTRA_DEBUG_KEY_CHANGES
{
int page_length= _ma_get_page_used(share, buff);
ha_checksum crc;
crc= my_checksum(0, buff + LSN_STORE_SIZE, page_length - LSN_STORE_SIZE);
log_pos[0]= KEY_OP_CHECK;
int2store(log_pos + 1, page_length);
int4store(log_pos + 3, crc);
log_array[TRANSLOG_INTERNAL_PARTS + translog_parts].str= log_pos;
log_array[TRANSLOG_INTERNAL_PARTS + translog_parts].length= 7;
extra_length+= 7;
translog_parts++;
}
#endif
if (translog_write_record(&lsn, LOGREC_REDO_INDEX,
info->trn, info,
(translog_size_t) ((log_pos - log_data) +
log_internal_copy_length +
extra_length),
TRANSLOG_INTERNAL_PARTS + translog_parts,
log_array, log_data, NULL))
DBUG_RETURN(1);
DBUG_RETURN(0);
}
/**
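Splits an R-tree page: its keys and the new key are distributed between the
split page and a newly created page.
@return
0 ok; the created page is put into page cache; the shortened one is not (up
to the caller to do it)
1 or -1: error.
If new_page_offs==NULL, won't create new page (for redo phase).
@note As written, the body stores to *new_page_offs unconditionally, so
callers of this version must pass a non-NULL pointer.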
*/
int maria_rtree_split_page(MARIA_HA *info, const MARIA_KEYDEF *keyinfo,
my_off_t page_offs, uchar *page, const uchar *key,
uint key_length, my_off_t *new_page_offs)
{
MARIA_SHARE *share= info->s;
const my_bool transactional= share->now_transactional;
int n1, n2; /* Number of items in groups */
SplitStruct *task;
SplitStruct *cur;
SplitStruct *stop;
double *coord_buf;
double *next_coord;
double *old_coord;
int n_dim;
uchar *source_cur, *cur1, *cur2;
uchar *new_page, *log_internal_copy, *log_internal_copy_ptr,
*log_key_copy= NULL;
int err_code= 0;
uint nod_flag= _ma_test_if_nod(share, page);
uint org_length= _ma_get_page_used(share, page), new_length;
uint full_length= key_length + (nod_flag ? nod_flag :
share->base.rec_reflength);
int max_keys= ((org_length - share->keypage_header) /
(full_length));
MARIA_PINNED_PAGE tmp_page_link, *page_link= &tmp_page_link;
DBUG_ENTER("maria_rtree_split_page");
DBUG_PRINT("rtree", ("splitting block"));
n_dim= keyinfo->keysegs / 2;
if (!(coord_buf= (double*) my_alloca(n_dim * 2 * sizeof(double) *
(max_keys + 1 + 4) +
sizeof(SplitStruct) * (max_keys + 1))))
DBUG_RETURN(-1); /* purecov: inspected */
task= (SplitStruct *)(coord_buf + n_dim * 2 * (max_keys + 1 + 4));
next_coord= coord_buf;
stop= task + max_keys;
source_cur= rt_PAGE_FIRST_KEY(share, page, nod_flag);
for (cur= task;
cur < stop;
cur++, source_cur= rt_PAGE_NEXT_KEY(share, source_cur, key_length,
nod_flag))
{
cur->coords= reserve_coords(&next_coord, n_dim);
cur->key= source_cur;
maria_rtree_d_mbr(keyinfo->seg, source_cur, key_length, cur->coords);
}
cur->coords= reserve_coords(&next_coord, n_dim);
maria_rtree_d_mbr(keyinfo->seg, key, key_length, cur->coords);
cur->key= key;
old_coord= next_coord;
if (split_maria_rtree_node(task, max_keys + 1,
_ma_get_page_used(share, page) + full_length + 2,
full_length,
rt_PAGE_MIN_SIZE(keyinfo->block_length),
2, 2, &next_coord, n_dim))
{
err_code= 1;
goto split_err;
}
/* Allocate buffer for new page and piece of log record */
if (!(new_page= (uchar*) my_alloca((uint)keyinfo->block_length +
(transactional ?
(max_keys * (2 + 2) +
1 + 2 + 1 + 2) : 0))))
{
err_code= -1;
goto split_err;
}
log_internal_copy= log_internal_copy_ptr= new_page + keyinfo->block_length;
bzero(new_page, share->block_size);
stop= task + (max_keys + 1);
cur1= rt_PAGE_FIRST_KEY(share, page, nod_flag);
cur2= rt_PAGE_FIRST_KEY(share, new_page, nod_flag);
n1= n2= 0;
for (cur= task; cur < stop; cur++)
{
uchar *to;
const uchar *cur_key= cur->key;
my_bool log_this_change;
DBUG_ASSERT(log_key_copy == NULL);
if (cur->n_node == 1)
{
to= cur1;
cur1= rt_PAGE_NEXT_KEY(share, cur1, key_length, nod_flag);
n1++;
log_this_change= transactional;
}
else
{
to= cur2;
cur2= rt_PAGE_NEXT_KEY(share, cur2, key_length, nod_flag);
n2++;
log_this_change= FALSE;
}
if (to != cur_key)
{
uchar *to_with_nod_flag= to - nod_flag;
const uchar *cur_key_with_nod_flag= cur_key - nod_flag;
memcpy(to_with_nod_flag, cur_key_with_nod_flag, full_length);
if (log_this_change)
{
uint to_with_nod_flag_offs= to_with_nod_flag - page;
if (likely(cur_key != key))
{
/* this memcpy() is internal to the page (source in the page) */
uint cur_key_with_nod_flag_offs= cur_key_with_nod_flag - page;
int2store(log_internal_copy_ptr, to_with_nod_flag_offs);
log_internal_copy_ptr+= 2;
int2store(log_internal_copy_ptr, cur_key_with_nod_flag_offs);
log_internal_copy_ptr+= 2;
}
else
{
/* last iteration, and this involves *key: source is external */
log_key_copy= log_internal_copy_ptr;
log_key_copy[0]= KEY_OP_OFFSET;
int2store(log_key_copy + 1, to_with_nod_flag_offs);
log_key_copy[3]= KEY_OP_CHANGE;
int2store(log_key_copy + 4, full_length);
/* _ma_log_rt_split() will store *key, right after */
}
}
}
}
{ /* verify that above loop didn't touch header bytes */
uint i;
for (i= 0; i < share->keypage_header; i++)
DBUG_ASSERT(new_page[i]==0);
}
if (nod_flag)
_ma_store_keypage_flag(share, new_page, KEYPAGE_FLAG_ISNOD);
_ma_store_keynr(share, new_page, keyinfo->key_nr);
_ma_store_page_used(share, new_page, share->keypage_header +
n2 * full_length);
new_length= share->keypage_header + n1 * full_length;
_ma_store_page_used(share, page, new_length);
if ((*new_page_offs= _ma_new(info, DFLT_INIT_HITS, &page_link)) ==
HA_OFFSET_ERROR)
err_code= -1;
else
{
if (transactional &&
( /* log change to split page */
_ma_log_rt_split(info, page_offs, page, key - nod_flag,
full_length, log_internal_copy,
log_internal_copy_ptr - log_internal_copy,
log_key_copy, org_length - new_length) ||
/* and to new page */
_ma_log_new(info, *new_page_offs, new_page,
share->keypage_header + n2 * full_length,
keyinfo->key_nr, 0)))
err_code= -1;
if ( _ma_write_keypage(info, keyinfo, *new_page_offs,
page_link->write_lock,
DFLT_INIT_HITS, new_page))
err_code= -1;
}
DBUG_PRINT("rtree", ("split new block: %lu", (ulong) *new_page_offs));
my_afree(new_page);
split_err:
my_afree(coord_buf);
DBUG_RETURN(err_code);
}
#endif /*HAVE_RTREE_KEYS*/