From cb52174693801ae573aa21d78a9d49113cdd65cd Mon Sep 17 00:00:00 2001 From: Nikita Malyavin Date: Fri, 22 Sep 2023 17:09:21 +0400 Subject: [PATCH] online alter: extract the source to a separate file Move all the functions dedicated to online alter to a newly created online_alter.cc. With that, make many functions static and simplify the static functions naming. Also, rename binlog_log_row_online_alter -> online_alter_log_row. --- sql/CMakeLists.txt | 6 +- sql/handler.cc | 4 +- sql/handler.h | 6 +- sql/log.cc | 526 +------------------------------------------- sql/log.h | 6 +- sql/log_cache.h | 247 +++++++++++++++++++++ sql/online_alter.cc | 320 +++++++++++++++++++++++++++ 7 files changed, 577 insertions(+), 538 deletions(-) create mode 100644 sql/log_cache.h create mode 100644 sql/online_alter.cc diff --git a/sql/CMakeLists.txt b/sql/CMakeLists.txt index bf26e987eb5..72e906681fe 100644 --- a/sql/CMakeLists.txt +++ b/sql/CMakeLists.txt @@ -218,8 +218,9 @@ MYSQL_ADD_PLUGIN(partition ha_partition.cc STORAGE_ENGINE DEFAULT STATIC_ONLY RECOMPILE_FOR_EMBEDDED) MYSQL_ADD_PLUGIN(sql_sequence ha_sequence.cc STORAGE_ENGINE MANDATORY STATIC_ONLY RECOMPILE_FOR_EMBEDDED) -MYSQL_ADD_PLUGIN(online_alter_log log.cc STORAGE_ENGINE MANDATORY STATIC_ONLY -NOT_EMBEDDED) +MYSQL_ADD_PLUGIN(online_alter_log online_alter.cc STORAGE_ENGINE MANDATORY +STATIC_ONLY NOT_EMBEDDED) + ADD_LIBRARY(sql STATIC ${SQL_SOURCE}) MAYBE_DISABLE_IPO(sql) @@ -227,6 +228,7 @@ DTRACE_INSTRUMENT(sql) TARGET_LINK_LIBRARIES(sql mysys mysys_ssl dbug strings vio pcre2-8 tpool + online_alter_log ${LIBWRAP} ${LIBCRYPT} ${CMAKE_DL_LIBS} ${CMAKE_THREAD_LIBS_INIT} ${SSL_LIBRARIES} ${LIBSYSTEMD}) diff --git a/sql/handler.cc b/sql/handler.cc index 320d1d55292..a48c16c278c 100644 --- a/sql/handler.cc +++ b/sql/handler.cc @@ -7304,8 +7304,8 @@ int handler::binlog_log_row(const uchar *before_record, #ifdef HAVE_REPLICATION if (unlikely(!error && table->s->online_alter_binlog && is_root_handler())) - error= 
binlog_log_row_online_alter(table, before_record, after_record, - log_func); + error= online_alter_log_row(table, before_record, after_record, + log_func); #endif // HAVE_REPLICATION DBUG_RETURN(error); diff --git a/sql/handler.h b/sql/handler.h index 65b3ca2c5ff..07b392c010a 100644 --- a/sql/handler.h +++ b/sql/handler.h @@ -1098,8 +1098,6 @@ extern MYSQL_PLUGIN_IMPORT st_plugin_int *hton2plugin[MAX_HA]; struct handlerton; -extern handlerton *online_alter_hton; - #define view_pseudo_hton ((handlerton *)1) /* @@ -1972,9 +1970,7 @@ public: bool is_trx_read_write() const { DBUG_ASSERT(is_started()); - bool result= m_flags & (int) TRX_READ_WRITE; - DBUG_ASSERT(!result || m_ht != online_alter_hton); - return result; + return m_flags & (int) TRX_READ_WRITE; } bool is_started() const { return m_ht != NULL; } /** Mark this transaction read-write if the argument is read-write. */ diff --git a/sql/log.cc b/sql/log.cc index 34f8ac5cb2b..aa71631155b 100644 --- a/sql/log.cc +++ b/sql/log.cc @@ -59,6 +59,7 @@ #include "sp_rcontext.h" #include "sp_head.h" #include "sql_table.h" +#include "log_cache.h" #include "wsrep_mysqld.h" #ifdef WITH_WSREP @@ -75,9 +76,6 @@ /* max size of the log message */ #define MAX_LOG_BUFFER_SIZE 1024 #define MAX_TIME_SIZE 32 -#define MY_OFF_T_UNDEF (~(my_off_t)0UL) -/* Truncate cache log files bigger than this */ -#define CACHE_FILE_TRUNC_SIZE 65536 #define FLAGSTR(V,F) ((V)&(F)?#F" ":"") @@ -104,8 +102,6 @@ static int binlog_flush_cache(THD *thd, binlog_cache_mngr *cache_mngr, Log_event *end_ev, bool all, bool using_stmt, bool using_trx, bool is_ro_1pc); -static int binlog_online_alter_end_trans(THD *thd, bool all, bool commit); - static const LEX_CSTRING write_error_msg= { STRING_WITH_LEN("error writing to the binary log") }; @@ -275,243 +271,7 @@ void make_default_log_name(char **out, const char* log_ext, bool once) Helper classes to store non-transactional and transactional data before copying it to the binary log. 
*/ -class binlog_cache_data -{ -public: - binlog_cache_data(): before_stmt_pos(MY_OFF_T_UNDEF), m_pending(0), status(0), - incident(FALSE), saved_max_binlog_cache_size(0), ptr_binlog_cache_use(0), - ptr_binlog_cache_disk_use(0) - { } - - ~binlog_cache_data() - { - DBUG_ASSERT(empty()); - close_cached_file(&cache_log); - } - /* - Return 1 if there is no relevant entries in the cache - - This is: - - Cache is empty - - There are row or critical (DDL?) events in the cache - - The status test is needed to avoid writing entries with only - a table map entry, which would crash in do_apply_event() on the slave - as it assumes that there is always a row entry after a table map. - */ - bool empty() const - { - return (pending() == NULL && - (my_b_write_tell(&cache_log) == 0 || - ((status & (LOGGED_ROW_EVENT | LOGGED_CRITICAL)) == 0))); - } - - Rows_log_event *pending() const - { - return m_pending; - } - - void set_pending(Rows_log_event *const pending_arg) - { - m_pending= pending_arg; - } - - void set_incident(void) - { - incident= TRUE; - } - - bool has_incident(void) const - { - return(incident); - } - - void reset() - { - bool cache_was_empty= empty(); - bool truncate_file= (cache_log.file != -1 && - my_b_write_tell(&cache_log) > CACHE_FILE_TRUNC_SIZE); - truncate(0,1); // Forget what's in cache - if (!cache_was_empty) - compute_statistics(); - if (truncate_file) - my_chsize(cache_log.file, 0, 0, MYF(MY_WME)); - - status= 0; - incident= FALSE; - before_stmt_pos= MY_OFF_T_UNDEF; - DBUG_ASSERT(empty()); - } - - my_off_t get_byte_position() const - { - return my_b_tell(&cache_log); - } - - my_off_t get_prev_position() const - { - return(before_stmt_pos); - } - - void set_prev_position(my_off_t pos) - { - before_stmt_pos= pos; - } - - void restore_prev_position() - { - truncate(before_stmt_pos); - } - - void restore_savepoint(my_off_t pos) - { - truncate(pos); - if (pos < before_stmt_pos) - before_stmt_pos= MY_OFF_T_UNDEF; - } - - void set_binlog_cache_info(my_off_t 
param_max_binlog_cache_size, - ulong *param_ptr_binlog_cache_use, - ulong *param_ptr_binlog_cache_disk_use) - { - /* - The assertions guarantee that the set_binlog_cache_info is - called just once and information passed as parameters are - never zero. - - This is done while calling the constructor binlog_cache_mngr. - We cannot set information in the constructor binlog_cache_data - because the space for binlog_cache_mngr is allocated through - a placement new. - - In the future, we can refactor this and change it to avoid - the set_binlog_info. - */ - DBUG_ASSERT(saved_max_binlog_cache_size == 0); - DBUG_ASSERT(param_max_binlog_cache_size != 0); - DBUG_ASSERT(ptr_binlog_cache_use == 0); - DBUG_ASSERT(param_ptr_binlog_cache_use != 0); - DBUG_ASSERT(ptr_binlog_cache_disk_use == 0); - DBUG_ASSERT(param_ptr_binlog_cache_disk_use != 0); - - saved_max_binlog_cache_size= param_max_binlog_cache_size; - ptr_binlog_cache_use= param_ptr_binlog_cache_use; - ptr_binlog_cache_disk_use= param_ptr_binlog_cache_disk_use; - cache_log.end_of_file= saved_max_binlog_cache_size; - } - - void add_status(enum_logged_status status_arg) - { - status|= status_arg; - } - - /* - Cache to store data before copying it to the binary log. - */ - IO_CACHE cache_log; - -protected: - /* - Binlog position before the start of the current statement. - */ - my_off_t before_stmt_pos; - -private: - /* - Pending binrows event. This event is the event where the rows are currently - written. - */ - Rows_log_event *m_pending; - - /* - Bit flags for what has been writing to cache. Used to - discard logs without any data changes. - see enum_logged_status; - */ - uint32 status; - - /* - This indicates that some events did not get into the cache and most likely - it is corrupted. - */ - bool incident; - - /** - This function computes binlog cache and disk usage. 
- */ - void compute_statistics() - { - statistic_increment(*ptr_binlog_cache_use, &LOCK_status); - if (cache_log.disk_writes != 0) - { -#ifdef REAL_STATISTICS - statistic_add(*ptr_binlog_cache_disk_use, - cache_log.disk_writes, &LOCK_status); -#else - statistic_increment(*ptr_binlog_cache_disk_use, &LOCK_status); -#endif - cache_log.disk_writes= 0; - } - } - - /* - Stores the values of maximum size of the cache allowed when this cache - is configured. This corresponds to either - . max_binlog_cache_size or max_binlog_stmt_cache_size. - */ - my_off_t saved_max_binlog_cache_size; - - /* - Stores a pointer to the status variable that keeps track of the in-memory - cache usage. This corresponds to either - . binlog_cache_use or binlog_stmt_cache_use. - */ - ulong *ptr_binlog_cache_use; - - /* - Stores a pointer to the status variable that keeps track of the disk - cache usage. This corresponds to either - . binlog_cache_disk_use or binlog_stmt_cache_disk_use. - */ - ulong *ptr_binlog_cache_disk_use; - - /* - It truncates the cache to a certain position. This includes deleting the - pending event. 
- */ - void truncate(my_off_t pos, bool reset_cache=0) - { - DBUG_PRINT("info", ("truncating to position %lu", (ulong) pos)); - cache_log.error=0; - if (pending()) - { - delete pending(); - set_pending(0); - } - my_bool res= reinit_io_cache(&cache_log, WRITE_CACHE, pos, 0, reset_cache); - DBUG_ASSERT(res == 0); - cache_log.end_of_file= saved_max_binlog_cache_size; - } - - binlog_cache_data& operator=(const binlog_cache_data& info); - binlog_cache_data(const binlog_cache_data& info); -}; - - -class online_alter_cache_data: public Sql_alloc, public ilist_node<>, - public binlog_cache_data -{ -public: - void store_prev_position() - { - before_stmt_pos= my_b_write_tell(&cache_log); - } - - handlerton *hton; - Cache_flip_event_log *sink_log; - SAVEPOINT *sv_list; -}; void Log_event_writer::add_status(enum_logged_status status) { @@ -2266,61 +2026,6 @@ static int binlog_commit_flush_xa_prepare(THD *thd, bool all, return (binlog_flush_cache(thd, cache_mngr, &end_evt, all, TRUE, TRUE)); } -#ifdef HAVE_REPLICATION -int binlog_log_row_online_alter(TABLE* table, const uchar *before_record, - const uchar *after_record, Log_func *log_func) -{ - THD *thd= table->in_use; - - if (!table->online_alter_cache) - { - table->online_alter_cache= online_alter_binlog_get_cache_data(thd, table); - trans_register_ha(thd, false, online_alter_hton, 0); - if (thd->in_multi_stmt_transaction_mode()) - trans_register_ha(thd, true, online_alter_hton, 0); - } - - // We need to log all columns for the case if alter table changes primary key - DBUG_ASSERT(!before_record || bitmap_is_set_all(table->read_set)); - MY_BITMAP *old_rpl_write_set= table->rpl_write_set; - table->rpl_write_set= &table->s->all_set; - - table->online_alter_cache->store_prev_position(); - int error= (*log_func)(thd, table, table->s->online_alter_binlog, - table->online_alter_cache, - table->file->has_transactions_and_rollback(), - BINLOG_ROW_IMAGE_FULL, - before_record, after_record); - - table->rpl_write_set= old_rpl_write_set; 
- - if (unlikely(error)) - { - table->online_alter_cache->restore_prev_position(); - return HA_ERR_RBR_LOGGING_FAILED; - } - - return 0; -} - -static void -binlog_online_alter_cleanup(ilist &list, bool ending_trans) -{ - if (ending_trans) - { - auto it= list.begin(); - while (it != list.end()) - { - auto &cache= *it++; - cache.sink_log->release(); - cache.reset(); - delete &cache; - } - list.clear(); - DBUG_ASSERT(list.empty()); - } -} -#endif // HAVE_REPLICATION /** This function is called once after each statement. @@ -6433,54 +6138,6 @@ write_err: } -#ifdef HAVE_REPLICATION -static online_alter_cache_data * -online_alter_binlog_setup_cache_data(MEM_ROOT *root, TABLE_SHARE *share) -{ - static ulong online_alter_cache_use= 0, online_alter_cache_disk_use= 0; - - auto cache= new (root) online_alter_cache_data(); - if (!cache || open_cached_file(&cache->cache_log, mysql_tmpdir, - LOG_PREFIX, (size_t)binlog_cache_size, MYF(MY_WME))) - { - delete cache; - return NULL; - } - - share->online_alter_binlog->acquire(); - cache->hton= share->db_type(); - cache->sink_log= share->online_alter_binlog; - - my_off_t binlog_max_size= SIZE_T_MAX; // maximum possible cache size - DBUG_EXECUTE_IF("online_alter_small_cache", binlog_max_size= 4096;); - - cache->set_binlog_cache_info(binlog_max_size, - &online_alter_cache_use, - &online_alter_cache_disk_use); - cache->store_prev_position(); - return cache; -} - - -online_alter_cache_data *online_alter_binlog_get_cache_data(THD *thd, TABLE *table) -{ - ilist &list= thd->online_alter_cache_list; - - /* we assume it's very rare to have more than one online ALTER running */ - for (auto &cache: list) - { - if (cache.sink_log == table->s->online_alter_binlog) - return &cache; - } - - MEM_ROOT *root= &thd->transaction->mem_root; - auto *new_cache_data= online_alter_binlog_setup_cache_data(root, table->s); - list.push_back(*new_cache_data); - - return new_cache_data; -} -#endif - binlog_cache_mngr *THD::binlog_get_cache_mngr() const { return 
(binlog_cache_mngr*) thd_get_ha_data(this, binlog_hton); @@ -7737,129 +7394,6 @@ private: bool first; }; -static int binlog_online_alter_end_trans(THD *thd, bool all, bool commit) -{ - DBUG_ENTER("binlog_online_alter_end_trans"); - int error= 0; -#ifdef HAVE_REPLICATION - if (thd->online_alter_cache_list.empty()) - DBUG_RETURN(0); - - bool is_ending_transaction= ending_trans(thd, all); - - for (auto &cache: thd->online_alter_cache_list) - { - auto *binlog= cache.sink_log; - DBUG_ASSERT(binlog); - bool non_trans= cache.hton->flags & HTON_NO_ROLLBACK // Aria - || !cache.hton->rollback; - bool do_commit= (commit && is_ending_transaction) || non_trans; - - if (commit || non_trans) - { - // Do not set STMT_END for last event to leave table open in altering thd - error= binlog_flush_pending_rows_event(thd, false, true, binlog, &cache); - } - - if (do_commit) - { - /* - If the cache wasn't reinited to write, then it remains empty after - the last write. - */ - if (my_b_bytes_in_cache(&cache.cache_log) && likely(!error)) - { - DBUG_ASSERT(cache.cache_log.type != READ_CACHE); - mysql_mutex_lock(binlog->get_log_lock()); - error= binlog->write_cache_raw(thd, &cache.cache_log); - mysql_mutex_unlock(binlog->get_log_lock()); - } - } - else if (!commit) // rollback - { - DBUG_ASSERT(!non_trans); - cache.restore_prev_position(); - } - else - { - DBUG_ASSERT(!is_ending_transaction); - cache.store_prev_position(); - } - - - if (error) - { - my_error(ER_ERROR_ON_WRITE, MYF(ME_ERROR_LOG), - binlog->get_name(), errno); - binlog_online_alter_cleanup(thd->online_alter_cache_list, - is_ending_transaction); - DBUG_RETURN(error); - } - } - - binlog_online_alter_cleanup(thd->online_alter_cache_list, - is_ending_transaction); - - for (TABLE *table= thd->open_tables; table; table= table->next) - table->online_alter_cache= NULL; -#endif // HAVE_REPLICATION - DBUG_RETURN(error); -} - -SAVEPOINT** find_savepoint_in_list(THD *thd, LEX_CSTRING name, - SAVEPOINT ** const list); - -SAVEPOINT* 
savepoint_add(THD *thd, LEX_CSTRING name, SAVEPOINT **list, - int (*release_old)(THD*, SAVEPOINT*)); - -int online_alter_savepoint_set(THD *thd, LEX_CSTRING name) -{ - - DBUG_ENTER("binlog_online_alter_savepoint"); -#ifdef HAVE_REPLICATION - if (thd->online_alter_cache_list.empty()) - DBUG_RETURN(0); - - if (savepoint_alloc_size < sizeof (SAVEPOINT) + sizeof(my_off_t)) - savepoint_alloc_size= sizeof (SAVEPOINT) + sizeof(my_off_t); - - for (auto &cache: thd->online_alter_cache_list) - { - if (cache.hton->savepoint_set == NULL) - continue; - - SAVEPOINT *sv= savepoint_add(thd, name, &cache.sv_list, NULL); - if(unlikely(sv == NULL)) - DBUG_RETURN(1); - my_off_t *pos= (my_off_t*)(sv+1); - *pos= cache.get_byte_position(); - - sv->prev= cache.sv_list; - cache.sv_list= sv; - } -#endif - DBUG_RETURN(0); -} - -int online_alter_savepoint_rollback(THD *thd, LEX_CSTRING name) -{ - DBUG_ENTER("online_alter_savepoint_rollback"); -#ifdef HAVE_REPLICATION - for (auto &cache: thd->online_alter_cache_list) - { - if (cache.hton->savepoint_set == NULL) - continue; - - SAVEPOINT **sv= find_savepoint_in_list(thd, name, &cache.sv_list); - // sv is null if savepoint was set up before online table was modified - my_off_t pos= *sv ? 
*(my_off_t*)(*sv+1) : 0; - - cache.restore_savepoint(pos); - } - -#endif - DBUG_RETURN(0); -} int Event_log::write_cache_raw(THD *thd, IO_CACHE *cache) { @@ -12528,61 +12062,3 @@ void wsrep_register_binlog_handler(THD *thd, bool trx) } #endif /* WITH_WSREP */ - -static int online_alter_close_connection(handlerton *hton, THD *thd) -{ - DBUG_ASSERT(thd->online_alter_cache_list.empty()); - return 0; -} - -handlerton *online_alter_hton; - -int online_alter_log_init(void *p) -{ - online_alter_hton= (handlerton *)p; - online_alter_hton->db_type= DB_TYPE_ONLINE_ALTER; - online_alter_hton->savepoint_offset= sizeof(my_off_t); - online_alter_hton->close_connection= online_alter_close_connection; - - online_alter_hton->savepoint_set= // Done by online_alter_savepoint_set - [](handlerton *, THD *, void *){ return 0; }; - online_alter_hton->savepoint_rollback= // Done by online_alter_savepoint_rollback - [](handlerton *, THD *, void *){ return 0; }; - online_alter_hton->savepoint_rollback_can_release_mdl= - [](handlerton *hton, THD *thd){ return true; }; - - online_alter_hton->commit= [](handlerton *, THD *thd, bool all) - { return binlog_online_alter_end_trans(thd, all, true); }; - online_alter_hton->rollback= [](handlerton *, THD *thd, bool all) - { return binlog_online_alter_end_trans(thd, all, false); }; - online_alter_hton->commit_by_xid= [](handlerton *hton, XID *xid) - { return binlog_online_alter_end_trans(current_thd, true, true); }; - online_alter_hton->rollback_by_xid= [](handlerton *hton, XID *xid) - { return binlog_online_alter_end_trans(current_thd, true, false); }; - - online_alter_hton->drop_table= [](handlerton *, const char*) { return -1; }; - online_alter_hton->flags= HTON_NOT_USER_SELECTABLE | HTON_HIDDEN - | HTON_NO_ROLLBACK; - return 0; -} - -struct st_mysql_storage_engine online_alter_storage_engine= -{ MYSQL_HANDLERTON_INTERFACE_VERSION }; - -maria_declare_plugin(online_alter_log) -{ - MYSQL_STORAGE_ENGINE_PLUGIN, - &online_alter_storage_engine, - 
"online_alter_log", - "MariaDB PLC", - "This is a pseudo storage engine to represent the online alter log in a transaction", - PLUGIN_LICENSE_GPL, - online_alter_log_init, - NULL, - 0x0100, // 1.0 - NULL, // no status vars - NULL, // no sysvars - "1.0", - MariaDB_PLUGIN_MATURITY_STABLE -} -maria_declare_plugin_end; diff --git a/sql/log.h b/sql/log.h index f617f23911e..eb5c42d1bf2 100644 --- a/sql/log.h +++ b/sql/log.h @@ -1343,15 +1343,13 @@ int binlog_flush_pending_rows_event(THD *thd, bool stmt_end, binlog_cache_data *cache_data); Rows_log_event* binlog_get_pending_rows_event(binlog_cache_mngr *cache_mngr, bool use_trans_cache); -int binlog_log_row_online_alter(TABLE* table, const uchar *before_record, - const uchar *after_record, Log_func *log_func); -online_alter_cache_data *online_alter_binlog_get_cache_data(THD *thd, TABLE *table); +int online_alter_log_row(TABLE* table, const uchar *before_record, + const uchar *after_record, Log_func *log_func); binlog_cache_data* binlog_get_cache_data(binlog_cache_mngr *cache_mngr, bool use_trans_cache); extern MYSQL_PLUGIN_IMPORT MYSQL_BIN_LOG mysql_bin_log; extern handlerton *binlog_hton; -extern handlerton *online_alter_hton; extern LOGGER logger; extern const char *log_bin_index; diff --git a/sql/log_cache.h b/sql/log_cache.h new file mode 100644 index 00000000000..31342768942 --- /dev/null +++ b/sql/log_cache.h @@ -0,0 +1,247 @@ +/* + Copyright (c) 2023, MariaDB plc + + This program is free software; you can redistribute it and/or + modify it under the terms of the GNU General Public License + as published by the Free Software Foundation; version 2 of + the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. 
+ + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1335 USA +*/ + +#include "log_event.h" + +static constexpr my_off_t MY_OFF_T_UNDEF= ~0ULL; +/** Truncate cache log files bigger than this */ +static constexpr my_off_t CACHE_FILE_TRUNC_SIZE = 65536; + + +class binlog_cache_data +{ +public: + binlog_cache_data(): before_stmt_pos(MY_OFF_T_UNDEF), m_pending(0), status(0), + incident(FALSE), saved_max_binlog_cache_size(0), ptr_binlog_cache_use(0), + ptr_binlog_cache_disk_use(0) + { } + + ~binlog_cache_data() + { + DBUG_ASSERT(empty()); + close_cached_file(&cache_log); + } + + /* + Return 1 if there are no relevant entries in the cache + + This is: + - Cache is empty + - There are no row or critical (DDL?) events in the cache + + The status test is needed to avoid writing entries with only + a table map entry, which would crash in do_apply_event() on the slave + as it assumes that there is always a row entry after a table map.
+ */ + bool empty() const + { + return (pending() == NULL && + (my_b_write_tell(&cache_log) == 0 || + ((status & (LOGGED_ROW_EVENT | LOGGED_CRITICAL)) == 0))); + } + + Rows_log_event *pending() const + { + return m_pending; + } + + void set_pending(Rows_log_event *const pending_arg) + { + m_pending= pending_arg; + } + + void set_incident(void) + { + incident= TRUE; + } + + bool has_incident(void) const + { + return(incident); + } + + void reset() + { + bool cache_was_empty= empty(); + bool truncate_file= (cache_log.file != -1 && + my_b_write_tell(&cache_log) > CACHE_FILE_TRUNC_SIZE); + truncate(0,1); // Forget what's in cache + if (!cache_was_empty) + compute_statistics(); + if (truncate_file) + my_chsize(cache_log.file, 0, 0, MYF(MY_WME)); + + status= 0; + incident= FALSE; + before_stmt_pos= MY_OFF_T_UNDEF; + DBUG_ASSERT(empty()); + } + + my_off_t get_byte_position() const + { + return my_b_tell(&cache_log); + } + + my_off_t get_prev_position() const + { + return(before_stmt_pos); + } + + void set_prev_position(my_off_t pos) + { + before_stmt_pos= pos; + } + + void restore_prev_position() + { + truncate(before_stmt_pos); + } + + void restore_savepoint(my_off_t pos) + { + truncate(pos); + if (pos < before_stmt_pos) + before_stmt_pos= MY_OFF_T_UNDEF; + } + + void set_binlog_cache_info(my_off_t param_max_binlog_cache_size, + ulong *param_ptr_binlog_cache_use, + ulong *param_ptr_binlog_cache_disk_use) + { + /* + The assertions guarantee that the set_binlog_cache_info is + called just once and information passed as parameters are + never zero. + + This is done while calling the constructor binlog_cache_mngr. + We cannot set information in the constructor binlog_cache_data + because the space for binlog_cache_mngr is allocated through + a placement new. + + In the future, we can refactor this and change it to avoid + the set_binlog_info. 
+ */ + DBUG_ASSERT(saved_max_binlog_cache_size == 0); + DBUG_ASSERT(param_max_binlog_cache_size != 0); + DBUG_ASSERT(ptr_binlog_cache_use == 0); + DBUG_ASSERT(param_ptr_binlog_cache_use != 0); + DBUG_ASSERT(ptr_binlog_cache_disk_use == 0); + DBUG_ASSERT(param_ptr_binlog_cache_disk_use != 0); + + saved_max_binlog_cache_size= param_max_binlog_cache_size; + ptr_binlog_cache_use= param_ptr_binlog_cache_use; + ptr_binlog_cache_disk_use= param_ptr_binlog_cache_disk_use; + cache_log.end_of_file= saved_max_binlog_cache_size; + } + + void add_status(enum_logged_status status_arg) + { + status|= status_arg; + } + + /* + Cache to store data before copying it to the binary log. + */ + IO_CACHE cache_log; + +protected: + /* + Binlog position before the start of the current statement. + */ + my_off_t before_stmt_pos; + +private: + /* + Pending binrows event. This event is the event where the rows are currently + written. + */ + Rows_log_event *m_pending; + + /* + Bit flags for what has been writing to cache. Used to + discard logs without any data changes. + see enum_logged_status; + */ + uint32 status; + + /* + This indicates that some events did not get into the cache and most likely + it is corrupted. + */ + bool incident; + + /** + This function computes binlog cache and disk usage. + */ + void compute_statistics() + { + statistic_increment(*ptr_binlog_cache_use, &LOCK_status); + if (cache_log.disk_writes != 0) + { +#ifdef REAL_STATISTICS + statistic_add(*ptr_binlog_cache_disk_use, + cache_log.disk_writes, &LOCK_status); +#else + statistic_increment(*ptr_binlog_cache_disk_use, &LOCK_status); +#endif + cache_log.disk_writes= 0; + } + } + + /* + Stores the values of maximum size of the cache allowed when this cache + is configured. This corresponds to either + . max_binlog_cache_size or max_binlog_stmt_cache_size. + */ + my_off_t saved_max_binlog_cache_size; + + /* + Stores a pointer to the status variable that keeps track of the in-memory + cache usage. 
This corresponds to either + . binlog_cache_use or binlog_stmt_cache_use. + */ + ulong *ptr_binlog_cache_use; + + /* + Stores a pointer to the status variable that keeps track of the disk + cache usage. This corresponds to either + . binlog_cache_disk_use or binlog_stmt_cache_disk_use. + */ + ulong *ptr_binlog_cache_disk_use; + + /* + It truncates the cache to a certain position. This includes deleting the + pending event. + */ + void truncate(my_off_t pos, bool reset_cache=0) + { + DBUG_PRINT("info", ("truncating to position %lu", (ulong) pos)); + cache_log.error=0; + if (pending()) + { + delete pending(); + set_pending(0); + } + my_bool res= reinit_io_cache(&cache_log, WRITE_CACHE, pos, 0, reset_cache); + DBUG_ASSERT(res == 0); + cache_log.end_of_file= saved_max_binlog_cache_size; + } + + binlog_cache_data& operator=(const binlog_cache_data& info); + binlog_cache_data(const binlog_cache_data& info); +}; diff --git a/sql/online_alter.cc b/sql/online_alter.cc new file mode 100644 index 00000000000..ac4eb23a981 --- /dev/null +++ b/sql/online_alter.cc @@ -0,0 +1,320 @@ +/* + Copyright (c) 2023, MariaDB plc + + This program is free software; you can redistribute it and/or + modify it under the terms of the GNU General Public License + as published by the Free Software Foundation; version 2 of + the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. 
+ + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1335 USA +*/ + +#include "my_global.h" +#include "handler.h" +#include "sql_class.h" +#include "log_cache.h" + + +static handlerton *online_alter_hton; + + +class online_alter_cache_data: public Sql_alloc, public ilist_node<>, + public binlog_cache_data +{ +public: + void store_prev_position() + { + before_stmt_pos= my_b_write_tell(&cache_log); + } + + handlerton *hton; + Cache_flip_event_log *sink_log; + SAVEPOINT *sv_list; +}; + + +static +online_alter_cache_data *setup_cache_data(MEM_ROOT *root, TABLE_SHARE *share) +{ + static ulong online_alter_cache_use= 0, online_alter_cache_disk_use= 0; + + auto cache= new (root) online_alter_cache_data(); + if (!cache || open_cached_file(&cache->cache_log, mysql_tmpdir, LOG_PREFIX, + (size_t)binlog_cache_size, MYF(MY_WME))) + { + delete cache; + return NULL; + } + + share->online_alter_binlog->acquire(); + cache->hton= share->db_type(); + cache->sink_log= share->online_alter_binlog; + + my_off_t binlog_max_size= SIZE_T_MAX; // maximum possible cache size + DBUG_EXECUTE_IF("online_alter_small_cache", binlog_max_size= 4096;); + + cache->set_binlog_cache_info(binlog_max_size, + &online_alter_cache_use, + &online_alter_cache_disk_use); + cache->store_prev_position(); + return cache; +} + + +static online_alter_cache_data *get_cache_data(THD *thd, TABLE *table) +{ + ilist &list= thd->online_alter_cache_list; + + /* we assume it's very rare to have more than one online ALTER running */ + for (auto &cache: list) + { + if (cache.sink_log == table->s->online_alter_binlog) + return &cache; + } + + MEM_ROOT *root= &thd->transaction->mem_root; + auto *new_cache_data= setup_cache_data(root, table->s); + list.push_back(*new_cache_data); + + return new_cache_data; +} + + +int online_alter_log_row(TABLE* table, const uchar *before_record, + 
const uchar *after_record, Log_func *log_func) +{ + THD *thd= table->in_use; + + if (!table->online_alter_cache) + { + table->online_alter_cache= get_cache_data(thd, table); + trans_register_ha(thd, false, online_alter_hton, 0); + if (thd->in_multi_stmt_transaction_mode()) + trans_register_ha(thd, true, online_alter_hton, 0); + } + + // We need to log all columns for the case if alter table changes primary key + DBUG_ASSERT(!before_record || bitmap_is_set_all(table->read_set)); + MY_BITMAP *old_rpl_write_set= table->rpl_write_set; + table->rpl_write_set= &table->s->all_set; + + table->online_alter_cache->store_prev_position(); + int error= (*log_func)(thd, table, table->s->online_alter_binlog, + table->online_alter_cache, + table->file->has_transactions_and_rollback(), + BINLOG_ROW_IMAGE_FULL, + before_record, after_record); + + table->rpl_write_set= old_rpl_write_set; + + if (unlikely(error)) + { + table->online_alter_cache->restore_prev_position(); + return HA_ERR_RBR_LOGGING_FAILED; + } + + return 0; +} + + +static void +cleanup_cache_list(ilist &list, bool ending_trans) +{ + if (ending_trans) + { + auto it= list.begin(); + while (it != list.end()) + { + auto &cache= *it++; + cache.sink_log->release(); + cache.reset(); + delete &cache; + } + list.clear(); + DBUG_ASSERT(list.empty()); + } +} + + +static +int online_alter_end_trans(handlerton *hton, THD *thd, bool all, bool commit) +{ + DBUG_ENTER("online_alter_end_trans"); + int error= 0; + if (thd->online_alter_cache_list.empty()) + DBUG_RETURN(0); + + bool is_ending_transaction= ending_trans(thd, all); + + for (auto &cache: thd->online_alter_cache_list) + { + auto *binlog= cache.sink_log; + DBUG_ASSERT(binlog); + bool non_trans= cache.hton->flags & HTON_NO_ROLLBACK // Aria + || !cache.hton->rollback; + bool do_commit= (commit && is_ending_transaction) || non_trans; + + if (commit || non_trans) + { + // Do not set STMT_END for last event to leave table open in altering thd + error= 
binlog_flush_pending_rows_event(thd, false, true, binlog, &cache); + } + + if (do_commit) + { + /* + If the cache wasn't reinited to write, then it remains empty after + the last write. + */ + if (my_b_bytes_in_cache(&cache.cache_log) && likely(!error)) + { + DBUG_ASSERT(cache.cache_log.type != READ_CACHE); + mysql_mutex_lock(binlog->get_log_lock()); + error= binlog->write_cache_raw(thd, &cache.cache_log); + mysql_mutex_unlock(binlog->get_log_lock()); + } + } + else if (!commit) // rollback + { + DBUG_ASSERT(!non_trans); + cache.restore_prev_position(); + } + else + { + DBUG_ASSERT(!is_ending_transaction); + cache.store_prev_position(); + } + + + if (error) + { + my_error(ER_ERROR_ON_WRITE, MYF(ME_ERROR_LOG), + binlog->get_name(), errno); + cleanup_cache_list(thd->online_alter_cache_list, + is_ending_transaction); + DBUG_RETURN(error); + } + } + + cleanup_cache_list(thd->online_alter_cache_list, + is_ending_transaction); + + for (TABLE *table= thd->open_tables; table; table= table->next) + table->online_alter_cache= NULL; + DBUG_RETURN(error); +} + +SAVEPOINT** find_savepoint_in_list(THD *thd, LEX_CSTRING name, + SAVEPOINT ** const list); + +SAVEPOINT* savepoint_add(THD *thd, LEX_CSTRING name, SAVEPOINT **list, + int (*release_old)(THD*, SAVEPOINT*)); + +int online_alter_savepoint_set(THD *thd, LEX_CSTRING name) +{ + DBUG_ENTER("binlog_online_alter_savepoint"); + if (thd->online_alter_cache_list.empty()) + DBUG_RETURN(0); + + if (savepoint_alloc_size < sizeof (SAVEPOINT) + sizeof(my_off_t)) + savepoint_alloc_size= sizeof (SAVEPOINT) + sizeof(my_off_t); + + for (auto &cache: thd->online_alter_cache_list) + { + if (cache.hton->savepoint_set == NULL) + continue; + + SAVEPOINT *sv= savepoint_add(thd, name, &cache.sv_list, NULL); + if(unlikely(sv == NULL)) + DBUG_RETURN(1); + my_off_t *pos= (my_off_t*)(sv+1); + *pos= cache.get_byte_position(); + + sv->prev= cache.sv_list; + cache.sv_list= sv; + } + DBUG_RETURN(0); +} + +int online_alter_savepoint_rollback(THD *thd, 
LEX_CSTRING name) +{ + DBUG_ENTER("online_alter_savepoint_rollback"); + for (auto &cache: thd->online_alter_cache_list) + { + if (cache.hton->savepoint_set == NULL) + continue; + + SAVEPOINT **sv= find_savepoint_in_list(thd, name, &cache.sv_list); + // sv is null if savepoint was set up before online table was modified + my_off_t pos= *sv ? *(my_off_t*)(*sv+1) : 0; + + cache.restore_savepoint(pos); + } + + DBUG_RETURN(0); +} + + +static int online_alter_close_connection(handlerton *hton, THD *thd) +{ + DBUG_ASSERT(thd->online_alter_cache_list.empty()); + return 0; +} + + +static int online_alter_log_init(void *p) +{ + online_alter_hton= (handlerton *)p; + online_alter_hton->db_type= DB_TYPE_ONLINE_ALTER; + online_alter_hton->savepoint_offset= sizeof(my_off_t); + online_alter_hton->close_connection= online_alter_close_connection; + + online_alter_hton->savepoint_set= // Done by online_alter_savepoint_set + [](handlerton *, THD *, void *){ return 0; }; + online_alter_hton->savepoint_rollback= // Done by online_alter_savepoint_rollback + [](handlerton *, THD *, void *){ return 0; }; + online_alter_hton->savepoint_rollback_can_release_mdl= + [](handlerton *hton, THD *thd){ return true; }; + + online_alter_hton->commit= [](handlerton *hton, THD *thd, bool all) + { return online_alter_end_trans(hton, thd, all, true); }; + online_alter_hton->rollback= [](handlerton *hton, THD *thd, bool all) + { return online_alter_end_trans(hton, thd, all, false); }; + online_alter_hton->commit_by_xid= [](handlerton *hton, XID *xid) + { return online_alter_end_trans(hton, current_thd, true, true); }; + online_alter_hton->rollback_by_xid= [](handlerton *hton, XID *xid) + { return online_alter_end_trans(hton, current_thd, true, false); }; + + online_alter_hton->drop_table= [](handlerton *, const char*) { return -1; }; + online_alter_hton->flags= HTON_NOT_USER_SELECTABLE | HTON_HIDDEN + | HTON_NO_ROLLBACK; + return 0; +} + +struct st_mysql_storage_engine online_alter_storage_engine= +{ 
MYSQL_HANDLERTON_INTERFACE_VERSION }; + +maria_declare_plugin(online_alter_log) +{ + MYSQL_STORAGE_ENGINE_PLUGIN, + &online_alter_storage_engine, + "online_alter_log", + "MariaDB PLC", + "A pseudo storage engine for the online alter log", + PLUGIN_LICENSE_GPL, + online_alter_log_init, + NULL, + 0x0100, // 1.0 + NULL, // no status vars + NULL, // no sysvars + "1.0", + MariaDB_PLUGIN_MATURITY_STABLE +} +maria_declare_plugin_end;