mariadb/storage/maria/unittest/ma_test_loghandler_multithread-t.c
unknown d0b9387b88 WL#3072 - Maria recovery.
* Recovery of the table's live checksum (CREATE TABLE ... CHECKSUM=1)
is achieved in this patch. The table's live checksum
(info->s->state.state.checksum) is updated in inwrite_rec_hook's
under the log mutex when writing UNDO_ROW_INSERT|UPDATE|DELETE
and REDO_DELETE_ALL. The checksum variation caused by the operation
is stored in these UNDOs, so that the REDO phase, when it sees such
UNDOs, can update the live checksum if it is older (state.is_of_lsn is
lower) than the record. It is also used, as a nice add-on with no
cost, to do less row checksum computation during the UNDO phase
(as we have it in the record already).
Doing this work, it became pressing to move in-write hooks
(write_hook_for_redo() et al) to ma_blockrec.c.
The 'parts' argument of inwrite_rec_hook is unpredictable (it comes
mangled at this stage, for example by LSN compression) so it is
replaced by a 'void* hook_arg', which is used to pass down information,
currently only to write_hook_for_clr_end() (previous undo_lsn and
type of undone record).
* If from ha_maria, we print to stderr how many seconds (with one
fractional digit) the REDO phase took, same for UNDO phase and for
final table close. Just to give an indication for debugging and maybe
also for Support.


storage/maria/ha_maria.cc:
  question for Monty
