mariadb/sql/wsrep_applier.cc

398 lines
12 KiB
C++
Raw Normal View History

2015-07-14 16:05:29 -04:00
/* Copyright (C) 2013-2015 Codership Oy <info@codership.com>
2014-01-17 13:28:43 +02:00
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; version 2 of the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License along
with this program; if not, write to the Free Software Foundation, Inc.,
51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. */
#include "wsrep_priv.h"
2015-07-14 16:05:29 -04:00
#include "wsrep_binlog.h" // wsrep_dump_rbr_buf()
#include "wsrep_xid.h"
2014-01-17 13:28:43 +02:00
2015-07-14 16:05:29 -04:00
#include "log_event.h" // class THD, EVENT_LEN_OFFSET, etc.
2014-01-17 13:28:43 +02:00
#include "wsrep_applier.h"
2016-02-23 20:33:21 -05:00
#include "debug_sync.h"
2014-01-17 13:28:43 +02:00
/*
read the first event from (*buf). The size of the (*buf) is (*buf_len).
At the end (*buf) is shitfed to point to the following event or NULL and
(*buf_len) will be changed to account just being read bytes of the 1st event.
*/
static Log_event* wsrep_read_log_event(
char **arg_buf, size_t *arg_buf_len,
const Format_description_log_event *description_event)
{
DBUG_ENTER("wsrep_read_log_event");
char *head= (*arg_buf);
uint data_len = uint4korr(head + EVENT_LEN_OFFSET);
char *buf= (*arg_buf);
const char *error= 0;
Log_event *res= 0;
res= Log_event::read_log_event(buf, data_len, &error, description_event,
true);
2014-01-17 13:28:43 +02:00
if (!res)
{
DBUG_ASSERT(error != 0);
sql_print_error("Error in Log_event::read_log_event(): "
"'%s', data_len: %d, event_type: %d",
error,data_len,head[EVENT_TYPE_OFFSET]);
}
(*arg_buf)+= data_len;
(*arg_buf_len)-= data_len;
DBUG_RETURN(res);
}
#include "transaction.h" // trans_commit(), trans_rollback()
#include "rpl_rli.h" // class Relay_log_info;
void wsrep_set_apply_format(THD* thd, Format_description_log_event* ev)
2014-01-17 13:28:43 +02:00
{
if (thd->wsrep_apply_format)
{
delete (Format_description_log_event*)thd->wsrep_apply_format;
}
thd->wsrep_apply_format= ev;
}
Format_description_log_event* wsrep_get_apply_format(THD* thd)
2014-01-17 13:28:43 +02:00
{
if (thd->wsrep_apply_format)
2015-07-14 16:05:29 -04:00
{
return (Format_description_log_event*) thd->wsrep_apply_format;
}
return thd->wsrep_rgi->rli->relay_log.description_event_for_exec;
2014-01-17 13:28:43 +02:00
}
static wsrep_cb_status_t wsrep_apply_events(THD* thd,
const void* events_buf,
size_t buf_len)
{
char *buf= (char *)events_buf;
int rcode= 0;
int event= 1;
2015-07-14 16:05:29 -04:00
Log_event_type typ;
2014-01-17 13:28:43 +02:00
2014-05-21 17:07:17 -04:00
DBUG_ENTER("wsrep_apply_events");
2014-01-17 13:28:43 +02:00
2014-09-30 18:06:15 -04:00
if (thd->killed == KILL_CONNECTION &&
thd->wsrep_conflict_state != REPLAYING)
2014-01-17 13:28:43 +02:00
{
WSREP_INFO("applier has been aborted, skipping apply_rbr: %lld",
(long long) wsrep_thd_trx_seqno(thd));
DBUG_RETURN(WSREP_CB_FAILURE);
}
mysql_mutex_lock(&thd->LOCK_wsrep_thd);
thd->wsrep_query_state= QUERY_EXEC;
if (thd->wsrep_conflict_state!= REPLAYING)
thd->wsrep_conflict_state= NO_CONFLICT;
mysql_mutex_unlock(&thd->LOCK_wsrep_thd);
if (!buf_len) WSREP_DEBUG("empty rbr buffer to apply: %lld",
(long long) wsrep_thd_trx_seqno(thd));
while(buf_len)
{
int exec_res;
Log_event* ev= wsrep_read_log_event(&buf, &buf_len,
wsrep_get_apply_format(thd));
if (!ev)
{
WSREP_ERROR("applier could not read binlog event, seqno: %lld, len: %zu",
2014-01-17 13:28:43 +02:00
(long long)wsrep_thd_trx_seqno(thd), buf_len);
rcode= 1;
goto error;
}
2014-05-21 17:07:17 -04:00
2015-07-14 16:05:29 -04:00
typ= ev->get_type_code();
switch (typ) {
2014-01-17 13:28:43 +02:00
case FORMAT_DESCRIPTION_EVENT:
wsrep_set_apply_format(thd, (Format_description_log_event*)ev);
continue;
#ifdef GTID_SUPPORT
case GTID_LOG_EVENT:
{
Gtid_log_event* gev= (Gtid_log_event*)ev;
if (gev->get_gno() == 0)
{
/* Skip GTID log event to make binlog to generate LTID on commit */
delete ev;
continue;
}
}
#endif /* GTID_SUPPORT */
default:
break;
}
2015-07-14 16:05:29 -04:00
/* Use the original server id for logging. */
thd->set_server_id(ev->server_id);
thd->set_time(); // time the query
2014-01-17 13:28:43 +02:00
wsrep_xid_init(&thd->transaction.xid_state.xid,
2015-07-14 16:05:29 -04:00
thd->wsrep_trx_meta.gtid.uuid,
2014-01-17 13:28:43 +02:00
thd->wsrep_trx_meta.gtid.seqno);
thd->lex->current_select= 0;
if (!ev->when)
{
my_hrtime_t hrtime= my_hrtime();
ev->when= hrtime_to_my_time(hrtime);
ev->when_sec_part= hrtime_sec_part(hrtime);
}
2015-07-14 16:05:29 -04:00
thd->variables.option_bits=
(thd->variables.option_bits & ~OPTION_SKIP_REPLICATION) |
(ev->flags & LOG_EVENT_SKIP_REPLICATION_F ? OPTION_SKIP_REPLICATION : 0);
2014-01-17 13:28:43 +02:00
ev->thd = thd;
exec_res = ev->apply_event(thd->wsrep_rgi);
DBUG_PRINT("info", ("exec_event result: %d", exec_res));
if (exec_res)
{
WSREP_WARN("RBR event %d %s apply warning: %d, %lld",
event, ev->get_type_str(), exec_res,
(long long) wsrep_thd_trx_seqno(thd));
rcode= exec_res;
/* stop processing for the first error */
delete ev;
goto error;
}
event++;
if (thd->wsrep_conflict_state!= NO_CONFLICT &&
thd->wsrep_conflict_state!= REPLAYING)
WSREP_WARN("conflict state after RBR event applying: %d, %lld",
thd->wsrep_query_state, (long long)wsrep_thd_trx_seqno(thd));
if (thd->wsrep_conflict_state == MUST_ABORT) {
WSREP_WARN("RBR event apply failed, rolling back: %lld",
(long long) wsrep_thd_trx_seqno(thd));
trans_rollback(thd);
thd->locked_tables_list.unlock_locked_tables(thd);
/* Release transactional metadata locks. */
thd->mdl_context.release_transactional_locks();
thd->wsrep_conflict_state= NO_CONFLICT;
DBUG_RETURN(WSREP_CB_FAILURE);
}
2015-07-14 16:05:29 -04:00
delete_or_keep_event_post_apply(thd->wsrep_rgi, typ, ev);
2014-01-17 13:28:43 +02:00
}
error:
mysql_mutex_lock(&thd->LOCK_wsrep_thd);
thd->wsrep_query_state= QUERY_IDLE;
mysql_mutex_unlock(&thd->LOCK_wsrep_thd);
assert(thd->wsrep_exec_mode== REPL_RECV);
if (thd->killed == KILL_CONNECTION)
WSREP_INFO("applier aborted: %lld", (long long)wsrep_thd_trx_seqno(thd));
if (rcode) DBUG_RETURN(WSREP_CB_FAILURE);
DBUG_RETURN(WSREP_CB_SUCCESS);
}
wsrep_cb_status_t wsrep_apply_cb(void* const ctx,
const void* const buf,
size_t const buf_len,
uint32_t const flags,
const wsrep_trx_meta_t* meta)
{
THD* const thd((THD*)ctx);
2016-02-23 20:33:21 -05:00
// Allow tests to block the applier thread using the DBUG facilities.
DBUG_EXECUTE_IF("sync.wsrep_apply_cb",
{
const char act[]=
"now "
"wait_for signal.wsrep_apply_cb";
DBUG_ASSERT(!debug_sync_set_action(thd,
STRING_WITH_LEN(act)));
};);
2014-01-17 13:28:43 +02:00
thd->wsrep_trx_meta = *meta;
#ifdef WSREP_PROC_INFO
snprintf(thd->wsrep_info, sizeof(thd->wsrep_info) - 1,
"applying write set %lld: %p, %zu",
(long long)wsrep_thd_trx_seqno(thd), buf, buf_len);
thd_proc_info(thd, thd->wsrep_info);
#else
thd_proc_info(thd, "applying write set");
#endif /* WSREP_PROC_INFO */
2014-06-19 18:48:20 -04:00
/* tune FK and UK checking policy */
if (wsrep_slave_UK_checks == FALSE)
thd->variables.option_bits|= OPTION_RELAXED_UNIQUE_CHECKS;
else
thd->variables.option_bits&= ~OPTION_RELAXED_UNIQUE_CHECKS;
if (wsrep_slave_FK_checks == FALSE)
thd->variables.option_bits|= OPTION_NO_FOREIGN_KEY_CHECKS;
else
thd->variables.option_bits&= ~OPTION_NO_FOREIGN_KEY_CHECKS;
MDEV-7563 Support CHECK constraint as in (or close to) SQL Standard MDEV-10134 Add full support for DEFAULT - Added support for using tables with MySQL 5.7 virtual fields, including MySQL 5.7 syntax - Better error messages also for old cases - CREATE ... SELECT now also updates timestamp columns - Blob can now have default values - Added new system variable "check_constraint_checks", to turn of CHECK constraint checking if needed. - Removed some engine independent tests in suite vcol to only test myisam - Moved some tests from 'include' to 't'. Should some day be done for all tests. - FRM version increased to 11 if one uses virtual fields or constraints - Changed to use a bitmap to check if a field has got a value, instead of setting HAS_EXPLICIT_VALUE bit in field flags - Expressions can now be up to 65K in total - Ensure we are not refering to uninitialized fields when handling virtual fields or defaults - Changed check_vcol_func_processor() to return a bitmap of used types - Had to change some functions that calculated cached value in fix_fields to do this in val() or getdate() instead. - store_now_in_TIME() now takes a THD argument - fill_record() now updates default values - Add a lookahead for NOT NULL, to be able to handle DEFAULT 1+1 NOT NULL - Automatically generate a name for constraints that doesn't have a name - Added support for ALTER TABLE DROP CONSTRAINT - Ensure that partition functions register virtual fields used. This fixes some bugs when using virtual fields in a partitioning function
2016-06-29 09:14:22 +02:00
/* With galera we assume that the master has done the constraint checks */
thd->variables.option_bits|= OPTION_NO_CHECK_CONSTRAINT_CHECKS;
2014-01-17 13:28:43 +02:00
if (flags & WSREP_FLAG_ISOLATION)
{
thd->wsrep_apply_toi= true;
/*
Don't run in transaction mode with TOI actions.
*/
thd->variables.option_bits&= ~OPTION_BEGIN;
thd->server_status&= ~SERVER_STATUS_IN_TRANS;
}
wsrep_cb_status_t rcode(wsrep_apply_events(thd, buf, buf_len));
#ifdef WSREP_PROC_INFO
snprintf(thd->wsrep_info, sizeof(thd->wsrep_info) - 1,
"applied write set %lld", (long long)wsrep_thd_trx_seqno(thd));
thd_proc_info(thd, thd->wsrep_info);
#else
thd_proc_info(thd, "applied write set");
#endif /* WSREP_PROC_INFO */
if (WSREP_CB_SUCCESS != rcode)
{
wsrep_dump_rbr_buf_with_header(thd, buf, buf_len);
2014-01-17 13:28:43 +02:00
}
if (thd->has_thd_temporary_tables())
2014-01-17 13:28:43 +02:00
{
WSREP_DEBUG("Applier %lld has temporary tables. Closing them now..",
thd->thread_id);
thd->close_temporary_tables();
2014-01-17 13:28:43 +02:00
}
return rcode;
}
static wsrep_cb_status_t wsrep_commit(THD* const thd)
2014-01-17 13:28:43 +02:00
{
#ifdef WSREP_PROC_INFO
snprintf(thd->wsrep_info, sizeof(thd->wsrep_info) - 1,
"committing %lld", (long long)wsrep_thd_trx_seqno(thd));
thd_proc_info(thd, thd->wsrep_info);
#else
thd_proc_info(thd, "committing");
#endif /* WSREP_PROC_INFO */
wsrep_cb_status_t const rcode(trans_commit(thd) ?
WSREP_CB_FAILURE : WSREP_CB_SUCCESS);
if (WSREP_CB_SUCCESS == rcode)
{
thd->wsrep_rgi->cleanup_context(thd, false);
#ifdef GTID_SUPPORT
thd->variables.gtid_next.set_automatic();
#endif /* GTID_SUPPORT */
if (thd->wsrep_apply_toi)
{
wsrep_set_SE_checkpoint(thd->wsrep_trx_meta.gtid.uuid,
thd->wsrep_trx_meta.gtid.seqno);
}
2014-01-17 13:28:43 +02:00
}
2015-07-14 16:05:29 -04:00
#ifdef WSREP_PROC_INFO
snprintf(thd->wsrep_info, sizeof(thd->wsrep_info) - 1,
"committed %lld", (long long) wsrep_thd_trx_seqno(thd));
thd_proc_info(thd, thd->wsrep_info);
#else
thd_proc_info(thd, "committed");
#endif /* WSREP_PROC_INFO */
2014-01-17 13:28:43 +02:00
return rcode;
}
static wsrep_cb_status_t wsrep_rollback(THD* const thd)
2014-01-17 13:28:43 +02:00
{
#ifdef WSREP_PROC_INFO
snprintf(thd->wsrep_info, sizeof(thd->wsrep_info) - 1,
"rolling back %lld", (long long)wsrep_thd_trx_seqno(thd));
thd_proc_info(thd, thd->wsrep_info);
#else
thd_proc_info(thd, "rolling back");
#endif /* WSREP_PROC_INFO */
wsrep_cb_status_t const rcode(trans_rollback(thd) ?
WSREP_CB_FAILURE : WSREP_CB_SUCCESS);
#ifdef WSREP_PROC_INFO
snprintf(thd->wsrep_info, sizeof(thd->wsrep_info) - 1,
"rolled back %lld", (long long)wsrep_thd_trx_seqno(thd));
thd_proc_info(thd, thd->wsrep_info);
#else
thd_proc_info(thd, "rolled back");
#endif /* WSREP_PROC_INFO */
return rcode;
}
wsrep_cb_status_t wsrep_commit_cb(void* const ctx,
uint32_t const flags,
const wsrep_trx_meta_t* meta,
wsrep_bool_t* const exit,
bool const commit)
{
THD* const thd((THD*)ctx);
assert(meta->gtid.seqno == wsrep_thd_trx_seqno(thd));
wsrep_cb_status_t rcode;
if (commit)
rcode = wsrep_commit(thd);
2014-01-17 13:28:43 +02:00
else
rcode = wsrep_rollback(thd);
2014-01-17 13:28:43 +02:00
wsrep_set_apply_format(thd, NULL);
thd->mdl_context.release_transactional_locks();
free_root(thd->mem_root,MYF(MY_KEEP_PREALLOC));
thd->tx_isolation= (enum_tx_isolation) thd->variables.tx_isolation;
if (wsrep_slave_count_change < 0 && commit && WSREP_CB_SUCCESS == rcode)
{
mysql_mutex_lock(&LOCK_wsrep_slave_threads);
if (wsrep_slave_count_change < 0)
{
wsrep_slave_count_change++;
*exit = true;
}
mysql_mutex_unlock(&LOCK_wsrep_slave_threads);
}
if (*exit == false && thd->wsrep_applier)
{
/* From trans_begin() */
thd->variables.option_bits|= OPTION_BEGIN;
thd->server_status|= SERVER_STATUS_IN_TRANS;
thd->wsrep_apply_toi= false;
}
return rcode;
}
2014-05-21 17:07:17 -04:00
2014-01-17 13:28:43 +02:00
wsrep_cb_status_t wsrep_unordered_cb(void* const ctx,
const void* const data,
size_t const size)
{
return WSREP_CB_SUCCESS;
}