mirror of
https://github.com/MariaDB/server.git
synced 2026-05-15 19:37:16 +02:00
MWL#116: Efficient group commit: PBXT part
Implement the commit_ordered() API in PBXT, getting consistent commit ordering with other engines and binlog. Make pbxt_support_xa default in MariaDB debug build (as the bug that causes assert in MySQL is fixed in MariaDB).
This commit is contained in:
parent
8bc445360e
commit
fdb2ad5fb8
5 changed files with 253 additions and 47 deletions
|
|
@ -108,6 +108,9 @@ static int pbxt_end(void *p);
|
|||
static int pbxt_panic(handlerton *hton, enum ha_panic_function flag);
|
||||
static void pbxt_drop_database(handlerton *hton, char *path);
|
||||
static int pbxt_close_connection(handlerton *hton, THD* thd);
|
||||
#ifdef MARIADB_BASE_VERSION
|
||||
static void pbxt_commit_ordered(handlerton *hton, THD *thd, bool all);
|
||||
#endif
|
||||
static int pbxt_commit(handlerton *hton, THD *thd, bool all);
|
||||
static int pbxt_rollback(handlerton *hton, THD *thd, bool all);
|
||||
static int pbxt_prepare(handlerton *hton, THD *thd, bool all);
|
||||
|
|
@ -1147,6 +1150,9 @@ static int pbxt_init(void *p)
|
|||
pbxt_hton->state = SHOW_OPTION_YES;
|
||||
pbxt_hton->db_type = DB_TYPE_PBXT; // Wow! I have my own!
|
||||
pbxt_hton->close_connection = pbxt_close_connection; /* close_connection, cleanup thread related data. */
|
||||
#ifdef MARIADB_BASE_VERSION
|
||||
pbxt_hton->commit_ordered = pbxt_commit_ordered;
|
||||
#endif
|
||||
pbxt_hton->commit = pbxt_commit; /* commit */
|
||||
pbxt_hton->rollback = pbxt_rollback; /* rollback */
|
||||
if (pbxt_support_xa) {
|
||||
|
|
@ -1484,6 +1490,29 @@ static int pbxt_start_consistent_snapshot(handlerton *hton, THD *thd)
|
|||
return err;
|
||||
}
|
||||
|
||||
#ifdef MARIADB_BASE_VERSION
|
||||
/*
|
||||
* Quickly commit the transaction to memory and make it visible to others.
|
||||
* The remaining part of commit will happen later, in pbxt_commit().
|
||||
*/
|
||||
static void pbxt_commit_ordered(handlerton *hton, THD *thd, bool all)
|
||||
{
|
||||
XTThreadPtr self;
|
||||
|
||||
if ((self = (XTThreadPtr) *thd_ha_data(thd, hton))) {
|
||||
XT_PRINT2(self, "%s pbxt_commit_ordered all=%d\n", all ? "END CONN XACT" : "END STAT", all);
|
||||
|
||||
if (self->st_xact_data) {
|
||||
if (all || self->st_auto_commit) {
|
||||
self->st_commit_ordered = TRUE;
|
||||
self->st_writer = self->st_xact_writer;
|
||||
self->st_delayed_error= !xt_xn_commit_fast(self, self->st_writer);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
#endif
|
||||
|
||||
/*
|
||||
* Commit the PBXT transaction of the given thread.
|
||||
* thd is the MySQL thread structure.
|
||||
|
|
@ -1512,7 +1541,13 @@ static int pbxt_commit(handlerton *hton, THD *thd, bool all)
|
|||
if (all || self->st_auto_commit) {
|
||||
XT_PRINT0(self, "xt_xn_commit in pbxt_commit\n");
|
||||
|
||||
if (!xt_xn_commit(self))
|
||||
if (self->st_commit_ordered) {
|
||||
self->st_commit_ordered = FALSE;
|
||||
err = !xt_xn_commit_slow(self, self->st_writer) || self->st_delayed_error;
|
||||
} else {
|
||||
err = !xt_xn_commit(self);
|
||||
}
|
||||
if (err)
|
||||
err = xt_ha_pbxt_thread_error_for_mysql(thd, self, FALSE);
|
||||
}
|
||||
}
|
||||
|
|
@ -6064,7 +6099,7 @@ static MYSQL_SYSVAR_INT(max_threads, pbxt_max_threads,
|
|||
NULL, NULL, 0, 0, 20000, 1);
|
||||
#endif
|
||||
|
||||
#ifndef DEBUG
|
||||
#if !defined(DEBUG) || defined(MARIADB_BASE_VERSION)
|
||||
static MYSQL_SYSVAR_BOOL(support_xa, pbxt_support_xa,
|
||||
PLUGIN_VAR_OPCMDARG,
|
||||
"Enable PBXT support for the XA two-phase commit, default is enabled",
|
||||
|
|
|
|||
|
|
@ -299,6 +299,9 @@ typedef struct XTThread {
|
|||
xtBool st_stat_ended; /* TRUE if the statement was ended. */
|
||||
xtBool st_stat_trans; /* TRUE if a statement transaction is running (started on UPDATE). */
|
||||
xtBool st_stat_modify; /* TRUE if the statement is an INSERT/UPDATE/DELETE */
|
||||
xtBool st_commit_ordered; /* TRUE if we have run commit_ordered() */
|
||||
xtBool st_delayed_error; /* TRUE if we got an error in commit_ordered() */
|
||||
xtBool st_writer; /* Copy of thread->st_xact_writer (which is clobbered by xlog_append()) */
|
||||
#ifdef XT_IMPLEMENT_NO_ACTION
|
||||
XTBasicListRec st_restrict_list; /* These records have been deleted and should have no reference. */
|
||||
#endif
|
||||
|
|
|
|||
|
|
@ -1287,27 +1287,61 @@ xtPublic xtBool xt_xn_begin(XTThreadPtr self)
|
|||
return OK;
|
||||
}
|
||||
|
||||
static xtBool xn_end_xact(XTThreadPtr thread, u_int status)
|
||||
static void xn_end_release_locks(XTThreadPtr thread)
|
||||
{
|
||||
XTXactDataPtr xact = thread->st_xact_data;
|
||||
XTDatabaseHPtr db = thread->st_database;
|
||||
ASSERT_NS(xact);
|
||||
|
||||
/* {REMOVE-LOCKS} Drop locks if you have any: */
|
||||
thread->st_lock_list.xt_remove_all_locks(db, thread);
|
||||
|
||||
/* Do this afterwards to make sure the sweeper
|
||||
* does not cleanup transactions start cleaning up
|
||||
* before any transactions that were waiting for
|
||||
* this transaction have completed!
|
||||
*/
|
||||
xact->xd_end_xn_id = db->db_xn_curr_id;
|
||||
|
||||
/* Now you can sweep! */
|
||||
xact->xd_flags |= XT_XN_XAC_SWEEP;
|
||||
}
|
||||
|
||||
/* The commit is split into two phases: one "fast" for MariaDB commit_ordered(),
|
||||
* and one "slow" for commit(). When not using internal 2pc, there is only one
|
||||
* call combining both phases.
|
||||
*/
|
||||
|
||||
enum {
|
||||
XN_END_PHASE_FAST = 1,
|
||||
XN_END_PHASE_SLOW = 2,
|
||||
XN_END_PHASE_BOTH = 3
|
||||
};
|
||||
|
||||
static xtBool xn_end_xact(XTThreadPtr thread, u_int status, xtBool writer, int phase)
|
||||
{
|
||||
XTXactDataPtr xact;
|
||||
xtBool ok = TRUE;
|
||||
xtBool err;
|
||||
|
||||
ASSERT_NS(thread->st_xact_data);
|
||||
if ((xact = thread->st_xact_data)) {
|
||||
XTDatabaseHPtr db = thread->st_database;
|
||||
xtXactID xn_id = xact->xd_start_xn_id;
|
||||
xtBool writer;
|
||||
|
||||
if ((writer = thread->st_xact_writer)) {
|
||||
if (writer) {
|
||||
/* The transaction wrote something: */
|
||||
XTXactEndEntryDRec entry;
|
||||
xtWord4 sum;
|
||||
|
||||
sum = XT_CHECKSUM4_XACT(xn_id) ^ XT_CHECKSUM4_XACT(0);
|
||||
entry.xe_status_1 = status;
|
||||
entry.xe_checksum_1 = XT_CHECKSUM_1(sum);
|
||||
XT_SET_DISK_4(entry.xe_xact_id_4, xn_id);
|
||||
XT_SET_DISK_4(entry.xe_not_used_4, 0);
|
||||
if (phase & XN_END_PHASE_FAST)
|
||||
{
|
||||
sum = XT_CHECKSUM4_XACT(xn_id) ^ XT_CHECKSUM4_XACT(0);
|
||||
entry.xe_status_1 = status;
|
||||
entry.xe_checksum_1 = XT_CHECKSUM_1(sum);
|
||||
XT_SET_DISK_4(entry.xe_xact_id_4, xn_id);
|
||||
XT_SET_DISK_4(entry.xe_not_used_4, 0);
|
||||
}
|
||||
|
||||
#ifdef XT_IMPLEMENT_NO_ACTION
|
||||
/* This will check any resticts that have been delayed to the end of the statement. */
|
||||
|
|
@ -1319,20 +1353,35 @@ static xtBool xn_end_xact(XTThreadPtr thread, u_int status)
|
|||
}
|
||||
#endif
|
||||
|
||||
/* Flush the data log: */
|
||||
if (!thread->st_dlog_buf.dlb_flush_log(TRUE, thread)) {
|
||||
/* Flush the data log (in the "fast" case we already did it in prepare: */
|
||||
if ((phase & XN_END_PHASE_SLOW) && !thread->st_dlog_buf.dlb_flush_log(TRUE, thread)) {
|
||||
ok = FALSE;
|
||||
status = XT_LOG_ENT_ABORT;
|
||||
}
|
||||
|
||||
/* Write and flush the transaction log: */
|
||||
if (!xt_xlog_log_data(thread, sizeof(XTXactEndEntryDRec), (XTXactLogBufferDPtr) &entry, xt_db_flush_log_at_trx_commit)) {
|
||||
if (phase == XN_END_PHASE_FAST) {
|
||||
/* Fast phase, delay any write or flush to later. */
|
||||
err = !xt_xlog_log_data(thread, sizeof(XTXactEndEntryDRec), (XTXactLogBufferDPtr) &entry, XT_XLOG_NO_WRITE_NO_FLUSH);
|
||||
} else if (phase == XN_END_PHASE_SLOW) {
|
||||
/* We already appended the commit record in the fast phase.
|
||||
* Now just call with empty record to ensure we write/flush
|
||||
* the log as needed for this commit.
|
||||
*/
|
||||
err = !xt_xlog_log_data(thread, 0, NULL, xt_db_flush_log_at_trx_commit);
|
||||
} else /* phase == XN_END_PHASE_BOTH */ {
|
||||
/* Both phases at once, append commit record and write/flush normally. */
|
||||
ASSERT_NS(phase == XN_END_PHASE_BOTH);
|
||||
err = !xt_xlog_log_data(thread, sizeof(XTXactEndEntryDRec), (XTXactLogBufferDPtr) &entry, xt_db_flush_log_at_trx_commit);
|
||||
}
|
||||
|
||||
if (err) {
|
||||
ok = FALSE;
|
||||
status = XT_LOG_ENT_ABORT;
|
||||
/* Make sure this is done, if we failed to log
|
||||
* the transction end!
|
||||
*/
|
||||
if (thread->st_xact_writer) {
|
||||
if (writer) {
|
||||
/* Adjust this in case of error, but don't forget
|
||||
* to lock!
|
||||
*/
|
||||
|
|
@ -1347,46 +1396,46 @@ static xtBool xn_end_xact(XTThreadPtr thread, u_int status)
|
|||
}
|
||||
}
|
||||
|
||||
/* Setting this flag completes the transaction,
|
||||
* Do this before we release the locks, because
|
||||
* the unlocked transactions expect the
|
||||
* transaction they are waiting for to be
|
||||
* gone!
|
||||
*/
|
||||
xact->xd_end_time = ++db->db_xn_end_time;
|
||||
if (status == XT_LOG_ENT_COMMIT) {
|
||||
thread->st_statistics.st_commits++;
|
||||
xact->xd_flags |= (XT_XN_XAC_COMMITTED | XT_XN_XAC_ENDED);
|
||||
}
|
||||
else {
|
||||
thread->st_statistics.st_rollbacks++;
|
||||
xact->xd_flags |= XT_XN_XAC_ENDED;
|
||||
if (phase & XN_END_PHASE_FAST) {
|
||||
/* Setting this flag completes the transaction,
|
||||
* Do this before we release the locks, because
|
||||
* the unlocked transactions expect the
|
||||
* transaction they are waiting for to be
|
||||
* gone!
|
||||
*/
|
||||
xact->xd_end_time = ++db->db_xn_end_time;
|
||||
if (status == XT_LOG_ENT_COMMIT) {
|
||||
thread->st_statistics.st_commits++;
|
||||
xact->xd_flags |= (XT_XN_XAC_COMMITTED | XT_XN_XAC_ENDED);
|
||||
}
|
||||
else {
|
||||
thread->st_statistics.st_rollbacks++;
|
||||
xact->xd_flags |= XT_XN_XAC_ENDED;
|
||||
}
|
||||
}
|
||||
|
||||
/* {REMOVE-LOCKS} Drop locks is you have any: */
|
||||
thread->st_lock_list.xt_remove_all_locks(db, thread);
|
||||
|
||||
/* Do this afterwards to make sure the sweeper
|
||||
* does not cleanup transactions start cleaning up
|
||||
* before any transactions that were waiting for
|
||||
* this transaction have completed!
|
||||
/* Be as fast as possible in the "fast" path, as we want to be as
|
||||
* fast as possible here (we will release slow locks immediately
|
||||
* after in the "slow" part).
|
||||
* ToDo: If we ran the fast part, the slow part could release locks
|
||||
* _before_ fsync(), rather than after.
|
||||
*/
|
||||
xact->xd_end_xn_id = db->db_xn_curr_id;
|
||||
if (!(phase & XN_END_PHASE_SLOW))
|
||||
return ok;
|
||||
|
||||
/* Now you can sweep! */
|
||||
xact->xd_flags |= XT_XN_XAC_SWEEP;
|
||||
xn_end_release_locks(thread);
|
||||
}
|
||||
else {
|
||||
/* Read-only transaction can be removed, immediately */
|
||||
xact->xd_end_time = ++db->db_xn_end_time;
|
||||
xact->xd_flags |= (XT_XN_XAC_COMMITTED | XT_XN_XAC_ENDED);
|
||||
if (phase & XN_END_PHASE_FAST) {
|
||||
xact->xd_end_time = ++db->db_xn_end_time;
|
||||
xact->xd_flags |= (XT_XN_XAC_COMMITTED | XT_XN_XAC_ENDED);
|
||||
|
||||
/* Drop locks is you have any: */
|
||||
thread->st_lock_list.xt_remove_all_locks(db, thread);
|
||||
if (!(phase & XN_END_PHASE_SLOW))
|
||||
return ok;
|
||||
}
|
||||
|
||||
xact->xd_end_xn_id = db->db_xn_curr_id;
|
||||
|
||||
xact->xd_flags |= XT_XN_XAC_SWEEP;
|
||||
xn_end_release_locks(thread);
|
||||
|
||||
if (xt_xn_delete_xact(db, xn_id, thread)) {
|
||||
if (db->db_xn_min_ram_id == xn_id)
|
||||
|
|
@ -1478,12 +1527,22 @@ static xtBool xn_end_xact(XTThreadPtr thread, u_int status)
|
|||
|
||||
xtPublic xtBool xt_xn_commit(XTThreadPtr thread)
|
||||
{
|
||||
return xn_end_xact(thread, XT_LOG_ENT_COMMIT);
|
||||
return xn_end_xact(thread, XT_LOG_ENT_COMMIT, thread->st_xact_writer, XN_END_PHASE_BOTH);
|
||||
}
|
||||
|
||||
xtPublic xtBool xt_xn_commit_fast(XTThreadPtr thread, xtBool writer)
|
||||
{
|
||||
return xn_end_xact(thread, XT_LOG_ENT_COMMIT, writer, XN_END_PHASE_FAST);
|
||||
}
|
||||
|
||||
xtPublic xtBool xt_xn_commit_slow(XTThreadPtr thread, xtBool writer)
|
||||
{
|
||||
return xn_end_xact(thread, XT_LOG_ENT_COMMIT, writer, XN_END_PHASE_SLOW);
|
||||
}
|
||||
|
||||
xtPublic xtBool xt_xn_rollback(XTThreadPtr thread)
|
||||
{
|
||||
return xn_end_xact(thread, XT_LOG_ENT_ABORT);
|
||||
return xn_end_xact(thread, XT_LOG_ENT_ABORT, thread->st_xact_writer, XN_END_PHASE_BOTH);
|
||||
}
|
||||
|
||||
xtPublic xtBool xt_xn_log_tab_id(XTThreadPtr self, xtTableID tab_id)
|
||||
|
|
|
|||
|
|
@ -193,6 +193,8 @@ void xt_wakeup_sweeper(struct XTDatabase *db);
|
|||
|
||||
xtBool xt_xn_begin(struct XTThread *self);
|
||||
xtBool xt_xn_commit(struct XTThread *self);
|
||||
xtBool xt_xn_commit_fast(struct XTThread *self, xtBool writer);
|
||||
xtBool xt_xn_commit_slow(struct XTThread *self, xtBool writer);
|
||||
xtBool xt_xn_rollback(struct XTThread *self);
|
||||
xtBool xt_xn_log_tab_id(struct XTThread *self, xtTableID tab_id);
|
||||
int xt_xn_status(struct XTOpenTable *ot, xtXactID xn_id, xtRecordID rec_id);
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue