/* Copyright (c) 2008, 2011, Oracle and/or its affiliates. All rights reserved. This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; version 2 of the License. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ #ifndef RPL_HANDLER_H #define RPL_HANDLER_H #include "sql_priv.h" #include "rpl_mi.h" #include "rpl_rli.h" #include "sql_plugin.h" #include "replication.h" class Observer_info { public: void *observer; st_plugin_int *plugin_int; Observer_info(void *ob, st_plugin_int *p) :observer(ob), plugin_int(p) { } }; class Delegate { public: typedef List Observer_info_list; typedef List_iterator Observer_info_iterator; int add_observer(void *observer, st_plugin_int *plugin) { int ret= FALSE; if (!inited) return TRUE; write_lock(); Observer_info_iterator iter(observer_info_list); Observer_info *info= iter++; while (info && info->observer != observer) info= iter++; if (!info) { info= new Observer_info(observer, plugin); if (!info || observer_info_list.push_back(info, &memroot)) ret= TRUE; } else ret= TRUE; unlock(); return ret; } int remove_observer(void *observer, st_plugin_int *plugin) { int ret= FALSE; if (!inited) return TRUE; write_lock(); Observer_info_iterator iter(observer_info_list); Observer_info *info= iter++; while (info && info->observer != observer) info= iter++; if (info) { iter.remove(); delete info; } else ret= TRUE; unlock(); return ret; } inline Observer_info_iterator observer_info_iter() { return Observer_info_iterator(observer_info_list); } inline bool is_empty() { return observer_info_list.is_empty(); } inline int read_lock() { if (!inited) return TRUE; return rw_rdlock(&lock); } inline int write_lock() { if (!inited) return TRUE; return rw_wrlock(&lock); } inline int unlock() { if (!inited) return TRUE; return rw_unlock(&lock); } inline bool is_inited() { return inited; } Delegate() { inited= FALSE; if (my_rwlock_init(&lock, NULL)) return; init_sql_alloc(&memroot, "Delegate", 1024, 0, MYF(0)); inited= TRUE; } ~Delegate() { inited= FALSE; rwlock_destroy(&lock); free_root(&memroot, MYF(0)); } private: Observer_info_list observer_info_list; rw_lock_t lock; MEM_ROOT memroot; bool inited; }; class Trans_delegate :public Delegate { public: typedef Trans_observer Observer; int before_commit(THD *thd, bool all); int before_rollback(THD *thd, bool all); int after_commit(THD *thd, bool all); int after_rollback(THD *thd, bool all); }; class Binlog_storage_delegate :public Delegate { public: typedef Binlog_storage_observer Observer; int after_flush(THD *thd, const char *log_file, my_off_t log_pos, bool synced, bool first_in_group, bool last_in_group); int after_sync(THD *thd, const char *log_file, my_off_t log_pos, bool first_in_group, bool last_in_group); }; #ifdef HAVE_REPLICATION class Binlog_transmit_delegate :public Delegate { public: typedef Binlog_transmit_observer Observer; int transmit_start(THD *thd, ushort flags, const char *log_file, my_off_t log_pos); int transmit_stop(THD *thd, ushort flags); int reserve_header(THD *thd, ushort flags, String *packet); int before_send_event(THD *thd, ushort flags, String *packet, const char *log_file, my_off_t log_pos ); int after_send_event(THD *thd, ushort flags, String *packet); int after_reset_master(THD *thd, ushort flags); }; class Binlog_relay_IO_delegate :public Delegate { public: typedef Binlog_relay_IO_observer Observer; int thread_start(THD *thd, Master_info *mi); int thread_stop(THD *thd, Master_info *mi); int before_request_transmit(THD *thd, Master_info *mi, ushort flags); int after_read_event(THD *thd, Master_info *mi, const char *packet, ulong len, const char **event_buf, ulong *event_len); int after_queue_event(THD *thd, Master_info *mi, const char *event_buf, ulong event_len, bool synced); int after_reset_slave(THD *thd, Master_info *mi); private: void init_param(Binlog_relay_IO_param *param, Master_info *mi); }; #endif /* HAVE_REPLICATION */ int delegates_init(); void delegates_destroy(); extern Trans_delegate *transaction_delegate; extern Binlog_storage_delegate *binlog_storage_delegate; #ifdef HAVE_REPLICATION extern Binlog_transmit_delegate *binlog_transmit_delegate; extern Binlog_relay_IO_delegate *binlog_relay_io_delegate; #endif /* HAVE_REPLICATION */ /* if there is no observers in the delegate, we can return 0 immediately. */ #define RUN_HOOK(group, hook, args) \ (group ##_delegate->is_empty() ? \ 0 : group ##_delegate->hook args) #endif /* RPL_HANDLER_H */