mariadb/tpool/tpool_generic.cc
2019-11-15 19:55:13 +02:00

770 lines
19 KiB
C++

/* Copyright(C) 2019 MariaDB Corporation.
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; version 2 of the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111-1301 USA*/
#include "tpool_structs.h"
#include <limits.h>
#include <algorithm>
#include <assert.h>
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <iostream>
#include <limits.h>
#include <mutex>
#include <new>
#include <queue>
#include <stack>
#include <thread>
#include <vector>
#include "tpool.h"
#include <assert.h>
#include <my_global.h>
#include <my_dbug.h>
#include <thr_timer.h>
#include <stdlib.h>
namespace tpool
{
#ifdef __linux__
/* Factory for the Linux AIO implementation. Only declared here (the
definition is not in this file); called by
thread_pool_generic::create_native_aio(). */
extern aio* create_linux_aio(thread_pool* tp, int max_io);
#endif
#ifdef _WIN32
/* Factory for the Windows AIO implementation. Only declared here (the
definition is not in this file); called by
thread_pool_generic::create_native_aio(). */
extern aio* create_win_aio(thread_pool* tp, int max_io);
#endif
/** A task that has been executing for longer than this is flagged as a
"long task" by the periodic maintenance job. */
static const std::chrono::milliseconds LONG_TASK_DURATION(500);
/** Multiplier applied to the desired concurrency when deciding whether one
more worker may be woken or created. */
static const int OVERSUBSCRIBE_FACTOR= 2;
/**
Implementation of the generic threadpool.
This threadpool consists of the following components:
- The task queue. This queue is populated by submit().
- Workers that execute the work items.
- A timer thread that takes care of pool health.
The task queue is populated by the submit() method.
On submit(), a worker thread can be woken, or created,
to execute tasks.
The timer thread watches whether work items are being dequeued; if they are
not, this can indicate a potential deadlock.
Thus the timer thread can also wake or create a thread, to ensure some progress.
Optimizations:
- Worker threads that are idle for a long time will shut down.
- Worker threads are woken in LIFO order, which minimizes context switching
and also ensures that the idle timeout works well. LIFO wakeup order ensures
that active threads stay active, and idle ones stay idle.
- To minimize spurious wakeups, some items are not put into the queue. Instead
submit() will pass the data directly to the thread it woke up.
*/
/**
  Why a parked worker was woken up. Stored in worker_data::m_wake_reason.
*/
enum worker_wake_reason
{
  /** Not woken by another thread (initial value, and the value seen
  after an idle timeout). */
  WAKE_REASON_NONE= 0,
  /** Woken to run a task that was handed over in worker_data::m_task. */
  WAKE_REASON_TASK= 1,
  /** Woken because the pool is being destroyed. */
  WAKE_REASON_SHUTDOWN= 2
};
/**
  Per-worker thread structure.

  The structure is over-aligned (MY_ALIGNED(CPU_LEVEL1_DCACHE_LINESIZE)),
  which is why it supplies its own operator new/delete.
*/
struct MY_ALIGNED(CPU_LEVEL1_DCACHE_LINESIZE) worker_data
{
  /** Condition variable used to wake up this worker. */
  std::condition_variable m_cv;

  /** Reason why the worker was woken. */
  worker_wake_reason m_wake_reason;

  /**
    If the worker wakes up with WAKE_REASON_TASK, this is the task it needs
    to execute.
  */
  task* m_task;

  /** Links of the intrusive doubly linked list this struct is a member of. */
  worker_data* m_prev;
  worker_data* m_next;

  /** Bit flags describing the current state of the worker. */
  enum state
  {
    NONE = 0,
    EXECUTING_TASK = 1,
    LONG_TASK = 2
  };

  /** Combination of the state flags above. */
  int m_state;

  /** @return true if the EXECUTING_TASK flag is set. */
  bool is_executing_task() const
  {
    return (m_state & EXECUTING_TASK) != 0;
  }

