mirror of
				https://github.com/MariaDB/server.git
				synced 2025-11-04 04:46:15 +01:00 
			
		
		
		
	Remove the affected assert. Wait for all AIO_buffer_cache::release_buffer() to finish before AIO_buffer_cache::clear().
		
			
				
	
	
		
			268 lines
		
	
	
	
		
			6.2 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
			
		
		
	
	
			268 lines
		
	
	
	
		
			6.2 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
/* Copyright (C) 2012 Monty Program Ab
 | 
						|
 | 
						|
   This program is free software; you can redistribute it and/or modify
 | 
						|
   it under the terms of the GNU General Public License as published by
 | 
						|
   the Free Software Foundation; version 2 of the License.
 | 
						|
 | 
						|
   This program is distributed in the hope that it will be useful,
 | 
						|
   but WITHOUT ANY WARRANTY; without even the implied warranty of
 | 
						|
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 | 
						|
   GNU General Public License for more details.
 | 
						|
 | 
						|
   You should have received a copy of the GNU General Public License
 | 
						|
   along with this program; if not, write to the Free Software
 | 
						|
   Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA
 | 
						|
 */
 | 
						|
 | 
						|
#include <winsock2.h>
 | 
						|
#include <my_global.h>
 | 
						|
#include <violite.h>
 | 
						|
#include "threadpool_winsockets.h"
 | 
						|
#include <algorithm>
 | 
						|
#include <vector>
 | 
						|
#include <mutex>
 | 
						|
 | 
						|
/*
 | 
						|
 A cache for IO buffers for asynchronous socket(or named pipe) reads.
 | 
						|
 | 
						|
 Considerations on Windows : since Windows locks the AIO buffers in physical memory,
 | 
						|
 it is important that these buffers are compactly allocated.
 | 
						|
 We try to prevent any kinds of memory fragmentation
 | 
						|
 | 
						|
 A relatively small region (at most 1MB) is allocated, for equally sized smallish(256 bytes)
 | 
						|
 buffers. The region is pagesize-aligned (via VirtualAlloc allocation)
 | 
						|
 | 
						|
 We use smallish IO buffers, 256 bytes is probably large enough for most of
 | 
						|
 the queries. Larger buffers could have funny effects(thread hogging)
 | 
						|
 on threadpool scheduling in case client is using protocol pipelining.
 | 
						|
 | 
						|
 Also note, that even in an unlikely situation where cache runs out of buffers,
 | 
						|
 this does not lead to errors, zero sized reads will be used in WSARecv then.
 | 
						|
*/
 | 
						|
 | 
						|
constexpr size_t READ_BUFSIZ= 256;

/*
  Cache of fixed-size IO buffers for asynchronous reads.

  All buffers are carved out of a single page-aligned region
  (VirtualAlloc), keeping them compact, since Windows locks AIO
  buffers in physical memory for the duration of an overlapped IO.
*/
class AIO_buffer_cache
{
  /*
    Size of an individual buffer handed out by the cache.
    static constexpr: these are class-wide constants; non-static const
    members would waste per-instance storage and delete assignment.
  */
  static constexpr size_t ITEM_SIZE= READ_BUFSIZ;

  /** Limit the whole cache to 1MB*/
  static constexpr size_t MAX_SIZE= 1048576;

  /* Allocation base (start of the VirtualAlloc-ed region), 0 if unset. */
  char *m_base= 0;

  /* "Free list" with LIFO policy */
  std::vector<char *> m_cache;

  /* Protects m_cache. */
  std::mutex m_mtx;

  /* Total number of buffers the region was carved into. */
  size_t m_elements= 0;

public:
  void set_size(size_t n_items);
  char *acquire_buffer();
  void release_buffer(char *v);
  void clear();
  ~AIO_buffer_cache();
};
 | 
						|
 | 
						|
 | 
						|
