Mirror of https://github.com/MariaDB/server.git, synced 2025-01-15 19:42:28 +01:00
ba3354fca6
Remove the affected assert. Wait for all AIO_buffer_cache::release_buffer() to finish before AIO_buffer_cache::clear().
268 lines
6.2 KiB
C++
/* Copyright (C) 2012 Monty Program Ab

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; version 2 of the License.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA
*/

#include <winsock2.h>
#include <my_global.h>
#include <violite.h>
#include "threadpool_winsockets.h"
#include <algorithm>
#include <vector>
#include <mutex>

/*
  A cache of IO buffers for asynchronous socket (or named pipe) reads.

  Considerations on Windows: since Windows locks AIO buffers in physical
  memory, it is important that these buffers are compactly allocated,
  so we try to prevent any kind of memory fragmentation.

  A relatively small region (at most 1MB) is allocated for equally sized,
  smallish (256 byte) buffers. The region is page-aligned, as it is
  allocated with VirtualAlloc.

  We use smallish IO buffers; 256 bytes is probably large enough for most
  queries. Larger buffers could have undesirable effects (thread hogging)
  on threadpool scheduling in case the client uses protocol pipelining.

  Also note that even in the unlikely situation where the cache runs out of
  buffers, this does not lead to errors; zero-sized reads are used in
  WSARecv instead.
*/

constexpr size_t READ_BUFSIZ= 256;

class AIO_buffer_cache
{
  const size_t ITEM_SIZE= READ_BUFSIZ;

  /** Limit the whole cache to 1MB */
  const size_t MAX_SIZE= 1048576;

  /* Allocation base */
  char *m_base= 0;

  /* "Free list" with LIFO policy */
  std::vector<char *> m_cache;
  std::mutex m_mtx;
  size_t m_elements= 0;

public:
  void set_size(size_t n_items);
  char *acquire_buffer();
  void release_buffer(char *v);
  void clear();
  ~AIO_buffer_cache();
};

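/*
  Allocate the shared region (at most MAX_SIZE bytes), prelock it in
  memory, and fill the free list with up to n_items buffers.
*/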
void AIO_buffer_cache::set_size(size_t n_items)
{
  DBUG_ASSERT(!m_base);
  m_elements= std::min(n_items, MAX_SIZE / ITEM_SIZE);
  auto sz= m_elements * ITEM_SIZE;

  m_base=
      (char *) VirtualAlloc(0, sz, MEM_COMMIT | MEM_RESERVE, PAGE_READWRITE);
  if (!m_base)
  {
    m_elements= 0;
    return;
  }

  /* Try to help the memory manager here, by prelocking the region in memory. */
  (void) VirtualLock(m_base, sz);

  m_cache.reserve(m_elements);
  for (ssize_t i= m_elements - 1; i >= 0; i--)
    m_cache.push_back(m_base + i * ITEM_SIZE);
}

/*
  Returns a buffer, or NULL if there are no free buffers.

  LIFO policy is implemented, so that we do not touch too many
  cold pages (a plain std::vector is used, no std::stack though).
*/
char *AIO_buffer_cache::acquire_buffer()
{
  std::unique_lock<std::mutex> lk(m_mtx);
  if (m_cache.empty())
    return nullptr;
  auto p= m_cache.back();
  m_cache.pop_back();
  return p;
}

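/* Return a buffer to the cache, making it the next one to be reused. */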
void AIO_buffer_cache::release_buffer(char *v)
{
  std::unique_lock<std::mutex> lk(m_mtx);
  m_cache.push_back(v);
}

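/*
  Free the underlying region.

  Waits until every buffer has been returned, i.e. until all concurrent
  release_buffer() calls have finished, before releasing the memory.
*/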
void AIO_buffer_cache::clear()
{
  if (!m_base)
    return;

  std::unique_lock<std::mutex> lk(m_mtx, std::defer_lock);
  for (;;)
  {
    if (lk.try_lock())
    {
      if (m_cache.size() == m_elements)
        break;
      lk.unlock();
    }
    Sleep(100);
  }
  VirtualFree(m_base, 0, MEM_RELEASE);
  m_cache.clear();
  m_base= 0;
  m_elements= 0;
}

AIO_buffer_cache::~AIO_buffer_cache() { clear(); }

/* Global cache for the read buffers. */
AIO_buffer_cache read_buffers;

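/* Give the read buffer back to the global cache, if one was acquired. */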
win_aiosocket::~win_aiosocket()
{
  if (m_buf_ptr)
    read_buffers.release_buffer(m_buf_ptr);
}


/** Return number of unread bytes. */
size_t win_aiosocket::buffer_remaining()
{
  return m_buf_datalen - m_buf_off;
}

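/*
  Replacement for vio->has_data: report data as available if anything is
  left in our internal buffer, otherwise fall back to the original check.
*/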
static my_bool my_vio_has_data(st_vio *vio)
{
  auto sock= (win_aiosocket *) vio->tp_ctx;
  return sock->buffer_remaining() || sock->m_orig_vio_has_data(vio);
}

/*
  (Half-)buffered read.

  The buffer is filled once, by the completion of the async IO.

  We do not refill the buffer once it has been read off;
  that would not make sense.
*/
static size_t my_vio_read(st_vio *vio, uchar *dest, size_t sz)
{
  auto sock= (win_aiosocket *) vio->tp_ctx;
  DBUG_ASSERT(sock);

  auto nbytes= std::min(sock->buffer_remaining(), sz);

  if (nbytes > 0)
  {
    /* Copy to output, adjust the offset. */
    memcpy(dest, sock->m_buf_ptr + sock->m_buf_off, nbytes);
    sock->m_buf_off += nbytes;
    return nbytes;
  }

  return sock->m_orig_vio_read(vio, dest, sz);
}

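/*
  Start an overlapped read into the internal buffer (or a zero-sized read
  when no buffer is available). Returns 0 if the read completed or is
  pending, otherwise a Windows error code.
*/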
DWORD win_aiosocket::begin_read()
{
  DWORD err= ERROR_SUCCESS;
  static char c;
  WSABUF buf;

  DBUG_ASSERT(!buffer_remaining());

  /*
    If there is no internal buffer to store data,
    we do a zero-sized read, but still need a valid
    pointer for the buffer parameter.
  */
  if (m_buf_ptr)
    buf= {(ULONG) READ_BUFSIZ, m_buf_ptr};
  else
    buf= {0, &c};


  if (!m_is_pipe)
  {
    /* Do async IO (sockets). */
    DWORD flags= 0;
    if (WSARecv((SOCKET) m_handle, &buf, 1, 0, &flags, &m_overlapped, NULL))
      err= WSAGetLastError();
  }
  else
  {
    /* Do async read (named pipe). */
    if (!ReadFile(m_handle, buf.buf, buf.len, 0, &m_overlapped))
      err= GetLastError();
  }

  if (!err || err == ERROR_IO_PENDING)
    return 0;
  return err;
}

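/* Async read completed; remember how many bytes landed in the buffer. */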
void win_aiosocket::end_read(ULONG nbytes, DWORD err)
{
  DBUG_ASSERT(!buffer_remaining());
  DBUG_ASSERT(!nbytes || m_buf_ptr);
  m_buf_off= 0;
  m_buf_datalen= nbytes;
}

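/*
  Attach this aio socket to a VIO: pick up the OS handle, acquire a read
  buffer from the cache, and hook vio->has_data / vio->read so that
  buffered data is consumed before the original VIO methods are used.
*/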
void win_aiosocket::init(Vio *vio)
{
  m_is_pipe= vio->type == VIO_TYPE_NAMEDPIPE;
  m_handle=
      m_is_pipe ? vio->hPipe : (HANDLE) mysql_socket_getfd(vio->mysql_socket);

  SetFileCompletionNotificationModes(m_handle, FILE_SKIP_SET_EVENT_ON_HANDLE);
  if (vio->type == VIO_TYPE_SSL)
  {
    /*
      TODO : This requires fixing viossl to call our manipulated VIO
    */
    return;
  }

  if (!(m_buf_ptr= read_buffers.acquire_buffer()))
  {
    /* Ran out of buffers, that's fine. */
    return;
  }

  vio->tp_ctx= this;

  m_orig_vio_has_data= vio->has_data;
  vio->has_data= my_vio_has_data;

  m_orig_vio_read= vio->read;
  vio->read= my_vio_read;
}

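/* Preallocate the global read buffer cache (n_buffers buffers). */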
void init_win_aio_buffers(unsigned int n_buffers)
{
  read_buffers.set_size(n_buffers);
}

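/* Release the global read buffer cache; waits for outstanding releases. */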
extern void destroy_win_aio_buffers()
{
  read_buffers.clear();
}