mariadb/sql/repl_failsafe.cc
unknown f01f49916b Lots of code fixes to the replication code (especially the binary logging and index log file handling)
Fixed bugs in my last changeset that made MySQL hard to compile.
Added mutex around some data that could cause table cache corruptions when using OPTIMIZE TABLE / REPAIR TABLE or automatic repair of MyISAM tables.
Added mutex around some data in the slave start/stop code that could cause THD linked list corruptions
Extended my_chsize() to allow one to specify a filler character.
Extend vio_blocking to return the old state (This made some usage of this function much simpler)
Added checks in some functions that the caller holds the required mutexes before calling the function.
Use setrlimit() to ensure that we can write core file if one specifies --core-file.
Added --slave-compressed-protocol
Made 2 the minimum length for ft_min_word_len
Added variables foreign_key_checks & unique_checks.
Less logging from replication code (if not started with --log-warnings)
Changed SHOW INNODB STATUS to require the SUPER privilege
More DBUG statements and a lot of new code comments


BitKeeper/deleted/.del-rpl_compat.result~c950bc346b12c61a:
  Delete: mysql-test/r/rpl_compat.result
BitKeeper/deleted/.del-rpl_compat.test~5f6ba955e02aa95f:
  Delete: mysql-test/t/rpl_compat.test
Docs/manual.texi:
  Updated manual with fixes in this changeset
client/mysqltest.c:
  Indentation cleanup
  Better error messages for some error conditions.
include/my_pthread.h:
  Added 'safe_mutex_assert_owner()' to check that the thread really owns the mutex.
include/my_sys.h:
  Extended my_chsize() to allow one to specify a filler character.
  (For MySQL index logs)
include/raid.h:
  New my_chsize()
include/violite.h:
  Extend vio_blocking to return the old state
innobase/include/dyn0dyn.h:
  Merge with 3.23 (AIX DYN_ARRAY_DATA_SIZE)
innobase/include/dyn0dyn.ic:
  Merge with 3.23
isam/create.c:
  Fix for new my_chsize()
isam/isamchk.c:
  Fix for new my_chsize()
isam/pack_isam.c:
  Fix for new my_chsize()
libmysql/manager.c:
  Fix for new vio_blocking()
libmysqld/lib_sql.cc:
  Fix for new open_log()
myisam/mi_cache.c:
  Fix typo from previous checking
myisam/mi_check.c:
  Fix for new my_chsize()
myisam/mi_create.c:
  Fix for new my_chsize()
myisam/mi_delete_all.c:
  Fix for new my_chsize()
myisam/myisampack.c:
  Fix for new my_chsize()
mysql-test/include/master-slave.inc:
  Better initialization for replication tests
mysql-test/mysql-test-run.sh:
  Added option --log-warnings
mysql-test/r/insert.result:
  More tests if INSERT ...(DEFAULT)
mysql-test/r/rpl000001.result:
  Clean up tests for new master-slave.inc
  Remove 'use database'
mysql-test/r/rpl000002.result:
  Clean up tests for new master-slave.inc
  Remove 'use database'
mysql-test/r/rpl000003.result:
  Clean up tests for new master-slave.inc
  Remove 'use database'
mysql-test/r/rpl000004.result:
  Clean up tests for new master-slave.inc
  Remove 'use database'
mysql-test/r/rpl000005.result:
  Clean up tests for new master-slave.inc
  Remove 'use database'
mysql-test/r/rpl000006.result:
  Clean up tests for new master-slave.inc
  Remove 'use database'
mysql-test/r/rpl000007.result:
  Clean up tests for new master-slave.inc
  Remove 'use database'
mysql-test/r/rpl000008.result:
  Clean up tests for new master-slave.inc
  Remove 'use database'
mysql-test/r/rpl000009.result:
  Clean up tests for new master-slave.inc
  Remove 'use database'
mysql-test/r/rpl000010.result:
  Clean up tests for new master-slave.inc
  Remove 'use database'
mysql-test/r/rpl000011.result:
  Clean up tests for new master-slave.inc
  Remove 'use database'
mysql-test/r/rpl000012.result:
  Clean up tests for new master-slave.inc
  Remove 'use database'
mysql-test/r/rpl000013.result:
  Clean up tests for new master-slave.inc
  Remove 'use database'
mysql-test/r/rpl000014.result:
  Clean up tests for new master-slave.inc
  Remove 'use database'
mysql-test/r/rpl_alter.result:
  Clean up tests for new master-slave.inc
  Remove 'use database'
mysql-test/r/rpl_empty_master_crash.result:
  Clean up tests for new master-slave.inc
  Remove 'use database'
mysql-test/r/rpl_get_lock.result:
  Clean up tests for new master-slave.inc
  Remove 'use database'
mysql-test/r/rpl_log.result:
  Clean up tests for new master-slave.inc
  Remove 'use database'
mysql-test/r/rpl_magic.result:
  Clean up tests for new master-slave.inc
  Remove 'use database'
mysql-test/r/rpl_mystery22.result:
  Clean up tests for new master-slave.inc
  Remove 'use database'
mysql-test/r/rpl_skip_error.result:
  Clean up tests for new master-slave.inc
  Remove 'use database'
mysql-test/r/rpl_sporadic_master.result:
  Clean up tests for new master-slave.inc
  Remove 'use database'
mysql-test/t/insert.test:
  More tests if INSERT ...(DEFAULT)
mysql-test/t/rpl000001.test:
  Clean up tests for new master-slave.inc
  Remove 'use database'
mysql-test/t/rpl000002.test:
  Clean up tests for new master-slave.inc
  Remove 'use database'
mysql-test/t/rpl000003.test:
  Clean up tests for new master-slave.inc
  Remove 'use database'
mysql-test/t/rpl000004.test:
  Clean up tests for new master-slave.inc
  Remove 'use database'
mysql-test/t/rpl000005.test:
  Clean up tests for new master-slave.inc
  Remove 'use database'
mysql-test/t/rpl000006.test:
  Clean up tests for new master-slave.inc
  Remove 'use database'
mysql-test/t/rpl000007.test:
  Clean up tests for new master-slave.inc
  Remove 'use database'
mysql-test/t/rpl000009.test:
  Clean up tests for new master-slave.inc
  Remove 'use database'