  /** @return true if the LONG_TASK flag is set. */
  bool is_long_task() const
  {
    return (m_state & LONG_TASK) != 0;
  }

  /** Time at which the worker picked up its current task. */
  std::chrono::system_clock::time_point m_task_start_time;

  worker_data() :
    m_cv(),
    m_wake_reason(WAKE_REASON_NONE),
    m_task(),
    m_prev(),
    m_next(),
    m_state(NONE),
    m_task_start_time()
  {}

  /**
    Custom allocation, needed because the structure is over-aligned.

    @param size number of bytes to allocate
    @return pointer to storage aligned to CPU_LEVEL1_DCACHE_LINESIZE
    @throws std::bad_alloc if the aligned allocation fails. A throwing
            operator new must never return a null pointer: the compiler
            is allowed to run the constructor on the result unchecked.
  */
  void* operator new(size_t size)
  {
    void* ptr= nullptr;
#ifdef _WIN32
    ptr= _aligned_malloc(size, CPU_LEVEL1_DCACHE_LINESIZE);
#else
    /* On failure the value stored in ptr is not specified; reset it. */
    if (posix_memalign(&ptr, CPU_LEVEL1_DCACHE_LINESIZE, size) != 0)
      ptr= nullptr;
#endif
    if (!ptr)
      throw std::bad_alloc();
    return ptr;
  }

  /** Release storage obtained from the matching operator new. */
  void operator delete(void* p)
  {
#ifdef _WIN32
    _aligned_free(p);
#else
    free(p);
#endif
  }
};
/**
  Generic thread pool: a task queue served by worker threads that are
  created on demand, plus a periodic maintenance timer.
*/
class thread_pool_generic : public thread_pool
{
  /** Cache for per-worker structures */
  cache<worker_data> m_thread_data_cache;

  /** The task queue */
  circular_queue<task*> m_task_queue;

  /** List of standby (idle) workers */
  doubly_linked_list<worker_data> m_standby_threads;

  /** List of threads that are executing tasks */
  doubly_linked_list<worker_data> m_active_threads;

  /* Mutex that protects the whole struct, most importantly
  the standby threads list, and the task queue */
  std::mutex m_mtx;

  /** Timeout after which an idle worker shuts down */
  std::chrono::milliseconds m_thread_timeout;

  /** How often the maintenance timer should fire. */
  std::chrono::milliseconds m_timer_interval;

  /** Condition variable used during pool shutdown; signalled when the
  last worker is gone. */
  std::condition_variable m_cv_no_threads;

  /** Condition variable for the timer thread. Signaled on shutdown. */
  std::condition_variable m_cv_timer;

  /** Overall number of enqueued tasks. */
  unsigned long long m_tasks_enqueued;

  /** Overall number of dequeued tasks. */
  unsigned long long m_tasks_dequeued;

  /** Statistics: number of worker thread wakeups */
  int m_wakeups;

  /**
    Statistics: number of spurious thread wakeups
    (i.e. the thread woke up, and the task queue was empty)
  */
  int m_spurious_wakeups;

  /** The desired concurrency. This number of workers should be
  actively executing. */
  unsigned int m_concurrency;

  /** True if the threadpool is being shut down, false otherwise */
  bool m_in_shutdown;

  /** Time point when the timer last ran, used as a coarse clock. */
  std::chrono::system_clock::time_point m_timestamp;

  /** Number of long running tasks. Long running tasks are excluded when
  adjusting concurrency */
  int m_long_tasks_count;

  /** Last time a thread was created */
  std::chrono::system_clock::time_point m_last_thread_creation;

  /** Minimum number of threads in this pool. */
  unsigned int m_min_threads;

  /** Maximum number of threads in this pool. */
  unsigned int m_max_threads;

  /* Maintenance related statistics (see maintainence()) */
  size_t m_last_thread_count;
  unsigned long long m_last_activity;

