mirror of
https://github.com/MariaDB/server.git
synced 2025-01-15 19:42:28 +01:00
980 lines
24 KiB
C++
980 lines
24 KiB
C++
/* Copyright (C) 2019, 2022, MariaDB Corporation.
|
|
|
|
This program is free software; you can redistribute it and/or modify
|
|
it under the terms of the GNU General Public License as published by
|
|
the Free Software Foundation; version 2 of the License.
|
|
|
|
This program is distributed in the hope that it will be useful,
|
|
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
|
|
GNU General Public License for more details.
|
|
|
|
You should have received a copy of the GNU General Public License
|
|
along with this program; if not, write to the Free Software
|
|
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111-1301 USA */
|
|
|
|
#include "tpool_structs.h"
|
|
#include <limits.h>
|
|
#include <algorithm>
|
|
#include <assert.h>
|
|
#include <atomic>
|
|
#include <chrono>
|
|
#include <condition_variable>
|
|
#include <iostream>
|
|
#include <limits.h>
|
|
#include <mutex>
|
|
#include <queue>
|
|
#include <stack>
|
|
#include <thread>
|
|
#include <vector>
|
|
#include "tpool.h"
|
|
#include <assert.h>
|
|
#include <my_global.h>
|
|
#include <my_dbug.h>
|
|
#include <thr_timer.h>
|
|
#include <stdlib.h>
|
|
#include "aligned.h"
|
|
|
|
namespace tpool
|
|
{
|
|
|
|
#ifdef __linux__
|
|
#if defined(HAVE_URING) || defined(LINUX_NATIVE_AIO)
|
|
extern aio* create_linux_aio(thread_pool* tp, int max_io);
|
|
#else
|
|
aio *create_linux_aio(thread_pool *, int) { return nullptr; };
|
|
#endif
|
|
#endif
|
|
#ifdef _WIN32
|
|
extern aio* create_win_aio(thread_pool* tp, int max_io);
|
|
#endif
|
|
|
|
static const std::chrono::milliseconds LONG_TASK_DURATION = std::chrono::milliseconds(500);
|
|
static const int OVERSUBSCRIBE_FACTOR = 2;
|
|
|
|
/**
|
|
Process the cb synchronously
|
|
*/
|
|
void aio::synchronous(aiocb *cb)
|
|
{
|
|
#ifdef _WIN32
|
|
size_t ret_len;
|
|
#else
|
|
ssize_t ret_len;
|
|
#endif
|
|
int err= 0;
|
|
switch (cb->m_opcode)
|
|
{
|
|
case aio_opcode::AIO_PREAD:
|
|
ret_len= pread(cb->m_fh, cb->m_buffer, cb->m_len, cb->m_offset);
|
|
break;
|
|
case aio_opcode::AIO_PWRITE:
|
|
ret_len= pwrite(cb->m_fh, cb->m_buffer, cb->m_len, cb->m_offset);
|
|
break;
|
|
default:
|
|
abort();
|
|
}
|
|
#ifdef _WIN32
|
|
if (static_cast<int>(ret_len) < 0)
|
|
err= GetLastError();
|
|
#else
|
|
if (ret_len < 0)
|
|
{
|
|
err= errno;
|
|
ret_len= 0;
|
|
}
|
|
#endif
|
|
cb->m_ret_len = ret_len;
|
|
cb->m_err = err;
|
|
if (ret_len)
|
|
finish_synchronous(cb);
|
|
}
|
|
|
|
|
|
/**
|
|
Implementation of generic threadpool.
|
|
This threadpool consists of the following components
|
|
|
|
- The task queue. This queue is populated by submit()
|
|
- Workers that execute the work items.
|
|
- Timer thread that takes care of pool health
|
|
|
|
The task queue is populated by submit() method.
|
|
On submit(), a worker thread can be woken, or created
|
|
to execute tasks.
|
|
|
|
The timer thread watches if work items are being dequeued, and if not,
|
|
this can indicate potential deadlock.
|
|
Thus the timer thread can also wake or create a thread, to ensure some progress.
|
|
|
|
Optimizations:
|
|
|
|
- worker threads that are idle for long time will shutdown.
|
|
- worker threads are woken in LIFO order, which minimizes context switching
|
|
and also ensures that idle timeout works well. LIFO wakeup order ensures
|
|
that active threads stay active, and idle ones stay idle.
|
|
|
|
*/
|
|
|
|
/**
  Why a parked worker was signalled.
  Stored in worker_data::m_wake_reason.
*/
enum worker_wake_reason
{
  /** Not signalled; wait_for_tasks() resets the reason to this before parking. */
  WAKE_REASON_NONE,

  /** Signalled by maybe_wake_or_create_thread() because the task queue is not empty. */
  WAKE_REASON_TASK,

  /** Signalled by the pool destructor. */
  WAKE_REASON_SHUTDOWN
};
|
|
|
|
|
|
|
|
/* A per-worker thread structure.*/
|
|
struct worker_data
|
|
{
|
|
/** Condition variable to wakeup this worker.*/
|
|
std::condition_variable m_cv;
|
|
|
|
/** Reason why worker was woken. */
|
|
worker_wake_reason m_wake_reason;
|
|
|
|
/**
|
|
If worker wakes up with WAKE_REASON_TASK, this the task it needs to execute.
|
|
*/
|
|
task* m_task;
|
|
|
|
/** Struct is member of intrusive doubly linked list */
|
|
worker_data* m_prev;
|
|
worker_data* m_next;
|
|
|
|
/* Current state of the worker.*/
|
|
enum state
|
|
{
|
|
NONE = 0,
|
|
EXECUTING_TASK = 1,
|
|
LONG_TASK = 2,
|
|
WAITING = 4
|
|
};
|
|
|
|
int m_state;
|
|
/* Padding to avoid false sharing */
|
|
char m_pad[CPU_LEVEL1_DCACHE_LINESIZE];
|
|
|
|
bool is_executing_task()
|
|
{
|
|
return m_state & EXECUTING_TASK;
|
|
}
|
|
bool is_long_task()
|
|
{
|
|
return m_state & LONG_TASK;
|
|
}
|
|
bool is_waiting()
|
|
{
|
|
return m_state & WAITING;
|
|
}
|
|
std::chrono::system_clock::time_point m_task_start_time;
|
|
worker_data() :
|
|
m_cv(),
|
|
m_wake_reason(WAKE_REASON_NONE),
|
|
m_task(),
|
|
m_prev(),
|
|
m_next(),
|
|
m_state(NONE),
|
|
m_task_start_time()
|
|
{}
|
|
};
|
|
|
|
|
|
static thread_local worker_data* tls_worker_data;
|
|
|
|
class thread_pool_generic : public thread_pool
|
|
{
|
|
/** Cache for per-worker structures */
|
|
cache<worker_data> m_thread_data_cache;
|
|
|
|
/** The task queue */
|
|
circular_queue<task*> m_task_queue;
|
|
|
|
/** List of standby (idle) workers */
|
|
doubly_linked_list<worker_data> m_standby_threads;
|
|
|
|
/** List of threads that are executing tasks */
|
|
doubly_linked_list<worker_data> m_active_threads;
|
|
|
|
/* Mutex that protects the whole struct, most importantly
|
|
the standby threads list, and task queue */
|
|
std::mutex m_mtx;
|
|
|
|
/** Timeout after which idle worker shuts down */
|
|
std::chrono::milliseconds m_thread_timeout;
|
|
|
|
/** How often should timer wakeup.*/
|
|
std::chrono::milliseconds m_timer_interval;
|
|
|
|
/** Another condition variable, used in pool shutdown */
|
|
std::condition_variable m_cv_no_threads;
|
|
|
|
/** Condition variable for the timer thread. Signaled on shutdown. */
|
|
std::condition_variable m_cv_timer;
|
|
|
|
/** Overall number of enqueues*/
|
|
unsigned long long m_tasks_enqueued;
|
|
unsigned long long m_group_enqueued;
|
|
/** Overall number of dequeued tasks. */
|
|
unsigned long long m_tasks_dequeued;
|
|
|
|
/** Statistic related, number of worker thread wakeups */
|
|
int m_wakeups;
|
|
|
|
/**
|
|
Statistic related, number of spurious thread wakeups
|
|
(i.e thread woke up, and the task queue is empty)
|
|
*/
|
|
int m_spurious_wakeups;
|
|
|
|
/** The desired concurrency. This number of workers should be
|
|
actively executing. */
|
|
unsigned int m_concurrency;
|
|
|
|
/** True, if threadpool is being shutdown, false otherwise */
|
|
bool m_in_shutdown= false;
|
|
|
|
/** Maintenance timer state : true = active(ON),false = inactive(OFF)*/
|
|
enum class timer_state_t
|
|
{
|
|
OFF, ON
|
|
};
|
|
timer_state_t m_timer_state= timer_state_t::OFF;
|
|
void switch_timer(timer_state_t state);
|
|
|
|
/* Updates idle_since, and maybe switches the timer off */
|
|
void check_idle(std::chrono::system_clock::time_point now);
|
|
|
|
/** time point when timer last ran, used as a coarse clock. */
|
|
std::chrono::system_clock::time_point m_timestamp;
|
|
|
|
/** Number of long running tasks. The long running tasks are excluded when
|
|
adjusting concurrency */
|
|
unsigned int m_long_tasks_count;
|
|
|
|
unsigned int m_waiting_task_count;
|
|
|
|
/** Last time thread was created*/
|
|
std::chrono::system_clock::time_point m_last_thread_creation;
|
|
|
|
/** Minimumum number of threads in this pool.*/
|
|
unsigned int m_min_threads;
|
|
|
|
/** Maximimum number of threads in this pool. */
|
|
unsigned int m_max_threads;
|
|
|
|
/* maintenance related statistics (see maintenance()) */
|
|
size_t m_last_thread_count;
|
|
unsigned long long m_last_activity;
|
|
std::atomic_flag m_thread_creation_pending= ATOMIC_FLAG_INIT;
|
|
|
|
void worker_main(worker_data *thread_data);
|
|
void worker_end(worker_data* thread_data);
|
|
|
|
/* Checks threadpool responsiveness, adjusts thread_counts */
|
|
void maintenance();
|
|
static void maintenance_func(void* arg)
|
|
{
|
|
((thread_pool_generic *)arg)->maintenance();
|
|
}
|
|
bool add_thread();
|
|
bool wake(worker_wake_reason reason, task *t = nullptr);
|
|
void maybe_wake_or_create_thread();
|
|
bool too_many_active_threads();
|
|
bool get_task(worker_data *thread_var, task **t);
|
|
bool wait_for_tasks(std::unique_lock<std::mutex> &lk,
|
|
worker_data *thread_var);
|
|
void cancel_pending(task* t);
|
|
|
|
size_t thread_count()
|
|
{
|
|
return m_active_threads.size() + m_standby_threads.size();
|
|
}
|
|
public:
|
|
thread_pool_generic(int min_threads, int max_threads);
|
|
~thread_pool_generic() override;
|
|
void wait_begin() override;
|
|
void wait_end() override;
|
|
void submit_task(task *task) override;
|
|
aio *create_native_aio(int max_io) override
|
|
{
|
|
#ifdef _WIN32
|
|
return create_win_aio(this, max_io);
|
|
#elif defined(__linux__)
|
|
return create_linux_aio(this,max_io);
|
|
#else
|
|
return nullptr;
|
|
#endif
|
|
}
|
|
|
|
class timer_generic : public thr_timer_t, public timer
|
|
{
|
|
thread_pool_generic* m_pool;
|
|
waitable_task m_task;
|
|
callback_func m_callback;
|
|
void* m_data;
|
|
int m_period;
|
|
std::mutex m_mtx;
|
|
bool m_on;
|
|
std::atomic<int> m_running;
|
|
|
|
void run()
|
|
{
|
|
/*
|
|
In rare cases, multiple callbacks can be scheduled,
|
|
at the same time,. e.g with set_time(0,0) in a loop.
|
|
We do not allow parallel execution, since it is against the expectations.
|
|
*/
|
|
if (m_running.fetch_add(1, std::memory_order_acquire) > 0)
|
|
return;
|
|
do
|
|
{
|
|
m_callback(m_data);
|
|
}
|
|
while (m_running.fetch_sub(1, std::memory_order_release) != 1);
|
|
|
|
if (m_pool && m_period)
|
|
{
|
|
std::unique_lock<std::mutex> lk(m_mtx);
|
|
if (m_on)
|
|
{
|
|
DBUG_PUSH_EMPTY;
|
|
thr_timer_end(this);
|
|
thr_timer_settime(this, 1000ULL * m_period);
|
|
DBUG_POP_EMPTY;
|
|
}
|
|
}
|
|
}
|
|
|
|
static void execute(void* arg)
|
|
{
|
|
auto timer = (timer_generic*)arg;
|
|
timer->run();
|
|
}
|
|
|
|
static void submit_task(void* arg)
|
|
{
|
|
timer_generic* timer = (timer_generic*)arg;
|
|
timer->m_pool->submit_task(&timer->m_task);
|
|
}
|
|
|
|
public:
|
|
timer_generic(callback_func func, void* data, thread_pool_generic * pool):
|
|
m_pool(pool),
|
|
m_task(timer_generic::execute,this),
|
|
m_callback(func),m_data(data),m_period(0),m_mtx(),
|
|
m_on(true),m_running()
|
|
{
|
|
if (pool)
|
|
{
|
|
/* EXecute callback in threadpool*/
|
|
thr_timer_init(this, submit_task, this);
|
|
}
|
|
else
|
|
{
|
|
/* run in "timer" thread */
|
|
thr_timer_init(this, m_task.get_func(), m_task.get_arg());
|
|
}
|
|
}
|
|
|
|
void set_time(int initial_delay_ms, int period_ms) override
|
|
{
|
|
std::unique_lock<std::mutex> lk(m_mtx);
|
|
if (!m_on)
|
|
return;
|
|
thr_timer_end(this);
|
|
if (!m_pool)
|
|
thr_timer_set_period(this, 1000ULL * period_ms);
|
|
else
|
|
m_period = period_ms;
|
|
thr_timer_settime(this, 1000ULL * initial_delay_ms);
|
|
}
|
|
|
|
/*
|
|
Change only period of a periodic timer
|
|
(after the next execution). Workarounds
|
|
mysys timer deadlocks
|
|
*/
|
|
void set_period(int period_ms)
|
|
{
|
|
std::unique_lock<std::mutex> lk(m_mtx);
|
|
if (!m_on)
|
|
return;
|
|
if (!m_pool)
|
|
thr_timer_set_period(this, 1000ULL * period_ms);
|
|
else
|
|
m_period = period_ms;
|
|
}
|
|
|
|
void disarm() override
|
|
{
|
|
std::unique_lock<std::mutex> lk(m_mtx);
|
|
m_on = false;
|
|
thr_timer_end(this);
|
|
lk.unlock();
|
|
|
|
if (m_task.m_group)
|
|
{
|
|
m_task.m_group->cancel_pending(&m_task);
|
|
}
|
|
if (m_pool)
|
|
{
|
|
m_pool->cancel_pending(&m_task);
|
|
}
|
|
m_task.wait();
|
|
}
|
|
|
|
~timer_generic() override
|
|
{
|
|
disarm();
|
|
}
|
|
};
|
|
timer_generic* m_maintenance_timer=nullptr;
|
|
timer* create_timer(callback_func func, void *data) override
|
|
{
|
|
return new timer_generic(func, data, this);
|
|
}
|
|
void set_concurrency(unsigned int concurrency=0) override;
|
|
};
|
|
|
|
/**
  Cancel all queued occurrences of a task.

  Every queue slot that refers to t is overwritten with nullptr (the slot
  itself stays in the queue), and t->release() is called once per such slot.

  @param t  task to cancel
*/
void thread_pool_generic::cancel_pending(task* t)
{
  std::lock_guard<std::mutex> guard(m_mtx);

  auto slot= m_task_queue.begin();
  while (slot != m_task_queue.end())
  {
    if (*slot == t)
    {
      t->release();
      *slot= nullptr;
    }
    slot++;
  }
}
|
|
/**
|
|
Register worker in standby list, and wait to be woken.
|
|
|
|
@retval true if thread was woken
|
|
@retval false idle wait timeout exceeded (the current thread must shutdown)
|
|
*/
|
|
bool thread_pool_generic::wait_for_tasks(std::unique_lock<std::mutex> &lk,
|
|
worker_data *thread_data)
|
|
{
|
|
assert(m_task_queue.empty());
|
|
assert(!m_in_shutdown);
|
|
|
|
thread_data->m_wake_reason= WAKE_REASON_NONE;
|
|
m_active_threads.erase(thread_data);
|
|
m_standby_threads.push_back(thread_data);
|
|
|
|
for (;;)
|
|
{
|
|
thread_data->m_cv.wait_for(lk, m_thread_timeout);
|
|
|
|
if (thread_data->m_wake_reason != WAKE_REASON_NONE)
|
|
{
|
|
/* Woke up not due to timeout.*/
|
|
return true;
|
|
}
|
|
|
|
if (thread_count() <= m_min_threads)
|
|
{
|
|
/* Do not shutdown thread, maintain required minimum of worker
|
|
threads.*/
|
|
continue;
|
|
}
|
|
|
|
/*
|
|
Woke up due to timeout, remove this thread's from the standby list. In
|
|
all other cases where it is signaled it is removed by the signaling
|
|
thread.
|
|
*/
|
|
m_standby_threads.erase(thread_data);
|
|
m_active_threads.push_back(thread_data);
|
|
return false;
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
Workers "get next task" routine.
|
|
|
|
A task can be handed over to the current thread directly during submit().
|
|
if thread_var->m_wake_reason == WAKE_REASON_TASK.
|
|
|
|
Or a task can be taken from the task queue.
|
|
In case task queue is empty, the worker thread will park (wait for wakeup).
|
|
*/
|
|
bool thread_pool_generic::get_task(worker_data *thread_var, task **t)
|
|
{
|
|
std::unique_lock<std::mutex> lk(m_mtx);
|
|
|
|
if (thread_var->is_long_task())
|
|
{
|
|
DBUG_ASSERT(m_long_tasks_count);
|
|
m_long_tasks_count--;
|
|
}
|
|
DBUG_ASSERT(!thread_var->is_waiting());
|
|
thread_var->m_state = worker_data::NONE;
|
|
|
|
while (m_task_queue.empty())
|
|
{
|
|
if (m_in_shutdown)
|
|
return false;
|
|
|
|
if (!wait_for_tasks(lk, thread_var))
|
|
return false;
|
|
if (m_task_queue.empty())
|
|
{
|
|
m_spurious_wakeups++;
|
|
continue;
|
|
}
|
|
}
|
|
|
|
/* Dequeue from the task queue.*/
|
|
*t= m_task_queue.front();
|
|
m_task_queue.pop();
|
|
m_tasks_dequeued++;
|
|
thread_var->m_state |= worker_data::EXECUTING_TASK;
|
|
thread_var->m_task_start_time = m_timestamp;
|
|
return true;
|
|
}
|
|
|
|
/** Worker thread shutdown routine. */
|
|
void thread_pool_generic::worker_end(worker_data* thread_data)
|
|
{
|
|
std::lock_guard<std::mutex> lk(m_mtx);
|
|
DBUG_ASSERT(!thread_data->is_long_task());
|
|
m_active_threads.erase(thread_data);
|
|
m_thread_data_cache.put(thread_data);
|
|
|
|
if (!thread_count() && m_in_shutdown)
|
|
{
|
|
/* Signal the destructor that no more threads are left. */
|
|
m_cv_no_threads.notify_all();
|
|
}
|
|
}
|
|
|
|
extern "C" void set_tls_pool(tpool::thread_pool* pool);
|
|
|
|
/* The worker get/execute task loop.*/
|
|
void thread_pool_generic::worker_main(worker_data *thread_var)
|
|
{
|
|
task* task;
|
|
set_tls_pool(this);
|
|
m_worker_init_callback();
|
|
|
|
tls_worker_data = thread_var;
|
|
m_thread_creation_pending.clear();
|
|
|
|
while (get_task(thread_var, &task) && task)
|
|
{
|
|
task->execute();
|
|
}
|
|
|
|
m_worker_destroy_callback();
|
|
|
|
worker_end(thread_var);
|
|
}
|
|
|
|
|
|
/*
|
|
Check if threadpool had been idle for a while
|
|
Switch off maintenance timer if it is in idle state
|
|
for too long.
|
|
|
|
Helper function, to be used inside maintenance callback,
|
|
before m_last_activity is updated
|
|
*/
|
|
|
|
static const auto invalid_timestamp= std::chrono::system_clock::time_point::max();
|
|
constexpr auto max_idle_time= std::chrono::minutes(1);
|
|
|
|
/* Time since maintenance timer had nothing to do */
|
|
static std::chrono::system_clock::time_point idle_since= invalid_timestamp;
|
|
void thread_pool_generic::check_idle(std::chrono::system_clock::time_point now)
|
|
{
|
|
DBUG_ASSERT(m_task_queue.empty());
|
|
|
|
/*
|
|
We think that there is no activity, if there were at most 2 tasks
|
|
since last time, and there is a spare thread.
|
|
The 2 tasks (and not 0) is to account for some periodic timers.
|
|
*/
|
|
bool idle= m_standby_threads.m_count > 0;
|
|
|
|
if (!idle)
|
|
{
|
|
idle_since= invalid_timestamp;
|
|
return;
|
|
}
|
|
|
|
if (idle_since == invalid_timestamp)
|
|
{
|
|
idle_since= now;
|
|
return;
|
|
}
|
|
|
|
/* Switch timer off after 1 minute of idle time */
|
|
if (now - idle_since > max_idle_time && m_active_threads.empty())
|
|
{
|
|
idle_since= invalid_timestamp;
|
|
switch_timer(timer_state_t::OFF);
|
|
}
|
|
}
|
|
|
|
|
|
/*
|
|
Periodic job to fix thread count and concurrency,
|
|
in case of long tasks, etc
|
|
*/
|
|
void thread_pool_generic::maintenance()
|
|
{
|
|
/*
|
|
If pool is busy (i.e the its mutex is currently locked), we can
|
|
skip the maintenance task, some times, to lower mutex contention
|
|
*/
|
|
static int skip_counter;
|
|
const int MAX_SKIPS = 10;
|
|
std::unique_lock<std::mutex> lk(m_mtx, std::defer_lock);
|
|
if (skip_counter == MAX_SKIPS)
|
|
{
|
|
lk.lock();
|
|
}
|
|
else if (!lk.try_lock())
|
|
{
|
|
skip_counter++;
|
|
return;
|
|
}
|
|
|
|
skip_counter = 0;
|
|
|
|
m_timestamp = std::chrono::system_clock::now();
|
|
|
|
if (m_task_queue.empty())
|
|
{
|
|
check_idle(m_timestamp);
|
|
m_last_activity = m_tasks_dequeued + m_wakeups;
|
|
return;
|
|
}
|
|
|
|
m_long_tasks_count = 0;
|
|
for (auto thread_data = m_active_threads.front();
|
|
thread_data;
|
|
thread_data = thread_data->m_next)
|
|
{
|
|
if (thread_data->is_executing_task() &&
|
|
!thread_data->is_waiting() &&
|
|
(thread_data->is_long_task()
|
|
|| (m_timestamp - thread_data->m_task_start_time > LONG_TASK_DURATION)))
|
|
{
|
|
thread_data->m_state |= worker_data::LONG_TASK;
|
|
m_long_tasks_count++;
|
|
}
|
|
}
|
|
|
|
maybe_wake_or_create_thread();
|
|
|
|
size_t thread_cnt = (int)thread_count();
|
|
if (m_last_activity == m_tasks_dequeued + m_wakeups &&
|
|
m_last_thread_count <= thread_cnt && m_active_threads.size() == thread_cnt)
|
|
{
|
|
// no progress made since last iteration. create new
|
|
// thread
|
|
add_thread();
|
|
}
|
|
m_last_activity = m_tasks_dequeued + m_wakeups;
|
|
m_last_thread_count= thread_cnt;
|
|
}
|
|
|
|
/*
  Heuristic used for thread creation throttling.

  Returns the minimum interval in milliseconds between thread creations,
  depending on the number of threads already in the pool relative to the
  desired concurrency level:

    n_threads <  4 * concurrency  ->   0
    n_threads <  8 * concurrency  ->  50
    n_threads < 16 * concurrency  -> 100
    otherwise                     -> 200
*/
static int throttling_interval_ms(size_t n_threads,size_t concurrency)
{
  struct step
  {
    size_t factor;
    int delay_ms;
  };
  static const step steps[]= {{4, 0}, {8, 50}, {16, 100}};

  for (const step &s : steps)
  {
    if (n_threads < concurrency * s.factor)
      return s.delay_ms;
  }
  return 200;
}
|
|
|
|
/* Create a new worker.*/
|
|
bool thread_pool_generic::add_thread()
|
|
{
|
|
size_t n_threads = thread_count();
|
|
|
|
if (n_threads >= m_max_threads)
|
|
return false;
|
|
|
|
/*
|
|
Deadlock danger exists, so monitor pool health
|
|
with maintenance timer.
|
|
*/
|
|
switch_timer(timer_state_t::ON);
|
|
|
|
if (n_threads >= m_min_threads)
|
|
{
|
|
auto now = std::chrono::system_clock::now();
|
|
if (now - m_last_thread_creation <
|
|
std::chrono::milliseconds(throttling_interval_ms(n_threads, m_concurrency)))
|
|
{
|
|
/*
|
|
Throttle thread creation and wakeup deadlock detection timer,
|
|
if is it off.
|
|
*/
|
|
return false;
|
|
}
|
|
}
|
|
|
|
/* Check and set "thread creation pending" flag before creating the thread. We
|
|
reset the flag in thread_pool_generic::worker_main in new thread created. The
|
|
flag must be reset back in case we fail to create the thread. If this flag is
|
|
not reset all future attempt to create thread for this pool would not work as
|
|
we would return from here.
|
|
|
|
Do not use this flag for pool of fixed size.
|
|
(since they lack maintenence that would rectify the pool size, if it is too small)
|
|
*/
|
|
if (m_min_threads != m_max_threads)
|
|
{
|
|
if (m_thread_creation_pending.test_and_set())
|
|
return false;
|
|
}
|
|
|
|
worker_data *thread_data = m_thread_data_cache.get();
|
|
m_active_threads.push_back(thread_data);
|
|
try
|
|
{
|
|
std::thread thread(&thread_pool_generic::worker_main, this, thread_data);
|
|
m_last_thread_creation = std::chrono::system_clock::now();
|
|
thread.detach();
|
|
}
|
|
catch (std::system_error& e)
|
|
{
|
|
m_active_threads.erase(thread_data);
|
|
m_thread_data_cache.put(thread_data);
|
|
static bool warning_written;
|
|
if (!warning_written)
|
|
{
|
|
fprintf(stderr, "Warning : threadpool thread could not be created :%s,"
|
|
"current number of threads in pool %zu\n", e.what(), thread_count());
|
|
warning_written = true;
|
|
}
|
|
m_thread_creation_pending.clear();
|
|
return false;
|
|
}
|
|
return true;
|
|
}
|
|
|
|
/** Wake a standby thread, and hand the given task over to this thread. */
|
|
bool thread_pool_generic::wake(worker_wake_reason reason, task *)
|
|
{
|
|
assert(reason != WAKE_REASON_NONE);
|
|
|
|
if (m_standby_threads.empty())
|
|
return false;
|
|
auto var= m_standby_threads.back();
|
|
m_standby_threads.pop_back();
|
|
m_active_threads.push_back(var);
|
|
assert(var->m_wake_reason == WAKE_REASON_NONE);
|
|
var->m_wake_reason= reason;
|
|
var->m_cv.notify_one();
|
|
m_wakeups++;
|
|
return true;
|
|
}
|
|
|
|
|
|
/**
  Construct the pool. No worker thread is started here.

  The task queue is sized for 10000 entries, the idle timeout of a worker
  is 60 seconds and the maintenance period is 400 ms. The maintenance timer
  is created only when the pool size can vary (min_threads != max_threads).

  @param min_threads  minimum number of threads in the pool
  @param max_threads  maximum number of threads in the pool
*/
thread_pool_generic::thread_pool_generic(int min_threads, int max_threads) :
  m_thread_data_cache(max_threads),
  m_task_queue(10000),
  m_standby_threads(),
  m_active_threads(),
  m_mtx(),
  m_thread_timeout(std::chrono::milliseconds(60000)),
  m_timer_interval(std::chrono::milliseconds(400)),
  m_cv_no_threads(),
  m_cv_timer(),
  m_tasks_enqueued(),
  m_group_enqueued(),  /* was left uninitialized */
  m_tasks_dequeued(),
  m_wakeups(),
  m_spurious_wakeups(),
  m_timer_state(timer_state_t::ON),
  m_timestamp(),
  m_long_tasks_count(),
  m_waiting_task_count(),
  m_last_thread_creation(),
  m_min_threads(min_threads),
  m_max_threads(max_threads),
  m_last_thread_count(),
  m_last_activity()
{
  /* Sets m_concurrency; relies on m_min_threads and m_max_threads. */
  set_concurrency();

  /* Start the timer; it runs maintenance() from the timer callback itself. */
  if (m_min_threads != m_max_threads)
  {
    m_maintenance_timer= new timer_generic(thread_pool_generic::maintenance_func, this, nullptr);
    m_maintenance_timer->set_time(0, static_cast<int>(m_timer_interval.count()));
  }
}
|
|
|
|
|
|
void thread_pool_generic::maybe_wake_or_create_thread()
|
|
{
|
|
if (m_task_queue.empty())
|
|
return;
|
|
DBUG_ASSERT(m_active_threads.size() >= static_cast<size_t>(m_long_tasks_count + m_waiting_task_count));
|
|
if (m_active_threads.size() - m_long_tasks_count - m_waiting_task_count > m_concurrency)
|
|
return;
|
|
if (!m_standby_threads.empty())
|
|
{
|
|
wake(WAKE_REASON_TASK);
|
|
}
|
|
else
|
|
{
|
|
add_thread();
|
|
}
|
|
}
|
|
|
|
bool thread_pool_generic::too_many_active_threads()
|
|
{
|
|
return m_active_threads.size() - m_long_tasks_count - m_waiting_task_count >
|
|
m_concurrency* OVERSUBSCRIBE_FACTOR;
|
|
}
|
|
|
|
void thread_pool_generic::set_concurrency(unsigned int concurrency)
|
|
{
|
|
std::unique_lock<std::mutex> lk(m_mtx);
|
|
if (concurrency == 0)
|
|
concurrency= 2 * std::thread::hardware_concurrency();
|
|
m_concurrency = concurrency;
|
|
if (m_concurrency > m_max_threads)
|
|
m_concurrency = m_max_threads;
|
|
if (m_concurrency < m_min_threads)
|
|
m_concurrency = m_min_threads;
|
|
if (m_concurrency < 1)
|
|
m_concurrency = 1;
|
|
}
|
|
|
|
/** Submit a new task*/
|
|
void thread_pool_generic::submit_task(task* task)
|
|
{
|
|
std::unique_lock<std::mutex> lk(m_mtx);
|
|
if (m_in_shutdown)
|
|
return;
|
|
task->add_ref();
|
|
m_tasks_enqueued++;
|
|
m_task_queue.push(task);
|
|
maybe_wake_or_create_thread();
|
|
}
|
|
|
|
|
|
/* Notify thread pool that current thread is going to wait */
|
|
void thread_pool_generic::wait_begin()
|
|
{
|
|
if (!tls_worker_data || tls_worker_data->is_long_task())
|
|
return;
|
|
std::unique_lock<std::mutex> lk(m_mtx);
|
|
if(tls_worker_data->is_long_task())
|
|
{
|
|
/*
|
|
Current task flag could have become "long-running"
|
|
while waiting for the lock, thus recheck.
|
|
*/
|
|
return;
|
|
}
|
|
DBUG_ASSERT(!tls_worker_data->is_waiting());
|
|
tls_worker_data->m_state |= worker_data::WAITING;
|
|
m_waiting_task_count++;
|
|
|
|
/* Maintain concurrency */
|
|
maybe_wake_or_create_thread();
|
|
}
|
|
|
|
|
|
void thread_pool_generic::wait_end()
|
|
{
|
|
if (tls_worker_data && tls_worker_data->is_waiting())
|
|
{
|
|
std::unique_lock<std::mutex> lk(m_mtx);
|
|
tls_worker_data->m_state &= ~worker_data::WAITING;
|
|
m_waiting_task_count--;
|
|
}
|
|
}
|
|
|
|
|
|
/**
  Switch the maintenance timer between its ON and OFF state.

  We can't use timer::set_time, because mysys timers are deadlock
  prone. Instead, "off" is implemented as a period 10 times longer than
  m_timer_interval, and "on" as m_timer_interval itself.

  This might introduce delays in thread creation when needed,
  as period will only be changed when timer fires next time.
  For this reason, we can't use very long periods for the "off" state.

  @param state  requested state; nothing happens if it is already current
*/
void thread_pool_generic::switch_timer(timer_state_t state)
{
  if (m_timer_state == state)
    return;

  m_timer_state= state;

  /* Pools of fixed size have no maintenance timer. */
  if (!m_maintenance_timer)
    return;

  long long period= m_timer_interval.count();
  if (state == timer_state_t::OFF)
    period*= 10;

  m_maintenance_timer->set_period((int)period);
}
|
|
|
|
|
|
/**
|
|
Wake up all workers, and wait until they are gone
|
|
Stop the timer.
|
|
*/
|
|
thread_pool_generic::~thread_pool_generic()
|
|
{
|
|
/*
|
|
Stop AIO early.
|
|
This is needed to prevent AIO completion thread
|
|
from calling submit_task() on an object that is being destroyed.
|
|
*/
|
|
m_aio.reset();
|
|
|
|
/* Also stop the maintanence task early. */
|
|
if (m_maintenance_timer)
|
|
m_maintenance_timer->disarm();
|
|
|
|
std::unique_lock<std::mutex> lk(m_mtx);
|
|
m_in_shutdown= true;
|
|
|
|
/* Wake up idle threads. */
|
|
while (wake(WAKE_REASON_SHUTDOWN))
|
|
{
|
|
}
|
|
|
|
while (thread_count())
|
|
{
|
|
m_cv_no_threads.wait(lk);
|
|
}
|
|
|
|
lk.unlock();
|
|
delete m_maintenance_timer;
|
|
}
|
|
|
|
/**
  Factory for the generic thread pool.

  @param min_threads  minimum number of threads in the pool
  @param max_threads  maximum number of threads in the pool
  @return a new, heap-allocated pool
*/
thread_pool *create_thread_pool_generic(int min_threads, int max_threads)
{
  thread_pool *pool= new thread_pool_generic(min_threads, max_threads);
  return pool;
}
|
|
|
|
} // namespace tpool
|