mirror of
https://github.com/MariaDB/server.git
synced 2025-01-27 09:14:17 +01:00
c8e3bcf79b
Fix concurrency error - avoid accessing deleted memory, when io_slots is resized. the deleted memory in this case was vftable pointer in aiocb::m_internal_task The fix avoids calling dummy release function, via a flag in task_group.
115 lines
2.8 KiB
C++
115 lines
2.8 KiB
C++
/* Copyright(C) 2019 MariaDB Corporation.
|
|
|
|
This program is free software; you can redistribute itand /or modify
|
|
it under the terms of the GNU General Public License as published by
|
|
the Free Software Foundation; version 2 of the License.
|
|
|
|
This program is distributed in the hope that it will be useful,
|
|
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.See the
|
|
GNU General Public License for more details.
|
|
|
|
You should have received a copy of the GNU General Public License
|
|
along with this program; if not, write to the Free Software
|
|
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111 - 1301 USA*/
|
|
|
|
#include <tpool.h>
|
|
#include <queue>
|
|
#include <mutex>
|
|
#include <condition_variable>
|
|
#include <tpool_structs.h>
|
|
#include <thread>
|
|
#include <assert.h>
|
|
#ifndef _WIN32
|
|
#include <unistd.h> // usleep
|
|
#endif
|
|
namespace tpool
|
|
{
|
|
|
|
/**
|
|
Task_group constructor
|
|
|
|
@param max_threads - maximum number of threads allowed to execute
|
|
tasks from the group at the same time.
|
|
|
|
@param enable_task_release - if true (default), task::release() will be
|
|
called after task execution.'false' should only be used in rare cases
|
|
when accessing memory, pointed by task structures, would be unsafe after.
|
|
the callback. Also 'false' is only possible ,if task::release() is a trivial function
|
|
*/
|
|
task_group::task_group(unsigned int max_concurrency,
|
|
bool enable_task_release)
|
|
:
|
|
m_queue(8),
|
|
m_mtx(),
|
|
m_tasks_running(),
|
|
m_max_concurrent_tasks(max_concurrency),
|
|
m_enable_task_release(enable_task_release)
|
|
{};
|
|
|
|
void task_group::set_max_tasks(unsigned int max_concurrency)
|
|
{
|
|
std::unique_lock<std::mutex> lk(m_mtx);
|
|
m_max_concurrent_tasks = max_concurrency;
|
|
}
|
|
void task_group::execute(task* t)
|
|
{
|
|
std::unique_lock<std::mutex> lk(m_mtx);
|
|
if (m_tasks_running == m_max_concurrent_tasks)
|
|
{
|
|
/* Queue for later execution by another thread.*/
|
|
m_queue.push(t);
|
|
return;
|
|
}
|
|
m_tasks_running++;
|
|
for (;;)
|
|
{
|
|
lk.unlock();
|
|
if (t)
|
|
{
|
|
t->m_func(t->m_arg);
|
|
if (m_enable_task_release)
|
|
t->release();
|
|
}
|
|
lk.lock();
|
|
|
|
if (m_queue.empty())
|
|
break;
|
|
t = m_queue.front();
|
|
m_queue.pop();
|
|
}
|
|
m_tasks_running--;
|
|
}
|
|
|
|
void task_group::cancel_pending(task* t)
|
|
{
|
|
std::unique_lock<std::mutex> lk(m_mtx);
|
|
if (!t)
|
|
m_queue.clear();
|
|
for (auto it = m_queue.begin(); it != m_queue.end(); it++)
|
|
{
|
|
if (*it == t)
|
|
{
|
|
(*it)->release();
|
|
(*it) = nullptr;
|
|
}
|
|
}
|
|
}
|
|
|
|
task_group::~task_group()
|
|
{
|
|
std::unique_lock<std::mutex> lk(m_mtx);
|
|
assert(m_queue.empty());
|
|
|
|
while (m_tasks_running)
|
|
{
|
|
lk.unlock();
|
|
#ifndef _WIN32
|
|
usleep(1000);
|
|
#else
|
|
Sleep(1);
|
|
#endif
|
|
lk.lock();
|
|
}
|
|
}
|
|
}
|