  /** Timer that periodically runs maintainence(); may be empty. */
  std::unique_ptr<timer> m_maintaince_timer_task;

  void worker_main(worker_data *thread_data);
  void worker_end(worker_data* thread_data);

  /* Checks threadpool responsiveness, adjusts thread counts */
  void maintainence();

  /** Timer callback trampoline: arg is the pool to run maintenance on. */
  static void maintainence_func(void* arg)
  {
    thread_pool_generic *pool= static_cast<thread_pool_generic *>(arg);
    pool->maintainence();
  }

  bool add_thread();
  bool wake(worker_wake_reason reason, task *t = nullptr);
  void wake_or_create_thread();
  bool get_task(worker_data *thread_var, task **t);
  bool wait_for_tasks(std::unique_lock<std::mutex> &lk,
                      worker_data *thread_var);
  void cancel_pending(task* t);

  /** @return total number of workers, active and standby. */
  size_t thread_count()
  {
    return m_active_threads.size() + m_standby_threads.size();
  }

public:
  thread_pool_generic(int min_threads, int max_threads);
  ~thread_pool_generic();
  void submit_task(task *task) override;

  /** Create the platform AIO implementation, or nullptr where this file
  declares none for the platform. */
  aio *create_native_aio(int max_io) override
  {
#ifdef _WIN32
    return create_win_aio(this, max_io);
#elif defined(__linux__)
    return create_linux_aio(this,max_io);
#else
    return nullptr;
#endif
  }

  /**
    Timer built on thr_timer. With a pool, expiry submits the callback to
    that pool as a task; without one, the callback is registered directly
    with thr_timer.
  */
  class timer_generic : public thr_timer_t, public timer
  {
    thread_pool_generic* m_pool;
    waitable_task m_task;
    callback_func m_callback;
    void* m_data;
    int m_period;
    std::mutex m_mtx;
    bool m_on;
    std::atomic<bool> m_running;

    /** Run the user callback once and, for pooled periodic timers,
    re-arm the timer. */
    void run()
    {
      /*
        In rare cases, multiple callbacks can be scheduled,
        e.g with set_time(0,0) in a loop.
        We do not allow parallel execution, as user is not prepared.
      */
      bool was_running= false;
      const bool acquired=
        m_running.compare_exchange_strong(was_running, true);
      if (!acquired)
        return;

      m_callback(m_data);
      m_running= false;

      if (!m_pool || !m_period)
        return;

      std::lock_guard<std::mutex> guard(m_mtx);
      if (m_on)
      {
        thr_timer_end(this);
        thr_timer_settime(this, 1000ULL * m_period);
      }
    }

    /** Task entry point: arg is the timer_generic to run. */
    static void execute(void* arg)
    {
      static_cast<timer_generic*>(arg)->run();
    }

    /** thr_timer callback: hand the timer's task over to its pool. */
    static void submit_task(void* arg)
    {
      timer_generic* self= static_cast<timer_generic*>(arg);
      self->m_pool->submit_task(&self->m_task);
    }

  public:
    timer_generic(callback_func func, void* data, thread_pool_generic * pool):
      m_pool(pool),
      m_task(timer_generic::execute, this),
      m_callback(func),
      m_data(data),
      m_period(0),
      m_mtx(),
      m_on(true),
      m_running(false)
    {
      if (!pool)
      {
        /* run in "timer" thread */
        thr_timer_init(this, m_task.get_func(), m_task.get_arg());
      }
      else
      {
        /* Execute callback in threadpool */
        thr_timer_init(this, submit_task, this);
      }
    }

    /** (Re)arm the timer; does nothing once the timer was disarmed. */
    void set_time(int initial_delay_ms, int period_ms) override
    {
      std::lock_guard<std::mutex> guard(m_mtx);
      if (!m_on)
        return;
      thr_timer_end(this);
      if (m_pool)
        m_period= period_ms;
      else
        thr_timer_set_period(this, 1000ULL * period_ms);
      thr_timer_settime(this, 1000ULL * initial_delay_ms);
    }