mysql-test/t/rpl000011.test:
  Clean up tests for new master-slave.inc
  Remove 'use database'
mysql-test/t/rpl000013.test:
  Clean up tests for new master-slave.inc
  Remove 'use database'
mysql-test/t/rpl000014.test:
  Clean up tests for new master-slave.inc
  Remove 'use database'
mysql-test/t/rpl_alter.test:
  Clean up tests for new master-slave.inc
  Remove 'use database'
mysql-test/t/rpl_empty_master_crash.test:
  Clean up tests for new master-slave.inc
  Remove 'use database'
mysql-test/t/rpl_get_lock.test:
  Clean up tests for new master-slave.inc
  Remove 'use database'
mysql-test/t/rpl_magic.test:
  Clean up tests for new master-slave.inc
  Remove 'use database'
mysql-test/t/rpl_mystery22.test:
  Clean up tests for new master-slave.inc
  Remove 'use database'
mysql-test/t/rpl_skip_error.test:
  Clean up tests for new master-slave.inc
  Remove 'use database'
mysql-test/t/rpl_sporadic_master.test:
  Clean up tests for new master-slave.inc
  Remove 'use database'
mysys/mf_iocache.c:
  More debug info
  Force seek after reinit_io_cache()
mysys/mf_iocache2.c:
  Added my_b_filelength()
mysys/my_chsize.c:
  Extended my_chsize() to allow one to specify a filler character.
  (For MySQL index logs)
mysys/raid.cc:
  Extended my_chsize() to allow one to specify a filler character.
  (For MySQL index logs)
sql/field.h:
  Fix for INSERT ... (DEFAULT)
sql/ha_berkeley.h:
  Fix for dynamic variables
sql/ha_innodb.cc:
  Change sprintf() to my_sprintf() to make code portable.
  Fix after sync with 3.23
  (We still need to fix the storage of the replication position in innodb)
sql/ha_innodb.h:
  Fix for dynamic variables
sql/handler.cc:
  Remove writing of COMMIT to the binary log.
  (Now done in MYSQL_LOG::write())
sql/item_func.cc:
  Query_log_event() now always takes query length.
sql/item_func.h:
  Indentation cleanup
sql/item_strfunc.h:
  Indentation cleanup
sql/item_timefunc.h:
  Indentation cleanup
sql/lock.cc:
  Check that we own critical mutexes.
sql/log.cc:
  Big code cleanup / rewrite / optimize.
  - The index log file has its own IO_CACHE object.
  - Many functions totally rewritten to make them smaller and faster.
  - New handling of index log files
  - Lots of new comments
sql/log_event.cc:
  Code cleanup
  New comments
sql/log_event.h:
  Query_log_event() now always takes query length.
sql/mini_client.cc:
  Better error messages on reconnect.
  Fixed wrong variable usage from last commit.
sql/mysql_priv.h:
  New arguments to open_log()
sql/mysqld.cc:
  Use setrlimit() to ensure that we can write core file if one specifies --core-file
  Added index file name as parameter to openlog().
  Added --slave-compressed-protocol
  Made 2 the minimum length for ft_min_word_len
sql/net_serv.cc:
  Use new vio_blocking()
  (The vio_blocking() change was done to make this code more readable)
sql/repl_failsafe.cc:
  Minor code cleanup
sql/set_var.cc:
  Added variables slave_compressed_protocol, foreign_key_checks & unique_checks.
sql/set_var.h:
  Generalization
sql/slave.cc:
  Code cleanup & rewrite.
  Don't call SELECT VERSION() on check_master_version()
  New init_slave() code.
  Ensure that all threads create a THD early.
  Add locks around manipulation of critical structures
  Don't retry a command more than master_retry_count times.
  Write less warnings to the log file (if not started with --log-warnings)
  Faster flush_relay_log_info()
sql/slave.h:
  More comments
  Added new arguments to some functions.
sql/sql_acl.cc:
  More DBUG info
  New parameter to Query_log_event()
sql/sql_base.cc:
  Added some mutex checking.
sql/sql_cache.cc:
  Less non-critical debug info
sql/sql_class.h:
  Fix for new log handling.
sql/sql_db.cc:
  Added mutex around remove_db_from_cache()
sql/sql_delete.cc:
  Added missing parameters to changed functions
sql/sql_insert.cc:
  Added missing parameters to changed functions
sql/sql_parse.cc:
  Do an 'end_active_trans()' before 'load_master_data'
  Changed SHOW INNODB STATUS to require the SUPER privilege
  Added new function parameters to new functions
sql/sql_rename.cc:
  Added missing parameters to changed functions
sql/sql_repl.cc:
  Code cleanups / new comments
  Fix for new find_first_log() calling standard.
  More DBUG statements.
  Show binlogs updated to use new IO_CACHE:d index log file.
sql/sql_repl.h:
  New function arguments
sql/sql_select.cc:
  Indentation changes
sql/sql_table.cc:
  Added missing parameters to changed functions
  Added checking of mutex
  Added mutex around critical regions.
sql/sql_test.cc:
  Don't use THR_ALARM if the configuration doesn't support it.
sql/sql_update.cc:
  Added missing parameters to changed functions
sql/table.cc:
  Added missing parameters to changed functions
vio/vio.c:
  Extend vio_blocking to return the old state
vio/viosocket.c:
  Extend vio_blocking to return the old state
vio/viossl.c:
  Extend vio_blocking to return the old state
2002-08-08 03:12:02 +03:00

886 lines
22 KiB
C++

