MDEV-25062: Reduce trx_rseg_t::mutex contention

redo_rseg_mutex, noredo_rseg_mutex: Remove the PERFORMANCE_SCHEMA keys.
The rollback segment mutex will be uninstrumented.

trx_sys_t: Remove pointer indirection for rseg_array, temp_rseg.
Align each element to the cache line.

trx_sys_t::rseg_id(): Replaces trx_rseg_t::id.
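
As an illustration of these two layout changes, a minimal standalone sketch
(simplified names, not the actual server code; alignas(64) stands in for
MY_ALIGNED(CPU_LEVEL1_DCACHE_LINESIZE)): the segments live in a
cache-line-aligned inline array, and the slot id is recovered from the
element's address instead of being stored in every object:

  #include <cassert>

  static constexpr unsigned N_RSEGS= 128;

  struct alignas(64) rseg    // one cache line per element; no false sharing
  {
    char payload[48];        // placeholder for the real fields
  };

  struct sys
  {
    rseg rseg_array[N_RSEGS]; // inline storage: no pointer chasing

    unsigned rseg_id(const rseg *r) const
    {
      assert(r >= rseg_array && r < &rseg_array[N_RSEGS]);
      return static_cast<unsigned>(r - rseg_array); // slot index == id
    }
  };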

trx_rseg_t::ref: Replaces needs_purge, trx_ref_count, and skip_allocation
with a single std::atomic<uint32_t>.
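
The packing uses the two low bits for the flags and the remaining bits as a
reference count scaled by 4, so each state change is a single atomic
operation. A self-contained sketch of the arithmetic (mirroring the SKIP,
NEEDS_PURGE and REF constants introduced in the diff below):

  #include <atomic>
  #include <cstdint>

  static constexpr uint32_t SKIP= 1;        // undo truncation pending
  static constexpr uint32_t NEEDS_PURGE= 2; // segment still needs purge
  static constexpr uint32_t REF= 4;         // reference count multiplier

  std::atomic<uint32_t> ref{0};

  // e.g. two references + NEEDS_PURGE => ref == 2 * REF + NEEDS_PURGE == 10
  void acquire()         { ref.fetch_add(REF, std::memory_order_relaxed); }
  void release()         { ref.fetch_sub(REF, std::memory_order_relaxed); }
  void set_needs_purge() { ref.fetch_or(NEEDS_PURGE,
                                        std::memory_order_relaxed); }
  bool is_referenced()   { return ref.load(std::memory_order_relaxed)
                                  >= REF; }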

trx_rseg_t::latch: Replaces trx_rseg_t::mutex.

trx_rseg_t::history_size: Replaces trx_sys_t::rseg_history_len.

trx_sys_t::history_size_approx(): Replaces trx_sys.rseg_history_len
in those places where the exact count does not matter. We must not
acquire any trx_rseg_t::latch while holding index page latches, because
normally the trx_rseg_t::latch is acquired before any page latches.

trx_sys_t::history_exists(): Replaces trx_sys.rseg_history_len!=0
with an approximation.
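
The underlying idea is a sharded counter: every commit and every purge now
touches only its own segment's history_size (in its own cache line) instead
of one global Atomic_counter that all CPUs bounce between them, and readers
aggregate over the shards. A sketch of the pattern, with the caveat that the
server keeps history_size as a plain integer protected by the segment latch,
while this standalone version uses relaxed atomics to stay self-contained:

  #include <atomic>
  #include <cstdint>

  struct alignas(64) shard { std::atomic<uint32_t> n{0}; };
  static shard shards[128];

  void inc(unsigned i) { shards[i].n.fetch_add(1,
                                               std::memory_order_relaxed); }
  void dec(unsigned i) { shards[i].n.fetch_sub(1,
                                               std::memory_order_relaxed); }

  // Approximate total: no latch taken, so this is safe to call even while
  // index page latches are held; the result may race with updates.
  uint32_t approx_total()
  {
    uint32_t s= 0;
    for (const auto &sh : shards)
      s+= sh.n.load(std::memory_order_relaxed);
    return s;
  }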

We remove some unnecessary trx_rseg_t::latch acquisitions around
trx_undo_set_state_at_prepare() and trx_undo_set_state_at_finish().
Those operations will only access fields that remain constant
after trx_rseg_t::init().
Author: Marko Mäkelä, 2021-06-23 13:42:11 +03:00
Parent: b3e8788009
Commit: 6e12ebd4a7
18 changed files with 477 additions and 494 deletions


@ -1462,8 +1462,9 @@ btr_cur_search_to_nth_level_func(
Free blocks and read IO bandwidth should be prior
for them, when the history list is glowing huge. */
if (lock_intention == BTR_INTENTION_DELETE
&& trx_sys.rseg_history_len > BTR_CUR_FINE_HISTORY_LENGTH
&& buf_pool.n_pend_reads) {
&& buf_pool.n_pend_reads
&& trx_sys.history_size_approx()
> BTR_CUR_FINE_HISTORY_LENGTH) {
x_latch_index:
mtr_x_lock_index(index, mtr);
} else if (index->is_spatial()
@ -2575,8 +2576,9 @@ btr_cur_open_at_index_side(
Free blocks and read IO bandwidth should be prior
for them, when the history list is glowing huge. */
if (lock_intention == BTR_INTENTION_DELETE
&& trx_sys.rseg_history_len > BTR_CUR_FINE_HISTORY_LENGTH
&& buf_pool.n_pend_reads) {
&& buf_pool.n_pend_reads
&& trx_sys.history_size_approx()
> BTR_CUR_FINE_HISTORY_LENGTH) {
mtr_x_lock_index(index, mtr);
} else {
mtr_sx_lock_index(index, mtr);
@ -2898,8 +2900,9 @@ btr_cur_open_at_rnd_pos(
Free blocks and read IO bandwidth should be prior
for them, when the history list is glowing huge. */
if (lock_intention == BTR_INTENTION_DELETE
&& trx_sys.rseg_history_len > BTR_CUR_FINE_HISTORY_LENGTH
&& buf_pool.n_pend_reads) {
&& buf_pool.n_pend_reads
&& trx_sys.history_size_approx()
> BTR_CUR_FINE_HISTORY_LENGTH) {
mtr_x_lock_index(index, mtr);
} else {
mtr_sx_lock_index(index, mtr);
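
Note the operand reordering in the three hunks above: buf_pool.n_pend_reads
is one cheap load, while history_size_approx() sums a counter per rollback
segment, so the cheap test now short-circuits the expensive one.
Schematically, with hypothetical helper names:

  // '&&' evaluates left to right; order the operands by cost.
  extern bool reads_pending();      // one load
  extern uint32_t history_approx(); // sums up to 128 counters

  bool want_exclusive_latch(uint32_t threshold)
  {
    return reads_pending() && history_approx() > threshold;
  }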


@ -216,15 +216,15 @@ static my_bool innodb_read_only_compressed;
/** A dummy variable */
static uint innodb_max_purge_lag_wait;
/** Wait for trx_sys_t::rseg_history_len to be below a limit. */
/** Wait for trx_sys.history_size() to be below a limit. */
static void innodb_max_purge_lag_wait_update(THD *thd, st_mysql_sys_var *,
void *, const void *limit)
{
const uint l= *static_cast<const uint*>(limit);
if (trx_sys.rseg_history_len <= l)
if (!trx_sys.history_exceeds(l))
return;
mysql_mutex_unlock(&LOCK_global_system_variables);
while (trx_sys.rseg_history_len > l)
while (trx_sys.history_exceeds(l))
{
if (thd_kill_level(thd))
break;
@ -520,8 +520,6 @@ mysql_pfs_key_t log_flush_order_mutex_key;
mysql_pfs_key_t recalc_pool_mutex_key;
mysql_pfs_key_t purge_sys_pq_mutex_key;
mysql_pfs_key_t recv_sys_mutex_key;
mysql_pfs_key_t redo_rseg_mutex_key;
mysql_pfs_key_t noredo_rseg_mutex_key;
mysql_pfs_key_t page_zip_stat_per_index_mutex_key;
mysql_pfs_key_t rtr_active_mutex_key;
mysql_pfs_key_t rtr_match_mutex_key;
@ -564,8 +562,6 @@ static PSI_mutex_info all_innodb_mutexes[] = {
PSI_KEY(page_zip_stat_per_index_mutex),
PSI_KEY(purge_sys_pq_mutex),
PSI_KEY(recv_sys_mutex),
PSI_KEY(redo_rseg_mutex),
PSI_KEY(noredo_rseg_mutex),
PSI_KEY(srv_innodb_monitor_mutex),
PSI_KEY(srv_misc_tmpfile_mutex),
PSI_KEY(srv_monitor_file_mutex),


@ -27,7 +27,7 @@ Created 3/26/1996 Heikki Tuuri
#ifndef trx0purge_h
#define trx0purge_h
#include "trx0rseg.h"
#include "trx0sys.h"
#include "que0types.h"
#include "srw_lock.h"


@ -24,16 +24,12 @@ Rollback segment
Created 3/26/1996 Heikki Tuuri
*******************************************************/
#ifndef trx0rseg_h
#define trx0rseg_h
#include "trx0sys.h"
#pragma once
#include "trx0types.h"
#include "fut0lst.h"
#ifdef UNIV_PFS_MUTEX
extern mysql_pfs_key_t redo_rseg_mutex_key;
extern mysql_pfs_key_t noredo_rseg_mutex_key;
#endif /* UNIV_PFS_MUTEX */
#ifdef WITH_WSREP
# include "trx0xa.h"
#endif /* WITH_WSREP */
/** Gets a rollback segment header.
@param[in] space space where placed
@ -73,21 +69,8 @@ trx_rseg_header_create(
/** Initialize or recover the rollback segments at startup. */
dberr_t trx_rseg_array_init();
/** Free a rollback segment in memory. */
void
trx_rseg_mem_free(trx_rseg_t* rseg);
/** Create a persistent rollback segment.
@param[in] space_id system or undo tablespace id
@return pointer to new rollback segment
@retval NULL on failure */
trx_rseg_t*
trx_rseg_create(ulint space_id)
MY_ATTRIBUTE((warn_unused_result));
/** Create the temporary rollback segments. */
void
trx_temp_rseg_create();
void trx_temp_rseg_create();
/* Number of undo log slots in a rollback segment file copy */
#define TRX_RSEG_N_SLOTS (srv_page_size / 16)
@ -96,34 +79,93 @@ trx_temp_rseg_create();
#define TRX_RSEG_MAX_N_TRXS (TRX_RSEG_N_SLOTS / 2)
/** The rollback segment memory object */
struct trx_rseg_t {
/*--------------------------------------------------------*/
/** rollback segment id == the index of its slot in the trx
system file copy */
ulint id;
struct MY_ALIGNED(CPU_LEVEL1_DCACHE_LINESIZE) trx_rseg_t
{
/** tablespace containing the rollback segment; constant after init() */
fil_space_t *space;
/** latch protecting everything except page_no, space */
srw_lock_low latch;
/** rollback segment header page number; constant after init() */
uint32_t page_no;
/** length of the TRX_RSEG_HISTORY list (number of transactions) */
uint32_t history_size;
/** mutex protecting the fields in this struct except id,space,page_no
which are constant */
mysql_mutex_t mutex;
private:
/** Reference counter to track rseg allocated transactions,
with SKIP and NEEDS_PURGE flags. */
std::atomic<uint32_t> ref;
/** space where the rollback segment header is placed */
fil_space_t* space;
/** Whether undo tablespace truncation is pending */
static constexpr uint32_t SKIP= 1;
/** Whether the log segment needs purge */
static constexpr uint32_t NEEDS_PURGE= 2;
/** Transaction reference count multiplier */
static constexpr uint32_t REF= 4;
/** page number of the rollback segment header */
uint32_t page_no;
uint32_t ref_load() const { return ref.load(std::memory_order_relaxed); }
public:
/** current size in pages */
uint32_t curr_size;
/** Initialize the fields that are not zero-initialized. */
void init(fil_space_t *space, uint32_t page);
/** Reinitialize the fields on undo tablespace truncation. */
void reinit(uint32_t page);
/** Clean up. */
void destroy();
/*--------------------------------------------------------*/
/* Fields for undo logs */
/** List of undo logs */
UT_LIST_BASE_NODE_T(trx_undo_t) undo_list;
/** Note that undo tablespace truncation was started. */
void set_skip_allocation()
{ ut_ad(is_persistent()); ref.fetch_or(SKIP, std::memory_order_relaxed); }
/** Note that undo tablespace truncation was completed. */
void clear_skip_allocation()
{
ut_ad(is_persistent());
ut_d(auto r=) ref.fetch_and(~SKIP, std::memory_order_relaxed);
ut_ad(r == SKIP);
}
/** Note that the rollback segment requires purge. */
void set_needs_purge()
{ ref.fetch_or(NEEDS_PURGE, std::memory_order_relaxed); }
/** Note that the rollback segment will not require purge. */
void clear_needs_purge()
{ ref.fetch_and(~NEEDS_PURGE, std::memory_order_relaxed); }
/** @return whether the segment is marked for undo truncation */
bool skip_allocation() const { return ref_load() & SKIP; }
/** @return whether the segment needs purge */
bool needs_purge() const { return ref_load() & NEEDS_PURGE; }
/** Increment the reference count */
void acquire()
{ ut_d(auto r=) ref.fetch_add(REF); ut_ad(!(r & SKIP)); }
/** Increment the reference count if possible
@retval true if the reference count was incremented
@retval false if skip_allocation() holds */
bool acquire_if_available()
{
uint32_t r= 0;
while (!ref.compare_exchange_weak(r, r + REF,
std::memory_order_relaxed,
std::memory_order_relaxed))
if (r & SKIP)
return false;
return true;
}
/** List of undo log segments cached for fast reuse */
UT_LIST_BASE_NODE_T(trx_undo_t) undo_cached;
/** Decrement the reference count */
void release()
{
ut_d(const auto r=)
ref.fetch_sub(REF, std::memory_order_relaxed);
ut_ad(r >= REF);
}
/** @return whether references exist */
bool is_referenced() const { return ref_load() >= REF; }
/*--------------------------------------------------------*/
/** current size in pages */
uint32_t curr_size;
/** List of undo logs (transactions) */
UT_LIST_BASE_NODE_T(trx_undo_t) undo_list;
/** List of undo log segments cached for fast reuse */
UT_LIST_BASE_NODE_T(trx_undo_t) undo_cached;
/** Last not yet purged undo log header; FIL_NULL if all purged */
uint32_t last_page_no;
@ -131,16 +173,6 @@ struct trx_rseg_t {
/** trx_t::no | last_offset << 48 */
uint64_t last_commit_and_offset;
/** Whether the log segment needs purge */
bool needs_purge;
/** Reference counter to track rseg allocated transactions. */
ulint trx_ref_count;
/** If true, then skip allocating this rseg as it reside in
UNDO-tablespace marked for truncate. */
bool skip_allocation;
/** @return the commit ID of the last committed transaction */
trx_id_t last_trx_no() const
{ return last_commit_and_offset & ((1ULL << 48) - 1); }
@ -153,24 +185,21 @@ struct trx_rseg_t {
last_commit_and_offset= static_cast<uint64_t>(last_offset) << 48 | trx_no;
}
/** @return whether the rollback segment is persistent */
bool is_persistent() const
{
ut_ad(space == fil_system.temp_space
|| space == fil_system.sys_space
|| (srv_undo_space_id_start > 0
&& space->id >= srv_undo_space_id_start
&& space->id <= srv_undo_space_id_start
+ TRX_SYS_MAX_UNDO_SPACES));
ut_ad(space == fil_system.temp_space
|| space == fil_system.sys_space
|| (srv_undo_space_id_start > 0
&& space->id >= srv_undo_space_id_start
&& space->id <= srv_undo_space_id_start
+ srv_undo_tablespaces_open)
|| !srv_was_started);
return(space->id != SRV_TMP_SPACE_ID);
}
/** @return whether the rollback segment is persistent */
bool is_persistent() const
{
ut_ad(space == fil_system.temp_space || space == fil_system.sys_space ||
(srv_undo_space_id_start > 0 &&
space->id >= srv_undo_space_id_start &&
space->id <= srv_undo_space_id_start + TRX_SYS_MAX_UNDO_SPACES));
ut_ad(space == fil_system.temp_space || space == fil_system.sys_space ||
!srv_was_started ||
(srv_undo_space_id_start > 0 &&
space->id >= srv_undo_space_id_start
&& space->id <= srv_undo_space_id_start +
srv_undo_tablespaces_open));
return space->id != SRV_TMP_SPACE_ID;
}
};
/* Undo log segment slot in a rollback segment header */
@ -278,5 +307,3 @@ void trx_rseg_update_binlog_offset(buf_block_t *rseg_header, const trx_t *trx,
mtr_t *mtr);
#include "trx0rseg.ic"
#endif
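
The SKIP bit and the reference count cooperate in the undo-truncation
handshake. A hypothetical standalone sketch of the protocol (the real
callers are trx_assign_rseg_low() and trx_purge_truncate_history() in the
hunks further down):

  #include <atomic>
  #include <cstdint>

  struct ref_word
  {
    static constexpr uint32_t SKIP= 1, REF= 4;
    std::atomic<uint32_t> ref{0};

    // Take a reference unless SKIP is set; the same CAS loop as
    // trx_rseg_t::acquire_if_available() above.
    bool acquire_if_available()
    {
      uint32_t r= 0;
      while (!ref.compare_exchange_weak(r, r + REF,
                                        std::memory_order_relaxed,
                                        std::memory_order_relaxed))
        if (r & SKIP)
          return false;
      return true;
    }
    void release()  { ref.fetch_sub(REF, std::memory_order_relaxed); }
    void set_skip() { ref.fetch_or(SKIP, std::memory_order_relaxed); }
    bool is_referenced() const
    { return ref.load(std::memory_order_relaxed) >= REF; }
  };

  // Truncation side: once SKIP is set, no new reference can be taken;
  // existing references must drain before the tablespace is truncated.
  bool try_truncate(ref_word &w)
  {
    w.set_skip();
    return !w.is_referenced(); // caller retries on a later purge pass
  }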


@ -27,7 +27,7 @@ Created 3/26/1996 Heikki Tuuri
#pragma once
#include "buf0buf.h"
#include "fil0fil.h"
#include "trx0types.h"
#include "trx0rseg.h"
#include "mem0mem.h"
#include "mtr0mtr.h"
#include "ut0byte.h"
@ -35,9 +35,6 @@ Created 3/26/1996 Heikki Tuuri
#include "read0types.h"
#include "page0types.h"
#include "trx0trx.h"
#ifdef WITH_WSREP
#include "trx0xa.h"
#endif /* WITH_WSREP */
#include "ilist.h"
#include "my_cpu.h"
@ -157,13 +154,6 @@ from older MySQL or MariaDB versions. */
/*!< the start of the array of
rollback segment specification
slots */
/*------------------------------------------------------------- @} */
/** The number of rollback segments; rollback segment id must fit in
the 7 bits reserved for it in DB_ROLL_PTR. */
#define TRX_SYS_N_RSEGS 128
/** Maximum number of undo tablespaces (not counting the system tablespace) */
#define TRX_SYS_MAX_UNDO_SPACES (TRX_SYS_N_RSEGS - 1)
/* Rollback segment specification slot offsets */
@ -871,26 +861,14 @@ class trx_sys_t
bool m_initialised;
public:
/**
TRX_RSEG_HISTORY list length (number of committed transactions to purge)
*/
MY_ALIGNED(CACHE_LINE_SIZE) Atomic_counter<uint32_t> rseg_history_len;
/** List of all transactions. */
thread_safe_trx_ilist_t trx_list;
MY_ALIGNED(CACHE_LINE_SIZE)
/** Temporary rollback segments */
trx_rseg_t* temp_rsegs[TRX_SYS_N_RSEGS];
/** Temporary rollback segments */
trx_rseg_t temp_rsegs[TRX_SYS_N_RSEGS];
MY_ALIGNED(CACHE_LINE_SIZE)
trx_rseg_t* rseg_array[TRX_SYS_N_RSEGS];
/*!< Pointer array to rollback
segments; NULL if slot not in use;
created and destroyed in
single-threaded mode; not protected
by any mutex, because it is read-only
during multi-threaded operation */
/** Persistent rollback segments; space==nullptr if slot not in use */
trx_rseg_t rseg_array[TRX_SYS_N_RSEGS];
/**
Lock-free hash of in memory read-write transactions.
@ -922,6 +900,32 @@ public:
trx_sys_t(): m_initialised(false) {}
/**
@return TRX_RSEG_HISTORY length (number of committed transactions to purge)
*/
uint32_t history_size();
/**
Check whether history_size() exceeds a specified number.
@param threshold number of committed transactions
@return whether TRX_RSEG_HISTORY length exceeds the threshold
*/
bool history_exceeds(uint32_t threshold);
/**
@return approximate history_size(), without latch protection
*/
TPOOL_SUPPRESS_TSAN uint32_t history_size_approx() const;
/**
@return whether history_size() is nonzero (with some race condition)
*/
TPOOL_SUPPRESS_TSAN bool history_exists();
/**
Returns the minimum trx id in rw trx list.
@ -1043,7 +1047,7 @@ public:
}
bool is_initialised() { return m_initialised; }
bool is_initialised() const { return m_initialised; }
/** Initialise the transaction subsystem. */
@ -1056,6 +1060,22 @@ public:
ulint any_active_transactions();
/**
Determine the rollback segment identifier.
@param rseg rollback segment
@param persistent whether the rollback segment is persistent
@return the rollback segment identifier
*/
unsigned rseg_id(const trx_rseg_t *rseg, bool persistent) const
{
const trx_rseg_t *array= persistent ? rseg_array : temp_rsegs;
ut_ad(rseg >= array);
ut_ad(rseg < &array[TRX_SYS_N_RSEGS]);
return static_cast<unsigned>(rseg - array);
}
/**
Registers read-write transaction.


@ -108,3 +108,9 @@ typedef byte trx_undo_rec_t;
/* @} */
typedef std::vector<trx_id_t, ut_allocator<trx_id_t> > trx_ids_t;
/** The number of rollback segments; rollback segment id must fit in
the 7 bits reserved for it in DB_ROLL_PTR. */
static constexpr unsigned TRX_SYS_N_RSEGS= 128;
/** Maximum number of undo tablespaces (not counting the system tablespace) */
static constexpr unsigned TRX_SYS_MAX_UNDO_SPACES= TRX_SYS_N_RSEGS - 1;


@ -4146,7 +4146,7 @@ lock_print_info_summary(
? (purge_sys.running() ? "running"
: purge_sys.paused() ? "stopped" : "running but idle")
: "disabled",
uint32_t{trx_sys.rseg_history_len});
trx_sys.history_size());
#ifdef PRINT_NUM_OF_LOCK_STRUCTS
fprintf(file,


@ -893,7 +893,6 @@ skip_secondaries:
= upd_get_nth_field(node->update, i);
if (dfield_is_ext(&ufield->new_val)) {
trx_rseg_t* rseg;
buf_block_t* block;
byte* data_field;
bool is_insert;
@ -918,11 +917,8 @@ skip_secondaries:
&is_insert, &rseg_id,
&page_no, &offset);
rseg = trx_sys.rseg_array[rseg_id];
ut_a(rseg != NULL);
ut_ad(rseg->id == rseg_id);
ut_ad(rseg->is_persistent());
const trx_rseg_t &rseg = trx_sys.rseg_array[rseg_id];
ut_ad(rseg.is_persistent());
mtr.start();
@ -945,7 +941,7 @@ skip_secondaries:
btr_root_get(index, &mtr);
block = buf_page_get(
page_id_t(rseg->space->id, page_no),
page_id_t(rseg.space->id, page_no),
0, RW_X_LATCH, &mtr);
data_field = buf_block_get_frame(block)


@ -287,6 +287,7 @@ static bool row_undo_rec_get(undo_node_t* node)
trx_undo_t* update = trx->rsegs.m_redo.undo;
trx_undo_t* temp = trx->rsegs.m_noredo.undo;
const undo_no_t limit = trx->roll_limit;
bool is_temp = false;
ut_ad(!update || !temp || update->empty() || temp->empty()
|| update->top_undo_no != temp->top_undo_no);
@ -300,10 +301,9 @@ static bool row_undo_rec_get(undo_node_t* node)
}
if (temp && !temp->empty() && temp->top_undo_no >= limit) {
if (!undo) {
undo = temp;
} else if (undo->top_undo_no < temp->top_undo_no) {
if (!undo || undo->top_undo_no < temp->top_undo_no) {
undo = temp;
is_temp = true;
}
}
@ -321,7 +321,8 @@ static bool row_undo_rec_get(undo_node_t* node)
ut_ad(limit <= undo->top_undo_no);
node->roll_ptr = trx_undo_build_roll_ptr(
false, undo->rseg->id, undo->top_page_no, undo->top_offset);
false, trx_sys.rseg_id(undo->rseg, !is_temp),
undo->top_page_no, undo->top_offset);
mtr_t mtr;
mtr.start();


@ -1387,27 +1387,12 @@ srv_mon_set_module_control(
/****************************************************************//**
Get transaction system's rollback segment size in pages
@return size in pages */
static
ulint
srv_mon_get_rseg_size(void)
/*=======================*/
TPOOL_SUPPRESS_TSAN static ulint srv_mon_get_rseg_size()
{
ulint i;
ulint value = 0;
/* rseg_array is a static array, so we can go through it without
mutex protection. In addition, we provide an estimate of the
total rollback segment size and to avoid mutex contention we
don't acquire the rseg->mutex" */
for (i = 0; i < TRX_SYS_N_RSEGS; ++i) {
const trx_rseg_t* rseg = trx_sys.rseg_array[i];
if (rseg != NULL) {
value += rseg->curr_size;
}
}
return(value);
ulint size= 0;
for (const auto &rseg : trx_sys.rseg_array)
size+= rseg.curr_size;
return size;
}
/****************************************************************//**
@ -1719,7 +1704,7 @@ srv_mon_process_existing_counter(
break;
case MONITOR_RSEG_HISTORY_LEN:
value = trx_sys.rseg_history_len;
value = trx_sys.history_size();
break;
case MONITOR_RSEG_CUR_SIZE:


@ -1074,7 +1074,7 @@ srv_export_innodb_status(void)
- UT_LIST_GET_LEN(buf_pool.free);
export_vars.innodb_max_trx_id = trx_sys.get_max_trx_id();
export_vars.innodb_history_list_length = trx_sys.rseg_history_len;
export_vars.innodb_history_list_length = trx_sys.history_size();
export_vars.innodb_log_waits = srv_stats.log_waits;
@ -1353,7 +1353,7 @@ srv_wake_purge_thread_if_not_active()
ut_ad(!srv_read_only_mode);
if (purge_sys.enabled() && !purge_sys.paused()
&& trx_sys.rseg_history_len) {
&& trx_sys.history_exists()) {
if(++purge_state.m_running == 1) {
srv_thread_pool->submit_task(&purge_coordinator_task);
}
@ -1723,7 +1723,8 @@ static bool srv_purge_should_exit()
return true;
/* Slow shutdown was requested. */
if (const uint32_t history_size= trx_sys.rseg_history_len)
const uint32_t history_size= trx_sys.history_size();
if (history_size)
{
static time_t progress_time;
time_t now= time(NULL);
@ -1817,10 +1818,9 @@ static uint32_t srv_do_purge(ulint* n_total_purged)
std::lock_guard<std::mutex> lk(purge_thread_count_mtx);
n_threads = n_use_threads = srv_n_purge_threads;
srv_purge_thread_count_changed = 0;
} else if (trx_sys.rseg_history_len > rseg_history_len
|| (srv_max_purge_lag > 0
&& rseg_history_len > srv_max_purge_lag)) {
} else if (trx_sys.history_size_approx() > rseg_history_len
|| (srv_max_purge_lag > 0
&& rseg_history_len > srv_max_purge_lag)) {
/* History length is now longer than what it was
when we took the last snapshot. Use more threads. */
@ -1844,7 +1844,7 @@ static uint32_t srv_do_purge(ulint* n_total_purged)
ut_a(n_use_threads <= n_threads);
/* Take a snapshot of the history list before purge. */
if (!(rseg_history_len = trx_sys.rseg_history_len)) {
if (!(rseg_history_len = trx_sys.history_size())) {
break;
}
@ -1894,24 +1894,21 @@ void release_thd(THD *thd, void *ctx)
}
/*
/**
Called by timer when purge coordinator decides
to delay processing of purge records.
*/
static void purge_coordinator_timer_callback(void *)
{
if (!purge_sys.enabled() || purge_sys.paused() ||
purge_state.m_running || !trx_sys.rseg_history_len)
if (!purge_sys.enabled() || purge_sys.paused() || purge_state.m_running)
return;
if (purge_state.m_history_length < 5000 &&
purge_state.m_history_length == trx_sys.rseg_history_len)
/* No new records were added since wait started.
Simply wait for new records. The magic number 5000 is an
approximation for the case where we have cached UNDO
log records which prevent truncate of the UNDO segments.*/
return;
srv_wake_purge_thread_if_not_active();
/* The magic number 5000 is an approximation for the case where we have
cached undo log records which prevent truncate of the rollback segments. */
if (const auto history_size= trx_sys.history_size())
if (purge_state.m_history_length >= 5000 ||
purge_state.m_history_length != history_size)
srv_wake_purge_thread_if_not_active();
}
static void purge_worker_callback(void*)
@ -1949,7 +1946,7 @@ static void purge_coordinator_callback_low()
someone added work and woke us up. */
if (n_total_purged == 0)
{
if (trx_sys.rseg_history_len == 0)
if (trx_sys.history_size() == 0)
return;
if (!woken_during_purge)
{


@ -110,7 +110,7 @@ inline bool TrxUndoRsegsIterator::set_next()
purge_sys.rseg = *m_iter++;
mysql_mutex_unlock(&purge_sys.pq_mutex);
mysql_mutex_lock(&purge_sys.rseg->mutex);
purge_sys.rseg->latch.rd_lock();
ut_a(purge_sys.rseg->last_page_no != FIL_NULL);
ut_ad(purge_sys.rseg->last_trx_no() == m_rsegs.trx_no);
@ -126,8 +126,7 @@ inline bool TrxUndoRsegsIterator::set_next()
purge_sys.hdr_offset = purge_sys.rseg->last_offset();
purge_sys.hdr_page_no = purge_sys.rseg->last_page_no;
mysql_mutex_unlock(&purge_sys.rseg->mutex);
purge_sys.rseg->latch.rd_unlock();
return(true);
}
@ -312,10 +311,10 @@ trx_purge_add_undo_to_history(const trx_t* trx, trx_undo_t*& undo, mtr_t* mtr)
rseg->last_page_no = undo->hdr_page_no;
rseg->set_last_commit(undo->hdr_offset,
trx->rw_trx_hash_element->no);
rseg->needs_purge = true;
rseg->set_needs_purge();
}
trx_sys.rseg_history_len++;
rseg->history_size++;
if (undo->state == TRX_UNDO_CACHED) {
UT_LIST_ADD_FIRST(rseg->undo_cached, undo);
@ -338,24 +337,25 @@ static void trx_purge_remove_log_hdr(buf_block_t *rseg, buf_block_t* log,
{
flst_remove(rseg, TRX_RSEG + TRX_RSEG_HISTORY,
log, static_cast<uint16_t>(offset + TRX_UNDO_HISTORY_NODE), mtr);
trx_sys.rseg_history_len--;
}
/** Free an undo log segment, and remove the header from the history list.
@param[in,out] rseg rollback segment
@param[in] hdr_addr file address of log_hdr */
static
void
trx_purge_free_segment(trx_rseg_t* rseg, fil_addr_t hdr_addr)
static void trx_purge_free_segment(trx_rseg_t *rseg, fil_addr_t hdr_addr)
{
mtr_t mtr;
mtr.start();
mysql_mutex_lock(&rseg->mutex);
const page_id_t hdr_page_id(rseg->space->id, hdr_addr.page);
/* We only need the latch to maintain rseg->curr_size. To follow the
latching order, we must acquire it before acquiring any related
page latch. */
rseg->latch.wr_lock();
buf_block_t* rseg_hdr = trx_rsegf_get(rseg->space, rseg->page_no, &mtr);
buf_block_t* block = trx_undo_page_get(
page_id_t(rseg->space->id, hdr_addr.page), &mtr);
buf_block_t* block = trx_undo_page_get(hdr_page_id, &mtr);
/* Mark the last undo log totally purged, so that if the
system crashes, the tail of the undo log will not get accessed
@ -368,17 +368,14 @@ trx_purge_free_segment(trx_rseg_t* rseg, fil_addr_t hdr_addr)
while (!fseg_free_step_not_header(
TRX_UNDO_SEG_HDR + TRX_UNDO_FSEG_HEADER
+ block->frame, &mtr)) {
mysql_mutex_unlock(&rseg->mutex);
rseg->latch.wr_unlock();
mtr.commit();
mtr.start();
mysql_mutex_lock(&rseg->mutex);
rseg->latch.wr_lock();
rseg_hdr = trx_rsegf_get(rseg->space, rseg->page_no, &mtr);
block = trx_undo_page_get(
page_id_t(rseg->space->id, hdr_addr.page), &mtr);
block = trx_undo_page_get(hdr_page_id, &mtr);
}
/* The page list may now be inconsistent, but the length field
@ -412,11 +409,12 @@ trx_purge_free_segment(trx_rseg_t* rseg, fil_addr_t hdr_addr)
ut_ad(rseg->curr_size >= seg_size);
rseg->history_size--;
rseg->curr_size -= seg_size;
mysql_mutex_unlock(&rseg->mutex);
rseg->latch.wr_unlock();
mtr_commit(&mtr);
mtr.commit();
}
/** Remove unnecessary history data from a rollback segment.
@ -435,7 +433,7 @@ trx_purge_truncate_rseg_history(
mtr.start();
ut_ad(rseg.is_persistent());
mysql_mutex_lock(&rseg.mutex);
rseg.latch.wr_lock();
buf_block_t* rseg_hdr = trx_rsegf_get(rseg.space, rseg.page_no, &mtr);
@ -447,7 +445,7 @@ trx_purge_truncate_rseg_history(
loop:
if (hdr_addr.page == FIL_NULL) {
func_exit:
mysql_mutex_unlock(&rseg.mutex);
rseg.latch.wr_unlock();
mtr.commit();
return;
}
@ -480,7 +478,7 @@ func_exit:
/* We can free the whole log segment */
mysql_mutex_unlock(&rseg.mutex);
rseg.latch.wr_unlock();
mtr.commit();
/* calls the trx_purge_remove_log_hdr()
@ -490,13 +488,13 @@ func_exit:
/* Remove the log hdr from the rseg history. */
trx_purge_remove_log_hdr(rseg_hdr, block, hdr_addr.boffset,
&mtr);
mysql_mutex_unlock(&rseg.mutex);
rseg.history_size--;
rseg.latch.wr_unlock();
mtr.commit();
}
mtr.start();
mysql_mutex_lock(&rseg.mutex);
rseg.latch.wr_lock();
rseg_hdr = trx_rsegf_get(rseg.space, rseg.page_no, &mtr);
@ -559,10 +557,9 @@ static void trx_purge_truncate_history()
head.undo_no = 0;
}
for (ulint i = 0; i < TRX_SYS_N_RSEGS; ++i) {
if (trx_rseg_t* rseg = trx_sys.rseg_array[i]) {
ut_ad(rseg->id == i);
trx_purge_truncate_rseg_history(*rseg, head);
for (auto& rseg : trx_sys.rseg_array) {
if (rseg.space) {
trx_purge_truncate_rseg_history(rseg, head);
}
}
@ -608,40 +605,40 @@ static void trx_purge_truncate_history()
DBUG_LOG("undo", "marking for truncate: " << file->name);
for (ulint i = 0; i < TRX_SYS_N_RSEGS; ++i) {
if (trx_rseg_t* rseg = trx_sys.rseg_array[i]) {
ut_ad(rseg->is_persistent());
if (rseg->space == &space) {
/* Once set, this rseg will
not be allocated to subsequent
transactions, but we will wait
for existing active
transactions to finish. */
rseg->skip_allocation = true;
}
for (auto& rseg : trx_sys.rseg_array) {
if (rseg.space == &space) {
/* Once set, this rseg will
not be allocated to subsequent
transactions, but we will wait
for existing active
transactions to finish. */
rseg.set_skip_allocation();
}
}
for (ulint i = 0; i < TRX_SYS_N_RSEGS; ++i) {
trx_rseg_t* rseg = trx_sys.rseg_array[i];
if (!rseg || rseg->space != &space) {
for (auto& rseg : trx_sys.rseg_array) {
if (rseg.space != &space) {
continue;
}
mysql_mutex_lock(&rseg->mutex);
ut_ad(rseg->skip_allocation);
if (rseg->trx_ref_count) {
ut_ad(rseg.skip_allocation());
if (rseg.is_referenced()) {
return;
}
rseg.latch.rd_lock();
ut_ad(rseg.skip_allocation());
if (rseg.is_referenced()) {
not_free:
mysql_mutex_unlock(&rseg->mutex);
rseg.latch.rd_unlock();
return;
}
if (rseg->curr_size != 1) {
if (rseg.curr_size != 1) {
/* Check if all segments are
cached and safe to remove. */
ulint cached = 0;
for (trx_undo_t* undo = UT_LIST_GET_FIRST(
rseg->undo_cached);
rseg.undo_cached);
undo;
undo = UT_LIST_GET_NEXT(undo_list,
undo)) {
@ -652,14 +649,14 @@ not_free:
}
}
ut_ad(rseg->curr_size > cached);
ut_ad(rseg.curr_size > cached);
if (rseg->curr_size > cached + 1) {
if (rseg.curr_size > cached + 1) {
goto not_free;
}
}
mysql_mutex_unlock(&rseg->mutex);
rseg.latch.rd_unlock();
}
ib::info() << "Truncating " << file->name;
@ -725,58 +722,22 @@ not_free:
buf_block_t* sys_header = trx_sysf_get(&mtr);
for (ulint i = 0; i < TRX_SYS_N_RSEGS; ++i) {
trx_rseg_t* rseg = trx_sys.rseg_array[i];
if (!rseg || rseg->space != &space) {
for (auto& rseg : trx_sys.rseg_array) {
if (rseg.space != &space) {
continue;
}
ut_ad(rseg->is_persistent());
ut_d(const ulint old_page = rseg->page_no);
buf_block_t* rblock = trx_rseg_header_create(
purge_sys.truncate.current,
rseg->id, sys_header, &mtr);
i, sys_header, &mtr);
ut_ad(rblock);
rseg->page_no = rblock
? rblock->page.id().page_no() : FIL_NULL;
ut_ad(old_page == rseg->page_no);
/* Before re-initialization ensure that we
free the existing structure. There can't be
any active transactions. */
ut_a(UT_LIST_GET_LEN(rseg->undo_list) == 0);
trx_undo_t* next_undo;
for (trx_undo_t* undo = UT_LIST_GET_FIRST(
rseg->undo_cached);
undo; undo = next_undo) {
next_undo = UT_LIST_GET_NEXT(undo_list, undo);
UT_LIST_REMOVE(rseg->undo_cached, undo);
MONITOR_DEC(MONITOR_NUM_UNDO_SLOT_CACHED);
ut_free(undo);
}
UT_LIST_INIT(rseg->undo_list,
&trx_undo_t::undo_list);
UT_LIST_INIT(rseg->undo_cached,
&trx_undo_t::undo_list);
/* These were written by trx_rseg_header_create(). */
ut_ad(!mach_read_from_4(TRX_RSEG + TRX_RSEG_FORMAT
+ rblock->frame));
ut_ad(!mach_read_from_4(TRX_RSEG + TRX_RSEG_HISTORY_SIZE
+ rblock->frame));
/* Initialize the undo log lists according to
the rseg header */
rseg->curr_size = 1;
rseg->trx_ref_count = 0;
rseg->last_page_no = FIL_NULL;
rseg->last_commit_and_offset = 0;
rseg->needs_purge = false;
rseg.reinit(rblock
? rblock->page.id().page_no() : FIL_NULL);
}
mtr.commit();
@ -820,12 +781,9 @@ not_free:
log_write_up_to(LSN_MAX, true);
DBUG_SUICIDE(););
for (ulint i = 0; i < TRX_SYS_N_RSEGS; ++i) {
if (trx_rseg_t* rseg = trx_sys.rseg_array[i]) {
ut_ad(rseg->is_persistent());
if (rseg->space == &space) {
rseg->skip_allocation = false;
}
for (auto& rseg : trx_sys.rseg_array) {
if (rseg.space == &space) {
rseg.clear_skip_allocation();
}
}
@ -846,7 +804,9 @@ static void trx_purge_rseg_get_next_history_log(
trx_id_t trx_no;
mtr_t mtr;
mysql_mutex_lock(&purge_sys.rseg->mutex);
mtr.start();
purge_sys.rseg->latch.wr_lock();
ut_a(purge_sys.rseg->last_page_no != FIL_NULL);
@ -854,8 +814,6 @@ static void trx_purge_rseg_get_next_history_log(
purge_sys.tail.undo_no = 0;
purge_sys.next_stored = false;
mtr.start();
const buf_block_t* undo_page = trx_undo_page_get_s_latched(
page_id_t(purge_sys.rseg->space->id,
purge_sys.rseg->last_page_no), &mtr);
@ -879,7 +837,7 @@ static void trx_purge_rseg_get_next_history_log(
purge_sys.rseg->last_page_no = FIL_NULL;
}
mysql_mutex_unlock(&purge_sys.rseg->mutex);
purge_sys.rseg->latch.wr_unlock();
mtr.commit();
if (empty) {
@ -899,11 +857,15 @@ static void trx_purge_rseg_get_next_history_log(
mtr_commit(&mtr);
mysql_mutex_lock(&purge_sys.rseg->mutex);
purge_sys.rseg->latch.wr_lock();
purge_sys.rseg->last_page_no = prev_log_addr.page;
purge_sys.rseg->set_last_commit(prev_log_addr.boffset, trx_no);
purge_sys.rseg->needs_purge = log_hdr[TRX_UNDO_NEEDS_PURGE + 1] != 0;
if (log_hdr[TRX_UNDO_NEEDS_PURGE + 1]) {
purge_sys.rseg->set_needs_purge();
} else {
purge_sys.rseg->clear_needs_purge();
}
/* Purge can also produce events, however these are already ordered
in the rollback segment and any user generated event will be greater
@ -916,7 +878,7 @@ static void trx_purge_rseg_get_next_history_log(
mysql_mutex_unlock(&purge_sys.pq_mutex);
mysql_mutex_unlock(&purge_sys.rseg->mutex);
purge_sys.rseg->latch.wr_unlock();
}
/** Position the purge sys "iterator" on the undo record to use for purging. */
@ -929,7 +891,7 @@ static void trx_purge_read_undo_rec()
purge_sys.hdr_offset = purge_sys.rseg->last_offset();
page_no = purge_sys.hdr_page_no = purge_sys.rseg->last_page_no;
if (purge_sys.rseg->needs_purge) {
if (purge_sys.rseg->needs_purge()) {
mtr_t mtr;
mtr.start();
buf_block_t* undo_page;
@ -1095,7 +1057,7 @@ trx_purge_fetch_next_rec(
/* row_purge_record_func() will later set
ROLL_PTR_INSERT_FLAG for TRX_UNDO_INSERT_REC */
false,
purge_sys.rseg->id,
trx_sys.rseg_id(purge_sys.rseg, true),
purge_sys.page_no, purge_sys.offset);
/* The following call will advance the stored values of the
@ -1229,7 +1191,7 @@ trx_purge_dml_delay(void)
/* If purge lag is set then calculate the new DML delay. */
if (srv_max_purge_lag > 0) {
double ratio = static_cast<double>(trx_sys.rseg_history_len) /
double ratio = static_cast<double>(trx_sys.history_size()) /
static_cast<double>(srv_max_purge_lag);
if (ratio > 1.0) {


@ -2112,9 +2112,9 @@ err_exit:
mtr.set_log_mode(MTR_LOG_NO_REDO);
}
mysql_mutex_lock(&rseg->mutex);
rseg->latch.wr_lock();
trx_undo_free_last_page(undo, &mtr);
mysql_mutex_unlock(&rseg->mutex);
rseg->latch.wr_unlock();
if (m.second) {
/* We are not going to modify
@ -2166,8 +2166,8 @@ err_exit:
if (!bulk) {
*roll_ptr = trx_undo_build_roll_ptr(
!rec, rseg->id, undo->top_page_no,
offset);
!rec, trx_sys.rseg_id(rseg, !is_temp),
undo->top_page_no, offset);
}
return(DB_SUCCESS);
@ -2220,7 +2220,6 @@ trx_undo_get_undo_rec_low(
ulint rseg_id;
uint32_t page_no;
uint16_t offset;
trx_rseg_t* rseg;
bool is_insert;
mtr_t mtr;
@ -2228,7 +2227,7 @@ trx_undo_get_undo_rec_low(
&offset);
ut_ad(page_no > FSP_FIRST_INODE_PAGE_NO);
ut_ad(offset >= TRX_UNDO_PAGE_HDR + TRX_UNDO_PAGE_HDR_SIZE);
rseg = trx_sys.rseg_array[rseg_id];
trx_rseg_t* rseg = &trx_sys.rseg_array[rseg_id];
ut_ad(rseg->is_persistent());
mtr.start();


@ -243,12 +243,10 @@ dberr_t trx_rollback_for_mysql(trx_t* trx)
== trx->rsegs.m_redo.rseg);
mtr_t mtr;
mtr.start();
mysql_mutex_lock(&trx->rsegs.m_redo.rseg->mutex);
if (trx_undo_t* undo = trx->rsegs.m_redo.undo) {
trx_undo_set_state_at_prepare(trx, undo, true,
&mtr);
}
mysql_mutex_unlock(&trx->rsegs.m_redo.rseg->mutex);
/* Write the redo log for the XA ROLLBACK
state change to the global buffer. It is
not necessary to flush the redo log. If


@ -134,7 +134,7 @@ trx_rseg_update_wsrep_checkpoint(const XID* xid, mtr_t* mtr)
result. */
const bool must_clear_rsegs = memcmp(wsrep_uuid, xid_uuid,
sizeof wsrep_uuid);
const trx_rseg_t* rseg = trx_sys.rseg_array[0];
const trx_rseg_t* rseg = &trx_sys.rseg_array[0];
buf_block_t* rseg_header = trx_rsegf_get(rseg->space, rseg->page_no,
mtr);
@ -151,11 +151,11 @@ trx_rseg_update_wsrep_checkpoint(const XID* xid, mtr_t* mtr)
changed, and we must reset the XID in all rollback
segment headers. */
for (ulint rseg_id = 1; rseg_id < TRX_SYS_N_RSEGS; ++rseg_id) {
if (const trx_rseg_t* rseg =
trx_sys.rseg_array[rseg_id]) {
const trx_rseg_t &rseg = trx_sys.rseg_array[rseg_id];
if (rseg.space) {
trx_rseg_clear_wsrep_checkpoint(
trx_rsegf_get(rseg->space,
rseg->page_no, mtr),
trx_rsegf_get(rseg.space, rseg.page_no,
mtr),
mtr);
}
}
@ -354,59 +354,59 @@ trx_rseg_header_create(
return block;
}
/** Free a rollback segment in memory. */
void
trx_rseg_mem_free(trx_rseg_t* rseg)
void trx_rseg_t::destroy()
{
trx_undo_t* undo;
trx_undo_t* next_undo;
latch.destroy();
mysql_mutex_destroy(&rseg->mutex);
/* There can't be any active transactions. */
ut_a(!UT_LIST_GET_LEN(undo_list));
/* There can't be any active transactions. */
ut_a(UT_LIST_GET_LEN(rseg->undo_list) == 0);
for (undo = UT_LIST_GET_FIRST(rseg->undo_cached);
undo != NULL;
undo = next_undo) {
next_undo = UT_LIST_GET_NEXT(undo_list, undo);
UT_LIST_REMOVE(rseg->undo_cached, undo);
MONITOR_DEC(MONITOR_NUM_UNDO_SLOT_CACHED);
ut_free(undo);
}
ut_free(rseg);
for (trx_undo_t *next, *undo= UT_LIST_GET_FIRST(undo_cached); undo;
undo= next)
{
next= UT_LIST_GET_NEXT(undo_list, undo);
UT_LIST_REMOVE(undo_cached, undo);
ut_free(undo);
}
}
/** Create a rollback segment object.
@param[in] id rollback segment id
@param[in] space space where the segment is placed
@param[in] page_no page number of the segment header */
static
trx_rseg_t*
trx_rseg_mem_create(ulint id, fil_space_t* space, uint32_t page_no)
void trx_rseg_t::init(fil_space_t *space, uint32_t page)
{
trx_rseg_t* rseg = static_cast<trx_rseg_t*>(
ut_zalloc_nokey(sizeof *rseg));
latch.init();
ut_ad(!this->space);
this->space= space;
page_no= page;
last_page_no= FIL_NULL;
curr_size= 1;
rseg->id = id;
rseg->space = space;
rseg->page_no = page_no;
rseg->last_page_no = FIL_NULL;
rseg->curr_size = 1;
UT_LIST_INIT(undo_list, &trx_undo_t::undo_list);
UT_LIST_INIT(undo_cached, &trx_undo_t::undo_list);
}
mysql_mutex_init(rseg->is_persistent()
? redo_rseg_mutex_key
: noredo_rseg_mutex_key,
&rseg->mutex, nullptr);
UT_LIST_INIT(rseg->undo_list, &trx_undo_t::undo_list);
UT_LIST_INIT(rseg->undo_cached, &trx_undo_t::undo_list);
void trx_rseg_t::reinit(uint32_t page)
{
ut_ad(is_persistent());
ut_ad(page_no == page);
ut_a(!UT_LIST_GET_LEN(undo_list));
ut_ad(!history_size || UT_LIST_GET_FIRST(undo_cached));
return(rseg);
history_size= 0;
page_no= page;
for (trx_undo_t *next, *undo= UT_LIST_GET_FIRST(undo_cached); undo;
undo= next)
{
next= UT_LIST_GET_NEXT(undo_list, undo);
UT_LIST_REMOVE(undo_cached, undo);
MONITOR_DEC(MONITOR_NUM_UNDO_SLOT_CACHED);
ut_free(undo);
}
ut_ad(!is_referenced());
clear_needs_purge();
last_commit_and_offset= 0;
last_page_no= FIL_NULL;
curr_size= 1;
}
/** Read the undo log lists.
@ -501,7 +501,7 @@ static dberr_t trx_rseg_mem_restore(trx_rseg_t *rseg, trx_id_t &max_trx_id,
if (auto len = flst_get_len(TRX_RSEG + TRX_RSEG_HISTORY
+ rseg_hdr->frame)) {
trx_sys.rseg_history_len += len;
rseg->history_size += len;
fil_addr_t node_addr = flst_get_last(TRX_RSEG
+ TRX_RSEG_HISTORY
@ -530,7 +530,9 @@ static dberr_t trx_rseg_mem_restore(trx_rseg_t *rseg, trx_id_t &max_trx_id,
+ node_addr.boffset
+ TRX_UNDO_NEEDS_PURGE);
ut_ad(purge <= 1);
rseg->needs_purge = purge != 0;
if (purge != 0) {
rseg->set_needs_purge();
}
if (rseg->last_page_no != FIL_NULL) {
@ -599,17 +601,14 @@ dberr_t trx_rseg_array_init()
const uint32_t page_no = trx_sysf_rseg_get_page_no(
sys, rseg_id);
if (page_no != FIL_NULL) {
trx_rseg_t* rseg = trx_rseg_mem_create(
rseg_id,
fil_space_get(trx_sysf_rseg_get_space(
sys, rseg_id)),
page_no);
ut_ad(rseg->is_persistent());
ut_ad(rseg->id == rseg_id);
ut_ad(!trx_sys.rseg_array[rseg_id]);
trx_sys.rseg_array[rseg_id] = rseg;
trx_rseg_t& rseg = trx_sys.rseg_array[rseg_id];
rseg.init(fil_space_get(
trx_sysf_rseg_get_space(
sys, rseg_id)),
page_no);
ut_ad(rseg.is_persistent());
if ((err = trx_rseg_mem_restore(
rseg, max_trx_id, &mtr))
&rseg, max_trx_id, &mtr))
!= DB_SUCCESS) {
mtr.commit();
break;
@ -634,15 +633,10 @@ dberr_t trx_rseg_array_init()
}
if (err != DB_SUCCESS) {
for (ulint rseg_id = 0; rseg_id < TRX_SYS_N_RSEGS; rseg_id++) {
if (trx_rseg_t*& rseg = trx_sys.rseg_array[rseg_id]) {
while (trx_undo_t* u= UT_LIST_GET_FIRST(
rseg->undo_list)) {
UT_LIST_REMOVE(rseg->undo_list, u);
ut_free(u);
}
trx_rseg_mem_free(rseg);
rseg = NULL;
for (auto& rseg : trx_sys.rseg_array) {
while (auto u = UT_LIST_GET_FIRST(rseg.undo_list)) {
UT_LIST_REMOVE(rseg.undo_list, u);
ut_free(u);
}
}
return err;
@ -673,62 +667,20 @@ dberr_t trx_rseg_array_init()
return DB_SUCCESS;
}
/** Create a persistent rollback segment.
@param[in] space_id system or undo tablespace id
@return pointer to new rollback segment
@retval NULL on failure */
trx_rseg_t*
trx_rseg_create(ulint space_id)
{
trx_rseg_t* rseg = NULL;
mtr_t mtr;
mtr.start();
fil_space_t* space = mtr.x_lock_space(space_id);
ut_ad(space->purpose == FIL_TYPE_TABLESPACE);
if (buf_block_t* sys_header = trx_sysf_get(&mtr)) {
ulint rseg_id = trx_sys_rseg_find_free(sys_header);
if (buf_block_t* rblock = rseg_id == ULINT_UNDEFINED
? NULL
: trx_rseg_header_create(space, rseg_id, sys_header,
&mtr)) {
ut_ad(trx_sysf_rseg_get_space(sys_header, rseg_id)
== space_id);
rseg = trx_rseg_mem_create(rseg_id, space,
rblock->page.id().
page_no());
ut_ad(rseg->id == rseg_id);
ut_ad(rseg->is_persistent());
ut_ad(!trx_sys.rseg_array[rseg->id]);
trx_sys.rseg_array[rseg->id] = rseg;
}
}
mtr.commit();
return(rseg);
}
/** Create the temporary rollback segments. */
void
trx_temp_rseg_create()
void trx_temp_rseg_create()
{
mtr_t mtr;
for (ulong i = 0; i < TRX_SYS_N_RSEGS; i++) {
for (ulong i = 0; i < array_elements(trx_sys.temp_rsegs); i++) {
mtr.start();
mtr.set_log_mode(MTR_LOG_NO_REDO);
mtr.x_lock_space(fil_system.temp_space);
buf_block_t* rblock = trx_rseg_header_create(
fil_system.temp_space, i, NULL, &mtr);
trx_rseg_t* rseg = trx_rseg_mem_create(
i, fil_system.temp_space, rblock->page.id().page_no());
ut_ad(!rseg->is_persistent());
ut_ad(!trx_sys.temp_rsegs[i]);
trx_sys.temp_rsegs[i] = rseg;
trx_sys.temp_rsegs[i].init(fil_system.temp_space,
rblock->page.id().page_no());
mtr.commit();
}
}


@ -197,17 +197,67 @@ trx_sysf_create(
ut_a(rblock->page.id() == page_id_t(0, FSP_FIRST_RSEG_PAGE_NO));
}
/** Create the instance */
void
trx_sys_t::create()
void trx_sys_t::create()
{
ut_ad(this == &trx_sys);
ut_ad(!is_initialised());
m_initialised = true;
trx_list.create();
rseg_history_len= 0;
ut_ad(this == &trx_sys);
ut_ad(!is_initialised());
m_initialised= true;
trx_list.create();
rw_trx_hash.init();
}
rw_trx_hash.init();
uint32_t trx_sys_t::history_size()
{
ut_ad(is_initialised());
uint32_t size= 0;
for (auto &rseg : rseg_array)
{
rseg.latch.rd_lock();
size+= rseg.history_size;
}
for (auto &rseg : rseg_array)
rseg.latch.rd_unlock();
return size;
}
bool trx_sys_t::history_exceeds(uint32_t threshold)
{
ut_ad(is_initialised());
uint32_t size= 0;
bool exceeds= false;
size_t i;
for (i= 0; i < array_elements(rseg_array); i++)
{
rseg_array[i].latch.rd_lock();
size+= rseg_array[i].history_size;
if (size > threshold)
{
exceeds= true;
i++;
break;
}
}
while (i)
rseg_array[--i].latch.rd_unlock();
return exceeds;
}
TPOOL_SUPPRESS_TSAN bool trx_sys_t::history_exists()
{
ut_ad(is_initialised());
for (auto &rseg : rseg_array)
if (rseg.history_size)
return true;
return false;
}
TPOOL_SUPPRESS_TSAN uint32_t trx_sys_t::history_size_approx() const
{
ut_ad(is_initialised());
uint32_t size= 0;
for (auto &rseg : rseg_array)
size+= rseg.history_size;
return size;
}
/*****************************************************************//**
@ -225,10 +275,42 @@ trx_sys_create_sys_pages(void)
mtr_commit(&mtr);
}
/** Create a persistent rollback segment.
@param space_id system or undo tablespace id
@return pointer to new rollback segment
@retval nullptr on failure */
static trx_rseg_t *trx_rseg_create(ulint space_id)
{
trx_rseg_t *rseg= nullptr;
mtr_t mtr;
mtr.start();
if (fil_space_t *space= mtr.x_lock_space(space_id))
{
ut_ad(space->purpose == FIL_TYPE_TABLESPACE);
if (buf_block_t *sys_header= trx_sysf_get(&mtr))
{
ulint rseg_id= trx_sys_rseg_find_free(sys_header);
if (buf_block_t *rblock= rseg_id == ULINT_UNDEFINED
? nullptr : trx_rseg_header_create(space, rseg_id, sys_header,
&mtr))
{
ut_ad(trx_sysf_rseg_get_space(sys_header, rseg_id) == space_id);
rseg= &trx_sys.rseg_array[rseg_id];
rseg->init(space, rblock->page.id().page_no());
ut_ad(rseg->is_persistent());
}
}
}
mtr.commit();
return rseg;
}
/** Create the rollback segments.
@return whether the creation succeeded */
bool
trx_sys_create_rsegs()
bool trx_sys_create_rsegs()
{
/* srv_available_undo_logs reflects the number of persistent
rollback segments that have been initialized in the
@ -308,14 +390,11 @@ trx_sys_t::close()
/* There can't be any active transactions. */
for (ulint i = 0; i < TRX_SYS_N_RSEGS; ++i) {
if (trx_rseg_t* rseg = rseg_array[i]) {
trx_rseg_mem_free(rseg);
}
if (trx_rseg_t* rseg = temp_rsegs[i]) {
trx_rseg_mem_free(rseg);
}
for (ulint i = 0; i < array_elements(temp_rsegs); ++i) {
temp_rsegs[i].destroy();
}
for (ulint i = 0; i < array_elements(rseg_array); ++i) {
rseg_array[i].destroy();
}
ut_a(trx_list.empty());


@ -674,7 +674,7 @@ static void trx_resurrect(trx_undo_t *undo, trx_rseg_t *rseg,
or will not qualify for purge limit criteria. So it is safe to increment
this trx_ref_count w/o mutex protection.
*/
++trx->rsegs.m_redo.rseg->trx_ref_count;
trx->rsegs.m_redo.rseg->acquire();
*trx->xid= undo->xid;
trx->id= undo->trx_id;
trx->is_recovered= true;
@ -719,31 +719,30 @@ dberr_t trx_lists_init_at_db_start()
const ulonglong start_time_micro= microsecond_interval_timer();
uint64_t rows_to_undo = 0;
for (ulint i = 0; i < TRX_SYS_N_RSEGS; ++i) {
for (auto& rseg : trx_sys.rseg_array) {
trx_undo_t* undo;
trx_rseg_t* rseg = trx_sys.rseg_array[i];
/* Some rollback segment may be unavailable,
especially if the server was previously run with a
non-default value of innodb_undo_logs. */
if (rseg == NULL) {
if (!rseg.space) {
continue;
}
/* Ressurrect other transactions. */
for (undo = UT_LIST_GET_FIRST(rseg->undo_list);
for (undo = UT_LIST_GET_FIRST(rseg.undo_list);
undo != NULL;
undo = UT_LIST_GET_NEXT(undo_list, undo)) {
trx_t *trx = trx_sys.find(0, undo->trx_id, false);
if (!trx) {
trx_resurrect(undo, rseg, start_time,
trx_resurrect(undo, &rseg, start_time,
start_time_micro, &rows_to_undo);
} else {
ut_ad(trx_state_eq(trx, TRX_STATE_ACTIVE) ||
trx_state_eq(trx, TRX_STATE_PREPARED));
ut_ad(trx->start_time == start_time);
ut_ad(trx->is_recovered);
ut_ad(trx->rsegs.m_redo.rseg == rseg);
ut_ad(trx->rsegs.m_redo.rseg->trx_ref_count);
ut_ad(trx->rsegs.m_redo.rseg == &rseg);
ut_ad(rseg.is_referenced());
trx->rsegs.m_redo.undo = undo;
if (undo->top_undo_no >= trx->undo_no) {
@ -787,7 +786,7 @@ static trx_rseg_t* trx_assign_rseg_low()
ut_ad(srv_available_undo_logs == TRX_SYS_N_RSEGS);
/* The first slot is always assigned to the system tablespace. */
ut_ad(trx_sys.rseg_array[0]->space == fil_system.sys_space);
ut_ad(trx_sys.rseg_array[0].space == fil_system.sys_space);
/* Choose a rollback segment evenly distributed between 0 and
innodb_undo_logs-1 in a round-robin fashion, skipping those
@ -806,7 +805,7 @@ static trx_rseg_t* trx_assign_rseg_low()
do {
for (;;) {
rseg = trx_sys.rseg_array[slot];
rseg = &trx_sys.rseg_array[slot];
#ifdef UNIV_DEBUG
/* Ensure that we are not revisiting the same
@ -820,20 +819,20 @@ static trx_rseg_t* trx_assign_rseg_low()
ut_d(if (!trx_rseg_n_slots_debug))
slot = (slot + 1) % TRX_SYS_N_RSEGS;
if (rseg == NULL) {
if (!rseg->space) {
continue;
}
ut_ad(rseg->is_persistent());
if (rseg->space != fil_system.sys_space) {
if (rseg->skip_allocation
if (rseg->skip_allocation()
|| !srv_undo_tablespaces) {
continue;
}
} else if (trx_rseg_t* next
= trx_sys.rseg_array[slot]) {
if (next->space != fil_system.sys_space
} else if (const fil_space_t *space =
trx_sys.rseg_array[slot].space) {
if (space != fil_system.sys_space
&& srv_undo_tablespaces > 0) {
/** If dedicated
innodb_undo_tablespaces have
@ -849,15 +848,10 @@ static trx_rseg_t* trx_assign_rseg_low()
/* By now we have only selected the rseg but not marked it
allocated. By marking it allocated we are ensuring that it will
never be selected for UNDO truncate purge. */
mysql_mutex_lock(&rseg->mutex);
if (!rseg->skip_allocation) {
rseg->trx_ref_count++;
allocated = true;
}
mysql_mutex_unlock(&rseg->mutex);
allocated = rseg->acquire_if_available();
} while (!allocated);
ut_ad(rseg->trx_ref_count > 0);
ut_ad(rseg->is_referenced());
ut_ad(rseg->is_persistent());
return(rseg);
}
@ -873,7 +867,7 @@ trx_rseg_t *trx_t::assign_temp_rseg()
/* Choose a temporary rollback segment between 0 and 127
in a round-robin fashion. */
static Atomic_counter<unsigned> rseg_slot;
trx_rseg_t* rseg = trx_sys.temp_rsegs[
trx_rseg_t* rseg = &trx_sys.temp_rsegs[
rseg_slot++ & (TRX_SYS_N_RSEGS - 1)];
ut_ad(!rseg->is_persistent());
rsegs.m_noredo.rseg = rseg;
@ -882,7 +876,6 @@ trx_rseg_t *trx_t::assign_temp_rseg()
trx_sys.register_rw(this);
}
ut_ad(!rseg->is_persistent());
return(rseg);
}
@ -984,7 +977,6 @@ trx_serialise(trx_t* trx)
{
trx_rseg_t *rseg = trx->rsegs.m_redo.rseg;
ut_ad(rseg);
mysql_mutex_assert_owner(&rseg->mutex);
if (rseg->last_page_no == FIL_NULL) {
mysql_mutex_lock(&purge_sys.pq_mutex);
@ -1031,10 +1023,7 @@ trx_write_serialisation_history(
mtr_t temp_mtr;
temp_mtr.start();
temp_mtr.set_log_mode(MTR_LOG_NO_REDO);
mysql_mutex_lock(&trx->rsegs.m_noredo.rseg->mutex);
trx_undo_set_state_at_finish(undo, &temp_mtr);
mysql_mutex_unlock(&trx->rsegs.m_noredo.rseg->mutex);
temp_mtr.commit();
}
@ -1052,7 +1041,7 @@ trx_write_serialisation_history(
ut_ad(!trx->read_only);
ut_ad(!undo || undo->rseg == rseg);
mysql_mutex_lock(&rseg->mutex);
rseg->latch.wr_lock();
/* Assign the transaction serialisation number and add any
undo log to the purge queue. */
@ -1062,7 +1051,7 @@ trx_write_serialisation_history(
trx_purge_add_undo_to_history(trx, undo, mtr);
}
mysql_mutex_unlock(&rseg->mutex);
rseg->latch.wr_unlock();
MONITOR_INC(MONITOR_TRX_COMMIT_UNDO);
}
@ -1320,12 +1309,7 @@ inline void trx_t::commit_in_memory(const mtr_t *mtr)
ut_ad(UT_LIST_GET_LEN(lock.evicted_tables) == 0);
if (trx_rseg_t *rseg= rsegs.m_redo.rseg)
{
mysql_mutex_lock(&rseg->mutex);
ut_ad(rseg->trx_ref_count > 0);
--rseg->trx_ref_count;
mysql_mutex_unlock(&rseg->mutex);
}
rseg->release();
if (mtr)
{
@ -1821,11 +1805,7 @@ static lsn_t trx_prepare_low(trx_t *trx)
mtr.start();
mtr.set_log_mode(MTR_LOG_NO_REDO);
mysql_mutex_lock(&undo->rseg->mutex);
trx_undo_set_state_at_prepare(trx, undo, false, &mtr);
mysql_mutex_unlock(&undo->rseg->mutex);
mtr.commit();
}
@ -1836,8 +1816,7 @@ static lsn_t trx_prepare_low(trx_t *trx)
return(0);
}
trx_rseg_t* rseg = trx->rsegs.m_redo.rseg;
ut_ad(undo->rseg == rseg);
ut_ad(undo->rseg == trx->rsegs.m_redo.rseg);
mtr.start();
@ -1845,10 +1824,7 @@ static lsn_t trx_prepare_low(trx_t *trx)
TRX_UNDO_PREPARED: these modifications to the file data
structure define the transaction as prepared in the file-based
world, at the serialization point of lsn. */
mysql_mutex_lock(&rseg->mutex);
trx_undo_set_state_at_prepare(trx, undo, false, &mtr);
mysql_mutex_unlock(&rseg->mutex);
/* Make the XA PREPARE durable. */
mtr.commit();


@ -554,7 +554,7 @@ buf_block_t* trx_undo_add_page(trx_undo_t* undo, mtr_t* mtr)
a pessimistic insert in a B-tree, and we must reserve the
counterpart of the tree latch, which is the rseg mutex. */
mysql_mutex_lock(&rseg->mutex);
rseg->latch.wr_lock();
buf_block_t* header_block = trx_undo_page_get(
page_id_t(undo->rseg->space->id, undo->hdr_page_no), mtr);
@ -586,7 +586,7 @@ buf_block_t* trx_undo_add_page(trx_undo_t* undo, mtr_t* mtr)
rseg->curr_size++;
func_exit:
mysql_mutex_unlock(&rseg->mutex);
rseg->latch.wr_unlock();
return(new_block);
}
@ -610,7 +610,6 @@ trx_undo_free_page(
const ulint space = rseg->space->id;
ut_a(hdr_page_no != page_no);
mysql_mutex_assert_owner(&rseg->mutex);
buf_block_t* undo_block = trx_undo_page_get(page_id_t(space, page_no),
mtr);
@ -675,7 +674,7 @@ void trx_undo_truncate_end(trx_undo_t& undo, undo_no_t limit, bool is_temp)
}
trx_undo_rec_t* trunc_here = NULL;
mysql_mutex_lock(&undo.rseg->mutex);
undo.rseg->latch.wr_lock();
buf_block_t* undo_block = trx_undo_page_get(
page_id_t(undo.rseg->space->id, undo.last_page_no),
&mtr);
@ -695,13 +694,13 @@ void trx_undo_truncate_end(trx_undo_t& undo, undo_no_t limit, bool is_temp)
if (undo.last_page_no != undo.hdr_page_no) {
trx_undo_free_last_page(&undo, &mtr);
mysql_mutex_unlock(&undo.rseg->mutex);
undo.rseg->latch.wr_unlock();
mtr.commit();
continue;
}
func_exit:
mysql_mutex_unlock(&undo.rseg->mutex);
undo.rseg->latch.wr_unlock();
if (trunc_here) {
mtr.write<2>(*undo_block,
@ -734,8 +733,6 @@ trx_undo_truncate_start(
trx_undo_rec_t* last_rec;
mtr_t mtr;
mysql_mutex_assert_owner(&rseg->mutex);
if (!limit) {
return;
}
@ -913,10 +910,8 @@ corrupted:
max_trx_id = trx_id;
}
mysql_mutex_lock(&rseg->mutex);
trx_undo_t* undo = trx_undo_mem_create(
rseg, id, trx_id, &xid, page_no, offset);
mysql_mutex_unlock(&rseg->mutex);
if (!undo) {
return undo;
}
@ -974,8 +969,6 @@ trx_undo_mem_create(
{
trx_undo_t* undo;
mysql_mutex_assert_owner(&rseg->mutex);
ut_a(id < TRX_RSEG_N_SLOTS);
undo = static_cast<trx_undo_t*>(ut_malloc_nokey(sizeof(*undo)));
@ -1019,8 +1012,6 @@ trx_undo_mem_init_for_reuse(
const XID* xid, /*!< in: X/Open XA transaction identification*/
uint16_t offset) /*!< in: undo log header byte offset on page */
{
mysql_mutex_assert_owner(&undo->rseg->mutex);
ut_a(undo->id < TRX_RSEG_N_SLOTS);
undo->state = TRX_UNDO_ACTIVE;
@ -1048,9 +1039,6 @@ trx_undo_create(trx_t* trx, trx_rseg_t* rseg, trx_undo_t** undo,
dberr_t* err, mtr_t* mtr)
{
ulint id;
mysql_mutex_assert_owner(&rseg->mutex);
buf_block_t* block = trx_undo_seg_create(
rseg->space,
trx_rsegf_get(rseg->space, rseg->page_no, mtr), &id, err, mtr);
@ -1099,8 +1087,6 @@ buf_block_t*
trx_undo_reuse_cached(trx_t* trx, trx_rseg_t* rseg, trx_undo_t** pundo,
mtr_t* mtr)
{
mysql_mutex_assert_owner(&rseg->mutex);
trx_undo_t* undo = UT_LIST_GET_FIRST(rseg->undo_cached);
if (!undo) {
return NULL;
@ -1163,7 +1149,7 @@ trx_undo_assign(trx_t* trx, dberr_t* err, mtr_t* mtr)
trx_rseg_t* rseg = trx->rsegs.m_redo.rseg;
mysql_mutex_lock(&rseg->mutex);
rseg->latch.wr_lock();
buf_block_t* block = trx_undo_reuse_cached(
trx, rseg, &trx->rsegs.m_redo.undo, mtr);
@ -1181,7 +1167,7 @@ trx_undo_assign(trx_t* trx, dberr_t* err, mtr_t* mtr)
UT_LIST_ADD_FIRST(rseg->undo_list, trx->rsegs.m_redo.undo);
func_exit:
mysql_mutex_unlock(&rseg->mutex);
rseg->latch.wr_unlock();
return block;
}
@ -1219,7 +1205,7 @@ trx_undo_assign_low(trx_t* trx, trx_rseg_t* rseg, trx_undo_t** undo,
*err = DB_TOO_MANY_CONCURRENT_TRXS; return NULL;
);
mysql_mutex_lock(&rseg->mutex);
rseg->latch.wr_lock();
buf_block_t* block = trx_undo_reuse_cached(trx, rseg, undo, mtr);
@ -1236,7 +1222,7 @@ trx_undo_assign_low(trx_t* trx, trx_rseg_t* rseg, trx_undo_t** undo,
UT_LIST_ADD_FIRST(rseg->undo_list, *undo);
func_exit:
mysql_mutex_unlock(&rseg->mutex);
rseg->latch.wr_unlock();
return block;
}
@ -1312,13 +1298,14 @@ void trx_undo_commit_cleanup(trx_undo_t *undo)
trx_rseg_t* rseg = undo->rseg;
ut_ad(rseg->space == fil_system.temp_space);
mysql_mutex_lock(&rseg->mutex);
rseg->latch.wr_lock();
UT_LIST_REMOVE(rseg->undo_list, undo);
if (undo->state == TRX_UNDO_CACHED) {
UT_LIST_ADD_FIRST(rseg->undo_cached, undo);
MONITOR_INC(MONITOR_NUM_UNDO_SLOT_CACHED);
undo = nullptr;
} else {
ut_ad(undo->state == TRX_UNDO_TO_PURGE);
@ -1327,11 +1314,10 @@ void trx_undo_commit_cleanup(trx_undo_t *undo)
ut_ad(rseg->curr_size > undo->size);
rseg->curr_size -= undo->size;
ut_free(undo);
}
mysql_mutex_unlock(&rseg->mutex);
rseg->latch.wr_unlock();
ut_free(undo);
}
/** At shutdown, frees the undo logs of a transaction. */