mariadb/sql/rpl_handler.cc

564 lines
17 KiB
C++
Raw Normal View History

2011-06-30 17:46:53 +02:00
/* Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; version 2 of the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
2011-06-30 17:46:53 +02:00
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
MDEV-5120 Test suite test maria-no-logging fails The reason for the failure was a bug in an include file on debian that causes 'struct stat' to have different sized depending on the environment. This patch fixes so that we always include my_global.h or my_config.h before we include any other files. Other things: - Removed #include <my_global.h> in some include files; Better to always do this at the top level to have as few "always-include-this-file-first' files as possible. - Removed usage of some include files that where already included by my_global.h or by other files. client/mysql_plugin.c: Use my_global.h first client/mysqlslap.c: Remove duplicated include files extra/comp_err.c: Remove duplicated include files include/m_string.h: Remove duplicated include files include/maria.h: Remove duplicated include files libmysqld/emb_qcache.cc: Use my_global.h first plugin/semisync/semisync.h: Use my_pthread.h first sql/datadict.cc: Use my_global.h first sql/debug_sync.cc: Use my_global.h first sql/derror.cc: Use my_global.h first sql/des_key_file.cc: Use my_global.h first sql/discover.cc: Use my_global.h first sql/event_data_objects.cc: Use my_global.h first sql/event_db_repository.cc: Use my_global.h first sql/event_parse_data.cc: Use my_global.h first sql/event_queue.cc: Use my_global.h first sql/event_scheduler.cc: Use my_global.h first sql/events.cc: Use my_global.h first sql/field.cc: Use my_global.h first Remove duplicated include files sql/field_conv.cc: Use my_global.h first sql/filesort.cc: Use my_global.h first Remove duplicated include files sql/gstream.cc: Use my_global.h first sql/ha_ndbcluster.cc: Use my_global.h first sql/ha_ndbcluster_binlog.cc: Use my_global.h first sql/ha_ndbcluster_cond.cc: Use my_global.h first sql/ha_partition.cc: Use my_global.h first sql/handler.cc: Use my_global.h first sql/hash_filo.cc: Use my_global.h first sql/hostname.cc: Use my_global.h first sql/init.cc: Use my_global.h first sql/item.cc: Use my_global.h first sql/item_buff.cc: Use my_global.h first sql/item_cmpfunc.cc: Use my_global.h first sql/item_create.cc: Use my_global.h first sql/item_geofunc.cc: Use my_global.h first sql/item_inetfunc.cc: Use my_global.h first sql/item_row.cc: Use my_global.h first sql/item_strfunc.cc: Use my_global.h first sql/item_subselect.cc: Use my_global.h first sql/item_sum.cc: Use my_global.h first sql/item_timefunc.cc: Use my_global.h first sql/item_xmlfunc.cc: Use my_global.h first sql/key.cc: Use my_global.h first sql/lock.cc: Use my_global.h first sql/log.cc: Use my_global.h first sql/log_event.cc: Use my_global.h first sql/log_event_old.cc: Use my_global.h first sql/mf_iocache.cc: Use my_global.h first sql/mysql_install_db.cc: Remove duplicated include files sql/mysqld.cc: Remove duplicated include files sql/net_serv.cc: Remove duplicated include files sql/opt_range.cc: Use my_global.h first sql/opt_subselect.cc: Use my_global.h first sql/opt_sum.cc: Use my_global.h first sql/parse_file.cc: Use my_global.h first sql/partition_info.cc: Use my_global.h first sql/procedure.cc: Use my_global.h first sql/protocol.cc: Use my_global.h first sql/records.cc: Use my_global.h first sql/records.h: Don't include my_global.h Better to do this at the upper level sql/repl_failsafe.cc: Use my_global.h first sql/rpl_filter.cc: Use my_global.h first sql/rpl_gtid.cc: Use my_global.h first sql/rpl_handler.cc: Use my_global.h first sql/rpl_injector.cc: Use my_global.h first sql/rpl_record.cc: Use my_global.h first sql/rpl_record_old.cc: Use my_global.h first sql/rpl_reporting.cc: Use my_global.h first sql/rpl_rli.cc: Use my_global.h first sql/rpl_tblmap.cc: Use my_global.h first sql/rpl_utility.cc: Use my_global.h first sql/set_var.cc: Added comment sql/slave.cc: Use my_global.h first sql/sp.cc: Use my_global.h first sql/sp_cache.cc: Use my_global.h first sql/sp_head.cc: Use my_global.h first sql/sp_pcontext.cc: Use my_global.h first sql/sp_rcontext.cc: Use my_global.h first sql/spatial.cc: Use my_global.h first sql/sql_acl.cc: Use my_global.h first sql/sql_admin.cc: Use my_global.h first sql/sql_analyse.cc: Use my_global.h first sql/sql_audit.cc: Use my_global.h first sql/sql_base.cc: Use my_global.h first sql/sql_binlog.cc: Use my_global.h first sql/sql_bootstrap.cc: Use my_global.h first Use my_global.h first sql/sql_cache.cc: Use my_global.h first sql/sql_class.cc: Use my_global.h first sql/sql_client.cc: Use my_global.h first sql/sql_connect.cc: Use my_global.h first sql/sql_crypt.cc: Use my_global.h first sql/sql_cursor.cc: Use my_global.h first sql/sql_db.cc: Use my_global.h first sql/sql_delete.cc: Use my_global.h first sql/sql_derived.cc: Use my_global.h first sql/sql_do.cc: Use my_global.h first sql/sql_error.cc: Use my_global.h first sql/sql_explain.cc: Use my_global.h first sql/sql_expression_cache.cc: Use my_global.h first sql/sql_handler.cc: Use my_global.h first sql/sql_help.cc: Use my_global.h first sql/sql_insert.cc: Use my_global.h first sql/sql_lex.cc: Use my_global.h first sql/sql_load.cc: Use my_global.h first sql/sql_locale.cc: Use my_global.h first sql/sql_manager.cc: Use my_global.h first sql/sql_parse.cc: Use my_global.h first sql/sql_partition.cc: Use my_global.h first sql/sql_plugin.cc: Added comment sql/sql_prepare.cc: Use my_global.h first sql/sql_priv.h: Added error if we use this before including my_global.h This check is here becasue so many files includes sql_priv.h first. sql/sql_profile.cc: Use my_global.h first sql/sql_reload.cc: Use my_global.h first sql/sql_rename.cc: Use my_global.h first sql/sql_repl.cc: Use my_global.h first sql/sql_select.cc: Use my_global.h first sql/sql_servers.cc: Use my_global.h first sql/sql_show.cc: Added comment sql/sql_signal.cc: Use my_global.h first sql/sql_statistics.cc: Use my_global.h first sql/sql_table.cc: Use my_global.h first sql/sql_tablespace.cc: Use my_global.h first sql/sql_test.cc: Use my_global.h first sql/sql_time.cc: Use my_global.h first sql/sql_trigger.cc: Use my_global.h first sql/sql_udf.cc: Use my_global.h first sql/sql_union.cc: Use my_global.h first sql/sql_update.cc: Use my_global.h first sql/sql_view.cc: Use my_global.h first sql/sys_vars.cc: Added comment sql/table.cc: Use my_global.h first sql/thr_malloc.cc: Use my_global.h first sql/transaction.cc: Use my_global.h first sql/uniques.cc: Use my_global.h first sql/unireg.cc: Use my_global.h first sql/unireg.h: Removed inclusion of my_global.h storage/archive/ha_archive.cc: Added comment storage/blackhole/ha_blackhole.cc: Use my_global.h first storage/csv/ha_tina.cc: Use my_global.h first storage/csv/transparent_file.cc: Use my_global.h first storage/federated/ha_federated.cc: Use my_global.h first storage/federatedx/federatedx_io.cc: Use my_global.h first storage/federatedx/federatedx_io_mysql.cc: Use my_global.h first storage/federatedx/federatedx_io_null.cc: Use my_global.h first storage/federatedx/federatedx_txn.cc: Use my_global.h first storage/heap/ha_heap.cc: Use my_global.h first storage/innobase/handler/handler0alter.cc: Use my_global.h first storage/maria/ha_maria.cc: Use my_global.h first storage/maria/unittest/ma_maria_log_cleanup.c: Remove duplicated include files storage/maria/unittest/test_file.c: Added comment storage/myisam/ha_myisam.cc: Move sql_plugin.h first as this includes my_global.h storage/myisammrg/ha_myisammrg.cc: Use my_global.h first storage/oqgraph/oqgraph_thunk.cc: Use my_config.h and my_global.h first One could not include my_global.h before oqgraph_thunk.h (don't know why) storage/spider/ha_spider.cc: Use my_global.h first storage/spider/hs_client/config.cpp: Use my_global.h first storage/spider/hs_client/escape.cpp: Use my_global.h first storage/spider/hs_client/fatal.cpp: Use my_global.h first storage/spider/hs_client/hstcpcli.cpp: Use my_global.h first storage/spider/hs_client/socket.cpp: Use my_global.h first storage/spider/hs_client/string_util.cpp: Use my_global.h first storage/spider/spd_conn.cc: Use my_global.h first storage/spider/spd_copy_tables.cc: Use my_global.h first storage/spider/spd_db_conn.cc: Use my_global.h first storage/spider/spd_db_handlersocket.cc: Use my_global.h first storage/spider/spd_db_mysql.cc: Use my_global.h first storage/spider/spd_db_oracle.cc: Use my_global.h first storage/spider/spd_direct_sql.cc: Use my_global.h first storage/spider/spd_i_s.cc: Use my_global.h first storage/spider/spd_malloc.cc: Use my_global.h first storage/spider/spd_param.cc: Use my_global.h first storage/spider/spd_ping_table.cc: Use my_global.h first storage/spider/spd_sys_table.cc: Use my_global.h first storage/spider/spd_table.cc: Use my_global.h first storage/spider/spd_trx.cc: Use my_global.h first storage/xtradb/handler/handler0alter.cc: Use my_global.h first storage/xtradb/handler/i_s.cc: Use my_global.h first
2014-09-30 19:31:14 +02:00
#include <my_global.h>
#include "sql_priv.h"
#include "unireg.h"
#include "rpl_mi.h"
#include "sql_repl.h"
#include "log_event.h"
#include "rpl_filter.h"
#include <my_dir.h>
#include "rpl_handler.h"
Trans_delegate *transaction_delegate;
Binlog_storage_delegate *binlog_storage_delegate;
#ifdef HAVE_REPLICATION
Binlog_transmit_delegate *binlog_transmit_delegate;
Binlog_relay_IO_delegate *binlog_relay_io_delegate;
#endif /* HAVE_REPLICATION */
/*
structure to save transaction log filename and position
*/
typedef struct Trans_binlog_info {
my_off_t log_pos;
char log_file[FN_REFLEN];
} Trans_binlog_info;
static pthread_key(Trans_binlog_info*, RPL_TRANS_BINLOG_INFO);
int get_user_var_int(const char *name,
long long int *value, int *null_value)
{
2011-04-25 17:22:25 +02:00
bool null_val;
user_var_entry *entry=
(user_var_entry*) my_hash_search(&current_thd->user_vars,
(uchar*) name, strlen(name));
if (!entry)
return 1;
*value= entry->val_int(&null_val);
if (null_value)
*null_value= null_val;
return 0;
}
int get_user_var_real(const char *name,
double *value, int *null_value)
{
2011-04-25 17:22:25 +02:00
bool null_val;
user_var_entry *entry=
(user_var_entry*) my_hash_search(&current_thd->user_vars,
(uchar*) name, strlen(name));
if (!entry)
return 1;
*value= entry->val_real(&null_val);
if (null_value)
*null_value= null_val;
return 0;
}
int get_user_var_str(const char *name, char *value,
size_t len, unsigned int precision, int *null_value)
{
String str;
2011-04-25 17:22:25 +02:00
bool null_val;
user_var_entry *entry=
(user_var_entry*) my_hash_search(&current_thd->user_vars,
(uchar*) name, strlen(name));
if (!entry)
return 1;
entry->val_str(&null_val, &str, precision);
strncpy(value, str.c_ptr(), len);
if (null_value)
*null_value= null_val;
return 0;
}
int delegates_init()
{
static my_aligned_storage<sizeof(Trans_delegate), MY_ALIGNOF(long)> trans_mem;
static my_aligned_storage<sizeof(Binlog_storage_delegate),
MY_ALIGNOF(long)> storage_mem;
#ifdef HAVE_REPLICATION
static my_aligned_storage<sizeof(Binlog_transmit_delegate),
MY_ALIGNOF(long)> transmit_mem;
static my_aligned_storage<sizeof(Binlog_relay_IO_delegate),
MY_ALIGNOF(long)> relay_io_mem;
#endif
2010-05-26 16:12:23 +02:00
void *place_trans_mem= trans_mem.data;
void *place_storage_mem= storage_mem.data;
transaction_delegate= new (place_trans_mem) Trans_delegate;
if (!transaction_delegate->is_inited())
{
sql_print_error("Initialization of transaction delegates failed. "
"Please report a bug.");
return 1;
}
binlog_storage_delegate= new (place_storage_mem) Binlog_storage_delegate;
if (!binlog_storage_delegate->is_inited())
{
sql_print_error("Initialization binlog storage delegates failed. "
"Please report a bug.");
return 1;
}
#ifdef HAVE_REPLICATION
void *place_transmit_mem= transmit_mem.data;
void *place_relay_io_mem= relay_io_mem.data;
binlog_transmit_delegate= new (place_transmit_mem) Binlog_transmit_delegate;
if (!binlog_transmit_delegate->is_inited())
{
sql_print_error("Initialization of binlog transmit delegates failed. "
"Please report a bug.");
return 1;
}
binlog_relay_io_delegate= new (place_relay_io_mem) Binlog_relay_IO_delegate;
if (!binlog_relay_io_delegate->is_inited())
{
sql_print_error("Initialization binlog relay IO delegates failed. "
"Please report a bug.");
return 1;
}
#endif
if (pthread_key_create(&RPL_TRANS_BINLOG_INFO, NULL))
{
sql_print_error("Error while creating pthread specific data key for replication. "
"Please report a bug.");
return 1;
}
return 0;
}
void delegates_destroy()
{
if (transaction_delegate)
transaction_delegate->~Trans_delegate();
if (binlog_storage_delegate)
binlog_storage_delegate->~Binlog_storage_delegate();
#ifdef HAVE_REPLICATION
if (binlog_transmit_delegate)
binlog_transmit_delegate->~Binlog_transmit_delegate();
if (binlog_relay_io_delegate)
binlog_relay_io_delegate->~Binlog_relay_IO_delegate();
#endif /* HAVE_REPLICATION */
}
/*
This macro is used by almost all the Delegate methods to iterate
over all the observers running given callback function of the
delegate.
*/
#define FOREACH_OBSERVER(r, f, do_lock, args) \
param.server_id= thd->variables.server_id; \
read_lock(); \
Observer_info_iterator iter= observer_info_iter(); \
Observer_info *info= iter++; \
for (; info; info= iter++) \
{ \
if (do_lock) plugin_lock(thd, plugin_int_to_ref(info->plugin_int)); \
if (((Observer *)info->observer)->f \
&& ((Observer *)info->observer)->f args) \
{ \
r= 1; \
sql_print_error("Run function '" #f "' in plugin '%s' failed", \
info->plugin_int->name.str); \
break; \
} \
} \
unlock();
int Trans_delegate::after_commit(THD *thd, bool all)
{
Trans_param param;
bool is_real_trans= (all || thd->transaction.all.ha_list == 0);
param.flags = is_real_trans ? TRANS_IS_REAL_TRANS : 0;
Trans_binlog_info *log_info=
my_pthread_getspecific_ptr(Trans_binlog_info*, RPL_TRANS_BINLOG_INFO);
param.log_file= log_info ? log_info->log_file : 0;
param.log_pos= log_info ? log_info->log_pos : 0;
int ret= 0;
FOREACH_OBSERVER(ret, after_commit, false, (&param));
/*
This is the end of a real transaction or autocommit statement, we
can free the memory allocated for binlog file and position.
*/
if (is_real_trans && log_info)
{
my_pthread_setspecific_ptr(RPL_TRANS_BINLOG_INFO, NULL);
Bug#34043: Server loops excessively in _checkchunk() when safemalloc is enabled Essentially, the problem is that safemalloc is excruciatingly slow as it checks all allocated blocks for overrun at each memory management primitive, yielding a almost exponential slowdown for the memory management functions (malloc, realloc, free). The overrun check basically consists of verifying some bytes of a block for certain magic keys, which catches some simple forms of overrun. Another minor problem is violation of aliasing rules and that its own internal list of blocks is prone to corruption. Another issue with safemalloc is rather the maintenance cost as the tool has a significant impact on the server code. Given the magnitude of memory debuggers available nowadays, especially those that are provided with the platform malloc implementation, maintenance of a in-house and largely obsolete memory debugger becomes a burden that is not worth the effort due to its slowness and lack of support for detecting more common forms of heap corruption. Since there are third-party tools that can provide the same functionality at a lower or comparable performance cost, the solution is to simply remove safemalloc. Third-party tools can provide the same functionality at a lower or comparable performance cost. The removal of safemalloc also allows a simplification of the malloc wrappers, removing quite a bit of kludge: redefinition of my_malloc, my_free and the removal of the unused second argument of my_free. Since free() always check whether the supplied pointer is null, redudant checks are also removed. Also, this patch adds unit testing for my_malloc and moves my_realloc implementation into the same file as the other memory allocation primitives. client/mysqldump.c: Pass my_free directly as its signature is compatible with the callback type -- which wasn't the case for free_table_ent.
2010-07-08 23:20:08 +02:00
my_free(log_info);
}
return ret;
}
int Trans_delegate::after_rollback(THD *thd, bool all)
{
Trans_param param;
bool is_real_trans= (all || thd->transaction.all.ha_list == 0);
param.flags = is_real_trans ? TRANS_IS_REAL_TRANS : 0;
Trans_binlog_info *log_info=
my_pthread_getspecific_ptr(Trans_binlog_info*, RPL_TRANS_BINLOG_INFO);
param.log_file= log_info ? log_info->log_file : 0;
param.log_pos= log_info ? log_info->log_pos : 0;
int ret= 0;
FOREACH_OBSERVER(ret, after_rollback, false, (&param));
/*
This is the end of a real transaction or autocommit statement, we
can free the memory allocated for binlog file and position.
*/
if (is_real_trans && log_info)
{
my_pthread_setspecific_ptr(RPL_TRANS_BINLOG_INFO, NULL);
Bug#34043: Server loops excessively in _checkchunk() when safemalloc is enabled Essentially, the problem is that safemalloc is excruciatingly slow as it checks all allocated blocks for overrun at each memory management primitive, yielding a almost exponential slowdown for the memory management functions (malloc, realloc, free). The overrun check basically consists of verifying some bytes of a block for certain magic keys, which catches some simple forms of overrun. Another minor problem is violation of aliasing rules and that its own internal list of blocks is prone to corruption. Another issue with safemalloc is rather the maintenance cost as the tool has a significant impact on the server code. Given the magnitude of memory debuggers available nowadays, especially those that are provided with the platform malloc implementation, maintenance of a in-house and largely obsolete memory debugger becomes a burden that is not worth the effort due to its slowness and lack of support for detecting more common forms of heap corruption. Since there are third-party tools that can provide the same functionality at a lower or comparable performance cost, the solution is to simply remove safemalloc. Third-party tools can provide the same functionality at a lower or comparable performance cost. The removal of safemalloc also allows a simplification of the malloc wrappers, removing quite a bit of kludge: redefinition of my_malloc, my_free and the removal of the unused second argument of my_free. Since free() always check whether the supplied pointer is null, redudant checks are also removed. Also, this patch adds unit testing for my_malloc and moves my_realloc implementation into the same file as the other memory allocation primitives. client/mysqldump.c: Pass my_free directly as its signature is compatible with the callback type -- which wasn't the case for free_table_ent.
2010-07-08 23:20:08 +02:00
my_free(log_info);
}
return ret;
}
int Binlog_storage_delegate::after_flush(THD *thd,
const char *log_file,
my_off_t log_pos,
bool synced,
bool first_in_group,
bool last_in_group)
{
Binlog_storage_param param;
uint32 flags=0;
if (synced)
flags |= BINLOG_STORAGE_IS_SYNCED;
if (first_in_group)
flags|= BINLOG_GROUP_COMMIT_LEADER;
if (last_in_group)
flags|= BINLOG_GROUP_COMMIT_TRAILER;
Trans_binlog_info *log_info=
my_pthread_getspecific_ptr(Trans_binlog_info*, RPL_TRANS_BINLOG_INFO);
if (!log_info)
{
if(!(log_info=
(Trans_binlog_info *)my_malloc(sizeof(Trans_binlog_info), MYF(0))))
return 1;
my_pthread_setspecific_ptr(RPL_TRANS_BINLOG_INFO, log_info);
}
strcpy(log_info->log_file, log_file+dirname_length(log_file));
log_info->log_pos = log_pos;
int ret= 0;
FOREACH_OBSERVER(ret, after_flush, false,
(&param, log_info->log_file, log_info->log_pos, flags));
return ret;
}
int Binlog_storage_delegate::after_sync(THD *thd,
const char *log_file,
my_off_t log_pos,
bool first_in_group,
bool last_in_group)
{
Binlog_storage_param param;
uint32 flags=0;
if (first_in_group)
flags|= BINLOG_GROUP_COMMIT_LEADER;
if (last_in_group)
flags|= BINLOG_GROUP_COMMIT_TRAILER;
int ret= 0;
FOREACH_OBSERVER(ret, after_sync, thd,
(&param, log_file+dirname_length(log_file), log_pos, flags));
return ret;
}
#ifdef HAVE_REPLICATION
int Binlog_transmit_delegate::transmit_start(THD *thd, ushort flags,
const char *log_file,
my_off_t log_pos)
{
Binlog_transmit_param param;
param.flags= flags;
int ret= 0;
FOREACH_OBSERVER(ret, transmit_start, true, (&param, log_file, log_pos));
return ret;
}
int Binlog_transmit_delegate::transmit_stop(THD *thd, ushort flags)
{
Binlog_transmit_param param;
param.flags= flags;
int ret= 0;
FOREACH_OBSERVER(ret, transmit_stop, false, (&param));
return ret;
}
int Binlog_transmit_delegate::reserve_header(THD *thd, ushort flags,
String *packet)
{
/* NOTE2ME: Maximum extra header size for each observer, I hope 32
bytes should be enough for each Observer to reserve their extra
header. If later found this is not enough, we can increase this
/HEZX
*/
#define RESERVE_HEADER_SIZE 32
unsigned char header[RESERVE_HEADER_SIZE];
ulong hlen;
Binlog_transmit_param param;
param.flags= flags;
param.server_id= thd->variables.server_id;
int ret= 0;
read_lock();
Observer_info_iterator iter= observer_info_iter();
Observer_info *info= iter++;
for (; info; info= iter++)
{
hlen= 0;
if (((Observer *)info->observer)->reserve_header
&& ((Observer *)info->observer)->reserve_header(&param,
header,
RESERVE_HEADER_SIZE,
&hlen))
{
ret= 1;
break;
}
if (hlen == 0)
continue;
if (hlen > RESERVE_HEADER_SIZE || packet->append((char *)header, hlen))
{
ret= 1;
break;
}
}
unlock();
return ret;
}
int Binlog_transmit_delegate::before_send_event(THD *thd, ushort flags,
String *packet,
const char *log_file,
my_off_t log_pos)
{
Binlog_transmit_param param;
param.flags= flags;
int ret= 0;
FOREACH_OBSERVER(ret, before_send_event, false,
(&param, (uchar *)packet->c_ptr(),
packet->length(),
log_file+dirname_length(log_file), log_pos));
return ret;
}
int Binlog_transmit_delegate::after_send_event(THD *thd, ushort flags,
String *packet)
{
Binlog_transmit_param param;
param.flags= flags;
int ret= 0;
FOREACH_OBSERVER(ret, after_send_event, false,
(&param, packet->c_ptr(), packet->length()));
return ret;
}
int Binlog_transmit_delegate::after_reset_master(THD *thd, ushort flags)
{
Binlog_transmit_param param;
param.flags= flags;
int ret= 0;
FOREACH_OBSERVER(ret, after_reset_master, false, (&param));
return ret;
}
void Binlog_relay_IO_delegate::init_param(Binlog_relay_IO_param *param,
Master_info *mi)
{
param->mysql= mi->mysql;
param->user= mi->user;
param->host= mi->host;
param->port= mi->port;
param->master_log_name= mi->master_log_name;
param->master_log_pos= mi->master_log_pos;
}
int Binlog_relay_IO_delegate::thread_start(THD *thd, Master_info *mi)
{
Binlog_relay_IO_param param;
init_param(&param, mi);
int ret= 0;
FOREACH_OBSERVER(ret, thread_start, true, (&param));
return ret;
}
int Binlog_relay_IO_delegate::thread_stop(THD *thd, Master_info *mi)
{
Binlog_relay_IO_param param;
init_param(&param, mi);
int ret= 0;
FOREACH_OBSERVER(ret, thread_stop, false, (&param));
return ret;
}
int Binlog_relay_IO_delegate::before_request_transmit(THD *thd,
Master_info *mi,
ushort flags)
{
Binlog_relay_IO_param param;
init_param(&param, mi);
int ret= 0;
FOREACH_OBSERVER(ret, before_request_transmit, false, (&param, (uint32)flags));
return ret;
}
int Binlog_relay_IO_delegate::after_read_event(THD *thd, Master_info *mi,
const char *packet, ulong len,
const char **event_buf,
ulong *event_len)
{
Binlog_relay_IO_param param;
init_param(&param, mi);
int ret= 0;
FOREACH_OBSERVER(ret, after_read_event, false,
(&param, packet, len, event_buf, event_len));
return ret;
}
int Binlog_relay_IO_delegate::after_queue_event(THD *thd, Master_info *mi,
const char *event_buf,
ulong event_len,
bool synced)
{
Binlog_relay_IO_param param;
init_param(&param, mi);
uint32 flags=0;
if (synced)
flags |= BINLOG_STORAGE_IS_SYNCED;
int ret= 0;
FOREACH_OBSERVER(ret, after_queue_event, false,
(&param, event_buf, event_len, flags));
return ret;
}
int Binlog_relay_IO_delegate::after_reset_slave(THD *thd, Master_info *mi)
{
Binlog_relay_IO_param param;
init_param(&param, mi);
int ret= 0;
FOREACH_OBSERVER(ret, after_reset_slave, false, (&param));
return ret;
}
#endif /* HAVE_REPLICATION */
int register_trans_observer(Trans_observer *observer, void *p)
{
return transaction_delegate->add_observer(observer, (st_plugin_int *)p);
}
int unregister_trans_observer(Trans_observer *observer, void *p)
{
return transaction_delegate->remove_observer(observer, (st_plugin_int *)p);
}
int register_binlog_storage_observer(Binlog_storage_observer *observer, void *p)
{
return binlog_storage_delegate->add_observer(observer, (st_plugin_int *)p);
}
int unregister_binlog_storage_observer(Binlog_storage_observer *observer, void *p)
{
return binlog_storage_delegate->remove_observer(observer, (st_plugin_int *)p);
}
#ifdef HAVE_REPLICATION
int register_binlog_transmit_observer(Binlog_transmit_observer *observer, void *p)
{
return binlog_transmit_delegate->add_observer(observer, (st_plugin_int *)p);
}
int unregister_binlog_transmit_observer(Binlog_transmit_observer *observer, void *p)
{
return binlog_transmit_delegate->remove_observer(observer, (st_plugin_int *)p);
}
int register_binlog_relay_io_observer(Binlog_relay_IO_observer *observer, void *p)
{
return binlog_relay_io_delegate->add_observer(observer, (st_plugin_int *)p);
}
int unregister_binlog_relay_io_observer(Binlog_relay_IO_observer *observer, void *p)
{
return binlog_relay_io_delegate->remove_observer(observer, (st_plugin_int *)p);
}
#else
int register_binlog_transmit_observer(Binlog_transmit_observer *observer, void *p)
{
return 0;
}
int unregister_binlog_transmit_observer(Binlog_transmit_observer *observer, void *p)
{
return 0;
}
int register_binlog_relay_io_observer(Binlog_relay_IO_observer *observer, void *p)
{
return 0;
}
int unregister_binlog_relay_io_observer(Binlog_relay_IO_observer *observer, void *p)
{
return 0;
}
#endif /* HAVE_REPLICATION */