/* Copyright (C) 2000 MySQL AB & MySQL Finland AB & TCX DataKonsult AB & Sasha
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
// Sasha Pachev <sasha@mysql.com> is currently in charge of this file
#include "mysql_priv.h"
#include "repl_failsafe.h"
#include "sql_repl.h"
#include "slave.h"
#include "sql_acl.h"
#include "mini_client.h"
#include "log_event.h"
#include <mysql.h>
/* Value handed to hash_init() when the slave_list hash is created */
#define SLAVE_LIST_CHUNK 128
/* Size of the error message buffers filled by the SHOW NEW MASTER code */
#define SLAVE_ERRMSG_SIZE (FN_REFLEN+64)
/*
Current failsafe replication state. In this file it is only read and
written while LOCK_rpl_status is held.
*/
RPL_STATUS rpl_status=RPL_NULL;
pthread_mutex_t LOCK_rpl_status;
/* Signalled by change_rpl_status(); waited on by handle_failsafe_rpl() */
pthread_cond_t COND_rpl_status;
/*
Hash of SLAVE_INFO records keyed by the 4-byte server id (see
slave_list_key()). Accesses in this file are made under LOCK_slave_list.
*/
HASH slave_list;
extern const char* any_db;
/* Names for the replication roles, NullS terminated for the TYPELIB below */
const char *rpl_role_type[] = {"MASTER","SLAVE",NullS};
TYPELIB rpl_role_typelib = {array_elements(rpl_role_type)-1,"",
rpl_role_type};
/*
Printable names of the failsafe replication states.
NOTE(review): presumably in the same order as the RPL_STATUS enum - confirm
against repl_failsafe.h.
*/
const char* rpl_status_type[]=
{
"AUTH_MASTER","ACTIVE_SLAVE","IDLE_SLAVE", "LOST_SOLDIER","TROOP_SOLDIER",
"RECOVERY_CAPTAIN","NULL",NullS
};
TYPELIB rpl_status_typelib= {array_elements(rpl_status_type)-1,"",
rpl_status_type};
/* Defined further down; needed by translate_master() */
static Slave_log_event* find_slave_event(IO_CACHE* log,
const char* log_file_name,
char* errmsg);
/*
Prepare a freshly created THD for use by the failsafe replication thread.

Marks the THD as a system/bootstrap thread with full privileges, sets up
its NET structure, assigns a thread id and binds the THD to the calling
OS thread via store_globals().

RETURN
0   ok
-1  init_thr_lock() or store_globals() failed; the connection has been
closed and end_thread() has been called for the THD
*/
static int init_failsafe_rpl_thread(THD* thd)
{
DBUG_ENTER("init_failsafe_rpl_thread");
thd->system_thread = thd->bootstrap = 1;
thd->client_capabilities = 0;
my_net_init(&thd->net, 0);
thd->net.read_timeout = slave_net_timeout;
thd->max_client_packet_length=thd->net.max_packet;
/* Grant all privilege bits; there is no authenticated user */
thd->master_access= ~0;
thd->priv_user = 0;
/* NOTE(review): system_thread was already set to 1 above; this is redundant */
thd->system_thread = 1;
/* The global thread_id counter is advanced under LOCK_thread_count */
pthread_mutex_lock(&LOCK_thread_count);
thd->thread_id = thread_id++;
pthread_mutex_unlock(&LOCK_thread_count);
if (init_thr_lock() || thd->store_globals())
{
close_connection(&thd->net,ER_OUT_OF_RESOURCES); // is this needed?
end_thread(thd,0);
DBUG_RETURN(-1);
}
thd->mysys_var=my_thread_var;
thd->dbug_thread_id=my_thread_id();
#if !defined(__WIN__) && !defined(OS2)
/*
Called with an empty set, so no signal is unblocked; the call only stores
the current signal mask in thd->block_signals.
*/
sigset_t set;
VOID(sigemptyset(&set)); // Get mask in use
VOID(pthread_sigmask(SIG_UNBLOCK,&set,&thd->block_signals));
#endif
thd->mem_root.free=thd->mem_root.used=0;
/* No join size limit configured: allow big selects */
if ((ulong) thd->variables.max_join_size == (ulong) HA_POS_ERROR)
thd->options|= OPTION_BIG_SELECTS;
thd->proc_info="Thread initialized";
thd->version=refresh_version;
thd->set_time();
DBUG_RETURN(0);
}
/*
  Conditionally move the failsafe replication state to 'to_status'.

  The switch happens only if the current rpl_status equals 'from_status'
  or is RPL_ANY. COND_rpl_status is signalled in every case, whether or not
  the state changed. Everything is done under LOCK_rpl_status.

  NOTE(review): the test is 'rpl_status == RPL_ANY', not
  'from_status == RPL_ANY'. If RPL_ANY is meant as a wildcard argument this
  looks inverted - confirm against the callers before changing it.
*/
void change_rpl_status(RPL_STATUS from_status, RPL_STATUS to_status)
{
  pthread_mutex_lock(&LOCK_rpl_status);

  const bool may_switch= (rpl_status == from_status ||
                          rpl_status == RPL_ANY);
  if (may_switch)
    rpl_status= to_status;

  pthread_cond_signal(&COND_rpl_status);
  pthread_mutex_unlock(&LOCK_rpl_status);
}
/*
  Read one length-prefixed string from a packet.

  p    Cursor into the packet (modifiable lvalue); on success it is
       advanced past the length byte and the string
  obj  Destination char array; the string is copied with strmake(), so it
       is always NUL terminated

  The expansion relies on two names from the calling function: 'p_end'
  (one past the last valid packet byte) and a label 'err', which is jumped
  to when the packet is too short or the string does not fit into 'obj'.

  The length byte is only read after checking that it lies inside the
  packet. The body is wrapped in do { } while (0) so that the macro behaves
  as a single statement, and the definition deliberately does not end with
  a line continuation (a trailing backslash would splice the next source
  line into the macro).
*/
#define get_object(p, obj) \
do \
{ \
  uint len; \
  if (p >= p_end) \
    goto err; \
  len= (uint) *p++; \
  if (p + len > p_end || len >= sizeof(obj)) \
    goto err; \
  strmake(obj, (char*) p, len); \
  p+= len; \
} while (0)
/*
  Compare the master position recorded in a slave event with the position
  in 'mi'. Thin wrapper that forwards the log name / position pairs to the
  four-argument cmp_master_pos() and returns its result unchanged.
*/
static inline int cmp_master_pos(Slave_log_event* sev, LEX_MASTER_INFO* mi)
{
  const int order= cmp_master_pos(sev->master_log, sev->master_pos,
                                  mi->log_file_name, mi->pos);
  return order;
}
/*
  Remove the slave_list entry that belongs to thd->server_id.

  thd         Connection whose server_id identifies the entry; nothing is
              done when the id is 0
  only_mine   If set, the entry is removed only when it was registered by
              this very THD
  need_mutex  If set, LOCK_slave_list is taken around the operation;
              pass 0 when the caller already holds it
*/
void unregister_slave(THD* thd, bool only_mine, bool need_mutex)
{
  if (!thd->server_id)
    return;

  if (need_mutex)
    pthread_mutex_lock(&LOCK_slave_list);

  SLAVE_INFO *entry= (SLAVE_INFO*) hash_search(&slave_list,
                                               (byte*) &thd->server_id, 4);
  if (entry && (!only_mine || entry->thd == thd))
    hash_delete(&slave_list, (byte*) entry);

  if (need_mutex)
    pthread_mutex_unlock(&LOCK_slave_list);
}
/*
  Register a slave in slave_list from the data of a registration packet.

  Packet layout as parsed here:
    4 bytes  server id
    3 x      length-prefixed string: host, user, password (see get_object)
    2 bytes  port
    4 bytes  recovery rank
    4 bytes  master id (0 means "this server")

  The packet comes from the network, so every fixed-size field is checked
  against packet_length before it is read; a truncated packet is rejected
  instead of being read past its end.

  An existing entry with the same server id is replaced.

  RETURN
    0  ok
    1  access denied, out of memory, malformed packet or hash_insert() failure
*/
int register_slave(THD* thd, uchar* packet, uint packet_length)
{
  SLAVE_INFO *si= 0;
  int res= 1;
  uchar *p= packet, *p_end= packet + packet_length;

  if (check_access(thd, REPL_SLAVE_ACL, any_db))
    return 1;

  /* The packet must at least contain the server id */
  if (packet_length < 4)
    goto err;

  if (!(si= (SLAVE_INFO*) my_malloc(sizeof(SLAVE_INFO), MYF(MY_WME))))
    goto err;

  thd->server_id= si->server_id= uint4korr(p);
  p+= 4;
  get_object(p,si->host);
  get_object(p,si->user);
  get_object(p,si->password);

  /* port (2) + recovery rank (4) + master id (4) must still be available */
  if (p + 10 > p_end)
    goto err;
  si->port= uint2korr(p);
  p+= 2;
  si->rpl_recovery_rank= uint4korr(p);
  p+= 4;
  if (!(si->master_id= uint4korr(p)))
    si->master_id= server_id;
  si->thd= thd;

  pthread_mutex_lock(&LOCK_slave_list);
  /* Drop any previous registration for this id; we already hold the mutex */
  unregister_slave(thd,0,0);
  res= hash_insert(&slave_list, (byte*) si);
  pthread_mutex_unlock(&LOCK_slave_list);
  return res;

err:
  if (si)
    my_free((gptr) si, MYF(MY_WME));
  return res;
}
/*
  Key extraction callback for the slave_list hash: the key of a SLAVE_INFO
  is its server_id, reported with a length of 4 bytes.
*/
static uint32* slave_list_key(SLAVE_INFO* si, uint* len,
                              my_bool not_used __attribute__((unused)))
{
  uint32 *key= &si->server_id;
  *len= 4;
  return key;
}
static void slave_info_free(void *s)
{
my_free((gptr) s, MYF(MY_WME));
}
/*
  Create the slave_list hash (keyed through slave_list_key(), elements
  freed through slave_info_free()) and the mutex that protects it.
*/
void init_slave_list()
{
  hash_init(&slave_list,
            SLAVE_LIST_CHUNK,
            0,
            0,
            (hash_get_key) slave_list_key,
            slave_info_free,
            0);
  pthread_mutex_init(&LOCK_slave_list, MY_MUTEX_INIT_FAST);
}
/*
  Release slave_list and its mutex. Does nothing if the hash was never
  initialized.
*/
void end_slave_list()
{
  /* No protection by a mutex needed as we are only called at shutdown */
  if (!hash_inited(&slave_list))
    return;

  hash_free(&slave_list);
  pthread_mutex_destroy(&LOCK_slave_list);
}
/*
  Scan 'log' from its current read position for the event whose log_pos
  equals mi->pos and whose server_id equals mi->server_id.

  RETURN
    0  event found; mi->pos is set to the read position of 'log' just
       after that event
    1  end of log or read error; errmsg describes which
*/
static int find_target_pos(LEX_MASTER_INFO *mi, IO_CACHE *log, char *errmsg)
{
  const my_off_t wanted_pos= (my_off_t) mi->pos;
  const uint32 wanted_server_id= mi->server_id;
  Log_event *ev;

  while ((ev= Log_event::read_log_event(log, (pthread_mutex_t*) 0, 0)))
  {
    const bool is_target= (ev->log_pos == wanted_pos &&
                           ev->server_id == wanted_server_id);
    delete ev;
    if (is_target)
    {
      mi->pos= my_b_tell(log);
      return 0;
    }
  }

  /* No more events could be read: report why */
  const char *reason;
  if (log->error > 0)
    reason= "Binary log truncated in the middle of event";
  else if (log->error < 0)
    reason= "I/O error reading binary log";
  else
    reason= "Could not find target event in the binary log";
  strmov(errmsg, reason);
  return 1;
}
/*
Translate the master coordinates in 'mi' into a position in this
server's own binary logs.

The binary logs are visited in index order. In each log the slave event
near its start (see find_slave_event()) is compared with 'mi':
- equal: the answer is the current read position in that log
- greater: the target lies in the previous log, which is then scanned
with find_target_pos() starting at the position saved for it
- smaller: remember this log and go on to the next one; when the index
is exhausted the current log is the one that gets scanned

On success mi->log_file_name and mi->pos are overwritten with the result.

RETURN
0  ok
1  error; errmsg holds a description (it is set to an empty string when
init_io_cache() fails, see show_new_master())
*/
int translate_master(THD* thd, LEX_MASTER_INFO* mi, char* errmsg)
{
LOG_INFO linfo;
char last_log_name[FN_REFLEN];
IO_CACHE log;
/* 'file' is the log being examined, 'last_file' the one examined before it */
File file = -1, last_file = -1;
pthread_mutex_t *log_lock;
const char* errmsg_p;
Slave_log_event* sev = 0;
/* Read position saved for the scan; 0 means no log has been passed yet */
my_off_t last_pos = 0;
int error = 1;
int cmp_res;
LINT_INIT(cmp_res);
if (!mysql_bin_log.is_open())
{
strmov(errmsg,"Binary log is not open");
return 1;
}
if (!server_id_supplied)
{
strmov(errmsg, "Misconfigured master - server id was not set");
return 1;
}
linfo.index_file_offset = 0;
if (mysql_bin_log.find_log_pos(&linfo, NullS))
{
strmov(errmsg,"Could not find first log");
return 1;
}
/*
NOTE(review): current_linfo is set here without LOCK_thread_count but
cleared under it at the end - confirm whether that is intended.
*/
thd->current_linfo = &linfo;
bzero((char*) &log,sizeof(log));
/* The log lock is held from here until the 'err' label */
log_lock = mysql_bin_log.get_log_lock();
pthread_mutex_lock(log_lock);
for (;;)
{
if ((file=open_binlog(&log, linfo.log_file_name, &errmsg_p)) < 0)
{
strmov(errmsg, errmsg_p);
goto err;
}
if (!(sev = find_slave_event(&log, linfo.log_file_name, errmsg)))
goto err;
cmp_res = cmp_master_pos(sev, mi);
delete sev;
if (!cmp_res)
{
/* Copy basename */
fn_format(mi->log_file_name, linfo.log_file_name, "","",1);
mi->pos = my_b_tell(&log);
goto mi_inited;
}
else if (cmp_res > 0)
{
/* Target is before this log's slave event: it must be in the previous log */
if (!last_pos)
{
strmov(errmsg,
"Slave event in first log points past the target position");
goto err;
}
/* Drop the current log and re-attach the IO_CACHE to the previous file */
end_io_cache(&log);
(void) my_close(file, MYF(MY_WME));
if (init_io_cache(&log, (file = last_file), IO_SIZE, READ_CACHE, 0, 0,
MYF(MY_WME)))
{
errmsg[0] = 0;
goto err;
}
break;
}
/* Target is at or after this log's slave event: remember where we are */
strmov(last_log_name, linfo.log_file_name);
last_pos = my_b_tell(&log);
switch (mysql_bin_log.find_next_log(&linfo)) {
case LOG_INFO_EOF:
/* No further logs: scan the current one, the previous file is not needed */
if (last_file >= 0)
(void)my_close(last_file, MYF(MY_WME));
last_file = -1;
goto found_log;
case 0:
break;
default:
strmov(errmsg, "Error reading log index");
goto err;
}
/* Moving on: the current file becomes the 'previous' one, still open */
end_io_cache(&log);
if (last_file >= 0)
(void) my_close(last_file, MYF(MY_WME));
last_file = file;
}
found_log:
my_b_seek(&log, last_pos);
if (find_target_pos(mi,&log,errmsg))
goto err;
fn_format(mi->log_file_name, last_log_name, "","",1); /* Copy basename */
mi_inited:
error = 0;
err:
pthread_mutex_unlock(log_lock);
end_io_cache(&log);
pthread_mutex_lock(&LOCK_thread_count);
thd->current_linfo = 0;
pthread_mutex_unlock(&LOCK_thread_count);
/* 'file' and 'last_file' may be the same descriptor; close it only once */
if (file >= 0)
(void) my_close(file, MYF(MY_WME));
if (last_file >= 0 && last_file != file)
(void) my_close(last_file, MYF(MY_WME));
return error;
}
/*
  Look for a slave event among the first two events read from 'log'
  (starting at the current read position).

  log            Binary log to read from
  log_file_name  Name of the log, used only in error messages
  errmsg         Buffer of at least SLAVE_ERRMSG_SIZE bytes that receives
                 the error text on failure

  RETURN
    Pointer to the slave event; caller must delete result when done
    0 if an event could not be read or none of the two events is a slave
      event (errmsg is filled in)

  Every event that is not returned is deleted exactly once, inside the
  loop; there is nothing left to free after the loop.
*/
static Slave_log_event* find_slave_event(IO_CACHE* log,
                                         const char* log_file_name,
                                         char* errmsg)
{
  Log_event* ev;
  int i;

  for (i= 0; i < 2; i++)
  {
    if (!(ev= Log_event::read_log_event(log, (pthread_mutex_t*)0, 0)))
    {
      my_snprintf(errmsg, SLAVE_ERRMSG_SIZE,
                  "Error reading event in log '%s'",
                  (char*)log_file_name);
      return 0;
    }
    if (ev->get_type_code() == SLAVE_EVENT)
      return (Slave_log_event*) ev;
    delete ev;
  }

  my_snprintf(errmsg, SLAVE_ERRMSG_SIZE,
              "Could not find slave event in log '%s'",
              (char*)log_file_name);
  return 0;
}
/*
  Execute SHOW NEW MASTER: translate the coordinates in thd->lex.mi with
  translate_master() and send the result to the client as one row with the
  columns Log_name and Log_pos.

  RETURN
    0   ok
    1   translation failed; an error has been sent to the client
    -1  sending the result set failed
*/
int show_new_master(THD* thd)
{
  DBUG_ENTER("show_new_master");
  List<Item> field_list;
  char errmsg[SLAVE_ERRMSG_SIZE];
  LEX_MASTER_INFO* lex_mi= &thd->lex.mi;

  errmsg[0]= 0;                                 // Safety
  if (translate_master(thd, lex_mi, errmsg))
  {
    /* An empty message means the error was already reported elsewhere */
    if (!errmsg[0])
      send_error(&thd->net, 0);
    else
      net_printf(&thd->net, ER_ERROR_WHEN_EXECUTING_COMMAND,
                 "SHOW NEW MASTER", errmsg);
    DBUG_RETURN(1);
  }

  String *row_buf= &thd->packet;
  field_list.push_back(new Item_empty_string("Log_name", 20));
  field_list.push_back(new Item_empty_string("Log_pos", 20));
  if (send_fields(thd, field_list, 1))
    DBUG_RETURN(-1);

  row_buf->length(0);
  net_store_data(row_buf, lex_mi->log_file_name);
  net_store_data(row_buf, (longlong)lex_mi->pos);
  if (my_net_write(&thd->net, row_buf->ptr(), row_buf->length()))
    DBUG_RETURN(-1);

  send_eof(&thd->net);
  DBUG_RETURN(0);
}
/*
  Refresh slave_list from the output of SHOW SLAVE HOSTS run over the
  connection 'mysql'.

  Two result layouts are accepted: 5 columns (no auth info) and 7 columns
  (user and password in columns 2 and 3). Existing entries are updated in
  place, unknown server ids get a new entry.

  A new entry is zero filled before it is inserted, so fields that this
  function does not set (thd, and user/password when the result carries no
  auth info) never hold uninitialized memory when they are read later by
  unregister_slave() or show_slave_hosts().

  RETURN
    0  ok
    1  error; a message including the reason has been written to the
       error log
*/
int update_slave_list(MYSQL* mysql)
{
  MYSQL_RES* res=0;
  MYSQL_ROW row;
  const char* error=0;
  bool have_auth_info;
  int port_ind;

  if (mc_mysql_query(mysql,"SHOW SLAVE HOSTS",0) ||
      !(res = mc_mysql_store_result(mysql)))
  {
    error = "Query error";
    goto err;
  }

  /* The position of the port column depends on whether auth info is shown */
  switch (mc_mysql_num_fields(res)) {
  case 5:
    have_auth_info = 0;
    port_ind=2;
    break;
  case 7:
    have_auth_info = 1;
    port_ind=4;
    break;
  default:
    error = "Invalid number of fields in SHOW SLAVE HOSTS";
    goto err;
  }

  pthread_mutex_lock(&LOCK_slave_list);

  while ((row= mc_mysql_fetch_row(res)))
  {
    uint32 server_id;
    SLAVE_INFO* si, *old_si;
    server_id = atoi(row[0]);
    if ((old_si= (SLAVE_INFO*)hash_search(&slave_list,
                                          (byte*)&server_id,4)))
      si = old_si;
    else
    {
      if (!(si = (SLAVE_INFO*)my_malloc(sizeof(SLAVE_INFO), MYF(MY_WME))))
      {
        error = "Out of memory";
        pthread_mutex_unlock(&LOCK_slave_list);
        goto err;
      }
      /* Start from a fully defined record; not every field is set below */
      bzero((char*) si, sizeof(SLAVE_INFO));
      si->server_id = server_id;
      hash_insert(&slave_list, (byte*)si);
    }
    strmake(si->host, row[1], sizeof(si->host)-1);
    si->port = atoi(row[port_ind]);
    si->rpl_recovery_rank = atoi(row[port_ind+1]);
    si->master_id = atoi(row[port_ind+2]);
    if (have_auth_info)
    {
      strmake(si->user, row[2], sizeof(si->user)-1);
      strmake(si->password, row[3], sizeof(si->password)-1);
    }
  }
  pthread_mutex_unlock(&LOCK_slave_list);

err:
  if (res)
    mc_mysql_free_result(res);
  if (error)
  {
    /* The format needs %s, otherwise the reason is silently dropped */
    sql_print_error("Error updating slave list: %s",error);
    return 1;
  }
  return 0;
}
/*
Placeholder: no search is implemented, neither argument is used and the
function always returns 0. handle_failsafe_rpl() treats a zero result as
"become RPL_RECOVERY_CAPTAIN" and a non-zero result as
"become RPL_TROOP_SOLDIER".
*/
int find_recovery_captain(THD* thd, MYSQL* mysql)
{
return 0;
}
/*
Entry point of the failsafe replication thread.

Creates and initializes its own THD plus a client handle
(recovery_captain), then loops until the THD is killed or abort_loop is
set: wait on COND_rpl_status, then react to the current rpl_status.
Only RPL_LOST_SOLDIER is acted upon; every other state ends the request
immediately.
*/
pthread_handler_decl(handle_failsafe_rpl,arg)
{
DBUG_ENTER("handle_failsafe_rpl");
/* NOTE(review): the result of 'new THD' is used without a NULL check */
THD *thd = new THD;
thd->thread_stack = (char*)&thd;
MYSQL* recovery_captain = 0;
pthread_detach_this_thread();
if (init_failsafe_rpl_thread(thd) || !(recovery_captain=mc_mysql_init(0)))
{
sql_print_error("Could not initialize failsafe replication thread");
goto err;
}
pthread_mutex_lock(&LOCK_rpl_status);
while (!thd->killed && !abort_loop)
{
bool break_req_chain = 0;
const char* msg = thd->enter_cond(&COND_rpl_status,
&LOCK_rpl_status, "Waiting for request");
/*
Woken by change_rpl_status(). There is no predicate re-check around the
wait, so any wakeup runs the switch below once.
*/
pthread_cond_wait(&COND_rpl_status, &LOCK_rpl_status);
thd->proc_info="Processing request";
while (!break_req_chain)
{
switch (rpl_status) {
case RPL_LOST_SOLDIER:
/* Non-zero result: become a troop soldier; zero: become the captain */
if (find_recovery_captain(thd, recovery_captain))
rpl_status=RPL_TROOP_SOLDIER;
else
rpl_status=RPL_RECOVERY_CAPTAIN;
break_req_chain=1; /* for now until other states are implemented */
break;
default:
break_req_chain=1;
break;
}
}
/*
NOTE(review): confirm whether exit_cond() leaves LOCK_rpl_status held;
the next enter_cond()/pthread_cond_wait() and the unlock after the loop
both assume that it is.
*/
thd->exit_cond(msg);
}
pthread_mutex_unlock(&LOCK_rpl_status);
err:
if (recovery_captain)
mc_mysql_close(recovery_captain);
delete thd;
my_thread_end();
pthread_exit(0);
DBUG_RETURN(0);
}
/*
  Execute SHOW SLAVE HOSTS: send one row per slave_list entry to the client.

  Columns: Server_id, Host, [User, Password,] Port, Rpl_recovery_rank,
  Master_id. User and Password are present only when
  opt_show_slave_auth_info is set. slave_list is walked under
  LOCK_slave_list.

  RETURN
    0   ok
    -1  sending the field list or a row failed
*/
int show_slave_hosts(THD* thd)
{
  List<Item> field_list;
  NET* net = &thd->net;
  String* packet = &thd->packet;
  DBUG_ENTER("show_slave_hosts");

  /* Result set header */
  field_list.push_back(new Item_empty_string("Server_id", 20));
  field_list.push_back(new Item_empty_string("Host", 20));
  if (opt_show_slave_auth_info)
  {
    field_list.push_back(new Item_empty_string("User",20));
    field_list.push_back(new Item_empty_string("Password",20));
  }
  field_list.push_back(new Item_empty_string("Port",20));
  field_list.push_back(new Item_empty_string("Rpl_recovery_rank", 20));
  field_list.push_back(new Item_empty_string("Master_id", 20));

  if (send_fields(thd, field_list, 1))
    DBUG_RETURN(-1);

  /* One row per registered slave; stop at the first write failure */
  int write_failed= 0;
  pthread_mutex_lock(&LOCK_slave_list);
  for (uint idx= 0; idx < slave_list.records; idx++)
  {
    SLAVE_INFO *si= (SLAVE_INFO*) hash_element(&slave_list, idx);
    packet->length(0);
    net_store_data(packet, si->server_id);
    net_store_data(packet, si->host);
    if (opt_show_slave_auth_info)
    {
      net_store_data(packet, si->user);
      net_store_data(packet, si->password);
    }
    net_store_data(packet, (uint32) si->port);
    net_store_data(packet, si->rpl_recovery_rank);
    net_store_data(packet, si->master_id);
    if (my_net_write(net, (char*)packet->ptr(), packet->length()))
    {
      write_failed= 1;
      break;
    }
  }
  pthread_mutex_unlock(&LOCK_slave_list);

  if (write_failed)
    DBUG_RETURN(-1);
  send_eof(net);
  DBUG_RETURN(0);
}
/*
  Open a client connection to the master described by 'mi', using
  slave_net_timeout as the timeout argument of mc_mysql_connect().

  RETURN
    0  connected
    1  no host configured, or the connection attempt failed (the failure
       is written to the error log)
*/
int connect_to_master(THD *thd, MYSQL* mysql, MASTER_INFO* mi)
{
  DBUG_ENTER("connect_to_master");

  const bool host_missing= (!mi->host || !*mi->host);
  if (host_missing)                             /* empty host */
  {
    DBUG_PRINT("error",("empty hostname"));
    DBUG_RETURN(1);
  }

  if (mc_mysql_connect(mysql, mi->host, mi->user, mi->password, 0,
                       mi->port, 0, 0,
                       slave_net_timeout))
    DBUG_RETURN(0);

  sql_print_error("Connection to master failed: %s",
                  mc_mysql_error(mysql));
  DBUG_RETURN(1);
}
/*
  Free the per-database table results in the array slots from 'cur' down
  to 'start' (both inclusive, empty slots are skipped), then free the
  database list result 'db_res'. Passing cur == start - 1 frees only
  'db_res'.
*/
static inline void cleanup_mysql_results(MYSQL_RES* db_res,
                                         MYSQL_RES** cur, MYSQL_RES** start)
{
  while (cur >= start)
  {
    if (*cur)
      mc_mysql_free_result(*cur);
    --cur;
  }
  mc_mysql_free_result(db_res);
}
/*
  Fetch every table named in 'table_res' (first column of each row) of
  database 'db' from the master with fetch_master_table().

  When table_rules_on is set, tables rejected by tables_ok() are skipped.

  RETURN
    0      all tables fetched
    other  error code of the first fetch_master_table() call that failed;
           the remaining tables are not fetched
*/
static inline int fetch_db_tables(THD* thd, MYSQL* mysql, const char* db,
                                  MYSQL_RES* table_res, MASTER_INFO* mi)
{
  MYSQL_ROW row;

  while ((row= mc_mysql_fetch_row(table_res)))
  {
    TABLE_LIST table;
    const char *table_name= row[0];
    int error;

    if (table_rules_on)
    {
      /* Minimal TABLE_LIST, just enough for the replication rule check */
      table.next= 0;
      table.db= (char*) db;
      table.real_name= (char*) table_name;
      table.updating= 1;
      if (!tables_ok(thd, &table))
        continue;
    }

    error= fetch_master_table(thd, db, table_name, mi, mysql);
    if (error)
      return error;
  }
  return 0;
}
/*
  Load all MyISAM tables from master to this slave.

  REQUIREMENTS
   - No active transaction (flush_relay_log_info would not work in this case)

  Steps:
   - lock and stop the running slave threads
   - connect to the master, list its databases, take a global read lock
     there and read SHOW MASTER STATUS
   - for every database allowed by the replication rules (and not "mysql"):
     drop and recreate it locally and fetch its tables
   - store the master position, release the lock on the master, purge the
     relay logs and restart the slave threads that were running

  Resource handling: every exit taken after the connection attempt goes
  through the 'err' label, which releases the slave locks, frees a pending
  SHOW MASTER STATUS result and closes the master connection. The database
  list result is freed on every path once it has been obtained.

  RETURN
    0      ok, or the master has no databases (OK has been sent to the
           client in both cases)
    other  error; an error has been sent to the client
*/
int load_master_data(THD* thd)
{
  MYSQL mysql;
  MYSQL_RES* master_status_res = 0;
  int error = 0;
  const char* errmsg=0;
  int restart_thread_mask;
  mc_mysql_init(&mysql);

  /*
    We do not want anyone messing with the slave at all for the entire
    duration of the data load.
  */
  LOCK_ACTIVE_MI;
  lock_slave_threads(active_mi);
  init_thread_mask(&restart_thread_mask,active_mi,0 /*not inverse*/);
  if (restart_thread_mask &&
      (error=terminate_slave_threads(active_mi,restart_thread_mask,
                                     1 /*skip lock*/)))
  {
    send_error(&thd->net,error);
    unlock_slave_threads(active_mi);
    UNLOCK_ACTIVE_MI;
    return 1;
  }

  if (connect_to_master(thd, &mysql, active_mi))
  {
    net_printf(&thd->net, error = ER_CONNECT_TO_MASTER,
               mc_mysql_error(&mysql));
    goto err;
  }

  // now that we are connected, get all database and tables in each
  {
    MYSQL_RES *db_res, **table_res, **table_res_end, **cur_table_res;
    uint num_dbs;

    if (mc_mysql_query(&mysql, "show databases", 0) ||
        !(db_res = mc_mysql_store_result(&mysql)))
    {
      net_printf(&thd->net, error = ER_QUERY_ON_MASTER,
                 mc_mysql_error(&mysql));
      goto err;
    }

    /*
      In theory, the master could have no databases at all
      and run with skip-grant
    */
    if (!(num_dbs = (uint) mc_mysql_num_rows(db_res)))
    {
      mc_mysql_free_result(db_res);
      goto err;
    }

    if (!(table_res = (MYSQL_RES**)thd->alloc(num_dbs * sizeof(MYSQL_RES*))))
    {
      net_printf(&thd->net, error = ER_OUTOFMEMORY);
      mc_mysql_free_result(db_res);
      goto err;
    }

    /*
      This is a temporary solution until we have online backup
      capabilities - to be replaced once online backup is working
      we wait to issue FLUSH TABLES WITH READ LOCK for as long as we
      can to minimize the lock time.
    */
    if (mc_mysql_query(&mysql, "FLUSH TABLES WITH READ LOCK", 0) ||
        mc_mysql_query(&mysql, "SHOW MASTER STATUS",0) ||
        !(master_status_res = mc_mysql_store_result(&mysql)))
    {
      net_printf(&thd->net, error = ER_QUERY_ON_MASTER,
                 mc_mysql_error(&mysql));
      mc_mysql_free_result(db_res);
      goto err;
    }

    /*
      Go through every table in every database, and if the replication
      rules allow replicating it, get it
    */
    table_res_end = table_res + num_dbs;

    for (cur_table_res = table_res; cur_table_res < table_res_end;
         cur_table_res++)
    {
      // since we know how many rows we have, this can never be NULL
      MYSQL_ROW row = mc_mysql_fetch_row(db_res);
      char* db = row[0];

      /*
        Do not replicate databases excluded by rules
        also skip mysql database - in most cases the user will
        mess up and not exclude mysql database with the rules when
        he actually means to - in this case, he is up for a surprise if
        his priv tables get dropped and downloaded from master
        TODO - add special option, not enabled
        by default, to allow inclusion of mysql database into load
        data from master
      */
      if (!db_ok(db, replicate_do_db, replicate_ignore_db) ||
          !strcmp(db,"mysql"))
      {
        *cur_table_res = 0;
        continue;
      }

      if (mysql_rm_db(thd, db, 1,1) ||
          mysql_create_db(thd, db, 0, 1))
      {
        send_error(&thd->net, 0, 0);
        /* Mark the failure so that no OK packet follows the error */
        error= 1;
        cleanup_mysql_results(db_res, cur_table_res - 1, table_res);
        goto err;
      }

      if (mc_mysql_select_db(&mysql, db) ||
          mc_mysql_query(&mysql, "show tables", 0) ||
          !(*cur_table_res = mc_mysql_store_result(&mysql)))
      {
        net_printf(&thd->net, error = ER_QUERY_ON_MASTER,
                   mc_mysql_error(&mysql));
        cleanup_mysql_results(db_res, cur_table_res - 1, table_res);
        goto err;
      }

      if ((error = fetch_db_tables(thd,&mysql,db,*cur_table_res,active_mi)))
      {
        // we do not report the error - fetch_db_tables handles it
        cleanup_mysql_results(db_res, cur_table_res, table_res);
        goto err;
      }
    }

    cleanup_mysql_results(db_res, cur_table_res - 1, table_res);

    // adjust position in the master
    if (master_status_res)
    {
      MYSQL_ROW row = mc_mysql_fetch_row(master_status_res);

      /*
        We need this check because the master may not be running with
        log-bin, but it will still allow us to do all the steps
        of LOAD DATA FROM MASTER - no reason to forbid it, really,
        although it does not make much sense for the user to do it

        'row' itself is NULL when SHOW MASTER STATUS returned no row at all.
      */
      if (row && row[0] && row[1])
      {
        /* strmake() writes length+1 bytes, hence sizeof()-1 */
        strmake(active_mi->master_log_name, row[0],
                sizeof(active_mi->master_log_name)-1);
        active_mi->master_log_pos = strtoull(row[1], (char**) 0, 10);
        // don't hit the magic number
        if (active_mi->master_log_pos < BIN_LOG_HEADER_SIZE)
          active_mi->master_log_pos = BIN_LOG_HEADER_SIZE;
        active_mi->rli.pending = 0;
        flush_master_info(active_mi);
      }
      mc_mysql_free_result(master_status_res);
      master_status_res= 0;                     // Nothing left to free at 'err'
    }

    if (mc_mysql_query(&mysql, "UNLOCK TABLES", 0))
    {
      net_printf(&thd->net, error = ER_QUERY_ON_MASTER,
                 mc_mysql_error(&mysql));
      goto err;
    }
  }

  thd->proc_info="purging old relay logs";
  if (purge_relay_logs(&active_mi->rli,thd,
                       0 /* not only reset, but also reinit */,
                       &errmsg))
  {
    send_error(&thd->net, 0, "Failed purging old relay logs");
    /* Leave through 'err' so that the master connection is closed too */
    error= 1;
    goto err;
  }

  pthread_mutex_lock(&active_mi->rli.data_lock);
  active_mi->rli.master_log_pos = active_mi->master_log_pos;
  strmake(active_mi->rli.master_log_name,active_mi->master_log_name,
          sizeof(active_mi->rli.master_log_name)-1);
  flush_relay_log_info(&active_mi->rli);
  pthread_cond_broadcast(&active_mi->rli.data_cond);
  pthread_mutex_unlock(&active_mi->rli.data_lock);

  thd->proc_info = "starting slave";
  if (restart_thread_mask)
  {
    error=start_slave_threads(0 /* mutex not needed */,
                              1 /* wait for start */,
                              active_mi,master_info_file,relay_log_info_file,
                              restart_thread_mask);
  }

err:
  unlock_slave_threads(active_mi);
  UNLOCK_ACTIVE_MI;
  thd->proc_info = 0;

  /* Still set only if we left before the master position was processed */
  if (master_status_res)
    mc_mysql_free_result(master_status_res);
  mc_mysql_close(&mysql); // safe to call since we always do mc_mysql_init()
  if (!error)
    send_ok(&thd->net);

  return error;
}