void AIO_buffer_cache::set_size(size_t n_items)
 | 
						|
{
 | 
						|
  DBUG_ASSERT(!m_base);
 | 
						|
  m_elements= std::min(n_items, MAX_SIZE / ITEM_SIZE);
 | 
						|
  auto sz= m_elements * ITEM_SIZE;
 | 
						|
 | 
						|
  m_base=
 | 
						|
      (char *) VirtualAlloc(0, sz, MEM_COMMIT | MEM_RESERVE, PAGE_READWRITE);
 | 
						|
  if (!m_base)
 | 
						|
  {
 | 
						|
    m_elements= 0;
 | 
						|
    return;
 | 
						|
  }
 | 
						|
 | 
						|
  /* Try to help memory manager here, by prelocking region in memory*/
 | 
						|
  (void) VirtualLock(m_base, sz);
 | 
						|
 | 
						|
  m_cache.reserve(m_elements);
 | 
						|
  for (ssize_t i= m_elements - 1; i >= 0 ; i--)
 | 
						|
    m_cache.push_back(m_base + i * ITEM_SIZE);
 | 
						|
}
 | 
						|
 | 
						|
/*
 | 
						|
  Returns a buffer, or NULL if no free buffers.
 | 
						|
 | 
						|
  LIFO policy is implemented, so we do not touch too many
 | 
						|
  pages (no std::stack though)
 | 
						|
*/
 | 
						|
char *AIO_buffer_cache::acquire_buffer()
 | 
						|
{
 | 
						|
  std::unique_lock<std::mutex> lk(m_mtx);
 | 
						|
  if (m_cache.empty())
 | 
						|
    return nullptr;
 | 
						|
  auto p= m_cache.back();
 | 
						|
  m_cache.pop_back();
 | 
						|
  return p;
 | 
						|
}
 | 
						|
 | 
						|
void AIO_buffer_cache::release_buffer(char *v)
 | 
						|
{
 | 
						|
  std::unique_lock<std::mutex> lk(m_mtx);
 | 
						|
  m_cache.push_back(v);
 | 
						|
}
 | 
						|
 | 
						|
void AIO_buffer_cache::clear()
 | 
						|
{
 | 
						|
  if (!m_base)
 | 
						|
    return;
 | 
						|
 | 
						|
  std::unique_lock<std::mutex> lk(m_mtx, std::defer_lock);
 | 
						|
  for(;;)
 | 
						|
  {
 | 
						|
    if (lk.try_lock())
 | 
						|
    {
 | 
						|
      if (m_cache.size() == m_elements)
 | 
						|
        break;
 | 
						|
      lk.unlock();
 | 
						|
    }
 | 
						|
    Sleep(100);
 | 
						|
  }
 | 
						|
  VirtualFree(m_base, 0, MEM_RELEASE);
 | 
						|
  m_cache.clear();
 | 
						|
  m_base= 0;
 | 
						|
  m_elements= 0;
 | 
						|
}
 | 
						|
 | 
						|
AIO_buffer_cache::~AIO_buffer_cache()
{
  clear();
}

/* Global variable for the cache buffers.*/
AIO_buffer_cache read_buffers;
						|
 | 
						|
win_aiosocket::~win_aiosocket()
{
  /* Return the read buffer, if we got one, to the global cache. */
  if (m_buf_ptr)
    read_buffers.release_buffer(m_buf_ptr);
}
 | 
						|
 | 
						|
 | 
						|
/** Return number of unread bytes.*/
 | 
						|
size_t win_aiosocket::buffer_remaining()
 | 
						|
{
 | 
						|
  return m_buf_datalen - m_buf_off;
 | 
						|
}
 | 
						|
 | 
						|
/*
  Vio has_data hook: data is pending if either our internal buffer
  still holds unread bytes, or the original vio reports data.
*/
static my_bool my_vio_has_data(st_vio *vio)
{
  auto sock= (win_aiosocket *) vio->tp_ctx;
  if (sock->buffer_remaining() > 0)
    return 1;
  return sock->m_orig_vio_has_data(vio);
}
 | 
						|
 | 
						|
/*
 | 
						|
 (Half-)buffered read.
 | 
						|
 | 
						|
 The buffer is filled once, by completion of the async IO.
 | 
						|
 | 
						|
 We do not refill the buffer once it is read off,
 | 
						|
 does not make sense.
 | 
						|
*/
 | 
						|
