mariadb/sql/log_event.cc
unknown 418fcdf16a Do not use 'created' for time anymore in Start_log_event, it's the same
as the already-stored timestamp. Now 'created' is used only to know if
this is a first binlog or not. And we may re-use the superfluous bytes
in 5.0 when we need room.


sql/log_event.cc:
  This sort of reverts a change I made in 3.23.57. In 3.23.57 I set 'created' to 0
  if this was a non-first binlog, so I made mysqlbinlog not print the "created xx"
  part if created == 0. While this was sensible, as 'created' is 0 or equal to
  the timestamp which is already stored in the event, we can always print
  "created xx" by using the timestamp, and leaving the use of 'created' only to
  know if this is a first binlog or not (which we print as "created at startup").
sql/log_event.h:
  In Start_log_event, 'created' is always the same as 'when', or O.
  We didn't need 4 bytes for this, a bit would have been enough (O or
  "same as 'when'"). Possibly in 5.0 we will re-use the useless bytes.
2003-06-16 23:38:33 +02:00

754 lines
18 KiB
C++

/* Copyright (C) 2000 MySQL AB & MySQL Finland AB & TCX DataKonsult AB
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
#ifndef MYSQL_CLIENT
#ifdef __GNUC__
#pragma implementation // gcc: Class implementation
#endif
#include "mysql_priv.h"
#endif /* MYSQL_CLIENT */
static void pretty_print_char(FILE* file, int c)
{
fputc('\'', file);
switch(c) {
case '\n': fprintf(file, "\\n"); break;
case '\r': fprintf(file, "\\r"); break;
case '\\': fprintf(file, "\\\\"); break;
case '\b': fprintf(file, "\\b"); break;
case '\'': fprintf(file, "\\'"); break;
case 0 : fprintf(file, "\\0"); break;
default:
fputc(c, file);
break;
}
fputc('\'', file);
}
int Query_log_event::write(IO_CACHE* file)
{
return query ? Log_event::write(file) : -1;
}
int Log_event::write(IO_CACHE* file)
{
return (write_header(file) || write_data(file)) ? -1 : 0;
}
int Log_event::write_header(IO_CACHE* file)
{
// make sure to change this when the header gets bigger
char buf[LOG_EVENT_HEADER_LEN];
char* pos = buf;
int4store(pos, (ulong) when); // timestamp
pos += 4;
*pos++ = get_type_code(); // event type code
int4store(pos, server_id);
pos += 4;
long tmp=get_data_size() + LOG_EVENT_HEADER_LEN;
int4store(pos, tmp);
pos += 4;
return (my_b_write(file, (byte*) buf, (uint) (pos - buf)));
}
#ifndef MYSQL_CLIENT
int Log_event::read_log_event(IO_CACHE* file, String* packet,
pthread_mutex_t* log_lock)
{
ulong data_len;
char buf[LOG_EVENT_HEADER_LEN];
if (log_lock)
pthread_mutex_lock(log_lock);
if (my_b_read(file, (byte*) buf, sizeof(buf)))
{
if (log_lock) pthread_mutex_unlock(log_lock);
// if the read hits eof, we must report it as eof
// so the caller will know it can go into cond_wait to be woken up
// on the next update to the log
if(!file->error) return LOG_READ_EOF;
return file->error > 0 ? LOG_READ_TRUNC: LOG_READ_IO;
}
data_len = uint4korr(buf + EVENT_LEN_OFFSET);
if (data_len < LOG_EVENT_HEADER_LEN || data_len > max_allowed_packet)
{
if (log_lock) pthread_mutex_unlock(log_lock);
return (data_len < LOG_EVENT_HEADER_LEN) ? LOG_READ_BOGUS :
LOG_READ_TOO_LARGE;
}
packet->append(buf, sizeof(buf));
data_len -= LOG_EVENT_HEADER_LEN;
if (data_len)
{
if (packet->append(file, data_len))
{
if(log_lock)
pthread_mutex_unlock(log_lock);
// here we should never hit eof in a non-error condtion
// eof means we are reading the event partially, which should
// never happen
return file->error >= 0 ? LOG_READ_TRUNC: LOG_READ_IO;
}
}
if (log_lock) pthread_mutex_unlock(log_lock);
return 0;
}
#endif // MYSQL_CLIENT
#ifndef MYSQL_CLIENT
#define UNLOCK_MUTEX if(log_lock) pthread_mutex_unlock(log_lock);
#else
#define UNLOCK_MUTEX
#endif
// allocates memory - the caller is responsible for clean-up
#ifndef MYSQL_CLIENT
Log_event* Log_event::read_log_event(IO_CACHE* file, pthread_mutex_t* log_lock)
#else
Log_event* Log_event::read_log_event(IO_CACHE* file)
#endif
{
time_t timestamp;
uint32 server_id;
char buf[LOG_EVENT_HEADER_LEN-4];
#ifndef MYSQL_CLIENT
if(log_lock) pthread_mutex_lock(log_lock);
#endif
if (my_b_read(file, (byte *) buf, sizeof(buf)))
{
UNLOCK_MUTEX
return NULL;
}
timestamp = uint4korr(buf);
server_id = uint4korr(buf + 5);
switch(buf[EVENT_TYPE_OFFSET])
{
case QUERY_EVENT:
{
Query_log_event* q = new Query_log_event(file, timestamp, server_id);
UNLOCK_MUTEX
if (!q->query)
{
delete q;
q=NULL;
}
return q;
}
case LOAD_EVENT:
{
Load_log_event* l = new Load_log_event(file, timestamp, server_id);
UNLOCK_MUTEX
if (!l->table_name)
{
delete l;
l=NULL;
}
return l;
}
case ROTATE_EVENT:
{
Rotate_log_event* r = new Rotate_log_event(file, timestamp, server_id);
UNLOCK_MUTEX
if (!r->new_log_ident)
{
delete r;
r=NULL;
}
return r;
}
case INTVAR_EVENT:
{
Intvar_log_event* e = new Intvar_log_event(file, timestamp, server_id);
UNLOCK_MUTEX
if (e->type == INVALID_INT_EVENT)
{
delete e;
e=NULL;
}
return e;
}
case START_EVENT:
{
Start_log_event* e = new Start_log_event(file, timestamp, server_id);
UNLOCK_MUTEX
return e;
}
case STOP_EVENT:
{
Stop_log_event* e = new Stop_log_event(file, timestamp, server_id);
UNLOCK_MUTEX
return e;
}
default:
break;
}
// default
UNLOCK_MUTEX
return NULL;
}
Log_event* Log_event::read_log_event(const char* buf, int event_len)
{
if(event_len < EVENT_LEN_OFFSET ||
(uint)event_len != uint4korr(buf+EVENT_LEN_OFFSET))
return NULL; // general sanity check - will fail on a partial read
switch(buf[EVENT_TYPE_OFFSET])
{
case QUERY_EVENT:
{
Query_log_event* q = new Query_log_event(buf, event_len);
if (!q->query)
{
delete q;
return NULL;
}
return q;
}
case LOAD_EVENT:
{
Load_log_event* l = new Load_log_event(buf, event_len);
if (!l->table_name)
{
delete l;
return NULL;
}
return l;
}
case ROTATE_EVENT:
{
Rotate_log_event* r = new Rotate_log_event(buf, event_len);
if (!r->new_log_ident)
{
delete r;
return NULL;
}
return r;
}
case START_EVENT: return new Start_log_event(buf);
case STOP_EVENT: return new Stop_log_event(buf);
case INTVAR_EVENT: return new Intvar_log_event(buf);
default:
break;
}
return NULL; // default value
}
void Log_event::print_header(FILE* file)
{
fputc('#', file);
print_timestamp(file);
fprintf(file, " server id %d ", server_id);
}
void Log_event::print_timestamp(FILE* file, time_t* ts)
{
#ifdef MYSQL_SERVER
struct tm tm_tmp;
#endif
struct tm *res;
if (!ts)
{
ts = &when;
}
#ifdef MYSQL_SERVER
localtime_r(ts,(res= &tm_tmp));
#else
res=localtime(ts);
#endif
fprintf(file,"%02d%02d%02d %2d:%02d:%02d",
res->tm_year % 100,
res->tm_mon+1,
res->tm_mday,
res->tm_hour,
res->tm_min,
res->tm_sec);
}
void Start_log_event::print(FILE* file, bool short_form, char* last_db)
{
if (short_form)
return;
print_header(file);
fprintf(file, "\tStart: binlog v %d, server v %s created ", binlog_version,
server_version);
print_timestamp(file);
if (created)
fprintf(file," at startup");
fputc('\n', file);
fflush(file);
}
void Stop_log_event::print(FILE* file, bool short_form, char* last_db)
{
if (short_form)
return;
print_header(file);
fprintf(file, "\tStop\n");
fflush(file);
}
void Rotate_log_event::print(FILE* file, bool short_form, char* last_db)
{
if (short_form)
return;
print_header(file);
fprintf(file, "\tRotate to ");
if (new_log_ident)
my_fwrite(file, (byte*) new_log_ident, (uint)ident_len,
MYF(MY_NABP | MY_WME));
fprintf(file, "\n");
fflush(file);
}
Rotate_log_event::Rotate_log_event(IO_CACHE* file, time_t when_arg,
uint32 server_id):
Log_event(when_arg, 0, 0, server_id),new_log_ident(NULL),alloced(0)
{
char *tmp_ident;
char buf[4];
if (my_b_read(file, (byte*) buf, sizeof(buf)))
return;
ulong event_len;
event_len = uint4korr(buf);
if (event_len < ROTATE_EVENT_OVERHEAD)
return;
ident_len = (uchar)(event_len - ROTATE_EVENT_OVERHEAD);
if (!(tmp_ident = (char*) my_malloc((uint)ident_len, MYF(MY_WME))))
return;
if (my_b_read( file, (byte*) tmp_ident, (uint) ident_len))
{
my_free((gptr) tmp_ident, MYF(0));
return;
}
new_log_ident = tmp_ident;
alloced = 1;
}
Start_log_event::Start_log_event(const char* buf) :Log_event(buf)
{
buf += EVENT_LEN_OFFSET + 4; // skip even length
binlog_version = uint2korr(buf);
memcpy(server_version, buf + 2, sizeof(server_version));
created = uint4korr(buf + 2 + sizeof(server_version));
}
int Start_log_event::write_data(IO_CACHE* file)
{
char buff[sizeof(server_version)+2+4];
int2store(buff,binlog_version);
memcpy(buff+2,server_version,sizeof(server_version));
int4store(buff+2+sizeof(server_version),created);
return (my_b_write(file, (byte*) buff, sizeof(buff)) ? -1 : 0);
}
Rotate_log_event::Rotate_log_event(const char* buf, int event_len):
Log_event(buf),new_log_ident(NULL),alloced(0)
{
// the caller will ensure that event_len is what we have at
// EVENT_LEN_OFFSET
if(event_len < ROTATE_EVENT_OVERHEAD)
return;
ident_len = (uchar)(event_len - ROTATE_EVENT_OVERHEAD);
if (!(new_log_ident = (char*) my_memdup((byte*) buf + LOG_EVENT_HEADER_LEN,
(uint) ident_len, MYF(MY_WME))))
return;
alloced = 1;
}
int Rotate_log_event::write_data(IO_CACHE* file)
{
return my_b_write(file, (byte*) new_log_ident, (uint) ident_len) ? -1 :0;
}
Query_log_event::Query_log_event(IO_CACHE* file, time_t when_arg,
uint32 server_id):
Log_event(when_arg,0,0,server_id),data_buf(0),query(NULL),db(NULL)
{
char buf[QUERY_HEADER_LEN + 4];
ulong data_len;
if (my_b_read(file, (byte*) buf, sizeof(buf)))
return; // query == NULL will tell the
// caller there was a problem
data_len = uint4korr(buf);
if (data_len < QUERY_EVENT_OVERHEAD)
return; // tear-drop attack protection :)
data_len -= QUERY_EVENT_OVERHEAD;
exec_time = uint4korr(buf + 8);
db_len = (uint)buf[12];
error_code = uint2korr(buf + 13);
/* Allocate one byte extra for end \0 */
if (!(data_buf = (char*) my_malloc(data_len+1, MYF(MY_WME))))
return;
if (my_b_read( file, (byte*) data_buf, data_len))
{
my_free((gptr) data_buf, MYF(0));
data_buf = 0;
return;
}
thread_id = uint4korr(buf + 4);
db = data_buf;
query=data_buf + db_len + 1;
q_len = data_len - 1 - db_len;
*((char*) query + q_len) = 0; // Safety
}
Query_log_event::Query_log_event(const char* buf, int event_len):
Log_event(buf),data_buf(0), query(NULL), db(NULL)
{
if ((uint)event_len < QUERY_EVENT_OVERHEAD)
return;
ulong data_len;
buf += EVENT_LEN_OFFSET;
data_len = event_len - QUERY_EVENT_OVERHEAD;
exec_time = uint4korr(buf + 8);
error_code = uint2korr(buf + 13);
if (!(data_buf = (char*) my_malloc(data_len + 1, MYF(MY_WME))))
return;
memcpy(data_buf, buf + QUERY_HEADER_LEN + 4, data_len);
thread_id = uint4korr(buf + 4);
db = data_buf;
db_len = (uint)buf[12];
query=data_buf + db_len + 1;
q_len = data_len - 1 - db_len;
*((char*)query+q_len) = 0;
}
void Query_log_event::print(FILE* file, bool short_form, char* last_db)
{
char buff[40],*end; // Enough for SET TIMESTAMP
if (!short_form)
{
print_header(file);
fprintf(file, "\tQuery\tthread_id=%lu\texec_time=%lu\terror_code=%d\n",
(ulong) thread_id, (ulong) exec_time, error_code);
}
bool same_db = 0;
if(db && last_db)
{
if(!(same_db = !memcmp(last_db, db, db_len + 1)))
memcpy(last_db, db, db_len + 1);
}
if (db && db[0] && !same_db)
fprintf(file, "use %s;\n", db);
end=int10_to_str((long) when, strmov(buff,"SET TIMESTAMP="),10);
*end++=';';
*end++='\n';
my_fwrite(file, (byte*) buff, (uint) (end-buff),MYF(MY_NABP | MY_WME));
my_fwrite(file, (byte*) query, q_len, MYF(MY_NABP | MY_WME));
fprintf(file, ";\n");
}
int Query_log_event::write_data(IO_CACHE* file)
{
if (!query) return -1;
char buf[QUERY_HEADER_LEN];
char* pos = buf;
int4store(pos, thread_id);
pos += 4;
int4store(pos, exec_time);
pos += 4;
*pos++ = (char)db_len;
int2store(pos, error_code);
pos += 2;
return (my_b_write(file, (byte*) buf, (uint)(pos - buf)) ||
my_b_write(file, (db) ? (byte*) db : (byte*)"", db_len + 1) ||
my_b_write(file, (byte*) query, q_len)) ? -1 : 0;
}
Intvar_log_event:: Intvar_log_event(IO_CACHE* file, time_t when_arg,
uint32 server_id)
:Log_event(when_arg,0,0,server_id), type(INVALID_INT_EVENT)
{
char buf[9+4];
if (!my_b_read(file, (byte*) buf, sizeof(buf)))
{
type = buf[4];
val = uint8korr(buf+1+4);
}
}
Intvar_log_event::Intvar_log_event(const char* buf):Log_event(buf)
{
buf += LOG_EVENT_HEADER_LEN;
type = buf[0];
val = uint8korr(buf+1);
}
int Intvar_log_event::write_data(IO_CACHE* file)
{
char buf[9];
buf[0] = type;
int8store(buf + 1, val);
return my_b_write(file, (byte*) buf, sizeof(buf));
}
void Intvar_log_event::print(FILE* file, bool short_form, char* last_db)
{
char llbuff[22];
if(!short_form)
{
print_header(file);
fprintf(file, "\tIntvar\n");
}
fprintf(file, "SET ");
switch(type)
{
case LAST_INSERT_ID_EVENT:
fprintf(file, "LAST_INSERT_ID = ");
break;
case INSERT_ID_EVENT:
fprintf(file, "INSERT_ID = ");
break;
}
fprintf(file, "%s;\n", llstr(val,llbuff));
fflush(file);
}
int Load_log_event::write_data(IO_CACHE* file)
{
char buf[LOAD_HEADER_LEN];
int4store(buf, thread_id);
int4store(buf + 4, exec_time);
int4store(buf + 8, skip_lines);
buf[12] = (char)table_name_len;
buf[13] = (char)db_len;
int4store(buf + 14, num_fields);
if(my_b_write(file, (byte*)buf, sizeof(buf)) ||
my_b_write(file, (byte*)&sql_ex, sizeof(sql_ex)))
return 1;
if (num_fields && fields && field_lens)
{
if(my_b_write(file, (byte*)field_lens, num_fields) ||
my_b_write(file, (byte*)fields, field_block_len))
return 1;
}
if(my_b_write(file, (byte*)table_name, table_name_len + 1) ||
my_b_write(file, (byte*)db, db_len + 1) ||
my_b_write(file, (byte*)fname, fname_len))
return 1;
return 0;
}
Load_log_event::Load_log_event(IO_CACHE* file, time_t when, uint32 server_id):
Log_event(when,0,0,server_id),data_buf(0),num_fields(0),
fields(0),field_lens(0),field_block_len(0),
table_name(0),db(0),fname(0)
{
char buf[LOAD_HEADER_LEN + 4];
ulong data_len;
if (my_b_read(file, (byte*)buf, sizeof(buf)) ||
my_b_read(file, (byte*)&sql_ex, sizeof(sql_ex)))
return;
data_len = uint4korr(buf) - LOAD_EVENT_OVERHEAD;
if (!(data_buf = (char*)my_malloc(data_len + 1, MYF(MY_WME))))
return;
if (my_b_read(file, (byte*)data_buf, data_len))
return;
copy_log_event(buf,data_len);
}
Load_log_event::Load_log_event(const char* buf, int event_len):
Log_event(buf),data_buf(0),num_fields(0),fields(0),
field_lens(0),field_block_len(0),
table_name(0),db(0),fname(0)
{
ulong data_len;
if((uint)event_len < (LOAD_EVENT_OVERHEAD + LOG_EVENT_HEADER_LEN))
return;
buf += EVENT_LEN_OFFSET;
memcpy(&sql_ex, buf + LOAD_HEADER_LEN + 4, sizeof(sql_ex));
data_len = event_len;
if(!(data_buf = (char*)my_malloc(data_len + 1, MYF(MY_WME))))
return;
memcpy(data_buf, buf + 22 + sizeof(sql_ex), data_len);
copy_log_event(buf, data_len);
}
void Load_log_event::copy_log_event(const char *buf, ulong data_len)
{
thread_id = uint4korr(buf+4);
exec_time = uint4korr(buf+8);
skip_lines = uint4korr(buf + 12);
table_name_len = (uint)buf[16];
db_len = (uint)buf[17];
num_fields = uint4korr(buf + 18);
if (num_fields > data_len) // simple sanity check against corruption
return;
field_lens = (uchar*) data_buf;
uint i;
for (i = 0; i < num_fields; i++)
{
field_block_len += (uint)field_lens[i] + 1;
}
fields = (char*)field_lens + num_fields;
*((char*)data_buf+data_len) = 0;
table_name = fields + field_block_len;
db = table_name + table_name_len + 1;
fname = db + db_len + 1;
fname_len = data_len - 2 - db_len - table_name_len - num_fields -
field_block_len;
}
void Load_log_event::print(FILE* file, bool short_form, char* last_db)
{
if (!short_form)
{
print_header(file);
fprintf(file, "\tQuery\tthread_id=%ld\texec_time=%ld\n",
thread_id, exec_time);
}
bool same_db = 0;
if(db && last_db)
{
if(!(same_db = !memcmp(last_db, db, db_len + 1)))
memcpy(last_db, db, db_len + 1);
}
if(db && db[0] && !same_db)
fprintf(file, "use %s;\n", db);
fprintf(file, "LOAD DATA INFILE '%s' ", fname);
if(sql_ex.opt_flags & REPLACE_FLAG )
fprintf(file," REPLACE ");
else if(sql_ex.opt_flags & IGNORE_FLAG )
fprintf(file," IGNORE ");
fprintf(file, "INTO TABLE %s ", table_name);
if(!(sql_ex.empty_flags & FIELD_TERM_EMPTY))
{
fprintf(file, " FIELDS TERMINATED BY ");
pretty_print_char(file, sql_ex.field_term);
}
if(!(sql_ex.empty_flags & ENCLOSED_EMPTY))
{
if(sql_ex.opt_flags & OPT_ENCLOSED_FLAG )
fprintf(file," OPTIONALLY ");
fprintf(file, " ENCLOSED BY ");
pretty_print_char(file, sql_ex.enclosed);
}
if(!(sql_ex.empty_flags & ESCAPED_EMPTY))
{
fprintf(file, " ESCAPED BY ");
pretty_print_char(file, sql_ex.escaped);
}
bool line_lexem_added= false;
if(!(sql_ex.empty_flags & LINE_TERM_EMPTY))
{
fprintf(file," LINES TERMINATED BY ");
pretty_print_char(file, sql_ex.line_term);
line_lexem_added= true;
}
if(!(sql_ex.empty_flags & LINE_START_EMPTY))
{
if (!line_lexem_added)
fprintf(file," LINES");
fprintf(file," STARTING BY ");
pretty_print_char(file, sql_ex.line_start);
}
if((int)skip_lines > 0)
fprintf(file, " IGNORE %ld LINES ", (long) skip_lines);
if (num_fields)
{
uint i;
const char* field = fields;
fprintf( file, " (");
for(i = 0; i < num_fields; i++)
{
if(i)
fputc(',', file);
fprintf(file, field);
field += field_lens[i] + 1;
}
fputc(')', file);
}
fprintf(file, ";\n");
}
#ifndef MYSQL_CLIENT
void Load_log_event::set_fields(List<Item> &fields)
{
uint i;
const char* field = this->fields;
for(i = 0; i < num_fields; i++)
{
fields.push_back(new Item_field(db, table_name, field));
field += field_lens[i] + 1;
}
}
#endif