mirror of
				https://github.com/MariaDB/server.git
				synced 2025-10-30 18:36:12 +01:00 
			
		
		
		
	 ec5db6409d
			
		
	
	
	ec5db6409d
	
	
	
		
			
			Parallelism is achieved by using mysql_send_query on multiple connections without waiting for results, and using IO multiplexing (poll/IOCP) to wait for completions. Refresh libmariadb to pick up CONC-676 (fixes for IOCP use with named pipe)
		
			
				
	
	
		
			264 lines
		
	
	
	
		
			6.1 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
			
		
		
	
	
			264 lines
		
	
	
	
		
			6.1 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
| /*
 | |
|    Copyright (c) 2023, MariaDB.
 | |
| 
 | |
|    This program is free software; you can redistribute it and/or modify
 | |
|    it under the terms of the GNU General Public License as published by
 | |
|    the Free Software Foundation; version 2 of the License.
 | |
| 
 | |
|    This program is distributed in the hope that it will be useful,
 | |
|    but WITHOUT ANY WARRANTY; without even the implied warranty of
 | |
|    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 | |
|    GNU General Public License for more details.
 | |
| 
 | |
|    You should have received a copy of the GNU General Public License
 | |
|    along with this program; if not, write to the Free Software
 | |
|    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1335
 | |
|    USA
 | |
| */
 | |
| 
 | |
| /*
 | |
|   Connection pool, with parallel query execution
 | |
|   Does not use threads, but IO multiplexing via mysql_send_query and
 | |
|   poll/iocp to wait for completions
 | |
| */
 | |
| #include <my_global.h>
 | |
| #ifdef _WIN32
 | |
| #include <winsock2.h>
 | |
| #else
 | |
| #include <poll.h>
 | |
| #endif
 | |
| #include "connection_pool.h"
 | |
| #include <my_compiler.h>
 | |
| 
 | |
| namespace async_pool
 | |