storage/maria/ma_blockrec.c:
  * log in-write hooks (write_hook_for_redo() etc) move from
  ma_loghandler.c to here; this is natural: the hooks are coupled
  to their callers (functions in ma_blockrec.c).
  * translog_write_record() now has a new argument "hook_arg";
  using it to pass down to write_hook_for_clr_end() the transaction's
  previous_undo_lsn and the type of the being undone record, and also
  to pass down to all UNDOs the live checksum variation caused by the
  operation.
  * If table has live checksum, store in UNDO_ROW_INSERT|UPDATE|DELETE
  and in CLR_END the checksum variation ("delta") caused by the
  operation. For example if a DELETE caused the table's live checksum
  to change from 123 to 456, we store in the UNDO_ROW_DELETE, in 4 bytes,
  the value 333 (456-123).
  * Instead of hard-coded "1" as length of the place where we store
  the undone record's type in CLR_END, use a symbol CLR_TYPE_STORE_SIZE;
  use macros clr_type_store and clr_type_korr.
  * write_block_record() has a new parameter 'old_record_checksum'
  which is the pre-computed checksum of old_record; that value is used
  to update the table's live checksum when writing UNDO_ROW_UPDATE|CLR_END.
  * In allocate_write_block_record(), if we are executing UNDO_ROW_DELETE
  the row's checksum is already computed.
  * _ma_update_block_record2() now expect the new row's checksum into
  cur_row.checksum (was already true) and the old row's checksum into
  new_row.checksum (that's new). Its two callers, maria_update() and
  _ma_apply_undo_row_update(), honour this.
  * When executing an UNDO_ROW_INSERT|UPDATE|DELETE in UNDO phase, pick
  up the checksum delta from the log record. It is then used to update
  the table's live checksum when writing CLR_END, and saves us a
  computation of record.
storage/maria/ma_blockrec.h:
  in-write hooks move from ma_loghandler.c
storage/maria/ma_check.c:
  more straightforward size of buffer
storage/maria/ma_checkpoint.c:
  <= is enough
storage/maria/ma_commit.c:
  new prototype of translog_write_record()
storage/maria/ma_create.c:
  new prototype of translog_write_record()
storage/maria/ma_delete.c:
  The row's checksum must be computed before calling(*delete_record)(),
  not after, because it must be known inside _ma_delete_block_record()
  (to update the table's live checksum when writing UNDO_ROW_DELETE).
  If deleting from a transactional table, live checksum was already updated
  when writing UNDO_ROW_DELETE.
storage/maria/ma_delete_all.c:
  @todo is now done (in ma_loghandler.c)
storage/maria/ma_delete_table.c:
  new prototype of translog_write_record()
storage/maria/ma_loghandler.c:
  * in-write hooks move to ma_blockrec.c.
  * translog_write_record() gets a new argument 'hook_arg' which is
  passed down to pre|inwrite_rec_hook. It is more useful that 'parts'
  for those hooks, because when those hooks are called, 'parts' has
  possibly been mangled (like with LSN compression) and is so
  unpredictable.
  * fix for compiler warning (unused buffer_start when compiling without
  debug support)
  * Because checksum delta is stored into UNDO_ROW_INSERT|UPDATE|DELETE
  and CLR_END, but only if the table has live checksum, these records
  are not PSEUDOFIXEDLENGTH anymore, they are now VARIABLE_LENGTH (their
  length is X if no live checksum and X+4 otherwise).
  * add an inwrite_rec_hook for UNDO_ROW_UPDATE, which updates the
  table's live checksum. Update it also in hooks of UNDO_ROW_INSERT|
  DELETE and REDO_DELETE_ALL and CLR_END.
  * Bugfix: when reading a record in translog_read_record(), it happened
  that "length" became negative, because the function assumed that
  the record extended beyond the page's end, whereas it may be shorter.
storage/maria/ma_loghandler.h:
  * Instead of hard-coded "1" and "4", use symbols and macros
  to store/retrieve the type of record which the CLR_END corresponds
  to, and the checksum variation caused by the operation which logs the
  record
  * translog_write_record() gets a new argument 'hook_arg' which is
  passed down to pre|inwrite_rec_hook. It is more useful that 'parts'
  for those hooks, because when those hooks are called, 'parts' has
  possibly been mangled (like with LSN compression) and is so
  unpredictable.
storage/maria/ma_open.c:
  fix for "empty body in if() statement" (when compiling without safemutex)
storage/maria/ma_pagecache.c:
  <= is enough
storage/maria/ma_recovery.c:
  * print the time that each recovery phase (REDO/UNDO/flush) took;
  this is enabled only when recovering from ha_maria. Is it printed
  n seconds with a fractional part of one digit (like 123.4 seconds).
  * In the REDO phase, update the table's live checksum by using
  the checksum delta stored in UNDO_ROW_INSERT|DELETE|UPDATE and CLR_END.
  Update it too when seeing REDO_DELETE_ALL.
  * In the UNDO phase, when executing UNDO_ROW_INSERT, if the table does
  not have live checksum then reading the record's header (as done by
  the master loop of run_undo_phase()) is enough; otherwise we
  do a translog_read_record() to have the checksum delta ready
  for _ma_apply_undo_row_insert().
  * When at the end of the REDO phase we notice that there is an unfinished
  group of REDOs, don't assert in debug binaries, as I verified that it
  can happen in real life (with kill -9)
  * removing ' in #error as it confuses gcc3
storage/maria/ma_rename.c:
  new prototype of translog_write_record()
storage/maria/ma_test_recovery.expected:
  Change in output of ma_test_recovery: now all live checksums of
  original tables equal those of tables recreated by the REDO phase
  and those of tables fixed by the UNDO phase. I.e. recovery of
  the live checksum looks like working (which was after all the only
  goal of this changeset).
  I checked by hand that it's not just all live checksums which are
  now 0 and that's why they match. They are the old values like
  3757530372. maria.test has hard-coded checksum values in its result
  file so checks this too.
storage/maria/ma_update.c:
  * It's useless to put up HA_STATE_CHANGED in 'key_changed',
  as we put up HA_STATE_CHANGED in info->update anyway.
  * We need to compute the old and new rows' checksum before calling
  (*update_record)(), as checksum delta must be known when logging
  UNDO_ROW_UPDATE which is done by _ma_update_block_record(). Note that
  some functions change the 'newrec' record (at least _ma_check_unique()
  does) so we cannot move the checksum computation too early in the
  function.
storage/maria/ma_write.c:
  If inserting into a transactional table, live's checksum was
  already updated when writing UNDO_ROW_INSERT. The multiplication
  is a trick to save an if().
storage/maria/unittest/ma_test_loghandler-t.c:
  new prototype of translog_write_record()
storage/maria/unittest/ma_test_loghandler_first_lsn-t.c:
  new prototype of translog_write_record()
storage/maria/unittest/ma_test_loghandler_max_lsn-t.c:
  new prototype of translog_write_record()
storage/maria/unittest/ma_test_loghandler_multigroup-t.c:
  new prototype of translog_write_record()
storage/maria/unittest/ma_test_loghandler_multithread-t.c:
  new prototype of translog_write_record()
storage/maria/unittest/ma_test_loghandler_noflush-t.c:
  new prototype of translog_write_record()
storage/maria/unittest/ma_test_loghandler_pagecache-t.c:
  new prototype of translog_write_record()
storage/maria/unittest/ma_test_loghandler_purge-t.c:
  new prototype of translog_write_record()
storage/myisam/sort.c:
  fix for compiler warnings in pushbuild (write_merge_key* functions
  didn't have their declaration match MARIA_HA::write_key).
2007-10-02 18:02:09 +02:00

479 lines
13 KiB
C

#include "../maria_def.h"
#include <stdio.h>
#include <errno.h>
#include <tap.h>
#include "../trnman.h"
extern my_bool maria_log_remove();
#ifndef DBUG_OFF
static const char *default_dbug_option;
#endif
#define PCACHE_SIZE (1024*1024*10)
/*#define LOG_FLAGS TRANSLOG_SECTOR_PROTECTION | TRANSLOG_PAGE_CRC */
#define LOG_FLAGS 0
/*#define LONG_BUFFER_SIZE (1024L*1024L*1024L + 1024L*1024L*512)*/
#define LONG_BUFFER_SIZE (1024L*1024L*1024L)
#define MIN_REC_LENGTH 30
#define SHOW_DIVIDER 10
#define LOG_FILE_SIZE (1024L*1024L*1024L + 1024L*1024L*512)
#define ITERATIONS 3
#define WRITERS 3
static uint number_of_writers= WRITERS;
static pthread_cond_t COND_thread_count;
static pthread_mutex_t LOCK_thread_count;
static uint thread_count;
static ulong lens[WRITERS][ITERATIONS];
static LSN lsns1[WRITERS][ITERATIONS];
static LSN lsns2[WRITERS][ITERATIONS];
static uchar *long_buffer;
/*
Get pseudo-random length of the field in
limits [MIN_REC_LENGTH..LONG_BUFFER_SIZE]
SYNOPSIS
get_len()
RETURN
length - length >= 0 length <= LONG_BUFFER_SIZE
*/
static uint32 get_len()
{
uint32 rec_len;
do
{
rec_len= random() /
(RAND_MAX / (LONG_BUFFER_SIZE - MIN_REC_LENGTH - 1)) + MIN_REC_LENGTH;
} while (rec_len >= LONG_BUFFER_SIZE);
return rec_len;
}
/*
Check that the buffer filled correctly
SYNOPSIS
check_content()
ptr Pointer to the buffer
length length of the buffer
RETURN
0 - OK
1 - Error
*/
static my_bool check_content(uchar *ptr, ulong length)
{
ulong i;
for (i= 0; i < length; i++)
{
if (((uchar)ptr[i]) != (i & 0xFF))
{
fprintf(stderr, "Byte # %lu is %x instead of %x",
i, (uint) ptr[i], (uint) (i & 0xFF));
return 1;
}
}
return 0;
}
/*
Read whole record content, and check content (put with offset)
SYNOPSIS
read_and_check_content()
rec The record header buffer
buffer The buffer to read the record in
skip Skip this number of bytes ot the record content
RETURN
0 - OK
1 - Error
*/
static my_bool read_and_check_content(TRANSLOG_HEADER_BUFFER *rec,
uchar *buffer, uint skip)
{
int res= 0;
translog_size_t len;
if ((len= translog_read_record(rec->lsn, 0, rec->record_length,
buffer, NULL)) != rec->record_length)
{
fprintf(stderr, "Requested %lu byte, read %lu\n",
(ulong) rec->record_length, (ulong) len);
res= 1;
}
res|= check_content(buffer + skip, rec->record_length - skip);
return(res);
}
void writer(int num)
{
LSN lsn;
TRN trn;
uchar long_tr_id[6];
uint i;
trn.short_id= num;
trn.first_undo_lsn= TRANSACTION_LOGGED_LONG_ID;
for (i= 0; i < ITERATIONS; i++)
{
uint len= get_len();
lens[num][i]= len;
LEX_STRING parts[TRANSLOG_INTERNAL_PARTS + 1];
int2store(long_tr_id, num);
int4store(long_tr_id + 2, i);
parts[TRANSLOG_INTERNAL_PARTS + 0].str= (char*)long_tr_id;
parts[TRANSLOG_INTERNAL_PARTS + 0].length= 6;
if (translog_write_record(&lsn,
LOGREC_FIXED_RECORD_0LSN_EXAMPLE,
&trn, NULL, 6, TRANSLOG_INTERNAL_PARTS + 1,
parts, NULL, NULL))
{
fprintf(stderr, "Can't write LOGREC_FIXED_RECORD_0LSN_EXAMPLE record #%lu "
"thread %i\n", (ulong) i, num);
translog_destroy();
pthread_mutex_lock(&LOCK_thread_count);
ok(0, "write records");
pthread_mutex_unlock(&LOCK_thread_count);
return;
}
lsns1[num][i]= lsn;
parts[TRANSLOG_INTERNAL_PARTS + 0].str= (char*)long_buffer;
parts[TRANSLOG_INTERNAL_PARTS + 0].length= len;
if (translog_write_record(&lsn,
LOGREC_VARIABLE_RECORD_0LSN_EXAMPLE,
&trn, NULL,
len, TRANSLOG_INTERNAL_PARTS + 1,
parts, NULL, NULL))
{
fprintf(stderr, "Can't write variable record #%lu\n", (ulong) i);
translog_destroy();
pthread_mutex_lock(&LOCK_thread_count);
ok(0, "write records");
pthread_mutex_unlock(&LOCK_thread_count);
return;
}
lsns2[num][i]= lsn;
pthread_mutex_lock(&LOCK_thread_count);
ok(1, "write records");
pthread_mutex_unlock(&LOCK_thread_count);
}
return;
}
static void *test_thread_writer(void *arg)
{
int param= *((int*) arg);
my_thread_init();
writer(param);
pthread_mutex_lock(&LOCK_thread_count);
thread_count--;
ok(1, "writer finished"); /* just to show progress */
VOID(pthread_cond_signal(&COND_thread_count)); /* Tell main we are
ready */
pthread_mutex_unlock(&LOCK_thread_count);
free((uchar*) arg);
my_thread_end();
return(0);
}
int main(int argc __attribute__((unused)),
char **argv __attribute__ ((unused)))
{
uint32 i;
uint pagen;
PAGECACHE pagecache;
LSN first_lsn;
TRANSLOG_HEADER_BUFFER rec;
struct st_translog_scanner_data scanner;
pthread_t tid;
pthread_attr_t thr_attr;
int *param, error;
int rc;
plan(WRITERS + ITERATIONS * WRITERS * 3);
bzero(&pagecache, sizeof(pagecache));
maria_data_root= ".";
long_buffer= malloc(LONG_BUFFER_SIZE + 7 * 2 + 2);
if (long_buffer == 0)
{
fprintf(stderr, "End of memory\n");
exit(1);
}
for (i= 0; i < (LONG_BUFFER_SIZE + 7 * 2 + 2); i++)
long_buffer[i]= (i & 0xFF);
MY_INIT(argv[0]);
if (maria_log_remove())
exit(1);
#ifndef DBUG_OFF
#if defined(__WIN__)
default_dbug_option= "d:t:i:O,\\ma_test_loghandler.trace";
#else
default_dbug_option= "d:t:i:o,/tmp/ma_test_loghandler.trace";
#endif
if (argc > 1)
{
DBUG_SET(default_dbug_option);
DBUG_SET_INITIAL(default_dbug_option);
}
#endif
if ((error= pthread_cond_init(&COND_thread_count, NULL)))
{
fprintf(stderr, "COND_thread_count: %d from pthread_cond_init "
"(errno: %d)\n", error, errno);
exit(1);
}
if ((error= pthread_mutex_init(&LOCK_thread_count, MY_MUTEX_INIT_FAST)))
{
fprintf(stderr, "LOCK_thread_count: %d from pthread_cond_init "
"(errno: %d)\n", error, errno);
exit(1);
}
if ((error= pthread_attr_init(&thr_attr)))
{
fprintf(stderr, "Got error: %d from pthread_attr_init "
"(errno: %d)\n", error, errno);
exit(1);
}
if ((error= pthread_attr_setdetachstate(&thr_attr, PTHREAD_CREATE_DETACHED)))
{
fprintf(stderr,
"Got error: %d from pthread_attr_setdetachstate (errno: %d)\n",
error, errno);
exit(1);
}
#ifdef HAVE_THR_SETCONCURRENCY
VOID(thr_setconcurrency(2));
#endif
my_thread_global_init();
if (ma_control_file_create_or_open(TRUE))
{
fprintf(stderr, "Can't init control file (%d)\n", errno);
exit(1);
}
if ((pagen= init_pagecache(&pagecache, PCACHE_SIZE, 0, 0,
TRANSLOG_PAGE_SIZE)) == 0)
{
fprintf(stderr, "Got error: init_pagecache() (errno: %d)\n", errno);
exit(1);
}
if (translog_init(".", LOG_FILE_SIZE, 50112, 0, &pagecache, LOG_FLAGS))
{
fprintf(stderr, "Can't init loghandler (%d)\n", errno);
translog_destroy();
exit(1);
}
example_loghandler_init();
/* Suppressing of automatic record writing */
dummy_transaction_object.first_undo_lsn|= TRANSACTION_LOGGED_LONG_ID;
srandom(122334817L);
{
LEX_STRING parts[TRANSLOG_INTERNAL_PARTS + 1];
uchar long_tr_id[6]=
{
0x11, 0x22, 0x33, 0x44, 0x55, 0x66
};
parts[TRANSLOG_INTERNAL_PARTS + 0].str= (char*)long_tr_id;
parts[TRANSLOG_INTERNAL_PARTS + 0].length= 6;
dummy_transaction_object.first_undo_lsn= TRANSACTION_LOGGED_LONG_ID;
if (translog_write_record(&first_lsn,
LOGREC_FIXED_RECORD_0LSN_EXAMPLE,
&dummy_transaction_object, NULL, 6,
TRANSLOG_INTERNAL_PARTS + 1,
parts, NULL, NULL))
{
fprintf(stderr, "Can't write the first record\n");
translog_destroy();
exit(1);
}
}
if ((error= pthread_mutex_lock(&LOCK_thread_count)))
{
fprintf(stderr, "LOCK_thread_count: %d from pthread_mutex_lock "
"(errno: %d)\n", error, errno);
exit(1);
}
while (number_of_writers != 0)
{
param= (int*) malloc(sizeof(int));
*param= number_of_writers - 1;
if ((error= pthread_create(&tid, &thr_attr, test_thread_writer,
(void*) param)))
{
fprintf(stderr, "Got error: %d from pthread_create (errno: %d)\n",
error, errno);
exit(1);
}
thread_count++;
number_of_writers--;
}
pthread_mutex_unlock(&LOCK_thread_count);
pthread_attr_destroy(&thr_attr);
/* wait finishing */
if ((error= pthread_mutex_lock(&LOCK_thread_count)))
fprintf(stderr, "LOCK_thread_count: %d from pthread_mutex_lock\n", error);
while (thread_count)
{
if ((error= pthread_cond_wait(&COND_thread_count, &LOCK_thread_count)))
fprintf(stderr, "COND_thread_count: %d from pthread_cond_wait\n", error);
}
if ((error= pthread_mutex_unlock(&LOCK_thread_count)))
fprintf(stderr, "LOCK_thread_count: %d from pthread_mutex_unlock\n", error);
/* Find last LSN and flush up to it (all our log) */
{
LSN max= 0;
for (i= 0; i < WRITERS; i++)
{
if (cmp_translog_addr(lsns2[i][ITERATIONS - 1], max) > 0)
max= lsns2[i][ITERATIONS - 1];
}
translog_flush(max);
}
rc= 1;
{
uint indeces[WRITERS];
uint index, stage;
int len;
bzero(indeces, sizeof(uint) * WRITERS);
bzero(indeces, sizeof(indeces));
if (translog_init_scanner(first_lsn, 1, &scanner, 0))
{
fprintf(stderr, "scanner init failed\n");
goto err;
}
for (i= 0;; i++)
{
len= translog_read_next_record_header(&scanner, &rec);
if (len == RECHEADER_READ_ERROR)
{
fprintf(stderr, "1-%d translog_read_next_record_header failed (%d)\n",
i, errno);
translog_free_record_header(&rec);
goto err;
}
if (len == RECHEADER_READ_EOF)
{
if (i != WRITERS * ITERATIONS * 2)
{
fprintf(stderr, "EOL met at iteration %u instead of %u\n",
i, ITERATIONS * WRITERS * 2);
translog_free_record_header(&rec);
goto err;
}
break;
}
index= indeces[rec.short_trid] / 2;
stage= indeces[rec.short_trid] % 2;
if (stage == 0)
{
if (rec.type !=LOGREC_FIXED_RECORD_0LSN_EXAMPLE ||
rec.record_length != 6 ||
uint2korr(rec.header) != rec.short_trid ||
index != uint4korr(rec.header + 2) ||
cmp_translog_addr(lsns1[rec.short_trid][index], rec.lsn) != 0)
{
fprintf(stderr, "Incorrect LOGREC_FIXED_RECORD_0LSN_EXAMPLE "
"data read(%d)\n"
"type %u, strid %u %u, len %u, i: %u %u, "
"lsn(%lu,0x%lx) (%lu,0x%lx)\n",
i, (uint) rec.type,
(uint) rec.short_trid, (uint) uint2korr(rec.header),
(uint) rec.record_length,
(uint) index, (uint) uint4korr(rec.header + 2),
LSN_IN_PARTS(rec.lsn),
LSN_IN_PARTS(lsns1[rec.short_trid][index]));
translog_free_record_header(&rec);
goto err;
}
}
else
{
if (rec.type != LOGREC_VARIABLE_RECORD_0LSN_EXAMPLE ||
len != 9 ||
rec.record_length != lens[rec.short_trid][index] ||
cmp_translog_addr(lsns2[rec.short_trid][index], rec.lsn) != 0 ||
check_content(rec.header, (uint)len))
{
fprintf(stderr,
"Incorrect LOGREC_VARIABLE_RECORD_0LSN_EXAMPLE "
"data read(%d) "
"thread: %d, iteration %d, stage %d\n"
"type %u (%d), len %d, length %lu %lu (%d) "
"lsn(%lu,0x%lx) (%lu,0x%lx)\n",
i, (uint) rec.short_trid, index, stage,
(uint) rec.type, (rec.type !=
LOGREC_VARIABLE_RECORD_0LSN_EXAMPLE),
len,
(ulong) rec.record_length, lens[rec.short_trid][index],
(rec.record_length != lens[rec.short_trid][index]),
LSN_IN_PARTS(rec.lsn),
LSN_IN_PARTS(lsns2[rec.short_trid][index]));
translog_free_record_header(&rec);
goto err;
}
if (read_and_check_content(&rec, long_buffer, 0))
{
fprintf(stderr,
"Incorrect LOGREC_VARIABLE_RECORD_0LSN_EXAMPLE "
"in whole rec read lsn(%lu,0x%lx)\n",
LSN_IN_PARTS(rec.lsn));
translog_free_record_header(&rec);
goto err;
}
}
ok(1, "record read");
translog_free_record_header(&rec);
indeces[rec.short_trid]++;
}
}
rc= 0;
err:
if (rc)
ok(0, "record read");
translog_destroy();
end_pagecache(&pagecache, 1);
ma_control_file_end();
if (maria_log_remove())
exit(1);
return(exit_status());
}