MDEV-33515 log_sys.lsn_lock causes excessive context switching

The log_sys.lsn_lock is a very contended resource with a small
critical section in log_sys.append_prepare(). On many processor
microarchitectures, replacing the system call based log_sys.lsn_lock
with a pure spin lock would fare worse during high concurrency workloads,
wasting a significant amount of CPU cycles in the spin loop.

On other microarchitectures, we would see a significant amount of time
being spent in native_queued_spin_lock_slowpath() in the Linux kernel,
plus context switching between user and kernel address space. This was
pointed out by Steve Shaw from Intel Corporation.

Depending on the workload and the hardware implementation, it may be
useful to use a pure spin lock in log_sys.append_prepare().
We will introduce the parameter innodb_log_spin_wait_delay. The statement

	SET GLOBAL INNODB_LOG_SPIN_WAIT_DELAY=50;

would enable a spin lock that will execute that many MY_RELAX_CPU()
operations (such as the x86 PAUSE instruction) between successive
attempts of acquiring the spin lock. The use of a system call based
log_sys.lsn_lock (which is the default setting) can be enabled by

	SET GLOBAL INNODB_LOG_SPIN_WAIT_DELAY=0;

This patch will also introduce #ifdef LOG_LATCH_DEBUG
(part of cmake -DWITH_INNODB_EXTRA_DEBUG=ON) for more accurate
tracking of log_sys.latch ownership and reorganize the fields of
log_sys to improve the locality of reference and to reduce the
chances of false sharing.

When a spin lock is being used, it will be maintained in the
most significant bit of log_sys.buf_free. This is useful, because that is
one of the fields that is covered by the lock. For IA-32 or AMD64, we
implement the spin lock specially via log_t::lsn_lock_bts(), employing the
i386 LOCK BTS instruction. A straightforward std::atomic::fetch_or() would
translate into an inefficient loop around LOCK CMPXCHG.

mtr_t::spin_wait_delay: The value of innodb_log_spin_wait_delay.

mtr_t::finisher: Pointer to the currently used mtr_t::finish_write()
implementation. This allows us to avoid introducing conditional branches.
We no longer invoke log_sys.is_pmem() at the mini-transaction level,
but we would do that in log_write_up_to().

mtr_t::finisher_update(): Update finisher when spin_wait_delay is
changed from or to 0 (the spin lock is changed to log_sys.lsn_lock or
vice versa).
This commit is contained in:
Marko Mäkelä 2024-03-22 12:29:01 +02:00
parent a2dd4c14a3
commit bf0b82d24b
12 changed files with 360 additions and 188 deletions

View file