| {
 | |
| static ATTRIBUTE_NORETURN void die(const char *format, ...)
 | |
| {
 | |
|   va_list args;
 | |
|   va_start(args, format);
 | |
|   vfprintf(stderr, format, args);
 | |
|   va_end(args);
 | |
|   abort();
 | |
| }
 | |
| 
 | |
| pooled_connection *connection_pool::get_connection()
 | |
| {
 | |
|   while (free_connections.empty())
 | |
|     wait_for_completions();
 | |
|   auto c= free_connections.front();
 | |
|   free_connections.pop();
 | |
|   return c;
 | |
| }
 | |
| 
 | |
| #ifdef _WIN32
 | |
| void connection_pool::add_to_pollset(pooled_connection *c)
 | |
| {
 | |
|   DWORD err= ERROR_SUCCESS;
 | |
|   static char ch;
 | |
|   WSABUF buf;
 | |
|   buf.len= 0;
 | |
|   buf.buf= &ch;
 | |
|   if (!c->is_pipe)
 | |
|   {
 | |
|     /* Do async io (sockets). */
 | |
|     DWORD flags= 0;
 | |
|     if (WSARecv((SOCKET) c->handle, &buf, 1, 0, &flags, c, NULL))
 | |
|       err= WSAGetLastError();
 | |
|   }
 | |
|   else
 | |
|   {
 | |
|     /* Do async read (named pipe) */
 | |
|     if (!ReadFile(c->handle, buf.buf, buf.len, 0, c))
 | |
|       err= GetLastError();
 | |
|   }
 | |
| 
 | |
|   if (err && err != ERROR_IO_PENDING)
 | |
|     die("%s failed: %d\n", c->is_pipe ? "ReadFile" : "WSARecv", err);
 | |
| }
 | |
| 
 | |
| /*
 | |
|   Wait for completions of queries.Uses IOCP on windows to wait for completions.
 | |
|   (ReadFile/WSARecv with 0 bytes serves as readiness notification)
 | |
| */
 | |
| void connection_pool::wait_for_completions()
 | |
| {
 | |
|   ULONG n;
 | |
|   OVERLAPPED_ENTRY events[32];
 | |
|   if (!GetQueuedCompletionStatusEx(iocp, events, array_elements(events), &n, INFINITE,
 | |
|                                    FALSE))
 | |
|   {
 | |
|     die("GetQueuedCompletionStatusEx failed: %d\n", GetLastError());
 | |
|   }
 | |
| 
 | |
|   for (ULONG i= 0; i < n; i++)
 | |
|   {
 | |
|     auto c= (pooled_connection *) events[i].lpOverlapped;
 | |
|     if (!c)
 | |
|       die("GetQueuedCompletionStatusEx unexpected return");
 | |
|     DBUG_ASSERT(c->mysql);
 | |
|     DBUG_ASSERT(!events[i].lpCompletionKey);
 | |
|     DBUG_ASSERT(!events[i].dwNumberOfBytesTransferred);
 | |
|     complete_query(c);
 | |
|   }
 | |
| }
 | |
| #else /* !_WIN32 */
 | |
| void connection_pool::add_to_pollset(pooled_connection *c)
 | |
| {
 | |
|   size_t idx= c - &all_connections[0];
 | |
|   pollfd *pfd= &pollset[idx];
 | |
|   pfd->fd= c->fd;
 | |
|   pfd->events= POLLIN;
 | |
|   pfd->revents= 0;
 | |
| }
 | |
| 
 | |
| /* something Linux-ish, can be returned for POLLIN event*/
 | |
| #ifndef POLLRDHUP
 | |
| #define POLLRDHUP 0
 | |
| #endif
 | |
| 
 | |
| void connection_pool::wait_for_completions()
 | |
| {
 | |
|   int n;
 | |
|   while ((n= poll(pollset.data(), pollset.size(), -1)) <= 0)
 | |
|   {
 | |
|     if (errno == EINTR)
 | |
|       continue;
 | |
|     die("poll failed: %d\n", errno);
 | |
|   }
 | |
| 
 | |
|   for (uint i= 0; n > 0 && i < pollset.size(); i++)
 | |
|   {
 | |
|     pollfd* pfd= &pollset[i];
 | |
|     if (pfd->revents &
 | |
|         (POLLIN | POLLPRI | POLLHUP | POLLRDHUP| POLLERR | POLLNVAL))
 | |
|     {
 | |
|       pfd->events= 0;
 | |
|       pfd->revents= 0;
 | |
|       pfd->fd= -1;
 | |
|       complete_query(&all_connections[i]);
 | |
|       n--;
 | |
|     }
 | |
|   }
 | |
|   if (n)
 | |
|     die("poll() failed to find free connection: %d\n");
 | |
| }
 | |
| #endif
 | |
| 
 | |
| void connection_pool::complete_query(pooled_connection *c)
 | |
| {
 | |
|   int err= mysql_read_query_result(c->mysql);
 | |
|   if (c->on_completion)
 | |
|     c->on_completion(c->mysql, c->query.c_str(), !err, c->context);
 | |
|   if (c->release_connection)
 | |
|   {
 | |
|     c->in_use= false;
 | |
|     free_connections.push(c);
 | |
|   }
 | |
| }
 | |
| 
 | |
| connection_pool::~connection_pool()
 | |
| {
 | |
|   close();
 | |
| }
 | |
| 
 | |
| void connection_pool::init(MYSQL *con[], size_t n)
 | |
| {
 | |
| #ifdef _WIN32
 | |
|   iocp= CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
 | |
|   if (!iocp)
 | |
|     die("CreateIoCompletionPort failed: %d\n", GetLastError());
 | |
| #else
 | |
|   pollset.resize(n);
 | |
|   for (auto &pfd : pollset)
 | |
|     pfd.fd= -1;
 | |
| #endif
 | |
| 
 | |
| 
 | |
|   for (size_t i= 0; i < n; i++)
 | |
|     all_connections.emplace_back(con[i]);
 | |
| 
 | |
|   for (auto &con : all_connections)
 | |
|   {
 | |
|     free_connections.push(&con);
 | |
| #ifdef _WIN32
 | |
|     if (!CreateIoCompletionPort(con.handle, iocp, 0, 0))
 | |
|       die("CreateIoCompletionPort failed: %d\n", GetLastError());
 | |
| #endif
 | |
|   }
 | |
| }
 | |
| 
 | |
| int connection_pool::execute_async(const char *query,
 | |
|                                    query_completion_handler on_completion,
 | |
|                                    void *context, bool release_connecton)
 | |
| {
 | |
|   auto c= get_connection();
 | |
|   c->context= context;
 | |
|   c->on_completion= on_completion;
 | |
|   c->release_connection= release_connecton;
 | |
|   c->query= query;
 | |
| 
 | |
|   int ret= mysql_send_query(c->mysql, query, (unsigned long) c->query.size());
 | |
|   if (ret)
 | |
|   {
 | |
|     free_connections.push(c);
 | |
|     return ret;
 | |
|   }
 | |
| 
 | |
|   c->in_use= true;
 | |
|   add_to_pollset(c);
 | |
|   return 0;
 | |
| }
 | |
| 
 | |
| /*
 | |
|   Wait until all queries are completed and all 
 | |
|   connections are idle.
 | |
| */
 | |
| void connection_pool::wait_all()
 | |
| {
 | |
|   while (free_connections.size() != all_connections.size())
 | |
|     wait_for_completions();
 | |
| }
 | |
| 
 | |
| void connection_pool::for_each_connection(void(*f)(MYSQL *mysql))
 | |
| {
 | |
|   for (auto &c : all_connections)
 | |
|    f(c.mysql);
 | |
| }
 | |
| 
 | |
| int connection_pool::close()
 | |
| {
 | |
|   for (auto &c : all_connections)
 | |
|     mysql_close(c.mysql);
 | |
| 
 | |
|   all_connections.clear();
 | |
|   while (!free_connections.empty())
 | |
|     free_connections.pop();
 | |
| #ifdef _WIN32
 | |
|   if (iocp)
 | |
|   {
 | |
|     CloseHandle(iocp);
 | |
|     iocp= nullptr;
 | |
|   }
 | |
| #endif
 | |
|   return 0;
 | |
| }
 | |
| 
 | |
| pooled_connection::pooled_connection(MYSQL *c)
 | |
| {
 | |
|   mysql= c;
 | |
| #ifdef _WIN32
 | |
|   OVERLAPPED *ov= static_cast<OVERLAPPED *>(this);
 | |
|   memset(ov, 0, sizeof(OVERLAPPED));
 | |
|   mysql_protocol_type protocol;
 | |
|   if (c->host && !strcmp(c->host, "."))
 | |
|     protocol= MYSQL_PROTOCOL_PIPE;
 | |
|   else
 | |
|     protocol= (mysql_protocol_type) c->options.protocol;
 | |
|   is_pipe= protocol == MYSQL_PROTOCOL_PIPE;
 | |
|   handle= (HANDLE) mysql_get_socket(c);
 | |
| #else
 | |
|   fd= mysql_get_socket(c);
 | |
| #endif
 | |
| }
 | |
| 
 | |
| } // namespace async_pool
 |