    /** Stop the timer, cancel queued occurrences of its task and wait
    for the task. */
    void disarm() override
    {
      {
        std::lock_guard<std::mutex> guard(m_mtx);
        m_on= false;
        thr_timer_end(this);
      }
      if (m_task.m_group)
      {
        m_task.m_group->cancel_pending(&m_task);
      }
      if (m_pool)
      {
        m_pool->cancel_pending(&m_task);
      }
      m_task.wait();
    }

    virtual ~timer_generic()
    {
      disarm();
    }
  };

  /** Create a timer whose callback is executed in this pool. */
  timer* create_timer(callback_func func, void *data) override
  {
    return new timer_generic(func, data, this);
  }
};
/**
  Cancel all queued occurrences of a task.

  Every queue slot that holds the given task is released once and
  overwritten with nullptr; the slot itself stays in the queue.

  @param t task to cancel
*/
void thread_pool_generic::cancel_pending(task* t)
{
  std::lock_guard<std::mutex> guard(m_mtx);
  auto slot= m_task_queue.begin();
  while (slot != m_task_queue.end())
  {
    if (*slot == t)
    {
      t->release();
      *slot= nullptr;
    }
    slot++;
  }
}
/**
  Park the calling worker on the standby list until it is woken.

  @param lk          lock on m_mtx held by the caller, released while waiting
  @param thread_data the calling worker's structure
  @retval true  the worker was woken by another thread
  @retval false the idle timeout expired (the current thread must shut down)
*/
bool thread_pool_generic::wait_for_tasks(std::unique_lock<std::mutex> &lk,
                                         worker_data *thread_data)
{
  assert(m_task_queue.empty());
  assert(!m_in_shutdown);

  thread_data->m_wake_reason= WAKE_REASON_NONE;
  m_active_threads.erase(thread_data);
  m_standby_threads.push_back(thread_data);

  do
  {
    thread_data->m_cv.wait_for(lk, m_thread_timeout);
    if (thread_data->m_wake_reason != WAKE_REASON_NONE)
    {
      /* A waker set the reason, so this was not a timeout. */
      return true;
    }
    /* Keep waiting while leaving would drop the pool below the required
    minimum of worker threads. */
  } while (thread_count() <= m_min_threads);

  /*
    Timed out: this thread takes itself off the standby list. When a
    worker is signaled instead, the signaling thread removes it.
  */
  m_standby_threads.erase(thread_data);
  m_active_threads.push_back(thread_data);
  return false;
}
/**
  The worker's "get next task" routine.

  A task is either handed over directly by the waking thread
  (thread_var->m_wake_reason == WAKE_REASON_TASK) or taken from the task
  queue. If the queue is empty, the worker parks until it is woken.

  @param thread_var the calling worker's structure
  @param[out] t     receives the task to execute when true is returned
  @return true if a task was obtained, false if the worker must exit
*/
bool thread_pool_generic::get_task(worker_data *thread_var, task **t)
{
  std::unique_lock<std::mutex> lk(m_mtx);

  /* The previous task is done; stop counting it as a long task. */
  if (thread_var->is_long_task() && m_long_tasks_count)
    m_long_tasks_count--;
  thread_var->m_state= worker_data::NONE;

  bool handed_over= false;
  if (m_task_queue.empty())
  {
    if (m_in_shutdown || !wait_for_tasks(lk, thread_var))
      return false;
    /* Was a task passed directly by the signaling thread? */
    handed_over= (thread_var->m_wake_reason == WAKE_REASON_TASK);
    if (!handed_over && m_task_queue.empty())
      return false;
  }

  if (handed_over)
  {
    *t= thread_var->m_task;
  }
  else
  {
    /* Dequeue from the task queue. */
    *t= m_task_queue.front();
    m_task_queue.pop();
    m_tasks_dequeued++;
  }

  thread_var->m_state|= worker_data::EXECUTING_TASK;
  thread_var->m_task_start_time= m_timestamp;
  return true;
}
/**
  Worker thread shutdown routine: remove the worker from the active list,
  return its structure to the cache and, during pool shutdown, signal
  the destructor once no workers remain.
*/
void thread_pool_generic::worker_end(worker_data* thread_data)
{
  std::unique_lock<std::mutex> guard(m_mtx);
  m_active_threads.erase(thread_data);
  m_thread_data_cache.put(thread_data);

  const bool last_worker_gone= m_in_shutdown && thread_count() == 0;
  if (last_worker_gone)
  {
    /* Signal the destructor that no more threads are left. */
    m_cv_no_threads.notify_all();
  }
}
/**
  The worker's get/execute task loop; entry point of every worker thread.

  Runs the optional init callback, executes tasks until get_task() reports
  that the worker must exit, then runs the optional destroy callback and
  unregisters the worker.

  @param thread_var the structure of the worker running this function
*/
void thread_pool_generic::worker_main(worker_data *thread_var)
{
  if (m_worker_init_callback)
    m_worker_init_callback();

  task *next_task= nullptr;
  while (get_task(thread_var, &next_task))
  {
    /*
      cancel_pending() leaves nullptr in the queue slot of a cancelled
      task. Such a slot is skipped; it must not terminate the worker.
    */
    if (next_task)
      next_task->execute();
  }

  if (m_worker_destroy_callback)
    m_worker_destroy_callback();

  worker_end(thread_var);
}
/*
  Periodic job to fix thread count and concurrency,
  in case of long tasks, etc.

  Refreshes the coarse clock, recounts long-running tasks and, while tasks
  are queued, wakes or creates a worker when too few are running, or adds
  a thread when no progress was observed since the previous run.
*/
void thread_pool_generic::maintainence()
{
  /*
    If the pool is busy (i.e. its mutex is currently locked), the
    maintenance run may be skipped, up to MAX_SKIPS times in a row, to
    lower mutex contention.
  */
  static int skip_counter;
  const int MAX_SKIPS= 10;

  std::unique_lock<std::mutex> lk(m_mtx, std::defer_lock);
  if (skip_counter != MAX_SKIPS)
  {
    if (!lk.try_lock())
    {
      skip_counter++;
      return;
    }
  }
  else
  {
    lk.lock();
  }
  skip_counter= 0;

  m_timestamp= std::chrono::system_clock::now();
  m_long_tasks_count= 0;

  if (m_task_queue.empty())
  {
    m_last_activity= m_tasks_dequeued + m_wakeups;
    return;
  }

  /* Flag and count the workers whose current task runs for too long. */
  for (auto worker= m_active_threads.front(); worker; worker= worker->m_next)
  {
    if (!worker->is_executing_task())
      continue;
    const bool is_long= worker->is_long_task() ||
      (m_timestamp - worker->m_task_start_time > LONG_TASK_DURATION);
    if (!is_long)
      continue;
    worker->m_state|= worker_data::LONG_TASK;
    m_long_tasks_count++;
  }

  size_t thread_cnt= static_cast<int>(thread_count());

  if (m_active_threads.size() - m_long_tasks_count <
      m_concurrency * OVERSUBSCRIBE_FACTOR)
  {
    wake_or_create_thread();
    return;
  }

  const unsigned long long activity= m_tasks_dequeued + m_wakeups;
  const bool stalled= m_last_activity == activity &&
                      m_last_thread_count <= thread_cnt &&
                      m_active_threads.size() == thread_cnt;
  if (stalled)
  {
    /* No progress made since the last iteration: create a new thread. */
    add_thread();
  }

  m_last_activity= activity;
  m_last_thread_count= thread_cnt;
}
/*
  Heuristic used for thread creation throttling.

  Returns the minimum interval in milliseconds between thread creations:
  0 below 4x concurrency, 50 below 8x, 100 below 16x, and 200 otherwise.

  @param n_threads   number of threads already in the pool
  @param concurrency desired concurrency level
*/
static int throttling_interval_ms(size_t n_threads,size_t concurrency)
{
  struct tier
  {
    size_t multiplier;
    int interval_ms;
  };
  static const tier tiers[]= { {4, 0}, {8, 50}, {16, 100} };

  for (const tier &step : tiers)
  {
    if (n_threads < concurrency * step.multiplier)
      return step.interval_ms;
  }
  return 200;
}
/**
  Create a new worker thread.

  Creation is refused once m_max_threads is reached and, when the pool
  already has at least m_min_threads threads, while the throttling
  interval since the last creation has not elapsed.

  @return true if a thread was started, false otherwise
*/
bool thread_pool_generic::add_thread()
{
  const size_t n_threads= thread_count();
  if (n_threads >= m_max_threads)
    return false;

  if (n_threads >= m_min_threads)
  {
    const auto since_last=
      std::chrono::system_clock::now() - m_last_thread_creation;
    const std::chrono::milliseconds min_gap(
      throttling_interval_ms(n_threads, m_concurrency));
    if (since_last < min_gap)
    {
      /* Throttle thread creation. */
      return false;
    }
  }

  worker_data *thread_data= m_thread_data_cache.get();
  m_active_threads.push_back(thread_data);
  try
  {
    std::thread worker(&thread_pool_generic::worker_main, this, thread_data);
    m_last_thread_creation= std::chrono::system_clock::now();
    worker.detach();
  }
  catch (const std::system_error &err)
  {
    /* Undo the registration done above. */
    m_active_threads.erase(thread_data);
    m_thread_data_cache.put(thread_data);

    /* Report the failure only once. */
    static bool warning_written;
    if (!warning_written)
    {
      fprintf(stderr, "Warning : threadpool thread could not be created :%s,"
              "current number of threads in pool %zu\n",
              err.what(), thread_count());
      warning_written= true;
    }
    return false;
  }
  return true;
}
/**
  Wake a standby thread, and hand the given task over to this thread.

  The most recently parked worker is taken (LIFO), moved to the active
  list and notified. Every caller in this file holds m_mtx when calling.

  @param reason why the worker is woken; must not be WAKE_REASON_NONE
  @param t      task handed over to the worker, may be nullptr
  @return true if a worker was woken, false if no standby worker exists
*/
bool thread_pool_generic::wake(worker_wake_reason reason, task *t)
{
  assert(reason != WAKE_REASON_NONE);
  if (m_standby_threads.empty())
    return false;

  auto var= m_standby_threads.back();
  m_standby_threads.pop_back();
  m_active_threads.push_back(var);

  assert(var->m_wake_reason == WAKE_REASON_NONE);
  var->m_wake_reason= reason;
  /*
    Always overwrite the hand-over slot, even with nullptr. Otherwise a
    worker woken with WAKE_REASON_TASK for a cancelled (null) queue entry
    would pick up the stale pointer of a task it executed earlier.
  */
  var->m_task= t;
  var->m_cv.notify_one();

  m_wakeups++;
  return true;
}
/**
  Construct the pool.

  The desired concurrency starts at the hardware concurrency, is clamped
  to [min_threads, max_threads] and is never zero. The maintenance timer
  is started only when min_threads < max_threads.

  @param min_threads minimum number of worker threads
  @param max_threads maximum number of worker threads
*/
thread_pool_generic::thread_pool_generic(int min_threads, int max_threads) :
  m_thread_data_cache(max_threads),
  m_task_queue(10000),
  m_standby_threads(),
  m_active_threads(),
  m_mtx(),
  m_thread_timeout(std::chrono::milliseconds(60000)),
  m_timer_interval(std::chrono::milliseconds(400)),
  m_cv_no_threads(),
  m_cv_timer(),
  m_tasks_enqueued(0),
  m_tasks_dequeued(0),
  m_wakeups(0),
  m_spurious_wakeups(0),
  m_concurrency(std::thread::hardware_concurrency()),
  m_in_shutdown(false),
  m_timestamp(),
  m_long_tasks_count(0),
  m_last_thread_creation(),
  m_min_threads(min_threads),
  m_max_threads(max_threads),
  m_last_thread_count(0),
  m_last_activity(0),
  m_maintaince_timer_task()
{
  if (m_concurrency > m_max_threads)
    m_concurrency= m_max_threads;
  if (m_concurrency < m_min_threads)
    m_concurrency= min_threads;
  if (m_concurrency == 0)
    m_concurrency= 1;

  /* With min_threads >= max_threads no maintenance timer is created. */
  if (min_threads >= max_threads)
    return;

  const int interval_ms= static_cast<int>(m_timer_interval.count());
  m_maintaince_timer_task.reset(
    new timer_generic(thread_pool_generic::maintainence_func, this, nullptr));
  m_maintaince_timer_task->set_time(interval_ms, interval_ms);
}
/**
  Make one more worker available for the non-empty task queue: without
  standby workers, try to create a thread; otherwise pop the task at the
  front of the queue and hand it to a woken standby worker.
*/
void thread_pool_generic::wake_or_create_thread()
{
  assert(!m_task_queue.empty());

  if (m_standby_threads.empty())
  {
    add_thread();
    return;
  }

  auto next= m_task_queue.front();
  m_task_queue.pop();
  wake(WAKE_REASON_TASK, next);
}
/**
  Submit a new task.

  The task is ignored while the pool is shutting down. Otherwise it gets
  an extra reference, is enqueued, and a worker is woken or created if
  fewer than m_concurrency * OVERSUBSCRIBE_FACTOR workers (not counting
  long tasks) are active.

  @param task task to execute
*/
void thread_pool_generic::submit_task(task* task)
{
  std::lock_guard<std::mutex> guard(m_mtx);
  if (!m_in_shutdown)
  {
    task->add_ref();
    m_tasks_enqueued++;
    m_task_queue.push(task);

    const auto busy_workers= m_active_threads.size() - m_long_tasks_count;
    const auto worker_limit= m_concurrency * OVERSUBSCRIBE_FACTOR;
    if (busy_workers < worker_limit)
      wake_or_create_thread();
  }
}
/**
  Stop AIO and the maintenance timer, wake up all workers, and wait
  until they are gone.
*/
thread_pool_generic::~thread_pool_generic()
{
  /*
    Stop AIO early.
    This is needed to prevent the AIO completion thread
    from calling submit_task() on an object that is being destroyed.
  */
  m_aio.reset();

  /* Also stop the maintenance task early. */
  m_maintaince_timer_task.reset();

  std::unique_lock<std::mutex> lk(m_mtx);
  m_in_shutdown= true;

  /* Wake up idle threads, one per call, until none is left on standby. */
  for (bool woken= wake(WAKE_REASON_SHUTDOWN); woken;
       woken= wake(WAKE_REASON_SHUTDOWN))
  {
  }

  /* Wait until the last worker has unregistered itself. */
  m_cv_no_threads.wait(lk, [this] { return thread_count() == 0; });
}
/**
  Factory for the generic thread pool.

  @param min_threads minimum number of worker threads
  @param max_threads maximum number of worker threads
  @return newly allocated pool
*/
thread_pool *create_thread_pool_generic(int min_threads, int max_threads)
{
  thread_pool *pool= new thread_pool_generic(min_threads, max_threads);
  return pool;
}
} // namespace tpool