Bug#17402313 DUMP THREAD SENDS SOME EVENTS MORE THAN ONCE

Dump thread may encounter an error when reading events from the active binlog
file. However the errors may be temporary, so dump thread will try to read
the event again. But dump thread seeked to an wrong position, it caused some
events was sent twice.

To fix the bug, prev_pos is defined out the while loop and is set the correct
position after reading every event correctly.

This patch also make binlog_can_be_corrupted more accurate, only the binlogs
not closed normally are marked binlog_can_be_corrupted.

Finally, two warnings are added when dump threads encounter the temporary
errors.
This commit is contained in:
Libing Song 2013-09-10 09:35:49 +08:00
parent cdec34bcd0
commit d5fdf9ef88
2 changed files with 54 additions and 8 deletions

View file

@ -1027,6 +1027,17 @@ int Log_event::read_log_event(IO_CACHE* file, String* packet,
if (log_file_name_arg)
*is_binlog_active= mysql_bin_log.is_active(log_file_name_arg);
DBUG_EXECUTE_IF("dump_fake_io_error",
{
if (log_lock)
{
pthread_mutex_unlock(log_lock);
DBUG_SET("-d,dump_fake_io_error");
DBUG_RETURN(LOG_READ_IO);
}
});
if (my_b_read(file, (uchar*) buf, sizeof(buf)))
{
/*

View file

@ -360,6 +360,9 @@ void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos,
int left_events = max_binlog_dump_events;
#endif
int old_max_allowed_packet= thd->variables.max_allowed_packet;
bool is_active_binlog= false;
my_off_t prev_pos= pos;
DBUG_ENTER("mysql_binlog_send");
DBUG_PRINT("enter",("log_ident: '%s' pos: %ld", log_ident, (long) pos));
@ -483,7 +486,8 @@ impossible position";
Try to find a Format_description_log_event at the beginning of
the binlog
*/
if (!(error = Log_event::read_log_event(&log, packet, log_lock)))
if (!(error = Log_event::read_log_event(&log, packet, log_lock,
log_file_name, &is_active_binlog)))
{
/*
The packet has offsets equal to the normal offsets in a binlog
@ -494,8 +498,14 @@ impossible position";
(*packet)[EVENT_TYPE_OFFSET+1]));
if ((*packet)[EVENT_TYPE_OFFSET+1] == FORMAT_DESCRIPTION_EVENT)
{
binlog_can_be_corrupted= test((*packet)[FLAGS_OFFSET+1] &
LOG_EVENT_BINLOG_IN_USE_F);
/*
If a binlog is not active, but LOG_EVENT_BINLOG_IN_USE_F
flag is 1. That means it is not closed in normal way
(E.g server crash) and may include corrupted events.
*/
binlog_can_be_corrupted= (!is_active_binlog) &&
test((*packet)[FLAGS_OFFSET+1] & LOG_EVENT_BINLOG_IN_USE_F);
(*packet)[FLAGS_OFFSET+1] &= ~LOG_EVENT_BINLOG_IN_USE_F;
/*
mark that this event with "log_pos=0", so the slave
@ -547,12 +557,11 @@ impossible position";
while (!net->error && net->vio != 0 && !thd->killed)
{
my_off_t prev_pos= pos;
bool is_active_binlog= false;
while (!(error= Log_event::read_log_event(&log, packet, log_lock,
log_file_name,
&is_active_binlog)))
{
DBUG_ASSERT(prev_pos < my_b_tell(&log));
prev_pos= my_b_tell(&log);
#ifndef DBUG_OFF
if (max_binlog_dump_events && !left_events--)
@ -580,8 +589,9 @@ impossible position";
if ((*packet)[EVENT_TYPE_OFFSET+1] == FORMAT_DESCRIPTION_EVENT)
{
binlog_can_be_corrupted= test((*packet)[FLAGS_OFFSET+1] &
LOG_EVENT_BINLOG_IN_USE_F);
binlog_can_be_corrupted= (!is_active_binlog) &&
test((*packet)[FLAGS_OFFSET+1] & LOG_EVENT_BINLOG_IN_USE_F);
(*packet)[FLAGS_OFFSET+1] &= ~LOG_EVENT_BINLOG_IN_USE_F;
}
else if ((*packet)[EVENT_TYPE_OFFSET+1] == STOP_EVENT)
@ -620,9 +630,31 @@ impossible position";
here we were reading binlog that was not closed properly (as a result
of a crash ?). treat any corruption as EOF
*/
if (binlog_can_be_corrupted &&
if ((binlog_can_be_corrupted || is_active_binlog) &&
error != LOG_READ_MEM && error != LOG_READ_EOF)
{
test_for_non_eof_log_read_errors(error, &errmsg);
if (is_active_binlog)
{
sql_print_warning("Failed to read an event from active binlog(%s,%lu). "
"error: %s. Dump thread will try to read it again.",
log_file_name, (ulong)prev_pos, errmsg);
}
else
{
sql_print_warning("Failed to read an event from inactive binlog"
"(%s, %lu). error: %s. Dump thread found the binlog "
"was not rotated correctly. It will jump to next "
"binlog directly.",
log_file_name, (ulong) prev_pos, errmsg);
}
errmsg= NULL;
/*
If binlog is active, it will try to read the event again. Otherwise,
skip the corrupted events and switch to next binlog.
*/
my_b_seek(&log, prev_pos);
error=LOG_READ_EOF;
}
@ -695,6 +727,8 @@ impossible position";
/* we read successfully, so we'll need to send it to the slave */
pthread_mutex_unlock(log_lock);
read_packet = 1;
DBUG_ASSERT(prev_pos < my_b_tell(&log));
prev_pos= my_b_tell(&log);
break;
case LOG_READ_EOF:
@ -757,6 +791,7 @@ impossible position";
thd_proc_info(thd, "Finished reading one binlog; switching to next binlog");
switch (mysql_bin_log.find_next_log(&linfo, 1)) {
case 0:
prev_pos= BIN_LOG_HEADER_SIZE;
break;
case LOG_INFO_EOF:
if (mysql_bin_log.is_active(log_file_name))