@ -5320,9 +5320,10 @@ fail:
}
/* get current checkpoint_lsn */
{
log_sys.latch.wr_lock(SRW_LOCK_CALL);
mysql_mutex_lock(&recv_sys.mutex);
dberr_t err = recv_sys.find_checkpoint();
log_sys.latch.wr_unlock();
if (err != DB_SUCCESS) {
msg("Error: cannot read redo log header");

View file

@ -1027,6 +1027,18 @@ NUMERIC_BLOCK_SIZE NULL
ENUM_VALUE_LIST NULL
READ_ONLY YES
COMMAND_LINE_ARGUMENT REQUIRED
VARIABLE_NAME INNODB_LOG_SPIN_WAIT_DELAY
SESSION_VALUE NULL
DEFAULT_VALUE 0
VARIABLE_SCOPE GLOBAL
VARIABLE_TYPE INT UNSIGNED
VARIABLE_COMMENT Delay between log buffer spin lock polls (0 to use a blocking latch)
NUMERIC_MIN_VALUE 0
NUMERIC_MAX_VALUE 6000
NUMERIC_BLOCK_SIZE 0
ENUM_VALUE_LIST NULL
READ_ONLY NO
COMMAND_LINE_ARGUMENT OPTIONAL
VARIABLE_NAME INNODB_LRU_FLUSH_SIZE
SESSION_VALUE NULL
DEFAULT_VALUE 32

View file

@ -71,7 +71,7 @@ ADD_FEATURE_INFO(INNODB_ROOT_GUESS WITH_INNODB_ROOT_GUESS
OPTION(WITH_INNODB_EXTRA_DEBUG "Enable extra InnoDB debug checks" OFF)
IF(WITH_INNODB_EXTRA_DEBUG)
ADD_DEFINITIONS(-DUNIV_ZIP_DEBUG)
ADD_DEFINITIONS(-DUNIV_ZIP_DEBUG -DLOG_LATCH_DEBUG)
ENDIF()
ADD_FEATURE_INFO(INNODB_EXTRA_DEBUG WITH_INNODB_EXTRA_DEBUG "Extra InnoDB debug checks")

View file

@ -1915,7 +1915,7 @@ inline void log_t::write_checkpoint(lsn_t end_lsn) noexcept
{
my_munmap(buf, file_size);
buf= resize_buf;
buf_free= START_OFFSET + (get_lsn() - resizing);
set_buf_free(START_OFFSET + (get_lsn() - resizing));
}
else
#endif
@ -1957,9 +1957,7 @@ inline void log_t::write_checkpoint(lsn_t end_lsn) noexcept
static bool log_checkpoint_low(lsn_t oldest_lsn, lsn_t end_lsn)
{
ut_ad(!srv_read_only_mode);
#ifndef SUX_LOCK_GENERIC
ut_ad(log_sys.latch.is_write_locked());
#endif
ut_ad(log_sys.latch_have_wr());
ut_ad(oldest_lsn <= end_lsn);
ut_ad(end_lsn == log_sys.get_lsn());

View file

@ -927,9 +927,7 @@ bool fil_space_free(uint32_t id, bool x_latched)
log_sys.latch.wr_unlock();
} else {
#ifndef SUX_LOCK_GENERIC
ut_ad(log_sys.latch.is_write_locked());
#endif
ut_ad(log_sys.latch_have_wr());
if (space->max_lsn) {
ut_d(space->max_lsn = 0);
fil_system.named_spaces.remove(*space);
@ -3036,9 +3034,7 @@ void
fil_names_dirty(
fil_space_t* space)
{
#ifndef SUX_LOCK_GENERIC
ut_ad(log_sys.latch.is_write_locked());
#endif
ut_ad(log_sys.latch_have_wr());
ut_ad(recv_recovery_is_on());
ut_ad(log_sys.get_lsn() != 0);
ut_ad(space->max_lsn == 0);
@ -3052,9 +3048,7 @@ fil_names_dirty(
tablespace was modified for the first time since fil_names_clear(). */
ATTRIBUTE_NOINLINE ATTRIBUTE_COLD void mtr_t::name_write()
{
#ifndef SUX_LOCK_GENERIC
ut_ad(log_sys.latch.is_write_locked());
#endif
ut_ad(log_sys.latch_have_wr());
ut_d(fil_space_validate_for_mtr_commit(m_user_space));
ut_ad(!m_user_space->max_lsn);
m_user_space->max_lsn= log_sys.get_lsn();
@ -3078,9 +3072,7 @@ ATTRIBUTE_COLD lsn_t fil_names_clear(lsn_t lsn)
{
mtr_t mtr;
#ifndef SUX_LOCK_GENERIC
ut_ad(log_sys.latch.is_write_locked());
#endif
ut_ad(log_sys.latch_have_wr());
ut_ad(lsn);
ut_ad(log_sys.is_latest());

View file

@ -18478,6 +18478,24 @@ static void innodb_log_file_size_update(THD *thd, st_mysql_sys_var*,
mysql_mutex_lock(&LOCK_global_system_variables);
}
/** Update the value of innodb_log_spin_wait_delay.
When the new value switches between zero and nonzero, the assignment and
mtr_t::finisher_update() are executed while holding exclusive log_sys.latch.
Otherwise the new value is simply assigned.
@param thd   connection (unused)
@param var   the variable being updated; must be &mtr_t::spin_wait_delay
@param save  pointer to the new value (unsigned) */
static void innodb_log_spin_wait_delay_update(THD *thd, st_mysql_sys_var*,
                                              void *var, const void *save)
{
  ut_ad(var == &mtr_t::spin_wait_delay);
  const unsigned new_delay= *static_cast<const unsigned*>(save);
  const bool was_spinning= mtr_t::spin_wait_delay != 0;
  const bool will_spin= new_delay != 0;

  if (was_spinning == will_spin)
  {
    /* Only the poll interval changes; the lock implementation stays. */
    mtr_t::spin_wait_delay= new_delay;
    return;
  }

  /* The kind of lock is changing: switch it under exclusive latch. */
  log_sys.latch.wr_lock(SRW_LOCK_CALL);
  mtr_t::spin_wait_delay= new_delay;
  mtr_t::finisher_update();
  log_sys.latch.wr_unlock();
}
/** Update innodb_status_output or innodb_status_output_locks,
which control InnoDB "status monitor" output to the error log.
@param[out] var current value
@ -19312,6 +19330,12 @@ static MYSQL_SYSVAR_ULONGLONG(log_file_size, srv_log_file_size,
nullptr, innodb_log_file_size_update,
96 << 20, 4 << 20, std::numeric_limits<ulonglong>::max(), 4096);
/* innodb_log_spin_wait_delay: stored in mtr_t::spin_wait_delay and
applied by innodb_log_spin_wait_delay_update(). The trailing numeric
arguments are presumably default, minimum, maximum and block size
(0, 0, 6000, 0) -- confirm against the MYSQL_SYSVAR_UINT definition. */
static MYSQL_SYSVAR_UINT(log_spin_wait_delay, mtr_t::spin_wait_delay,
PLUGIN_VAR_OPCMDARG,
"Delay between log buffer spin lock polls (0 to use a blocking latch)",
nullptr, innodb_log_spin_wait_delay_update,
0, 0, 6000, 0);
static MYSQL_SYSVAR_UINT(old_blocks_pct, innobase_old_blocks_pct,
PLUGIN_VAR_RQCMDARG,
"Percentage of the buffer pool to reserve for 'old' blocks.",
@ -19771,6 +19795,7 @@ static struct st_mysql_sys_var* innobase_system_variables[]= {
MYSQL_SYSVAR(log_file_buffering),
#endif
MYSQL_SYSVAR(log_file_size),
MYSQL_SYSVAR(log_spin_wait_delay),
MYSQL_SYSVAR(log_group_home_dir),
MYSQL_SYSVAR(max_dirty_pages_pct),
MYSQL_SYSVAR(max_dirty_pages_pct_lwm),

View file

@ -57,11 +57,7 @@ public:
/**
Gets the number of used bytes in a block.
@return number of bytes used */
ulint used() const
MY_ATTRIBUTE((warn_unused_result))
{
return(static_cast<ulint>(m_used & ~DYN_BLOCK_FULL_FLAG));
}
uint32_t used() const { return m_used; }
/**
Gets pointer to the start of data.

View file

@ -165,60 +165,92 @@ struct log_t
static constexpr lsn_t FIRST_LSN= START_OFFSET;
private:
/** The log sequence number of the last change of durable InnoDB files */
/** the lock bit in buf_free */
static constexpr size_t buf_free_LOCK= ~(~size_t{0} >> 1);
alignas(CPU_LEVEL1_DCACHE_LINESIZE)
/** first free offset within buf used;
the most significant bit is set by lock_lsn() to protect this field
as well as write_to_buf, waits */
std::atomic<size_t> buf_free;
public:
/** number of write requests (to buf); protected by lock_lsn() or lsn_lock */
size_t write_to_buf;
/** log record buffer, written to by mtr_t::commit() */
byte *buf;
private:
/** The log sequence number of the last change of durable InnoDB files;
protected by lock_lsn() or lsn_lock or latch.wr_lock() */
std::atomic<lsn_t> lsn;
/** the first guaranteed-durable log sequence number */
std::atomic<lsn_t> flushed_to_disk_lsn;
/** log sequence number when log resizing was initiated, or 0 */
std::atomic<lsn_t> resize_lsn;
/** set when there may be need to initiate a log checkpoint.
This must hold if lsn - last_checkpoint_lsn > max_checkpoint_age. */
std::atomic<bool> need_checkpoint;
public:
/** number of append_prepare_wait(); protected by lock_lsn() or lsn_lock */
size_t waits;
/** innodb_log_buffer_size (size of buf,flush_buf if !is_pmem(), in bytes) */
size_t buf_size;
/** log file size in bytes, including the header */
lsn_t file_size;
#if defined(__aarch64__)
/* On ARM, we do more spinning */
#ifdef LOG_LATCH_DEBUG
typedef srw_lock_debug log_rwlock;
typedef srw_mutex log_lsn_lock;
bool latch_have_wr() const { return latch.have_wr(); }
bool latch_have_rd() const { return latch.have_rd(); }
bool latch_have_any() const { return latch.have_any(); }
#else
# ifndef UNIV_DEBUG
# elif defined SUX_LOCK_GENERIC
bool latch_have_wr() const { return true; }
bool latch_have_rd() const { return true; }
bool latch_have_any() const { return true; }
# else
bool latch_have_wr() const { return latch.is_write_locked(); }
bool latch_have_rd() const { return latch.is_locked(); }
bool latch_have_any() const { return latch.is_locked(); }
# endif
# ifdef __aarch64__
/* On ARM, we spin more */
typedef srw_spin_lock log_rwlock;
typedef pthread_mutex_wrapper<true> log_lsn_lock;
#else
# else
typedef srw_lock log_rwlock;
typedef srw_mutex log_lsn_lock;
# endif
#endif
public:
/** rw-lock protecting writes to buf; normal mtr_t::commit()
outside any log checkpoint is covered by a shared latch */
/** exclusive latch for checkpoint, shared for mtr_t::commit() to buf */
alignas(CPU_LEVEL1_DCACHE_LINESIZE) log_rwlock latch;
private:
/** mutex protecting buf_free et al, together with latch */
log_lsn_lock lsn_lock;
public:
/** first free offset within buf use; protected by lsn_lock */
Atomic_relaxed<size_t> buf_free;
/** number of write requests (to buf); protected by lsn_lock */
size_t write_to_buf;
/** number of append_prepare_wait(); protected by lsn_lock */
size_t waits;
private:
/** Last written LSN */
lsn_t write_lsn;
public:
/** log record buffer, written to by mtr_t::commit() */
byte *buf;
/** buffer for writing data to ib_logfile0, or nullptr if is_pmem()
In write_buf(), buf and flush_buf are swapped */
byte *flush_buf;
/** number of std::swap(buf, flush_buf) and writes from buf to log;
protected by latch.wr_lock() */
ulint write_to_log;
/** Last written LSN */
lsn_t write_lsn;
/** recommended maximum buf_free size, after which the buffer is flushed */
size_t max_buf_free;
/** buffer for writing data to ib_logfile0, or nullptr if is_pmem()
In write_buf(), buf and flush_buf are swapped */
byte *flush_buf;
/** set when there may be need to initiate a log checkpoint.
This must hold if lsn - last_checkpoint_lsn > max_checkpoint_age. */
std::atomic<bool> need_checkpoint;
/** whether a checkpoint is pending; protected by latch.wr_lock() */
Atomic_relaxed<bool> checkpoint_pending;
/** Log sequence number when a log file overwrite (broken crash recovery)
was noticed. Protected by latch.wr_lock(). */
lsn_t overwrite_warned;
/** innodb_log_buffer_size (size of buf,flush_buf if !is_pmem(), in bytes) */
size_t buf_size;
/** latest completed checkpoint (protected by latch.wr_lock()) */
Atomic_relaxed<lsn_t> last_checkpoint_lsn;
/** next checkpoint LSN (protected by latch.wr_lock()) */
lsn_t next_checkpoint_lsn;
/** next checkpoint number (protected by latch.wr_lock()) */
ulint next_checkpoint_no;
/** Log file */
log_file_t log;
private:
/** Log file being constructed during resizing; protected by latch */
log_file_t resize_log;
@ -229,18 +261,14 @@ private:
/** Buffer for writing to resize_log; @see flush_buf */
byte *resize_flush_buf;
void init_lsn_lock() {lsn_lock.init(); }
void lock_lsn() { lsn_lock.wr_lock(); }
void unlock_lsn() {lsn_lock.wr_unlock(); }
void destroy_lsn_lock() { lsn_lock.destroy(); }
/** Special implementation of lock_lsn() for IA-32 and AMD64 */
void lsn_lock_bts() noexcept;
/** Acquire a lock for updating buf_free and related fields.
@return the value of buf_free */
size_t lock_lsn() noexcept;
public:
/** recommended maximum size of buf, after which the buffer is flushed */
size_t max_buf_free;
/** log file size in bytes, including the header */
lsn_t file_size;
private:
/** log sequence number when log resizing was initiated, or 0 */
std::atomic<lsn_t> resize_lsn;
/** the log sequence number at the start of the log file */
lsn_t first_lsn;
#if defined __linux__ || defined _WIN32
@ -250,8 +278,6 @@ private:
public:
/** format of the redo log: e.g., FORMAT_10_8 */
uint32_t format;
/** Log file */
log_file_t log;
#if defined __linux__ || defined _WIN32
/** whether file system caching is enabled for the log */
my_bool log_buffered;
@ -279,21 +305,28 @@ public:
/*!< this is the maximum allowed value
for lsn - last_checkpoint_lsn when a
new query step is started */
/** latest completed checkpoint (protected by latch.wr_lock()) */
Atomic_relaxed<lsn_t> last_checkpoint_lsn;
/** next checkpoint LSN (protected by log_sys.latch) */
lsn_t next_checkpoint_lsn;
/** next checkpoint number (protected by latch.wr_lock()) */
ulint next_checkpoint_no;
/** whether a checkpoint is pending */
Atomic_relaxed<bool> checkpoint_pending;
/** buffer for checkpoint header */
byte *checkpoint_buf;
/* @} */
private:
/** A lock when the spin-only lock_lsn() is not being used */
log_lsn_lock lsn_lock;
public:
bool is_initialised() const noexcept { return max_buf_free != 0; }
/** Check whether the log buffer still has capacity.
The lock bit (buf_free_LOCK) is masked out of buf_free before comparing.
@return whether the first free offset is below max_buf_free */
bool buf_free_ok() const noexcept
{
  const size_t first_free=
    buf_free.load(std::memory_order_relaxed) & ~buf_free_LOCK;
  return first_free < max_buf_free;
}
/** Store a new value of buf_free with relaxed memory ordering.
The whole word is overwritten, including the buf_free_LOCK bit.
@param f  the new first free offset; must be less than buf_free_LOCK */
void set_buf_free(size_t f) noexcept
{
  ut_ad(f < buf_free_LOCK);
  buf_free.store(f, std::memory_order_relaxed);
}
#ifdef HAVE_PMEM
bool is_pmem() const noexcept { return !flush_buf; }
#else
@ -302,7 +335,7 @@ public:
bool is_opened() const noexcept { return log.is_opened(); }
/** @return target write LSN to react on buf_free >= max_buf_free */
/** @return target write LSN to react on !buf_free_ok() */
inline lsn_t get_write_target() const;
/** @return LSN at which log resizing was started and is still in progress
@ -402,9 +435,7 @@ public:
void set_recovered_lsn(lsn_t lsn) noexcept
{
#ifndef SUX_LOCK_GENERIC
ut_ad(latch.is_write_locked());
#endif /* SUX_LOCK_GENERIC */
ut_ad(latch_have_wr());
write_lsn= lsn;
this->lsn.store(lsn, std::memory_order_relaxed);
flushed_to_disk_lsn.store(lsn, std::memory_order_relaxed);
@ -444,17 +475,23 @@ public:
private:
/** Wait in append_prepare() for buffer to become available
@param lsn log sequence number to write up to
@param ex whether log_sys.latch is exclusively locked */
ATTRIBUTE_COLD void append_prepare_wait(lsn_t lsn, bool ex) noexcept;
@tparam spin whether to use the spin-only lock_lsn()
@param b the value of buf_free
@param ex whether log_sys.latch is exclusively locked
@param lsn log sequence number to write up to
@return the new value of buf_free */
template<bool spin>
ATTRIBUTE_COLD size_t append_prepare_wait(size_t b, bool ex, lsn_t lsn)
noexcept;
public:
/** Reserve space in the log buffer for appending data.
@tparam spin whether to use the spin-only lock_lsn()
@tparam pmem log_sys.is_pmem()
@param size total length of the data to append(), in bytes
@param ex whether log_sys.latch is exclusively locked
@return the start LSN and the buffer position for append() */
template<bool pmem>
inline std::pair<lsn_t,byte*> append_prepare(size_t size, bool ex) noexcept;
template<bool spin,bool pmem>
std::pair<lsn_t,byte*> append_prepare(size_t size, bool ex) noexcept;
/** Append a string of bytes to the redo log.
@param d destination
@ -462,9 +499,7 @@ public:
@param size length of str, in bytes */
void append(byte *&d, const void *s, size_t size) noexcept
{
#ifndef SUX_LOCK_GENERIC
ut_ad(latch.is_locked());
#endif
ut_ad(latch_have_any());
ut_ad(d + size <= buf + (is_pmem() ? file_size : buf_size));
memcpy(d, s, size);
d+= size;

View file

@ -700,9 +700,27 @@ private:
std::pair<lsn_t,page_flush_ahead> do_write();
/** Append the redo log records to the redo log buffer.
@tparam spin whether to use the spin-only log_sys.lock_lsn()
@tparam pmem log_sys.is_pmem()
@param mtr mini-transaction
@param len number of bytes to write
@return {start_lsn,flush_ahead} */
std::pair<lsn_t,page_flush_ahead> finish_write(size_t len);
template<bool spin,bool pmem> static
std::pair<lsn_t,page_flush_ahead> finish_writer(mtr_t *mtr, size_t len);
/** The applicable variant of finish_writer() */
static std::pair<lsn_t,page_flush_ahead> (*finisher)(mtr_t *, size_t);
std::pair<lsn_t,page_flush_ahead> finish_write(size_t len)
{ return finisher(this, len); }
public:
/** Poll interval in log_sys.lock_lsn(); 0 to use log_sys.lsn_lock.
Protected by LOCK_global_system_variables; changes to and from 0
are additionally protected by exclusive log_sys.latch. */
static unsigned spin_wait_delay;
/** Update finisher when spin_wait_delay is changing to or from 0. */
static void finisher_update();
private:
/** Release all latches. */
void release();

View file

@ -69,9 +69,7 @@ log_t log_sys;
void log_t::set_capacity()
{
#ifndef SUX_LOCK_GENERIC
ut_ad(log_sys.latch.is_write_locked());
#endif
ut_ad(log_sys.latch_have_wr());
/* Margin for the free space in the smallest log, before a new query
step which modifies the database, is started */
@ -134,7 +132,6 @@ bool log_t::create()
#endif
latch.SRW_LOCK_INIT(log_latch_key);
init_lsn_lock();
last_checkpoint_lsn= FIRST_LSN;
log_capacity= 0;
@ -143,7 +140,7 @@ bool log_t::create()
next_checkpoint_lsn= 0;
checkpoint_pending= false;
buf_free= 0;
set_buf_free(0);
ut_ad(is_initialised());
#ifndef HAVE_PMEM
@ -244,6 +241,7 @@ void log_t::attach_low(log_file_t file, os_offset_t size)
# endif
log_maybe_unbuffered= true;
log_buffered= false;
mtr_t::finisher_update();
return true;
}
}
@ -278,6 +276,7 @@ void log_t::attach_low(log_file_t file, os_offset_t size)
block_size);
#endif
mtr_t::finisher_update();
#ifdef HAVE_PMEM
checkpoint_buf= static_cast<byte*>(aligned_malloc(block_size, block_size));
memset_aligned<64>(checkpoint_buf, 0, block_size);
@ -313,9 +312,7 @@ void log_t::header_write(byte *buf, lsn_t lsn, bool encrypted)
void log_t::create(lsn_t lsn) noexcept
{
#ifndef SUX_LOCK_GENERIC
ut_ad(latch.is_write_locked());
#endif
ut_ad(latch_have_wr());
ut_ad(!recv_no_log_write);
ut_ad(is_latest());
ut_ad(this == &log_sys);
@ -332,12 +329,12 @@ void log_t::create(lsn_t lsn) noexcept
{
mprotect(buf, size_t(file_size), PROT_READ | PROT_WRITE);
memset_aligned<4096>(buf, 0, 4096);
buf_free= START_OFFSET;
set_buf_free(START_OFFSET);
}
else
#endif
{
buf_free= 0;
set_buf_free(0);
memset_aligned<4096>(flush_buf, 0, buf_size);
memset_aligned<4096>(buf, 0, buf_size);
}
@ -813,9 +810,7 @@ ATTRIBUTE_COLD void log_t::resize_write_buf(size_t length) noexcept
@return the current log sequence number */
template<bool release_latch> inline lsn_t log_t::write_buf() noexcept
{
#ifndef SUX_LOCK_GENERIC
ut_ad(latch.is_write_locked());
#endif
ut_ad(latch_have_wr());
ut_ad(!is_pmem());
ut_ad(!srv_read_only_mode);
@ -931,7 +926,7 @@ wait and check if an already running write is covering the request.
void log_write_up_to(lsn_t lsn, bool durable,
const completion_callback *callback)
{
ut_ad(!srv_read_only_mode || (log_sys.buf_free < log_sys.max_buf_free));
ut_ad(!srv_read_only_mode || log_sys.buf_free_ok());
ut_ad(lsn != LSN_MAX);
ut_ad(lsn != 0);
@ -1292,6 +1287,7 @@ log_print(
void log_t::close()
{
ut_ad(this == &log_sys);
ut_ad(!(buf_free & buf_free_LOCK));
if (!is_initialised()) return;
close_file();
@ -1309,7 +1305,6 @@ void log_t::close()
#endif
latch.destroy();
destroy_lsn_lock();
recv_sys.close();

View file

@ -2518,11 +2518,9 @@ recv_sys_t::parse_mtr_result recv_sys_t::parse(source &l, bool if_exists)
noexcept
{
restart:
#ifndef SUX_LOCK_GENERIC
ut_ad(log_sys.latch.is_write_locked() ||
ut_ad(log_sys.latch_have_wr() ||
srv_operation == SRV_OPERATION_BACKUP ||
srv_operation == SRV_OPERATION_BACKUP_NO_DEFER);
#endif
mysql_mutex_assert_owner(&mutex);
ut_ad(log_sys.next_checkpoint_lsn);
ut_ad(log_sys.is_latest());
@ -4050,9 +4048,7 @@ static bool recv_scan_log(bool last_phase)
lsn_t rewound_lsn= 0;
for (ut_d(lsn_t source_offset= 0);;)
{
#ifndef SUX_LOCK_GENERIC
ut_ad(log_sys.latch.is_write_locked());
#endif
ut_ad(log_sys.latch_have_wr());
#ifdef UNIV_DEBUG
const bool wrap{source_offset + recv_sys.len == log_sys.file_size};
#endif
@ -4447,9 +4443,7 @@ recv_init_crash_recovery_spaces(bool rescan, bool& missing_tablespace)
static dberr_t recv_rename_files()
{
mysql_mutex_assert_owner(&recv_sys.mutex);
#ifndef SUX_LOCK_GENERIC
ut_ad(log_sys.latch.is_write_locked());
#endif
ut_ad(log_sys.latch_have_wr());
dberr_t err= DB_SUCCESS;
@ -4732,7 +4726,7 @@ err_exit:
PROT_READ | PROT_WRITE);
#endif
}
log_sys.buf_free = recv_sys.offset;
log_sys.set_buf_free(recv_sys.offset);
if (recv_needed_recovery
&& srv_operation <= SRV_OPERATION_EXPORT_RESTORED) {
/* Write a FILE_CHECKPOINT marker as the first thing,

View file

@ -37,6 +37,24 @@ Created 11/26/1995 Heikki Tuuri
#include "srv0start.h"
#include "log.h"
#include "mariadb_stats.h"
#include "my_cpu.h"
/** The applicable variant of mtr_t::finish_writer();
assigned by mtr_t::finisher_update() */
std::pair<lsn_t,mtr_t::page_flush_ahead> (*mtr_t::finisher)(mtr_t *, size_t);
/** The value of innodb_log_spin_wait_delay;
nonzero selects the spin variants of mtr_t::finish_writer() */
unsigned mtr_t::spin_wait_delay;
void mtr_t::finisher_update()
{
ut_ad(log_sys.latch_have_wr());
finisher=
#ifdef HAVE_PMEM
log_sys.is_pmem()
? (spin_wait_delay
? mtr_t::finish_writer<true,true> : mtr_t::finish_writer<false,true>)
:
#endif
(spin_wait_delay
? mtr_t::finish_writer<true,false> : mtr_t::finish_writer<false,false>);
}
void mtr_memo_slot_t::release() const
{
@ -82,9 +100,7 @@ void mtr_memo_slot_t::release() const
inline buf_page_t *buf_pool_t::prepare_insert_into_flush_list(lsn_t lsn)
noexcept
{
#ifndef SUX_LOCK_GENERIC
ut_ad(recv_recovery_is_on() || log_sys.latch.is_locked());
#endif
ut_ad(recv_recovery_is_on() || log_sys.latch_have_any());
ut_ad(lsn >= log_sys.last_checkpoint_lsn);
mysql_mutex_assert_owner(&flush_list_mutex);
static_assert(log_t::FIRST_LSN >= 2, "compatibility");
@ -317,10 +333,8 @@ void mtr_t::release()
inline lsn_t log_t::get_write_target() const
{
#ifndef SUX_LOCK_GENERIC
ut_ad(latch.is_locked());
#endif
if (UNIV_LIKELY(buf_free < max_buf_free))
ut_ad(latch_have_any());
if (UNIV_LIKELY(buf_free_ok()))
return 0;
ut_ad(!is_pmem());
/* The LSN corresponding to the end of buf is
@ -559,9 +573,7 @@ void mtr_t::commit_shrink(fil_space_t &space, uint32_t size)
/* Durably write the reduced FSP_SIZE before truncating the data file. */
log_write_and_flush();
#ifndef SUX_LOCK_GENERIC
ut_ad(log_sys.latch.is_write_locked());
#endif
ut_ad(log_sys.latch_have_wr());
os_file_truncate(space.chain.start->name, space.chain.start->handle,
os_offset_t{size} << srv_page_size_shift, true);
@ -718,9 +730,7 @@ This is to be used at log_checkpoint().
@return current LSN */
ATTRIBUTE_COLD lsn_t mtr_t::commit_files(lsn_t checkpoint_lsn)
{
#ifndef SUX_LOCK_GENERIC
ut_ad(log_sys.latch.is_write_locked());
#endif
ut_ad(log_sys.latch_have_wr());
ut_ad(is_active());
ut_ad(!is_inside_ibuf());
ut_ad(m_log_mode == MTR_LOG_ALL);
@ -875,13 +885,111 @@ ATTRIBUTE_COLD static void log_overwrite_warning(lsn_t lsn)
? ". Shutdown is in progress" : "");
}
/** Wait in append_prepare() for buffer to become available
@param lsn log sequence number to write up to
@param ex whether log_sys.latch is exclusively locked */
ATTRIBUTE_COLD void log_t::append_prepare_wait(lsn_t lsn, bool ex) noexcept
/** Pause between successive attempts to acquire the LSN spin lock.
Executes MY_RELAX_CPU() delay * mult * 2 times (but at least once),
bracketed by HMT_low() and HMT_medium().
@param delay  number of delay rounds requested by the caller
@param mult   multiplier applied to each round */
static ATTRIBUTE_NOINLINE void lsn_delay(size_t delay, size_t mult) noexcept
{
  delay*= mult * 2; // GCC 13.2.0 -O2 targeting AMD64 wants to unroll twice
  /* A zero product (mult == 0) would make --delay below wrap around
  to SIZE_MAX and spin practically forever; pause exactly once instead. */
  if (!delay)
    delay= 1;
  HMT_low();
  do
    MY_RELAX_CPU();
  while (--delay);
  HMT_medium();
}
/* Define LOCK_TSET: a statement that atomically tests and sets the most
significant bit of buf_free (bit 63 or 31, depending on SIZEOF_SIZE_T).
If the bit was previously clear, the GCC-style variant jumps to the
label "got" of the enclosing function, and the MSVC variant returns from
the enclosing function. Otherwise execution falls through to the next
statement. LOCK_TSET remains undefined on the toolchains excluded below
and on targets other than IA-32 and AMD64. */
#if defined __clang_major__ && __clang_major__ < 10
/* Only clang-10 introduced support for asm goto */
#elif defined __APPLE__
/* At least some versions of Apple Xcode do not support asm goto */
#elif defined __GNUC__ && (defined __i386__ || defined __x86_64__)
/* LOCK BTS sets the carry flag to the old bit value; JNC = bit was clear */
# if SIZEOF_SIZE_T == 8
# define LOCK_TSET \
__asm__ goto("lock btsq $63, %0\n\t" "jnc %l1" \
: : "m"(buf_free) : "cc", "memory" : got)
# else
# define LOCK_TSET \
__asm__ goto("lock btsl $31, %0\n\t" "jnc %l1" \
: : "m"(buf_free) : "cc", "memory" : got)
# endif
#elif defined _MSC_VER && (defined _M_IX86 || defined _M_X64)
/* The intrinsic returns the old bit value; 0 means we set the bit */
# if SIZEOF_SIZE_T == 8
# define LOCK_TSET \
if (!_interlockedbittestandset64 \
(reinterpret_cast<volatile LONG64*>(&buf_free), 63)) return
# else
# define LOCK_TSET \
if (!_interlockedbittestandset \
(reinterpret_cast<volatile long*>(&buf_free), 31)) return
# endif
#endif
#ifdef LOCK_TSET
/** Special implementation of lock_lsn() for IA-32 and AMD64:
spin until LOCK_TSET succeeds in setting the buf_free_LOCK bit. */
ATTRIBUTE_NOINLINE
void log_t::lsn_lock_bts() noexcept
{
/* Fast path: on success, LOCK_TSET leaves this function */
LOCK_TSET;
{
const size_t m= mtr_t::spin_wait_delay;
constexpr size_t DELAY= 10, MAX_ITERATIONS= 10;
/* There is no loop condition; the only exit is a successful LOCK_TSET.
lsn_delay() is invoked between successive attempts. */
for (size_t delay_count= DELAY, delay_iterations= 1;;
lsn_delay(delay_iterations, m))
{
/* Attempt the locked instruction only when a plain load shows
that the lock bit is clear */
if (!(buf_free.load(std::memory_order_relaxed) & buf_free_LOCK))
LOCK_TSET;
/* Back-off: lengthen the delay by one on each round until it reaches
MAX_ITERATIONS; after that, count delay_count down to 0 and then
keep both values unchanged. */
if (!delay_count);
else if (delay_iterations < MAX_ITERATIONS)
delay_count= DELAY, delay_iterations++;
else
delay_count--;
}
}
# ifdef __GNUC__
/* Jump target of the asm goto in LOCK_TSET: the lock bit was acquired */
got:
return;
# endif
}
inline
#else
ATTRIBUTE_NOINLINE
#endif
/** Acquire a lock for updating buf_free and related fields,
by setting the buf_free_LOCK bit of buf_free.
@return the value of buf_free, without the lock bit */
size_t log_t::lock_lsn() noexcept
{
#ifdef LOCK_TSET
lsn_lock_bts();
/* We hold the lock bit now; mask it out of the returned value */
return ~buf_free_LOCK & buf_free.load(std::memory_order_relaxed);
# undef LOCK_TSET
#else
/* Generic implementation: fetch_or() returns the previous value, so
the lock was acquired if and only if the lock bit was clear in b. */
size_t b= buf_free.fetch_or(buf_free_LOCK, std::memory_order_acquire);
if (b & buf_free_LOCK)
{
/* NOTE(review): this multiplier is derived from srv_spin_wait_delay,
while lsn_lock_bts() uses mtr_t::spin_wait_delay; it evaluates to 0
when srv_spin_wait_delay is 0 -- confirm that lsn_delay() copes. */
const size_t m= my_cpu_relax_multiplier * srv_spin_wait_delay / 32;
constexpr size_t DELAY= 10, MAX_ITERATIONS= 10;
/* Keep looping while a plain load shows the lock bit set, or while
our fetch_or() finds that the bit had already been set by someone else.
The back-off of the delay is the same as in lsn_lock_bts(). */
for (size_t delay_count= DELAY, delay_iterations= 1;
((b= buf_free.load(std::memory_order_relaxed)) & buf_free_LOCK) ||
(buf_free_LOCK & (b= buf_free.fetch_or(buf_free_LOCK,
std::memory_order_acquire)));
lsn_delay(delay_iterations, m))
if (!delay_count);
else if (delay_iterations < MAX_ITERATIONS)
delay_count= DELAY, delay_iterations++;
else
delay_count--;
}
/* b is the value observed by the successful fetch_or(): lock bit clear */
return b;
#endif
}
template<bool spin>
ATTRIBUTE_COLD size_t log_t::append_prepare_wait(size_t b, bool ex, lsn_t lsn)
noexcept
{
waits++;
unlock_lsn();
ut_ad(buf_free.load(std::memory_order_relaxed) ==
(spin ? (b | buf_free_LOCK) : b));
if (spin)
buf_free.store(b, std::memory_order_release);
else
lsn_lock.wr_unlock();
if (ex)
latch.wr_unlock();
@ -895,51 +1003,57 @@ ATTRIBUTE_COLD void log_t::append_prepare_wait(lsn_t lsn, bool ex) noexcept
else
latch.rd_lock(SRW_LOCK_CALL);
lock_lsn();
if (spin)
return lock_lsn();
lsn_lock.wr_lock();
return buf_free.load(std::memory_order_relaxed);
}
/** Reserve space in the log buffer for appending data.
@tparam spin whether to use the spin-only lock_lsn()
@tparam pmem log_sys.is_pmem()
@param size total length of the data to append(), in bytes
@param ex whether log_sys.latch is exclusively locked
@return the start LSN and the buffer position for append() */
template<bool pmem>
template<bool spin,bool pmem>
inline
std::pair<lsn_t,byte*> log_t::append_prepare(size_t size, bool ex) noexcept
{
#ifndef SUX_LOCK_GENERIC
ut_ad(latch.is_locked());
# ifndef _WIN32 // there is no accurate is_write_locked() on SRWLOCK
ut_ad(ex == latch.is_write_locked());
# endif
#endif
ut_ad(ex ? latch_have_wr() : latch_have_rd());
ut_ad(pmem == is_pmem());
lock_lsn();
if (!spin)
lsn_lock.wr_lock();
size_t b{spin ? lock_lsn() : buf_free.load(std::memory_order_relaxed)};
write_to_buf++;
const lsn_t l{lsn.load(std::memory_order_relaxed)}, end_lsn{l + size};
size_t b{buf_free};
if (UNIV_UNLIKELY(pmem
? (end_lsn -
get_flushed_lsn(std::memory_order_relaxed)) > capacity()
: b + size >= buf_size))
{
append_prepare_wait(l, ex);
b= buf_free;
}
b= append_prepare_wait<spin>(b, ex, l);
lsn.store(end_lsn, std::memory_order_relaxed);
size_t new_buf_free= b + size;
if (pmem && new_buf_free >= file_size)
new_buf_free-= size_t(capacity());
buf_free= new_buf_free;
unlock_lsn();
lsn.store(end_lsn, std::memory_order_relaxed);
if (UNIV_UNLIKELY(end_lsn >= last_checkpoint_lsn + log_capacity))
set_check_for_checkpoint();
set_check_for_checkpoint(true);
return {l, &buf[b]};
byte *our_buf= buf;
if (spin)
buf_free.store(new_buf_free, std::memory_order_release);
else
{
buf_free.store(new_buf_free, std::memory_order_relaxed);
lsn_lock.wr_unlock();
}
return {l, our_buf + b};
}
/** Finish appending data to the log.
@ -947,9 +1061,7 @@ std::pair<lsn_t,byte*> log_t::append_prepare(size_t size, bool ex) noexcept
@return whether buf_flush_ahead() will have to be invoked */
static mtr_t::page_flush_ahead log_close(lsn_t lsn) noexcept
{
#ifndef SUX_LOCK_GENERIC
ut_ad(log_sys.latch.is_locked());
#endif
ut_ad(log_sys.latch_have_any());
const lsn_t checkpoint_age= lsn - log_sys.last_checkpoint_lsn;
@ -1014,9 +1126,7 @@ std::pair<lsn_t,mtr_t::page_flush_ahead> mtr_t::do_write()
ut_ad(!recv_no_log_write);
ut_ad(is_logged());
ut_ad(m_log.size());
#ifndef SUX_LOCK_GENERIC
ut_ad(!m_latch_ex || log_sys.latch.is_write_locked());
#endif
ut_ad(!m_latch_ex || log_sys.latch_have_wr());
#ifndef DBUG_OFF
do
@ -1074,9 +1184,7 @@ func_exit:
inline void log_t::resize_write(lsn_t lsn, const byte *end, size_t len,
size_t seq) noexcept
{
#ifndef SUX_LOCK_GENERIC
ut_ad(latch.is_locked());
#endif
ut_ad(latch_have_any());
if (UNIV_LIKELY_NULL(resize_buf))
{
@ -1181,50 +1289,46 @@ inline void log_t::resize_write(lsn_t lsn, const byte *end, size_t len,
}
}
template<bool spin,bool pmem>
std::pair<lsn_t,mtr_t::page_flush_ahead>
mtr_t::finish_write(size_t len)
mtr_t::finish_writer(mtr_t *mtr, size_t len)
{
ut_ad(!recv_no_log_write);
ut_ad(is_logged());
#ifndef SUX_LOCK_GENERIC
# ifndef _WIN32 // there is no accurate is_write_locked() on SRWLOCK
ut_ad(m_latch_ex == log_sys.latch.is_write_locked());
# endif
#endif
ut_ad(mtr->is_logged());
ut_ad(mtr->m_latch_ex ? log_sys.latch_have_wr() : log_sys.latch_have_rd());
const size_t size{m_commit_lsn ? 5U + 8U : 5U};
std::pair<lsn_t, byte*> start;
const size_t size{mtr->m_commit_lsn ? 5U + 8U : 5U};
std::pair<lsn_t, byte*> start=
log_sys.append_prepare<spin,pmem>(len, mtr->m_latch_ex);
if (!log_sys.is_pmem())
if (!pmem)
{
start= log_sys.append_prepare<false>(len, m_latch_ex);
m_log.for_each_block([&start](const mtr_buf_t::block_t *b)
mtr->m_log.for_each_block([&start](const mtr_buf_t::block_t *b)
{ log_sys.append(start.second, b->begin(), b->used()); return true; });
#ifdef HAVE_PMEM
write_trailer:
#endif
*start.second++= log_sys.get_sequence_bit(start.first + len - size);
if (m_commit_lsn)
if (mtr->m_commit_lsn)
{
mach_write_to_8(start.second, m_commit_lsn);
m_crc= my_crc32c(m_crc, start.second, 8);
mach_write_to_8(start.second, mtr->m_commit_lsn);
mtr->m_crc= my_crc32c(mtr->m_crc, start.second, 8);
start.second+= 8;
}
mach_write_to_4(start.second, m_crc);
mach_write_to_4(start.second, mtr->m_crc);
start.second+= 4;
}
#ifdef HAVE_PMEM
else
{
start= log_sys.append_prepare<true>(len, m_latch_ex);
if (UNIV_LIKELY(start.second + len <= &log_sys.buf[log_sys.file_size]))
{
m_log.for_each_block([&start](const mtr_buf_t::block_t *b)
mtr->m_log.for_each_block([&start](const mtr_buf_t::block_t *b)
{ log_sys.append(start.second, b->begin(), b->used()); return true; });
goto write_trailer;
}
m_log.for_each_block([&start](const mtr_buf_t::block_t *b)
mtr->m_log.for_each_block([&start](const mtr_buf_t::block_t *b)
{
size_t size{b->used()};
const size_t size_left(&log_sys.buf[log_sys.file_size] - start.second);
@ -1247,14 +1351,14 @@ mtr_t::finish_write(size_t len)
byte tail[5 + 8];
tail[0]= log_sys.get_sequence_bit(start.first + len - size);
if (m_commit_lsn)
if (mtr->m_commit_lsn)
{
mach_write_to_8(tail + 1, m_commit_lsn);
m_crc= my_crc32c(m_crc, tail + 1, 8);
mach_write_to_4(tail + 9, m_crc);
mach_write_to_8(tail + 1, mtr->m_commit_lsn);
mtr->m_crc= my_crc32c(mtr->m_crc, tail + 1, 8);
mach_write_to_4(tail + 9, mtr->m_crc);
}
else
mach_write_to_4(tail + 1, m_crc);
mach_write_to_4(tail + 1, mtr->m_crc);
::memcpy(start.second, tail, size_left);
::memcpy(log_sys.buf + log_sys.START_OFFSET, tail + size_left,
@ -1263,12 +1367,14 @@ mtr_t::finish_write(size_t len)
((size >= size_left) ? log_sys.START_OFFSET : log_sys.file_size) +
(size - size_left);
}
#else
static_assert(!pmem, "");
#endif
log_sys.resize_write(start.first, start.second, len, size);
m_commit_lsn= start.first + len;
return {start.first, log_close(m_commit_lsn)};
mtr->m_commit_lsn= start.first + len;
return {start.first, log_close(mtr->m_commit_lsn)};
}
bool mtr_t::have_x_latch(const buf_block_t &block) const