Fixed bugs related to huge records where group list do not

fit in on one page (many chunks type 0 created) (BUG#34695).
Maria log dump now can read unittest logs.


storage/maria/CMakeLists.txt:
  Maria log dump now can read unittest logs.
storage/maria/Makefile.am:
  Maria log dump now can read unittest logs.
storage/maria/unittest/sequence_storage.c:
  New BitKeeper file ``storage/maria/unittest/sequence_storage.c''
storage/maria/unittest/sequence_storage.h:
  New BitKeeper file ``storage/maria/unittest/sequence_storage.h''
This commit is contained in:
unknown 2008-04-22 13:04:18 +03:00
parent 23536e8709
commit bc31f00577
5 changed files with 214 additions and 59 deletions

View file

@ -62,7 +62,7 @@ TARGET_LINK_LIBRARIES(maria_read_log maria myisam mysys dbug strings zlib wsock3
ADD_EXECUTABLE(maria_pack maria_pack.c)
TARGET_LINK_LIBRARIES(maria_pack maria myisam mysys dbug strings zlib wsock32)
ADD_EXECUTABLE(maria_dump_log ma_loghandler.c)
ADD_EXECUTABLE(maria_dump_log ma_loghandler.c unittest/ma_loghandler_examples.c)
TARGET_LINK_LIBRARIES(maria_dump_log maria myisam mysys dbug strings zlib wsock32)
SET_TARGET_PROPERTIES(maria_dump_log PROPERTIES COMPILE_FLAGS "-DMARIA_DUMP_LOG")

View file

@ -63,7 +63,7 @@ maria_dump_log_LDADD= @CLIENT_EXTRA_LDFLAGS@ libmaria.a \
$(top_builddir)/mysys/libmysys.a \
$(top_builddir)/dbug/libdbug.a \
$(top_builddir)/strings/libmystrings.a @ZLIB_LIBS@
maria_dump_log_SOURCES= ma_loghandler.c
maria_dump_log_SOURCES= ma_loghandler.c unittest/ma_loghandler_examples.c
maria_dump_log_CPPFLAGS= -DMARIA_DUMP_LOG
noinst_PROGRAMS = ma_test1 ma_test2 ma_test3 ma_rt_test ma_sp_test
noinst_HEADERS = maria_def.h ma_rt_index.h ma_rt_key.h ma_rt_mbr.h \

View file

@ -322,7 +322,8 @@ enum enum_translog_status translog_status= TRANSLOG_UNINITED;
#define TRANSLOG_CHUNK_NOHDR (2 << 6) /* 2 no head chunk (till page end) */
#define TRANSLOG_CHUNK_LNGTH (3 << 6) /* 3 chunk with chunk length */
#define TRANSLOG_CHUNK_TYPE (3 << 6) /* Mask to get chunk type */
#define TRANSLOG_REC_TYPE 0x3F /* Mask to get record type */
#define TRANSLOG_REC_TYPE 0x3F /* Mask to get record type */
#define TRANSLOG_CHUNK_0_CONT 0x3F /* the type to mark chunk 0 continue */
/* compressed (relative) LSN constants */
#define TRANSLOG_CLSN_LEN_BITS 0xC0 /* Mask to get compressed LSN length */
@ -343,6 +344,7 @@ static my_bool translog_page_validator(uchar *page,
static my_bool translog_get_next_chunk(TRANSLOG_SCANNER_DATA *scanner);
static uint32 translog_first_file(TRANSLOG_ADDRESS horizon, int is_protected);
LSN translog_next_LSN(TRANSLOG_ADDRESS addr, TRANSLOG_ADDRESS horizon);
/*
@ -3267,6 +3269,26 @@ static void translog_fill_overhead_table()
}
/**
@brief Checks that chunk is LSN one
@param type type of the chunk
@retval 1 the chunk is LNS
@retval 0 the chunk is not LSN
*/
static my_bool translog_is_LSN_chunk(uchar type)
{
DBUG_ENTER("translog_is_LSN_chunk");
DBUG_PRINT("info", ("byte: %x chunk type: %u record type: %u",
type, type >> 6, type & TRANSLOG_REC_TYPE));
DBUG_RETURN(((type & TRANSLOG_CHUNK_TYPE) == TRANSLOG_CHUNK_FIXED) ||
(((type & TRANSLOG_CHUNK_TYPE) == TRANSLOG_CHUNK_LSN) &&
((type & TRANSLOG_REC_TYPE)) != TRANSLOG_CHUNK_0_CONT));
}
/**
@brief Initialize transaction log
@ -3765,12 +3787,9 @@ my_bool translog_init_with_table(const char *directory,
scanner.page_offset= page_overhead[scanner.page[TRANSLOG_PAGE_FLAGS]];
for (;;)
{
uint chunk_type;
chunk_type= scanner.page[scanner.page_offset] & TRANSLOG_CHUNK_TYPE;
DBUG_PRINT("info", ("type: %x byte: %x", (uint) chunk_type,
(uint) scanner.page[scanner.page_offset]));
while (chunk_type != TRANSLOG_CHUNK_LSN &&
chunk_type != TRANSLOG_CHUNK_FIXED &&
uint chunk_1byte;
chunk_1byte= scanner.page[scanner.page_offset];
while (!translog_is_LSN_chunk(chunk_1byte) &&
scanner.page != END_OF_LOG &&
scanner.page[scanner.page_offset] != TRANSLOG_FILLER &&
scanner.page_addr == page_addr)
@ -3781,14 +3800,9 @@ my_bool translog_init_with_table(const char *directory,
DBUG_RETURN(1);
}
if (scanner.page != END_OF_LOG)
{
chunk_type= scanner.page[scanner.page_offset] & TRANSLOG_CHUNK_TYPE;
DBUG_PRINT("info", ("type: %x byte: %x", (uint) chunk_type,
(uint) scanner.page[scanner.page_offset]));
}
chunk_1byte= scanner.page[scanner.page_offset];
}
if (chunk_type == TRANSLOG_CHUNK_LSN ||
chunk_type == TRANSLOG_CHUNK_FIXED)
if (translog_is_LSN_chunk(chunk_1byte))
{
last_lsn= scanner.page_addr + scanner.page_offset;
if (translog_get_next_chunk(&scanner))
@ -3798,9 +3812,7 @@ my_bool translog_init_with_table(const char *directory,
}
if (scanner.page == END_OF_LOG)
break; /* it was the last record */
chunk_type= scanner.page[scanner.page_offset] & TRANSLOG_CHUNK_TYPE;
DBUG_PRINT("info", ("type: %x byte: %x", (uint) chunk_type,
(uint) scanner.page[scanner.page_offset]));
chunk_1byte= scanner.page[scanner.page_offset];
continue; /* try to find other record on this page */
}
@ -5504,7 +5516,7 @@ translog_write_variable_record_mgroup(LSN *lsn,
LSN_IN_PARTS(log_descriptor.horizon),
LSN_IN_PARTS(horizon)));
*chunk0_header= (uchar) (type |TRANSLOG_CHUNK_LSN);
*chunk0_header= (uchar) (type | TRANSLOG_CHUNK_LSN);
int2store(chunk0_header + 1, short_trid);
translog_write_variable_record_1group_code_len(chunk0_header + 3,
parts->record_length,
@ -5585,7 +5597,8 @@ translog_write_variable_record_mgroup(LSN *lsn,
chunk0_pages--;
curr_group+= limit;
/* put special type to indicate that it is not LSN chunk */
*chunk0_header= (uchar) (TRANSLOG_CHUNK_LSN | TRANSLOG_CHUNK_0_CONT);
} while (chunk0_pages != 0);
translog_buffer_lock(cursor.buffer);
if (cmp_translog_addr(cursor.buffer->last_lsn, *lsn) < 0)
@ -5904,8 +5917,9 @@ my_bool translog_write_record(LSN *lsn,
int rc;
uint short_trid= trn->short_id;
DBUG_ENTER("translog_write_record");
DBUG_PRINT("enter", ("type: %u ShortTrID: %u rec_len: %lu",
(uint) type, (uint) short_trid, (ulong) rec_len));
DBUG_PRINT("enter", ("type: %u (%s) ShortTrID: %u rec_len: %lu",
(uint) type, log_record_type_descriptor[type].name,
(uint) short_trid, (ulong) rec_len));
DBUG_ASSERT(translog_status == TRANSLOG_OK ||
translog_status == TRANSLOG_READONLY);
if (unlikely(translog_status != TRANSLOG_OK))
@ -6672,12 +6686,11 @@ int translog_read_record_header_from_buffer(uchar *page,
{
translog_size_t res;
DBUG_ENTER("translog_read_record_header_from_buffer");
DBUG_ASSERT((page[page_offset] & TRANSLOG_CHUNK_TYPE) ==
TRANSLOG_CHUNK_LSN ||
(page[page_offset] & TRANSLOG_CHUNK_TYPE) ==
TRANSLOG_CHUNK_FIXED);
DBUG_ASSERT(translog_is_LSN_chunk(page[page_offset]));
DBUG_ASSERT(translog_status == TRANSLOG_OK ||
translog_status == TRANSLOG_READONLY);
DBUG_PRINT("info", ("page byte: 0x%x offset: %u",
(uint) page[page_offset], (uint) page_offset));
buff->type= (page[page_offset] & TRANSLOG_REC_TYPE);
buff->short_trid= uint2korr(page + page_offset + 1);
DBUG_PRINT("info", ("Type %u, Short TrID %u, LSN (%lu,0x%lx)",
@ -6816,7 +6829,6 @@ int translog_read_record_header_scan(TRANSLOG_SCANNER_DATA *scanner,
int translog_read_next_record_header(TRANSLOG_SCANNER_DATA *scanner,
TRANSLOG_HEADER_BUFFER *buff)
{
uint8 chunk_type;
translog_size_t res;
DBUG_ENTER("translog_read_next_record_header");
@ -6843,14 +6855,11 @@ int translog_read_next_record_header(TRANSLOG_SCANNER_DATA *scanner,
buff->lsn= LSN_IMPOSSIBLE;
DBUG_RETURN(RECHEADER_READ_EOF);
}
chunk_type= scanner->page[scanner->page_offset] & TRANSLOG_CHUNK_TYPE;
DBUG_PRINT("info", ("Page: (%lu,0x%lx) offset: %lu type: %x byte: %x",
DBUG_PRINT("info", ("Page: (%lu,0x%lx) offset: %lu byte: %x",
LSN_IN_PARTS(scanner->page_addr),
(ulong) scanner->page_offset,
(uint) chunk_type,
(uint) scanner->page[scanner->page_offset]));
} while (chunk_type != TRANSLOG_CHUNK_LSN &&
chunk_type != TRANSLOG_CHUNK_FIXED &&
} while (!translog_is_LSN_chunk(scanner->page[scanner->page_offset]) &&
scanner->page[scanner->page_offset] != TRANSLOG_FILLER);
if (scanner->page[scanner->page_offset] == TRANSLOG_FILLER)
@ -7693,7 +7702,6 @@ static uint32 translog_first_file(TRANSLOG_ADDRESS horizon, int is_protected)
LSN translog_next_LSN(TRANSLOG_ADDRESS addr, TRANSLOG_ADDRESS horizon)
{
uint chunk_type;
TRANSLOG_SCANNER_DATA scanner;
LSN result;
DBUG_ENTER("translog_next_LSN");
@ -7740,11 +7748,7 @@ LSN translog_next_LSN(TRANSLOG_ADDRESS addr, TRANSLOG_ADDRESS horizon)
}
}
chunk_type= scanner.page[scanner.page_offset] & TRANSLOG_CHUNK_TYPE;
DBUG_PRINT("info", ("type: %x byte: %x", (uint) chunk_type,
(uint) scanner.page[scanner.page_offset]));
while (chunk_type != TRANSLOG_CHUNK_LSN &&
chunk_type != TRANSLOG_CHUNK_FIXED &&
while (!translog_is_LSN_chunk(scanner.page[scanner.page_offset]) &&
scanner.page[scanner.page_offset] != TRANSLOG_FILLER)
{
if (translog_get_next_chunk(&scanner))
@ -7757,9 +7761,6 @@ LSN translog_next_LSN(TRANSLOG_ADDRESS addr, TRANSLOG_ADDRESS horizon)
result= LSN_IMPOSSIBLE;
goto out;
}
chunk_type= scanner.page[scanner.page_offset] & TRANSLOG_CHUNK_TYPE;
DBUG_PRINT("info", ("type: %x byte: %x", (uint) chunk_type,
(uint) scanner.page[scanner.page_offset]));
}
if (scanner.page[scanner.page_offset] == TRANSLOG_FILLER)
@ -8067,6 +8068,7 @@ void translog_set_file_size(uint32 size)
#ifdef MARIA_DUMP_LOG
#include <my_getopt.h>
extern void translog_example_table_init();
static const char *load_default_groups[]= { "maria_dump_log",0 };
static void get_options(int *argc,char * * *argv);
#ifndef DBUG_OFF
@ -8080,6 +8082,7 @@ static ulonglong opt_offset;
static ulong opt_pages;
static const char *opt_file= NULL;
static File handler= -1;
static my_bool opt_unit= 0;
static struct my_option my_long_options[] =
{
#ifdef IMPLTMENTED
@ -8104,6 +8107,10 @@ static struct my_option my_long_options[] =
GET_ULONG, REQUIRED_ARG, (long) ~(ulong) 0,
(long) 1, (long) ~(ulong) 0, (long) 0,
(long) 1, 0},
{"unit-test", 'U',
"Use unit test record table (for logs created by unittests",
(uchar **) &opt_unit, (uchar **) &opt_unit, 0,
GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0},
{"version", 'V', "Print version and exit.",
0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0},
{ 0, 0, 0, 0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0}
@ -8245,22 +8252,28 @@ static uchar *dump_chunk(uchar *buffer, uchar *ptr)
switch (ptr[0] & TRANSLOG_CHUNK_TYPE) {
case TRANSLOG_CHUNK_LSN:
printf(" LSN chunk type 0 (variable length)\n");
printf(" Record type %u: %s record class %s compressed LSNs: %u\n",
ptr[0] & TRANSLOG_REC_TYPE,
(log_record_type_descriptor[ptr[0] & TRANSLOG_REC_TYPE].name ?
log_record_type_descriptor[ptr[0] & TRANSLOG_REC_TYPE].name :
"NULL"),
record_class_string[log_record_type_descriptor[ptr[0] &
TRANSLOG_REC_TYPE].
rclass],
log_record_type_descriptor[ptr[0] & TRANSLOG_REC_TYPE].
compressed_LSN);
if (log_record_type_descriptor[ptr[0] & TRANSLOG_REC_TYPE].rclass !=
LOGRECTYPE_VARIABLE_LENGTH)
if (likely((ptr[0] & TRANSLOG_REC_TYPE) != TRANSLOG_CHUNK_0_CONT))
{
printf(" WARNING: this record class here can't be used "
"(stop interpretation)!!!\n");
printf(" Record type %u: %s record class %s compressed LSNs: %u\n",
ptr[0] & TRANSLOG_REC_TYPE,
(log_record_type_descriptor[ptr[0] & TRANSLOG_REC_TYPE].name ?
log_record_type_descriptor[ptr[0] & TRANSLOG_REC_TYPE].name :
"NULL"),
record_class_string[log_record_type_descriptor[ptr[0] &
TRANSLOG_REC_TYPE].
rclass],
log_record_type_descriptor[ptr[0] & TRANSLOG_REC_TYPE].
compressed_LSN);
if (log_record_type_descriptor[ptr[0] & TRANSLOG_REC_TYPE].rclass !=
LOGRECTYPE_VARIABLE_LENGTH)
{
printf(" WARNING: this record class here can't be used "
"(stop interpretation)!!!\n");
break;
}
}
else
printf(" Continuation of previous chunk 0 header \n");
printf(" Short transaction id: %u\n", (uint) uint2korr(ptr + 1));
{
uchar *hdr_ptr= ptr + 1 + 2; /* chunk type and short trid */
@ -8431,7 +8444,7 @@ static void dump_datapage(uchar *buffer)
static void dump_page(uchar *buffer)
{
printf("Page by offset %lld\n", opt_offset);
printf("Page by offset %llu (0x%llx)\n", opt_offset, opt_offset);
if (strncmp((char*)maria_trans_file_magic, (char*)buffer,
sizeof(maria_trans_file_magic)) == 0)
{
@ -8450,13 +8463,17 @@ int main(int argc, char **argv)
char **default_argv;
uchar buffer[TRANSLOG_PAGE_SIZE];
MY_INIT(argv[0]);
translog_table_init();
translog_fill_overhead_table();
load_defaults("my", load_default_groups, &argc, &argv);
default_argv= argv;
get_options(&argc, &argv);
if (opt_unit)
translog_example_table_init();
else
translog_table_init();
translog_fill_overhead_table();
maria_data_root= (char *)".";
if ((handler= my_open(opt_file, O_RDONLY, MYF(MY_WME))) < 0)

View file

@ -0,0 +1,110 @@
/* Copyright (C) 2008 MySQL AB
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; version 2 of the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
#include "../maria_def.h"
#include "sequence_storage.h"
/**
@brief Initializes the sequence from the sequence file.
@param seq Reference on the sequence storage.
@param file Path to the file where to write the sequence
@retval 0 OK
@retval 1 Error
*/
my_bool seq_storage_reader_init(SEQ_STORAGE *seq, const char *file)
{
FILE *fd;
seq->pos= 0;
if ((fd= my_fopen(file, O_RDONLY, MYF(MY_WME))) == NULL)
return 1;
if (my_init_dynamic_array(&seq->seq, sizeof(ulong), 10, 10))
return 1;
for(;;)
{
ulong num;
char line[22];
if (fgets(line, sizeof(line), fd) == NULL)
break;
num= atol(line);
if (insert_dynamic(&seq->seq, (uchar*) &num))
return 1;
}
fclose(fd);
return 0;
}
/**
@brief Gets next number from the sequence storage
@param seq Reference on the sequence storage.
@return Next number from the sequence.
*/
ulong seq_storage_next(SEQ_STORAGE *seq)
{
DBUG_ASSERT(seq->seq.elements > 0);
DBUG_ASSERT(seq->pos < seq->seq.elements);
return (*(dynamic_element(&seq->seq, seq->pos++, ulong *)));
}
/**
@brief Frees resources allocated for the storage
@param seq Reference on the sequence storage.
*/
void seq_storage_destroy(SEQ_STORAGE *seq)
{
delete_dynamic(&seq->seq);
}
/**
@brief Starts the sequence from begining
@param seq Reference on the sequence storage.
*/
void seq_storage_rewind(SEQ_STORAGE *seq)
{
seq->pos= 0;
}
/**
@brief Writes a number to the sequence file.
@param file Path to the file where to write the sequence
@pagem num Number to be written
@retval 0 OK
@retval 1 Error
*/
my_bool seq_storage_write(const char *file, ulong num)
{
FILE *fd;
return ((fd= my_fopen(file, O_CREAT | O_APPEND | O_WRONLY, MYF(MY_WME))) ==
NULL ||
fprintf(fd, "%lu\n", num) < 0 ||
fclose(fd) != 0);
}

View file

@ -0,0 +1,28 @@
/* Copyright (C) 2008 MySQL AB
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; version 2 of the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
typedef struct st_seq_storage
{
uint pos;
DYNAMIC_ARRAY seq;
} SEQ_STORAGE;
extern my_bool seq_storage_reader_init(SEQ_STORAGE *seq, const char *file);
extern ulong seq_storage_next(SEQ_STORAGE *seq);
extern void seq_storage_destroy(SEQ_STORAGE *seq);
extern void seq_storage_rewind(SEQ_STORAGE *seq);
extern my_bool seq_storage_write(const char *file, ulong num);