mariadb/tpool/tpool_generic.cc
2025-01-09 09:41:38 +02:00

980 lines
24 KiB
C++

/* Copyright (C) 2019, 2022, MariaDB Corporation.
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; version 2 of the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111-1301 USA*/
#include "tpool_structs.h"
#include <limits.h>
#include <algorithm>
#include <assert.h>
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <iostream>
#include <limits.h>
#include <mutex>
#include <queue>
#include <stack>
#include <thread>
#include <vector>
#include "tpool.h"
#include <assert.h>
#include <my_global.h>
#include <my_dbug.h>
#include <thr_timer.h>
#include <stdlib.h>
#include "aligned.h"
namespace tpool
{
/*
  Platform-specific factories for native asynchronous I/O.

  Linux: when HAVE_URING or LINUX_NATIVE_AIO is defined, create_linux_aio()
  is only declared here (its definition lives elsewhere); otherwise a stub
  that returns nullptr is defined in place.
  Windows: create_win_aio() is only declared here.
*/
#if defined(__linux__)
# if defined(HAVE_URING) || defined(LINUX_NATIVE_AIO)
extern aio *create_linux_aio(thread_pool *tp, int max_io);
# else
aio *create_linux_aio(thread_pool *, int)
{
  return nullptr;
}
# endif
#endif /* __linux__ */
#if defined(_WIN32)
extern aio *create_win_aio(thread_pool *tp, int max_io);
#endif /* _WIN32 */
/** A task executing for longer than this is flagged as long-running by maintenance(). */
static const std::chrono::milliseconds LONG_TASK_DURATION{500};
/** Multiplier applied to the desired concurrency in too_many_active_threads(). */
static const int OVERSUBSCRIBE_FACTOR{2};
/**
  Execute the I/O described by a control block synchronously, in the
  calling thread.

  cb->m_opcode selects pread() or pwrite(); any other opcode aborts the
  process. The number of bytes transferred is stored in cb->m_ret_len and
  the error code (0 on success) in cb->m_err.

  Non-Windows: on failure the error is taken from errno and the length is
  reset to 0. Windows: a result that is negative when viewed as int is
  treated as failure and the error is taken from GetLastError().

  finish_synchronous() is invoked only when the stored length is non-zero.

  @param cb  control block to process
*/
void aio::synchronous(aiocb *cb)
{
#ifdef _WIN32
  size_t nbytes;
#else
  ssize_t nbytes;
#endif
  int error_code= 0;

  if (cb->m_opcode == aio_opcode::AIO_PREAD)
    nbytes= pread(cb->m_fh, cb->m_buffer, cb->m_len, cb->m_offset);
  else if (cb->m_opcode == aio_opcode::AIO_PWRITE)
    nbytes= pwrite(cb->m_fh, cb->m_buffer, cb->m_len, cb->m_offset);
  else
    abort();

#ifdef _WIN32
  if (static_cast<int>(nbytes) < 0)
    error_code= GetLastError();
#else
  if (nbytes < 0)
  {
    error_code= errno;
    nbytes= 0;
  }
#endif

  cb->m_ret_len= nbytes;
  cb->m_err= error_code;

  if (nbytes != 0)
    finish_synchronous(cb);
}
/**
Implementation of generic threadpool.
This threadpool consists of the following components
- The task queue. This queue is populated by submit()
- Workers that execute the work items.
- Timer thread that takes care of pool health
The task queue is populated by the submit() method.
On submit(), a worker thread can be woken, or created
to execute tasks.
The timer thread watches if work items are being dequeued, and if not,
this can indicate a potential deadlock.
Thus the timer thread can also wake or create a thread, to ensure some progress.
Optimizations:
- worker threads that are idle for a long time will shut down.
- worker threads are woken in LIFO order, which minimizes context switching
and also ensures that idle timeout works well. LIFO wakeup order ensures
that active threads stay active, and idle ones stay idle.
*/
/**
  Reason recorded in a worker's per-thread data when it is woken up.
*/
enum worker_wake_reason
{
  /* No explicit wakeup was delivered. */
  WAKE_REASON_NONE= 0,
  /* Woken because there is work to do. */
  WAKE_REASON_TASK= 1,
  /* Woken because the pool is shutting down. */
  WAKE_REASON_SHUTDOWN= 2
};
/*
  Per-worker bookkeeping. Each instance is linked into one of the pool's
  intrusive lists through m_prev / m_next.
*/
struct worker_data
{
  /** Condition variable the worker waits on; notified to wake it up. */
  std::condition_variable m_cv;

