mariadb/tpool/tpool.h

/* Copyright (C) 2019, 2021, MariaDB Corporation.
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; version 2 of the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111-1301 USA */
#pragma once
#include <memory> /* unique_ptr */
#include <condition_variable>
#include <mutex>
#include <atomic>
#include <tpool_structs.h>
#ifdef LINUX_NATIVE_AIO
#include <libaio.h>
#endif
#ifdef HAVE_URING
#include <sys/uio.h>
#endif
#ifdef _WIN32
#ifndef NOMINMAX
#define NOMINMAX
#endif
#include <windows.h>
#include <cassert>
/**
Windows-specific native file handle struct.
Apart from the actual handle, contains PTP_IO
used by the Windows threadpool.
*/
struct native_file_handle
{
HANDLE m_handle;
PTP_IO m_ptp_io;
native_file_handle(){};
native_file_handle(HANDLE h) : m_handle(h), m_ptp_io() {}
operator HANDLE() const { return m_handle; }
};
#else
#include <unistd.h>
typedef int native_file_handle;
#endif
namespace tpool
{
/**
Task callback function
*/
typedef void (*callback_func)(void *);
typedef void (*callback_func_np)(void);
class task;
struct group_stats
{
/** Current number of running tasks*/
size_t tasks_running;
/** Current number of tasks in the queue*/
size_t queue_size;
/** Total number of tasks executed */
unsigned long long total_tasks_executed;
/** Total number of tasks enqueued */
unsigned long long total_tasks_enqueued;
};
/** A class that can be used e.g. for
restricting concurrency for a specific class of tasks. */
class task_group
{
private:
circular_queue<task*> m_queue;
std::mutex m_mtx;
std::condition_variable m_cv;
unsigned long long m_total_tasks;
unsigned long long m_total_enqueues;
unsigned int m_tasks_running;
unsigned int m_max_concurrent_tasks;
const bool m_enable_task_release;
public:
task_group(unsigned int max_concurrency= 100000, bool m_enable_task_release= true);
void set_max_tasks(unsigned int max_concurrent_tasks);
void execute(task* t);
void cancel_pending(task *t);
void get_stats(group_stats *stats);
~task_group();
};
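/*
  Illustrative sketch (not part of this header): limiting concurrency for a
  class of tasks with a task_group. The group is passed to the task
  constructor, and the task is submitted to a thread_pool as usual; the group
  then ensures that at most max_concurrency of its tasks run at a time.
  The callback, argument and pool below are hypothetical.

    static void work(void *arg);           // hypothetical callback
    tpool::task_group single_threaded(1);  // at most one task runs at a time
    tpool::task t(work, some_arg, &single_threaded);
    pool->submit_task(&t);                 // pool is a tpool::thread_pool*
*/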
class task
{
public:
callback_func m_func;
void *m_arg;
task_group* m_group;
virtual void add_ref() {};
virtual void release() {};
task() {};
task(callback_func func, void* arg, task_group* group = nullptr);
void* get_arg() { return m_arg; }
callback_func get_func() { return m_func; }
virtual void execute();
virtual ~task() {}
};
class waitable_task :public task
{
std::mutex m_mtx;
std::condition_variable m_cv;
int m_ref_count;
int m_waiter_count;
callback_func m_original_func;
void wait(std::unique_lock<std::mutex>&lk);
public:
waitable_task(callback_func func, void* arg, task_group* group = nullptr);
void add_ref() override;
void release() override;
TPOOL_SUPPRESS_TSAN bool is_running() { return get_ref_count() > 0; }
TPOOL_SUPPRESS_TSAN int get_ref_count() {return m_ref_count;}
void wait();
void disable();
void enable();
virtual ~waitable_task() {};
};
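/*
  Illustrative sketch (not part of this header): waiting for completion of a
  submitted task. wait() blocks until no execution of the task is queued or in
  progress; disable()/enable() can be used to temporarily turn the callback
  into a no-op. The callback and pool below are hypothetical.

    tpool::waitable_task t(do_cleanup, nullptr);
    pool->submit_task(&t);
    t.wait();   // returns once the callback has finished running
*/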
enum class aio_opcode
{
AIO_PREAD,
AIO_PWRITE
};
constexpr size_t MAX_AIO_USERDATA_LEN= 4 * sizeof(void*);
/** IO control block; includes parameters for the IO, and the completion callback */
struct aiocb
#ifdef _WIN32
:OVERLAPPED
#elif defined LINUX_NATIVE_AIO
:iocb
#elif defined HAVE_URING
:iovec
#endif
{
native_file_handle m_fh;
aio_opcode m_opcode;
unsigned long long m_offset;
void *m_buffer;
unsigned int m_len;
callback_func m_callback;
task_group* m_group;
/* Returned length and error code*/
size_t m_ret_len;
int m_err;
void *m_internal;
task m_internal_task;
alignas(8) char m_userdata[MAX_AIO_USERDATA_LEN];
aiocb() : m_internal_task(nullptr, nullptr)
{}
void execute_callback()
{
task t(m_callback, this, m_group);
t.execute();
}
};
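/*
  Illustrative sketch (not part of this header): filling in an aiocb and
  submitting it through thread_pool::submit_io(). The file handle, buffer and
  completion callback are hypothetical. On completion, m_callback is executed
  with the aiocb itself as the argument; m_ret_len and m_err carry the result.

    static void on_read_done(void *data)
    {
      auto *cb= static_cast<tpool::aiocb *>(data);
      // inspect cb->m_err and cb->m_ret_len here
    }

    tpool::aiocb cb;
    cb.m_fh= fh;                 // native_file_handle; bound to the pool on Windows
    cb.m_opcode= tpool::aio_opcode::AIO_PREAD;
    cb.m_offset= 0;
    cb.m_buffer= buf;
    cb.m_len= 4096;
    cb.m_callback= on_read_done;
    cb.m_group= nullptr;
    pool->submit_io(&cb);
*/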
/**
AIO interface
*/
class aio
{
public:
/**
Submit asynchronous IO.
On completion, cb->m_callback is executed.
*/
virtual int submit_io(aiocb *cb)= 0;
/** "Bind" file to AIO handler (used on Windows only) */
virtual int bind(native_file_handle &fd)= 0;
/** "Unind" file to AIO handler (used on Windows only) */
virtual int unbind(const native_file_handle &fd)= 0;
virtual ~aio(){};
protected:
static void synchronous(aiocb *cb);
/** finish a partial read/write callback synchronously */
static inline void finish_synchronous(aiocb *cb)
{
if (!cb->m_err && cb->m_ret_len != cb->m_len)
{
/* partial read/write */
cb->m_buffer= (char *) cb->m_buffer + cb->m_ret_len;
cb->m_len-= (unsigned int) cb->m_ret_len;
cb->m_offset+= cb->m_ret_len;
synchronous(cb);
}
}
};
class timer
{
public:
virtual void set_time(int initial_delay_ms, int period_ms) = 0;
virtual void disarm() = 0;
virtual ~timer(){}
};
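/*
  Illustrative sketch (not part of this header): periodic timers are created
  through thread_pool::create_timer(). The callback and pool are hypothetical.

    std::unique_ptr<tpool::timer> t(pool->create_timer(flush_stats, nullptr));
    t->set_time(1000, 1000);   // first run after 1s, then every 1s
    ...
    t->disarm();               // stop further callbacks before destruction
*/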
class thread_pool;
extern aio *create_simulated_aio(thread_pool *tp);
class thread_pool
{
protected:
/* AIO handler */
std::unique_ptr<aio> m_aio;
virtual aio *create_native_aio(int max_io)= 0;
public:
/**
Functions to be called at worker thread start/end;
can be used, for example, to set thread-local (TLS) variables.
*/
void (*m_worker_init_callback)(void)= [] {};
void (*m_worker_destroy_callback)(void)= [] {};
thread_pool()
: m_aio()
{
}
virtual void submit_task(task *t)= 0;
virtual timer* create_timer(callback_func func, void *data=nullptr) = 0;
void set_thread_callbacks(void (*init)(), void (*destroy)())
{
assert(init);
assert(destroy);
m_worker_init_callback= init;
m_worker_destroy_callback= destroy;
}
int configure_aio(bool use_native_aio, int max_io)
{
if (use_native_aio)
m_aio.reset(create_native_aio(max_io));
else
m_aio.reset(create_simulated_aio(this));
return !m_aio ? -1 : 0;
}
int reconfigure_aio(bool use_native_aio, int max_io)
{
assert(m_aio);
if (use_native_aio)
{
auto new_aio = create_native_aio(max_io);
if (!new_aio)
return -1;
m_aio.reset(new_aio);
}
return 0;
}
void disable_aio()
{
m_aio.reset();
}
/**
Tweaks how fast worker threads are created, or how often they are signaled.
@param threads - desired number of concurrently active threads.
The special value 0 means the default. This is not the same as the maximum
number of threads in the pool - oversubscription is allowed, and stalls are
still detected.
@note
This is designed for "batch" operations, where a huge number of tasks is
submitted in rapid succession. In that case, it is better to temporarily
restrict concurrency, which makes thread creation throttling more aggressive.
Once the batch is over, restore the default concurrency by calling
set_concurrency(0).
*/
virtual void set_concurrency(unsigned int threads=0){}
int bind(native_file_handle &fd) { return m_aio->bind(fd); }
void unbind(const native_file_handle &fd) { if (m_aio) m_aio->unbind(fd); }
int submit_io(aiocb *cb) { return m_aio->submit_io(cb); }
virtual void wait_begin() {};
virtual void wait_end() {};
virtual ~thread_pool() {}
};
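/*
  Illustrative sketch (not part of this header): typical lifecycle of a pool.
  The callbacks and tasks are hypothetical; create_thread_pool_generic() is
  declared below.

    tpool::thread_pool *pool= tpool::create_thread_pool_generic();
    pool->set_thread_callbacks(thread_init, thread_exit);
    pool->configure_aio(true, 256);   // native AIO, up to 256 outstanding IOs

    pool->set_concurrency(2);         // throttle for a large batch of tasks
    for (auto *t : batch)
      pool->submit_task(t);
    pool->set_concurrency(0);         // restore default concurrency

    delete pool;
*/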
const int DEFAULT_MIN_POOL_THREADS= 1;
const int DEFAULT_MAX_POOL_THREADS= 500;
extern thread_pool *
create_thread_pool_generic(int min_threads= DEFAULT_MIN_POOL_THREADS,
int max_threads= DEFAULT_MAX_POOL_THREADS);
extern "C" void tpool_wait_begin();
extern "C" void tpool_wait_end();
#ifdef _WIN32
extern thread_pool *
create_thread_pool_win(int min_threads= DEFAULT_MIN_POOL_THREADS,
int max_threads= DEFAULT_MAX_POOL_THREADS);
/*
Helper functions to execute pread/pwrite even if the file is
opened with FILE_FLAG_OVERLAPPED and bound to a completion
port.
*/
SSIZE_T pwrite(const native_file_handle &h, const void *buf, size_t count,
unsigned long long offset);
SSIZE_T pread(const native_file_handle &h, void *buf, size_t count,
unsigned long long offset);
HANDLE win_get_syncio_event();
#endif
} // namespace tpool