mirror of
https://github.com/MariaDB/server.git
synced 2025-01-15 19:42:28 +01:00
194 lines
5.6 KiB
C++
194 lines
5.6 KiB
C++
/* Copyright (C) 2019, 2020, MariaDB Corporation.
|
|
|
|
This program is free software; you can redistribute itand /or modify
|
|
it under the terms of the GNU General Public License as published by
|
|
the Free Software Foundation; version 2 of the License.
|
|
|
|
This program is distributed in the hope that it will be useful,
|
|
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.See the
|
|
GNU General Public License for more details.
|
|
|
|
You should have received a copy of the GNU General Public License
|
|
along with this program; if not, write to the Free Software
|
|
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111 - 1301 USA*/
|
|
|
|
#include "tpool_structs.h"
|
|
#include "tpool.h"
|
|
|
|
# include <thread>
|
|
# include <atomic>
|
|
# include <cstdio>
|
|
# include <libaio.h>
|
|
# include <sys/syscall.h>
|
|
# include <pthread.h>
|
|
# include <my_sys.h>
|
|
/**
|
|
Invoke the io_getevents() system call, without timeout parameter.
|
|
|
|
@param ctx context from io_setup()
|
|
@param min_nr minimum number of completion events to wait for
|
|
@param nr maximum number of completion events to collect
|
|
@param ev the collected events
|
|
|
|
In https://pagure.io/libaio/c/7cede5af5adf01ad26155061cc476aad0804d3fc
|
|
the io_getevents() implementation in libaio was "optimized" so that it
|
|
would elide the system call when there are no outstanding requests
|
|
and a timeout was specified.
|
|
|
|
The libaio code for dereferencing ctx would occasionally trigger
|
|
SIGSEGV if io_destroy() was concurrently invoked from another thread.
|
|
Hence, we have to use the raw system call.
|
|
|
|
WHY are we doing this at all?
|
|
Because we want io_destroy() from another thread to interrupt io_getevents().
|
|
|
|
And, WHY do we want io_destroy() from another thread to interrupt
|
|
io_getevents()?
|
|
|
|
Because there is no documented, libaio-friendly and race-condition-free way to
|
|
interrupt io_getevents(). io_destroy() coupled with raw syscall seemed to work
|
|
for us so far.
|
|
|
|
Historical note : in the past, we used io_getevents with timeouts. We'd wake
|
|
up periodically, check for shutdown flag, return from the main routine.
|
|
This was admittedly safer, yet it did cost periodic wakeups, which we are not
|
|
willing to do anymore.
|
|
|
|
@note we also rely on the undocumented property, that io_destroy(ctx)
|
|
will make this version of io_getevents return EINVAL.
|
|
*/
|
|
static int my_getevents(io_context_t ctx, long min_nr, long nr, io_event *ev)
|
|
{
|
|
int saved_errno= errno;
|
|
int ret= syscall(__NR_io_getevents, reinterpret_cast<long>(ctx),
|
|
min_nr, nr, ev, 0);
|
|
if (ret < 0)
|
|
{
|
|
ret= -errno;
|
|
errno= saved_errno;
|
|
}
|
|
return ret;
|
|
}
|
|
|
|
|
|
/*
|
|
Linux AIO implementation, based on native AIO.
|
|
Needs libaio.h and -laio at the compile time.
|
|
|
|
io_submit() is used to submit async IO.
|
|
|
|
A single thread will collect the completion notification
|
|
with io_getevents() and forward io completion callback to
|
|
the worker threadpool.
|
|
*/
|
|
namespace tpool
|
|
{
|
|
|
|
class aio_linux final : public aio
|
|
{
|
|
thread_pool *m_pool;
|
|
io_context_t m_io_ctx;
|
|
std::thread m_getevent_thread;
|
|
static std::atomic<bool> shutdown_in_progress;
|
|
|
|
static void getevent_thread_routine(aio_linux *aio)
|
|
{
|
|
my_thread_set_name("my_getevents");
|
|
/*
|
|
We collect events in small batches to hopefully reduce the
|
|
number of system calls.
|
|
*/
|
|
constexpr unsigned MAX_EVENTS= 256;
|
|
|
|
aio->m_pool->m_worker_init_callback();
|
|
io_event events[MAX_EVENTS];
|
|
for (;;)
|
|
{
|
|
switch (int ret= my_getevents(aio->m_io_ctx, 1, MAX_EVENTS, events)) {
|
|
case -EINTR:
|
|
continue;
|
|
case -EINVAL:
|
|
if (shutdown_in_progress)
|
|
goto end;
|
|
/* fall through */
|
|
default:
|
|
if (ret < 0)
|
|
{
|
|
fprintf(stderr, "io_getevents returned %d\n", ret);
|
|
abort();
|
|
goto end;
|
|
}
|
|
for (int i= 0; i < ret; i++)
|
|
{
|
|
const io_event &event= events[i];
|
|
aiocb *iocb= static_cast<aiocb*>(event.obj);
|
|
if (static_cast<int>(event.res) < 0)
|
|
{
|
|
iocb->m_err= -event.res;
|
|
iocb->m_ret_len= 0;
|
|
}
|
|
else
|
|
{
|
|
iocb->m_ret_len= event.res;
|
|
iocb->m_err= 0;
|
|
finish_synchronous(iocb);
|
|
}
|
|
iocb->m_internal_task.m_func= iocb->m_callback;
|
|
iocb->m_internal_task.m_arg= iocb;
|
|
iocb->m_internal_task.m_group= iocb->m_group;
|
|
aio->m_pool->submit_task(&iocb->m_internal_task);
|
|
}
|
|
}
|
|
}
|
|
end:
|
|
aio->m_pool->m_worker_destroy_callback();
|
|
}
|
|
|
|
public:
|
|
aio_linux(io_context_t ctx, thread_pool *pool)
|
|
: m_pool(pool), m_io_ctx(ctx),
|
|
m_getevent_thread(getevent_thread_routine, this)
|
|
{
|
|
}
|
|
|
|
~aio_linux()
|
|
{
|
|
shutdown_in_progress= true;
|
|
io_destroy(m_io_ctx);
|
|
m_getevent_thread.join();
|
|
shutdown_in_progress= false;
|
|
}
|
|
|
|
int submit_io(aiocb *cb) override
|
|
{
|
|
io_prep_pread(static_cast<iocb*>(cb), cb->m_fh, cb->m_buffer, cb->m_len,
|
|
cb->m_offset);
|
|
if (cb->m_opcode != aio_opcode::AIO_PREAD)
|
|
cb->aio_lio_opcode= IO_CMD_PWRITE;
|
|
iocb *icb= static_cast<iocb*>(cb);
|
|
int ret= io_submit(m_io_ctx, 1, &icb);
|
|
if (ret == 1)
|
|
return 0;
|
|
errno= -ret;
|
|
return -1;
|
|
}
|
|
|
|
int bind(native_file_handle&) override { return 0; }
|
|
int unbind(const native_file_handle&) override { return 0; }
|
|
};
|
|
|
|
std::atomic<bool> aio_linux::shutdown_in_progress;
|
|
|
|
aio *create_linux_aio(thread_pool *pool, int max_io)
|
|
{
|
|
io_context_t ctx;
|
|
memset(&ctx, 0, sizeof ctx);
|
|
if (int ret= io_setup(max_io, &ctx))
|
|
{
|
|
fprintf(stderr, "io_setup(%d) returned %d\n", max_io, ret);
|
|
return nullptr;
|
|
}
|
|
return new aio_linux(ctx, pool);
|
|
}
|
|
}
|