  /** Why the worker was woken. */
  worker_wake_reason m_wake_reason= WAKE_REASON_NONE;

  /**
    Task slot associated with a WAKE_REASON_TASK wakeup.
  */
  task *m_task= nullptr;

  /** Links of the intrusive doubly linked list this struct is a member of. */
  worker_data *m_prev= nullptr;
  worker_data *m_next= nullptr;

  /* Bit flags combined into m_state. */
  enum state
  {
    NONE= 0,
    EXECUTING_TASK= 1,
    LONG_TASK= 2,
    WAITING= 4
  };

  /* Current state of the worker: a bitwise OR of 'state' flags. */
  int m_state= NONE;

  /* Padding to avoid false sharing; intentionally left uninitialized. */
  char m_pad[CPU_LEVEL1_DCACHE_LINESIZE];

  /** @return true if the EXECUTING_TASK flag is set */
  bool is_executing_task()
  {
    return (m_state & EXECUTING_TASK) != 0;
  }

  /** @return true if the LONG_TASK flag is set */
  bool is_long_task()
  {
    return (m_state & LONG_TASK) != 0;
  }

  /** @return true if the WAITING flag is set */
  bool is_waiting()
  {
    return (m_state & WAITING) != 0;
  }

  /** Time at which the worker picked up its current task. */
  std::chrono::system_clock::time_point m_task_start_time{};

