cleanup: extract transaction-related part of handlerton

into a separate transaction_participant structure

handlerton inherits it, so handlerton itself doesn't change.
but entities that only need to participate in a transaction,
like binlog or online alter log, use a transaction_participant
and no longer need to pretend to be a full-blown but invisible
storage engine which doesn't support create table.
This commit is contained in:
Sergei Golubchik 2024-09-11 19:32:38 +02:00
parent 126d6d787c
commit aed5928207
32 changed files with 670 additions and 724 deletions

View file

@ -645,7 +645,7 @@ struct st_mysql_storage_engine
int interface_version;
};
struct handlerton;
struct transaction_participant;
/*
@ -748,7 +748,7 @@ void mysql_query_cache_invalidate4(MYSQL_THD thd,
/**
Provide a handler data getter to simplify coding
*/
void *thd_get_ha_data(const MYSQL_THD thd, const struct handlerton *hton);
void *thd_get_ha_data(const MYSQL_THD thd, const struct transaction_participant *hton);
/**
@ -769,10 +769,10 @@ void *thd_get_ha_data(const MYSQL_THD thd, const struct handlerton *hton);
thd_set_ha_data() in this connection before, storage engine
plugin lock gets released.
If handlerton::close_connection() didn't reset ha_data, server does
it immediately after calling handlerton::close_connection().
If transaction_participant::close_connection() didn't reset ha_data, server
does it immediately after calling transaction_participant::close_connection()
*/
void thd_set_ha_data(MYSQL_THD thd, const struct handlerton *hton,
void thd_set_ha_data(MYSQL_THD thd, const struct transaction_participant *hton,
const void *ha_data);

View file

@ -643,7 +643,7 @@ struct st_mysql_storage_engine
{
int interface_version;
};
struct handlerton;
struct transaction_participant;
struct Mysql_replication {
int interface_version;
};
@ -671,8 +671,8 @@ void thd_get_xid(const THD* thd, MYSQL_XID *xid);
void mysql_query_cache_invalidate4(THD* thd,
const char *key, unsigned int key_length,
int using_trx);
void *thd_get_ha_data(const THD* thd, const struct handlerton *hton);
void thd_set_ha_data(THD* thd, const struct handlerton *hton,
void *thd_get_ha_data(const THD* thd, const struct transaction_participant *hton);
void thd_set_ha_data(THD* thd, const struct transaction_participant *hton,
const void *ha_data);
void thd_wakeup_subsequent_commits(THD* thd, int wakeup_error);
}

View file

@ -643,7 +643,7 @@ struct st_mysql_storage_engine
{
int interface_version;
};
struct handlerton;
struct transaction_participant;
struct Mysql_replication {
int interface_version;
};
@ -671,8 +671,8 @@ void thd_get_xid(const THD* thd, MYSQL_XID *xid);
void mysql_query_cache_invalidate4(THD* thd,
const char *key, unsigned int key_length,
int using_trx);
void *thd_get_ha_data(const THD* thd, const struct handlerton *hton);
void thd_set_ha_data(THD* thd, const struct handlerton *hton,
void *thd_get_ha_data(const THD* thd, const struct transaction_participant *hton);
void thd_set_ha_data(THD* thd, const struct transaction_participant *hton,
const void *ha_data);
void thd_wakeup_subsequent_commits(THD* thd, int wakeup_error);
}

View file

@ -643,7 +643,7 @@ struct st_mysql_storage_engine
{
int interface_version;
};
struct handlerton;
struct transaction_participant;
struct Mysql_replication {
int interface_version;
};
@ -671,8 +671,8 @@ void thd_get_xid(const THD* thd, MYSQL_XID *xid);
void mysql_query_cache_invalidate4(THD* thd,
const char *key, unsigned int key_length,
int using_trx);
void *thd_get_ha_data(const THD* thd, const struct handlerton *hton);
void thd_set_ha_data(THD* thd, const struct handlerton *hton,
void *thd_get_ha_data(const THD* thd, const struct transaction_participant *hton);
void thd_set_ha_data(THD* thd, const struct transaction_participant *hton,
const void *ha_data);
void thd_wakeup_subsequent_commits(THD* thd, int wakeup_error);
}

View file

@ -643,7 +643,7 @@ struct st_mysql_storage_engine
{
int interface_version;
};
struct handlerton;
struct transaction_participant;
struct Mysql_replication {
int interface_version;
};
@ -671,8 +671,8 @@ void thd_get_xid(const THD* thd, MYSQL_XID *xid);
void mysql_query_cache_invalidate4(THD* thd,
const char *key, unsigned int key_length,
int using_trx);
void *thd_get_ha_data(const THD* thd, const struct handlerton *hton);
void thd_set_ha_data(THD* thd, const struct handlerton *hton,
void *thd_get_ha_data(const THD* thd, const struct transaction_participant *hton);
void thd_set_ha_data(THD* thd, const struct transaction_participant *hton,
const void *ha_data);
void thd_wakeup_subsequent_commits(THD* thd, int wakeup_error);
}

View file

@ -595,7 +595,7 @@ struct st_mysql_storage_engine
{
int interface_version;
};
struct handlerton;
struct transaction_participant;
struct Mysql_replication {
int interface_version;
};
@ -623,8 +623,8 @@ void thd_get_xid(const THD* thd, MYSQL_XID *xid);
void mysql_query_cache_invalidate4(THD* thd,
const char *key, unsigned int key_length,
int using_trx);
void *thd_get_ha_data(const THD* thd, const struct handlerton *hton);
void thd_set_ha_data(THD* thd, const struct handlerton *hton,
void *thd_get_ha_data(const THD* thd, const struct transaction_participant *hton);
void thd_set_ha_data(THD* thd, const struct transaction_participant *hton,
const void *ha_data);
void thd_wakeup_subsequent_commits(THD* thd, int wakeup_error);
}

View file

@ -643,7 +643,7 @@ struct st_mysql_storage_engine
{
int interface_version;
};
struct handlerton;
struct transaction_participant;
struct Mysql_replication {
int interface_version;
};
@ -671,8 +671,8 @@ void thd_get_xid(const THD* thd, MYSQL_XID *xid);
void mysql_query_cache_invalidate4(THD* thd,
const char *key, unsigned int key_length,
int using_trx);
void *thd_get_ha_data(const THD* thd, const struct handlerton *hton);
void thd_set_ha_data(THD* thd, const struct handlerton *hton,
void *thd_get_ha_data(const THD* thd, const struct transaction_participant *hton);
void thd_set_ha_data(THD* thd, const struct transaction_participant *hton,
const void *ha_data);
void thd_wakeup_subsequent_commits(THD* thd, int wakeup_error);
}

View file

@ -643,7 +643,7 @@ struct st_mysql_storage_engine
{
int interface_version;
};
struct handlerton;
struct transaction_participant;
struct Mysql_replication {
int interface_version;
};
@ -671,8 +671,8 @@ void thd_get_xid(const THD* thd, MYSQL_XID *xid);
void mysql_query_cache_invalidate4(THD* thd,
const char *key, unsigned int key_length,
int using_trx);
void *thd_get_ha_data(const THD* thd, const struct handlerton *hton);
void thd_set_ha_data(THD* thd, const struct handlerton *hton,
void *thd_get_ha_data(const THD* thd, const struct transaction_participant *hton);
void thd_set_ha_data(THD* thd, const struct transaction_participant *hton,
const void *ha_data);
void thd_wakeup_subsequent_commits(THD* thd, int wakeup_error);
}

View file

@ -670,6 +670,48 @@ static bool update_optimizer_costs(handlerton *hton)
const char *hton_no_exts[]= { 0 };
static bool ddl_recovery_done= false;
int setup_transaction_participant(st_plugin_int *plugin)
{
auto tp= (transaction_participant *)(plugin->data);
ulong fslot;
for (fslot= 0; fslot < total_ha; fslot++)
if (!hton2plugin[fslot])
break;
if (fslot < total_ha)
tp->slot= fslot;
else
{
if (total_ha >= MAX_HA)
{
sql_print_error("Too many plugins loaded. Limit is %u. Failed on '%s'",
MAX_HA, plugin->name.str);
return 1;
}
tp->slot= total_ha++;
}
uint tmp= tp->savepoint_offset;
tp->savepoint_offset= savepoint_alloc_size;
savepoint_alloc_size+= tmp;
hton2plugin[tp->slot]=plugin;
if (tp->prepare)
{
total_ha_2pc++;
if (tc_log && tc_log != get_tc_log_implementation())
{
total_ha_2pc--;
tp->prepare= 0;
push_warning_printf(current_thd, Sql_condition::WARN_LEVEL_WARN,
ER_UNKNOWN_ERROR,
"Cannot enable tc-log at run-time. "
"XA features of %s are disabled",
plugin->name.str);
}
}
return 0;
}
int ha_initialize_handlerton(st_plugin_int *plugin)
{
handlerton *hton;
@ -712,9 +754,6 @@ int ha_initialize_handlerton(st_plugin_int *plugin)
hton->discover_table_existence= full_discover_for_existence;
}
uint tmp;
ulong fslot;
DBUG_EXECUTE_IF("unstable_db_type", {
static int i= (int) DB_TYPE_FIRST_DYNAMIC;
hton->db_type= (enum legacy_db_type)++i;
@ -742,54 +781,14 @@ int ha_initialize_handlerton(st_plugin_int *plugin)
hton->db_type= (enum legacy_db_type) idx;
}
/*
In case a plugin is uninstalled and re-installed later, it should
reuse an array slot. Otherwise the number of uninstall/install
cycles would be limited. So look for a free slot.
*/
DBUG_PRINT("plugin", ("total_ha: %lu", total_ha));
for (fslot= 0; fslot < total_ha; fslot++)
{
if (!hton2plugin[fslot])
break;
}
if (fslot < total_ha)
hton->slot= fslot;
else
{
if (total_ha >= MAX_HA)
{
sql_print_error("Too many plugins loaded. Limit is %lu. "
"Failed on '%s'", (ulong) MAX_HA, plugin->name.str);
ret= 1;
goto err_deinit;
}
hton->slot= total_ha++;
}
if ((ret= setup_transaction_participant(plugin)))
goto err_deinit;
installed_htons[hton->db_type]= hton;
tmp= hton->savepoint_offset;
hton->savepoint_offset= savepoint_alloc_size;
savepoint_alloc_size+= tmp;
hton2plugin[hton->slot]=plugin;
if (!(hton->flags & HTON_HIDDEN) && update_optimizer_costs(hton))
goto err_deinit;
if (hton->prepare)
{
total_ha_2pc++;
if (tc_log && tc_log != get_tc_log_implementation())
{
total_ha_2pc--;
hton->prepare= 0;
push_warning_printf(current_thd, Sql_condition::WARN_LEVEL_WARN,
ER_UNKNOWN_ERROR,
"Cannot enable tc-log at run-time. "
"XA features of %s are disabled",
plugin->name.str);
}
}
/*
This is entirely for legacy. We will create a new "disk based" hton and a
"memory" hton which will be configurable longterm. We should be able to
@ -872,6 +871,31 @@ int ha_end()
}
/*
unlike plugin_foreach() this is called for all plugins from
hton2plugin[], that is for anything that has a transaction_participant
object. Not only for storage engines.
*/
typedef bool (tp_foreach_func)(THD *thd, transaction_participant *tp, void *arg);
static bool tp_foreach(THD *thd, tp_foreach_func *func, void *arg)
{
int j=0, err= 0;
plugin_ref locks[MAX_HA];
for (uint i= 0; i < MAX_HA; i++)
{
if (st_plugin_int *pi= hton2plugin[i])
{
locks[j]= plugin_lock(NULL, plugin_int_to_ref(pi));
if ((err= func(thd, plugin_hton(locks[j++]), arg)))
break;
}
}
plugin_unlock_list(NULL, locks, j);
return err;
}
static my_bool dropdb_handlerton(THD *, plugin_ref plugin, void *path)
{
handlerton *hton= plugin_hton(plugin);
@ -893,11 +917,11 @@ struct st_commit_checkpoint_request {
void (*pre_hook)(void *);
};
static my_bool commit_checkpoint_request_handlerton(THD *, plugin_ref plugin,
void *data)
static bool commit_checkpoint_request_handlerton(THD *,
transaction_participant *hton,
void *data)
{
st_commit_checkpoint_request *st= (st_commit_checkpoint_request *)data;
handlerton *hton= plugin_hton(plugin);
if (hton->commit_checkpoint_request)
{
void *cookie= st->cookie;
@ -920,8 +944,7 @@ ha_commit_checkpoint_request(void *cookie, void (*pre_hook)(void *))
st_commit_checkpoint_request st;
st.cookie= cookie;
st.pre_hook= pre_hook;
plugin_foreach(NULL, commit_checkpoint_request_handlerton,
MYSQL_STORAGE_ENGINE_PLUGIN, &st);
tp_foreach(NULL, commit_checkpoint_request_handlerton, &st);
}
@ -941,7 +964,7 @@ void ha_close_connection(THD* thd)
thd->ha_data[i].lock= NULL;
handlerton *hton= plugin_hton(plugin);
if (hton->close_connection)
hton->close_connection(hton, thd);
hton->close_connection(thd);
thd_set_ha_data(thd, hton, 0);
plugin_unlock(NULL, plugin);
}
@ -1413,7 +1436,7 @@ void ha_pre_shutdown()
times per transaction.
*/
void trans_register_ha(THD *thd, bool all, handlerton *ht_arg, ulonglong trxid)
void trans_register_ha(THD *thd, bool all, transaction_participant *ht_arg, ulonglong trxid)
{
THD_TRANS *trans;
Ha_trx_info *ha_info;
@ -1455,7 +1478,7 @@ void trans_register_ha(THD *thd, bool all, handlerton *ht_arg, ulonglong trxid)
Do not register transactions in which binary log is the only participating
transactional storage engine.
*/
if (thd->m_transaction_psi == NULL && ht_arg->db_type != DB_TYPE_BINLOG)
if (thd->m_transaction_psi == NULL && ht_arg != &binlog_tp)
{
thd->m_transaction_psi= MYSQL_START_TRANSACTION(&thd->m_transaction_state,
thd->get_xid(), trxid, thd->tx_isolation, thd->tx_read_only,
@ -1467,7 +1490,7 @@ void trans_register_ha(THD *thd, bool all, handlerton *ht_arg, ulonglong trxid)
}
static int prepare_or_error(handlerton *ht, THD *thd, bool all)
static int prepare_or_error(transaction_participant *ht, THD *thd, bool all)
{
#ifdef WITH_WSREP
const bool run_wsrep_hooks= wsrep_run_commit_hook(thd, all);
@ -1478,7 +1501,7 @@ static int prepare_or_error(handlerton *ht, THD *thd, bool all)
}
#endif /* WITH_WSREP */
int err= ht->prepare(ht, thd, all);
int err= ht->prepare(thd, all);
status_var_increment(thd->status_var.ha_prepare_count);
if (err)
{
@ -1513,7 +1536,7 @@ int ha_prepare(THD *thd)
{
for (; ha_info; ha_info= ha_info->next())
{
handlerton *ht= ha_info->ht();
transaction_participant *ht= ha_info->ht();
if (ht->prepare)
{
if (unlikely(prepare_or_error(ht, thd, all)))
@ -1676,7 +1699,7 @@ static bool wsrep_have_no2pc_rw_ha(Ha_trx_info* ha_list)
{
for (Ha_trx_info *ha_info=ha_list; ha_info; ha_info= ha_info->next())
{
handlerton *ht= ha_info->ht();
transaction_participant *ht= ha_info->ht();
// Transaction is read-write and handler does not support 2pc
if (ha_info->is_trx_read_write() && ht->prepare==0)
return true;
@ -1937,7 +1960,7 @@ int ha_commit_trans(THD *thd, bool all)
for (Ha_trx_info *hi= ha_info; hi; hi= hi->next())
{
handlerton *ht= hi->ht();
transaction_participant *ht= hi->ht();
/*
Do not call two-phase commit if this particular
transaction is read-only. This allows for simpler
@ -2147,7 +2170,7 @@ static bool has_binlog_hton(Ha_trx_info *ha_info)
{
bool rc;
for (rc= false; ha_info && !rc; ha_info= ha_info->next())
rc= ha_info->ht() == binlog_hton;
rc= ha_info->ht() == &binlog_tp;
return rc;
}
@ -2186,15 +2209,15 @@ commit_one_phase_2(THD *thd, bool all, THD_TRANS *trans, bool is_real_trans)
for (; ha_info; ha_info= ha_info_next)
{
handlerton *ht= ha_info->ht();
if ((err= ht->commit(ht, thd, all)))
transaction_participant *ht= ha_info->ht();
if ((err= ht->commit(thd, all)))
{
my_error(ER_ERROR_DURING_COMMIT, MYF(0), err);
error=1;
}
/* Should this be done only if is_real_trans is set ? */
status_var_increment(thd->status_var.ha_commit_count);
if (is_real_trans && ht != binlog_hton && ha_info->is_trx_read_write())
if (is_real_trans && ht != &binlog_tp && ha_info->is_trx_read_write())
++count;
ha_info_next= ha_info->next();
ha_info->reset(); /* keep it conveniently zero-filled */
@ -2312,8 +2335,8 @@ int ha_rollback_trans(THD *thd, bool all)
for (; ha_info; ha_info= ha_info_next)
{
int err;
handlerton *ht= ha_info->ht();
if ((err= ht->rollback(ht, thd, all)))
transaction_participant *ht= ha_info->ht();
if ((err= ht->rollback(thd, all)))
{
// cannot happen
my_error(ER_ERROR_DURING_ROLLBACK, MYF(0), err);
@ -2399,23 +2422,21 @@ struct xahton_st {
int result;
};
static my_bool xacommit_handlerton(THD *, plugin_ref plugin, void *arg)
static bool xacommit_handlerton(THD *, transaction_participant *hton, void *arg)
{
handlerton *hton= plugin_hton(plugin);
if (hton->recover)
{
hton->commit_by_xid(hton, ((struct xahton_st *)arg)->xid);
hton->commit_by_xid(((struct xahton_st *)arg)->xid);
((struct xahton_st *)arg)->result= 0;
}
return FALSE;
}
static my_bool xarollback_handlerton(THD *, plugin_ref plugin, void *arg)
static bool xarollback_handlerton(THD *, transaction_participant *hton, void *arg)
{
handlerton *hton= plugin_hton(plugin);
if (hton->recover)
{
hton->rollback_by_xid(hton, ((struct xahton_st *)arg)->xid);
hton->rollback_by_xid(((struct xahton_st *)arg)->xid);
((struct xahton_st *)arg)->result= 0;
}
return FALSE;
@ -2433,12 +2454,11 @@ int ha_commit_or_rollback_by_xid(XID *xid, bool commit)
by it first.
*/
if (commit)
binlog_commit_by_xid(binlog_hton, xid);
binlog_commit_by_xid(xid);
else
binlog_rollback_by_xid(binlog_hton, xid);
binlog_rollback_by_xid(xid);
plugin_foreach(NULL, commit ? xacommit_handlerton : xarollback_handlerton,
MYSQL_STORAGE_ENGINE_PLUGIN, &xaop);
tp_foreach(NULL, commit ? xacommit_handlerton : xarollback_handlerton, &xaop);
return xaop.result;
}
@ -2646,7 +2666,7 @@ static bool xarecover_decide_to_commit(xid_recovery_member* member,
For a given hton decides what to do with a xid passed in the 2nd arg
and carries out the decision.
*/
static void xarecover_do_commit_or_rollback(handlerton *hton,
static void xarecover_do_commit_or_rollback(transaction_participant *hton,
xarecover_complete_arg *arg)
{
XA_data x;
@ -2661,12 +2681,12 @@ static void xarecover_do_commit_or_rollback(handlerton *hton,
x= *member->full_xid;
if (xarecover_decide_to_commit(member, ptr_commit_max))
rc= hton->commit_by_xid(hton, &x);
rc= hton->commit_by_xid(&x);
else if (hton->recover_rollback_by_xid &&
IF_WSREP(!(WSREP_ON || wsrep_recovery), true))
rc= hton->recover_rollback_by_xid(&x);
else
rc= hton->rollback_by_xid(hton, &x);
rc= hton->rollback_by_xid(&x);
/*
It's fine to have non-zero rc which would be from transaction
@ -2691,16 +2711,11 @@ static void xarecover_do_commit_or_rollback(handlerton *hton,
/*
Per hton recovery decider function.
*/
static my_bool xarecover_do_commit_or_rollback_handlerton(THD *,
plugin_ref plugin,
void *arg)
static bool xarecover_do_commit_or_rollback_handlerton(THD *,
transaction_participant *hton, void *arg)
{
handlerton *hton= plugin_hton(plugin);
if (hton->recover)
{
xarecover_do_commit_or_rollback(hton, (xarecover_complete_arg *) arg);
}
return FALSE;
}
@ -2711,16 +2726,14 @@ static my_bool xarecover_do_commit_or_rollback_handlerton(THD *,
Returns always FALSE.
*/
static my_bool xarecover_complete_and_count(void *member_arg,
void *param_arg)
static my_bool xarecover_complete_and_count(void *member_arg, void *param_arg)
{
xid_recovery_member *member= (xid_recovery_member*) member_arg;
xarecover_complete_arg *complete_params=
(xarecover_complete_arg*) param_arg;
complete_params->member= member;
(void) plugin_foreach(NULL, xarecover_do_commit_or_rollback_handlerton,
MYSQL_STORAGE_ENGINE_PLUGIN, complete_params);
tp_foreach(NULL, xarecover_do_commit_or_rollback_handlerton, complete_params);
if (member->in_engine_prepare)
{
@ -2761,15 +2774,14 @@ uint ha_recover_complete(HASH *commit_list, Binlog_offset *coord)
return complete.count;
}
static my_bool xarecover_handlerton(THD *, plugin_ref plugin, void *arg)
static bool xarecover_handlerton(THD *, transaction_participant *hton, void *arg)
{
handlerton *hton= plugin_hton(plugin);
struct xarecover_st *info= (struct xarecover_st *) arg;
int got;
if (hton->recover)
{
while ((got= hton->recover(hton, info->list, info->len)) > 0 )
while ((got= hton->recover(info->list, info->len)) > 0 )
{
sql_print_information("Found %d prepared transaction(s) in %s",
got, hton_name(hton)->str);
@ -2848,7 +2860,7 @@ static my_bool xarecover_handlerton(THD *, plugin_ref plugin, void *arg)
x <= wsrep_limit), false) ||
tc_heuristic_recover == TC_HEURISTIC_RECOVER_COMMIT)
{
int rc= hton->commit_by_xid(hton, info->list+i);
int rc= hton->commit_by_xid(info->list+i);
if (rc == 0)
{
DBUG_EXECUTE("info",{
@ -2859,7 +2871,7 @@ static my_bool xarecover_handlerton(THD *, plugin_ref plugin, void *arg)
}
else if (!info->mem_root)
{
int rc= hton->rollback_by_xid(hton, info->list+i);
int rc= hton->rollback_by_xid(info->list+i);
if (rc == 0)
{
DBUG_EXECUTE("info",{
@ -2912,8 +2924,7 @@ int ha_recover(HASH *commit_list, MEM_ROOT *arg_mem_root)
DBUG_RETURN(1);
}
plugin_foreach(NULL, xarecover_handlerton,
MYSQL_STORAGE_ENGINE_PLUGIN, &info);
tp_foreach(NULL, xarecover_handlerton, &info);
my_free(info.list);
if (info.found_foreign_xids)
@ -2973,11 +2984,11 @@ bool ha_rollback_to_savepoint_can_release_mdl(THD *thd)
*/
for (ha_info= trans->ha_list; ha_info; ha_info= ha_info->next())
{
handlerton *ht= ha_info->ht();
transaction_participant *ht= ha_info->ht();
DBUG_ASSERT(ht);
if (ht->savepoint_rollback_can_release_mdl == 0 ||
ht->savepoint_rollback_can_release_mdl(ht, thd) == false)
ht->savepoint_rollback_can_release_mdl(thd) == false)
DBUG_RETURN(false);
}
@ -3001,11 +3012,10 @@ int ha_rollback_to_savepoint(THD *thd, SAVEPOINT *sv)
for (ha_info= sv->ha_list; ha_info; ha_info= ha_info->next())
{
int err;
handlerton *ht= ha_info->ht();
transaction_participant *ht= ha_info->ht();
DBUG_ASSERT(ht);
DBUG_ASSERT(ht->savepoint_set != 0);
if ((err= ht->savepoint_rollback(ht, thd,
(uchar *)(sv+1)+ht->savepoint_offset)))
if ((err= ht->savepoint_rollback(thd, (uchar *)(sv+1)+ht->savepoint_offset)))
{ // cannot happen
my_error(ER_ERROR_DURING_ROLLBACK, MYF(0), err);
error=1;
@ -3021,7 +3031,7 @@ int ha_rollback_to_savepoint(THD *thd, SAVEPOINT *sv)
ha_info= ha_info_next)
{
int err;
handlerton *ht= ha_info->ht();
transaction_participant *ht= ha_info->ht();
#ifdef WITH_WSREP
if (WSREP(thd) && ht->flags & HTON_WSREP_REPLICATION)
{
@ -3030,7 +3040,7 @@ int ha_rollback_to_savepoint(THD *thd, SAVEPOINT *sv)
}
#endif // WITH_WSREP
if ((err= ht->rollback(ht, thd, !thd->in_sub_stmt)))
if ((err= ht->rollback(thd, !thd->in_sub_stmt)))
{ // cannot happen
my_error(ER_ERROR_DURING_ROLLBACK, MYF(0), err);
error=1;
@ -3081,7 +3091,7 @@ int ha_savepoint(THD *thd, SAVEPOINT *sv)
for (; ha_info; ha_info= ha_info->next())
{
int err;
handlerton *ht= ha_info->ht();
transaction_participant *ht= ha_info->ht();
DBUG_ASSERT(ht);
if (! ht->savepoint_set)
{
@ -3089,7 +3099,7 @@ int ha_savepoint(THD *thd, SAVEPOINT *sv)
error=1;
break;
}
if ((err= ht->savepoint_set(ht, thd, (uchar *)(sv+1)+ht->savepoint_offset)))
if ((err= ht->savepoint_set(thd, (uchar *)(sv+1)+ht->savepoint_offset)))
{ // cannot happen
my_error(ER_GET_ERRNO, MYF(0), err, hton_name(ht)->str);
error=1;
@ -3117,13 +3127,12 @@ int ha_release_savepoint(THD *thd, SAVEPOINT *sv)
for (; ha_info; ha_info= ha_info->next())
{
int err;
handlerton *ht= ha_info->ht();
transaction_participant *ht= ha_info->ht();
/* Savepoint life time is enclosed into transaction life time. */
DBUG_ASSERT(ht);
if (!ht->savepoint_release)
continue;
if ((err= ht->savepoint_release(ht, thd,
(uchar *)(sv+1) + ht->savepoint_offset)))
if ((err= ht->savepoint_release(thd, (uchar *)(sv+1) + ht->savepoint_offset)))
{ // cannot happen
my_error(ER_GET_ERRNO, MYF(0), err, hton_name(ht)->str);
error=1;
@ -3136,12 +3145,11 @@ int ha_release_savepoint(THD *thd, SAVEPOINT *sv)
DBUG_RETURN(error);
}
static my_bool snapshot_handlerton(THD *thd, plugin_ref plugin, void *arg)
static bool snapshot_handlerton(THD *thd, transaction_participant *hton, void *arg)
{
handlerton *hton= plugin_hton(plugin);
if (hton->start_consistent_snapshot)
{
if (hton->start_consistent_snapshot(hton, thd))
if (hton->start_consistent_snapshot(thd))
return TRUE;
*((bool *)arg)= false;
}
@ -3160,7 +3168,7 @@ int ha_start_consistent_snapshot(THD *thd)
have a consistent binlog position.
*/
mysql_mutex_lock(&LOCK_commit_ordered);
err= plugin_foreach(thd, snapshot_handlerton, MYSQL_STORAGE_ENGINE_PLUGIN, &warn);
err= tp_foreach(thd, snapshot_handlerton, &warn);
mysql_mutex_unlock(&LOCK_commit_ordered);
if (err)

View file

@ -571,8 +571,6 @@ enum legacy_db_type
DB_TYPE_BLACKHOLE_DB=19,
DB_TYPE_PARTITION_DB=20,
DB_TYPE_BINLOG=21,
DB_TYPE_ONLINE_ALTER=22,
DB_TYPE_PBXT=23,
DB_TYPE_PERFORMANCE_SCHEMA=28,
DB_TYPE_S3=41,
DB_TYPE_ARIA=42,
@ -1254,19 +1252,8 @@ typedef class st_select_lex SELECT_LEX;
typedef class st_select_lex_unit SELECT_LEX_UNIT;
typedef struct st_order ORDER;
/*
handlerton is a singleton structure - one instance per storage engine -
to provide access to storage engine functionality that works on the
"global" level (unlike handler class that works on a per-table basis)
*/
struct handlerton
struct transaction_participant
{
/*
Historical number used for frm file to determine the correct
storage engine. This is going away and new engines will just use
"name" for this.
*/
enum legacy_db_type db_type;
/*
each storage engine has it's own memory area (actually a pointer)
in the thd, for storing per-connection information.
@ -1291,6 +1278,217 @@ struct handlerton
global handlerton flags HTON_...
*/
uint32 flags;
/*
close_connection is only called if
thd->ha_data[xxx_hton.slot] is non-zero, so even if you don't need
this storage area - set it to something, so that MySQL would know
this storage engine was accessed in this connection
*/
int (*close_connection)(THD *thd);
/*
sv points to an uninitialized storage area of requested size
(see savepoint_offset description)
*/
int (*savepoint_set)(THD *thd, void *sv);
/*
sv points to a storage area, that was earlier passed
to the savepoint_set call
*/
int (*savepoint_rollback)(THD *thd, void *sv);
/**
Check if storage engine allows to release metadata locks which were
acquired after the savepoint if rollback to savepoint is done.
@return true - If it is safe to release MDL locks.
false - If it is not.
*/
bool (*savepoint_rollback_can_release_mdl)(THD *thd);
int (*savepoint_release)(THD *thd, void *sv);
/*
'all' is true if it's a real commit, that makes persistent changes
'all' is false if it's not in fact a commit but an end of the
statement that is part of the transaction.
NOTE 'all' is also false in auto-commit mode where 'end of statement'
and 'real commit' mean the same event.
*/
int (*commit)(THD *thd, bool all);
int (*rollback)(THD *thd, bool all);
int (*prepare)(THD *thd, bool all);
int (*recover)(XID *xid_list, uint len);
int (*commit_by_xid)(XID *xid);
int (*rollback_by_xid)(XID *xid);
/*
recover_rollback_by_xid is optional. If set, it will be called instead of
rollback_by_xid when transactions should be rolled back at server startup.
This function should just change the transaction's state from prepared to
active before returing. The actual rollback should then happen
asynchroneously (eg. in a background thread). This way, rollbacks that
take a long time to complete will not block server startup, and the
database becomes available sooner to serve user queries.
*/
int (*recover_rollback_by_xid)(const XID *xid);
/*
It is called after binlog recovery has done commit/rollback of
all transactions. It is used together with recover_rollback_by_xid()
together to rollback prepared transactions asynchronously.
*/
void (*signal_tc_log_recovery_done)();
int (*start_consistent_snapshot)(THD *thd);
/*
The commit_ordered() method is called prior to the commit() method, after
the transaction manager has decided to commit (not rollback) the
transaction. Unlike commit(), commit_ordered() is called only when the
full transaction is committed, not for each commit of statement
transaction in a multi-statement transaction.
Not that like prepare(), commit_ordered() is only called when 2-phase
commit takes place. Ie. when no binary log and only a single engine
participates in a transaction, one commit() is called, no
commit_ordered(). So engines must be prepared for this.
The calls to commit_ordered() in multiple parallel transactions is
guaranteed to happen in the same order in every participating
handler. This can be used to ensure the same commit order among multiple
handlers (eg. in table handler and binlog). So if transaction T1 calls
into commit_ordered() of handler A before T2, then T1 will also call
commit_ordered() of handler B before T2.
Engines that implement this method should during this call make the
transaction visible to other transactions, thereby making the order of
transaction commits be defined by the order of commit_ordered() calls.
The intention is that commit_ordered() should do the minimal amount of
work that needs to happen in consistent commit order among handlers. To
preserve ordering, calls need to be serialised on a global mutex, so
doing any time-consuming or blocking operations in commit_ordered() will
limit scalability.
Handlers can rely on commit_ordered() calls to be serialised (no two
calls can run in parallel, so no extra locking on the handler part is
required to ensure this).
Note that commit_ordered() can be called from a different thread than the
one handling the transaction! So it can not do anything that depends on
thread local storage, in particular it can not call my_error() and
friends (instead it can store the error code and delay the call of
my_error() to the commit() method).
Similarly, since commit_ordered() returns void, any return error code
must be saved and returned from the commit() method instead.
The commit_ordered method is optional, and can be left unset if not
needed in a particular handler (then there will be no ordering guarantees
wrt. other engines and binary log).
*/
void (*commit_ordered)(THD *thd, bool all);
/*
The prepare_ordered method is optional. If set, it will be called after
successful prepare() in all handlers participating in 2-phase
commit. Like commit_ordered(), it is called only when the full
transaction is committed, not for each commit of statement transaction.
The calls to prepare_ordered() among multiple parallel transactions are
ordered consistently with calls to commit_ordered(). This means that
calls to prepare_ordered() effectively define the commit order, and that
each handler will see the same sequence of transactions calling into
prepare_ordered() and commit_ordered().
Thus, prepare_ordered() can be used to define commit order for handlers
that need to do this in the prepare step (like binlog). It can also be
used to release transaction's locks early in an order consistent with the
order transactions will be eventually committed.
Like commit_ordered(), prepare_ordered() calls are serialised to maintain
ordering, so the intention is that they should execute fast, with only
the minimal amount of work needed to define commit order. Handlers can
rely on this serialisation, and do not need to do any extra locking to
avoid two prepare_ordered() calls running in parallel.
Like commit_ordered(), prepare_ordered() is not guaranteed to be called
in the context of the thread handling the rest of the transaction. So it
cannot invoke code that relies on thread local storage, in particular it
cannot call my_error().
prepare_ordered() cannot cause a rollback by returning an error, all
possible errors must be handled in prepare() (the prepare_ordered()
method returns void). In case of some fatal error, a record of the error
must be made internally by the engine and returned from commit() later.
Note that for user-level XA SQL commands, no consistent ordering among
prepare_ordered() and commit_ordered() is guaranteed (as that would
require blocking all other commits for an indefinite time).
When 2-phase commit is not used (eg. only one engine (and no binlog) in
transaction), neither prepare() nor prepare_ordered() is called.
*/
void (*prepare_ordered)(THD *thd, bool all);
/*
The commit_checkpoint_request() handlerton method is used to checkpoint
the XA recovery process for storage engines that support two-phase
commit.
The method is optional - an engine that does not implemented is expected
to work the traditional way, where every commit() durably flushes the
transaction to disk in the engine before completion, so XA recovery will
no longer be needed for that transaction.
An engine that does implement commit_checkpoint_request() is also
expected to implement commit_ordered(), so that ordering of commits is
consistent between 2pc participants. Such engine is no longer required to
durably flush to disk transactions in commit(), provided that the
transaction has been successfully prepare()d and commit_ordered(); thus
potentionally saving one fsync() call. (Engine must still durably flush
to disk in commit() when no prepare()/commit_ordered() steps took place,
at least if durable commits are wanted; this happens eg. if binlog is
disabled).
The TC will periodically (eg. once per binlog rotation) call
commit_checkpoint_request(). When this happens, the engine must arrange
for all transaction that have completed commit_ordered() to be durably
flushed to disk (this does not include transactions that might be in the
middle of executing commit_ordered()). When such flush has completed, the
engine must call commit_checkpoint_notify_ha(), passing back the opaque
"cookie".
The flush and call of commit_checkpoint_notify_ha() need not happen
immediately - it can be scheduled and performed asynchronously (ie. as
part of next prepare(), or sync every second, or whatever), but should
not be postponed indefinitely. It is however also permissible to do it
immediately, before returning from commit_checkpoint_request().
When commit_checkpoint_notify_ha() is called, the TC will know that the
transactions are durably committed, and thus no longer require XA
recovery. It uses that to reduce the work needed for any subsequent XA
recovery process.
*/
void (*commit_checkpoint_request)(void *cookie);
/*********************************************************************
System Versioning
**********************************************************************/
/** Determine if system-versioned data was modified by the transaction.
@param[in,out] thd current session
@param[out] trx_id transaction start ID
@return transaction commit ID
@retval 0 if no system-versioned data was affected by the transaction
*/
ulonglong (*prepare_commit_versioned)(THD *thd, ulonglong *trx_id);
};
/*
handlerton is a singleton structure - one instance per storage engine -
to provide access to storage engine functionality that works on the
"global" level (unlike handler class that works on a per-table basis)
*/
struct handlerton : public transaction_participant
{
/*
Historical number used for frm file to determine the correct
storage engine. This is going away and new engines will just use
"name" for this.
*/
enum legacy_db_type db_type;
/*
Optional clauses in the CREATE/ALTER TABLE
*/
@ -1320,13 +1518,6 @@ struct handlerton
Generic handlerton methods
**********************************************************************/
handler *(*create)(handlerton *hton, TABLE_SHARE *table, MEM_ROOT *mem_root);
/*
close_connection is only called if
thd->ha_data[xxx_hton.slot] is non-zero, so even if you don't need
this storage area - set it to something, so that MySQL would know
this storage engine was accessed in this connection
*/
int (*close_connection)(handlerton *hton, THD *thd);
/*
Tell handler that query has been killed.
*/
@ -1389,188 +1580,6 @@ struct handlerton
int (*create_partitioning_metadata)(const char *path, const char *old_path,
chf_create_flags action_flag);
/**********************************************************************
Transaction handling
**********************************************************************/
/*
sv points to an uninitialized storage area of requested size
(see savepoint_offset description)
*/
int (*savepoint_set)(handlerton *hton, THD *thd, void *sv);
/*
sv points to a storage area, that was earlier passed
to the savepoint_set call
*/
int (*savepoint_rollback)(handlerton *hton, THD *thd, void *sv);
/**
Check if storage engine allows to release metadata locks which were
acquired after the savepoint if rollback to savepoint is done.
@return true - If it is safe to release MDL locks.
false - If it is not.
*/
bool (*savepoint_rollback_can_release_mdl)(handlerton *hton, THD *thd);
int (*savepoint_release)(handlerton *hton, THD *thd, void *sv);
/*
'all' is true if it's a real commit, that makes persistent changes
'all' is false if it's not in fact a commit but an end of the
statement that is part of the transaction.
NOTE 'all' is also false in auto-commit mode where 'end of statement'
and 'real commit' mean the same event.
*/
int (*commit)(handlerton *hton, THD *thd, bool all);
/*
The commit_ordered() method is called prior to the commit() method, after
the transaction manager has decided to commit (not rollback) the
transaction. Unlike commit(), commit_ordered() is called only when the
full transaction is committed, not for each commit of statement
transaction in a multi-statement transaction.
Not that like prepare(), commit_ordered() is only called when 2-phase
commit takes place. Ie. when no binary log and only a single engine
participates in a transaction, one commit() is called, no
commit_ordered(). So engines must be prepared for this.
The calls to commit_ordered() in multiple parallel transactions is
guaranteed to happen in the same order in every participating
handler. This can be used to ensure the same commit order among multiple
handlers (eg. in table handler and binlog). So if transaction T1 calls
into commit_ordered() of handler A before T2, then T1 will also call
commit_ordered() of handler B before T2.
Engines that implement this method should during this call make the
transaction visible to other transactions, thereby making the order of
transaction commits be defined by the order of commit_ordered() calls.
The intention is that commit_ordered() should do the minimal amount of
work that needs to happen in consistent commit order among handlers. To
preserve ordering, calls need to be serialised on a global mutex, so
doing any time-consuming or blocking operations in commit_ordered() will
limit scalability.
Handlers can rely on commit_ordered() calls to be serialised (no two
calls can run in parallel, so no extra locking on the handler part is
required to ensure this).
Note that commit_ordered() can be called from a different thread than the
one handling the transaction! So it can not do anything that depends on
thread local storage, in particular it can not call my_error() and
friends (instead it can store the error code and delay the call of
my_error() to the commit() method).
Similarly, since commit_ordered() returns void, any return error code
must be saved and returned from the commit() method instead.
The commit_ordered method is optional, and can be left unset if not
needed in a particular handler (then there will be no ordering guarantees
wrt. other engines and binary log).
*/
void (*commit_ordered)(handlerton *hton, THD *thd, bool all);
int (*rollback)(handlerton *hton, THD *thd, bool all);
int (*prepare)(handlerton *hton, THD *thd, bool all);
/*
The prepare_ordered method is optional. If set, it will be called after
successful prepare() in all handlers participating in 2-phase
commit. Like commit_ordered(), it is called only when the full
transaction is committed, not for each commit of statement transaction.
The calls to prepare_ordered() among multiple parallel transactions are
ordered consistently with calls to commit_ordered(). This means that
calls to prepare_ordered() effectively define the commit order, and that
each handler will see the same sequence of transactions calling into
prepare_ordered() and commit_ordered().
Thus, prepare_ordered() can be used to define commit order for handlers
that need to do this in the prepare step (like binlog). It can also be
used to release transaction's locks early in an order consistent with the
order transactions will be eventually committed.
Like commit_ordered(), prepare_ordered() calls are serialised to maintain
ordering, so the intention is that they should execute fast, with only
the minimal amount of work needed to define commit order. Handlers can
rely on this serialisation, and do not need to do any extra locking to
avoid two prepare_ordered() calls running in parallel.
Like commit_ordered(), prepare_ordered() is not guaranteed to be called
in the context of the thread handling the rest of the transaction. So it
cannot invoke code that relies on thread local storage, in particular it
cannot call my_error().
prepare_ordered() cannot cause a rollback by returning an error, all
possible errors must be handled in prepare() (the prepare_ordered()
method returns void). In case of some fatal error, a record of the error
must be made internally by the engine and returned from commit() later.
Note that for user-level XA SQL commands, no consistent ordering among
prepare_ordered() and commit_ordered() is guaranteed (as that would
require blocking all other commits for an indefinite time).
When 2-phase commit is not used (eg. only one engine (and no binlog) in
transaction), neither prepare() nor prepare_ordered() is called.
*/
void (*prepare_ordered)(handlerton *hton, THD *thd, bool all);
int (*recover)(handlerton *hton, XID *xid_list, uint len);
int (*commit_by_xid)(handlerton *hton, XID *xid);
int (*rollback_by_xid)(handlerton *hton, XID *xid);
/*
recover_rollback_by_xid is optional. If set, it will be called instead of
rollback_by_xid when transactions should be rolled back at server startup.
This function should just change the transaction's state from prepared to
active before returing. The actual rollback should then happen
asynchroneously (eg. in a background thread). This way, rollbacks that
take a long time to complete will not block server startup, and the
database becomes available sooner to serve user queries.
*/
int (*recover_rollback_by_xid)(const XID *xid);
/*
It is called after binlog recovery has done commit/rollback of
all transactions. It is used together with recover_rollback_by_xid()
together to rollback prepared transactions asynchronously.
*/
void (*signal_tc_log_recovery_done)();
/*
The commit_checkpoint_request() handlerton method is used to checkpoint
the XA recovery process for storage engines that support two-phase
commit.
The method is optional - an engine that does not implemented is expected
to work the traditional way, where every commit() durably flushes the
transaction to disk in the engine before completion, so XA recovery will
no longer be needed for that transaction.
An engine that does implement commit_checkpoint_request() is also
expected to implement commit_ordered(), so that ordering of commits is
consistent between 2pc participants. Such engine is no longer required to
durably flush to disk transactions in commit(), provided that the
transaction has been successfully prepare()d and commit_ordered(); thus
potentionally saving one fsync() call. (Engine must still durably flush
to disk in commit() when no prepare()/commit_ordered() steps took place,
at least if durable commits are wanted; this happens eg. if binlog is
disabled).
The TC will periodically (eg. once per binlog rotation) call
commit_checkpoint_request(). When this happens, the engine must arrange
for all transaction that have completed commit_ordered() to be durably
flushed to disk (this does not include transactions that might be in the
middle of executing commit_ordered()). When such flush has completed, the
engine must call commit_checkpoint_notify_ha(), passing back the opaque
"cookie".
The flush and call of commit_checkpoint_notify_ha() need not happen
immediately - it can be scheduled and performed asynchronously (ie. as
part of next prepare(), or sync every second, or whatever), but should
not be postponed indefinitely. It is however also permissible to do it
immediately, before returning from commit_checkpoint_request().
When commit_checkpoint_notify_ha() is called, the TC will know that the
transactions are durably committed, and thus no longer require XA
recovery. It uses that to reduce the work needed for any subsequent XA
recovery process.
*/
void (*commit_checkpoint_request)(void *cookie);
int (*start_consistent_snapshot)(handlerton *hton, THD *thd);
/**********************************************************************
Functions to intercept queries
**********************************************************************/
@ -1724,17 +1733,6 @@ struct handlerton
int (*check_version)(handlerton *hton, const char *path,
const LEX_CUSTRING *version, ulonglong create_id);
/*********************************************************************
System Versioning
**********************************************************************/
/** Determine if system-versioned data was modified by the transaction.
@param[in,out] thd current session
@param[out] trx_id transaction start ID
@return transaction commit ID
@retval 0 if no system-versioned data was affected by the transaction
*/
ulonglong (*prepare_commit_versioned)(THD *thd, ulonglong *trx_id);
/*********************************************************************
backup
**********************************************************************/
@ -1753,7 +1751,7 @@ struct handlerton
extern const char *hton_no_exts[];
static inline LEX_CSTRING *hton_name(const handlerton *hton)
static inline LEX_CSTRING *hton_name(const transaction_participant *hton)
{
return &(hton2plugin[hton->slot]->name);
}
@ -1763,7 +1761,7 @@ static inline handlerton *plugin_hton(plugin_ref plugin)
return plugin_data(plugin, handlerton *);
}
static inline sys_var *find_hton_sysvar(handlerton *hton, st_mysql_sys_var *var)
static inline sys_var *find_hton_sysvar(transaction_participant *hton, st_mysql_sys_var *var)
{
return find_plugin_sysvar(hton2plugin[hton->slot], var);
}
@ -1984,7 +1982,7 @@ class Ha_trx_info
{
public:
/** Register this storage engine in the given transaction context. */
void register_ha(THD_TRANS *trans, handlerton *ht_arg)
void register_ha(THD_TRANS *trans, transaction_participant *ht_arg)
{
DBUG_ASSERT(m_flags == 0);
DBUG_ASSERT(m_ht == NULL);
@ -2035,7 +2033,7 @@ public:
DBUG_ASSERT(is_started());
return m_next;
}
handlerton *ht() const
transaction_participant *ht() const
{
DBUG_ASSERT(is_started());
return m_ht;
@ -2049,7 +2047,7 @@ private:
for the same storage engine, 'ht' is not-NULL only when the
corresponding storage is a part of a transaction.
*/
handlerton *m_ht;
transaction_participant *m_ht;
/**
Transaction flags related to this engine.
Not-null only if this instance is a part of transaction.
@ -5603,7 +5601,8 @@ static inline enum legacy_db_type ha_legacy_type(const handlerton *db_type)
return (db_type == NULL) ? DB_TYPE_UNKNOWN : db_type->db_type;
}
static inline const char *ha_resolve_storage_engine_name(const handlerton *db_type)
static inline const char *
ha_resolve_storage_engine_name(const transaction_participant *db_type)
{
return (db_type == NULL ? "UNKNOWN" :
db_type == view_pseudo_hton ? "VIEW" : hton_name(db_type)->str);
@ -5625,6 +5624,7 @@ int ha_init(void);
int ha_end(void);
int ha_initialize_handlerton(st_plugin_int *plugin);
int ha_finalize_handlerton(st_plugin_int *plugin);
int setup_transaction_participant(st_plugin_int *plugin);
TYPELIB *ha_known_exts(void);
int ha_panic(enum ha_panic_function flag);
@ -5724,7 +5724,7 @@ int ha_abort_transaction(THD *bf_thd, THD *victim_thd, my_bool signal);
#endif
/* these are called by storage engines */
void trans_register_ha(THD *thd, bool all, handlerton *ht,
void trans_register_ha(THD *thd, bool all, transaction_participant *ht,
ulonglong trxid);
/*

View file

@ -82,7 +82,6 @@
#define FLAGSTR(V,F) ((V)&(F)?#F" ":"")
handlerton *binlog_hton;
LOGGER logger;
const char *log_bin_index= 0;
@ -93,14 +92,13 @@ MYSQL_BIN_LOG mysql_bin_log(&sync_binlog_period);
static bool test_if_number(const char *str,
ulong *res, bool allow_wildcards);
static int binlog_init(void *p);
static int binlog_close_connection(handlerton *hton, THD *thd);
static int binlog_savepoint_set(handlerton *hton, THD *thd, void *sv);
static int binlog_savepoint_rollback(handlerton *hton, THD *thd, void *sv);
static bool binlog_savepoint_rollback_can_release_mdl(handlerton *hton,
THD *thd);
static int binlog_rollback(handlerton *hton, THD *thd, bool all);
static int binlog_prepare(handlerton *hton, THD *thd, bool all);
static int binlog_start_consistent_snapshot(handlerton *hton, THD *thd);
static int binlog_close_connection(THD *thd);
static int binlog_savepoint_set(THD *thd, void *sv);
static int binlog_savepoint_rollback(THD *thd, void *sv);
static bool binlog_savepoint_rollback_can_release_mdl(THD *thd);
static int binlog_rollback(THD *thd, bool all);
static int binlog_prepare(THD *thd, bool all);
static int binlog_start_consistent_snapshot(THD *thd);
static int binlog_flush_cache(THD *thd, binlog_cache_mngr *cache_mngr,
Log_event *end_ev, bool all, bool using_stmt,
bool using_trx, bool is_ro_1pc);
@ -1760,40 +1758,34 @@ binlog_trans_log_truncate(THD *thd, my_off_t pos)
DBUG_VOID_RETURN;
}
/*
this function is mostly a placeholder.
conceptually, binlog initialization (now mostly done in MYSQL_BIN_LOG::open)
should be moved here.
*/
transaction_participant binlog_tp;
int binlog_init(void *p)
{
binlog_hton= (handlerton *)p;
binlog_hton->db_type= DB_TYPE_BINLOG;
binlog_hton->savepoint_offset= sizeof(my_off_t);
binlog_hton->close_connection= binlog_close_connection;
binlog_hton->savepoint_set= binlog_savepoint_set;
binlog_hton->savepoint_rollback= binlog_savepoint_rollback;
binlog_hton->savepoint_rollback_can_release_mdl=
bzero(&binlog_tp, sizeof(binlog_tp));
binlog_tp.savepoint_offset= sizeof(my_off_t);
binlog_tp.close_connection= binlog_close_connection;
binlog_tp.savepoint_set= binlog_savepoint_set;
binlog_tp.savepoint_rollback= binlog_savepoint_rollback;
binlog_tp.savepoint_rollback_can_release_mdl=
binlog_savepoint_rollback_can_release_mdl;
binlog_hton->commit= [](handlerton *, THD *thd, bool all) { return 0; };
binlog_hton->rollback= binlog_rollback;
binlog_hton->drop_table= [](handlerton *, const char*) { return -1; };
binlog_tp.commit= [](THD *thd, bool all) { return 0; };
binlog_tp.rollback= binlog_rollback;
if (WSREP_ON || opt_bin_log)
{
binlog_hton->prepare= binlog_prepare;
binlog_hton->start_consistent_snapshot= binlog_start_consistent_snapshot;
binlog_tp.prepare= binlog_prepare;
binlog_tp.start_consistent_snapshot= binlog_start_consistent_snapshot;
}
binlog_hton->flags= HTON_NOT_USER_SELECTABLE | HTON_HIDDEN | HTON_NO_ROLLBACK;
return 0;
binlog_tp.flags= HTON_NO_ROLLBACK;
auto plugin= (st_plugin_int*)p;
plugin->data= &binlog_tp;
return setup_transaction_participant(plugin);
}
#ifdef WITH_WSREP
#include "wsrep_binlog.h"
#endif /* WITH_WSREP */
static int binlog_close_connection(handlerton *hton, THD *thd)
static int binlog_close_connection(THD *thd)
{
DBUG_ENTER("binlog_close_connection");
binlog_cache_mngr *const cache_mngr= thd->binlog_get_cache_mngr();
@ -2103,14 +2095,14 @@ inline bool is_preparing_xa(THD *thd)
}
static int binlog_prepare(handlerton *hton, THD *thd, bool all)
static int binlog_prepare(THD *thd, bool all)
{
/* Do nothing unless the transaction is a user XA. */
return is_preparing_xa(thd) ? binlog_commit(thd, all, FALSE) : 0;
}
int binlog_commit_by_xid(handlerton *hton, XID *xid)
int binlog_commit_by_xid(XID *xid)
{
int rc= 0;
THD *thd= current_thd;
@ -2131,20 +2123,20 @@ int binlog_commit_by_xid(handlerton *hton, XID *xid)
THD_TRANS trans;
trans.ha_list= NULL;
thd->ha_data[hton->slot].ha_info[1].register_ha(&trans, hton);
thd->ha_data[binlog_hton->slot].ha_info[1].set_trx_read_write();
thd->ha_data[binlog_tp.slot].ha_info[1].register_ha(&trans, &binlog_tp);
thd->ha_data[binlog_tp.slot].ha_info[1].set_trx_read_write();
(void) thd->binlog_setup_trx_data();
DBUG_ASSERT(thd->lex->sql_command == SQLCOM_XA_COMMIT);
rc= binlog_commit(thd, TRUE, FALSE);
thd->ha_data[binlog_hton->slot].ha_info[1].reset();
thd->ha_data[binlog_tp.slot].ha_info[1].reset();
return rc;
}
int binlog_rollback_by_xid(handlerton *hton, XID *xid)
int binlog_rollback_by_xid(XID *xid)
{
int rc= 0;
THD *thd= current_thd;
@ -2161,14 +2153,14 @@ int binlog_rollback_by_xid(handlerton *hton, XID *xid)
THD_TRANS trans;
trans.ha_list= NULL;
thd->ha_data[hton->slot].ha_info[1].register_ha(&trans, hton);
thd->ha_data[hton->slot].ha_info[1].set_trx_read_write();
thd->ha_data[binlog_tp.slot].ha_info[1].register_ha(&trans, &binlog_tp);
thd->ha_data[binlog_tp.slot].ha_info[1].set_trx_read_write();
(void) thd->binlog_setup_trx_data();
DBUG_ASSERT(thd->lex->sql_command == SQLCOM_XA_ROLLBACK ||
(thd->transaction->xid_state.get_state_code() == XA_ROLLBACK_ONLY));
rc= binlog_rollback(hton, thd, TRUE);
thd->ha_data[hton->slot].ha_info[1].reset();
rc= binlog_rollback(thd, TRUE);
thd->ha_data[binlog_tp.slot].ha_info[1].reset();
return rc;
}
@ -2315,8 +2307,8 @@ int binlog_commit(THD *thd, bool all, bool ro_1pc)
}
if (cache_mngr->trx_cache.empty() &&
(thd->transaction->xid_state.get_state_code() != XA_PREPARED ||
!(thd->ha_data[binlog_hton->slot].ha_info[1].is_started() &&
thd->ha_data[binlog_hton->slot].ha_info[1].is_trx_read_write())))
!(thd->ha_data[binlog_tp.slot].ha_info[1].is_started() &&
thd->ha_data[binlog_tp.slot].ha_info[1].is_trx_read_write())))
{
/*
This is an empty transaction commit (both the regular and xa),
@ -2377,7 +2369,7 @@ int binlog_commit(THD *thd, bool all, bool ro_1pc)
@see handlerton::rollback
*/
static int binlog_rollback(handlerton *hton, THD *thd, bool all)
static int binlog_rollback(THD *thd, bool all)
{
DBUG_ENTER("binlog_rollback");
@ -2414,8 +2406,8 @@ static int binlog_rollback(handlerton *hton, THD *thd, bool all)
if (!cache_mngr->trx_cache.has_incident() && cache_mngr->trx_cache.empty() &&
(thd->transaction->xid_state.get_state_code() != XA_PREPARED ||
!(thd->ha_data[binlog_hton->slot].ha_info[1].is_started() &&
thd->ha_data[binlog_hton->slot].ha_info[1].is_trx_read_write())))
!(thd->ha_data[binlog_tp.slot].ha_info[1].is_started() &&
thd->ha_data[binlog_tp.slot].ha_info[1].is_trx_read_write())))
{
/*
The same comments apply as in the binlog commit method's branch.
@ -2523,8 +2515,8 @@ void Event_log::set_write_error(THD *thd, bool is_transactional)
if (WSREP_EMULATE_BINLOG(thd))
{
if (is_transactional)
trans_register_ha(thd, TRUE, binlog_hton, 0);
trans_register_ha(thd, FALSE, binlog_hton, 0);
trans_register_ha(thd, TRUE, &binlog_tp, 0);
trans_register_ha(thd, FALSE, &binlog_tp, 0);
}
#endif /* WITH_WSREP */
DBUG_VOID_RETURN;
@ -2596,7 +2588,7 @@ Event_log::check_cache_error(THD *thd, binlog_cache_data *cache_data)
that case there is no need to have it in the binlog).
*/
static int binlog_savepoint_set(handlerton *hton, THD *thd, void *sv)
static int binlog_savepoint_set(THD *thd, void *sv)
{
int error= 1;
DBUG_ENTER("binlog_savepoint_set");
@ -2629,7 +2621,7 @@ static int binlog_savepoint_set(handlerton *hton, THD *thd, void *sv)
DBUG_RETURN(error);
}
static int binlog_savepoint_rollback(handlerton *hton, THD *thd, void *sv)
static int binlog_savepoint_rollback(THD *thd, void *sv)
{
DBUG_ENTER("binlog_savepoint_rollback");
@ -2683,8 +2675,7 @@ static int binlog_savepoint_rollback(handlerton *hton, THD *thd, void *sv)
@return true - It is safe to release MDL locks.
false - If it is not.
*/
static bool binlog_savepoint_rollback_can_release_mdl(handlerton *hton,
THD *thd)
static bool binlog_savepoint_rollback_can_release_mdl(THD *thd)
{
DBUG_ENTER("binlog_savepoint_rollback_can_release_mdl");
/*
@ -6335,7 +6326,7 @@ bool stmt_has_updated_non_trans_table(const THD* thd)
/*
These functions are placed in this file since they need access to
binlog_hton, which has internal linkage.
binlog_tp, which has internal linkage.
*/
static binlog_cache_mngr *binlog_setup_cache_mngr(THD *thd)
@ -6379,12 +6370,12 @@ binlog_cache_mngr *THD::binlog_setup_trx_data()
{
DBUG_ENTER("THD::binlog_setup_trx_data");
binlog_cache_mngr *cache_mngr=
(binlog_cache_mngr*) thd_get_ha_data(this, binlog_hton);
(binlog_cache_mngr*) thd_get_ha_data(this, &binlog_tp);
if (!cache_mngr)
{
cache_mngr= binlog_setup_cache_mngr(this);
thd_set_ha_data(this, binlog_hton, cache_mngr);
thd_set_ha_data(this, &binlog_tp, cache_mngr);
}
@ -6487,7 +6478,7 @@ THD::binlog_start_trans_and_stmt()
ha_info->is_started().
*/
Ha_trx_info *ha_info;
ha_info= this->ha_data[binlog_hton->slot].ha_info + (mstmt_mode ? 1 : 0);
ha_info= this->ha_data[binlog_tp.slot].ha_info + (mstmt_mode ? 1 : 0);
if (!ha_info->is_started() && is_gtid_written_on_trans_start(this))
{
@ -6521,8 +6512,8 @@ THD::binlog_start_trans_and_stmt()
}
#endif
if (mstmt_mode)
trans_register_ha(this, TRUE, binlog_hton, 0);
trans_register_ha(this, FALSE, binlog_hton, 0);
trans_register_ha(this, TRUE, &binlog_tp, 0);
trans_register_ha(this, FALSE, &binlog_tp, 0);
/*
Mark statement transaction as read/write. We never start
a binary log transaction and keep it read-only,
@ -6532,7 +6523,7 @@ THD::binlog_start_trans_and_stmt()
since the statement-level flag will be propagated automatically
inside ha_commit_trans.
*/
ha_data[binlog_hton->slot].ha_info[0].set_trx_read_write();
ha_data[binlog_tp.slot].ha_info[0].set_trx_read_write();
}
DBUG_VOID_RETURN;
}
@ -6553,7 +6544,7 @@ void THD::binlog_set_stmt_begin() {
}
static int
binlog_start_consistent_snapshot(handlerton *hton, THD *thd)
binlog_start_consistent_snapshot(THD *thd)
{
int err= 0;
DBUG_ENTER("binlog_start_consistent_snapshot");
@ -6565,7 +6556,7 @@ binlog_start_consistent_snapshot(handlerton *hton, THD *thd)
strmake_buf(cache_mngr->last_commit_pos_file, mysql_bin_log.last_commit_pos_file);
cache_mngr->last_commit_pos_offset= mysql_bin_log.last_commit_pos_offset;
trans_register_ha(thd, TRUE, binlog_hton, 0);
trans_register_ha(thd, TRUE, &binlog_tp, 0);
DBUG_RETURN(err);
}
@ -6750,7 +6741,7 @@ write_err:
binlog_cache_mngr *THD::binlog_get_cache_mngr() const
{
return (binlog_cache_mngr*) thd_get_ha_data(this, binlog_hton);
return (binlog_cache_mngr*) thd_get_ha_data(this, &binlog_tp);
}
@ -8459,7 +8450,7 @@ MYSQL_BIN_LOG::write_transaction_to_binlog(THD *thd,
{
for (; ha_info; ha_info= ha_info->next())
{
if (ha_info->is_started() && ha_info->ht() != binlog_hton &&
if (ha_info->is_started() && ha_info->ht() != &binlog_tp &&
!ha_info->ht()->commit_checkpoint_request)
{
entry.need_unlog= true;
@ -10175,10 +10166,10 @@ TC_LOG::run_prepare_ordered(THD *thd, bool all)
mysql_mutex_assert_owner(&LOCK_prepare_ordered);
for (; ha_info; ha_info= ha_info->next())
{
handlerton *ht= ha_info->ht();
transaction_participant *ht= ha_info->ht();
if (!ht->prepare_ordered)
continue;
ht->prepare_ordered(ht, thd, all);
ht->prepare_ordered(thd, all);
}
}
@ -10192,10 +10183,10 @@ TC_LOG::run_commit_ordered(THD *thd, bool all)
mysql_mutex_assert_owner(&LOCK_commit_ordered);
for (; ha_info; ha_info= ha_info->next())
{
handlerton *ht= ha_info->ht();
transaction_participant *ht= ha_info->ht();
if (!ht->commit_ordered)
continue;
ht->commit_ordered(ht, thd, all);
ht->commit_ordered(thd, all);
DBUG_EXECUTE_IF("enable_log_write_upto_crash",
{
DBUG_SET_INITIAL("+d,crash_after_log_write_upto");
@ -11353,8 +11344,8 @@ int TC_LOG_BINLOG::unlog_xa_prepare(THD *thd, bool all)
{
/* an empty XA-prepare event group is logged */
rc= write_empty_xa_prepare(thd, cache_mngr); // normally gains need_unlog
trans_register_ha(thd, true, binlog_hton, 0); // do it for future commmit
thd->ha_data[binlog_hton->slot].ha_info[1].set_trx_read_write();
trans_register_ha(thd, true, &binlog_tp, 0); // do it for future commmit
thd->ha_data[binlog_tp.slot].ha_info[1].set_trx_read_write();
}
if (rw_count == 0 || !cache_mngr->need_unlog)
return rc;
@ -12992,24 +12983,27 @@ get_gtid_list_event(IO_CACHE *cache, Gtid_list_log_event **out_gtid_list)
return errormsg;
}
struct st_mysql_storage_engine binlog_storage_engine=
{ MYSQL_HANDLERTON_INTERFACE_VERSION };
/*
Make it a "plugin" to be able to use a transaction_participant and to
add system and status variables.
*/
struct st_mysql_daemon binlog_plugin=
{ MYSQL_DAEMON_INTERFACE_VERSION };
maria_declare_plugin(binlog)
{
MYSQL_STORAGE_ENGINE_PLUGIN,
&binlog_storage_engine,
MYSQL_DAEMON_PLUGIN,
&binlog_plugin,
"binlog",
"MySQL AB",
"This is a pseudo storage engine to represent the binlog in a transaction",
"This is a plugin to represent the binlog in a transaction",
PLUGIN_LICENSE_GPL,
binlog_init, /* Plugin Init */
NULL, /* Plugin Deinit */
0x0100 /* 1.0 */,
binlog_status_vars_top, /* status variables */
binlog_sys_vars, /* system variables */
"1.0", /* string version */
0x0200 /* 1.0 */,
binlog_status_vars_top, /* status variables */
binlog_sys_vars, /* system variables */
"2.0", /* string version */
MariaDB_PLUGIN_MATURITY_STABLE /* maturity */
}
maria_declare_plugin_end;
@ -13019,7 +13013,7 @@ maria_declare_plugin_end;
IO_CACHE *wsrep_get_cache(THD * thd, bool is_transactional)
{
DBUG_ASSERT(binlog_hton->slot != HA_SLOT_UNDEF);
DBUG_ASSERT(binlog_tp.slot != HA_SLOT_UNDEF);
binlog_cache_mngr *cache_mngr = thd->binlog_get_cache_mngr();
if (cache_mngr)
return cache_mngr->get_binlog_cache_log(is_transactional);
@ -13032,7 +13026,7 @@ IO_CACHE *wsrep_get_cache(THD * thd, bool is_transactional)
bool wsrep_is_binlog_cache_empty(THD *thd)
{
binlog_cache_mngr *cache_mngr=
(binlog_cache_mngr *) thd_get_ha_data(thd, binlog_hton);
(binlog_cache_mngr *) thd_get_ha_data(thd, &binlog_tp);
if (cache_mngr)
return cache_mngr->trx_cache.empty() && cache_mngr->stmt_cache.empty();
return true;
@ -13104,13 +13098,13 @@ void wsrep_register_binlog_handler(THD *thd, bool trx)
Set callbacks in order to be able to call commmit or rollback.
*/
if (trx)
trans_register_ha(thd, TRUE, binlog_hton, 0);
trans_register_ha(thd, FALSE, binlog_hton, 0);
trans_register_ha(thd, TRUE, &binlog_tp, 0);
trans_register_ha(thd, FALSE, &binlog_tp, 0);
/*
Set the binary log as read/write otherwise callbacks are not called.
*/
thd->ha_data[binlog_hton->slot].ha_info[0].set_trx_read_write();
thd->ha_data[binlog_tp.slot].ha_info[0].set_trx_read_write();
}
DBUG_VOID_RETURN;
}

View file

@ -1409,7 +1409,7 @@ binlog_cache_data* binlog_get_cache_data(binlog_cache_mngr *cache_mngr,
bool use_trans_cache);
extern MYSQL_PLUGIN_IMPORT MYSQL_BIN_LOG mysql_bin_log;
extern handlerton *binlog_hton;
extern transaction_participant binlog_tp;
extern LOGGER logger;
extern const char *log_bin_index;
@ -1495,8 +1495,8 @@ const char *
get_gtid_list_event(IO_CACHE *cache, Gtid_list_log_event **out_gtid_list);
int binlog_commit(THD *thd, bool all, bool is_ro_1pc= false);
int binlog_commit_by_xid(handlerton *hton, XID *xid);
int binlog_rollback_by_xid(handlerton *hton, XID *xid);
int binlog_commit_by_xid(XID *xid);
int binlog_rollback_by_xid(XID *xid);
bool write_bin_log_start_alter(THD *thd, bool& partial_alter,
uint64 start_alter_id, bool log_if_exists);
#endif /* LOG_H */

View file

@ -21,11 +21,32 @@
#include "sql_class.h"
#include "log_cache.h"
static handlerton *online_alter_hton;
typedef void *sv_id_t;
static int online_alter_close_connection(THD *thd);
static int online_alter_savepoint_set(THD *thd, sv_id_t sv_id);
static int online_alter_savepoint_rollback(THD *thd, sv_id_t sv_id);
static int online_alter_commit(THD *thd, bool all);
static int online_alter_rollback(THD *thd, bool all);
static int online_alter_prepare(THD *thd, bool all);
static int online_alter_commit_by_xid(XID *x);
static int online_alter_rollback_by_xid(XID *x);
static transaction_participant online_alter_tp=
{
0, 0, HTON_NO_ROLLBACK,
online_alter_close_connection,
online_alter_savepoint_set, online_alter_savepoint_rollback,
[](THD *thd){ return true; }, /*savepoint_rollback_can_release_mdl*/
NULL, /*savepoint_release*/
online_alter_commit, online_alter_rollback, online_alter_prepare,
[](XID*, uint){ return 0; }, /*recover*/
online_alter_commit_by_xid,
online_alter_rollback_by_xid,
NULL, NULL,
NULL, NULL, NULL, NULL, NULL /* snapshot, *_ordered, checkpoint, versioned*/
};
struct Online_alter_cache_list: ilist<online_alter_cache_data>
{
sv_id_t savepoint_id= 0;
@ -108,7 +129,8 @@ online_alter_cache_data *setup_cache_data(MEM_ROOT *root, TABLE_SHARE *share)
}
static Online_alter_cache_list &get_cache_list(handlerton *ht, THD *thd)
static Online_alter_cache_list &get_cache_list(transaction_participant *ht,
THD *thd)
{
void *data= thd_get_ha_data(thd, ht);
DBUG_ASSERT(data);
@ -118,11 +140,11 @@ static Online_alter_cache_list &get_cache_list(handlerton *ht, THD *thd)
static Online_alter_cache_list &get_or_create_cache_list(THD *thd)
{
void *data= thd_get_ha_data(thd, online_alter_hton);
void *data= thd_get_ha_data(thd, &online_alter_tp);
if (!data)
{
data= new Online_alter_cache_list();
thd_set_ha_data(thd, online_alter_hton, data);
thd_set_ha_data(thd, &online_alter_tp, data);
}
return *(Online_alter_cache_list*)data;
}
@ -155,9 +177,9 @@ int online_alter_log_row(TABLE* table, const uchar *before_record,
{
table->online_alter_cache= get_cache_data(thd, table);
DBUG_ASSERT(table->online_alter_cache->cache_log.type == WRITE_CACHE);
trans_register_ha(thd, false, online_alter_hton, 0);
trans_register_ha(thd, false, &online_alter_tp, 0);
if (thd->in_multi_stmt_transaction_mode())
trans_register_ha(thd, true, online_alter_hton, 0);
trans_register_ha(thd, true, &online_alter_tp, 0);
}
// We need to log all columns for the case if alter table changes primary key
@ -275,10 +297,10 @@ void cleanup_tables(THD *thd)
}
static
int online_alter_savepoint_set(handlerton *hton, THD *thd, sv_id_t sv_id)
int online_alter_savepoint_set(THD *thd, sv_id_t sv_id)
{
DBUG_ENTER("binlog_online_alter_savepoint");
auto &cache_list= get_cache_list(hton, thd);
auto &cache_list= get_cache_list(&online_alter_tp, thd);
if (cache_list.empty())
DBUG_RETURN(0);
@ -296,11 +318,11 @@ int online_alter_savepoint_set(handlerton *hton, THD *thd, sv_id_t sv_id)
}
static
int online_alter_savepoint_rollback(handlerton *hton, THD *thd, sv_id_t sv_id)
int online_alter_savepoint_rollback(THD *thd, sv_id_t sv_id)
{
DBUG_ENTER("online_alter_savepoint_rollback");
auto &cache_list= get_cache_list(hton, thd);
auto &cache_list= get_cache_list(&online_alter_tp, thd);
for (auto &cache: cache_list)
{
if (cache.hton->savepoint_set == NULL)
@ -316,63 +338,7 @@ int online_alter_savepoint_rollback(handlerton *hton, THD *thd, sv_id_t sv_id)
DBUG_RETURN(0);
}
static int online_alter_commit(handlerton *hton, THD *thd, bool all)
{
int res;
bool is_ending_transaction= ending_trans(thd, all);
if (is_ending_transaction
&& thd->transaction->xid_state.get_state_code() == XA_PREPARED)
{
res= hton->commit_by_xid(hton, thd->transaction->xid_state.get_xid());
// cleanup was already done by prepare()
}
else
{
res= online_alter_end_trans(get_cache_list(hton, thd), thd,
is_ending_transaction, true);
cleanup_tables(thd);
}
return res;
};
static int online_alter_rollback(handlerton *hton, THD *thd, bool all)
{
int res;
bool is_ending_transaction= ending_trans(thd, all);
if (is_ending_transaction
&& thd->transaction->xid_state.get_state_code() == XA_PREPARED)
{
res= hton->rollback_by_xid(hton, thd->transaction->xid_state.get_xid());
// cleanup was already done by prepare()
}
else
{
res= online_alter_end_trans(get_cache_list(hton, thd), thd,
is_ending_transaction, false);
cleanup_tables(thd);
}
return res;
};
static int online_alter_prepare(handlerton *hton, THD *thd, bool all)
{
auto &cache_list= get_cache_list(hton, thd);
int res= 0;
if (ending_trans(thd, all))
{
thd->transaction->xid_state.set_online_alter_cache(&cache_list);
thd_set_ha_data(thd, hton, NULL);
}
else
{
res= online_alter_end_trans(cache_list, thd, false, true);
}
cleanup_tables(thd);
return res;
};
static int online_alter_commit_by_xid(handlerton *hton, XID *x)
static int online_alter_commit_by_xid(XID *x)
{
auto *xid= static_cast<XA_data*>(x);
if (likely(xid->online_alter_cache == NULL))
@ -384,7 +350,7 @@ static int online_alter_commit_by_xid(handlerton *hton, XID *x)
return res;
};
static int online_alter_rollback_by_xid(handlerton *hton, XID *x)
static int online_alter_rollback_by_xid(XID *x)
{
auto *xid= static_cast<XA_data*>(x);
if (likely(xid->online_alter_cache == NULL))
@ -396,61 +362,97 @@ static int online_alter_rollback_by_xid(handlerton *hton, XID *x)
return res;
};
static int online_alter_close_connection(handlerton *hton, THD *thd)
static int online_alter_commit(THD *thd, bool all)
{
auto *cache_list= (Online_alter_cache_list*)thd_get_ha_data(thd, hton);
int res;
bool is_ending_transaction= ending_trans(thd, all);
if (is_ending_transaction
&& thd->transaction->xid_state.get_state_code() == XA_PREPARED)
{
res= online_alter_commit_by_xid(thd->transaction->xid_state.get_xid());
// cleanup was already done by prepare()
}
else
{
res= online_alter_end_trans(get_cache_list(&online_alter_tp, thd), thd,
is_ending_transaction, true);
cleanup_tables(thd);
}
return res;
};
static int online_alter_rollback(THD *thd, bool all)
{
int res;
bool is_ending_transaction= ending_trans(thd, all);
if (is_ending_transaction
&& thd->transaction->xid_state.get_state_code() == XA_PREPARED)
{
res= online_alter_rollback_by_xid(thd->transaction->xid_state.get_xid());
// cleanup was already done by prepare()
}
else
{
res= online_alter_end_trans(get_cache_list(&online_alter_tp, thd), thd,
is_ending_transaction, false);
cleanup_tables(thd);
}
return res;
};
static int online_alter_prepare(THD *thd, bool all)
{
auto &cache_list= get_cache_list(&online_alter_tp, thd);
int res= 0;
if (ending_trans(thd, all))
{
thd->transaction->xid_state.set_online_alter_cache(&cache_list);
thd_set_ha_data(thd, &online_alter_tp, NULL);
}
else
{
res= online_alter_end_trans(cache_list, thd, false, true);
}
cleanup_tables(thd);
return res;
};
static int online_alter_close_connection(THD *thd)
{
auto *cache_list= (Online_alter_cache_list*)thd_get_ha_data(thd, &online_alter_tp);
DBUG_ASSERT(!cache_list || cache_list->empty());
delete cache_list;
thd_set_ha_data(thd, hton, NULL);
thd_set_ha_data(thd, &online_alter_tp, NULL);
return 0;
}
static int online_alter_log_init(void *p)
{
online_alter_hton= (handlerton *)p;
online_alter_hton->db_type= DB_TYPE_ONLINE_ALTER;
online_alter_hton->savepoint_offset= 0;
online_alter_hton->close_connection= online_alter_close_connection;
online_alter_hton->savepoint_set= online_alter_savepoint_set;
online_alter_hton->savepoint_rollback= online_alter_savepoint_rollback;
online_alter_hton->savepoint_rollback_can_release_mdl=
[](handlerton *hton, THD *thd){ return true; };
online_alter_hton->commit= online_alter_commit;
online_alter_hton->rollback= online_alter_rollback;
online_alter_hton->recover= [](handlerton*, XID*, uint){ return 0; };
online_alter_hton->prepare= online_alter_prepare;
online_alter_hton->commit_by_xid= online_alter_commit_by_xid;
online_alter_hton->rollback_by_xid= online_alter_rollback_by_xid;
online_alter_hton->drop_table= [](handlerton *, const char*) { return -1; };
online_alter_hton->flags= HTON_NOT_USER_SELECTABLE | HTON_HIDDEN
| HTON_NO_ROLLBACK;
return 0;
auto plugin= (st_plugin_int*)p;
plugin->data= &online_alter_tp;
return setup_transaction_participant(plugin);
}
struct st_mysql_storage_engine online_alter_storage_engine=
{ MYSQL_HANDLERTON_INTERFACE_VERSION };
struct st_mysql_daemon online_alter_plugin=
{ MYSQL_DAEMON_INTERFACE_VERSION };
maria_declare_plugin(online_alter_log)
{
MYSQL_STORAGE_ENGINE_PLUGIN,
&online_alter_storage_engine,
MYSQL_DAEMON_PLUGIN,
&online_alter_plugin,
"online_alter_log",
"MariaDB PLC",
"A pseudo storage engine for the online alter log",
"This is a plugin to represent the online alter log in a transaction",
PLUGIN_LICENSE_GPL,
online_alter_log_init,
NULL,
0x0100, // 1.0
0x0200, // 2.0
NULL, // no status vars
NULL, // no sysvars
"1.0",
"2.0",
MariaDB_PLUGIN_MATURITY_STABLE
}
maria_declare_plugin_end;

View file

@ -530,7 +530,7 @@ rpl_slave_state::select_gtid_pos_table(THD *thd, LEX_CSTRING *out_tablename)
void *trx_hton= ha_info->ht();
auto table_entry= list;
if (!ha_info->is_trx_read_write() || trx_hton == binlog_hton)
if (!ha_info->is_trx_read_write() || trx_hton == &binlog_tp)
continue;
while (table_entry)
{
@ -552,7 +552,7 @@ rpl_slave_state::select_gtid_pos_table(THD *thd, LEX_CSTRING *out_tablename)
ha_info= ha_info->next();
if (!ha_info)
break;
if (ha_info->is_trx_read_write() && ha_info->ht() != binlog_hton)
if (ha_info->is_trx_read_write() && ha_info->ht() != &binlog_tp)
{
statistic_increment(rpl_transactions_multi_engine, LOCK_status);
break;

View file

@ -441,7 +441,7 @@ void thd_storage_lock_wait(THD *thd, long long value)
Provide a handler data getter to simplify coding
*/
extern "C"
void *thd_get_ha_data(const THD *thd, const struct handlerton *hton)
void *thd_get_ha_data(const THD *thd, const struct transaction_participant *hton)
{
DBUG_ASSERT(thd == current_thd || mysql_mutex_is_owner(&thd->LOCK_thd_data));
return thd->ha_data[hton->slot].ha_ptr;
@ -453,7 +453,7 @@ void *thd_get_ha_data(const THD *thd, const struct handlerton *hton)
@see thd_set_ha_data() definition in plugin.h
*/
extern "C"
void thd_set_ha_data(THD *thd, const struct handlerton *hton,
void thd_set_ha_data(THD *thd, const struct transaction_participant *hton,
const void *ha_data)
{
plugin_ref *lock= &thd->ha_data[hton->slot].lock;

View file

@ -122,6 +122,7 @@ enum enum_sql_command {
};
struct TABLE_LIST;
struct handlerton;
class Storage_engine_name
{
@ -136,8 +137,7 @@ public:
Storage_engine_name(const LEX_CSTRING &name)
:m_storage_engine_name(name)
{ }
bool resolve_storage_engine_with_error(THD *thd,
handlerton **ha,
bool resolve_storage_engine_with_error(THD *thd, handlerton **ha,
bool tmp_table);
bool is_set() { return m_storage_engine_name.str != NULL; }
};

View file

@ -442,21 +442,21 @@ public:
db_type= DB_TYPE_HLINDEX_HELPER;
flags = HTON_NOT_USER_SELECTABLE | HTON_HIDDEN;
savepoint_offset= 0;
savepoint_set= [](handlerton *, THD *, void *){ return 0; };
savepoint_rollback_can_release_mdl= [](handlerton *, THD *){ return true; };
savepoint_set= [](THD *, void *){ return 0; };
savepoint_rollback_can_release_mdl= [](THD *){ return true; };
savepoint_rollback= do_savepoint_rollback;
commit= do_commit;
rollback= do_rollback;
}
static int do_commit(handlerton *, THD *thd, bool);
static int do_rollback(handlerton *, THD *thd, bool);
static int do_savepoint_rollback(handlerton *, THD *thd, void *);
static int do_commit(THD *thd, bool);
static int do_rollback(THD *thd, bool);
static int do_savepoint_rollback(THD *thd, void *);
} hton;
};
MHNSW_Trx::MHNSW_hton MHNSW_Trx::hton;
int MHNSW_Trx::MHNSW_hton::do_savepoint_rollback(handlerton *, THD *thd, void *)
int MHNSW_Trx::MHNSW_hton::do_savepoint_rollback(THD *thd, void *)
{
for (auto trx= static_cast<MHNSW_Trx*>(thd_get_ha_data(thd, &hton));
trx; trx= trx->next)
@ -464,7 +464,7 @@ int MHNSW_Trx::MHNSW_hton::do_savepoint_rollback(handlerton *, THD *thd, void *)
return 0;
}
int MHNSW_Trx::MHNSW_hton::do_rollback(handlerton *, THD *thd, bool)
int MHNSW_Trx::MHNSW_hton::do_rollback(THD *thd, bool)
{
MHNSW_Trx *trx_next;
for (auto trx= static_cast<MHNSW_Trx*>(thd_get_ha_data(thd, &hton));
@ -477,7 +477,7 @@ int MHNSW_Trx::MHNSW_hton::do_rollback(handlerton *, THD *thd, bool)
return 0;
}
int MHNSW_Trx::MHNSW_hton::do_commit(handlerton *, THD *thd, bool)
int MHNSW_Trx::MHNSW_hton::do_commit(THD *thd, bool)
{
MHNSW_Trx *trx_next;
for (auto trx= static_cast<MHNSW_Trx*>(thd_get_ha_data(thd, &hton));

@ -1 +1 @@
Subproject commit 2acda95e0bcf3cd9ecb64c5933d6c0429df016b7
Subproject commit 0fd5a7919c0dc0cefb6cf1ec3caf36cc09a81d98

View file

@ -393,6 +393,8 @@
#define MIN_PORT 0
#endif
static handlerton *federated_hton;
/* Variables for federated share methods */
static HASH federated_open_tables; // To track open tables
mysql_mutex_t federated_mutex; // To init the hash
@ -411,8 +413,8 @@ static const uint sizeof_trailing_where= sizeof(" WHERE ") - 1;
static handler *federated_create_handler(handlerton *hton,
TABLE_SHARE *table,
MEM_ROOT *mem_root);
static int federated_commit(handlerton *hton, THD *thd, bool all);
static int federated_rollback(handlerton *hton, THD *thd, bool all);
static int federated_commit(THD *thd, bool all);
static int federated_rollback(THD *thd, bool all);
/* Federated storage engine handlerton */
@ -489,7 +491,7 @@ int federated_db_init(void *p)
init_federated_psi_keys();
#endif /* HAVE_PSI_INTERFACE */
handlerton *federated_hton= (handlerton *)p;
federated_hton= (handlerton *)p;
federated_hton->db_type= DB_TYPE_FEDERATED_DB;
federated_hton->commit= federated_commit;
federated_hton->rollback= federated_rollback;
@ -3307,10 +3309,10 @@ int ha_federated::external_lock(THD *thd, int lock_type)
}
static int federated_commit(handlerton *hton, THD *thd, bool all)
static int federated_commit(THD *thd, bool all)
{
int return_val= 0;
ha_federated *trx= (ha_federated *) thd_get_ha_data(thd, hton);
ha_federated *trx= (ha_federated *) thd_get_ha_data(thd, federated_hton);
DBUG_ENTER("federated_commit");
if (all)
@ -3325,7 +3327,7 @@ static int federated_commit(handlerton *hton, THD *thd, bool all)
if (error && !return_val)
return_val= error;
}
thd_set_ha_data(thd, hton, NULL);
thd_set_ha_data(thd, federated_hton, NULL);
}
DBUG_PRINT("info", ("error val: %d", return_val));
@ -3333,10 +3335,10 @@ static int federated_commit(handlerton *hton, THD *thd, bool all)
}
static int federated_rollback(handlerton *hton, THD *thd, bool all)
static int federated_rollback(THD *thd, bool all)
{
int return_val= 0;
ha_federated *trx= (ha_federated *)thd_get_ha_data(thd, hton);
ha_federated *trx= (ha_federated *)thd_get_ha_data(thd, federated_hton);
DBUG_ENTER("federated_rollback");
if (all)
@ -3351,7 +3353,7 @@ static int federated_rollback(handlerton *hton, THD *thd, bool all)
if (error && !return_val)
return_val= error;
}
thd_set_ha_data(thd, hton, NULL);
thd_set_ha_data(thd, federated_hton, NULL);
}
DBUG_PRINT("info", ("error val: %d", return_val));

View file

@ -1772,9 +1772,9 @@ federatedx_txn *ha_federatedx::get_txn(THD *thd, bool no_create)
}
int ha_federatedx::disconnect(handlerton *hton, MYSQL_THD thd)
int ha_federatedx::disconnect(MYSQL_THD thd)
{
federatedx_txn *txn= (federatedx_txn *) thd_get_ha_data(thd, hton);
federatedx_txn *txn= (federatedx_txn *) thd_get_ha_data(thd, federatedx_hton);
delete txn;
return 0;
}
@ -3541,16 +3541,16 @@ int ha_federatedx::external_lock(MYSQL_THD thd, int lock_type)
}
int ha_federatedx::savepoint_set(handlerton *hton, MYSQL_THD thd, void *sv)
int ha_federatedx::savepoint_set(MYSQL_THD thd, void *sv)
{
int error= 0;
federatedx_txn *txn= (federatedx_txn *) thd_get_ha_data(thd, hton);
federatedx_txn *txn= (federatedx_txn *) thd_get_ha_data(thd, federatedx_hton);
DBUG_ENTER("ha_federatedx::savepoint_set");
if (txn && txn->has_connections())
{
if (txn->txn_begin())
trans_register_ha(thd, TRUE, hton, 0);
trans_register_ha(thd, TRUE, federatedx_hton, 0);
txn->sp_acquire((ulong *) sv);
@ -3561,10 +3561,10 @@ int ha_federatedx::savepoint_set(handlerton *hton, MYSQL_THD thd, void *sv)
}
int ha_federatedx::savepoint_rollback(handlerton *hton, MYSQL_THD thd, void *sv)
int ha_federatedx::savepoint_rollback(MYSQL_THD thd, void *sv)
{
int error= 0;
federatedx_txn *txn= (federatedx_txn *) thd_get_ha_data(thd, hton);
federatedx_txn *txn= (federatedx_txn *) thd_get_ha_data(thd, federatedx_hton);
DBUG_ENTER("ha_federatedx::savepoint_rollback");
if (txn)
@ -3574,10 +3574,10 @@ int ha_federatedx::savepoint_rollback(handlerton *hton, MYSQL_THD thd, void *sv)
}
int ha_federatedx::savepoint_release(handlerton *hton, MYSQL_THD thd, void *sv)
int ha_federatedx::savepoint_release(MYSQL_THD thd, void *sv)
{
int error= 0;
federatedx_txn *txn= (federatedx_txn *) thd_get_ha_data(thd, hton);
federatedx_txn *txn= (federatedx_txn *) thd_get_ha_data(thd, federatedx_hton);
DBUG_ENTER("ha_federatedx::savepoint_release");
if (txn)
@ -3587,10 +3587,10 @@ int ha_federatedx::savepoint_release(handlerton *hton, MYSQL_THD thd, void *sv)
}
int ha_federatedx::commit(handlerton *hton, MYSQL_THD thd, bool all)
int ha_federatedx::commit(MYSQL_THD thd, bool all)
{
int return_val;
federatedx_txn *txn= (federatedx_txn *) thd_get_ha_data(thd, hton);
federatedx_txn *txn= (federatedx_txn *) thd_get_ha_data(thd, federatedx_hton);
DBUG_ENTER("ha_federatedx::commit");
if (all)
@ -3603,10 +3603,10 @@ int ha_federatedx::commit(handlerton *hton, MYSQL_THD thd, bool all)
}
int ha_federatedx::rollback(handlerton *hton, MYSQL_THD thd, bool all)
int ha_federatedx::rollback(MYSQL_THD thd, bool all)
{
int return_val;
federatedx_txn *txn= (federatedx_txn *) thd_get_ha_data(thd, hton);
federatedx_txn *txn= (federatedx_txn *) thd_get_ha_data(thd, federatedx_hton);
DBUG_ENTER("ha_federatedx::rollback");
if (all)

View file

@ -289,12 +289,12 @@ private:
int stash_remote_error();
static federatedx_txn *get_txn(THD *thd, bool no_create= FALSE);
static int disconnect(handlerton *hton, MYSQL_THD thd);
static int savepoint_set(handlerton *hton, MYSQL_THD thd, void *sv);
static int savepoint_rollback(handlerton *hton, MYSQL_THD thd, void *sv);
static int savepoint_release(handlerton *hton, MYSQL_THD thd, void *sv);
static int commit(handlerton *hton, MYSQL_THD thd, bool all);
static int rollback(handlerton *hton, MYSQL_THD thd, bool all);
static int disconnect(MYSQL_THD thd);
static int savepoint_set(MYSQL_THD thd, void *sv);
static int savepoint_rollback(MYSQL_THD thd, void *sv);
static int savepoint_release(MYSQL_THD thd, void *sv);
static int commit(MYSQL_THD thd, bool all);
static int rollback(MYSQL_THD thd, bool all);
static int discover_assisted(handlerton *, THD*, TABLE_SHARE *,
HA_CREATE_INFO *);

View file

@ -1062,14 +1062,13 @@ static
int
innobase_close_connection(
/*======================*/
handlerton* hton, /*!< in/out: InnoDB handlerton */
THD* thd); /*!< in: MySQL thread handle for
which to close the connection */
/** Cancel any pending lock request associated with the current THD.
@sa THD::awake() @sa ha_kill_query() */
static void innobase_kill_query(handlerton*, THD* thd, enum thd_kill_levels);
static void innobase_commit_ordered(handlerton *hton, THD* thd, bool all);
static void innobase_commit_ordered(THD* thd, bool all);
/*****************************************************************//**
Commits a transaction in an InnoDB database or marks an SQL statement
@ -1079,7 +1078,6 @@ static
int
innobase_commit(
/*============*/
handlerton* hton, /*!< in/out: InnoDB handlerton */
THD* thd, /*!< in: MySQL thread handle of the
user for whom the transaction should
be committed */
@ -1095,7 +1093,6 @@ static
int
innobase_rollback(
/*==============*/
handlerton* hton, /*!< in/out: InnoDB handlerton */
THD* thd, /*!< in: handle to the MySQL thread
of the user whose transaction should
be rolled back */
@ -1111,7 +1108,6 @@ static
int
innobase_rollback_to_savepoint(
/*===========================*/
handlerton* hton, /*!< in/out: InnoDB handlerton */
THD* thd, /*!< in: handle to the MySQL thread of
the user whose XA transaction should
be rolled back to savepoint */
@ -1125,7 +1121,6 @@ static
bool
innobase_rollback_to_savepoint_can_release_mdl(
/*===========================================*/
handlerton* hton, /*!< in/out: InnoDB handlerton */
THD* thd); /*!< in: handle to the MySQL thread of
the user whose XA transaction should
be rolled back to savepoint */
@ -1137,7 +1132,6 @@ static
int
innobase_savepoint(
/*===============*/
handlerton* hton, /*!< in/out: InnoDB handlerton */
THD* thd, /*!< in: handle to the MySQL thread of
the user's XA transaction for which
we need to take a savepoint */
@ -1151,7 +1145,6 @@ static
int
innobase_release_savepoint(
/*=======================*/
handlerton* hton, /*!< in/out: handlerton for InnoDB */
THD* thd, /*!< in: handle to the MySQL thread
of the user whose transaction's
savepoint should be released */
@ -1194,7 +1187,6 @@ static
int
innobase_xa_prepare(
/*================*/
handlerton* hton, /*!< in: InnoDB handlerton */
THD* thd, /*!< in: handle to the MySQL thread of
the user whose XA transaction should
be prepared */
@ -1208,7 +1200,6 @@ static
int
innobase_xa_recover(
/*================*/
handlerton* hton, /*!< in: InnoDB handlerton */
XID* xid_list, /*!< in/out: prepared transactions */
uint len); /*!< in: number of slots in xid_list */
/*******************************************************************//**
@ -1219,9 +1210,46 @@ static
int
innobase_commit_by_xid(
/*===================*/
handlerton* hton, /*!< in: InnoDB handlerton */
XID* xid); /*!< in: X/Open XA transaction
identification */
/*******************************************************************//**
This function is used to rollback one X/Open XA distributed transaction
which is in the prepared state
@return 0 or error number */
static
int
innobase_rollback_by_xid(
/*===================*/
XID* xid); /*!< in: X/Open XA transaction
identification */
#ifndef EMBEDDED_LIBRARY
/*******************************************************************//**
This function is used to rollback one X/Open XA distributed transaction
which is in the prepared state asynchronously.
It only set the transaction's status to ACTIVE and persist the status.
The transaction will be rolled back by background rollback thread.
@return 0 or error number
*/
static
int
innobase_recover_rollback_by_xid(
/*===================*/
const XID* xid); /*!< in: X/Open XA transaction
identification */
/*******************************************************************//**
This function is called after tc log is opened(typically binlog recovery)
has done. It starts rollback thread to rollback the transactions
have been changed from PREPARED to ACTIVE.
@return 0 or error number
*/
static
void
innobase_tc_log_recovery_done();
#endif
/** Ignore FOREIGN KEY constraints that would be violated by DROP DATABASE */
static ibool innodb_drop_database_ignore_fk(void*,void*) { return false; }
@ -1572,7 +1600,6 @@ static
int
innobase_start_trx_and_assign_read_view(
/*====================================*/
handlerton* hton, /* in: InnoDB handlerton */
THD* thd); /* in: MySQL thread handle of the
user for whom the transaction should
be committed */
@ -4307,12 +4334,10 @@ static
int
innobase_start_trx_and_assign_read_view(
/*====================================*/
handlerton* hton, /*!< in: InnoDB handlerton */
THD* thd) /*!< in: MySQL thread handle of the user for
whom the transaction should be committed */
{
DBUG_ENTER("innobase_start_trx_and_assign_read_view");
DBUG_ASSERT(hton == innodb_hton_ptr);
/* Create a new trx struct for thd, if it does not yet have one */
@ -4343,7 +4368,7 @@ innobase_start_trx_and_assign_read_view(
/* Set the MySQL flag to mark that there is an active transaction */
innobase_register_trx(hton, current_thd, trx);
innobase_register_trx(innodb_hton_ptr, current_thd, trx);
DBUG_RETURN(0);
}
@ -4409,7 +4434,6 @@ static
void
innobase_commit_ordered(
/*====================*/
handlerton *hton, /*!< in: Innodb handlerton */
THD* thd, /*!< in: MySQL thread handle of the user for whom
the transaction should be committed */
bool all) /*!< in: TRUE - commit transaction
@ -4417,7 +4441,6 @@ innobase_commit_ordered(
{
trx_t* trx;
DBUG_ENTER("innobase_commit_ordered");
DBUG_ASSERT(hton == innodb_hton_ptr);
trx = check_trx_exists(thd);
@ -4466,7 +4489,6 @@ static
int
innobase_commit(
/*============*/
handlerton* hton, /*!< in: InnoDB handlerton */
THD* thd, /*!< in: MySQL thread handle of the
user for whom the transaction should
be committed */
@ -4476,7 +4498,6 @@ innobase_commit(
{
DBUG_ENTER("innobase_commit");
DBUG_PRINT("enter", ("commit_trx: %d", commit_trx));
DBUG_ASSERT(hton == innodb_hton_ptr);
DBUG_PRINT("trans", ("ending transaction"));
trx_t* trx = check_trx_exists(thd);
@ -4554,7 +4575,6 @@ static
int
innobase_rollback(
/*==============*/
handlerton* hton, /*!< in: InnoDB handlerton */
THD* thd, /*!< in: handle to the MySQL thread
of the user whose transaction should
be rolled back */
@ -4563,7 +4583,6 @@ innobase_rollback(
statement only */
{
DBUG_ENTER("innobase_rollback");
DBUG_ASSERT(hton == innodb_hton_ptr);
DBUG_PRINT("trans", ("aborting transaction"));
trx_t* trx = check_trx_exists(thd);
@ -4766,7 +4785,6 @@ static
int
innobase_rollback_to_savepoint(
/*===========================*/
handlerton* hton, /*!< in: InnoDB handlerton */
THD* thd, /*!< in: handle to the MySQL thread
of the user whose transaction should
be rolled back to savepoint */
@ -4774,7 +4792,6 @@ innobase_rollback_to_savepoint(
{
DBUG_ENTER("innobase_rollback_to_savepoint");
DBUG_ASSERT(hton == innodb_hton_ptr);
trx_t* trx = check_trx_exists(thd);
@ -4806,13 +4823,11 @@ static
bool
innobase_rollback_to_savepoint_can_release_mdl(
/*===========================================*/
handlerton* hton, /*!< in: InnoDB handlerton */
THD* thd) /*!< in: handle to the MySQL thread
of the user whose transaction should
be rolled back to savepoint */
{
DBUG_ENTER("innobase_rollback_to_savepoint_can_release_mdl");
DBUG_ASSERT(hton == innodb_hton_ptr);
trx_t* trx = check_trx_exists(thd);
@ -4834,7 +4849,6 @@ static
int
innobase_release_savepoint(
/*=======================*/
handlerton* hton, /*!< in: handlerton for InnoDB */
THD* thd, /*!< in: handle to the MySQL thread
of the user whose transaction's
savepoint should be released */
@ -4845,7 +4859,6 @@ innobase_release_savepoint(
char name[64];
DBUG_ENTER("innobase_release_savepoint");
DBUG_ASSERT(hton == innodb_hton_ptr);
trx = check_trx_exists(thd);
@ -4869,12 +4882,10 @@ static
int
innobase_savepoint(
/*===============*/
handlerton* hton, /*!< in: handle to the InnoDB handlerton */
THD* thd, /*!< in: handle to the MySQL thread */
void* savepoint)/*!< in: savepoint data */
{
DBUG_ENTER("innobase_savepoint");
DBUG_ASSERT(hton == innodb_hton_ptr);
/* In the autocommit mode there is no sense to set a savepoint
(unless we are in sub-statement), so SQL layer ensures that
@ -4908,9 +4919,8 @@ innobase_savepoint(
@return 0 always
*/
static int innobase_close_connection(handlerton *hton, THD *thd)
static int innobase_close_connection(THD *thd)
{
DBUG_ASSERT(hton == innodb_hton_ptr);
if (auto trx= thd_to_trx(thd))
{
thd_set_ha_data(thd, innodb_hton_ptr, NULL);
@ -16139,7 +16149,7 @@ ha_innobase::external_lock(
if (trx_is_started(trx)) {
innobase_commit(ht, thd, TRUE);
innobase_commit(thd, TRUE);
}
} else if (trx->isolation_level <= TRX_ISO_READ_COMMITTED) {
@ -16970,7 +16980,6 @@ static
int
innobase_xa_prepare(
/*================*/
handlerton* hton, /*!< in: InnoDB handlerton */
THD* thd, /*!< in: handle to the MySQL thread of
the user whose XA transaction should
be prepared */
@ -16980,8 +16989,6 @@ innobase_xa_prepare(
{
trx_t* trx = check_trx_exists(thd);
DBUG_ASSERT(hton == innodb_hton_ptr);
thd_get_xid(thd, &reinterpret_cast<MYSQL_XID&>(trx->xid));
if (!trx_is_registered_for_2pc(trx) && trx_is_started(trx)) {
@ -17044,12 +17051,9 @@ static
int
innobase_xa_recover(
/*================*/
handlerton* hton, /*!< in: InnoDB handlerton */
XID* xid_list,/*!< in/out: prepared transactions */
uint len) /*!< in: number of slots in xid_list */
{
DBUG_ASSERT(hton == innodb_hton_ptr);
if (len == 0 || xid_list == NULL) {
return(0);
@ -17066,11 +17070,8 @@ static
int
innobase_commit_by_xid(
/*===================*/
handlerton* hton,
XID* xid) /*!< in: X/Open XA transaction identification */
{
DBUG_ASSERT(hton == innodb_hton_ptr);
DBUG_EXECUTE_IF("innobase_xa_fail",
return XAER_RMFAIL;);
@ -17099,10 +17100,8 @@ which is in the prepared state
@param[in] xid X/Open XA transaction identification
@return 0 or error number */
int innobase_rollback_by_xid(handlerton* hton, XID* xid)
static int innobase_rollback_by_xid(XID* xid)
{
DBUG_ASSERT(hton == innodb_hton_ptr);
DBUG_EXECUTE_IF("innobase_xa_fail",
return XAER_RMFAIL;);
@ -17141,7 +17140,7 @@ int innobase_rollback_by_xid(handlerton* hton, XID* xid)
@return 0 or error number
*/
int innobase_recover_rollback_by_xid(const XID *xid)
static int innobase_recover_rollback_by_xid(const XID *xid)
{
DBUG_EXECUTE_IF("innobase_xa_fail", return XAER_RMFAIL;);
@ -17187,7 +17186,7 @@ int innobase_recover_rollback_by_xid(const XID *xid)
return 0;
}
void innobase_tc_log_recovery_done()
static void innobase_tc_log_recovery_done()
{
if (high_level_read_only)
return;

View file

@ -932,33 +932,3 @@ ib_push_frm_error(
@return true if index column length exceeds limit */
MY_ATTRIBUTE((warn_unused_result))
bool too_big_key_part_length(size_t max_field_len, const KEY& key);
/** This function is used to rollback one X/Open XA distributed transaction
which is in the prepared state
@param[in] hton InnoDB handlerton
@param[in] xid X/Open XA transaction identification
@return 0 or error number */
int innobase_rollback_by_xid(handlerton* hton, XID* xid);
/**
This function is used to rollback one X/Open XA distributed transaction
which is in the prepared state asynchronously.
It only set the transaction's status to ACTIVE and persist the status.
The transaction will be rolled back by background rollback thread.
@param xid X/Open XA transaction identification
@return 0 or error number
*/
int innobase_recover_rollback_by_xid(const XID *xid);
/**
This function is called after tc log is opened(typically binlog recovery)
has done. It starts rollback thread to rollback the transactions
have been changed from PREPARED to ACTIVE.
@return 0 or error number
*/
void innobase_tc_log_recovery_done();

View file

@ -3587,8 +3587,7 @@ static int maria_hton_panic(handlerton *hton, ha_panic_function flag)
}
static int maria_commit(handlerton *hton __attribute__ ((unused)),
THD *thd, bool all)
static int maria_commit(THD *thd, bool all)
{
TRN *trn= THD_TRN;
int res= 0;
@ -3618,7 +3617,7 @@ static int maria_commit(handlerton *hton __attribute__ ((unused)),
}
#ifdef MARIA_CANNOT_ROLLBACK
static int maria_rollback(handlerton *hton, THD *thd, bool all)
static int maria_rollback(THD *thd, bool all)
{
TRN *trn= THD_TRN;
DBUG_ENTER("maria_rollback");
@ -3630,15 +3629,14 @@ static int maria_rollback(handlerton *hton, THD *thd, bool all)
ER_THD(thd, ER_DATA_WAS_COMMITED_UNDER_ROLLBACK),
"Aria");
if (all)
DBUG_RETURN(maria_commit(hton, thd, all));
DBUG_RETURN(maria_commit(thd, all));
/* Statement rollbacks are ignored. Commit will happen in external_lock */
DBUG_RETURN(0);
}
#else
static int maria_rollback(handlerton *hton __attribute__ ((unused)),
THD *thd, bool all)
static int maria_rollback(THD *thd, bool all)
{
TRN *trn= THD_TRN;
DBUG_ENTER("maria_rollback");

View file

@ -1340,7 +1340,7 @@ static void mrn_drop_database(handlerton *hton, char *path)
DBUG_VOID_RETURN;
}
static int mrn_close_connection(handlerton *hton, THD *thd)
static int mrn_close_connection(THD *thd)
{
MRN_DBUG_ENTER_FUNCTION();
void *p = thd_get_ha_data(thd, mrn_hton_ptr);

View file

@ -155,7 +155,7 @@ int oqgraph_discover_table_structure(handlerton *hton, THD* thd,
share->init_from_sql_statement_string(thd, true, sql.ptr(), sql.length());
}
int oqgraph_close_connection(handlerton *hton, THD *thd);
int oqgraph_close_connection(THD *thd);
static int oqgraph_init(void *p)
{
@ -380,7 +380,7 @@ int ha_oqgraph::oqgraph_check_table_structure (TABLE *table_arg)
** OQGRAPH tables
*****************************************************************************/
int oqgraph_close_connection(handlerton *hton, THD *thd)
int oqgraph_close_connection(THD *thd)
{
DBUG_PRINT( "oq-debug", ("thd: 0x%lx; oqgraph_close_connection.", (long) thd));
// close_thread_tables(thd); // maybe this?

View file

@ -3798,7 +3798,7 @@ static Rdb_transaction *get_or_create_tx(THD *const thd) {
return tx;
}
static int rocksdb_close_connection(handlerton *const hton, THD *const thd) {
static int rocksdb_close_connection(THD *const thd) {
Rdb_transaction *tx = get_tx_from_thd(thd);
if (tx != nullptr) {
bool is_critical_error;
@ -3876,7 +3876,7 @@ static bool rocksdb_flush_wal(handlerton* hton __attribute__((__unused__)))
For a slave, prepare() updates the slave_gtid_info table which tracks the
replication progress.
*/
static int rocksdb_prepare(handlerton* hton, THD* thd, bool prepare_tx)
static int rocksdb_prepare(THD* thd, bool prepare_tx)
{
bool async=false; // This is "ASYNC_COMMIT" feature which is only present in webscalesql
@ -3960,10 +3960,9 @@ static int rocksdb_prepare(handlerton* hton, THD* thd, bool prepare_tx)
do nothing for prepare/commit by xid
this is needed to avoid crashes in XA scenarios
*/
static int rocksdb_commit_by_xid(handlerton *const hton, XID *const xid) {
static int rocksdb_commit_by_xid(XID *const xid) {
DBUG_ENTER_FUNC();
DBUG_ASSERT(hton != nullptr);
DBUG_ASSERT(xid != nullptr);
DBUG_ASSERT(commit_latency_stats != nullptr);
@ -3993,11 +3992,9 @@ static int rocksdb_commit_by_xid(handlerton *const hton, XID *const xid) {
DBUG_RETURN(HA_EXIT_SUCCESS);
}
static int rocksdb_rollback_by_xid(
handlerton *const hton MY_ATTRIBUTE((__unused__)), XID *const xid) {
static int rocksdb_rollback_by_xid(XID *const xid) {
DBUG_ENTER_FUNC();
DBUG_ASSERT(hton != nullptr);
DBUG_ASSERT(xid != nullptr);
DBUG_ASSERT(rdb != nullptr);
@ -4049,7 +4046,7 @@ static void rdb_xid_from_string(const std::string &src, XID *const dst) {
Reading last committed binary log info from RocksDB system row.
The info is needed for crash safe slave/master to work.
*/
static int rocksdb_recover(handlerton* hton, XID* xid_list, uint len)
static int rocksdb_recover(XID* xid_list, uint len)
#ifdef MARIAROCKS_NOT_YET
char* const binlog_file,
my_off_t *const binlog_pos,
@ -4128,7 +4125,7 @@ static void rocksdb_checkpoint_request(void *cookie)
@param all: TRUE - commit the transaction
FALSE - SQL statement ended
*/
static void rocksdb_commit_ordered(handlerton *hton, THD* thd, bool all)
static void rocksdb_commit_ordered(THD* thd, bool all)
{
// Same assert as InnoDB has
DBUG_ASSERT(all || (!thd_test_options(thd, OPTION_NOT_AUTOCOMMIT |
@ -4154,11 +4151,10 @@ static void rocksdb_commit_ordered(handlerton *hton, THD* thd, bool all)
}
static int rocksdb_commit(handlerton* hton, THD* thd, bool commit_tx)
static int rocksdb_commit(THD* thd, bool commit_tx)
{
DBUG_ENTER_FUNC();
DBUG_ASSERT(hton != nullptr);
DBUG_ASSERT(thd != nullptr);
DBUG_ASSERT(commit_latency_stats != nullptr);
@ -4236,8 +4232,7 @@ static int rocksdb_commit(handlerton* hton, THD* thd, bool commit_tx)
}
static int rocksdb_rollback(handlerton *const hton, THD *const thd,
bool rollback_tx) {
static int rocksdb_rollback(THD *const thd, bool rollback_tx) {
Rdb_transaction *tx = get_tx_from_thd(thd);
Rdb_perf_context_guard guard(tx, rocksdb_perf_context_level(thd));
@ -4875,7 +4870,6 @@ static bool rocksdb_explicit_snapshot(
InnoDB and RocksDB transactions.
*/
static int rocksdb_start_tx_and_assign_read_view(
handlerton *const hton, /*!< in: RocksDB handlerton */
THD *const thd /*!< in: MySQL thread handle of the
user for whom the transaction should
be committed */
@ -4918,7 +4912,7 @@ static int rocksdb_start_tx_and_assign_read_view(
DBUG_ASSERT(!tx->has_snapshot());
tx->set_tx_read_only(true);
rocksdb_register_tx(hton, thd, tx);
rocksdb_register_tx(rocksdb_hton, thd, tx);
tx->acquire_snapshot(true);
#ifdef MARIADB_NOT_YET
@ -5026,19 +5020,16 @@ static int rocksdb_start_tx_with_shared_read_view(
* Current SAVEPOINT does not correctly handle ROLLBACK and does not return
* errors. This needs to be addressed in future versions (Issue#96).
*/
static int rocksdb_savepoint(handlerton *const hton, THD *const thd,
void *const savepoint) {
static int rocksdb_savepoint(THD *const thd, void *const savepoint) {
return HA_EXIT_SUCCESS;
}
static int rocksdb_rollback_to_savepoint(handlerton *const hton, THD *const thd,
void *const savepoint) {
static int rocksdb_rollback_to_savepoint(THD *const thd, void *const savepoint) {
Rdb_transaction *tx = get_tx_from_thd(thd);
return tx->rollback_to_savepoint(savepoint);
}
static bool rocksdb_rollback_to_savepoint_can_release_mdl(
handlerton *const /* hton */, THD *const /* thd */) {
static bool rocksdb_rollback_to_savepoint_can_release_mdl(THD *const) {
return true;
}

View file

@ -364,10 +364,6 @@ static int discover_table_existence(handlerton *hton, const char *db,
return !parse_table_name(table_name, strlen(table_name), &from, &to, &step);
}
static int dummy_commit_rollback(handlerton *, THD *, bool) { return 0; }
static int dummy_savepoint(handlerton *, THD *, void *) { return 0; }
/*****************************************************************************
Example of a simple group by handler for queries like:
SELECT SUM(seq) from sequence_table;
@ -535,9 +531,10 @@ static int init(void *p)
hton->drop_table= drop_table;
hton->discover_table= discover_table;
hton->discover_table_existence= discover_table_existence;
hton->commit= hton->rollback= dummy_commit_rollback;
hton->commit= hton->rollback= [](THD *, bool) { return 0; };
hton->savepoint_set= hton->savepoint_rollback= hton->savepoint_release=
dummy_savepoint;
[](THD *, void *) { return 0; };
hton->create_group_by= create_group_by_handler;
hton->update_optimizer_costs= sequence_update_optimizer_costs;
return 0;

View file

@ -649,7 +649,7 @@ template int CSphSEQuery::ParseArray<longlong> ( longlong **, const char * );
static handler * sphinx_create_handler ( handlerton * hton, TABLE_SHARE * table, MEM_ROOT * mem_root );
static int sphinx_init_func ( void * p );
static int sphinx_close_connection ( handlerton * hton, THD * thd );
static int sphinx_close_connection ( THD * thd );
static int sphinx_panic ( handlerton * hton, enum ha_panic_function flag );
static bool sphinx_show_status ( handlerton * hton, THD * thd, stat_print_fn * stat_print, enum ha_stat_type stat_type );
@ -763,11 +763,11 @@ static bool sphinx_init_func_for_handlerton ()
#if MYSQL_VERSION_ID>50100
static int sphinx_close_connection ( handlerton * hton, THD * thd )
static int sphinx_close_connection ( THD * thd )
{
// deallocate common handler data
SPH_ENTER_FUNC();
CSphTLS * pTls = (CSphTLS *) thd_get_ha_data ( thd, hton );
CSphTLS * pTls = (CSphTLS *) thd_get_ha_data ( thd, sphinx_hton_ptr );
SafeDelete ( pTls );
SPH_RET(0);
}

View file

@ -6170,7 +6170,6 @@ handler* spider_create_handler(
}
int spider_close_connection(
handlerton* hton,
THD* thd
) {
int roop_count = 0;
@ -6200,7 +6199,7 @@ int spider_close_connection(
SPIDER_CONN_RESTORE_DASTATUS;
}
spider_rollback(spider_hton_ptr, thd, TRUE);
spider_rollback(thd, TRUE);
Dummy_error_handler deh; // suppress network errors at this stage
thd->push_internal_handler(&deh);

View file

@ -2965,7 +2965,6 @@ error_open_table:
}
int spider_start_consistent_snapshot(
handlerton *hton,
THD* thd
) {
int error_num;
@ -3053,7 +3052,6 @@ error:
}
int spider_commit(
handlerton *hton,
THD *thd,
bool all
) {
@ -3091,7 +3089,7 @@ int spider_commit(
{
*/
/* rollback for semi_trx */
spider_rollback(hton, thd, all);
spider_rollback(thd, all);
/*
}
*/
@ -3148,7 +3146,6 @@ int spider_commit(
}
int spider_rollback(
handlerton *hton,
THD *thd,
bool all
) {
@ -3225,7 +3222,6 @@ int spider_rollback(
}
int spider_xa_prepare(
handlerton *hton,
THD* thd,
bool all
) {
@ -3260,7 +3256,6 @@ error:
}
int spider_xa_recover(
handlerton *hton,
XID* xid_list,
uint len
) {
@ -3276,7 +3271,6 @@ int spider_xa_recover(
}
int spider_xa_commit_by_xid(
handlerton *hton,
XID* xid
) {
SPIDER_TRX *trx;
@ -3298,7 +3292,6 @@ error_get_trx:
}
int spider_xa_rollback_by_xid(
handlerton *hton,
XID* xid
) {
SPIDER_TRX *trx;

View file

@ -180,41 +180,34 @@ int spider_internal_xa_rollback_by_xid(
);
int spider_start_consistent_snapshot(
handlerton *hton,
THD* thd
);
int spider_commit(
handlerton *hton,
THD *thd,
bool all
);
int spider_rollback(
handlerton *hton,
THD *thd,
bool all
);
int spider_xa_prepare(
handlerton *hton,
THD* thd,
bool all
);
int spider_xa_recover(
handlerton *hton,
XID* xid_list,
uint len
);
int spider_xa_commit_by_xid(
handlerton *hton,
XID* xid
);
int spider_xa_rollback_by_xid(
handlerton *hton,
XID* xid
);