mirror of
https://github.com/MariaDB/server.git
synced 2025-01-16 12:02:42 +01:00
563 lines
17 KiB
C++
563 lines
17 KiB
C++
/* Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
|
|
|
|
This program is free software; you can redistribute it and/or modify
|
|
it under the terms of the GNU General Public License as published by
|
|
the Free Software Foundation; version 2 of the License.
|
|
|
|
This program is distributed in the hope that it will be useful,
|
|
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
GNU General Public License for more details.
|
|
|
|
You should have received a copy of the GNU General Public License
|
|
along with this program; if not, write to the Free Software
|
|
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
|
|
|
|
#include <my_global.h>
|
|
#include "sql_priv.h"
|
|
#include "unireg.h"
|
|
|
|
#include "rpl_mi.h"
|
|
#include "sql_repl.h"
|
|
#include "log_event.h"
|
|
#include "rpl_filter.h"
|
|
#include <my_dir.h>
|
|
#include "rpl_handler.h"
|
|
|
|
Trans_delegate *transaction_delegate;
|
|
Binlog_storage_delegate *binlog_storage_delegate;
|
|
#ifdef HAVE_REPLICATION
|
|
Binlog_transmit_delegate *binlog_transmit_delegate;
|
|
Binlog_relay_IO_delegate *binlog_relay_io_delegate;
|
|
#endif /* HAVE_REPLICATION */
|
|
|
|
/*
|
|
structure to save transaction log filename and position
|
|
*/
|
|
typedef struct Trans_binlog_info {
|
|
my_off_t log_pos;
|
|
char log_file[FN_REFLEN];
|
|
} Trans_binlog_info;
|
|
|
|
static pthread_key(Trans_binlog_info*, RPL_TRANS_BINLOG_INFO);
|
|
|
|
int get_user_var_int(const char *name,
|
|
long long int *value, int *null_value)
|
|
{
|
|
bool null_val;
|
|
user_var_entry *entry=
|
|
(user_var_entry*) my_hash_search(¤t_thd->user_vars,
|
|
(uchar*) name, strlen(name));
|
|
if (!entry)
|
|
return 1;
|
|
*value= entry->val_int(&null_val);
|
|
if (null_value)
|
|
*null_value= null_val;
|
|
return 0;
|
|
}
|
|
|
|
int get_user_var_real(const char *name,
|
|
double *value, int *null_value)
|
|
{
|
|
bool null_val;
|
|
user_var_entry *entry=
|
|
(user_var_entry*) my_hash_search(¤t_thd->user_vars,
|
|
(uchar*) name, strlen(name));
|
|
if (!entry)
|
|
return 1;
|
|
*value= entry->val_real(&null_val);
|
|
if (null_value)
|
|
*null_value= null_val;
|
|
return 0;
|
|
}
|
|
|
|
int get_user_var_str(const char *name, char *value,
|
|
size_t len, unsigned int precision, int *null_value)
|
|
{
|
|
String str;
|
|
bool null_val;
|
|
user_var_entry *entry=
|
|
(user_var_entry*) my_hash_search(¤t_thd->user_vars,
|
|
(uchar*) name, strlen(name));
|
|
if (!entry)
|
|
return 1;
|
|
entry->val_str(&null_val, &str, precision);
|
|
strncpy(value, str.c_ptr(), len);
|
|
if (null_value)
|
|
*null_value= null_val;
|
|
return 0;
|
|
}
|
|
|
|
int delegates_init()
|
|
{
|
|
static my_aligned_storage<sizeof(Trans_delegate), MY_ALIGNOF(long)> trans_mem;
|
|
static my_aligned_storage<sizeof(Binlog_storage_delegate),
|
|
MY_ALIGNOF(long)> storage_mem;
|
|
#ifdef HAVE_REPLICATION
|
|
static my_aligned_storage<sizeof(Binlog_transmit_delegate),
|
|
MY_ALIGNOF(long)> transmit_mem;
|
|
static my_aligned_storage<sizeof(Binlog_relay_IO_delegate),
|
|
MY_ALIGNOF(long)> relay_io_mem;
|
|
#endif
|
|
|
|
void *place_trans_mem= trans_mem.data;
|
|
void *place_storage_mem= storage_mem.data;
|
|
|
|
transaction_delegate= new (place_trans_mem) Trans_delegate;
|
|
|
|
if (!transaction_delegate->is_inited())
|
|
{
|
|
sql_print_error("Initialization of transaction delegates failed. "
|
|
"Please report a bug.");
|
|
return 1;
|
|
}
|
|
|
|
binlog_storage_delegate= new (place_storage_mem) Binlog_storage_delegate;
|
|
|
|
if (!binlog_storage_delegate->is_inited())
|
|
{
|
|
sql_print_error("Initialization binlog storage delegates failed. "
|
|
"Please report a bug.");
|
|
return 1;
|
|
}
|
|
|
|
#ifdef HAVE_REPLICATION
|
|
void *place_transmit_mem= transmit_mem.data;
|
|
void *place_relay_io_mem= relay_io_mem.data;
|
|
|
|
binlog_transmit_delegate= new (place_transmit_mem) Binlog_transmit_delegate;
|
|
|
|
if (!binlog_transmit_delegate->is_inited())
|
|
{
|
|
sql_print_error("Initialization of binlog transmit delegates failed. "
|
|
"Please report a bug.");
|
|
return 1;
|
|
}
|
|
|
|
binlog_relay_io_delegate= new (place_relay_io_mem) Binlog_relay_IO_delegate;
|
|
|
|
if (!binlog_relay_io_delegate->is_inited())
|
|
{
|
|
sql_print_error("Initialization binlog relay IO delegates failed. "
|
|
"Please report a bug.");
|
|
return 1;
|
|
}
|
|
#endif
|
|
|
|
if (pthread_key_create(&RPL_TRANS_BINLOG_INFO, NULL))
|
|
{
|
|
sql_print_error("Error while creating pthread specific data key for replication. "
|
|
"Please report a bug.");
|
|
return 1;
|
|
}
|
|
|
|
return 0;
|
|
}
|
|
|
|
void delegates_destroy()
|
|
{
|
|
if (transaction_delegate)
|
|
transaction_delegate->~Trans_delegate();
|
|
if (binlog_storage_delegate)
|
|
binlog_storage_delegate->~Binlog_storage_delegate();
|
|
#ifdef HAVE_REPLICATION
|
|
if (binlog_transmit_delegate)
|
|
binlog_transmit_delegate->~Binlog_transmit_delegate();
|
|
if (binlog_relay_io_delegate)
|
|
binlog_relay_io_delegate->~Binlog_relay_IO_delegate();
|
|
#endif /* HAVE_REPLICATION */
|
|
}
|
|
|
|
/*
  Used by almost all Delegate methods to run one observer callback on
  every registered observer.

    r        int lvalue; set to 1 when a callback returns non-zero
    f        name of the Observer member (function pointer) to call
    do_lock  when true, plugin_lock() is called for each observer's plugin
    args     parenthesised argument list passed to the callback

  The expansion expects `param` and `thd` to be in scope, stores the
  server id into `param`, and holds the delegate's read lock while
  iterating. Observers whose callback pointer is null are skipped.
  Iteration stops at the first failing callback, which is also logged.
  The macro expands to several statements and declares `iter` in the
  enclosing scope, so it can be used only once per scope.
*/
#define FOREACH_OBSERVER(r, f, do_lock, args)                           \
  param.server_id= thd->variables.server_id;                            \
  read_lock();                                                          \
  Observer_info_iterator iter= observer_info_iter();                    \
  for (Observer_info *info= iter++; info; info= iter++)                 \
  {                                                                     \
    if (do_lock)                                                        \
      plugin_lock(thd, plugin_int_to_ref(info->plugin_int));            \
    Observer *hooks= (Observer *) info->observer;                       \
    if (hooks->f && hooks->f args)                                      \
    {                                                                   \
      r= 1;                                                             \
      sql_print_error("Run function '" #f "' in plugin '%s' failed",    \
                      info->plugin_int->name.str);                      \
      break;                                                            \
    }                                                                   \
  }                                                                     \
  unlock();