  /* All members except m_pad get their defaults from the initializers above. */
  worker_data() {}
};

/* Per-thread pointer to the calling worker's data; null until a worker sets it. */
static thread_local worker_data *tls_worker_data;
/**
Generic thread pool implementation.
Submitted tasks are stored in m_task_queue and executed by worker threads.
Each worker is tracked through a worker_data entry that is linked either
into m_active_threads or into m_standby_threads. The mutable state below
is accessed with m_mtx held.
The nested timer_generic class implements the timers returned by
create_timer() as well as the pool's own maintenance timer.
*/
class thread_pool_generic : public thread_pool
{
/** Cache for per-worker structures */
cache<worker_data> m_thread_data_cache;
/** The task queue */
circular_queue<task*> m_task_queue;
/** List of standby (idle) workers */
doubly_linked_list<worker_data> m_standby_threads;
/** List of threads that are executing tasks */
doubly_linked_list<worker_data> m_active_threads;
/* Mutex that protects the whole struct, most importantly
the standby threads list, and task queue */
std::mutex m_mtx;
/** Timeout after which idle worker shuts down */
std::chrono::milliseconds m_thread_timeout;
/** How often the maintenance timer should wake up. */
std::chrono::milliseconds m_timer_interval;
/** Another condition variable, used in pool shutdown */
std::condition_variable m_cv_no_threads;
/** Condition variable for the timer thread. Signaled on shutdown. */
std::condition_variable m_cv_timer;
/** Overall number of enqueues*/
unsigned long long m_tasks_enqueued;
/** NOTE(review): not read by any code visible in this file. */
unsigned long long m_group_enqueued;
/** Overall number of dequeued tasks. */
unsigned long long m_tasks_dequeued;
/** Statistic related, number of worker thread wakeups */
int m_wakeups;
/**
Statistic related, number of spurious thread wakeups
(i.e thread woke up, and the task queue is empty)
*/
int m_spurious_wakeups;
/** The desired concurrency. This number of workers should be
actively executing. */
unsigned int m_concurrency;
/** True, if threadpool is being shutdown, false otherwise */
bool m_in_shutdown= false;
/** Maintenance timer state : true = active(ON),false = inactive(OFF)*/
enum class timer_state_t
{
OFF, ON
};
/* In-class default is OFF; the constructor's initializer list sets ON. */
timer_state_t m_timer_state= timer_state_t::OFF;
/* Changes the maintenance timer period according to the requested state. */
void switch_timer(timer_state_t state);
/* Updates idle_since, and maybe switches the timer off */
void check_idle(std::chrono::system_clock::time_point now);
/** time point when timer last ran, used as a coarse clock. */
std::chrono::system_clock::time_point m_timestamp;
/** Number of long running tasks. The long running tasks are excluded when
adjusting concurrency */
unsigned int m_long_tasks_count;
/** Number of workers currently between wait_begin() and wait_end(). */
unsigned int m_waiting_task_count;
/** Last time thread was created*/
std::chrono::system_clock::time_point m_last_thread_creation;
/** Minimum number of threads in this pool.*/
unsigned int m_min_threads;
/** Maximum number of threads in this pool. */
unsigned int m_max_threads;
/* maintenance related statistics (see maintenance()) */
size_t m_last_thread_count;
unsigned long long m_last_activity;
/* Set by add_thread() before it spawns a thread (pools with
min_threads != max_threads only); cleared by the new thread in
worker_main(), or by add_thread() when thread creation fails. */
std::atomic_flag m_thread_creation_pending= ATOMIC_FLAG_INIT;
/* Entry point of a worker thread: the get task / execute task loop. */
void worker_main(worker_data *thread_data);
/* Worker shutdown bookkeeping; signals m_cv_no_threads when the last
thread leaves during pool shutdown. */
void worker_end(worker_data* thread_data);
/* Checks threadpool responsiveness, adjusts thread_counts */
void maintenance();
/* Trampoline with a callback-compatible signature: arg is the pool. */
static void maintenance_func(void* arg)
{
((thread_pool_generic *)arg)->maintenance();
}
/* Tries to create a new worker thread; returns false if none was created. */
bool add_thread();
/* Wakes one standby worker with the given reason; returns false if there is
no standby worker. The definition in this file ignores the task argument. */
bool wake(worker_wake_reason reason, task *t = nullptr);
/* If tasks are queued and concurrency permits, wakes a standby worker or
creates a new one. */
void maybe_wake_or_create_thread();
/* Compares the number of active, non-long, non-waiting workers against
m_concurrency * OVERSUBSCRIBE_FACTOR. */
bool too_many_active_threads();
/* Fetches the next task for a worker, parking the worker while the queue
is empty. Returns false if the worker should exit. */
bool get_task(worker_data *thread_var, task **t);
/* Parks the worker in the standby list until it is woken or times out.
lk must be the caller's lock on m_mtx. */
bool wait_for_tasks(std::unique_lock<std::mutex> &lk,
worker_data *thread_var);
/* Releases and nulls out every queued occurrence of t. */
void cancel_pending(task* t);
/* Total number of workers: active plus standby. */
size_t thread_count()
{
return m_active_threads.size() + m_standby_threads.size();
}
public:
thread_pool_generic(int min_threads, int max_threads);
~thread_pool_generic() override;
void wait_begin() override;
void wait_end() override;
void submit_task(task *task) override;
/* Creates the platform's native AIO implementation; returns nullptr on
platforms other than Windows and Linux. */
aio *create_native_aio(int max_io) override
{
#ifdef _WIN32
return create_win_aio(this, max_io);
#elif defined(__linux__)
return create_linux_aio(this,max_io);
#else
return nullptr;
#endif
}
/**
Timer built on thr_timer_t.
With a non-null pool, expiry submits m_task to that pool and run() re-arms
the timer itself using m_period. With a null pool, the callback wrapper is
handed directly to thr_timer_init() and the period is set through
thr_timer_set_period().
*/
class timer_generic : public thr_timer_t, public timer
{
/* Pool in which the callback executes; may be null (see class comment). */
thread_pool_generic* m_pool;
/* Task wrapping execute(this). */
waitable_task m_task;
/* User callback and its argument. */
callback_func m_callback;
void* m_data;
/* Period in milliseconds used by run() to re-arm the timer; 0 = no re-arm. */
int m_period;
/* Protects m_on, m_period and the thr_timer_* calls made by this class. */
std::mutex m_mtx;
/* False once disarm() was called; set_time()/set_period() then do nothing. */
bool m_on;
/* Number of run() invocations that are pending or in progress. */
std::atomic<int> m_running;
/* Executes the callback, then re-arms the timer when pool-driven and
periodic. */
void run()
{
/*
In rare cases, multiple callbacks can be scheduled
at the same time, e.g. with set_time(0,0) in a loop.
We do not allow parallel execution, since it is against the expectations.
*/
if (m_running.fetch_add(1, std::memory_order_acquire) > 0)
return;
/* Repeat the callback once for every invocation that arrived meanwhile. */
do
{
m_callback(m_data);
}
while (m_running.fetch_sub(1, std::memory_order_release) != 1);
if (m_pool && m_period)
{
std::unique_lock<std::mutex> lk(m_mtx);
if (m_on)
{
DBUG_PUSH_EMPTY;
thr_timer_end(this);
/* m_period is in milliseconds; the factor 1000 converts it for
thr_timer_settime() (presumably microseconds - TODO confirm). */
thr_timer_settime(this, 1000ULL * m_period);
DBUG_POP_EMPTY;
}
}
}
/* Task function of m_task: arg is the timer_generic instance. */
static void execute(void* arg)
{
auto timer = (timer_generic*)arg;
timer->run();
}
/* Timer expiry handler used when a pool is set: queues m_task in the pool. */
static void submit_task(void* arg)
{
timer_generic* timer = (timer_generic*)arg;
timer->m_pool->submit_task(&timer->m_task);
}
public:
/* Initializes the timer in the disabled-period state (m_period = 0, m_on =
true) and registers the expiry handler appropriate for 'pool'. */
timer_generic(callback_func func, void* data, thread_pool_generic * pool):
m_pool(pool),
m_task(timer_generic::execute,this),
m_callback(func),m_data(data),m_period(0),m_mtx(),
m_on(true),m_running()
{
if (pool)
{
/* Execute callback in threadpool*/
thr_timer_init(this, submit_task, this);
}
else
{
/* run in "timer" thread */
thr_timer_init(this, m_task.get_func(), m_task.get_arg());
}
}
/* (Re)arms the timer: first expiry after initial_delay_ms, then every
period_ms. Does nothing after disarm(). */
void set_time(int initial_delay_ms, int period_ms) override
{
std::unique_lock<std::mutex> lk(m_mtx);
if (!m_on)
return;
thr_timer_end(this);
if (!m_pool)
thr_timer_set_period(this, 1000ULL * period_ms);
else
m_period = period_ms;
thr_timer_settime(this, 1000ULL * initial_delay_ms);
}
/*
Change only period of a periodic timer
(after the next execution). Works around
mysys timer deadlocks
*/
void set_period(int period_ms)
{
std::unique_lock<std::mutex> lk(m_mtx);
if (!m_on)
return;
if (!m_pool)
thr_timer_set_period(this, 1000ULL * period_ms);
else
m_period = period_ms;
}
/* Stops the timer, cancels queued executions of m_task and waits for
m_task. */
void disarm() override
{
std::unique_lock<std::mutex> lk(m_mtx);
m_on = false;
thr_timer_end(this);
/* Release m_mtx before cancelling and waiting, so that a concurrently
executing run() can still acquire it. */
lk.unlock();
if (m_task.m_group)
{
m_task.m_group->cancel_pending(&m_task);
}
if (m_pool)
{
m_pool->cancel_pending(&m_task);
}
m_task.wait();
}
~timer_generic() override
{
disarm();
}
};
/* Maintenance timer; created by the constructor only when
min_threads != max_threads, otherwise stays null. */
timer_generic* m_maintenance_timer=nullptr;
/* Creates a timer whose callback executes in this pool. The returned object
is allocated with new. */
timer* create_timer(callback_func func, void *data) override
{
return new timer_generic(func, data, this);
}
void set_concurrency(unsigned int concurrency=0) override;
};
/**
  Cancel all queued occurrences of a task.

  Every queue slot that holds the task is released once via task::release()
  and overwritten with nullptr; the slot itself stays in the queue.

  @param t  task to cancel
*/
void thread_pool_generic::cancel_pending(task* t)
{
  std::lock_guard<std::mutex> guard(m_mtx);
  for (auto slot= m_task_queue.begin(); slot != m_task_queue.end(); slot++)
  {
    if (*slot != t)
      continue;
    t->release();
    *slot= nullptr;
  }
}
/**
  Park the calling worker in the standby list and sleep until it is woken.

  The worker is moved from the active to the standby list before waiting.
  When it is signaled, the signaling thread has already moved it back (see
  wake()). On an idle timeout the worker moves itself back to the active
  list, unless the pool is at or below its minimum size, in which case it
  simply keeps waiting.

  @param lk           the caller's lock on m_mtx, released while waiting
  @param thread_data  the calling worker's data
  @retval true   the worker was woken
  @retval false  idle wait timeout exceeded (the current thread must shutdown)
*/
bool thread_pool_generic::wait_for_tasks(std::unique_lock<std::mutex> &lk,
                                         worker_data *thread_data)
{
  assert(m_task_queue.empty());
  assert(!m_in_shutdown);

  thread_data->m_wake_reason= WAKE_REASON_NONE;
  m_active_threads.erase(thread_data);
  m_standby_threads.push_back(thread_data);

  bool woken;
  do
  {
    thread_data->m_cv.wait_for(lk, m_thread_timeout);
    woken= thread_data->m_wake_reason != WAKE_REASON_NONE;
    /* Not woken and pool at minimum size: keep this thread, wait again. */
  }
  while (!woken && thread_count() <= m_min_threads);

  if (woken)
    return true;

  /* Timed out: nobody moved us out of the standby list, do it ourselves. */
  m_standby_threads.erase(thread_data);
  m_active_threads.push_back(thread_data);
  return false;
}
/**
  Worker's "get next task" routine.

  Clears the worker's state from the previous task (including the
  long-task accounting), then takes the next entry from the task queue.
  While the queue is empty the worker parks in wait_for_tasks().

  @param thread_var  the calling worker's data
  @param[out] t      receives the dequeued task; may be nullptr if the queue
                     slot was nulled by cancel_pending()
  @retval true   a queue entry was dequeued into *t
  @retval false  the pool is shutting down, or the worker's idle timeout
                 expired; the worker must exit
*/
bool thread_pool_generic::get_task(worker_data *thread_var, task **t)
{
  std::unique_lock<std::mutex> lk(m_mtx);

  if (thread_var->is_long_task())
  {
    DBUG_ASSERT(m_long_tasks_count);
    --m_long_tasks_count;
  }
  DBUG_ASSERT(!thread_var->is_waiting());
  thread_var->m_state= worker_data::NONE;

  for (;;)
  {
    if (!m_task_queue.empty())
      break;
    if (m_in_shutdown || !wait_for_tasks(lk, thread_var))
      return false;
    /* Woken, but somebody else already took the work. */
    if (m_task_queue.empty())
      m_spurious_wakeups++;
  }

  /* Dequeue from the task queue. */
  task *next= m_task_queue.front();
  m_task_queue.pop();
  *t= next;
  m_tasks_dequeued++;

  thread_var->m_state|= worker_data::EXECUTING_TASK;
  /* Coarse clock: the timestamp of the last maintenance run. */
  thread_var->m_task_start_time= m_timestamp;
  return true;
}
/**
  Worker thread shutdown routine.

  Unlinks the worker from the active list and returns its data to the
  cache. If the pool is shutting down and this was the last thread, the
  destructor waiting on m_cv_no_threads is notified.

  @param thread_data  the exiting worker's data
*/
void thread_pool_generic::worker_end(worker_data* thread_data)
{
  std::unique_lock<std::mutex> lk(m_mtx);
  DBUG_ASSERT(!thread_data->is_long_task());

  m_active_threads.erase(thread_data);
  m_thread_data_cache.put(thread_data);

  const bool last_thread_gone= m_in_shutdown && thread_count() == 0;
  if (last_thread_gone)
    m_cv_no_threads.notify_all();
}
extern "C" void set_tls_pool(tpool::thread_pool* pool);

/**
  Body of a worker thread: the get task / execute task loop.

  Registers the pool and the worker data in thread-local storage, runs the
  worker init callback and clears the "thread creation pending" flag set by
  add_thread(). The loop ends when get_task() returns false or hands out a
  null task; afterwards the destroy callback and worker_end() are run.

  @param thread_var  this worker's data
*/
void thread_pool_generic::worker_main(worker_data *thread_var)
{
  set_tls_pool(this);
  m_worker_init_callback();
  tls_worker_data= thread_var;
  m_thread_creation_pending.clear();

  task *next_task;
  for (;;)
  {
    if (!get_task(thread_var, &next_task))
      break;
    if (!next_task)
      break;
    next_task->execute();
  }

  m_worker_destroy_callback();
  worker_end(thread_var);
}
/* Marker value of idle_since meaning "no idle period is being tracked". */
static const auto invalid_timestamp= std::chrono::system_clock::time_point::max();
/* Idle time after which the maintenance timer is switched off. */
constexpr auto max_idle_time= std::chrono::minutes(1);
/* Time since maintenance timer had nothing to do */
static std::chrono::system_clock::time_point idle_since= invalid_timestamp;

/*
  Track how long the pool has been idle, and switch the maintenance timer
  off once it has been idle for longer than max_idle_time.

  The pool counts as idle when the task queue is empty (asserted; the
  caller checks it) and at least one standby thread exists. The timer is
  switched off only if, in addition, no thread is active.

  Helper function, to be used inside the maintenance callback.

  @param now  current time, as sampled by the caller
*/
void thread_pool_generic::check_idle(std::chrono::system_clock::time_point now)
{
  DBUG_ASSERT(m_task_queue.empty());

  const bool has_spare_thread= m_standby_threads.m_count > 0;

  if (!has_spare_thread)
  {
    /* Not idle: forget any idle period tracked so far. */
    idle_since= invalid_timestamp;
  }
  else if (idle_since == invalid_timestamp)
  {
    /* Idle period starts now. */
    idle_since= now;
  }
  else if (now - idle_since > max_idle_time && m_active_threads.empty())
  {
    /* Switch timer off after 1 minute of idle time */
    idle_since= invalid_timestamp;
    switch_timer(timer_state_t::OFF);
  }
}
/*
  Periodic job to fix thread count and concurrency,
  in case of long tasks, etc.

  Steps, all performed with m_mtx held:
  - refresh m_timestamp (the pool's coarse clock);
  - if the task queue is empty, only do idle tracking and return;
  - recount the long-running tasks among the active threads;
  - wake or create a thread if concurrency permits;
  - if nothing was dequeued and no thread was woken since the previous run,
    the pool did not shrink and every thread is active, create an additional
    thread to guarantee progress.
*/
void thread_pool_generic::maintenance()
{
  /*
    If pool is busy (i.e. its mutex is currently locked), we can
    skip the maintenance task, sometimes, to lower mutex contention.
    After MAX_SKIPS consecutive skips the lock is acquired unconditionally.
  */
  static int skip_counter;
  const int MAX_SKIPS= 10;

  std::unique_lock<std::mutex> lk(m_mtx, std::defer_lock);
  if (skip_counter == MAX_SKIPS)
  {
    lk.lock();
  }
  else if (!lk.try_lock())
  {
    skip_counter++;
    return;
  }
  skip_counter= 0;

  m_timestamp= std::chrono::system_clock::now();

  if (m_task_queue.empty())
  {
    check_idle(m_timestamp);
    m_last_activity= m_tasks_dequeued + m_wakeups;
    return;
  }

  /* Recount long tasks from scratch; the LONG_TASK flag itself is sticky
     until the worker fetches its next task. */
  m_long_tasks_count= 0;
  for (auto thread_data= m_active_threads.front();
       thread_data;
       thread_data= thread_data->m_next)
  {
    if (thread_data->is_executing_task() &&
        !thread_data->is_waiting() &&
        (thread_data->is_long_task()
         || (m_timestamp - thread_data->m_task_start_time > LONG_TASK_DURATION)))
    {
      thread_data->m_state|= worker_data::LONG_TASK;
      m_long_tasks_count++;
    }
  }

  maybe_wake_or_create_thread();

  /* Keep the count as size_t; it is compared against size_t values only. */
  const size_t thread_cnt= thread_count();
  if (m_last_activity == m_tasks_dequeued + m_wakeups &&
      m_last_thread_count <= thread_cnt &&
      m_active_threads.size() == thread_cnt)
  {
    /* No progress made since last iteration: create a new thread. */
    add_thread();
  }
  m_last_activity= m_tasks_dequeued + m_wakeups;
  /* Deliberately the count sampled before add_thread(). */
  m_last_thread_count= thread_cnt;
}
/*
  Heuristic used for thread creation throttling.

  The more the pool is oversubscribed relative to the desired concurrency,
  the longer the minimum pause between two thread creations:

    n_threads < 4  * concurrency  ->   0 ms
    n_threads < 8  * concurrency  ->  50 ms
    n_threads < 16 * concurrency  -> 100 ms
    otherwise                     -> 200 ms

  @param n_threads    number of threads already in the pool
  @param concurrency  desired concurrency level
  @return interval in milliseconds between thread creations
*/
static int throttling_interval_ms(size_t n_threads,size_t concurrency)
{
  struct tier
  {
    size_t factor;
    int interval_ms;
  };
  static const tier tiers[]= {{4, 0}, {8, 50}, {16, 100}};

  for (const tier &step : tiers)
  {
    if (n_threads < concurrency * step.factor)
      return step.interval_ms;
  }
  return 200;
}
/*
  Create a new worker thread.

  No thread is created when the pool already has m_max_threads threads,
  when creation is throttled (see throttling_interval_ms()), when another
  thread creation is still pending, or when the system fails to start the
  thread. The maintenance timer is switched on whenever the size limit has
  not been reached, even if creation is then throttled.

  @return true if a thread was created, false otherwise
*/
bool thread_pool_generic::add_thread()
{
  const size_t n_threads= thread_count();
  if (n_threads >= m_max_threads)
    return false;

  /*
    Deadlock danger exists, so monitor pool health
    with maintenance timer.
  */
  switch_timer(timer_state_t::ON);

  /* Throttling applies only once the minimum pool size has been reached. */
  if (n_threads >= m_min_threads)
  {
    const auto since_last_creation=
      std::chrono::system_clock::now() - m_last_thread_creation;
    const std::chrono::milliseconds min_gap(
      throttling_interval_ms(n_threads, m_concurrency));
    if (since_last_creation < min_gap)
      return false;
  }

  /*
    Check and set the "thread creation pending" flag before creating the
    thread. The new thread resets it in worker_main(); it must also be
    reset below if thread creation fails, or no further thread could ever
    be created for this pool.
    The flag is not used for pools of fixed size, since they lack the
    maintenance that would rectify the pool size if it is too small.
  */
  const bool resizable= m_min_threads != m_max_threads;
  if (resizable && m_thread_creation_pending.test_and_set())
    return false;

  worker_data *thread_data= m_thread_data_cache.get();
  m_active_threads.push_back(thread_data);
  try
  {
    std::thread worker(&thread_pool_generic::worker_main, this, thread_data);
    m_last_thread_creation= std::chrono::system_clock::now();
    worker.detach();
    return true;
  }
  catch (const std::system_error &e)
  {
    /* Undo the bookkeeping done for the thread that never started. */
    m_active_threads.erase(thread_data);
    m_thread_data_cache.put(thread_data);

    /* Report the failure only once per process. */
    static bool warning_written;
    if (!warning_written)
    {
      fprintf(stderr, "Warning : threadpool thread could not be created :%s,"
              "current number of threads in pool %zu\n", e.what(), thread_count());
      warning_written= true;
    }
    m_thread_creation_pending.clear();
  }
  return false;
}
/**
  Wake a standby thread.

  The worker at the back of the standby list is moved to the active list,
  gets the wake reason recorded and is notified. The task argument is
  unused here.

  @param reason  why the worker is woken; must not be WAKE_REASON_NONE
  @return true if a worker was woken, false if there is no standby worker
*/
bool thread_pool_generic::wake(worker_wake_reason reason, task *)
{
  assert(reason != WAKE_REASON_NONE);

  if (!m_standby_threads.empty())
  {
    auto sleeper= m_standby_threads.back();
    m_standby_threads.pop_back();
    m_active_threads.push_back(sleeper);

    assert(sleeper->m_wake_reason == WAKE_REASON_NONE);
    sleeper->m_wake_reason= reason;
    sleeper->m_cv.notify_one();

    ++m_wakeups;
    return true;
  }
  return false;
}
/**
  Construct the pool.

  All counters are value-initialized, in declaration order, so that no
  member is left indeterminate. The desired concurrency is then derived via
  set_concurrency(). The maintenance timer is created and started only for
  pools whose size can vary (min_threads != max_threads); it runs its
  callback in the timer thread (pool argument is nullptr).

  @param min_threads  minimum number of threads in the pool
  @param max_threads  maximum number of threads in the pool
*/
thread_pool_generic::thread_pool_generic(int min_threads, int max_threads) :
  m_thread_data_cache(max_threads),
  m_task_queue(10000),
  m_standby_threads(),
  m_active_threads(),
  m_mtx(),
  m_thread_timeout(std::chrono::milliseconds(60000)),
  m_timer_interval(std::chrono::milliseconds(400)),
  m_cv_no_threads(),
  m_cv_timer(),
  m_tasks_enqueued(),
  m_group_enqueued(),
  m_tasks_dequeued(),
  m_wakeups(),
  m_spurious_wakeups(),
  m_concurrency(),
  m_timer_state(timer_state_t::ON),
  m_timestamp(),
  m_long_tasks_count(),
  m_waiting_task_count(),
  m_last_thread_creation(),
  m_min_threads(min_threads),
  m_max_threads(max_threads),
  m_last_thread_count(),
  m_last_activity()
{
  /* Computes m_concurrency from the hardware and the min/max limits. */
  set_concurrency();

  // start the timer
  if (m_min_threads != m_max_threads)
  {
    m_maintenance_timer= new timer_generic(thread_pool_generic::maintenance_func, this, nullptr);
    m_maintenance_timer->set_time(0, (int)m_timer_interval.count());
  }
}
/**
  Make sure somebody is going to process the queued tasks.

  Does nothing if the queue is empty, or if the number of active threads
  that are neither long-running nor waiting already exceeds the desired
  concurrency. Otherwise a standby thread is woken, or, if there is none,
  a new thread is created.
*/
void thread_pool_generic::maybe_wake_or_create_thread()
{
  if (m_task_queue.empty())
    return;

  DBUG_ASSERT(m_active_threads.size() >= static_cast<size_t>(m_long_tasks_count + m_waiting_task_count));
  const auto busy_threads=
    m_active_threads.size() - m_long_tasks_count - m_waiting_task_count;
  if (busy_threads > m_concurrency)
    return;

  if (m_standby_threads.empty())
    add_thread();
  else
    wake(WAKE_REASON_TASK);
}
/**
  @return true if the number of active threads, not counting long-running
  and waiting ones, exceeds m_concurrency * OVERSUBSCRIBE_FACTOR
*/
bool thread_pool_generic::too_many_active_threads()
{
  const auto busy_threads=
    m_active_threads.size() - m_long_tasks_count - m_waiting_task_count;
  const auto limit= m_concurrency * OVERSUBSCRIBE_FACTOR;
  return busy_threads > limit;
}
/**
  Set the desired concurrency of the pool.

  @param concurrency  requested value; 0 selects twice the hardware
                      concurrency reported by std::thread

  The value is capped at m_max_threads, then raised to at least
  m_min_threads, and finally to at least 1.
*/
void thread_pool_generic::set_concurrency(unsigned int concurrency)
{
  std::lock_guard<std::mutex> guard(m_mtx);

  unsigned int wanted= concurrency
    ? concurrency
    : 2 * std::thread::hardware_concurrency();

  /* The order of the three adjustments matters if min exceeds max. */
  wanted= std::min(wanted, m_max_threads);
  wanted= std::max(wanted, m_min_threads);
  wanted= std::max(wanted, 1U);

  m_concurrency= wanted;
}
/**
  Submit a new task.

  The task gets an additional reference, is appended to the task queue, and
  a worker is woken or created if needed. Tasks submitted while the pool is
  shutting down are silently ignored.

  @param task  task to enqueue
*/
void thread_pool_generic::submit_task(task* task)
{
  std::lock_guard<std::mutex> guard(m_mtx);
  if (!m_in_shutdown)
  {
    task->add_ref();
    ++m_tasks_enqueued;
    m_task_queue.push(task);
    maybe_wake_or_create_thread();
  }
}
/* Notify thread pool that current thread is going to wait */
void thread_pool_generic::wait_begin()
{
if (!tls_worker_data || tls_worker_data->is_long_task())
return;
std::unique_lock<std::mutex> lk(m_mtx);
if(tls_worker_data->is_long_task())
{
/*
Current task flag could have become "long-running"
while waiting for the lock, thus recheck.
*/
return;
}
DBUG_ASSERT(!tls_worker_data->is_waiting());
tls_worker_data->m_state |= worker_data::WAITING;
m_waiting_task_count++;
/* Maintain concurrency */
maybe_wake_or_create_thread();
}
void thread_pool_generic::wait_end()
{
if (tls_worker_data && tls_worker_data->is_waiting())
{
std::unique_lock<std::mutex> lk(m_mtx);
tls_worker_data->m_state &= ~worker_data::WAITING;
m_waiting_task_count--;
}
}
/*
  Switch the maintenance timer "on" or "off".

  We can't use timer::set_time, because mysys timers are deadlock prone.
  Instead, "off" is emulated with a period ten times longer than
  m_timer_interval, and "on" restores the regular period.
  This might introduce delays in thread creation when needed, as the period
  only changes when the timer fires next time. For this reason, very long
  periods can't be used for the "off" state.

  No-op if the timer is already in the requested state.
*/
void thread_pool_generic::switch_timer(timer_state_t state)
{
  if (m_timer_state == state)
    return;
  m_timer_state= state;

  long long period= m_timer_interval.count();
  if (state == timer_state_t::OFF)
    period*= 10;

  /* Pools of fixed size have no maintenance timer. */
  if (m_maintenance_timer)
    m_maintenance_timer->set_period(static_cast<int>(period));
}
/**
  Shut the pool down: stop AIO and the maintenance timer, wake up all
  standby workers, and wait until every worker thread is gone.
*/
thread_pool_generic::~thread_pool_generic()
{
  /*
    Stop AIO early.
    This is needed to prevent AIO completion thread
    from calling submit_task() on an object that is being destroyed.
  */
  m_aio.reset();

  /* Also stop the maintenance task early. */
  if (m_maintenance_timer)
    m_maintenance_timer->disarm();

  {
    std::unique_lock<std::mutex> lk(m_mtx);
    m_in_shutdown= true;

    /* Wake up idle threads, one at a time, until none is left. */
    while (wake(WAKE_REASON_SHUTDOWN))
      continue;

    /* worker_end() notifies once the last thread has left. */
    m_cv_no_threads.wait(lk, [this] { return thread_count() == 0; });
  }

  delete m_maintenance_timer;
}
/**
  Factory for the generic thread pool.

  @param min_threads  minimum number of threads in the pool
  @param max_threads  maximum number of threads in the pool
  @return newly allocated pool (created with new)
*/
thread_pool *create_thread_pool_generic(int min_threads, int max_threads)
{
  thread_pool *pool= new thread_pool_generic(min_threads, max_threads);
  return pool;
}
} // namespace tpool