This commit is contained in:
monty@hundin.mysql.fi 2002-02-08 03:29:29 +02:00
commit d3cfc86a97
8 changed files with 194 additions and 43 deletions

View file

@ -556,6 +556,8 @@ Log_event* Log_event::read_log_event(const char* buf, int event_len,
ev = new Query_log_event(buf, event_len, old_format);
break;
case LOAD_EVENT:
ev = new Create_file_log_event(buf, event_len, old_format);
break;
case NEW_LOAD_EVENT:
ev = new Load_log_event(buf, event_len, old_format);
break;
@ -566,7 +568,7 @@ Log_event* Log_event::read_log_event(const char* buf, int event_len,
ev = new Slave_log_event(buf, event_len);
break;
case CREATE_FILE_EVENT:
ev = new Create_file_log_event(buf, event_len);
ev = new Create_file_log_event(buf, event_len, old_format);
break;
case APPEND_BLOCK_EVENT:
ev = new Append_block_log_event(buf, event_len);
@ -959,6 +961,12 @@ char* sql_ex_info::init(char* buf,char* buf_end,bool use_new_format)
if (use_new_format)
{
empty_flags=0;
/* the code below assumes that buf will not disappear from
under our feet during the lifetime of the event. This assumption
holds true in the slave thread if the log is in new format, but is not
the case when we have old format because we will be reusing net buffer
to read the actual file before we write out the Create_file event
*/
if (read_str(buf, buf_end, field_term, field_term_len) ||
read_str(buf, buf_end, enclosed, enclosed_len) ||
read_str(buf, buf_end, line_term, line_term_len) ||
@ -970,11 +978,11 @@ char* sql_ex_info::init(char* buf,char* buf_end,bool use_new_format)
else
{
field_term_len= enclosed_len= line_term_len= line_start_len= escaped_len=1;
*field_term=*buf++;
*enclosed= *buf++;
*line_term= *buf++;
*line_start=*buf++;
*escaped= *buf++;
field_term = buf++;
enclosed= buf++;
line_term= buf++;
line_start= buf++;
escaped= buf++;
opt_flags = *buf++;
empty_flags=*buf++;
if (empty_flags & FIELD_TERM_EMPTY)
@ -1095,7 +1103,9 @@ int Load_log_event::copy_log_event(const char *buf, ulong event_len,
db_len = (uint)data_head[L_DB_LEN_OFFSET];
num_fields = uint4korr(data_head + L_NUM_FIELDS_OFFSET);
int body_offset = get_data_body_offset();
int body_offset = (buf[EVENT_TYPE_OFFSET] == LOAD_EVENT) ?
LOAD_HEADER_LEN + OLD_HEADER_LEN : get_data_body_offset();
if ((int) event_len < body_offset)
return 1;
//sql_ex.init() on success returns the pointer to the first byte after
@ -1117,7 +1127,6 @@ int Load_log_event::copy_log_event(const char *buf, ulong event_len,
table_name = fields + field_block_len;
db = table_name + table_name_len + 1;
fname = db + db_len + 1;
int type_code = get_type_code();
fname_len = strlen(fname);
// null termination is accomplished by the caller doing buf[event_len]=0
return 0;
@ -1367,20 +1376,29 @@ int Create_file_log_event::write_base(IO_CACHE* file)
return res;
}
Create_file_log_event::Create_file_log_event(const char* buf, int len):
Load_log_event(buf,0,0),fake_base(0),block(0)
Create_file_log_event::Create_file_log_event(const char* buf, int len,
bool old_format):
Load_log_event(buf,0,old_format),fake_base(0),block(0),inited_from_old(0)
{
int block_offset;
if (copy_log_event(buf,len,0))
if (copy_log_event(buf,len,old_format))
return;
file_id = uint4korr(buf + LOG_EVENT_HEADER_LEN +
+ LOAD_HEADER_LEN + CF_FILE_ID_OFFSET);
block_offset = LOG_EVENT_HEADER_LEN + Load_log_event::get_data_size() +
CREATE_FILE_HEADER_LEN + 1; // 1 for \0 terminating fname
if (len < block_offset)
return;
block = (char*)buf + block_offset;
block_len = len - block_offset;
if (!old_format)
{
file_id = uint4korr(buf + LOG_EVENT_HEADER_LEN +
+ LOAD_HEADER_LEN + CF_FILE_ID_OFFSET);
block_offset = LOG_EVENT_HEADER_LEN + Load_log_event::get_data_size() +
CREATE_FILE_HEADER_LEN + 1; // 1 for \0 terminating fname
if (len < block_offset)
return;
block = (char*)buf + block_offset;
block_len = len - block_offset;
}
else
{
sql_ex.force_new_format();
inited_from_old = 1;
}
}
@ -1568,6 +1586,7 @@ int Query_log_event::exec_event(struct st_relay_log_info* rli)
int expected_error,actual_error = 0;
init_sql_alloc(&thd->mem_root, 8192,0);
thd->db = rewrite_db((char*)db);
DBUG_ASSERT(q_len == strlen(query));
if (db_ok(thd->db, replicate_do_db, replicate_ignore_db))
{
thd->query = (char*)query;
@ -1739,11 +1758,12 @@ int Load_log_event::exec_event(NET* net, struct st_relay_log_info* rli)
int Start_log_event::exec_event(struct st_relay_log_info* rli)
{
close_temporary_tables(thd);
// if we have old format, load_tmpdir is cleaned up by the I/O thread
// TODO: cleanup_load_tmpdir() needs to remove only the files associated
// with the server id that has just started
if (!rli->mi->old_format)
{
close_temporary_tables(thd);
cleanup_load_tmpdir();
}
return Log_event::exec_event(rli);
}

View file

@ -64,6 +64,8 @@ struct old_sql_ex
char empty_flags;
};
#define NUM_LOAD_DELIM_STRS 5
struct sql_ex_info
{
@ -153,8 +155,8 @@ struct sql_ex_info
#define L_THREAD_ID_OFFSET 0
#define L_EXEC_TIME_OFFSET 4
#define L_SKIP_LINES_OFFSET 8
#define L_DB_LEN_OFFSET 12
#define L_TBL_LEN_OFFSET 13
#define L_TBL_LEN_OFFSET 12
#define L_DB_LEN_OFFSET 13
#define L_NUM_FIELDS_OFFSET 14
#define L_SQL_EX_OFFSET 18
#define L_DATA_OFFSET LOAD_HEADER_LEN
@ -570,6 +572,7 @@ public:
char* block;
uint block_len;
uint file_id;
bool inited_from_old;
#ifndef MYSQL_CLIENT
Create_file_log_event(THD* thd, sql_exchange* ex, const char* db_arg,
const char* table_name_arg,
@ -578,7 +581,7 @@ public:
char* block_arg, uint block_len_arg);
#endif
Create_file_log_event(const char* buf, int event_len);
Create_file_log_event(const char* buf, int event_len, bool old_format);
~Create_file_log_event()
{
}
@ -591,7 +594,7 @@ public:
4 + 1 + block_len;}
int get_data_body_offset() { return fake_base ? LOAD_EVENT_OVERHEAD:
LOAD_EVENT_OVERHEAD + CREATE_FILE_HEADER_LEN; }
bool is_valid() { return block != 0; }
bool is_valid() { return inited_from_old || block != 0; }
int write_data_header(IO_CACHE* file);
int write_data_body(IO_CACHE* file);
int write_base(IO_CACHE* file); // cut out Create_file extentions and

View file

@ -48,6 +48,7 @@ char *sql_strmake(const char *str,uint len);
gptr sql_memdup(const void * ptr,unsigned size);
void sql_element_free(void *ptr);
void kill_one_thread(THD *thd, ulong id);
int net_request_file(NET* net, const char* fname);
char* query_table_status(THD *thd,const char *db,const char *table_name);
#define x_free(A) { my_free((gptr) (A),MYF(MY_WME | MY_FAE | MY_ALLOW_ZERO_PTR)); }

View file

@ -814,3 +814,14 @@ my_net_read(NET *net)
#endif /* HAVE_COMPRESS */
return len;
}
int net_request_file(NET* net, const char* fname)
{
char tmp [FN_REFLEN+1],*end;
DBUG_ENTER("net_request_file");
tmp[0] = (char) 251; /* NULL_LENGTH */
end=strnmov(tmp+1,fname,sizeof(tmp)-2);
DBUG_RETURN(my_net_write(net,tmp,(uint) (end-tmp)) ||
net_flush(net));
}

View file

@ -828,6 +828,7 @@ int load_master_data(THD* thd)
active_mi->rli.master_log_pos = active_mi->master_log_pos;
strnmov(active_mi->rli.master_log_name,active_mi->master_log_name,
sizeof(active_mi->rli.master_log_name));
flush_relay_log_info(&active_mi->rli);
pthread_cond_broadcast(&active_mi->rli.data_cond);
pthread_mutex_unlock(&active_mi->rli.data_lock);
thd->proc_info = "starting slave";

View file

@ -55,6 +55,7 @@ typedef enum { SLAVE_THD_IO, SLAVE_THD_SQL} SLAVE_THD_TYPE;
void skip_load_data_infile(NET* net);
static int process_io_rotate(MASTER_INFO* mi, Rotate_log_event* rev);
static int process_io_create_file(MASTER_INFO* mi, Create_file_log_event* cev);
static int queue_old_event(MASTER_INFO* mi, const char* buf,
uint event_len);
static inline bool slave_killed(THD* thd,MASTER_INFO* mi);
@ -654,7 +655,7 @@ char* rewrite_db(char* db)
int db_ok(const char* db, I_List<i_string> &do_list,
I_List<i_string> &ignore_list )
{
if(do_list.is_empty() && ignore_list.is_empty())
if (do_list.is_empty() && ignore_list.is_empty())
return 1; // ok to replicate if the user puts no constraints
// if the user has specified restrictions on which databases to replicate
@ -1058,6 +1059,8 @@ int init_master_info(MASTER_INFO* mi, const char* master_info_fname,
if (init_relay_log_info(&mi->rli, slave_info_fname))
return 1;
mi->rli.mi = mi;
mi->mysql=0;
mi->file_id=1;
mi->ignore_stop_event=0;
int fd,error;
MY_STAT stat_area;
@ -1621,7 +1624,7 @@ slave_begin:
DBUG_PRINT("info",("master info: log_file_name=%s, position=%s",
mi->master_log_name, llstr(mi->master_log_pos,llbuff)));
if (!(mysql = mc_mysql_init(NULL)))
if (!(mi->mysql = mysql = mc_mysql_init(NULL)))
{
sql_print_error("Slave I/O thread: error in mc_mysql_init()");
goto err;
@ -1780,8 +1783,11 @@ err:
sql_print_error("Slave I/O thread exiting, read up to log '%s', position %s",
IO_RPL_LOG_NAME, llstr(mi->master_log_pos,llbuff));
thd->query = thd->db = 0; // extra safety
if(mysql)
if (mysql)
{
mc_mysql_close(mysql);
mi->mysql=0;
}
thd->proc_info = "Waiting for slave mutex on exit";
pthread_mutex_lock(&mi->run_lock);
mi->slave_running = 0;
@ -1790,7 +1796,7 @@ err:
change_rpl_status(RPL_ACTIVE_SLAVE,RPL_IDLE_SLAVE);
mi->abort_slave = 0; // TODO: check if this is needed
DBUG_ASSERT(thd->net.buff != 0);
net_end(&thd->net); // destructor will not free it, because we are weird
net_end(&thd->net); // destructor will not free it, because net.vio is 0
pthread_mutex_lock(&LOCK_thread_count);
delete thd;
pthread_mutex_unlock(&LOCK_thread_count);
@ -1926,11 +1932,97 @@ the slave SQL thread with \"SLAVE START\". We stopped at log \
DBUG_RETURN(0); // Can't return anything here
}
static int process_io_create_file(MASTER_INFO* mi, Create_file_log_event* cev)
{
int error = 1;
ulong num_bytes;
bool cev_not_written;
THD* thd;
NET* net = &mi->mysql->net;
if (unlikely(!cev->is_valid()))
return 1;
/*
TODO: fix to honor table rules, not only db rules
*/
if (!db_ok(cev->db, replicate_do_db, replicate_ignore_db))
{
skip_load_data_infile(net);
return 0;
}
DBUG_ASSERT(cev->inited_from_old);
thd = mi->io_thd;
thd->file_id = cev->file_id = mi->file_id++;
cev_not_written = 1;
if (unlikely(net_request_file(net,cev->fname)))
{
sql_print_error("Slave I/O: failed requesting download of '%s'",
cev->fname);
goto err;
}
/* this dummy block is so we could insantiate Append_block_log_event
once and then modify it slightly instead of doing it multiple times
in the loop
*/
{
Append_block_log_event aev(thd,0,0);
for (;;)
{
if (unlikely((num_bytes=my_net_read(net)) == packet_error))
{
sql_print_error("Network read error downloading '%s' from master",
cev->fname);
goto err;
}
if (unlikely(!num_bytes)) /* eof */
{
send_ok(net); /* 3.23 master wants it */
Execute_load_log_event xev(mi->io_thd);
if (unlikely(mi->rli.relay_log.append(&xev)))
{
sql_print_error("Slave I/O: error writing Exec_load event to \
relay log");
goto err;
}
break;
}
if (unlikely(cev_not_written))
{
cev->block = (char*)net->read_pos;
cev->block_len = num_bytes;
if (unlikely(mi->rli.relay_log.append(cev)))
{
sql_print_error("Slave I/O: error writing Create_file event to \
relay log");
goto err;
}
cev_not_written=0;
}
else
{
aev.block = (char*)net->read_pos;
aev.block_len = num_bytes;
if (unlikely(mi->rli.relay_log.append(&aev)))
{
sql_print_error("Slave I/O: error writing Append_block event to \
relay log");
goto err;
}
}
}
}
error=0;
err:
return error;
}
// We assume we already locked mi->data_lock
static int process_io_rotate(MASTER_INFO* mi, Rotate_log_event* rev)
{
if (!rev->is_valid())
if (unlikely(!rev->is_valid()))
return 1;
DBUG_ASSERT(rev->ident_len<sizeof(mi->master_log_name));
memcpy(mi->master_log_name,rev->new_log_ident,
@ -1961,6 +2053,21 @@ static int queue_old_event(MASTER_INFO *mi, const char *buf,
const char *errmsg = 0;
bool inc_pos = 1;
bool processed_stop_event = 0;
char* tmp_buf = 0;
/* if we get Load event, we need to pass a non-reusable buffer
to read_log_event, so we do a trick
*/
if (buf[EVENT_TYPE_OFFSET] == LOAD_EVENT)
{
if (unlikely(!(tmp_buf=(char*)my_malloc(event_len+1,MYF(MY_WME)))))
{
sql_print_error("Slave I/O: out of memory for Load event");
return 1;
}
memcpy(tmp_buf,buf,event_len);
tmp_buf[event_len]=0; // Create_file constructor wants null-term buffer
buf = (const char*)tmp_buf;
}
Log_event *ev = Log_event::read_log_event(buf,event_len, &errmsg,
1 /*old format*/ );
if (unlikely(!ev))
@ -1968,6 +2075,7 @@ static int queue_old_event(MASTER_INFO *mi, const char *buf,
sql_print_error("Read invalid event from master: '%s',\
master could be corrupt but a more likely cause of this is a bug",
errmsg);
my_free((char*)tmp_buf, MYF(MY_ALLOW_ZERO_PTR));
return 1;
}
pthread_mutex_lock(&mi->data_lock);
@ -1978,6 +2086,7 @@ static int queue_old_event(MASTER_INFO *mi, const char *buf,
{
delete ev;
pthread_mutex_unlock(&mi->data_lock);
DBUG_ASSERT(!tmp_buf);
return 1;
}
mi->ignore_stop_event=1;
@ -1986,12 +2095,16 @@ static int queue_old_event(MASTER_INFO *mi, const char *buf,
case STOP_EVENT:
processed_stop_event=1;
break;
case LOAD_EVENT:
// TODO: actually process it
mi->master_log_pos += event_len;
case CREATE_FILE_EVENT:
{
int error = process_io_create_file(mi,(Create_file_log_event*)ev);
delete ev;
mi->master_log_pos += event_len;
pthread_mutex_unlock(&mi->data_lock);
return 0;
DBUG_ASSERT(tmp_buf);
my_free((char*)tmp_buf, MYF(0));
return error;
}
default:
mi->ignore_stop_event=0;
break;
@ -2002,6 +2115,7 @@ static int queue_old_event(MASTER_INFO *mi, const char *buf,
{
delete ev;
pthread_mutex_unlock(&mi->data_lock);
DBUG_ASSERT(!tmp_buf);
return 1;
}
}
@ -2011,6 +2125,7 @@ static int queue_old_event(MASTER_INFO *mi, const char *buf,
if (unlikely(processed_stop_event))
mi->ignore_stop_event=1;
pthread_mutex_unlock(&mi->data_lock);
DBUG_ASSERT(!tmp_buf);
return 0;
}
@ -2173,7 +2288,7 @@ static int safe_reconnect(THD* thd, MYSQL* mysql, MASTER_INFO* mi)
int flush_relay_log_info(RELAY_LOG_INFO* rli)
{
IO_CACHE* file = &rli->info_file;
register IO_CACHE* file = &rli->info_file;
char lbuf[22],lbuf1[22];
my_b_seek(file, 0L);
@ -2251,7 +2366,10 @@ Log_event* next_event(RELAY_LOG_INFO* rli)
}
DBUG_ASSERT(my_b_tell(cur_log) >= 4);
DBUG_ASSERT(my_b_tell(cur_log) == rli->relay_log_pos + rli->pending);
if ((ev=Log_event::read_log_event(cur_log,0,rli->mi->old_format)))
/* relay log is always in new format - if the master is 3.23, the
I/O thread will convert the format for us
*/
if ((ev=Log_event::read_log_event(cur_log,0,(bool)0/*new format*/)))
{
DBUG_ASSERT(thd==rli->sql_thd);
if (hot_log)

View file

@ -254,6 +254,8 @@ typedef struct st_master_info
pthread_mutex_t data_lock,run_lock;
pthread_cond_t data_cond,start_cond,stop_cond;
THD *io_thd;
MYSQL* mysql;
uint32 file_id; // for 3.23 load data infile
RELAY_LOG_INFO rli;
uint port;
uint connect_retry;

View file

@ -147,12 +147,7 @@ int mysql_load(THD *thd,sql_exchange *ex,TABLE_LIST *table_list,
if (read_file_from_client && (thd->client_capabilities & CLIENT_LOCAL_FILES))
{
char tmp [FN_REFLEN+1],*end;
DBUG_PRINT("info",("reading local file"));
tmp[0] = (char) 251; /* NULL_LENGTH */
end=strnmov(tmp+1,ex->file_name,sizeof(tmp)-2);
(void) my_net_write(&thd->net,tmp,(uint) (end-tmp));
(void) net_flush(&thd->net);
(void)net_request_file(&thd->net,ex->file_name);
file = -1;
}
else