static size_t my_vio_read(st_vio *vio, uchar *dest, size_t sz)
 | 
						|
{
 | 
						|
  auto sock= (win_aiosocket *) vio->tp_ctx;
 | 
						|
  DBUG_ASSERT(sock);
 | 
						|
 | 
						|
  auto nbytes= std::min(sock->buffer_remaining(), sz);
 | 
						|
 | 
						|
  if (nbytes > 0)
 | 
						|
  {
 | 
						|
    /* Copy to output, adjust the offset.*/
 | 
						|
    memcpy(dest, sock->m_buf_ptr + sock->m_buf_off, nbytes);
 | 
						|
    sock->m_buf_off += nbytes;
 | 
						|
    return nbytes;
 | 
						|
  }
 | 
						|
 | 
						|
  return sock->m_orig_vio_read(vio, dest, sz);
 | 
						|
}
 | 
						|
 | 
						|
/*
  Start an overlapped (async) read on the socket or named pipe.

  When no cache buffer was acquired, a zero-length read is posted
  instead; it still completes when data arrives but carries no
  payload.

  @return 0 on success (including IO still pending), otherwise a
          Windows error code.
*/
DWORD win_aiosocket::begin_read()
{
  DWORD err= ERROR_SUCCESS;
  /* Dummy target for zero-length reads; the APIs want a valid pointer. */
  static char c;
  WSABUF buf;

  DBUG_ASSERT(!buffer_remaining());

  if (m_buf_ptr)
    buf= {(ULONG) READ_BUFSIZ, m_buf_ptr};
  else
    buf= {0, &c};

  if (m_is_pipe)
  {
    /* Do async read (named pipe) */
    if (!ReadFile(m_handle, buf.buf, buf.len, 0, &m_overlapped))
      err= GetLastError();
  }
  else
  {
    /* Do async io (sockets). */
    DWORD flags= 0;
    if (WSARecv((SOCKET) m_handle, &buf, 1, 0, &flags, &m_overlapped, NULL))
      err= WSAGetLastError();
  }

  if (err == 0 || err == ERROR_IO_PENDING)
    return 0;
  return err;
}
 | 
						|
 | 
						|
/*
  Async read completion: record how many bytes landed in the internal
  buffer and rewind the read offset.
*/
void win_aiosocket::end_read(ULONG nbytes, DWORD err)
{
  DBUG_ASSERT(!buffer_remaining());
  /* A non-empty completion must have gone into our buffer. */
  DBUG_ASSERT(!nbytes || m_buf_ptr);
  m_buf_off= 0;
  m_buf_datalen= nbytes;
}
 | 
						|
 | 
						|
/*
  Attach async-read bookkeeping to a connection's Vio.

  Grabs a read buffer from the global cache (when available) and
  redirects the vio's read/has_data hooks through our buffered
  versions. SSL connections are left untouched for now.
*/
void win_aiosocket::init(Vio *vio)
{
  m_is_pipe= vio->type == VIO_TYPE_NAMEDPIPE;
  m_handle= m_is_pipe ? vio->hPipe
                      : (HANDLE) mysql_socket_getfd(vio->mysql_socket);

  /* Best-effort; failure only costs an extra completion notification. */
  SetFileCompletionNotificationModes(m_handle, FILE_SKIP_SET_EVENT_ON_HANDLE);

  if (vio->type == VIO_TYPE_SSL)
  {
    /*
    TODO : This requires fixing viossl to call our manipulated VIO
    */
    return;
  }

  m_buf_ptr= read_buffers.acquire_buffer();
  if (!m_buf_ptr)
  {
    /* Ran out of buffers, that's fine.*/
    return;
  }

  vio->tp_ctx= this;

  m_orig_vio_has_data= vio->has_data;
  vio->has_data= my_vio_has_data;

  m_orig_vio_read= vio->read;
  vio->read= my_vio_read;
}
 | 
						|
 | 
						|
void init_win_aio_buffers(unsigned int n_buffers)
 | 
						|
{
 | 
						|
  read_buffers.set_size(n_buffers);
 | 
						|
}
 | 
						|
 | 
						|
extern void destroy_win_aio_buffers()
 | 
						|
{
 | 
						|
  read_buffers.clear();
 | 
						|
}
 |