/* Copyright (C) 2019, 2020, MariaDB Corporation.

This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; version 2 of the License.

This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
GNU General Public License for more details.

You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111-1301 USA*/

#include "tpool.h"
|
|
#include <thread>
|
|
#include <sys/syscall.h>
|
|
#include <libaio.h>
|
|
#include "my_valgrind.h"
|
|
|
|
/**
  Invoke the io_getevents() system call without a timeout parameter.

  @param ctx     context from io_setup()
  @param min_nr  minimum number of completion events to wait for
  @param nr      maximum number of completion events to collect
  @param ev      the collected events

  In https://pagure.io/libaio/c/7cede5af5adf01ad26155061cc476aad0804d3fc
  the io_getevents() implementation in libaio was "optimized" so that it
  would elide the system call when there are no outstanding requests
  and a timeout was specified.

  The libaio code that dereferences ctx would occasionally trigger
  SIGSEGV if io_destroy() was concurrently invoked from another thread.
  Hence, we have to use the raw system call.

  WHY are we doing this at all?
  Because we want io_destroy() from another thread to interrupt
  io_getevents().

  And WHY do we want io_destroy() from another thread to interrupt
  io_getevents()?

  Because there is no documented, libaio-friendly and race-condition-free
  way to interrupt io_getevents(). io_destroy() coupled with the raw
  system call has worked for us so far.

  Historical note: in the past, we used io_getevents() with a timeout.
  We would wake up periodically, check the shutdown flag, and return from
  the main routine once it was set. This was admittedly safer, but it cost
  periodic wakeups, which we are no longer willing to pay for.

  @note we also rely on the undocumented property that io_destroy(ctx)
  makes this version of io_getevents() return -EINVAL.
*/
static int my_getevents(io_context_t ctx, long min_nr, long nr, io_event *ev)
  noexcept
{
  int saved_errno= errno;
  /* A null timeout: block until at least min_nr events arrive, or until
     the io context is destroyed. */
  int ret= syscall(__NR_io_getevents, reinterpret_cast<long>(ctx),
                   min_nr, nr, ev, 0);
  if (ret < 0)
  {
    /* Convert the failure into a negated errno value, as libaio does,
       and restore errno for the caller. */
    ret= -errno;
    errno= saved_errno;
  }
  return ret;
}


/*
  Linux AIO implementation, based on native AIO.
  Needs libaio.h and -laio at compile time.

  io_submit() is used to submit asynchronous I/O.

  A single thread collects the completion notifications with
  io_getevents() and forwards the I/O completion callbacks to
  the worker thread pool.
*/
namespace
{
using namespace tpool;

class aio_libaio final : public aio
{
  thread_pool *m_pool;
  io_context_t m_io_ctx;
  std::thread m_getevent_thread;
  static std::atomic<bool> shutdown_in_progress;

  static void getevent_thread_routine(aio_libaio *aio)
  {
    /*
      We collect events in small batches to hopefully reduce the
      number of system calls.
    */
    constexpr unsigned MAX_EVENTS= 256;

    aio->m_pool->m_worker_init_callback();
    io_event events[MAX_EVENTS];
    for (;;)
    {
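      /*
        my_getevents() returns the number of collected events, or a
        negated errno value on failure: -EINTR means the wait was
        interrupted and we retry, while -EINVAL is expected once
        io_destroy() has been invoked during shutdown.
      */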
      switch (int ret= my_getevents(aio->m_io_ctx, 1, MAX_EVENTS, events)) {
      case -EINTR:
        continue;
      case -EINVAL:
        if (shutdown_in_progress)
          goto end;
        /* fall through */
      default:
        if (ret < 0)
        {
          fprintf(stderr, "io_getevents returned %d\n", ret);
          abort();
          goto end;
        }
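        /*
          The kernel filled in the completion events behind
          MemorySanitizer's back; mark the collected entries as
          initialized.
        */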
#if __has_feature(memory_sanitizer)
        MEM_MAKE_DEFINED(events, ret * sizeof *events);
#endif
        for (int i= 0; i < ret; i++)
        {
          const io_event &event= events[i];
          aiocb *iocb= reinterpret_cast<aiocb*>(event.obj);
          /* The kernel reports an error as a negative value in event.res. */
          if (static_cast<int>(event.res) < 0)
          {
            iocb->m_err= -event.res;
            iocb->m_ret_len= 0;
          }
          else
          {
#if __has_feature(memory_sanitizer)
            if (iocb->m_opcode == aio_opcode::AIO_PREAD)
              MEM_MAKE_DEFINED(iocb->m_buffer, event.res);
#endif
            iocb->m_ret_len= event.res;
            iocb->m_err= 0;
            finish_synchronous(iocb);
          }
          /* Forward the completion callback to the worker thread pool. */
          iocb->m_internal_task.m_func= iocb->m_callback;
          iocb->m_internal_task.m_arg= iocb;
          iocb->m_internal_task.m_group= iocb->m_group;
          aio->m_pool->submit_task(&iocb->m_internal_task);
        }
      }
    }
end:
    aio->m_pool->m_worker_destroy_callback();
  }

public:
  aio_libaio(io_context_t ctx, thread_pool *pool)
    : m_pool(pool), m_io_ctx(ctx),
      m_getevent_thread(getevent_thread_routine, this)
  {
  }

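  /*
    Shutdown: setting shutdown_in_progress and then calling io_destroy()
    makes the blocked io_getevents() system call in the getevent thread
    fail with -EINVAL, which that thread interprets as a request to exit,
    so the subsequent join() completes.
  */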
  ~aio_libaio()
  {
    shutdown_in_progress= true;
    io_destroy(m_io_ctx);
    m_getevent_thread.join();
    shutdown_in_progress= false;
  }

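  /*
    The aiocb embeds a libaio iocb. It is always prepared as a read with
    io_prep_pread(), and the opcode is switched to IO_CMD_PWRITE for
    writes. io_submit() returns the number of submitted requests, so 1
    means success; a negative return value is a negated errno.
  */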
  int submit_io(aiocb *cb) override
  {
    io_prep_pread(&cb->m_iocb, cb->m_fh, cb->m_buffer, cb->m_len, cb->m_offset);
    if (cb->m_opcode != aio_opcode::AIO_PREAD)
      cb->m_iocb.aio_lio_opcode= IO_CMD_PWRITE;
    iocb *icb= &cb->m_iocb;
    int ret= io_submit(m_io_ctx, 1, &icb);
    if (ret == 1)
      return 0;
    errno= -ret;
    return -1;
  }

  int bind(native_file_handle&) override { return 0; }
  int unbind(const native_file_handle&) override { return 0; }
  const char *get_implementation() const override { return "Linux native AIO"; }
};

std::atomic<bool> aio_libaio::shutdown_in_progress;
}

namespace tpool
{
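/*
  Create the native Linux AIO implementation. io_setup() allocates a
  kernel AIO context able to handle up to max_io concurrent requests;
  it returns 0 on success and a negative error code on failure, in
  which case nullptr is returned to the caller.
*/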
aio *create_libaio(thread_pool *pool, int max_io)
{
  io_context_t ctx;
  memset(&ctx, 0, sizeof ctx);
  if (int ret= io_setup(max_io, &ctx))
  {
    fprintf(stderr, "io_setup(%d) returned %d\n", max_io, ret);
    return nullptr;
  }
  return new aio_libaio(ctx, pool);
}
}