mirror of
https://github.com/MariaDB/server.git
synced 2025-01-15 19:42:28 +01:00
MDEV-32216 Connection pool with asynchronous query execution.
Parallelism is achieved by using mysql_send_query on multiple connections without waiting for results, and using IO multiplexing (poll/IOCP) to wait for completions. Refresh libmariadb to pick up CONC-676 (fixes for IOCP use with named pipe)
This commit is contained in:
parent
9766a834f7
commit
ec5db6409d
3 changed files with 390 additions and 1 deletions
264
client/connection_pool.cc
Normal file
264
client/connection_pool.cc
Normal file
|
@ -0,0 +1,264 @@
|
|||
/*
|
||||
Copyright (c) 2023, MariaDB.
|
||||
|
||||
This program is free software; you can redistribute it and/or modify
|
||||
it under the terms of the GNU General Public License as published by
|
||||
the Free Software Foundation; version 2 of the License.
|
||||
|
||||
This program is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU General Public License
|
||||
along with this program; if not, write to the Free Software
|
||||
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335
|
||||
USA
|
||||
*/
|
||||
|
||||
/*
|
||||
Connection pool, with parallel query execution
|
||||
Does not use threads, but IO multiplexing via mysql_send_query and
|
||||
poll/iocp to wait for completions
|
||||
*/
|
||||
#include <my_global.h>
|
||||
#ifdef _WIN32
|
||||
#include <winsock2.h>
|
||||
#else
|
||||
#include <poll.h>
|
||||
#endif
|
||||
#include "connection_pool.h"
|
||||
#include <my_compiler.h>
|
||||
|
||||
namespace async_pool
|
||||
{
|
||||
static ATTRIBUTE_NORETURN void die(const char *format, ...)
|
||||
{
|
||||
va_list args;
|
||||
va_start(args, format);
|
||||
vfprintf(stderr, format, args);
|
||||
va_end(args);
|
||||
abort();
|
||||
}
|
||||
|
||||
pooled_connection *connection_pool::get_connection()
|
||||
{
|
||||
while (free_connections.empty())
|
||||
wait_for_completions();
|
||||
auto c= free_connections.front();
|
||||
free_connections.pop();
|
||||
return c;
|
||||
}
|
||||
|
||||
#ifdef _WIN32
|
||||
void connection_pool::add_to_pollset(pooled_connection *c)
|
||||
{
|
||||
DWORD err= ERROR_SUCCESS;
|
||||
static char ch;
|
||||
WSABUF buf;
|
||||
buf.len= 0;
|
||||
buf.buf= &ch;
|
||||
if (!c->is_pipe)
|
||||
{
|
||||
/* Do async io (sockets). */
|
||||
DWORD flags= 0;
|
||||
if (WSARecv((SOCKET) c->handle, &buf, 1, 0, &flags, c, NULL))
|
||||
err= WSAGetLastError();
|
||||
}
|
||||
else
|
||||
{
|
||||
/* Do async read (named pipe) */
|
||||
if (!ReadFile(c->handle, buf.buf, buf.len, 0, c))
|
||||
err= GetLastError();
|
||||
}
|
||||
|
||||
if (err && err != ERROR_IO_PENDING)
|
||||
die("%s failed: %d\n", c->is_pipe ? "ReadFile" : "WSARecv", err);
|
||||
}
|
||||
|
||||
/*
|
||||
Wait for completions of queries.Uses IOCP on windows to wait for completions.
|
||||
(ReadFile/WSARecv with 0 bytes serves as readiness notification)
|
||||
*/
|
||||
void connection_pool::wait_for_completions()
|
||||
{
|
||||
ULONG n;
|
||||
OVERLAPPED_ENTRY events[32];
|
||||
if (!GetQueuedCompletionStatusEx(iocp, events, array_elements(events), &n, INFINITE,
|
||||
FALSE))
|
||||
{
|
||||
die("GetQueuedCompletionStatusEx failed: %d\n", GetLastError());
|
||||
}
|
||||
|
||||
for (ULONG i= 0; i < n; i++)
|
||||
{
|
||||
auto c= (pooled_connection *) events[i].lpOverlapped;
|
||||
if (!c)
|
||||
die("GetQueuedCompletionStatusEx unexpected return");
|
||||
DBUG_ASSERT(c->mysql);
|
||||
DBUG_ASSERT(!events[i].lpCompletionKey);
|
||||
DBUG_ASSERT(!events[i].dwNumberOfBytesTransferred);
|
||||
complete_query(c);
|
||||
}
|
||||
}
|
||||
#else /* !_WIN32 */
|
||||
void connection_pool::add_to_pollset(pooled_connection *c)
|
||||
{
|
||||
size_t idx= c - &all_connections[0];
|
||||
pollfd *pfd= &pollset[idx];
|
||||
pfd->fd= c->fd;
|
||||
pfd->events= POLLIN;
|
||||
pfd->revents= 0;
|
||||
}
|
||||
|
||||
/* something Linux-ish, can be returned for POLLIN event*/
|
||||
#ifndef POLLRDHUP
|
||||
#define POLLRDHUP 0
|
||||
#endif
|
||||
|
||||
void connection_pool::wait_for_completions()
|
||||
{
|
||||
int n;
|
||||
while ((n= poll(pollset.data(), pollset.size(), -1)) <= 0)
|
||||
{
|
||||
if (errno == EINTR)
|
||||
continue;
|
||||
die("poll failed: %d\n", errno);
|
||||
}
|
||||
|
||||
for (uint i= 0; n > 0 && i < pollset.size(); i++)
|
||||
{
|
||||
pollfd* pfd= &pollset[i];
|
||||
if (pfd->revents &
|
||||
(POLLIN | POLLPRI | POLLHUP | POLLRDHUP| POLLERR | POLLNVAL))
|
||||
{
|
||||
pfd->events= 0;
|
||||
pfd->revents= 0;
|
||||
pfd->fd= -1;
|
||||
complete_query(&all_connections[i]);
|
||||
n--;
|
||||
}
|
||||
}
|
||||
if (n)
|
||||
die("poll() failed to find free connection: %d\n");
|
||||
}
|
||||
#endif
|
||||
|
||||
void connection_pool::complete_query(pooled_connection *c)
|
||||
{
|
||||
int err= mysql_read_query_result(c->mysql);
|
||||
if (c->on_completion)
|
||||
c->on_completion(c->mysql, c->query.c_str(), !err, c->context);
|
||||
if (c->release_connection)
|
||||
{
|
||||
c->in_use= false;
|
||||
free_connections.push(c);
|
||||
}
|
||||
}
|
||||
|
||||
connection_pool::~connection_pool()
|
||||
{
|
||||
close();
|
||||
}
|
||||
|
||||
void connection_pool::init(MYSQL *con[], size_t n)
|
||||
{
|
||||
#ifdef _WIN32
|
||||
iocp= CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
|
||||
if (!iocp)
|
||||
die("CreateIoCompletionPort failed: %d\n", GetLastError());
|
||||
#else
|
||||
pollset.resize(n);
|
||||
for (auto &pfd : pollset)
|
||||
pfd.fd= -1;
|
||||
#endif
|
||||
|
||||
|
||||
for (size_t i= 0; i < n; i++)
|
||||
all_connections.emplace_back(con[i]);
|
||||
|
||||
for (auto &con : all_connections)
|
||||
{
|
||||
free_connections.push(&con);
|
||||
#ifdef _WIN32
|
||||
if (!CreateIoCompletionPort(con.handle, iocp, 0, 0))
|
||||
die("CreateIoCompletionPort failed: %d\n", GetLastError());
|
||||
#endif
|
||||
}
|
||||
}
|
||||
|
||||
int connection_pool::execute_async(const char *query,
|
||||
query_completion_handler on_completion,
|
||||
void *context, bool release_connecton)
|
||||
{
|
||||
auto c= get_connection();
|
||||
c->context= context;
|
||||
c->on_completion= on_completion;
|
||||
c->release_connection= release_connecton;
|
||||
c->query= query;
|
||||
|
||||
int ret= mysql_send_query(c->mysql, query, (unsigned long) c->query.size());
|
||||
if (ret)
|
||||
{
|
||||
free_connections.push(c);
|
||||
return ret;
|
||||
}
|
||||
|
||||
c->in_use= true;
|
||||
add_to_pollset(c);
|
||||
return 0;
|
||||
}
|
||||
|
||||
/*
|
||||
Wait until all queries are completed and all
|
||||
connections are idle.
|
||||
*/
|
||||
void connection_pool::wait_all()
|
||||
{
|
||||
while (free_connections.size() != all_connections.size())
|
||||
wait_for_completions();
|
||||
}
|
||||
|
||||
void connection_pool::for_each_connection(void(*f)(MYSQL *mysql))
|
||||
{
|
||||
for (auto &c : all_connections)
|
||||
f(c.mysql);
|
||||
}
|
||||
|
||||
int connection_pool::close()
|
||||
{
|
||||
for (auto &c : all_connections)
|
||||
mysql_close(c.mysql);
|
||||
|
||||
all_connections.clear();
|
||||
while (!free_connections.empty())
|
||||
free_connections.pop();
|
||||
#ifdef _WIN32
|
||||
if (iocp)
|
||||
{
|
||||
CloseHandle(iocp);
|
||||
iocp= nullptr;
|
||||
}
|
||||
#endif
|
||||
return 0;
|
||||
}
|
||||
|
||||
pooled_connection::pooled_connection(MYSQL *c)
|
||||
{
|
||||
mysql= c;
|
||||
#ifdef _WIN32
|
||||
OVERLAPPED *ov= static_cast<OVERLAPPED *>(this);
|
||||
memset(ov, 0, sizeof(OVERLAPPED));
|
||||
mysql_protocol_type protocol;
|
||||
if (c->host && !strcmp(c->host, "."))
|
||||
protocol= MYSQL_PROTOCOL_PIPE;
|
||||
else
|
||||
protocol= (mysql_protocol_type) c->options.protocol;
|
||||
is_pipe= protocol == MYSQL_PROTOCOL_PIPE;
|
||||
handle= (HANDLE) mysql_get_socket(c);
|
||||
#else
|
||||
fd= mysql_get_socket(c);
|
||||
#endif
|
||||
}
|
||||
|
||||
} // namespace async_pool
|
125
client/connection_pool.h
Normal file
125
client/connection_pool.h
Normal file
|
@ -0,0 +1,125 @@
|
|||
/*
|
||||
Copyright (c) 2023, MariaDB.
|
||||
|
||||
This program is free software; you can redistribute it and/or modify
|
||||
it under the terms of the GNU General Public License as published by
|
||||
the Free Software Foundation; version 2 of the License.
|
||||
|
||||
This program is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU General Public License
|
||||
along with this program; if not, write to the Free Software
|
||||
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335
|
||||
USA
|
||||
*/
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <mysql.h>
|
||||
#include <vector>
|
||||
#include <queue>
|
||||
#include <string>
|
||||
#ifdef _WIN32
|
||||
#include <windows.h>
|
||||
#else
|
||||
#include <poll.h>
|
||||
#endif
|
||||
|
||||
/*
|
||||
Implementation of asynchronous mariadb connection pool.
|
||||
|
||||
This pool consists of set of MYSQL* connections, created by C API
|
||||
function. The intention is that all connections have the same state
|
||||
same server, by the same user etc.
|
||||
|
||||
The "asynchronous" means the queries are executed on the server
|
||||
without waiting for the server reply. The queries are submitted
|
||||
with mysql_send_query(), and completions are picked by poll/IOCP.
|
||||
*/
|
||||
|
||||
namespace async_pool
|
||||
{
|
||||
typedef void (*query_completion_handler)(MYSQL *mysql, const char *query, bool success, void *context);
|
||||
|
||||
struct pooled_connection
|
||||
#ifdef _WIN32
|
||||
: OVERLAPPED
|
||||
#endif
|
||||
{
|
||||
MYSQL *mysql;
|
||||
query_completion_handler on_completion=NULL;
|
||||
void *context=NULL;
|
||||
std::string query;
|
||||
bool in_use=false;
|
||||
bool release_connection=false;
|
||||
#ifdef _WIN32
|
||||
bool is_pipe;
|
||||
HANDLE handle;
|
||||
#else
|
||||
int fd;
|
||||
#endif
|
||||
pooled_connection(MYSQL *mysql);
|
||||
};
|
||||
|
||||
|
||||
struct connection_pool
|
||||
{
|
||||
private:
|
||||
std::vector<pooled_connection> all_connections;
|
||||
std::queue<pooled_connection *> free_connections;
|
||||
pooled_connection *get_connection();
|
||||
void wait_for_completions();
|
||||
void complete_query(pooled_connection *c);
|
||||
void add_to_pollset(pooled_connection *c);
|
||||
|
||||
#ifdef _WIN32
|
||||
HANDLE iocp=nullptr;
|
||||
#else
|
||||
std::vector<pollfd> pollset;
|
||||
#endif
|
||||
public:
|
||||
~connection_pool();
|
||||
|
||||
/**
|
||||
Add connections to the connection pool
|
||||
|
||||
@param con - connections
|
||||
@param n_connections - number of connections
|
||||
*/
|
||||
void init(MYSQL *con[], size_t n_connections);
|
||||
|
||||
/**
|
||||
Send query to the connection pool
|
||||
Executes query on a connection in the pool, using mysql_send_query
|
||||
|
||||
@param query - query string
|
||||
@param on_completion - callback function to be called on completion
|
||||
@param context - user context that will be passed to the callback function
|
||||
@param release_connecton - if true, the connection should be released to the
|
||||
pool after the query is executed. If you execute another
|
||||
mysql_send_query() on the same connection, set this to false.
|
||||
|
||||
Note: the function will block if there are no free connections in the pool.
|
||||
|
||||
@return return code of mysql_send_query
|
||||
*/
|
||||
int execute_async(const char *query, query_completion_handler on_completion, void *context, bool release_connecton=true);
|
||||
|
||||
/** Waits for all outstanding queries to complete.*/
|
||||
void wait_all();
|
||||
|
||||
/** Execute callback for each connection in the pool. */
|
||||
void for_each_connection(void (*f)(MYSQL *mysql));
|
||||
|
||||
/**
|
||||
Closes all connections in pool and frees all resources.
|
||||
Does not wait for pending queries to complete
|
||||
(use wait_all() for that)
|
||||
*/
|
||||
int close();
|
||||
};
|
||||
|
||||
} // namespace async_pool
|
|
@ -1 +1 @@
|
|||
Subproject commit 458a4396b443dcefedcf464067560078aa09d8b4
|
||||
Subproject commit 75ab6fb1746824648ce09805acbe535f9501df37
|
Loading…
Reference in a new issue