mariadb/tpool/aio_win.cc
Marko Mäkelä a87bb96ecb MDEV-36234: Add innodb_linux_aio
This controls which linux implementation to use for
innodb_use_native_aio=ON.

innodb_linux_aio=auto is equivalent to innodb_linux_aio=io_uring when
it is available, and falling back to innodb_linux_aio=aio when not.

Debian packaging is no longer a choice of either aio or uring, so
for those older Debian or Ubuntu releases, it's a remove_uring directive.
For more recent releases, add mandatory liburing for consistent packaging.

WITH_LIBAIO is now an independent option from WITH_URING.

LINUX_NATIVE_AIO preprocessor constant is renamed to HAVE_LIBAIO,
analogous to existing HAVE_URING.

tpool::is_aio_supported(): A common feature check.

is_linux_native_aio_supported(): Remove. This had originally been added in
mysql/mysql-server@0da310b69d in 2012
to fix an issue where io_submit() on CentOS 5.5 would return EINVAL
for a /tmp/#sql*.ibd file associated with CREATE TEMPORARY TABLE.
But, starting with commit 2e814d4702 InnoDB
temporary tables will be written to innodb_temp_data_file_path.
The 2012 commit said that the error could occur on "old kernels".
Any GNU/Linux distribution that we currently support should be based
on a newer Linux kernel; for example, Red Hat Enterprise Linux 7
was released in 2014.

tpool::create_linux_aio(): Wraps the Linux implementations:
create_libaio() and create_liburing(), each defined in separate
compilation units (aio_linux.cc, aio_libaio.cc, aio_liburing.cc).

The CMake definitions are simplified using target_sources() and
target_compile_definitions(), all available since CMake 2.8.12.
With this change, there is no need to include ${CMAKE_SOURCE_DIR}/tpool
or add TPOOL_DEFINES flags anymore, target_link_libraries(lib tpool)
does all that.

This is joint work with Daniel Black and Vladislav Vaintroub.
2025-06-23 13:51:52 +03:00

142 lines
3.8 KiB
C++

/* Copyright(C) 2019 MariaDB Corporation.
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; version 2 of the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111-1301 USA */
#include "tpool_structs.h"
#include <algorithm>
#include <assert.h>
#include <condition_variable>
#include <iostream>
#include <limits.h>
#include <mutex>
#include <queue>
#include <stack>
#include <thread>
#include <vector>
#include <tpool.h>
namespace tpool
{
/*
Windows AIO implementation, completion port based.
A single thread collects the completion notifications with
GetQueuedCompletionStatus(), and forwards the I/O completion callback
to the worker thread pool.
*/
class tpool_generic_win_aio : public aio
{
/* Thread that does collects completion status from the completion port. */
std::thread m_thread;
/* IOCP Completion port.*/
HANDLE m_completion_port;
/* The worker pool where completion routine is executed, as task. */
thread_pool* m_pool;
public:
tpool_generic_win_aio(thread_pool* pool, int max_io) : m_pool(pool)
{
m_completion_port = CreateIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 0);
m_thread = std::thread(aio_completion_thread_proc, this);
}
/**
Task to be executed in the work pool.
*/
static void io_completion_task(void* data)
{
auto cb = (aiocb*)data;
cb->execute_callback();
}
void completion_thread_work()
{
for (;;)
{
DWORD n_bytes;
aiocb* aiocb;
ULONG_PTR key;
if (!GetQueuedCompletionStatus(m_completion_port, &n_bytes, &key,
(LPOVERLAPPED*)& aiocb, INFINITE))
break;
aiocb->m_err = 0;
aiocb->m_ret_len = n_bytes;
if (n_bytes != aiocb->m_len)
{
if (GetOverlappedResult(aiocb->m_fh, aiocb,
(LPDWORD)& aiocb->m_ret_len, FALSE))
{
aiocb->m_err = GetLastError();
}
}
aiocb->m_internal_task.m_func = aiocb->m_callback;
aiocb->m_internal_task.m_arg = aiocb;
aiocb->m_internal_task.m_group = aiocb->m_group;
m_pool->submit_task(&aiocb->m_internal_task);
}
}
static void aio_completion_thread_proc(tpool_generic_win_aio* aio)
{
aio->m_pool->m_worker_init_callback();
aio->completion_thread_work();
aio->m_pool->m_worker_destroy_callback();
}
~tpool_generic_win_aio()
{
if (m_completion_port)
CloseHandle(m_completion_port);
m_thread.join();
}
int submit_io(aiocb* cb) override
{
memset((OVERLAPPED *)cb, 0, sizeof(OVERLAPPED));
cb->m_internal = this;
ULARGE_INTEGER uli;
uli.QuadPart = cb->m_offset;
cb->Offset = uli.LowPart;
cb->OffsetHigh = uli.HighPart;
BOOL ok;
if (cb->m_opcode == aio_opcode::AIO_PREAD)
ok = ReadFile(cb->m_fh.m_handle, cb->m_buffer, cb->m_len, 0, cb);
else
ok = WriteFile(cb->m_fh.m_handle, cb->m_buffer, cb->m_len, 0, cb);
if (ok || (GetLastError() == ERROR_IO_PENDING))
return 0;
return -1;
}
// Inherited via aio
int bind(native_file_handle& fd) override
{
return CreateIoCompletionPort(fd, m_completion_port, 0, 0) ? 0
: GetLastError();
}
int unbind(const native_file_handle& fd) override { return 0; }
const char *get_implementation() const override { return "completion ports"; }
};
/**
  Factory for the completion port based Windows AIO implementation.

  @param pool    thread pool, passed on to the tpool_generic_win_aio constructor
  @param max_io  passed on to the tpool_generic_win_aio constructor
  @return a tpool_generic_win_aio object allocated with new
          (presumably released by the caller with delete -- confirm)
*/
aio* create_win_aio(thread_pool* pool, int max_io)
{
  aio* const win_aio = new tpool_generic_win_aio(pool, max_io);
  return win_aio;
}
} // namespace tpool