mariadb/sql/rpl_handler.h
Jonas Oreland 0b87de124d MDEV-162 Enhanced semisync replication
Implement --semi-sync-master-wait-point=AFTER_SYNC|AFTER_COMMIT.

When AFTER_SYNC, the semi-sync wait will be done earlier, before the storage
engine commit rather than after. This means that a transaction will not be
visible on the master until at least one slave has received it.
2014-12-23 14:16:32 +01:00

216 lines
5.5 KiB
C++

/* Copyright (c) 2008, 2011, Oracle and/or its affiliates. All rights reserved.
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; version 2 of the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
#ifndef RPL_HANDLER_H
#define RPL_HANDLER_H
#include "sql_priv.h"
#include "rpl_mi.h"
#include "rpl_rli.h"
#include "sql_plugin.h"
#include "replication.h"
class Observer_info {
public:
void *observer;
st_plugin_int *plugin_int;
Observer_info(void *ob, st_plugin_int *p)
:observer(ob), plugin_int(p)
{ }
};
class Delegate {
public:
typedef List<Observer_info> Observer_info_list;
typedef List_iterator<Observer_info> Observer_info_iterator;
int add_observer(void *observer, st_plugin_int *plugin)
{
int ret= FALSE;
if (!inited)
return TRUE;
write_lock();
Observer_info_iterator iter(observer_info_list);
Observer_info *info= iter++;
while (info && info->observer != observer)
info= iter++;
if (!info)
{
info= new Observer_info(observer, plugin);
if (!info || observer_info_list.push_back(info, &memroot))
ret= TRUE;
}
else
ret= TRUE;
unlock();
return ret;
}
int remove_observer(void *observer, st_plugin_int *plugin)
{
int ret= FALSE;
if (!inited)
return TRUE;
write_lock();
Observer_info_iterator iter(observer_info_list);
Observer_info *info= iter++;
while (info && info->observer != observer)
info= iter++;
if (info)
{
iter.remove();
delete info;
}
else
ret= TRUE;
unlock();
return ret;
}
inline Observer_info_iterator observer_info_iter()
{
return Observer_info_iterator(observer_info_list);
}
inline bool is_empty()
{
return observer_info_list.is_empty();
}
inline int read_lock()
{
if (!inited)
return TRUE;
return rw_rdlock(&lock);
}
inline int write_lock()
{
if (!inited)
return TRUE;
return rw_wrlock(&lock);
}
inline int unlock()
{
if (!inited)
return TRUE;
return rw_unlock(&lock);
}
inline bool is_inited()
{
return inited;
}
Delegate()
{
inited= FALSE;
if (my_rwlock_init(&lock, NULL))
return;
init_sql_alloc(&memroot, 1024, 0, MYF(0));
inited= TRUE;
}
~Delegate()
{
inited= FALSE;
rwlock_destroy(&lock);
free_root(&memroot, MYF(0));
}
private:
Observer_info_list observer_info_list;
rw_lock_t lock;
MEM_ROOT memroot;
bool inited;
};
/**
  Delegate whose observers are of type Trans_observer.

  The hook entry points are only declared here; their definitions are not
  part of this header.
  NOTE(review): `all` presumably tells whole-transaction from
  single-statement commit/rollback -- confirm against the definitions.
*/
class Trans_delegate : public Delegate
{
public:
  typedef Trans_observer Observer;

  int before_commit(THD *thd, bool all);
  int before_rollback(THD *thd, bool all);
  int after_commit(THD *thd, bool all);
  int after_rollback(THD *thd, bool all);
};
/**
  Delegate whose observers are of type Binlog_storage_observer.

  The hook entry points are only declared here.  Both take a log file name
  and a position within it, plus flags marking the first and last
  call of a group.
  NOTE(review): presumably "group" refers to a binlog group commit --
  confirm against the callers.
*/
class Binlog_storage_delegate : public Delegate
{
public:
  typedef Binlog_storage_observer Observer;

  int after_flush(THD *thd,
                  const char *log_file, my_off_t log_pos,
                  bool synced,
                  bool first_in_group, bool last_in_group);
  int after_sync(THD *thd,
                 const char *log_file, my_off_t log_pos,
                 bool first_in_group, bool last_in_group);
};
#ifdef HAVE_REPLICATION
/**
  Delegate whose observers are of type Binlog_transmit_observer.
  Only compiled when HAVE_REPLICATION is defined.

  The hook entry points are only declared here; their definitions are not
  part of this header.
*/
class Binlog_transmit_delegate : public Delegate
{
public:
  typedef Binlog_transmit_observer Observer;

  int transmit_start(THD *thd, ushort flags,
                     const char *log_file, my_off_t log_pos);
  int transmit_stop(THD *thd, ushort flags);
  int reserve_header(THD *thd, ushort flags, String *packet);
  int before_send_event(THD *thd, ushort flags, String *packet,
                        const char *log_file, my_off_t log_pos);
  int after_send_event(THD *thd, ushort flags, String *packet);
  int after_reset_master(THD *thd, ushort flags);
};
/**
  Delegate whose observers are of type Binlog_relay_IO_observer.
  Only compiled when HAVE_REPLICATION is defined.

  The hook entry points are only declared here; every one of them receives
  the Master_info it applies to.
*/
class Binlog_relay_IO_delegate : public Delegate
{
public:
  typedef Binlog_relay_IO_observer Observer;

  int thread_start(THD *thd, Master_info *mi);
  int thread_stop(THD *thd, Master_info *mi);
  int before_request_transmit(THD *thd, Master_info *mi, ushort flags);
  int after_read_event(THD *thd, Master_info *mi,
                       const char *packet, ulong len,
                       const char **event_buf, ulong *event_len);
  int after_queue_event(THD *thd, Master_info *mi,
                        const char *event_buf, ulong event_len,
                        bool synced);
  int after_reset_slave(THD *thd, Master_info *mi);

private:
  /* NOTE(review): presumably fills *param from *mi before a hook is
     invoked -- the definition is not visible in this header. */
  void init_param(Binlog_relay_IO_param *param, Master_info *mi);
};
#endif /* HAVE_REPLICATION */
/*
  Set-up and tear-down entry points for the global delegates declared
  below.  NOTE(review): the definitions are not in this header; presumably
  delegates_init() returns non-zero on failure -- confirm.
*/
int delegates_init();
void delegates_destroy();
/*
  Global delegate pointers.  RUN_HOOK(group, ...) refers to them by pasting
  "_delegate" onto its `group` argument, so the prefixes before "_delegate"
  are the valid group names.
*/
extern Trans_delegate *transaction_delegate;
extern Binlog_storage_delegate *binlog_storage_delegate;
#ifdef HAVE_REPLICATION
extern Binlog_transmit_delegate *binlog_transmit_delegate;
extern Binlog_relay_IO_delegate *binlog_relay_io_delegate;
#endif /* HAVE_REPLICATION */
/*
  Call `hook` on the global `<group>_delegate` object, passing the
  parenthesized argument list `args`.

  If there are no observers in the delegate, the hook is not called and
  the expression evaluates to 0 immediately.
*/
#define RUN_HOOK(group, hook, args)              \
  (!group ## _delegate->is_empty() ?             \
   group ## _delegate->hook args :               \
   0)
#endif /* RPL_HANDLER_H */