|
|
|
|
|
|
/**
  Run the after_commit callback of every transaction observer.

  @param thd  session on whose behalf the observers are called
  @param all  when true the call is treated as the end of a real
              transaction; when false it is treated so only if
              thd->transaction.all.ha_list is empty

  The binlog file name and position stored in the thread-specific
  Trans_binlog_info (if any) are handed to the observers. At the end of a
  real transaction that record is detached from the thread and freed.

  @return 0 if no observer callback failed, 1 otherwise
*/
int Trans_delegate::after_commit(THD *thd, bool all)
{
  Trans_param param;
  bool is_real_trans= (all || thd->transaction.all.ha_list == 0);

  param.flags= is_real_trans ? TRANS_IS_REAL_TRANS : 0;

  Trans_binlog_info *log_info=
    my_pthread_getspecific_ptr(Trans_binlog_info*, RPL_TRANS_BINLOG_INFO);

  /* Without stored binlog coordinates the observers get a null name and 0. */
  param.log_file= log_info ? log_info->log_file : 0;
  param.log_pos= log_info ? log_info->log_pos : 0;

  int ret= 0;
  FOREACH_OBSERVER(ret, after_commit, false, (&param));

  /*
    This is the end of a real transaction or autocommit statement, we
    can free the memory allocated for binlog file and position.
  */
  if (is_real_trans && log_info)
  {
    my_pthread_setspecific_ptr(RPL_TRANS_BINLOG_INFO, NULL);
    my_free(log_info);
  }
  return ret;
}
|
|
|
|
/**
  Run the after_rollback callback of every transaction observer.

  @param thd  session on whose behalf the observers are called
  @param all  when true the call is treated as the end of a real
              transaction; when false it is treated so only if
              thd->transaction.all.ha_list is empty

  The binlog file name and position stored in the thread-specific
  Trans_binlog_info (if any) are handed to the observers. At the end of a
  real transaction that record is detached from the thread and freed.

  @return 0 if no observer callback failed, 1 otherwise
*/
int Trans_delegate::after_rollback(THD *thd, bool all)
{
  Trans_param param;
  bool is_real_trans= (all || thd->transaction.all.ha_list == 0);

  param.flags= is_real_trans ? TRANS_IS_REAL_TRANS : 0;

  Trans_binlog_info *log_info=
    my_pthread_getspecific_ptr(Trans_binlog_info*, RPL_TRANS_BINLOG_INFO);

  /* Without stored binlog coordinates the observers get a null name and 0. */
  param.log_file= log_info ? log_info->log_file : 0;
  param.log_pos= log_info ? log_info->log_pos : 0;

  int ret= 0;
  FOREACH_OBSERVER(ret, after_rollback, false, (&param));

  /*
    This is the end of a real transaction or autocommit statement, we
    can free the memory allocated for binlog file and position.
  */
  if (is_real_trans && log_info)
  {
    my_pthread_setspecific_ptr(RPL_TRANS_BINLOG_INFO, NULL);
    my_free(log_info);
  }
  return ret;
}
|
|
|
|
/**
  Record the flushed binlog coordinates for the current thread and run the
  after_flush callback of every binlog storage observer.

  @param thd             session on whose behalf the observers are called
  @param log_file        binlog file name, possibly with a directory part
  @param log_pos         position in log_file
  @param synced          sets BINLOG_STORAGE_IS_SYNCED in the flags
  @param first_in_group  sets BINLOG_GROUP_COMMIT_LEADER in the flags
  @param last_in_group   sets BINLOG_GROUP_COMMIT_TRAILER in the flags

  The thread-specific Trans_binlog_info is allocated on first use and
  updated with the base name of log_file and with log_pos. A name longer
  than the log_file buffer is truncated rather than written past its end.

  @return 0 if no observer callback failed, 1 if one failed or the
          Trans_binlog_info could not be allocated
*/
int Binlog_storage_delegate::after_flush(THD *thd,
                                         const char *log_file,
                                         my_off_t log_pos,
                                         bool synced,
                                         bool first_in_group,
                                         bool last_in_group)
{
  Binlog_storage_param param;
  uint32 flags= 0;
  if (synced)
    flags|= BINLOG_STORAGE_IS_SYNCED;
  if (first_in_group)
    flags|= BINLOG_GROUP_COMMIT_LEADER;
  if (last_in_group)
    flags|= BINLOG_GROUP_COMMIT_TRAILER;

  Trans_binlog_info *log_info=
    my_pthread_getspecific_ptr(Trans_binlog_info*, RPL_TRANS_BINLOG_INFO);

  if (!log_info)
  {
    if (!(log_info=
          (Trans_binlog_info *) my_malloc(sizeof(Trans_binlog_info), MYF(0))))
      return 1;
    my_pthread_setspecific_ptr(RPL_TRANS_BINLOG_INFO, log_info);
  }

  /* Store only the file name, bounded by the size of the destination. */
  strncpy(log_info->log_file, log_file + dirname_length(log_file),
          sizeof(log_info->log_file) - 1);
  log_info->log_file[sizeof(log_info->log_file) - 1]= '\0';
  log_info->log_pos= log_pos;

  int ret= 0;
  FOREACH_OBSERVER(ret, after_flush, false,
                   (&param, log_info->log_file, log_info->log_pos, flags));
  return ret;
}
|
|
|
|
/**
  Run the after_sync callback of every binlog storage observer.

  @param thd             session on whose behalf the observers are called
  @param log_file        binlog file name, possibly with a directory part;
                         observers receive only the base name
  @param log_pos         position in log_file
  @param first_in_group  sets BINLOG_GROUP_COMMIT_LEADER in the flags
  @param last_in_group   sets BINLOG_GROUP_COMMIT_TRAILER in the flags

  @return 0 if no observer callback failed, 1 otherwise
*/
int Binlog_storage_delegate::after_sync(THD *thd,
                                        const char *log_file,
                                        my_off_t log_pos,
                                        bool first_in_group,
                                        bool last_in_group)
{
  Binlog_storage_param param;
  uint32 flags= 0;

  if (first_in_group)
    flags|= BINLOG_GROUP_COMMIT_LEADER;
  if (last_in_group)
    flags|= BINLOG_GROUP_COMMIT_TRAILER;

  int ret= 0;
  FOREACH_OBSERVER(ret, after_sync, false,
                   (&param, log_file + dirname_length(log_file), log_pos, flags));

  return ret;
}
|
|
|
|
#ifdef HAVE_REPLICATION
|
|
/**
  Run the transmit_start callback of every binlog transmit observer.

  This is one of the calls that passes do_lock=true to FOREACH_OBSERVER,
  so plugin_lock() is invoked for each observer's plugin.

  @param thd       session on whose behalf the observers are called
  @param flags     stored in the Binlog_transmit_param given to observers
  @param log_file  binlog file name, passed to observers unchanged
  @param log_pos   position in log_file, passed to observers unchanged

  @return 0 if no observer callback failed, 1 otherwise
*/
int Binlog_transmit_delegate::transmit_start(THD *thd, ushort flags,
                                             const char *log_file,
                                             my_off_t log_pos)
{
  Binlog_transmit_param param;
  param.flags= flags;

  int ret= 0;
  FOREACH_OBSERVER(ret, transmit_start, true, (&param, log_file, log_pos));
  return ret;
}
|
|
|
|
/**
  Run the transmit_stop callback of every binlog transmit observer.

  @param thd    session on whose behalf the observers are called
  @param flags  stored in the Binlog_transmit_param given to observers

  @return 0 if no observer callback failed, 1 otherwise
*/
int Binlog_transmit_delegate::transmit_stop(THD *thd, ushort flags)
{
  Binlog_transmit_param param;
  param.flags= flags;

  int ret= 0;
  FOREACH_OBSERVER(ret, transmit_stop, false, (&param));
  return ret;
}
|
|
|
|
/**
  Let every binlog transmit observer reserve an extra header in a packet.

  Each observer's reserve_header callback is given a scratch buffer of
  RESERVE_HEADER_SIZE bytes and reports the number of bytes it used. A
  non-empty header is appended to packet. Observers without the callback,
  or that report a length of 0, contribute nothing.

  @param thd     session on whose behalf the observers are called
  @param flags   stored in the Binlog_transmit_param given to observers
  @param packet  string the reserved headers are appended to

  @return 0 on success; 1 if a callback returned non-zero, reported more
          than RESERVE_HEADER_SIZE bytes, or the append failed (iteration
          stops at that observer)
*/
int Binlog_transmit_delegate::reserve_header(THD *thd, ushort flags,
                                             String *packet)
{
  /* NOTE2ME: Maximum extra header size for each observer, I hope 32
     bytes should be enough for each Observer to reserve their extra
     header. If later found this is not enough, we can increase this
     /HEZX
  */
#define RESERVE_HEADER_SIZE 32
  unsigned char header[RESERVE_HEADER_SIZE];
  ulong hlen;
  Binlog_transmit_param param;
  param.flags= flags;
  param.server_id= thd->variables.server_id;

  int ret= 0;
  read_lock();
  Observer_info_iterator iter= observer_info_iter();
  Observer_info *info= iter++;
  for (; info; info= iter++)
  {
    /* Reset for every observer so a skipped callback counts as "no header". */
    hlen= 0;
    if (((Observer *)info->observer)->reserve_header
        && ((Observer *)info->observer)->reserve_header(&param,
                                                        header,
                                                        RESERVE_HEADER_SIZE,
                                                        &hlen))
    {
      ret= 1;
      break;
    }
    if (hlen == 0)
      continue;
    /* Reject a reported length larger than the scratch buffer. */
    if (hlen > RESERVE_HEADER_SIZE || packet->append((char *)header, hlen))
    {
      ret= 1;
      break;
    }
  }
  unlock();
  return ret;
}
|
|
|
|
/**
  Run the before_send_event callback of every binlog transmit observer.

  @param thd       session on whose behalf the observers are called
  @param flags     stored in the Binlog_transmit_param given to observers
  @param packet    event packet; observers receive its buffer and length
  @param log_file  binlog file name, possibly with a directory part;
                   observers receive only the base name
  @param log_pos   position in log_file

  @return 0 if no observer callback failed, 1 otherwise
*/
int Binlog_transmit_delegate::before_send_event(THD *thd, ushort flags,
                                                String *packet,
                                                const char *log_file,
                                                my_off_t log_pos)
{
  Binlog_transmit_param param;
  param.flags= flags;

  int ret= 0;
  FOREACH_OBSERVER(ret, before_send_event, false,
                   (&param, (uchar *)packet->c_ptr(),
                    packet->length(),
                    log_file + dirname_length(log_file), log_pos));
  return ret;
}
|
|
|
|
/**
  Run the after_send_event callback of every binlog transmit observer.

  @param thd     session on whose behalf the observers are called
  @param flags   stored in the Binlog_transmit_param given to observers
  @param packet  event packet; observers receive its buffer and length

  @return 0 if no observer callback failed, 1 otherwise
*/
int Binlog_transmit_delegate::after_send_event(THD *thd, ushort flags,
                                               String *packet)
{
  Binlog_transmit_param param;
  param.flags= flags;

  int ret= 0;
  FOREACH_OBSERVER(ret, after_send_event, false,
                   (&param, packet->c_ptr(), packet->length()));
  return ret;
}
|
|
|
|
/**
  Run the after_reset_master callback of every binlog transmit observer.

  @param thd    session on whose behalf the observers are called
  @param flags  stored in the Binlog_transmit_param given to observers

  @return 0 if no observer callback failed, 1 otherwise
*/
int Binlog_transmit_delegate::after_reset_master(THD *thd, ushort flags)
{
  Binlog_transmit_param param;
  param.flags= flags;

  int ret= 0;
  FOREACH_OBSERVER(ret, after_reset_master, false, (&param));
  return ret;
}
|
|
|
|
/**
  Fill an observer parameter block from a Master_info.

  The values are assigned as they are in mi; for pointer or array members
  this means param refers to mi's data rather than owning a copy.

  @param[out] param  parameter block to fill
  @param      mi     source of the connection and master log fields
*/
void Binlog_relay_IO_delegate::init_param(Binlog_relay_IO_param *param,
                                          Master_info *mi)
{
  /* Connection to the master. */
  param->mysql= mi->mysql;
  param->user=  mi->user;
  param->host=  mi->host;
  param->port=  mi->port;

  /* Master binlog coordinates. */
  param->master_log_name= mi->master_log_name;
  param->master_log_pos=  mi->master_log_pos;
}
|
|
|
|
/**
  Run the thread_start callback of every relay IO observer.

  This is one of the calls that passes do_lock=true to FOREACH_OBSERVER,
  so plugin_lock() is invoked for each observer's plugin.

  @param thd  session on whose behalf the observers are called
  @param mi   master info used to fill the observer parameter block

  @return 0 if no observer callback failed, 1 otherwise
*/
int Binlog_relay_IO_delegate::thread_start(THD *thd, Master_info *mi)
{
  Binlog_relay_IO_param param;
  init_param(&param, mi);

  int ret= 0;
  FOREACH_OBSERVER(ret, thread_start, true, (&param));
  return ret;
}
|
|
|
|
|
|
/**
  Run the thread_stop callback of every relay IO observer.

  @param thd  session on whose behalf the observers are called
  @param mi   master info used to fill the observer parameter block

  @return 0 if no observer callback failed, 1 otherwise
*/
int Binlog_relay_IO_delegate::thread_stop(THD *thd, Master_info *mi)
{
  Binlog_relay_IO_param param;
  init_param(&param, mi);

  int ret= 0;
  FOREACH_OBSERVER(ret, thread_stop, false, (&param));
  return ret;
}
|
|
|
|
/**
  Run the before_request_transmit callback of every relay IO observer.

  @param thd    session on whose behalf the observers are called
  @param mi     master info used to fill the observer parameter block
  @param flags  passed to observers widened to uint32

  @return 0 if no observer callback failed, 1 otherwise
*/
int Binlog_relay_IO_delegate::before_request_transmit(THD *thd,
                                                      Master_info *mi,
                                                      ushort flags)
{
  Binlog_relay_IO_param param;
  init_param(&param, mi);

  int ret= 0;
  FOREACH_OBSERVER(ret, before_request_transmit, false, (&param, (uint32)flags));
  return ret;
}
|
|
|
|
/**
  Run the after_read_event callback of every relay IO observer.

  @param thd        session on whose behalf the observers are called
  @param mi         master info used to fill the observer parameter block
  @param packet     packet that was read, passed to observers unchanged
  @param len        length of packet
  @param event_buf  passed through to the observers
  @param event_len  passed through to the observers

  @return 0 if no observer callback failed, 1 otherwise
*/
int Binlog_relay_IO_delegate::after_read_event(THD *thd, Master_info *mi,
                                               const char *packet, ulong len,
                                               const char **event_buf,
                                               ulong *event_len)
{
  Binlog_relay_IO_param param;
  init_param(&param, mi);

  int ret= 0;
  FOREACH_OBSERVER(ret, after_read_event, false,
                   (&param, packet, len, event_buf, event_len));
  return ret;
}
|
|
|
|
/**
  Run the after_queue_event callback of every relay IO observer.

  @param thd        session on whose behalf the observers are called
  @param mi         master info used to fill the observer parameter block
  @param event_buf  event data, passed to observers unchanged
  @param event_len  length of event_buf
  @param synced     sets BINLOG_STORAGE_IS_SYNCED in the flags given to
                    the observers

  @return 0 if no observer callback failed, 1 otherwise
*/
int Binlog_relay_IO_delegate::after_queue_event(THD *thd, Master_info *mi,
                                                const char *event_buf,
                                                ulong event_len,
                                                bool synced)
{
  Binlog_relay_IO_param param;
  init_param(&param, mi);

  uint32 flags= 0;
  if (synced)
    flags|= BINLOG_STORAGE_IS_SYNCED;

  int ret= 0;
  FOREACH_OBSERVER(ret, after_queue_event, false,
                   (&param, event_buf, event_len, flags));
  return ret;
}
|
|
|
|
/**
  Run the after_reset_slave callback of every relay IO observer.

  @param thd  session on whose behalf the observers are called
  @param mi   master info used to fill the observer parameter block

  @return 0 if no observer callback failed, 1 otherwise
*/
int Binlog_relay_IO_delegate::after_reset_slave(THD *thd, Master_info *mi)
{
  Binlog_relay_IO_param param;
  init_param(&param, mi);

  int ret= 0;
  FOREACH_OBSERVER(ret, after_reset_slave, false, (&param));
  return ret;
}
|
|
#endif /* HAVE_REPLICATION */
|
|
|
|
/**
  Add a transaction observer to the global transaction delegate.

  @param observer  observer to add
  @param p         opaque pointer, interpreted as st_plugin_int*

  @return the result of Trans_delegate::add_observer()
*/
int register_trans_observer(Trans_observer *observer, void *p)
{
  st_plugin_int *plugin= static_cast<st_plugin_int *>(p);
  return transaction_delegate->add_observer(observer, plugin);
}
|
|
|
|
/**
  Remove a transaction observer from the global transaction delegate.

  @param observer  observer to remove
  @param p         opaque pointer, interpreted as st_plugin_int*

  @return the result of Trans_delegate::remove_observer()
*/
int unregister_trans_observer(Trans_observer *observer, void *p)
{
  st_plugin_int *plugin= static_cast<st_plugin_int *>(p);
  return transaction_delegate->remove_observer(observer, plugin);
}
|
|
|
|
/**
  Add a binlog storage observer to the global binlog storage delegate.

  @param observer  observer to add
  @param p         opaque pointer, interpreted as st_plugin_int*

  @return the result of Binlog_storage_delegate::add_observer()
*/
int register_binlog_storage_observer(Binlog_storage_observer *observer, void *p)
{
  st_plugin_int *plugin= static_cast<st_plugin_int *>(p);
  return binlog_storage_delegate->add_observer(observer, plugin);
}
|
|
|
|
/**
  Remove a binlog storage observer from the global binlog storage delegate.

  @param observer  observer to remove
  @param p         opaque pointer, interpreted as st_plugin_int*

  @return the result of Binlog_storage_delegate::remove_observer()
*/
int unregister_binlog_storage_observer(Binlog_storage_observer *observer, void *p)
{
  st_plugin_int *plugin= static_cast<st_plugin_int *>(p);
  return binlog_storage_delegate->remove_observer(observer, plugin);
}
|
|
|
|
#ifdef HAVE_REPLICATION
|
|
/**
  Add a binlog transmit observer to the global binlog transmit delegate.

  @param observer  observer to add
  @param p         opaque pointer, interpreted as st_plugin_int*

  @return the result of Binlog_transmit_delegate::add_observer()
*/
int register_binlog_transmit_observer(Binlog_transmit_observer *observer, void *p)
{
  st_plugin_int *plugin= static_cast<st_plugin_int *>(p);
  return binlog_transmit_delegate->add_observer(observer, plugin);
}
|
|
|
|
/**
  Remove a binlog transmit observer from the global binlog transmit delegate.

  @param observer  observer to remove
  @param p         opaque pointer, interpreted as st_plugin_int*

  @return the result of Binlog_transmit_delegate::remove_observer()
*/
int unregister_binlog_transmit_observer(Binlog_transmit_observer *observer, void *p)
{
  st_plugin_int *plugin= static_cast<st_plugin_int *>(p);
  return binlog_transmit_delegate->remove_observer(observer, plugin);
}
|
|
|
|
/**
  Add a relay IO observer to the global binlog relay IO delegate.

  @param observer  observer to add
  @param p         opaque pointer, interpreted as st_plugin_int*

  @return the result of Binlog_relay_IO_delegate::add_observer()
*/
int register_binlog_relay_io_observer(Binlog_relay_IO_observer *observer, void *p)
{
  st_plugin_int *plugin= static_cast<st_plugin_int *>(p);
  return binlog_relay_io_delegate->add_observer(observer, plugin);
}
|
|
|
|
/**
  Remove a relay IO observer from the global binlog relay IO delegate.

  @param observer  observer to remove
  @param p         opaque pointer, interpreted as st_plugin_int*

  @return the result of Binlog_relay_IO_delegate::remove_observer()
*/
int unregister_binlog_relay_io_observer(Binlog_relay_IO_observer *observer, void *p)
{
  st_plugin_int *plugin= static_cast<st_plugin_int *>(p);
  return binlog_relay_io_delegate->remove_observer(observer, plugin);
}
|
|
#else
|
|
/**
  Variant compiled when HAVE_REPLICATION is not defined: does nothing.

  @param observer  ignored
  @param p         ignored

  @return always 0
*/
int register_binlog_transmit_observer(Binlog_transmit_observer *observer, void *p)
{
  (void) observer;
  (void) p;
  return 0;
}
|
|
|
|
/**
  Variant compiled when HAVE_REPLICATION is not defined: does nothing.

  @param observer  ignored
  @param p         ignored

  @return always 0
*/
int unregister_binlog_transmit_observer(Binlog_transmit_observer *observer, void *p)
{
  (void) observer;
  (void) p;
  return 0;
}
|
|
|
|
/**
  Variant compiled when HAVE_REPLICATION is not defined: does nothing.

  @param observer  ignored
  @param p         ignored

  @return always 0
*/
int register_binlog_relay_io_observer(Binlog_relay_IO_observer *observer, void *p)
{
  (void) observer;
  (void) p;
  return 0;
}
|
|
|
|
/**
  Variant compiled when HAVE_REPLICATION is not defined: does nothing.

  @param observer  ignored
  @param p         ignored

  @return always 0
*/
int unregister_binlog_relay_io_observer(Binlog_relay_IO_observer *observer, void *p)
{
  (void) observer;
  (void) p;
  return 0;
}
|
|
#endif /* HAVE_REPLICATION */
|