Merge work:/home/bk/mysql-4.0

into mysql.sashanet.com:/home/sasha/src/bk/mysql-4.0
This commit is contained in:
unknown 2001-11-10 22:24:46 -07:00
commit f5a488eef4
19 changed files with 919 additions and 722 deletions

View file

@ -21,6 +21,8 @@
#include <time.h> #include <time.h>
#include "log_event.h" #include "log_event.h"
#define PROBE_HEADER_LEN (4+EVENT_LEN_OFFSET)
#define CLIENT_CAPABILITIES (CLIENT_LONG_PASSWORD | CLIENT_LONG_FLAG | CLIENT_LOCAL_FILES) #define CLIENT_CAPABILITIES (CLIENT_LONG_PASSWORD | CLIENT_LONG_FLAG | CLIENT_LOCAL_FILES)
char server_version[SERVER_VERSION_LENGTH]; char server_version[SERVER_VERSION_LENGTH];
@ -288,6 +290,52 @@ static void dump_remote_table(NET* net, const char* db, const char* table)
} }
} }
static int check_master_version(MYSQL* mysql)
{
MYSQL_RES* res = 0;
MYSQL_ROW row;
const char* version;
int old_format = 0;
if (mysql_query(mysql, "SELECT VERSION()")
|| !(res = mysql_store_result(mysql)))
{
mysql_close(mysql);
die("Error checking master version: %s",
mysql_error(mysql));
}
if (!(row = mysql_fetch_row(res)))
{
mysql_free_result(res);
mysql_close(mysql);
die("Master returned no rows for SELECT VERSION()");
return 1;
}
if (!(version = row[0]))
{
mysql_free_result(res);
mysql_close(mysql);
die("Master reported NULL for the version");
}
switch (*version)
{
case '3':
old_format = 1;
break;
case '4':
old_format = 0;
break;
default:
sql_print_error("Master reported unrecognized MySQL version '%s'",
version);
mysql_free_result(res);
mysql_close(mysql);
return 1;
}
mysql_free_result(res);
return old_format;
}
static void dump_remote_log_entries(const char* logname) static void dump_remote_log_entries(const char* logname)
{ {
@ -295,6 +343,9 @@ static void dump_remote_log_entries(const char* logname)
char last_db[FN_REFLEN+1] = ""; char last_db[FN_REFLEN+1] = "";
uint len; uint len;
NET* net = &mysql->net; NET* net = &mysql->net;
int old_format;
old_format = check_master_version(mysql);
if(!position) position = 4; // protect the innocent from spam if(!position) position = 4; // protect the innocent from spam
if (position < 4) if (position < 4)
{ {
@ -307,7 +358,7 @@ static void dump_remote_log_entries(const char* logname)
len = (uint) strlen(logname); len = (uint) strlen(logname);
int4store(buf + 6, 0); int4store(buf + 6, 0);
memcpy(buf + 10, logname,len); memcpy(buf + 10, logname,len);
if(simple_command(mysql, COM_BINLOG_DUMP, buf, len + 10, 1)) if (simple_command(mysql, COM_BINLOG_DUMP, buf, len + 10, 1))
die("Error sending the log dump command"); die("Error sending the log dump command");
for(;;) for(;;)
@ -322,7 +373,7 @@ static void dump_remote_log_entries(const char* logname)
len, net->read_pos[5])); len, net->read_pos[5]));
Log_event * ev = Log_event::read_log_event( Log_event * ev = Log_event::read_log_event(
(const char*) net->read_pos + 1 , (const char*) net->read_pos + 1 ,
len - 1, &error); len - 1, &error, old_format);
if (ev) if (ev)
{ {
ev->print(result_file, short_form, last_db); ev->print(result_file, short_form, last_db);
@ -335,12 +386,34 @@ static void dump_remote_log_entries(const char* logname)
} }
} }
static int check_header (IO_CACHE* file)
{
char buf[PROBE_HEADER_LEN];
int old_format;
my_off_t pos = my_b_tell(file);
my_b_seek(file, (my_off_t)0);
if (my_b_read(file, buf, sizeof(buf)))
die("Failed reading header");
if (buf[EVENT_TYPE_OFFSET+4] == START_EVENT)
{
uint event_len;
event_len = uint4korr(buf + EVENT_LEN_OFFSET + 4);
old_format = (event_len < LOG_EVENT_HEADER_LEN + START_HEADER_LEN);
}
else
old_format = 0;
my_b_seek(file, pos);
return old_format;
}
static void dump_local_log_entries(const char* logname) static void dump_local_log_entries(const char* logname)
{ {
File fd = -1; File fd = -1;
IO_CACHE cache,*file= &cache; IO_CACHE cache,*file= &cache;
ulonglong rec_count = 0; ulonglong rec_count = 0;
char last_db[FN_REFLEN+1] = ""; char last_db[FN_REFLEN+1] = "";
bool old_format = 0;
if (logname && logname[0] != '-') if (logname && logname[0] != '-')
{ {
@ -349,12 +422,14 @@ static void dump_local_log_entries(const char* logname)
if (init_io_cache(file, fd, 0, READ_CACHE, (my_off_t) position, 0, if (init_io_cache(file, fd, 0, READ_CACHE, (my_off_t) position, 0,
MYF(MY_WME | MY_NABP))) MYF(MY_WME | MY_NABP)))
exit(1); exit(1);
old_format = check_header(file);
} }
else else
{ {
if (init_io_cache(file, fileno(result_file), 0, READ_CACHE, (my_off_t) 0, if (init_io_cache(file, fileno(result_file), 0, READ_CACHE, (my_off_t) 0,
0, MYF(MY_WME | MY_NABP | MY_DONT_CHECK_FILESIZE))) 0, MYF(MY_WME | MY_NABP | MY_DONT_CHECK_FILESIZE)))
exit(1); exit(1);
old_format = check_header(file);
if (position) if (position)
{ {
/* skip 'position' characters from stdout */ /* skip 'position' characters from stdout */
@ -385,7 +460,7 @@ static void dump_local_log_entries(const char* logname)
char llbuff[21]; char llbuff[21];
my_off_t old_off = my_b_tell(file); my_off_t old_off = my_b_tell(file);
Log_event* ev = Log_event::read_log_event(file); Log_event* ev = Log_event::read_log_event(file, old_format);
if (!ev) if (!ev)
{ {
if (file->error) if (file->error)

View file

@ -55,7 +55,8 @@ mysysobjects1 = my_init.lo my_static.lo my_malloc.lo my_realloc.lo \
mf_loadpath.lo my_pthread.lo my_thr_init.lo \ mf_loadpath.lo my_pthread.lo my_thr_init.lo \
thr_mutex.lo mulalloc.lo string.lo default.lo \ thr_mutex.lo mulalloc.lo string.lo default.lo \
my_compress.lo array.lo my_once.lo list.lo my_net.lo \ my_compress.lo array.lo my_once.lo list.lo my_net.lo \
charset.lo hash.lo mf_iocache.lo my_seek.lo \ charset.lo hash.lo mf_iocache.lo \
mf_iocache2.lo my_seek.lo \
my_pread.lo mf_cache.lo my_vsnprintf.lo md5.lo my_pread.lo mf_cache.lo my_vsnprintf.lo md5.lo
# Not needed in the minimum library # Not needed in the minimum library

View file

@ -168,6 +168,9 @@ while test $# -gt 0; do
USE_MANAGER=1 USE_MANAGER=1
USE_RUNNING_SERVER= USE_RUNNING_SERVER=
;; ;;
--start-and-exit)
START_AND_EXIT=1
;;
--skip-innobase) --skip-innobase)
EXTRA_MASTER_MYSQLD_OPT="$EXTRA_MASTER_MYSQLD_OPT --skip-innobase" EXTRA_MASTER_MYSQLD_OPT="$EXTRA_MASTER_MYSQLD_OPT --skip-innobase"
EXTRA_SLAVE_MYSQLD_OPT="$EXTRA_SLAVE_MYSQLD_OPT --skip-innobase" ;; EXTRA_SLAVE_MYSQLD_OPT="$EXTRA_SLAVE_MYSQLD_OPT --skip-innobase" ;;
@ -1091,6 +1094,10 @@ then
mysql_loadstd mysql_loadstd
fi fi
if [ "x$START_AND_EXIT" = "x1" ] ; then
echo "Servers started, exiting"
exit
fi
$ECHO "Starting Tests" $ECHO "Starting Tests"

View file

@ -16,7 +16,7 @@ n
2002 2002
show slave hosts; show slave hosts;
Server_id Host Port Rpl_recovery_rank Master_id Server_id Host Port Rpl_recovery_rank Master_id
2 127.0.0.1 9307 2 1 2 127.0.0.1 $SLAVE_MYPORT 2 1
drop table t1; drop table t1;
slave stop; slave stop;
drop table if exists t2; drop table if exists t2;

View file

@ -15,7 +15,7 @@ create table t1 (s text);
insert into t1 values('Could not break slave'),('Tried hard'); insert into t1 values('Could not break slave'),('Tried hard');
show slave status; show slave status;
Master_Host Master_User Master_Port Connect_retry Log_File Pos Slave_Running Replicate_do_db Replicate_ignore_db Last_errno Last_error Skip_counter Last_log_seq Master_Host Master_User Master_Port Connect_retry Log_File Pos Slave_Running Replicate_do_db Replicate_ignore_db Last_errno Last_error Skip_counter Last_log_seq
127.0.0.1 root 9999 60 master-bin.001 234 Yes 0 0 3 127.0.0.1 root $MASTER_MYPORT 60 master-bin.001 234 Yes 0 0 3
select * from t1; select * from t1;
s s
Could not break slave Could not break slave
@ -42,7 +42,7 @@ master-bin.003
insert into t2 values (65); insert into t2 values (65);
show slave status; show slave status;
Master_Host Master_User Master_Port Connect_retry Log_File Pos Slave_Running Replicate_do_db Replicate_ignore_db Last_errno Last_error Skip_counter Last_log_seq Master_Host Master_User Master_Port Connect_retry Log_File Pos Slave_Running Replicate_do_db Replicate_ignore_db Last_errno Last_error Skip_counter Last_log_seq
127.0.0.1 root 9999 60 master-bin.003 202 Yes 0 0 3 127.0.0.1 root $MASTER_MYPORT 60 master-bin.003 127 Yes 0 0 2
select * from t2; select * from t2;
m m
34 34
@ -60,12 +60,12 @@ master-bin.005
master-bin.006 master-bin.006
show master status; show master status;
File Position Binlog_do_db Binlog_ignore_db File Position Binlog_do_db Binlog_ignore_db
master-bin.006 710 master-bin.006 382
slave stop; slave stop;
slave start; slave start;
show slave status; show slave status;
Master_Host Master_User Master_Port Connect_retry Log_File Pos Slave_Running Replicate_do_db Replicate_ignore_db Last_errno Last_error Skip_counter Last_log_seq Master_Host Master_User Master_Port Connect_retry Log_File Pos Slave_Running Replicate_do_db Replicate_ignore_db Last_errno Last_error Skip_counter Last_log_seq
127.0.0.1 root 9999 60 master-bin.006 710 Yes 0 0 11 127.0.0.1 root $MASTER_MYPORT 60 master-bin.006 382 Yes 0 0 6
lock tables t3 read; lock tables t3 read;
select count(*) from t3 where n >= 4; select count(*) from t3 where n >= 4;
count(*) count(*)

View file

@ -16,7 +16,7 @@ load data infile '../../std_data/words.dat' into table t1;
drop table t1; drop table t1;
show binlog events; show binlog events;
Log_name Pos Event_type Server_id Log_seq Info Log_name Pos Event_type Server_id Log_seq Info
master-bin.001 4 Start 1 1 Server ver: $VERSION, Binlog ver: 2 master-bin.001 4 Start 1 1 Server ver: 4.0.1-alpha-debug-log, Binlog ver: 2
master-bin.001 79 Query 1 2 use test; create table t1(n int not null auto_increment primary key) master-bin.001 79 Query 1 2 use test; create table t1(n int not null auto_increment primary key)
master-bin.001 172 Intvar 1 3 INSERT_ID=1 master-bin.001 172 Intvar 1 3 INSERT_ID=1
master-bin.001 200 Query 1 4 use test; insert into t1 values (NULL) master-bin.001 200 Query 1 4 use test; insert into t1 values (NULL)
@ -41,7 +41,7 @@ insert into t1 values (1);
drop table t1; drop table t1;
show binlog events; show binlog events;
Log_name Pos Event_type Server_id Log_seq Info Log_name Pos Event_type Server_id Log_seq Info
master-bin.001 4 Start 1 1 Server ver: $VERSION, Binlog ver: 2 master-bin.001 4 Start 1 1 Server ver: 4.0.1-alpha-debug-log, Binlog ver: 2
master-bin.001 79 Query 1 2 use test; create table t1(n int not null auto_increment primary key) master-bin.001 79 Query 1 2 use test; create table t1(n int not null auto_increment primary key)
master-bin.001 172 Intvar 1 3 INSERT_ID=1 master-bin.001 172 Intvar 1 3 INSERT_ID=1
master-bin.001 200 Query 1 4 use test; insert into t1 values (NULL) master-bin.001 200 Query 1 4 use test; insert into t1 values (NULL)
@ -54,10 +54,9 @@ master-bin.001 627 Rotate 1 10 master-bin.002;pos=4
master-bin.001 668 Stop 1 11 master-bin.001 668 Stop 1 11
show binlog events in 'master-bin.002'; show binlog events in 'master-bin.002';
Log_name Pos Event_type Server_id Log_seq Info Log_name Pos Event_type Server_id Log_seq Info
master-bin.002 4 Start 1 1 Server ver: $VERSION, Binlog ver: 2 master-bin.002 4 Query 1 1 use test; create table t1 (n int)
master-bin.002 79 Query 1 2 use test; create table t1 (n int) master-bin.002 62 Query 1 2 use test; insert into t1 values (1)
master-bin.002 137 Query 1 3 use test; insert into t1 values (1) master-bin.002 122 Query 1 3 use test; drop table t1
master-bin.002 197 Query 1 4 use test; drop table t1
show master logs; show master logs;
Log_name Log_name
master-bin.001 master-bin.001
@ -69,7 +68,7 @@ slave-bin.001
slave-bin.002 slave-bin.002
show binlog events in 'slave-bin.001' from 4; show binlog events in 'slave-bin.001' from 4;
Log_name Pos Event_type Server_id Log_seq Info Log_name Pos Event_type Server_id Log_seq Info
slave-bin.001 4 Start 2 1 Server ver: $VERSION, Binlog ver: 2 slave-bin.001 4 Start 2 1 Server ver: 4.0.1-alpha-debug-log, Binlog ver: 2
slave-bin.001 79 Slave 2 3 host=127.0.0.1,port=$MASTER_MYPORT,log=master-bin.001,pos=4 slave-bin.001 79 Slave 2 3 host=127.0.0.1,port=$MASTER_MYPORT,log=master-bin.001,pos=4
slave-bin.001 132 Query 1 2 use test; create table t1(n int not null auto_increment primary key) slave-bin.001 132 Query 1 2 use test; create table t1(n int not null auto_increment primary key)
slave-bin.001 225 Intvar 1 3 INSERT_ID=1 slave-bin.001 225 Intvar 1 3 INSERT_ID=1
@ -83,14 +82,13 @@ slave-bin.001 689 Rotate 1 4 slave-bin.002;pos=4; forced by master
slave-bin.001 729 Stop 2 5 slave-bin.001 729 Stop 2 5
show binlog events in 'slave-bin.002' from 4; show binlog events in 'slave-bin.002' from 4;
Log_name Pos Event_type Server_id Log_seq Info Log_name Pos Event_type Server_id Log_seq Info
slave-bin.002 4 Start 2 1 Server ver: $VERSION, Binlog ver: 2 slave-bin.002 4 Slave 2 10 host=127.0.0.1,port=$MASTER_MYPORT,log=master-bin.002,pos=4
slave-bin.002 79 Slave 2 10 host=127.0.0.1,port=$MASTER_MYPORT,log=master-bin.002,pos=4 slave-bin.002 57 Query 1 1 use test; create table t1 (n int)
slave-bin.002 132 Query 1 2 use test; create table t1 (n int) slave-bin.002 115 Query 1 2 use test; insert into t1 values (1)
slave-bin.002 190 Query 1 3 use test; insert into t1 values (1) slave-bin.002 175 Query 1 3 use test; drop table t1
slave-bin.002 250 Query 1 4 use test; drop table t1
show slave status; show slave status;
Master_Host Master_User Master_Port Connect_retry Log_File Pos Slave_Running Replicate_do_db Replicate_ignore_db Last_errno Last_error Skip_counter Last_log_seq Master_Host Master_User Master_Port Connect_retry Log_File Pos Slave_Running Replicate_do_db Replicate_ignore_db Last_errno Last_error Skip_counter Last_log_seq
127.0.0.1 root $MASTER_MYPORT 1 master-bin.002 245 Yes 0 0 4 127.0.0.1 root $MASTER_MYPORT 1 master-bin.002 170 Yes 0 0 3
show new master for slave with master_log_file='master-bin.001' and show new master for slave with master_log_file='master-bin.001' and
master_log_pos=4 and master_log_seq=1 and master_server_id=1; master_log_pos=4 and master_log_seq=1 and master_server_id=1;
Log_name Log_pos Log_name Log_pos
@ -106,8 +104,8 @@ slave-bin.001 439
show new master for slave with master_log_file='master-bin.002' and show new master for slave with master_log_file='master-bin.002' and
master_log_pos=4 and master_log_seq=1 and master_server_id=1; master_log_pos=4 and master_log_seq=1 and master_server_id=1;
Log_name Log_pos Log_name Log_pos
slave-bin.002 132 slave-bin.002 57
show new master for slave with master_log_file='master-bin.002' and show new master for slave with master_log_file='master-bin.002' and
master_log_pos=137 and master_log_seq=3 and master_server_id=1; master_log_pos=137 and master_log_seq=3 and master_server_id=1;
Log_name Log_pos Log_name Log_pos
slave-bin.002 250 slave-bin.002 223

View file

@ -23,7 +23,6 @@ insert into t1 values('Could not break slave'),('Tried hard');
save_master_pos; save_master_pos;
connection slave; connection slave;
sync_with_master; sync_with_master;
--replace_result 9306 9999 3334 9999 3335 9999
show slave status; show slave status;
select * from t1; select * from t1;
connection master; connection master;
@ -70,7 +69,6 @@ insert into t2 values (65);
save_master_pos; save_master_pos;
connection slave; connection slave;
sync_with_master; sync_with_master;
--replace_result 9306 9999 3334 9999 3335 9999
show slave status; show slave status;
select * from t2; select * from t2;
connection master; connection master;
@ -92,7 +90,6 @@ connection slave;
slave stop; slave stop;
slave start; slave start;
sync_with_master; sync_with_master;
--replace_result 9306 9999 3334 9999 3335 9999
show slave status; show slave status;
# because of concurrent insert, the table may not be up to date # because of concurrent insert, the table may not be up to date
# if we do not lock # if we do not lock

View file

@ -166,7 +166,8 @@ int init_io_cache(IO_CACHE *info, File file, uint cachesize,
info->seek_not_done= test(file >= 0 && type != READ_FIFO && info->seek_not_done= test(file >= 0 && type != READ_FIFO &&
type != READ_NET); type != READ_NET);
info->myflags=cache_myflags & ~(MY_NABP | MY_FNABP); info->myflags=cache_myflags & ~(MY_NABP | MY_FNABP);
info->rc_request_pos=info->rc_pos=info->buffer; info->rc_request_pos=info->rc_pos= info->write_pos = info->buffer;
info->write_pos = info->write_end = 0;
if (type == SEQ_READ_APPEND) if (type == SEQ_READ_APPEND)
{ {
info->append_read_pos = info->write_pos = info->append_buffer; info->append_read_pos = info->write_pos = info->append_buffer;
@ -308,6 +309,11 @@ my_bool reinit_io_cache(IO_CACHE *info, enum cache_type type,
{ {
info->append_read_pos = info->write_pos = info->append_buffer; info->append_read_pos = info->write_pos = info->append_buffer;
} }
if (!info->write_pos)
info->write_pos = info->buffer;
if (!info->write_end)
info->write_end = info->buffer+info->buffer_length-
(seek_offset & (IO_SIZE-1));
info->type=type; info->type=type;
info->error=0; info->error=0;
init_read_function(info,type); init_read_function(info,type);

View file

@ -81,7 +81,8 @@ static int find_uniq_filename(char *name)
MYSQL_LOG::MYSQL_LOG(): last_time(0), query_start(0),index_file(-1), MYSQL_LOG::MYSQL_LOG(): last_time(0), query_start(0),index_file(-1),
name(0), log_type(LOG_CLOSED),write_error(0), name(0), log_type(LOG_CLOSED),write_error(0),
inited(0), log_seq(1), file_id(1),no_rotate(0) inited(0), log_seq(1), file_id(1),no_rotate(0),
need_start_event(1)
{ {
/* /*
We don't want to intialize LOCK_Log here as the thread system may We don't want to intialize LOCK_Log here as the thread system may
@ -136,9 +137,11 @@ bool MYSQL_LOG::open_index( int options)
MYF(MY_WME))) < 0); MYF(MY_WME))) < 0);
} }
void MYSQL_LOG::init(enum_log_type log_type_arg) void MYSQL_LOG::init(enum_log_type log_type_arg,
enum cache_type io_cache_type_arg)
{ {
log_type = log_type_arg; log_type = log_type_arg;
io_cache_type = io_cache_type_arg;
if (!inited) if (!inited)
{ {
inited=1; inited=1;
@ -184,7 +187,7 @@ void MYSQL_LOG::open(const char *log_name, enum_log_type log_type_arg,
if ((file=my_open(log_file_name,O_CREAT | O_APPEND | O_WRONLY | O_BINARY, if ((file=my_open(log_file_name,O_CREAT | O_APPEND | O_WRONLY | O_BINARY,
MYF(MY_WME | ME_WAITTANG))) < 0 || MYF(MY_WME | ME_WAITTANG))) < 0 ||
init_io_cache(&log_file, file, IO_SIZE, WRITE_CACHE, init_io_cache(&log_file, file, IO_SIZE, io_cache_type,
my_tell(file,MYF(MY_WME)), 0, MYF(MY_WME | MY_NABP))) my_tell(file,MYF(MY_WME)), 0, MYF(MY_WME | MY_NABP)))
goto err; goto err;
@ -220,6 +223,7 @@ void MYSQL_LOG::open(const char *log_name, enum_log_type log_type_arg,
} }
else if (log_type == LOG_BIN) else if (log_type == LOG_BIN)
{ {
bool error;
/* /*
Explanation of the boolean black magic: Explanation of the boolean black magic:
if we are supposed to write magic number try write if we are supposed to write magic number try write
@ -232,10 +236,13 @@ void MYSQL_LOG::open(const char *log_name, enum_log_type log_type_arg,
goto err; goto err;
log_seq = 1; log_seq = 1;
Start_log_event s; if (need_start_event)
bool error; {
s.set_log_seq(0, this); Start_log_event s;
s.write(&log_file); s.set_log_seq(0, this);
s.write(&log_file);
need_start_event=0;
}
flush_io_cache(&log_file); flush_io_cache(&log_file);
pthread_mutex_lock(&LOCK_index); pthread_mutex_lock(&LOCK_index);
error=(my_write(index_file, (byte*) log_file_name, strlen(log_file_name), error=(my_write(index_file, (byte*) log_file_name, strlen(log_file_name),
@ -715,7 +722,8 @@ bool MYSQL_LOG::write(Log_event* event_info)
file == &log_file && flush_io_cache(file)) file == &log_file && flush_io_cache(file))
goto err; goto err;
error=0; error=0;
should_rotate = (file == &log_file && my_b_tell(file) >= max_binlog_size); should_rotate = (file == &log_file &&
(uint)my_b_tell(file) >= max_binlog_size);
err: err:
if (error) if (error)
{ {

View file

@ -149,12 +149,21 @@ static void cleanup_load_tmpdir()
#endif #endif
Log_event::Log_event(const char* buf):cached_event_len(0),temp_buf(0) Log_event::Log_event(const char* buf, bool old_format):
cached_event_len(0),temp_buf(0)
{ {
when = uint4korr(buf); when = uint4korr(buf);
server_id = uint4korr(buf + SERVER_ID_OFFSET); server_id = uint4korr(buf + SERVER_ID_OFFSET);
log_seq = uint4korr(buf + LOG_SEQ_OFFSET); if (old_format)
flags = uint2korr(buf + FLAGS_OFFSET); {
log_seq=0;
flags=0;
}
else
{
log_seq = uint4korr(buf + LOG_SEQ_OFFSET);
flags = uint2korr(buf + FLAGS_OFFSET);
}
#ifndef MYSQL_CLIENT #ifndef MYSQL_CLIENT
thd = 0; thd = 0;
#endif #endif
@ -441,17 +450,24 @@ int Log_event::read_log_event(IO_CACHE* file, String* packet,
#define UNLOCK_MUTEX #define UNLOCK_MUTEX
#endif #endif
#ifndef MYSQL_CLIENT
#define LOCK_MUTEX if(log_lock) pthread_mutex_lock(log_lock);
#else
#define LOCK_MUTEX
#endif
// allocates memory - the caller is responsible for clean-up // allocates memory - the caller is responsible for clean-up
#ifndef MYSQL_CLIENT #ifndef MYSQL_CLIENT
Log_event* Log_event::read_log_event(IO_CACHE* file, pthread_mutex_t* log_lock) Log_event* Log_event::read_log_event(IO_CACHE* file,
pthread_mutex_t* log_lock,
bool old_format)
#else #else
Log_event* Log_event::read_log_event(IO_CACHE* file) Log_event* Log_event::read_log_event(IO_CACHE* file, bool old_format)
#endif #endif
{ {
char head[LOG_EVENT_HEADER_LEN]; char head[LOG_EVENT_HEADER_LEN];
#ifndef MYSQL_CLIENT LOCK_MUTEX;
if(log_lock) pthread_mutex_lock(log_lock);
#endif
if (my_b_read(file, (byte *) head, sizeof(head))) if (my_b_read(file, (byte *) head, sizeof(head)))
{ {
UNLOCK_MUTEX; UNLOCK_MUTEX;
@ -489,7 +505,7 @@ Log_event* Log_event::read_log_event(IO_CACHE* file)
error = "read error"; error = "read error";
goto err; goto err;
} }
if ((res = read_log_event(buf, data_len, &error))) if ((res = read_log_event(buf, data_len, &error, old_format)))
res->register_temp_buf(buf); res->register_temp_buf(buf);
err: err:
UNLOCK_MUTEX; UNLOCK_MUTEX;
@ -502,7 +518,7 @@ err:
} }
Log_event* Log_event::read_log_event(const char* buf, int event_len, Log_event* Log_event::read_log_event(const char* buf, int event_len,
const char **error) const char **error, bool old_format)
{ {
if (event_len < EVENT_LEN_OFFSET || if (event_len < EVENT_LEN_OFFSET ||
(uint)event_len != uint4korr(buf+EVENT_LEN_OFFSET)) (uint)event_len != uint4korr(buf+EVENT_LEN_OFFSET))
@ -513,14 +529,14 @@ Log_event* Log_event::read_log_event(const char* buf, int event_len,
switch(buf[EVENT_TYPE_OFFSET]) switch(buf[EVENT_TYPE_OFFSET])
{ {
case QUERY_EVENT: case QUERY_EVENT:
ev = new Query_log_event(buf, event_len); ev = new Query_log_event(buf, event_len, old_format);
break; break;
case LOAD_EVENT: case LOAD_EVENT:
case NEW_LOAD_EVENT: case NEW_LOAD_EVENT:
ev = new Load_log_event(buf, event_len); ev = new Load_log_event(buf, event_len, old_format);
break; break;
case ROTATE_EVENT: case ROTATE_EVENT:
ev = new Rotate_log_event(buf, event_len); ev = new Rotate_log_event(buf, event_len, old_format);
break; break;
case SLAVE_EVENT: case SLAVE_EVENT:
ev = new Slave_log_event(buf, event_len); ev = new Slave_log_event(buf, event_len);
@ -538,13 +554,13 @@ Log_event* Log_event::read_log_event(const char* buf, int event_len,
ev = new Execute_load_log_event(buf, event_len); ev = new Execute_load_log_event(buf, event_len);
break; break;
case START_EVENT: case START_EVENT:
ev = new Start_log_event(buf); ev = new Start_log_event(buf, old_format);
break; break;
case STOP_EVENT: case STOP_EVENT:
ev = new Stop_log_event(buf); ev = new Stop_log_event(buf, old_format);
break; break;
case INTVAR_EVENT: case INTVAR_EVENT:
ev = new Intvar_log_event(buf); ev = new Intvar_log_event(buf, old_format);
break; break;
default: default:
break; break;
@ -634,7 +650,8 @@ void Rotate_log_event::print(FILE* file, bool short_form, char* last_db)
#endif /* #ifdef MYSQL_CLIENT */ #endif /* #ifdef MYSQL_CLIENT */
Start_log_event::Start_log_event(const char* buf) :Log_event(buf) Start_log_event::Start_log_event(const char* buf,
bool old_format) :Log_event(buf, old_format)
{ {
binlog_version = uint2korr(buf + LOG_EVENT_HEADER_LEN + binlog_version = uint2korr(buf + LOG_EVENT_HEADER_LEN +
ST_BINLOG_VER_OFFSET); ST_BINLOG_VER_OFFSET);
@ -652,8 +669,9 @@ int Start_log_event::write_data(IO_CACHE* file)
return (my_b_write(file, (byte*) buff, sizeof(buff)) ? -1 : 0); return (my_b_write(file, (byte*) buff, sizeof(buff)) ? -1 : 0);
} }
Rotate_log_event::Rotate_log_event(const char* buf, int event_len): Rotate_log_event::Rotate_log_event(const char* buf, int event_len,
Log_event(buf),new_log_ident(NULL),alloced(0) bool old_format):
Log_event(buf, old_format),new_log_ident(NULL),alloced(0)
{ {
// the caller will ensure that event_len is what we have at // the caller will ensure that event_len is what we have at
// EVENT_LEN_OFFSET // EVENT_LEN_OFFSET
@ -695,8 +713,9 @@ Query_log_event::Query_log_event(THD* thd_arg, const char* query_arg,
} }
#endif #endif
Query_log_event::Query_log_event(const char* buf, int event_len): Query_log_event::Query_log_event(const char* buf, int event_len,
Log_event(buf),data_buf(0), query(NULL), db(NULL) bool old_format):
Log_event(buf, old_format),data_buf(0), query(NULL), db(NULL)
{ {
if ((uint)event_len < QUERY_EVENT_OVERHEAD) if ((uint)event_len < QUERY_EVENT_OVERHEAD)
return; return;
@ -766,7 +785,8 @@ int Query_log_event::write_data(IO_CACHE* file)
my_b_write(file, (byte*) query, q_len)) ? -1 : 0; my_b_write(file, (byte*) query, q_len)) ? -1 : 0;
} }
Intvar_log_event::Intvar_log_event(const char* buf):Log_event(buf) Intvar_log_event::Intvar_log_event(const char* buf, bool old_format):
Log_event(buf, old_format)
{ {
buf += LOG_EVENT_HEADER_LEN; buf += LOG_EVENT_HEADER_LEN;
type = buf[I_TYPE_OFFSET]; type = buf[I_TYPE_OFFSET];
@ -1003,8 +1023,9 @@ Load_log_event::Load_log_event(THD* thd, sql_exchange* ex,
// the caller must do buf[event_len] = 0 before he starts using the // the caller must do buf[event_len] = 0 before he starts using the
// constructed event // constructed event
Load_log_event::Load_log_event(const char* buf, int event_len): Load_log_event::Load_log_event(const char* buf, int event_len,
Log_event(buf),num_fields(0),fields(0), bool old_format):
Log_event(buf, old_format),num_fields(0),fields(0),
field_lens(0),field_block_len(0), field_lens(0),field_block_len(0),
table_name(0),db(0),fname(0) table_name(0),db(0),fname(0)
{ {
@ -1237,7 +1258,7 @@ void Slave_log_event::init_from_mem_pool(int data_size)
} }
Slave_log_event::Slave_log_event(const char* buf, int event_len): Slave_log_event::Slave_log_event(const char* buf, int event_len):
Log_event(buf),mem_pool(0),master_host(0) Log_event(buf,0),mem_pool(0),master_host(0)
{ {
event_len -= LOG_EVENT_HEADER_LEN; event_len -= LOG_EVENT_HEADER_LEN;
if(event_len < 0) if(event_len < 0)
@ -1291,7 +1312,7 @@ int Create_file_log_event::write_base(IO_CACHE* file)
} }
Create_file_log_event::Create_file_log_event(const char* buf, int len): Create_file_log_event::Create_file_log_event(const char* buf, int len):
Load_log_event(buf,0),fake_base(0),block(0) Load_log_event(buf,0,0),fake_base(0),block(0)
{ {
int block_offset; int block_offset;
if (copy_log_event(buf,len)) if (copy_log_event(buf,len))
@ -1347,7 +1368,7 @@ Append_block_log_event::Append_block_log_event(THD* thd_arg, char* block_arg,
#endif #endif
Append_block_log_event::Append_block_log_event(const char* buf, int len): Append_block_log_event::Append_block_log_event(const char* buf, int len):
Log_event(buf),block(0) Log_event(buf, 0),block(0)
{ {
if((uint)len < APPEND_BLOCK_EVENT_OVERHEAD) if((uint)len < APPEND_BLOCK_EVENT_OVERHEAD)
return; return;
@ -1399,7 +1420,7 @@ Delete_file_log_event::Delete_file_log_event(THD* thd_arg):
#endif #endif
Delete_file_log_event::Delete_file_log_event(const char* buf, int len): Delete_file_log_event::Delete_file_log_event(const char* buf, int len):
Log_event(buf),file_id(0) Log_event(buf, 0),file_id(0)
{ {
if((uint)len < DELETE_FILE_EVENT_OVERHEAD) if((uint)len < DELETE_FILE_EVENT_OVERHEAD)
return; return;
@ -1446,7 +1467,7 @@ Execute_load_log_event::Execute_load_log_event(THD* thd_arg):
#endif #endif
Execute_load_log_event::Execute_load_log_event(const char* buf,int len): Execute_load_log_event::Execute_load_log_event(const char* buf,int len):
Log_event(buf),file_id(0) Log_event(buf, 0),file_id(0)
{ {
if((uint)len < EXEC_LOAD_EVENT_OVERHEAD) if((uint)len < EXEC_LOAD_EVENT_OVERHEAD)
return; return;
@ -1657,15 +1678,11 @@ int Load_log_event::exec_event(NET* net, struct st_master_info* mi)
int Start_log_event::exec_event(struct st_master_info* mi) int Start_log_event::exec_event(struct st_master_info* mi)
{ {
#ifdef TO_BE_DELETED if (!mi->old_format)
/* {
We can't close temporary files or cleanup the tmpdir here, becasue close_temporary_tables(thd);
someone may have just rotated the logs on the master. cleanup_load_tmpdir();
We should only do this cleanup when we know the master restarted. }
*/
close_temporary_tables(thd);
cleanup_load_tmpdir();
#endif
return Log_event::exec_event(mi); return Log_event::exec_event(mi);
} }
@ -1866,7 +1883,9 @@ int Execute_load_log_event::exec_event(struct st_master_info* mi)
slave_print_error(my_errno, "Could not open file '%s'", fname); slave_print_error(my_errno, "Could not open file '%s'", fname);
goto err; goto err;
} }
if (!(lev = (Load_log_event*)Log_event::read_log_event(&file,0)) if (!(lev = (Load_log_event*)Log_event::read_log_event(&file,
(pthread_mutex_t*)0,
(bool)0))
|| lev->get_type_code() != NEW_LOAD_EVENT) || lev->get_type_code() != NEW_LOAD_EVENT)
{ {
slave_print_error(0, "File '%s' appears corrupted", fname); slave_print_error(0, "File '%s' appears corrupted", fname);

View file

@ -242,7 +242,7 @@ public:
virtual Log_event_type get_type_code() = 0; virtual Log_event_type get_type_code() = 0;
virtual bool is_valid() = 0; virtual bool is_valid() = 0;
virtual bool get_cache_stmt() { return 0; } virtual bool get_cache_stmt() { return 0; }
Log_event(const char* buf); Log_event(const char* buf, bool old_format);
#ifndef MYSQL_CLIENT #ifndef MYSQL_CLIENT
Log_event(THD* thd_arg, uint16 flags_arg = 0); Log_event(THD* thd_arg, uint16 flags_arg = 0);
#endif #endif
@ -268,12 +268,14 @@ public:
#ifndef MYSQL_CLIENT #ifndef MYSQL_CLIENT
// if mutex is 0, the read will proceed without mutex // if mutex is 0, the read will proceed without mutex
static Log_event* read_log_event(IO_CACHE* file, pthread_mutex_t* log_lock); static Log_event* read_log_event(IO_CACHE* file,
pthread_mutex_t* log_lock,
bool old_format);
#else // avoid having to link mysqlbinlog against libpthread #else // avoid having to link mysqlbinlog against libpthread
static Log_event* read_log_event(IO_CACHE* file); static Log_event* read_log_event(IO_CACHE* file, bool old_format);
#endif #endif
static Log_event* read_log_event(const char* buf, int event_len, static Log_event* read_log_event(const char* buf, int event_len,
const char **error); const char **error, bool old_format);
const char* get_type_str(); const char* get_type_str();
#ifndef MYSQL_CLIENT #ifndef MYSQL_CLIENT
@ -317,7 +319,7 @@ public:
bool get_cache_stmt() { return cache_stmt; } bool get_cache_stmt() { return cache_stmt; }
#endif #endif
Query_log_event(const char* buf, int event_len); Query_log_event(const char* buf, int event_len, bool old_format);
~Query_log_event() ~Query_log_event()
{ {
if (data_buf) if (data_buf)
@ -411,7 +413,7 @@ public:
int exec_event(NET* net, struct st_master_info* mi); int exec_event(NET* net, struct st_master_info* mi);
#endif #endif
Load_log_event(const char* buf, int event_len); Load_log_event(const char* buf, int event_len, bool old_format);
~Load_log_event() ~Load_log_event()
{ {
} }
@ -451,7 +453,7 @@ public:
memcpy(server_version, ::server_version, ST_SERVER_VER_LEN); memcpy(server_version, ::server_version, ST_SERVER_VER_LEN);
} }
#endif #endif
Start_log_event(const char* buf); Start_log_event(const char* buf, bool old_format);
~Start_log_event() {} ~Start_log_event() {}
Log_event_type get_type_code() { return START_EVENT;} Log_event_type get_type_code() { return START_EVENT;}
int write_data(IO_CACHE* file); int write_data(IO_CACHE* file);
@ -479,7 +481,7 @@ public:
:Log_event(thd_arg),val(val_arg),type(type_arg) :Log_event(thd_arg),val(val_arg),type(type_arg)
{} {}
#endif #endif
Intvar_log_event(const char* buf); Intvar_log_event(const char* buf, bool old_format);
~Intvar_log_event() {} ~Intvar_log_event() {}
Log_event_type get_type_code() { return INTVAR_EVENT;} Log_event_type get_type_code() { return INTVAR_EVENT;}
const char* get_var_type_name(); const char* get_var_type_name();
@ -503,7 +505,8 @@ public:
Stop_log_event() :Log_event((THD*)0) Stop_log_event() :Log_event((THD*)0)
{} {}
#endif #endif
Stop_log_event(const char* buf):Log_event(buf) Stop_log_event(const char* buf, bool old_format):Log_event(buf,
old_format)
{ {
} }
~Stop_log_event() {} ~Stop_log_event() {}
@ -534,7 +537,7 @@ public:
alloced(0) alloced(0)
{} {}
#endif #endif
Rotate_log_event(const char* buf, int event_len); Rotate_log_event(const char* buf, int event_len, bool old_format);
~Rotate_log_event() ~Rotate_log_event()
{ {
if (alloced) if (alloced)
@ -686,7 +689,6 @@ public:
#endif #endif
}; };
#endif #endif

View file

@ -20,12 +20,21 @@
#include "repl_failsafe.h" #include "repl_failsafe.h"
#include "sql_repl.h" #include "sql_repl.h"
#include "slave.h" #include "slave.h"
#include "sql_acl.h"
#include "mini_client.h" #include "mini_client.h"
#include "log_event.h"
#include <mysql.h> #include <mysql.h>
#include <thr_alarm.h>
#define SLAVE_LIST_CHUNK 128
#define SLAVE_ERRMSG_SIZE (FN_REFLEN+64)
RPL_STATUS rpl_status=RPL_NULL; RPL_STATUS rpl_status=RPL_NULL;
pthread_mutex_t LOCK_rpl_status; pthread_mutex_t LOCK_rpl_status;
pthread_cond_t COND_rpl_status; pthread_cond_t COND_rpl_status;
HASH slave_list;
extern const char* any_db;
const char *rpl_role_type[] = {"MASTER","SLAVE",NullS}; const char *rpl_role_type[] = {"MASTER","SLAVE",NullS};
TYPELIB rpl_role_typelib = {array_elements(rpl_role_type)-1,"", TYPELIB rpl_role_typelib = {array_elements(rpl_role_type)-1,"",
@ -37,6 +46,10 @@ const char* rpl_status_type[] = {"AUTH_MASTER","ACTIVE_SLAVE","IDLE_SLAVE",
TYPELIB rpl_status_typelib= {array_elements(rpl_status_type)-1,"", TYPELIB rpl_status_typelib= {array_elements(rpl_status_type)-1,"",
rpl_status_type}; rpl_status_type};
static Slave_log_event* find_slave_event(IO_CACHE* log,
const char* log_file_name,
char* errmsg);
static int init_failsafe_rpl_thread(THD* thd) static int init_failsafe_rpl_thread(THD* thd)
{ {
DBUG_ENTER("init_failsafe_rpl_thread"); DBUG_ENTER("init_failsafe_rpl_thread");
@ -89,6 +102,333 @@ void change_rpl_status(RPL_STATUS from_status, RPL_STATUS to_status)
pthread_mutex_unlock(&LOCK_rpl_status); pthread_mutex_unlock(&LOCK_rpl_status);
} }
#define get_object(p, obj) \
{\
uint len = (uint)*p++; \
if (p + len > p_end || len >= sizeof(obj)) \
goto err; \
strmake(obj,(char*) p,len); \
p+= len; \
}\
static inline int cmp_master_pos(Slave_log_event* sev, LEX_MASTER_INFO* mi)
{
return cmp_master_pos(sev->master_log, sev->master_pos, mi->log_file_name,
mi->pos);
}
void unregister_slave(THD* thd, bool only_mine, bool need_mutex)
{
if (need_mutex)
pthread_mutex_lock(&LOCK_slave_list);
if (thd->server_id)
{
SLAVE_INFO* old_si;
if ((old_si = (SLAVE_INFO*)hash_search(&slave_list,
(byte*)&thd->server_id, 4)) &&
(!only_mine || old_si->thd == thd))
hash_delete(&slave_list, (byte*)old_si);
}
if (need_mutex)
pthread_mutex_unlock(&LOCK_slave_list);
}
int register_slave(THD* thd, uchar* packet, uint packet_length)
{
SLAVE_INFO *si;
int res = 1;
uchar* p = packet, *p_end = packet + packet_length;
if (check_access(thd, FILE_ACL, any_db))
return 1;
if (!(si = (SLAVE_INFO*)my_malloc(sizeof(SLAVE_INFO), MYF(MY_WME))))
goto err;
thd->server_id = si->server_id = uint4korr(p);
p += 4;
get_object(p,si->host);
get_object(p,si->user);
get_object(p,si->password);
si->port = uint2korr(p);
p += 2;
si->rpl_recovery_rank = uint4korr(p);
p += 4;
if (!(si->master_id = uint4korr(p)))
si->master_id = server_id;
si->thd = thd;
pthread_mutex_lock(&LOCK_slave_list);
unregister_slave(thd,0,0);
res = hash_insert(&slave_list, (byte*) si);
pthread_mutex_unlock(&LOCK_slave_list);
return res;
err:
if (si)
my_free((gptr) si, MYF(MY_WME));
return res;
}
static uint32* slave_list_key(SLAVE_INFO* si, uint* len,
my_bool not_used __attribute__((unused)))
{
*len = 4;
return &si->server_id;
}
static void slave_info_free(void *s)
{
my_free((gptr) s, MYF(MY_WME));
}
void init_slave_list()
{
hash_init(&slave_list, SLAVE_LIST_CHUNK, 0, 0,
(hash_get_key) slave_list_key, slave_info_free, 0);
pthread_mutex_init(&LOCK_slave_list, MY_MUTEX_INIT_FAST);
}
void end_slave_list()
{
pthread_mutex_lock(&LOCK_slave_list);
hash_free(&slave_list);
pthread_mutex_unlock(&LOCK_slave_list);
pthread_mutex_destroy(&LOCK_slave_list);
}
static int find_target_pos(LEX_MASTER_INFO* mi, IO_CACHE* log, char* errmsg)
{
uint32 log_seq = mi->last_log_seq;
uint32 target_server_id = mi->server_id;
for (;;)
{
Log_event* ev;
if (!(ev = Log_event::read_log_event(log, (pthread_mutex_t*)0,
0)))
{
if (log->error > 0)
strmov(errmsg, "Binary log truncated in the middle of event");
else if (log->error < 0)
strmov(errmsg, "I/O error reading binary log");
else
strmov(errmsg, "Could not find target event in the binary log");
return 1;
}
if (ev->log_seq == log_seq && ev->server_id == target_server_id)
{
delete ev;
mi->pos = my_b_tell(log);
return 0;
}
delete ev;
}
}
int translate_master(THD* thd, LEX_MASTER_INFO* mi, char* errmsg)
{
LOG_INFO linfo;
char search_file_name[FN_REFLEN],last_log_name[FN_REFLEN];
IO_CACHE log;
File file = -1, last_file = -1;
pthread_mutex_t *log_lock;
const char* errmsg_p;
Slave_log_event* sev = 0;
my_off_t last_pos = 0;
int error = 1;
int cmp_res;
LINT_INIT(cmp_res);
if (!mysql_bin_log.is_open())
{
strmov(errmsg,"Binary log is not open");
return 1;
}
if (!server_id_supplied)
{
strmov(errmsg, "Misconfigured master - server id was not set");
return 1;
}
linfo.index_file_offset = 0;
search_file_name[0] = 0;
if (mysql_bin_log.find_first_log(&linfo, search_file_name))
{
strmov(errmsg,"Could not find first log");
return 1;
}
thd->current_linfo = &linfo;
bzero((char*) &log,sizeof(log));
log_lock = mysql_bin_log.get_log_lock();
pthread_mutex_lock(log_lock);
for (;;)
{
if ((file=open_binlog(&log, linfo.log_file_name, &errmsg_p)) < 0)
{
strmov(errmsg, errmsg_p);
goto err;
}
if (!(sev = find_slave_event(&log, linfo.log_file_name, errmsg)))
goto err;
cmp_res = cmp_master_pos(sev, mi);
delete sev;
if (!cmp_res)
{
/* Copy basename */
fn_format(mi->log_file_name, linfo.log_file_name, "","",1);
mi->pos = my_b_tell(&log);
goto mi_inited;
}
else if (cmp_res > 0)
{
if (!last_pos)
{
strmov(errmsg,
"Slave event in first log points past the target position");
goto err;
}
end_io_cache(&log);
(void) my_close(file, MYF(MY_WME));
if (init_io_cache(&log, (file = last_file), IO_SIZE, READ_CACHE, 0, 0,
MYF(MY_WME)))
{
errmsg[0] = 0;
goto err;
}
break;
}
strmov(last_log_name, linfo.log_file_name);
last_pos = my_b_tell(&log);
switch (mysql_bin_log.find_next_log(&linfo)) {
case LOG_INFO_EOF:
if (last_file >= 0)
(void)my_close(last_file, MYF(MY_WME));
last_file = -1;
goto found_log;
case 0:
break;
default:
strmov(errmsg, "Error reading log index");
goto err;
}
end_io_cache(&log);
if (last_file >= 0)
(void) my_close(last_file, MYF(MY_WME));
last_file = file;
}
found_log:
my_b_seek(&log, last_pos);
if (find_target_pos(mi,&log,errmsg))
goto err;
fn_format(mi->log_file_name, last_log_name, "","",1); /* Copy basename */
mi_inited:
error = 0;
err:
pthread_mutex_unlock(log_lock);
end_io_cache(&log);
pthread_mutex_lock(&LOCK_thread_count);
thd->current_linfo = 0;
pthread_mutex_unlock(&LOCK_thread_count);
if (file >= 0)
(void) my_close(file, MYF(MY_WME));
if (last_file >= 0 && last_file != file)
(void) my_close(last_file, MYF(MY_WME));
return error;
}
// caller must delete result when done
static Slave_log_event* find_slave_event(IO_CACHE* log,
const char* log_file_name,
char* errmsg)
{
Log_event* ev;
int i;
bool slave_event_found = 0;
LINT_INIT(ev);
for (i = 0; i < 2; i++)
{
if (!(ev = Log_event::read_log_event(log, (pthread_mutex_t*)0, 0)))
{
my_snprintf(errmsg, SLAVE_ERRMSG_SIZE,
"Error reading event in log '%s'",
(char*)log_file_name);
return 0;
}
if (ev->get_type_code() == SLAVE_EVENT)
{
slave_event_found = 1;
break;
}
delete ev;
}
if (!slave_event_found)
{
my_snprintf(errmsg, SLAVE_ERRMSG_SIZE,
"Could not find slave event in log '%s'",
(char*)log_file_name);
delete ev;
return 0;
}
return (Slave_log_event*)ev;
}
int show_new_master(THD* thd)
{
DBUG_ENTER("show_new_master");
List<Item> field_list;
char errmsg[SLAVE_ERRMSG_SIZE];
LEX_MASTER_INFO* lex_mi = &thd->lex.mi;
errmsg[0]=0; // Safety
if (translate_master(thd, lex_mi, errmsg))
{
if (errmsg[0])
net_printf(&thd->net, ER_ERROR_WHEN_EXECUTING_COMMAND,
"SHOW NEW MASTER", errmsg);
else
send_error(&thd->net, 0);
DBUG_RETURN(1);
}
else
{
String* packet = &thd->packet;
field_list.push_back(new Item_empty_string("Log_name", 20));
field_list.push_back(new Item_empty_string("Log_pos", 20));
if (send_fields(thd, field_list, 1))
DBUG_RETURN(-1);
packet->length(0);
net_store_data(packet, lex_mi->log_file_name);
net_store_data(packet, (longlong)lex_mi->pos);
if (my_net_write(&thd->net, packet->ptr(), packet->length()))
DBUG_RETURN(-1);
send_eof(&thd->net);
DBUG_RETURN(0);
}
}
int update_slave_list(MYSQL* mysql) int update_slave_list(MYSQL* mysql)
{ {
MYSQL_RES* res=0; MYSQL_RES* res=0;
@ -216,6 +556,277 @@ err:
DBUG_RETURN(0); DBUG_RETURN(0);
} }
int show_slave_hosts(THD* thd)
{
List<Item> field_list;
NET* net = &thd->net;
String* packet = &thd->packet;
DBUG_ENTER("show_slave_hosts");
field_list.push_back(new Item_empty_string("Server_id", 20));
field_list.push_back(new Item_empty_string("Host", 20));
if (opt_show_slave_auth_info)
{
field_list.push_back(new Item_empty_string("User",20));
field_list.push_back(new Item_empty_string("Password",20));
}
field_list.push_back(new Item_empty_string("Port",20));
field_list.push_back(new Item_empty_string("Rpl_recovery_rank", 20));
field_list.push_back(new Item_empty_string("Master_id", 20));
if (send_fields(thd, field_list, 1))
DBUG_RETURN(-1);
pthread_mutex_lock(&LOCK_slave_list);
for (uint i = 0; i < slave_list.records; ++i)
{
SLAVE_INFO* si = (SLAVE_INFO*) hash_element(&slave_list, i);
packet->length(0);
net_store_data(packet, si->server_id);
net_store_data(packet, si->host);
if (opt_show_slave_auth_info)
{
net_store_data(packet, si->user);
net_store_data(packet, si->password);
}
net_store_data(packet, (uint32) si->port);
net_store_data(packet, si->rpl_recovery_rank);
net_store_data(packet, si->master_id);
if (my_net_write(net, (char*)packet->ptr(), packet->length()))
{
pthread_mutex_unlock(&LOCK_slave_list);
DBUG_RETURN(-1);
}
}
pthread_mutex_unlock(&LOCK_slave_list);
send_eof(net);
DBUG_RETURN(0);
}
int connect_to_master(THD *thd, MYSQL* mysql, MASTER_INFO* mi)
{
if (!mc_mysql_connect(mysql, mi->host, mi->user, mi->password, 0,
mi->port, 0, 0))
{
sql_print_error("Connection to master failed: %s",
mc_mysql_error(mysql));
return 1;
}
return 0;
}
static inline void cleanup_mysql_results(MYSQL_RES* db_res,
MYSQL_RES** cur, MYSQL_RES** start)
{
for( ; cur >= start; --cur)
{
if (*cur)
mc_mysql_free_result(*cur);
}
mc_mysql_free_result(db_res);
}
static inline int fetch_db_tables(THD* thd, MYSQL* mysql, const char* db,
MYSQL_RES* table_res)
{
MYSQL_ROW row;
for( row = mc_mysql_fetch_row(table_res); row;
row = mc_mysql_fetch_row(table_res))
{
TABLE_LIST table;
const char* table_name = row[0];
int error;
if (table_rules_on)
{
table.next = 0;
table.db = (char*)db;
table.real_name = (char*)table_name;
table.updating = 1;
if (!tables_ok(thd, &table))
continue;
}
if ((error = fetch_nx_table(thd, db, table_name, &glob_mi, mysql)))
return error;
}
return 0;
}
int load_master_data(THD* thd)
{
MYSQL mysql;
MYSQL_RES* master_status_res = 0;
bool slave_was_running = 0;
int error = 0;
mc_mysql_init(&mysql);
// we do not want anyone messing with the slave at all for the entire
// duration of the data load;
pthread_mutex_lock(&LOCK_slave);
// first, kill the slave
if ((slave_was_running = slave_running))
{
abort_slave = 1;
KICK_SLAVE;
thd->proc_info = "waiting for slave to die";
while (slave_running)
pthread_cond_wait(&COND_slave_stopped, &LOCK_slave); // wait until done
}
if (connect_to_master(thd, &mysql, &glob_mi))
{
net_printf(&thd->net, error = ER_CONNECT_TO_MASTER,
mc_mysql_error(&mysql));
goto err;
}
// now that we are connected, get all database and tables in each
{
MYSQL_RES *db_res, **table_res, **table_res_end, **cur_table_res;
uint num_dbs;
if (mc_mysql_query(&mysql, "show databases", 0) ||
!(db_res = mc_mysql_store_result(&mysql)))
{
net_printf(&thd->net, error = ER_QUERY_ON_MASTER,
mc_mysql_error(&mysql));
goto err;
}
if (!(num_dbs = (uint) mc_mysql_num_rows(db_res)))
goto err;
// in theory, the master could have no databases at all
// and run with skip-grant
if (!(table_res = (MYSQL_RES**)thd->alloc(num_dbs * sizeof(MYSQL_RES*))))
{
net_printf(&thd->net, error = ER_OUTOFMEMORY);
goto err;
}
// this is a temporary solution until we have online backup
// capabilities - to be replaced once online backup is working
// we wait to issue FLUSH TABLES WITH READ LOCK for as long as we
// can to minimize the lock time
if (mc_mysql_query(&mysql, "FLUSH TABLES WITH READ LOCK", 0) ||
mc_mysql_query(&mysql, "SHOW MASTER STATUS",0) ||
!(master_status_res = mc_mysql_store_result(&mysql)))
{
net_printf(&thd->net, error = ER_QUERY_ON_MASTER,
mc_mysql_error(&mysql));
goto err;
}
// go through every table in every database, and if the replication
// rules allow replicating it, get it
table_res_end = table_res + num_dbs;
for(cur_table_res = table_res; cur_table_res < table_res_end;
cur_table_res++)
{
// since we know how many rows we have, this can never be NULL
MYSQL_ROW row = mc_mysql_fetch_row(db_res);
char* db = row[0];
/*
Do not replicate databases excluded by rules
also skip mysql database - in most cases the user will
mess up and not exclude mysql database with the rules when
he actually means to - in this case, he is up for a surprise if
his priv tables get dropped and downloaded from master
TO DO - add special option, not enabled
by default, to allow inclusion of mysql database into load
data from master
*/
if (!db_ok(db, replicate_do_db, replicate_ignore_db) ||
!strcmp(db,"mysql"))
{
*cur_table_res = 0;
continue;
}
if (mysql_rm_db(thd, db, 1,1) ||
mysql_create_db(thd, db, 0, 1))
{
send_error(&thd->net, 0, 0);
cleanup_mysql_results(db_res, cur_table_res - 1, table_res);
goto err;
}
if (mc_mysql_select_db(&mysql, db) ||
mc_mysql_query(&mysql, "show tables", 0) ||
!(*cur_table_res = mc_mysql_store_result(&mysql)))
{
net_printf(&thd->net, error = ER_QUERY_ON_MASTER,
mc_mysql_error(&mysql));
cleanup_mysql_results(db_res, cur_table_res - 1, table_res);
goto err;
}
if ((error = fetch_db_tables(thd, &mysql, db, *cur_table_res)))
{
// we do not report the error - fetch_db_tables handles it
cleanup_mysql_results(db_res, cur_table_res, table_res);
goto err;
}
}
cleanup_mysql_results(db_res, cur_table_res - 1, table_res);
// adjust position in the master
if (master_status_res)
{
MYSQL_ROW row = mc_mysql_fetch_row(master_status_res);
/*
We need this check because the master may not be running with
log-bin, but it will still allow us to do all the steps
of LOAD DATA FROM MASTER - no reason to forbid it, really,
although it does not make much sense for the user to do it
*/
if (row[0] && row[1])
{
strmake(glob_mi.log_file_name, row[0], sizeof(glob_mi.log_file_name));
glob_mi.pos = atoi(row[1]); // atoi() is ok, since offset is <= 1GB
if (glob_mi.pos < 4)
glob_mi.pos = 4; // don't hit the magic number
glob_mi.pending = 0;
flush_master_info(&glob_mi);
}
mc_mysql_free_result(master_status_res);
}
if (mc_mysql_query(&mysql, "UNLOCK TABLES", 0))
{
net_printf(&thd->net, error = ER_QUERY_ON_MASTER,
mc_mysql_error(&mysql));
goto err;
}
}
err:
pthread_mutex_unlock(&LOCK_slave);
if (slave_was_running)
start_slave(0, 0);
mc_mysql_close(&mysql); // safe to call since we always do mc_mysql_init()
if (!error)
send_ok(&thd->net);
return error;
}

View file

@ -2,6 +2,8 @@
#define REPL_FAILSAFE_H #define REPL_FAILSAFE_H
#include "mysql.h" #include "mysql.h"
#include "my_sys.h"
#include "slave.h"
typedef enum {RPL_AUTH_MASTER=0,RPL_ACTIVE_SLAVE,RPL_IDLE_SLAVE, typedef enum {RPL_AUTH_MASTER=0,RPL_ACTIVE_SLAVE,RPL_IDLE_SLAVE,
RPL_LOST_SOLDIER,RPL_TROOP_SOLDIER, RPL_LOST_SOLDIER,RPL_TROOP_SOLDIER,
@ -19,4 +21,18 @@ pthread_handler_decl(handle_failsafe_rpl,arg);
void change_rpl_status(RPL_STATUS from_status, RPL_STATUS to_status); void change_rpl_status(RPL_STATUS from_status, RPL_STATUS to_status);
int find_recovery_captain(THD* thd, MYSQL* mysql); int find_recovery_captain(THD* thd, MYSQL* mysql);
int update_slave_list(MYSQL* mysql); int update_slave_list(MYSQL* mysql);
extern HASH slave_list;
int load_master_data(THD* thd);
int connect_to_master(THD *thd, MYSQL* mysql, MASTER_INFO* mi);
int show_new_master(THD* thd);
int show_slave_hosts(THD* thd);
int translate_master(THD* thd, LEX_MASTER_INFO* mi, char* errmsg);
void init_slave_list();
void end_slave_list();
int register_slave(THD* thd, uchar* packet, uint packet_length);
void unregister_slave(THD* thd, bool only_mine, bool need_mutex);
#endif #endif

View file

@ -61,6 +61,8 @@ static int safe_sleep(THD* thd, int sec);
static int request_table_dump(MYSQL* mysql, const char* db, const char* table); static int request_table_dump(MYSQL* mysql, const char* db, const char* table);
static int create_table_from_dump(THD* thd, NET* net, const char* db, static int create_table_from_dump(THD* thd, NET* net, const char* db,
const char* table_name); const char* table_name);
static int check_master_version(MYSQL* mysql, MASTER_INFO* mi);
char* rewrite_db(char* db); char* rewrite_db(char* db);
static void free_table_ent(TABLE_RULE_ENT* e) static void free_table_ent(TABLE_RULE_ENT* e)
@ -333,6 +335,54 @@ static int init_intvar_from_file(int* var, IO_CACHE* f, int default_val)
return 1; return 1;
} }
static int check_master_version(MYSQL* mysql, MASTER_INFO* mi)
{
MYSQL_RES* res;
MYSQL_ROW row;
const char* version;
const char* errmsg = 0;
if (mc_mysql_query(mysql, "SELECT VERSION()", 0)
|| !(res = mc_mysql_store_result(mysql)))
{
sql_print_error("Error checking master version: %s",
mc_mysql_error(mysql));
return 1;
}
if (!(row = mc_mysql_fetch_row(res)))
{
errmsg = "Master returned no rows for SELECT VERSION()";
goto err;
}
if (!(version = row[0]))
{
errmsg = "Master reported NULL for the version";
goto err;
}
switch (*version)
{
case '3':
mi->old_format = 1;
break;
case '4':
mi->old_format = 0;
break;
default:
errmsg = "Master reported unrecognized MySQL version";
goto err;
}
err:
if (res)
mc_mysql_free_result(res);
if (errmsg)
{
sql_print_error(errmsg);
return 1;
}
return 0;
}
static int create_table_from_dump(THD* thd, NET* net, const char* db, static int create_table_from_dump(THD* thd, NET* net, const char* db,
const char* table_name) const char* table_name)
@ -580,7 +630,7 @@ int init_master_info(MASTER_INFO* mi)
mi->inited = 1; mi->inited = 1;
// now change the cache from READ to WRITE - must do this // now change the cache from READ to WRITE - must do this
// before flush_master_info // before flush_master_info
reinit_io_cache(&mi->file, WRITE_CACHE, 0L,0,1); reinit_io_cache(&mi->file, WRITE_CACHE,0L,0,1);
error=test(flush_master_info(mi)); error=test(flush_master_info(mi));
pthread_mutex_unlock(&mi->lock); pthread_mutex_unlock(&mi->lock);
return error; return error;
@ -943,12 +993,14 @@ static int exec_event(THD* thd, NET* net, MASTER_INFO* mi, int event_len)
{ {
const char *error_msg; const char *error_msg;
Log_event * ev = Log_event::read_log_event((const char*)net->read_pos + 1, Log_event * ev = Log_event::read_log_event((const char*)net->read_pos + 1,
event_len, &error_msg); event_len, &error_msg,
mi->old_format);
if (ev) if (ev)
{ {
int type_code = ev->get_type_code(); int type_code = ev->get_type_code();
int exec_res; int exec_res;
if (ev->server_id == ::server_id || slave_skip_counter) if (ev->server_id == ::server_id ||
(slave_skip_counter && ev->get_type_code() != ROTATE_EVENT))
{ {
if(type_code == LOAD_EVENT) if(type_code == LOAD_EVENT)
skip_load_data_infile(net); skip_load_data_infile(net);
@ -1070,9 +1122,17 @@ connected:
// register ourselves with the master // register ourselves with the master
// if fails, this is not fatal - we just print the error message and go // if fails, this is not fatal - we just print the error message and go
// on with life // on with life
thd->proc_info = "Registering slave on master"; thd->proc_info = "Checking master version";
register_slave_on_master(mysql); if (check_master_version(mysql, &glob_mi))
update_slave_list(mysql); {
goto err;
}
if (!glob_mi.old_format)
{
thd->proc_info = "Registering slave on master";
if (register_slave_on_master(mysql) || update_slave_list(mysql))
goto err;
}
while (!slave_killed(thd)) while (!slave_killed(thd))
{ {

View file

@ -23,8 +23,10 @@ typedef struct st_master_info
pthread_mutex_t lock; pthread_mutex_t lock;
pthread_cond_t cond; pthread_cond_t cond;
bool inited; bool inited;
bool old_format; /* master binlog is in 3.23 format */
st_master_info():pending(0),fd(-1),last_log_seq(0),inited(0) st_master_info():pending(0),fd(-1),last_log_seq(0),inited(0),
old_format(0)
{ {
host[0] = 0; user[0] = 0; password[0] = 0; host[0] = 0; user[0] = 0; password[0] = 0;
pthread_mutex_init(&lock, MY_MUTEX_INIT_FAST); pthread_mutex_init(&lock, MY_MUTEX_INIT_FAST);

View file

@ -72,15 +72,18 @@ class MYSQL_LOG {
// we should not try to rotate it or write any rotation events // we should not try to rotate it or write any rotation events
// the user should use FLUSH MASTER instead of FLUSH LOGS for // the user should use FLUSH MASTER instead of FLUSH LOGS for
// purging // purging
enum cache_type io_cache_type;
bool need_start_event;
friend class Log_event; friend class Log_event;
public: public:
MYSQL_LOG(); MYSQL_LOG();
~MYSQL_LOG(); ~MYSQL_LOG();
pthread_mutex_t* get_log_lock() { return &LOCK_log; } pthread_mutex_t* get_log_lock() { return &LOCK_log; }
void set_need_start_event() { need_start_event = 1; }
void set_index_file_name(const char* index_file_name = 0); void set_index_file_name(const char* index_file_name = 0);
void init(enum_log_type log_type_arg); void init(enum_log_type log_type_arg,
enum cache_type io_cache_type_arg = WRITE_CACHE);
void open(const char *log_name,enum_log_type log_type, void open(const char *log_name,enum_log_type log_type,
const char *new_name=0); const char *new_name=0);
void new_file(bool inside_mutex = 0); void new_file(bool inside_mutex = 0);

View file

@ -22,6 +22,7 @@
#include "mysql_priv.h" #include "mysql_priv.h"
#include "sql_acl.h" #include "sql_acl.h"
#include "sql_repl.h" #include "sql_repl.h"
#include "repl_failsafe.h"
#include <m_ctype.h> #include <m_ctype.h>
#include <thr_alarm.h> #include <thr_alarm.h>
#include <myisam.h> #include <myisam.h>

View file

@ -24,58 +24,15 @@
#include <thr_alarm.h> #include <thr_alarm.h>
#include <my_dir.h> #include <my_dir.h>
#define SLAVE_LIST_CHUNK 128
#define SLAVE_ERRMSG_SIZE (FN_REFLEN+64)
extern const char* any_db; extern const char* any_db;
extern pthread_handler_decl(handle_slave,arg); extern pthread_handler_decl(handle_slave,arg);
HASH slave_list;
#ifndef DBUG_OFF #ifndef DBUG_OFF
int max_binlog_dump_events = 0; // unlimited int max_binlog_dump_events = 0; // unlimited
bool opt_sporadic_binlog_dump_fail = 0; bool opt_sporadic_binlog_dump_fail = 0;
static int binlog_dump_count = 0; static int binlog_dump_count = 0;
#endif #endif
#ifdef SIGNAL_WITH_VIO_CLOSE
#define KICK_SLAVE { slave_thd->close_active_vio(); \
thr_alarm_kill(slave_real_id); }
#else
#define KICK_SLAVE thr_alarm_kill(slave_real_id);
#endif
static Slave_log_event* find_slave_event(IO_CACHE* log,
const char* log_file_name,
char* errmsg);
static uint32* slave_list_key(SLAVE_INFO* si, uint* len,
my_bool not_used __attribute__((unused)))
{
*len = 4;
return &si->server_id;
}
static void slave_info_free(void *s)
{
my_free((gptr) s, MYF(MY_WME));
}
void init_slave_list()
{
hash_init(&slave_list, SLAVE_LIST_CHUNK, 0, 0,
(hash_get_key) slave_list_key, slave_info_free, 0);
pthread_mutex_init(&LOCK_slave_list, MY_MUTEX_INIT_FAST);
}
void end_slave_list()
{
pthread_mutex_lock(&LOCK_slave_list);
hash_free(&slave_list);
pthread_mutex_unlock(&LOCK_slave_list);
pthread_mutex_destroy(&LOCK_slave_list);
}
static int fake_rotate_event(NET* net, String* packet, char* log_file_name, static int fake_rotate_event(NET* net, String* packet, char* log_file_name,
const char**errmsg) const char**errmsg)
{ {
@ -104,69 +61,6 @@ static int fake_rotate_event(NET* net, String* packet, char* log_file_name,
return 0; return 0;
} }
#define get_object(p, obj) \
{\
uint len = (uint)*p++; \
if (p + len > p_end || len >= sizeof(obj)) \
goto err; \
strmake(obj,(char*) p,len); \
p+= len; \
}\
void unregister_slave(THD* thd, bool only_mine, bool need_mutex)
{
if (need_mutex)
pthread_mutex_lock(&LOCK_slave_list);
if (thd->server_id)
{
SLAVE_INFO* old_si;
if ((old_si = (SLAVE_INFO*)hash_search(&slave_list,
(byte*)&thd->server_id, 4)) &&
(!only_mine || old_si->thd == thd))
hash_delete(&slave_list, (byte*)old_si);
}
if (need_mutex)
pthread_mutex_unlock(&LOCK_slave_list);
}
int register_slave(THD* thd, uchar* packet, uint packet_length)
{
SLAVE_INFO *si;
int res = 1;
uchar* p = packet, *p_end = packet + packet_length;
if (check_access(thd, FILE_ACL, any_db))
return 1;
if (!(si = (SLAVE_INFO*)my_malloc(sizeof(SLAVE_INFO), MYF(MY_WME))))
goto err;
thd->server_id = si->server_id = uint4korr(p);
p += 4;
get_object(p,si->host);
get_object(p,si->user);
get_object(p,si->password);
si->port = uint2korr(p);
p += 2;
si->rpl_recovery_rank = uint4korr(p);
p += 4;
if (!(si->master_id = uint4korr(p)))
si->master_id = server_id;
si->thd = thd;
pthread_mutex_lock(&LOCK_slave_list);
unregister_slave(thd,0,0);
res = hash_insert(&slave_list, (byte*) si);
pthread_mutex_unlock(&LOCK_slave_list);
return res;
err:
if (si)
my_free((gptr) si, MYF(MY_WME));
return res;
}
static int send_file(THD *thd) static int send_file(THD *thd)
{ {
NET* net = &thd->net; NET* net = &thd->net;
@ -252,6 +146,8 @@ File open_binlog(IO_CACHE *log, const char *log_file_name,
if (my_b_read(log, (byte*) magic, sizeof(magic))) if (my_b_read(log, (byte*) magic, sizeof(magic)))
{ {
*errmsg = "I/O error reading the header from the binary log"; *errmsg = "I/O error reading the header from the binary log";
sql_print_error("%s, errno=%d, io cache code=%d", *errmsg, my_errno,
log->error);
goto err; goto err;
} }
if (memcmp(magic, BINLOG_MAGIC, sizeof(magic))) if (memcmp(magic, BINLOG_MAGIC, sizeof(magic)))
@ -887,8 +783,13 @@ void reset_master()
} }
LOG_INFO linfo; LOG_INFO linfo;
pthread_mutex_t* log_lock = mysql_bin_log.get_log_lock();
pthread_mutex_lock(log_lock);
if (mysql_bin_log.find_first_log(&linfo, "")) if (mysql_bin_log.find_first_log(&linfo, ""))
{
pthread_mutex_unlock(log_lock);
return; return;
}
for(;;) for(;;)
{ {
@ -898,7 +799,9 @@ void reset_master()
} }
mysql_bin_log.close(1); // exiting close mysql_bin_log.close(1); // exiting close
my_delete(mysql_bin_log.get_index_fname(), MYF(MY_WME)); my_delete(mysql_bin_log.get_index_fname(), MYF(MY_WME));
mysql_bin_log.set_need_start_event();
mysql_bin_log.open(opt_bin_logname,LOG_BIN); mysql_bin_log.open(opt_bin_logname,LOG_BIN);
pthread_mutex_unlock(log_lock);
} }
@ -915,242 +818,6 @@ int cmp_master_pos(const char* log_file_name1, ulonglong log_pos1,
return -1; return -1;
} }
static inline int cmp_master_pos(Slave_log_event* sev, LEX_MASTER_INFO* mi)
{
return cmp_master_pos(sev->master_log, sev->master_pos, mi->log_file_name,
mi->pos);
}
static int find_target_pos(LEX_MASTER_INFO* mi, IO_CACHE* log, char* errmsg)
{
uint32 log_seq = mi->last_log_seq;
uint32 target_server_id = mi->server_id;
for (;;)
{
Log_event* ev;
if (!(ev = Log_event::read_log_event(log, 0)))
{
if (log->error > 0)
strmov(errmsg, "Binary log truncated in the middle of event");
else if (log->error < 0)
strmov(errmsg, "I/O error reading binary log");
else
strmov(errmsg, "Could not find target event in the binary log");
return 1;
}
if (ev->log_seq == log_seq && ev->server_id == target_server_id)
{
delete ev;
mi->pos = my_b_tell(log);
return 0;
}
delete ev;
}
}
int translate_master(THD* thd, LEX_MASTER_INFO* mi, char* errmsg)
{
LOG_INFO linfo;
char search_file_name[FN_REFLEN],last_log_name[FN_REFLEN];
IO_CACHE log;
File file = -1, last_file = -1;
pthread_mutex_t *log_lock;
const char* errmsg_p;
Slave_log_event* sev = 0;
my_off_t last_pos = 0;
int error = 1;
int cmp_res;
LINT_INIT(cmp_res);
if (!mysql_bin_log.is_open())
{
strmov(errmsg,"Binary log is not open");
return 1;
}
if (!server_id_supplied)
{
strmov(errmsg, "Misconfigured master - server id was not set");
return 1;
}
linfo.index_file_offset = 0;
search_file_name[0] = 0;
if (mysql_bin_log.find_first_log(&linfo, search_file_name))
{
strmov(errmsg,"Could not find first log");
return 1;
}
thd->current_linfo = &linfo;
bzero((char*) &log,sizeof(log));
log_lock = mysql_bin_log.get_log_lock();
pthread_mutex_lock(log_lock);
for (;;)
{
if ((file=open_binlog(&log, linfo.log_file_name, &errmsg_p)) < 0)
{
strmov(errmsg, errmsg_p);
goto err;
}
if (!(sev = find_slave_event(&log, linfo.log_file_name, errmsg)))
goto err;
cmp_res = cmp_master_pos(sev, mi);
delete sev;
if (!cmp_res)
{
/* Copy basename */
fn_format(mi->log_file_name, linfo.log_file_name, "","",1);
mi->pos = my_b_tell(&log);
goto mi_inited;
}
else if (cmp_res > 0)
{
if (!last_pos)
{
strmov(errmsg,
"Slave event in first log points past the target position");
goto err;
}
end_io_cache(&log);
(void) my_close(file, MYF(MY_WME));
if (init_io_cache(&log, (file = last_file), IO_SIZE, READ_CACHE, 0, 0,
MYF(MY_WME)))
{
errmsg[0] = 0;
goto err;
}
break;
}
strmov(last_log_name, linfo.log_file_name);
last_pos = my_b_tell(&log);
switch (mysql_bin_log.find_next_log(&linfo)) {
case LOG_INFO_EOF:
if (last_file >= 0)
(void)my_close(last_file, MYF(MY_WME));
last_file = -1;
goto found_log;
case 0:
break;
default:
strmov(errmsg, "Error reading log index");
goto err;
}
end_io_cache(&log);
if (last_file >= 0)
(void) my_close(last_file, MYF(MY_WME));
last_file = file;
}
found_log:
my_b_seek(&log, last_pos);
if (find_target_pos(mi,&log,errmsg))
goto err;
fn_format(mi->log_file_name, last_log_name, "","",1); /* Copy basename */
mi_inited:
error = 0;
err:
pthread_mutex_unlock(log_lock);
end_io_cache(&log);
pthread_mutex_lock(&LOCK_thread_count);
thd->current_linfo = 0;
pthread_mutex_unlock(&LOCK_thread_count);
if (file >= 0)
(void) my_close(file, MYF(MY_WME));
if (last_file >= 0 && last_file != file)
(void) my_close(last_file, MYF(MY_WME));
return error;
}
// caller must delete result when done
static Slave_log_event* find_slave_event(IO_CACHE* log,
const char* log_file_name,
char* errmsg)
{
Log_event* ev;
if (!(ev = Log_event::read_log_event(log, 0)))
{
my_snprintf(errmsg, SLAVE_ERRMSG_SIZE,
"Error reading start event in log '%s'",
(char*)log_file_name);
return 0;
}
delete ev;
if (!(ev = Log_event::read_log_event(log, 0)))
{
my_snprintf(errmsg, SLAVE_ERRMSG_SIZE,
"Error reading slave event in log '%s'",
(char*)log_file_name);
return 0;
}
if (ev->get_type_code() != SLAVE_EVENT)
{
my_snprintf(errmsg, SLAVE_ERRMSG_SIZE,
"Second event in log '%s' is not slave event",
(char*)log_file_name);
delete ev;
return 0;
}
return (Slave_log_event*)ev;
}
int show_new_master(THD* thd)
{
DBUG_ENTER("show_new_master");
List<Item> field_list;
char errmsg[SLAVE_ERRMSG_SIZE];
LEX_MASTER_INFO* lex_mi = &thd->lex.mi;
errmsg[0]=0; // Safety
if (translate_master(thd, lex_mi, errmsg))
{
if (errmsg[0])
net_printf(&thd->net, ER_ERROR_WHEN_EXECUTING_COMMAND,
"SHOW NEW MASTER", errmsg);
else
send_error(&thd->net, 0);
DBUG_RETURN(1);
}
else
{
String* packet = &thd->packet;
field_list.push_back(new Item_empty_string("Log_name", 20));
field_list.push_back(new Item_empty_string("Log_pos", 20));
if (send_fields(thd, field_list, 1))
DBUG_RETURN(-1);
packet->length(0);
net_store_data(packet, lex_mi->log_file_name);
net_store_data(packet, (longlong)lex_mi->pos);
if (my_net_write(&thd->net, packet->ptr(), packet->length()))
DBUG_RETURN(-1);
send_eof(&thd->net);
DBUG_RETURN(0);
}
}
int show_binlog_events(THD* thd) int show_binlog_events(THD* thd)
{ {
DBUG_ENTER("show_binlog_events"); DBUG_ENTER("show_binlog_events");
@ -1202,7 +869,8 @@ int show_binlog_events(THD* thd)
pthread_mutex_lock(mysql_bin_log.get_log_lock()); pthread_mutex_lock(mysql_bin_log.get_log_lock());
my_b_seek(&log, pos); my_b_seek(&log, pos);
for (event_count = 0; (ev = Log_event::read_log_event(&log, 0)); ) for (event_count = 0;
(ev = Log_event::read_log_event(&log,(pthread_mutex_t*)0,0)); )
{ {
if (event_count >= limit_start && if (event_count >= limit_start &&
ev->net_send(thd, linfo.log_file_name, pos)) ev->net_send(thd, linfo.log_file_name, pos))
@ -1247,56 +915,6 @@ err:
DBUG_RETURN(0); DBUG_RETURN(0);
} }
int show_slave_hosts(THD* thd)
{
List<Item> field_list;
NET* net = &thd->net;
String* packet = &thd->packet;
DBUG_ENTER("show_slave_hosts");
field_list.push_back(new Item_empty_string("Server_id", 20));
field_list.push_back(new Item_empty_string("Host", 20));
if (opt_show_slave_auth_info)
{
field_list.push_back(new Item_empty_string("User",20));
field_list.push_back(new Item_empty_string("Password",20));
}
field_list.push_back(new Item_empty_string("Port",20));
field_list.push_back(new Item_empty_string("Rpl_recovery_rank", 20));
field_list.push_back(new Item_empty_string("Master_id", 20));
if (send_fields(thd, field_list, 1))
DBUG_RETURN(-1);
pthread_mutex_lock(&LOCK_slave_list);
for (uint i = 0; i < slave_list.records; ++i)
{
SLAVE_INFO* si = (SLAVE_INFO*) hash_element(&slave_list, i);
packet->length(0);
net_store_data(packet, si->server_id);
net_store_data(packet, si->host);
if (opt_show_slave_auth_info)
{
net_store_data(packet, si->user);
net_store_data(packet, si->password);
}
net_store_data(packet, (uint32) si->port);
net_store_data(packet, si->rpl_recovery_rank);
net_store_data(packet, si->master_id);
if (my_net_write(net, (char*)packet->ptr(), packet->length()))
{
pthread_mutex_unlock(&LOCK_slave_list);
DBUG_RETURN(-1);
}
}
pthread_mutex_unlock(&LOCK_slave_list);
send_eof(net);
DBUG_RETURN(0);
}
int show_binlog_info(THD* thd) int show_binlog_info(THD* thd)
{ {
DBUG_ENTER("show_binlog_info"); DBUG_ENTER("show_binlog_info");
@ -1402,230 +1020,6 @@ err:
return 1; return 1;
} }
int connect_to_master(THD *thd, MYSQL* mysql, MASTER_INFO* mi)
{
if (!mc_mysql_connect(mysql, mi->host, mi->user, mi->password, 0,
mi->port, 0, 0))
{
sql_print_error("Connection to master failed: %s",
mc_mysql_error(mysql));
return 1;
}
return 0;
}
static inline void cleanup_mysql_results(MYSQL_RES* db_res,
MYSQL_RES** cur, MYSQL_RES** start)
{
for( ; cur >= start; --cur)
{
if (*cur)
mc_mysql_free_result(*cur);
}
mc_mysql_free_result(db_res);
}
static inline int fetch_db_tables(THD* thd, MYSQL* mysql, const char* db,
MYSQL_RES* table_res)
{
MYSQL_ROW row;
for( row = mc_mysql_fetch_row(table_res); row;
row = mc_mysql_fetch_row(table_res))
{
TABLE_LIST table;
const char* table_name = row[0];
int error;
if (table_rules_on)
{
table.next = 0;
table.db = (char*)db;
table.real_name = (char*)table_name;
table.updating = 1;
if (!tables_ok(thd, &table))
continue;
}
if ((error = fetch_nx_table(thd, db, table_name, &glob_mi, mysql)))
return error;
}
return 0;
}
int load_master_data(THD* thd)
{
MYSQL mysql;
MYSQL_RES* master_status_res = 0;
bool slave_was_running = 0;
int error = 0;
mc_mysql_init(&mysql);
// we do not want anyone messing with the slave at all for the entire
// duration of the data load;
pthread_mutex_lock(&LOCK_slave);
// first, kill the slave
if ((slave_was_running = slave_running))
{
abort_slave = 1;
KICK_SLAVE;
thd->proc_info = "waiting for slave to die";
while (slave_running)
pthread_cond_wait(&COND_slave_stopped, &LOCK_slave); // wait until done
}
if (connect_to_master(thd, &mysql, &glob_mi))
{
net_printf(&thd->net, error = ER_CONNECT_TO_MASTER,
mc_mysql_error(&mysql));
goto err;
}
// now that we are connected, get all database and tables in each
{
MYSQL_RES *db_res, **table_res, **table_res_end, **cur_table_res;
uint num_dbs;
if (mc_mysql_query(&mysql, "show databases", 0) ||
!(db_res = mc_mysql_store_result(&mysql)))
{
net_printf(&thd->net, error = ER_QUERY_ON_MASTER,
mc_mysql_error(&mysql));
goto err;
}
if (!(num_dbs = (uint) mc_mysql_num_rows(db_res)))
goto err;
// in theory, the master could have no databases at all
// and run with skip-grant
if (!(table_res = (MYSQL_RES**)thd->alloc(num_dbs * sizeof(MYSQL_RES*))))
{
net_printf(&thd->net, error = ER_OUTOFMEMORY);
goto err;
}
// this is a temporary solution until we have online backup
// capabilities - to be replaced once online backup is working
// we wait to issue FLUSH TABLES WITH READ LOCK for as long as we
// can to minimize the lock time
if (mc_mysql_query(&mysql, "FLUSH TABLES WITH READ LOCK", 0) ||
mc_mysql_query(&mysql, "SHOW MASTER STATUS",0) ||
!(master_status_res = mc_mysql_store_result(&mysql)))
{
net_printf(&thd->net, error = ER_QUERY_ON_MASTER,
mc_mysql_error(&mysql));
goto err;
}
// go through every table in every database, and if the replication
// rules allow replicating it, get it
table_res_end = table_res + num_dbs;
for(cur_table_res = table_res; cur_table_res < table_res_end;
cur_table_res++)
{
// since we know how many rows we have, this can never be NULL
MYSQL_ROW row = mc_mysql_fetch_row(db_res);
char* db = row[0];
/*
Do not replicate databases excluded by rules
also skip mysql database - in most cases the user will
mess up and not exclude mysql database with the rules when
he actually means to - in this case, he is up for a surprise if
his priv tables get dropped and downloaded from master
TO DO - add special option, not enabled
by default, to allow inclusion of mysql database into load
data from master
*/
if (!db_ok(db, replicate_do_db, replicate_ignore_db) ||
!strcmp(db,"mysql"))
{
*cur_table_res = 0;
continue;
}
if (mysql_rm_db(thd, db, 1,1) ||
mysql_create_db(thd, db, 0, 1))
{
send_error(&thd->net, 0, 0);
cleanup_mysql_results(db_res, cur_table_res - 1, table_res);
goto err;
}
if (mc_mysql_select_db(&mysql, db) ||
mc_mysql_query(&mysql, "show tables", 0) ||
!(*cur_table_res = mc_mysql_store_result(&mysql)))
{
net_printf(&thd->net, error = ER_QUERY_ON_MASTER,
mc_mysql_error(&mysql));
cleanup_mysql_results(db_res, cur_table_res - 1, table_res);
goto err;
}
if ((error = fetch_db_tables(thd, &mysql, db, *cur_table_res)))
{
// we do not report the error - fetch_db_tables handles it
cleanup_mysql_results(db_res, cur_table_res, table_res);
goto err;
}
}
cleanup_mysql_results(db_res, cur_table_res - 1, table_res);
// adjust position in the master
if (master_status_res)
{
MYSQL_ROW row = mc_mysql_fetch_row(master_status_res);
/*
We need this check because the master may not be running with
log-bin, but it will still allow us to do all the steps
of LOAD DATA FROM MASTER - no reason to forbid it, really,
although it does not make much sense for the user to do it
*/
if (row[0] && row[1])
{
strmake(glob_mi.log_file_name, row[0], sizeof(glob_mi.log_file_name));
glob_mi.pos = atoi(row[1]); // atoi() is ok, since offset is <= 1GB
if (glob_mi.pos < 4)
glob_mi.pos = 4; // don't hit the magic number
glob_mi.pending = 0;
flush_master_info(&glob_mi);
}
mc_mysql_free_result(master_status_res);
}
if (mc_mysql_query(&mysql, "UNLOCK TABLES", 0))
{
net_printf(&thd->net, error = ER_QUERY_ON_MASTER,
mc_mysql_error(&mysql));
goto err;
}
}
err:
pthread_mutex_unlock(&LOCK_slave);
if (slave_was_running)
start_slave(0, 0);
mc_mysql_close(&mysql); // safe to call since we always do mc_mysql_init()
if (!error)
send_ok(&thd->net);
return error;
}
int log_loaded_block(IO_CACHE* file) int log_loaded_block(IO_CACHE* file)
{ {
LOAD_FILE_INFO* lf_info; LOAD_FILE_INFO* lf_info;

View file

@ -15,7 +15,6 @@ typedef struct st_slave_info
} SLAVE_INFO; } SLAVE_INFO;
extern bool opt_show_slave_auth_info, opt_old_rpl_compat; extern bool opt_show_slave_auth_info, opt_old_rpl_compat;
extern HASH slave_list;
extern char* master_host; extern char* master_host;
extern my_string opt_bin_logname, master_info_file; extern my_string opt_bin_logname, master_info_file;
extern uint32 server_id; extern uint32 server_id;
@ -27,26 +26,24 @@ extern int max_binlog_dump_events;
extern bool opt_sporadic_binlog_dump_fail; extern bool opt_sporadic_binlog_dump_fail;
#endif #endif
#ifdef SIGNAL_WITH_VIO_CLOSE
#define KICK_SLAVE { slave_thd->close_active_vio(); \
thr_alarm_kill(slave_real_id); }
#else
#define KICK_SLAVE thr_alarm_kill(slave_real_id);
#endif
File open_binlog(IO_CACHE *log, const char *log_file_name, File open_binlog(IO_CACHE *log, const char *log_file_name,
const char **errmsg); const char **errmsg);
int start_slave(THD* thd = 0, bool net_report = 1); int start_slave(THD* thd = 0, bool net_report = 1);
int stop_slave(THD* thd = 0, bool net_report = 1); int stop_slave(THD* thd = 0, bool net_report = 1);
int load_master_data(THD* thd);
int connect_to_master(THD *thd, MYSQL* mysql, MASTER_INFO* mi);
int change_master(THD* thd); int change_master(THD* thd);
int show_new_master(THD* thd);
int show_slave_hosts(THD* thd);
int show_binlog_events(THD* thd); int show_binlog_events(THD* thd);
int translate_master(THD* thd, LEX_MASTER_INFO* mi, char* errmsg);
int cmp_master_pos(const char* log_file_name1, ulonglong log_pos1, int cmp_master_pos(const char* log_file_name1, ulonglong log_pos1,
const char* log_file_name2, ulonglong log_pos2); const char* log_file_name2, ulonglong log_pos2);
void reset_slave(); void reset_slave();
void reset_master(); void reset_master();
void init_slave_list();
void end_slave_list();
int register_slave(THD* thd, uchar* packet, uint packet_length);
void unregister_slave(THD* thd, bool only_mine, bool need_mutex);
int purge_master_logs(THD* thd, const char* to_log); int purge_master_logs(THD* thd, const char* to_log);
bool log_in_use(const char* log_name); bool log_in_use(const char* log_name);
void adjust_linfo_offsets(my_off_t purge_offset); void adjust_linfo_offsets(my_off_t purge_offset);