/* Copyright (C) 2019, 2020, MariaDB Corporation. This program is free software; you can redistribute itand /or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; version 2 of the License. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111 - 1301 USA*/ #pragma once #include /* unique_ptr */ #include #include #include #include #ifdef LINUX_NATIVE_AIO #include #endif #ifdef _WIN32 #ifndef NOMINMAX #define NOMINMAX #endif #include #include /** Windows-specific native file handle struct. Apart from the actual handle, contains PTP_IO used by the Windows threadpool. */ struct native_file_handle { HANDLE m_handle; PTP_IO m_ptp_io; native_file_handle(){}; native_file_handle(HANDLE h) : m_handle(h), m_ptp_io() {} operator HANDLE() const { return m_handle; } }; #else #include typedef int native_file_handle; #endif namespace tpool { /** Task callback function */ typedef void (*callback_func)(void *); typedef void (*callback_func_np)(void); class task; /** A class that can be used e.g. for restricting concurrency for specific class of tasks. */ class task_group { private: circular_queue m_queue; std::mutex m_mtx; std::condition_variable m_cv; unsigned int m_tasks_running; unsigned int m_max_concurrent_tasks; public: task_group(unsigned int max_concurrency= 100000); void set_max_tasks(unsigned int max_concurrent_tasks); void execute(task* t); void cancel_pending(task *t); ~task_group(); }; class task { public: callback_func m_func; void *m_arg; task_group* m_group; virtual void add_ref() {}; virtual void release() {}; task() {}; task(callback_func func, void* arg, task_group* group = nullptr); void* get_arg() { return m_arg; } callback_func get_func() { return m_func; } virtual void execute(); virtual ~task() {} }; class waitable_task :public task { std::mutex m_mtx; std::condition_variable m_cv; int m_ref_count; int m_waiter_count; callback_func m_original_func; void wait(std::unique_lock&lk); public: waitable_task(callback_func func, void* arg, task_group* group = nullptr); void add_ref() override; void release() override; TPOOL_SUPPRESS_TSAN bool is_running() { return get_ref_count() > 0; } TPOOL_SUPPRESS_TSAN int get_ref_count() {return m_ref_count;} void wait(); void disable(); void enable(); virtual ~waitable_task() {}; }; enum class aio_opcode { AIO_PREAD, AIO_PWRITE }; constexpr size_t MAX_AIO_USERDATA_LEN= 3 * sizeof(void*); /** IO control block, includes parameters for the IO, and the callback*/ struct aiocb #ifdef _WIN32 :OVERLAPPED #elif defined LINUX_NATIVE_AIO :iocb #endif { native_file_handle m_fh; aio_opcode m_opcode; unsigned long long m_offset; void *m_buffer; unsigned int m_len; callback_func m_callback; task_group* m_group; /* Returned length and error code*/ size_t m_ret_len; int m_err; void *m_internal; task m_internal_task; alignas(8) char m_userdata[MAX_AIO_USERDATA_LEN]; aiocb() : m_internal_task(nullptr, nullptr) {} void execute_callback() { task t(m_callback, this,m_group); t.execute(); } }; /** AIO interface */ class aio { public: /** Submit asynchronous IO. On completion, cb->m_callback is executed. */ virtual int submit_io(aiocb *cb)= 0; /** "Bind" file to AIO handler (used on Windows only) */ virtual int bind(native_file_handle &fd)= 0; /** "Unind" file to AIO handler (used on Windows only) */ virtual int unbind(const native_file_handle &fd)= 0; virtual ~aio(){}; protected: static void synchronous(aiocb *cb); /** finish a partial read/write callback synchronously */ static void finish_synchronous(aiocb *cb); }; class timer { public: virtual void set_time(int initial_delay_ms, int period_ms) = 0; virtual void disarm() = 0; virtual ~timer(){} }; class thread_pool; extern aio *create_simulated_aio(thread_pool *tp); #ifndef DBUG_OFF /* This function is useful for debugging to make sure all mutexes are released inside a task callback */ void set_after_task_callback(callback_func_np cb); void execute_after_task_callback(); #define dbug_execute_after_task_callback() execute_after_task_callback() #else #define dbug_execute_after_task_callback() do{}while(0) #endif class thread_pool { protected: /* AIO handler */ std::unique_ptr m_aio; virtual aio *create_native_aio(int max_io)= 0; public: /** Functions to be called at worker thread start/end can be used for example to set some TLS variables */ void (*m_worker_init_callback)(void)= [] {}; void (*m_worker_destroy_callback)(void)= [] {}; thread_pool() : m_aio() { } virtual void submit_task(task *t)= 0; virtual timer* create_timer(callback_func func, void *data=nullptr) = 0; void set_thread_callbacks(void (*init)(), void (*destroy)()) { assert(init); assert(destroy); m_worker_init_callback= init; m_worker_destroy_callback= destroy; } int configure_aio(bool use_native_aio, int max_io) { if (use_native_aio) m_aio.reset(create_native_aio(max_io)); else m_aio.reset(create_simulated_aio(this)); return !m_aio ? -1 : 0; } void disable_aio() { m_aio.reset(); } int bind(native_file_handle &fd) { return m_aio->bind(fd); } void unbind(const native_file_handle &fd) { if (m_aio) m_aio->unbind(fd); } int submit_io(aiocb *cb) { return m_aio->submit_io(cb); } virtual void wait_begin() {}; virtual void wait_end() {}; virtual ~thread_pool() {} }; const int DEFAULT_MIN_POOL_THREADS= 1; const int DEFAULT_MAX_POOL_THREADS= 500; extern thread_pool * create_thread_pool_generic(int min_threads= DEFAULT_MIN_POOL_THREADS, int max_threads= DEFAULT_MAX_POOL_THREADS); extern "C" void tpool_wait_begin(); extern "C" void tpool_wait_end(); #ifdef _WIN32 extern thread_pool * create_thread_pool_win(int min_threads= DEFAULT_MIN_POOL_THREADS, int max_threads= DEFAULT_MAX_POOL_THREADS); /* Helper functions, to execute pread/pwrite even if file is opened with FILE_FLAG_OVERLAPPED, and bound to completion port. */ SSIZE_T pwrite(const native_file_handle &h, void *buf, size_t count, unsigned long long offset); SSIZE_T pread(const native_file_handle &h, void *buf, size_t count, unsigned long long offset); HANDLE win_get_syncio_event(); #endif } // namespace tpool