/* Copyright (C) 2000 MySQL AB & MySQL Finland AB & TCX DataKonsult AB This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ #include "mysql_priv.h" #include #include "mini_client.h" #include #include pthread_handler_decl(handle_slave,arg); extern bool volatile abort_loop, abort_slave; // the master variables are defaults read from my.cnf or command line extern uint master_port, master_connect_retry; extern my_string master_user, master_password, master_host, master_info_file; extern I_List replicate_do_db, replicate_ignore_db; extern I_List threads; bool slave_running = 0; pthread_t slave_real_id; MASTER_INFO glob_mi; extern bool opt_log_slave_updates ; static inline bool slave_killed(THD* thd); static int init_slave_thread(THD* thd); int init_master_info(MASTER_INFO* mi); static void safe_connect(THD* thd, MYSQL* mysql, MASTER_INFO* mi); static void safe_reconnect(THD* thd, MYSQL* mysql, MASTER_INFO* mi); static int safe_sleep(THD* thd, int sec); static int request_table_dump(MYSQL* mysql, char* db, char* table); static int create_table_from_dump(THD* thd, NET* net, const char* db, const char* table_name); static inline bool slave_killed(THD* thd) { return abort_slave || abort_loop || thd->killed; } int db_ok(const char* db, I_List &do_list, I_List &ignore_list ) { if(do_list.is_empty() && ignore_list.is_empty()) return 1; // ok to replicate if the user puts no constraints if(!db) return 0; // if the user has specified restrictions on which databases to replicate // and db was not selected, do not replicate if(!do_list.is_empty()) // if the do's are not empty { I_List_iterator it(do_list); i_string* tmp; while((tmp=it++)) { if(!strcmp(tmp->ptr, db)) return 1; // match } return 0; } else // there are some elements in the don't, otherwise we cannot get here { I_List_iterator it(ignore_list); i_string* tmp; while((tmp=it++)) { if(!strcmp(tmp->ptr, db)) return 0; // match } return 1; } // impossible return 0; } static void init_strvar_from_file(char* var, int max_size, FILE* f, char* default_val) { if(fgets(var, max_size, f)) { *(strend(var)-1) = 0; } else if(default_val) strmake(var, default_val, max_size); } static void init_intvar_from_file(int* var, FILE* f, int default_val) { char buf[32]; if(fgets(buf, sizeof(buf), f)) { *var = atoi(buf); } else if(default_val) *var = default_val; } static int create_table_from_dump(THD* thd, NET* net, const char* db, const char* table_name) { uint packet_len = my_net_read(net); // read create table statement TABLE_LIST tables; int error = 0; if(packet_len == packet_error) { send_error(&thd->net, ER_MASTER_NET_READ); return 1; } if(net->read_pos[0] == 255) // error from master { net->read_pos[packet_len] = 0; net_printf(&thd->net, ER_MASTER, net->read_pos + 3); return 1; } thd->command = COM_TABLE_DUMP; thd->query = sql_alloc(packet_len + 1); if(!thd->query) { sql_print_error("create_table_from_dump: out of memory"); net_printf(&thd->net, ER_GET_ERRNO, "Out of memory"); return 1; } memcpy(thd->query, net->read_pos, packet_len); thd->query[packet_len] = 0; thd->current_tablenr = 0; thd->query_error = 0; thd->net.no_send_ok = 1; thd->proc_info = "Creating table from master dump"; char* save_db = thd->db; thd->db = thd->last_nx_db; // in case we are creating in a different // database mysql_parse(thd, thd->query, packet_len); // run create table thd->db = save_db; // leave things the way the were before if(thd->query_error) { close_thread_tables(thd); // mysql_parse takes care of the error send return 1; } bzero((char*) &tables,sizeof(tables)); tables.db = (char*)db; tables.name = tables.real_name = (char*)table_name; tables.lock_type = TL_WRITE; thd->proc_info = "Opening master dump table"; if(!open_ltable(thd, &tables, TL_WRITE)) { // open tables will send the error sql_print_error("create_table_from_dump: could not open created table"); close_thread_tables(thd); return 1; } handler *file = tables.table->file; thd->proc_info = "Reading master dump table data"; if(file->net_read_dump(net)) { net_printf(&thd->net, ER_MASTER_NET_READ); sql_print_error("create_table_from_dump::failed in\ handler::net_read_dump()"); close_thread_tables(thd); return 1; } HA_CHECK_OPT check_opt; check_opt.init(); check_opt.quick = 1; thd->proc_info = "rebuilding the index on master dump table"; Vio* save_vio = thd->net.vio; thd->net.vio = 0; // we do not want repair() to spam us with messages // just send them to the error log, and report the failure in case of // problems if(file->repair(thd,&check_opt )) { net_printf(&thd->net, ER_INDEX_REBUILD,tables.table->real_name ); error = 1; } thd->net.vio = save_vio; close_thread_tables(thd); thd->net.no_send_ok = 0; return error; } int fetch_nx_table(THD* thd, MASTER_INFO* mi) { MYSQL* mysql = mc_mysql_init(NULL); int error = 1; int nx_errno = 0; if(!mysql) { sql_print_error("fetch_nx_table: Error in mysql_init()"); nx_errno = ER_GET_ERRNO; goto err; } safe_connect(thd, mysql, mi); if(slave_killed(thd)) goto err; if(request_table_dump(mysql, thd->last_nx_db, thd->last_nx_table)) { nx_errno = ER_GET_ERRNO; sql_print_error("fetch_nx_table: failed on table dump request "); goto err; } if(create_table_from_dump(thd, &mysql->net, thd->last_nx_db, thd->last_nx_table)) { // create_table_from_dump will have sent the error alread sql_print_error("fetch_nx_table: failed on create table "); goto err; } error = 0; err: if(mysql) mc_mysql_close(mysql); if(nx_errno && thd->net.vio) send_error(&thd->net, nx_errno, "Error in fetch_nx_table"); return error; } int init_master_info(MASTER_INFO* mi) { FILE* file; MY_STAT stat_area; char fname[FN_REFLEN]; fn_format(fname, master_info_file, mysql_data_home, "", 4+16+32); if(!mi->inited) pthread_mutex_init(&mi->lock, NULL); // we need a mutex while we are changing master info parameters to // keep other threads from reading bogus info pthread_mutex_lock(&mi->lock); if(!my_stat(fname, &stat_area, MYF(0))) // we do not want any messages // if the file does not exist { file = my_fopen(fname, O_CREAT|O_RDWR, MYF(MY_WME)); if(!file) return 1; mi->log_file_name[0] = 0; mi->pos = 0; mi->file = file; if(master_host) strmake(mi->host, master_host, sizeof(mi->host)); if(master_user) strmake(mi->user, master_user, sizeof(mi->user)); if(master_password) strmake(mi->password, master_password, sizeof(mi->password)); mi->port = master_port; mi->connect_retry = master_connect_retry; if(flush_master_info(mi)) return 1; } else { file = my_fopen(fname, O_RDWR, MYF(MY_WME)); if(!file) return 1; if(!fgets(mi->log_file_name, sizeof(mi->log_file_name), file)) { sql_print_error("Error reading log file name from master info file "); return 1; } *(strend(mi->log_file_name) - 1) = 0; // kill \n char buf[FN_REFLEN]; if(!fgets(buf, sizeof(buf), file)) { sql_print_error("Error reading log file position from master info file"); return 1; } mi->pos = atoi(buf); mi->file = file; init_strvar_from_file(mi->host, sizeof(mi->host), file, master_host); init_strvar_from_file(mi->user, sizeof(mi->user), file, master_user); init_strvar_from_file(mi->password, sizeof(mi->password), file, master_password); init_intvar_from_file((int*)&mi->port, file, master_port); init_intvar_from_file((int*)&mi->connect_retry, file, master_connect_retry); } mi->inited = 1; pthread_mutex_unlock(&mi->lock); return 0; } int show_master_info(THD* thd) { DBUG_ENTER("show_master_info"); List field_list; field_list.push_back(new Item_empty_string("Master_Host", sizeof(glob_mi.host))); field_list.push_back(new Item_empty_string("Master_User", sizeof(glob_mi.user))); field_list.push_back(new Item_empty_string("Master_Port", 6)); field_list.push_back(new Item_empty_string("Connect_retry", 6)); field_list.push_back( new Item_empty_string("Log_File", FN_REFLEN)); field_list.push_back(new Item_empty_string("Pos", 12)); field_list.push_back(new Item_empty_string("Slave_Running", 3)); field_list.push_back(new Item_empty_string("Replicate_do_db", 20)); field_list.push_back(new Item_empty_string("Replicate_ignore_db", 20)); if(send_fields(thd, field_list, 1)) DBUG_RETURN(-1); String* packet = &thd->packet; packet->length(0); pthread_mutex_lock(&glob_mi.lock); net_store_data(packet, glob_mi.host); net_store_data(packet, glob_mi.user); net_store_data(packet, (uint32) glob_mi.port); net_store_data(packet, (uint32) glob_mi.connect_retry); net_store_data(packet, glob_mi.log_file_name); net_store_data(packet, (longlong)glob_mi.pos); pthread_mutex_unlock(&glob_mi.lock); pthread_mutex_lock(&LOCK_slave); net_store_data(packet, slave_running ? "Yes":"No"); pthread_mutex_unlock(&LOCK_slave); net_store_data(packet, &replicate_do_db); net_store_data(packet, &replicate_ignore_db); if(my_net_write(&thd->net, (char*)thd->packet.ptr(), packet->length())) DBUG_RETURN(-1); send_eof(&thd->net); DBUG_RETURN(0); } int flush_master_info(MASTER_INFO* mi) { FILE* file = mi->file; char lbuf[22]; if(my_fseek(file, 0L, MY_SEEK_SET, MYF(MY_WME)) == MY_FILEPOS_ERROR || fprintf(file, "%s\n%s\n%s\n%s\n%s\n%d\n%d\n", mi->log_file_name, llstr(mi->pos, lbuf), mi->host, mi->user, mi->password, mi->port, mi->connect_retry) < 0 || fflush(file)) { sql_print_error("Write error flushing master_info: %d", errno); return 1; } return 0; } static int init_slave_thread(THD* thd) { DBUG_ENTER("init_slave_thread"); thd->system_thread = thd->bootstrap = 1; thd->client_capabilities = 0; my_net_init(&thd->net, 0); thd->max_packet_length=thd->net.max_packet; thd->master_access= ~0; thd->priv_user = 0; thd->options = (((opt_log_slave_updates) ? OPTION_BIN_LOG:0) | OPTION_AUTO_COMMIT | OPTION_AUTO_IS_NULL) ; thd->system_thread = 1; thd->client_capabilities = CLIENT_LOCAL_FILES; slave_real_id=thd->real_id=pthread_self(); pthread_mutex_lock(&LOCK_thread_count); thd->thread_id = thread_id++; pthread_mutex_unlock(&LOCK_thread_count); if (init_thr_lock() || my_pthread_setspecific_ptr(THR_THD, thd) || my_pthread_setspecific_ptr(THR_MALLOC, &thd->mem_root) || my_pthread_setspecific_ptr(THR_NET, &thd->net)) { close_connection(&thd->net,ER_OUT_OF_RESOURCES); // is this needed? end_thread(thd,0); DBUG_RETURN(-1); } thd->mysys_var=my_thread_var; thd->dbug_thread_id=my_thread_id(); #ifndef __WIN__ sigset_t set; VOID(sigemptyset(&set)); // Get mask in use VOID(pthread_sigmask(SIG_UNBLOCK,&set,&thd->block_signals)); #endif thd->mem_root.free=thd->mem_root.used=0; // Probably not needed if (thd->max_join_size == (ulong) ~0L) thd->options |= OPTION_BIG_SELECTS; thd->proc_info="Waiting for master update"; thd->version=refresh_version; thd->set_time(); DBUG_RETURN(0); } static int safe_sleep(THD* thd, int sec) { thr_alarm_t alarmed; thr_alarm_init(&alarmed); time_t start_time= time((time_t*) 0); time_t end_time= start_time+sec; ALARM alarm_buff; while (start_time < end_time) { int nap_time = (int) (end_time - start_time); thr_alarm(&alarmed, 2 * nap_time,&alarm_buff); // the only reason we are asking for alarm is so that // we will be woken up in case of murder, so if we do not get killed, set the alarm // so it goes off after we wake up naturally sleep(nap_time); if (thr_alarm_in_use(&alarmed)) // if we wake up before the alarm goes off, hit the button thr_end_alarm(&alarmed); // so it will not wake up the wife and kids :-) if (slave_killed(thd)) return 1; start_time=time((time_t*) 0); } return 0; } static int request_dump(MYSQL* mysql, MASTER_INFO* mi) { char buf[FN_REFLEN + 6]; int len; int binlog_flags = 0; // for now char* logname = mi->log_file_name; int4store(buf, mi->pos); int2store(buf + 4, binlog_flags); len = (uint) strlen(logname); memcpy(buf + 6, logname,len); if(mc_simple_command(mysql, COM_BINLOG_DUMP, buf, len + 6, 1)) // something went wrong, so we will just reconnect and retry later // in the future, we should do a better error analysis, but for // now we just fill up the error log :-) { sql_print_error("Error on COM_BINLOG_DUMP: %s, will retry in %d secs", mc_mysql_error(mysql), master_connect_retry); return 1; } return 0; } static int request_table_dump(MYSQL* mysql, char* db, char* table) { char buf[1024]; char * p = buf; uint table_len = (uint) strlen(table); uint db_len = (uint) strlen(db); if(table_len + db_len > sizeof(buf) - 2) { sql_print_error("request_table_dump: Buffer overrun"); return 1; } *p++ = db_len; memcpy(p, db, db_len); p += db_len; *p++ = table_len; memcpy(p, table, table_len); if(mc_simple_command(mysql, COM_TABLE_DUMP, buf, p - buf + table_len, 1)) { sql_print_error("request_table_dump: Error sending the table dump \ command"); return 1; } return 0; } static uint read_event(MYSQL* mysql, MASTER_INFO *mi) { uint len = packet_error; NET* net = &mysql->net; int read_errno = EINTR; // for convinience lets think we start by // being in the interrupted state :-) // my_real_read() will time us out // we check if we were told to die, and if not, try reading again while (!abort_loop && !abort_slave && len == packet_error && read_errno == EINTR ) { len = mc_net_safe_read(mysql); read_errno = errno; } if(abort_loop || abort_slave) return packet_error; if (len == packet_error || (int) len < 1) { sql_print_error("Error reading packet from server: %s (%d)", mc_mysql_error(mysql), read_errno); return packet_error; } if(len == 1) { sql_print_error("Received 0 length packet from server, looks like master shutdown: %s (%d)", mc_mysql_error(mysql), read_errno); return packet_error; } DBUG_PRINT("info",( "len=%u, net->read_pos[4] = %d\n", len, net->read_pos[4])); return len - 1; } static int exec_event(THD* thd, NET* net, MASTER_INFO* mi, int event_len) { Log_event * ev = Log_event::read_log_event((const char*)net->read_pos + 1 , event_len); if(ev) { switch(ev->get_type_code()) { case QUERY_EVENT: { Query_log_event* qev = (Query_log_event*)ev; int q_len = qev->q_len; init_sql_alloc(&thd->mem_root, 8192); thd->db = (char*)qev->db; if(db_ok(thd->db, replicate_do_db, replicate_ignore_db)) { thd->query = (char*)qev->query; thd->set_time((time_t)qev->when); thd->current_tablenr = 0; VOID(pthread_mutex_lock(&LOCK_thread_count)); thd->query_id = query_id++; VOID(pthread_mutex_unlock(&LOCK_thread_count)); thd->last_nx_table = thd->last_nx_db = 0; for(;;) { thd->query_error = 0; // clear error thd->last_nx_table = thd->last_nx_db = 0; thd->net.last_errno = 0; thd->net.last_error[0] = 0; mysql_parse(thd, thd->query, q_len); // try query if(!thd->query_error || slave_killed(thd)) // break if ok break; // if not ok if(thd->last_nx_table && thd->last_nx_db) { // for now, let's just fail if the table is not // there, and not try to be a smart alec... // if table was not there //if(fetch_nx_table(thd,&glob_mi)) // try to to fetch from master break; // if we can't, just break } else break; // if failed for some other reason, bail out // if fetched the table from master successfully // we need to restore query info in thd because // fetch_nx_table executes create table thd->query = (char*)qev->query; thd->set_time((time_t)qev->when); thd->current_tablenr = 0; } } thd->db = 0;// prevent db from being freed thd->query = 0; // just to be sure close_thread_tables(thd); free_root(&thd->mem_root); if (thd->query_error) { sql_print_error("Slave: error running query '%s' ", qev->query); return 1; } delete ev; if(thd->fatal_error) { sql_print_error("Slave: Fatal error running query '%s' ", thd->query); return 1; } mi->inc_pos(event_len); flush_master_info(mi); break; } case LOAD_EVENT: { Load_log_event* lev = (Load_log_event*)ev; init_sql_alloc(&thd->mem_root, 8192); thd->db = (char*)lev->db; thd->query = 0; thd->query_error = 0; if(db_ok(thd->db, replicate_do_db, replicate_ignore_db)) { thd->set_time((time_t)lev->when); thd->current_tablenr = 0; VOID(pthread_mutex_lock(&LOCK_thread_count)); thd->query_id = query_id++; VOID(pthread_mutex_unlock(&LOCK_thread_count)); enum enum_duplicates handle_dup = DUP_IGNORE; if(lev->sql_ex.opt_flags && REPLACE_FLAG) handle_dup = DUP_REPLACE; sql_exchange ex((char*)lev->fname, lev->sql_ex.opt_flags && DUMPFILE_FLAG ); String field_term(&lev->sql_ex.field_term, 1), enclosed(&lev->sql_ex.enclosed, 1), line_term(&lev->sql_ex.line_term,1), escaped(&lev->sql_ex.escaped, 1), line_start(&lev->sql_ex.line_start, 1); ex.field_term = &field_term; if(lev->sql_ex.empty_flags & FIELD_TERM_EMPTY) ex.field_term->length(0); ex.enclosed = &enclosed; if(lev->sql_ex.empty_flags & ENCLOSED_EMPTY) ex.enclosed->length(0); ex.line_term = &line_term; if(lev->sql_ex.empty_flags & LINE_TERM_EMPTY) ex.line_term->length(0); ex.line_start = &line_start; if(lev->sql_ex.empty_flags & LINE_START_EMPTY) ex.line_start->length(0); ex.escaped = &escaped; if(lev->sql_ex.empty_flags & ESCAPED_EMPTY) ex.escaped->length(0); ex.opt_enclosed = (lev->sql_ex.opt_flags & OPT_ENCLOSED_FLAG); if(lev->sql_ex.empty_flags & FIELD_TERM_EMPTY) ex.field_term->length(0); ex.skip_lines = lev->skip_lines; TABLE_LIST tables; bzero((char*) &tables,sizeof(tables)); tables.db = thd->db; tables.name = tables.real_name = (char*)lev->table_name; tables.lock_type = TL_WRITE; if (open_tables(thd, &tables)) { sql_print_error("Slave: error opening table %s ", tables.name); delete ev; return 1; } List fields; lev->set_fields(fields); thd->net.vio = net->vio; // mysql_load will use thd->net to read the file thd->net.pkt_nr = net->pkt_nr; // make sure the client does get confused // about the packet sequence if(mysql_load(thd, &ex, &tables, fields, handle_dup, 1, TL_WRITE)) thd->query_error = 1; net->pkt_nr = thd->net.pkt_nr; } else // we will just ask the master to send us /dev/null if we do not want to // load the data :-) { (void)my_net_write(net, "\xfb/dev/null", 10); (void)net_flush(net); (void)my_net_read(net); // discard response send_ok(net); // the master expects it } thd->net.vio = 0; thd->db = 0;// prevent db from being freed close_thread_tables(thd); if(thd->query_error) { int sql_error = thd->net.last_errno; if(!sql_error) sql_error = ER_UNKNOWN_ERROR; sql_print_error("Slave: error '%s' running load data infile ", ER(sql_error)); delete ev; return 1; } delete ev; if(thd->fatal_error) { sql_print_error("Slave: Fatal error running query '%s' ", thd->query); return 1; } mi->inc_pos(event_len); flush_master_info(mi); break; } case START_EVENT: mi->inc_pos(event_len); flush_master_info(mi); break; case STOP_EVENT: mi->inc_pos(event_len); flush_master_info(mi); break; case ROTATE_EVENT: { Rotate_log_event* rev = (Rotate_log_event*)ev; int ident_len = rev->ident_len; memcpy(mi->log_file_name, rev->new_log_ident,ident_len ); mi->log_file_name[ident_len] = 0; mi->pos = 0; break; } case INTVAR_EVENT: { Intvar_log_event* iev = (Intvar_log_event*)ev; switch(iev->type) { case LAST_INSERT_ID_EVENT: thd->last_insert_id_used = 1; thd->last_insert_id = iev->val; break; case INSERT_ID_EVENT: thd->next_insert_id = iev->val; break; } mi->inc_pos(event_len); flush_master_info(mi); break; } } } else { sql_print_error("Could not parse log event entry, check the master for binlog corruption\ This may also be a network problem, or just a bug in the master or slave code"); return 1; } return 0; } // slave thread pthread_handler_decl(handle_slave,arg __attribute__((unused))) { THD *thd;; // needs to be first for thread_stack MYSQL *mysql = NULL ; pthread_mutex_lock(&LOCK_slave); if(slave_running) { pthread_mutex_unlock(&LOCK_slave); return 0; // safety just in case } slave_running = 1; abort_slave = 0; pthread_mutex_unlock(&LOCK_slave); int error = 1; my_thread_init(); // needs to be up here, otherwise we get a coredump // trying to use DBUG_ stuff thd = new THD; // note that contructor of THD uses DBUG_ ! DBUG_ENTER("handle_slave"); pthread_detach_this_thread(); if(init_slave_thread(thd) || init_master_info(&glob_mi)) goto err; thd->thread_stack = (char*)&thd; // remember where our stack is threads.append(thd); DBUG_PRINT("info",("master info: log_file_name=%s, position=%d", glob_mi.log_file_name, glob_mi.pos)); mysql = mc_mysql_init(NULL); if(!mysql) { sql_print_error("Slave thread: error in mc_mysql_init()"); goto err; } thd->proc_info = "connecting to master"; safe_connect(thd, mysql, &glob_mi); while(!slave_killed(thd)) { thd->proc_info = "requesting binlog dump"; if(request_dump(mysql, &glob_mi)) { sql_print_error("Failed on request_dump()"); if(slave_killed(thd)) goto err; thd->proc_info = "waiting to reconnect after a failed dump request"; safe_sleep(thd, glob_mi.connect_retry); if(slave_killed(thd)) goto err; thd->proc_info = "reconnecting after a failed dump request"; safe_reconnect(thd, mysql, &glob_mi); if(slave_killed(thd)) goto err; continue; } while(!slave_killed(thd)) { bool reset = 0; thd->proc_info = "reading master update"; uint event_len = read_event(mysql, &glob_mi); if(slave_killed(thd)) goto err; if (event_len == packet_error) { thd->proc_info = "waiting to reconnect after a failed read"; safe_sleep(thd, glob_mi.connect_retry); if(slave_killed(thd)) goto err; thd->proc_info = "reconnecting after a failed read"; safe_reconnect(thd, mysql, &glob_mi); if(slave_killed(thd)) goto err; reset = 1; } if(reset) break; thd->proc_info = "processing master log event"; if(exec_event(thd, &mysql->net, &glob_mi, event_len)) { sql_print_error("Error running query, slave aborted. Fix the problem, and re-start\ the slave thread with mysqladmin start-slave"); goto err; // there was an error running the query // abort the slave thread, when the problem is fixed, the user // should restart the slave with mysqladmin start-slave } } } error = 0; err: thd->query = thd->db = 0; // extra safety if(mysql) mc_mysql_close(mysql); thd->proc_info = "waiting for slave mutex on exit"; pthread_mutex_lock(&LOCK_slave); slave_running = 0; abort_slave = 0; pthread_cond_broadcast(&COND_slave_stopped); // tell the world we are done pthread_mutex_unlock(&LOCK_slave); delete thd; my_thread_end(); pthread_exit(0); DBUG_RETURN(0); // Can't return anything here } static void safe_connect(THD* thd, MYSQL* mysql, MASTER_INFO* mi) // will try to connect until successful { while(!slave_killed(thd) && !mc_mysql_connect(mysql, mi->host, mi->user, mi->password, 0, mi->port, 0, 0)) { sql_print_error( "Slave thread: error connecting to slave:%s, retry in %d sec", mc_mysql_error(mysql), mi->connect_retry); safe_sleep(thd, mi->connect_retry); } } // will try to connect until successful static void safe_reconnect(THD* thd, MYSQL* mysql, MASTER_INFO* mi) { while(!slave_killed(thd) && mc_mysql_reconnect(mysql)) { sql_print_error( "Slave thread: error connecting to slave:%s, retry in %d sec", mc_mysql_error(mysql), mi->connect_retry); safe_sleep(thd, mi->connect_retry); } } #ifdef __GNUC__ template class I_List_iterator; #endif