/* Copyright (C) 2012, 2020, MariaDB Corporation.

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; version 2 of the License.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA */

#if (defined HAVE_POOL_OF_THREADS) && !defined(EMBEDDED_LIBRARY)

#include "threadpool_generic.h"
#include "mariadb.h"
#include <violite.h>
#include <sql_priv.h>
#include <sql_class.h>
#include <my_pthread.h>
#include <scheduler.h>
#include <sql_connect.h>
#include <mysqld.h>
#include <debug_sync.h>
#include <time.h>
#include <sql_plist.h>
#include <threadpool.h>
#include <algorithm>
#ifdef _WIN32
#include "threadpool_winsockets.h"
#define OPTIONAL_IO_POLL_READ_PARAM this
#else
#define OPTIONAL_IO_POLL_READ_PARAM 0
#endif

static void io_poll_close(TP_file_handle fd)
{
#ifdef _WIN32
  CloseHandle(fd);
#else
  close(fd);
#endif
}

/** Maximum number of native events a listener can read in one go */
#define MAX_EVENTS 1024

/** Indicates that threadpool was initialized */
static bool threadpool_started= false;

/*
  Define PSI Keys for performance schema.
  We have a mutex per group, worker threads, a condition per worker thread,
  and a timer thread with its own mutex and condition.
*/


#ifdef HAVE_PSI_INTERFACE
static PSI_mutex_key key_group_mutex;
static PSI_mutex_key key_timer_mutex;
static PSI_mutex_info mutex_list[]=
{
  { &key_group_mutex, "group_mutex", 0},
  { &key_timer_mutex, "timer_mutex", PSI_FLAG_GLOBAL}
};

static PSI_cond_key key_worker_cond;
static PSI_cond_key key_timer_cond;
static PSI_cond_info cond_list[]=
{
  { &key_worker_cond, "worker_cond", 0},
  { &key_timer_cond, "timer_cond", PSI_FLAG_GLOBAL}
};

static PSI_thread_key key_worker_thread;
static PSI_thread_key key_timer_thread;
static PSI_thread_info	thread_list[] =
{
 {&key_worker_thread, "worker_thread", 0},
 {&key_timer_thread, "timer_thread", PSI_FLAG_GLOBAL}
};

/* Macro to simplify performance schema registration */
#define PSI_register(X) \
 if(PSI_server) PSI_server->register_ ## X("threadpool", X ## _list, array_elements(X ## _list))
#else
#define PSI_register(X) /* no-op */
#endif
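
/*
  For example, PSI_register(mutex) expands (in the HAVE_PSI_INTERFACE branch
  above) to:

    if(PSI_server) PSI_server->register_mutex("threadpool", mutex_list,
                                              array_elements(mutex_list));

  i.e. the X argument is pasted both into the register_ function name and
  into the X ## _list array identifier.
*/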

thread_group_t *all_groups;
static uint group_count;
static Atomic_counter<uint32_t> shutdown_group_count;

/**
 Used for printing the "pool blocked" message, see
 print_pool_blocked_message();
*/
static ulonglong pool_block_start;

/* Global timer for all groups */
struct pool_timer_t
{
  mysql_mutex_t mutex;
  mysql_cond_t cond;
  volatile uint64 current_microtime;
  std::atomic<uint64_t> next_timeout_check;
  uint  tick_interval;
  bool shutdown;
  pthread_t timer_thread_id;
};

static pool_timer_t pool_timer;

static void queue_put(thread_group_t *thread_group, TP_connection_generic *connection);
static void queue_put(thread_group_t *thread_group, native_event *ev, int cnt);
static int  wake_thread(thread_group_t *thread_group, bool due_to_stall);
static int  wake_or_create_thread(thread_group_t *thread_group, bool due_to_stall=false);
static int  create_worker(thread_group_t *thread_group, bool due_to_stall);
static void *worker_main(void *param);
static void check_stall(thread_group_t *thread_group);
static void set_next_timeout_check(ulonglong abstime);
static void print_pool_blocked_message(bool);

/**
 Asynchronous network IO.

 We use the native edge-triggered network IO multiplexing facility.
 This maps to different APIs on different Unixes.

 Currently supported are Linux with epoll, Solaris with event ports,
 OSX and BSD with kevent, and Windows with IOCP. All those APIs are used
 with one-shot flags: the event is signalled once the client has written
 something into the socket; the socket is then removed from the "poll-set"
 until the command is finished, and we need to re-arm/re-register the socket.

 No implementation for poll/select is currently provided.

 The API closely resembles all of the above mentioned platform APIs
 and consists of the following functions.

 - io_poll_create()
 Creates an io_poll descriptor
 On Linux: epoll_create()

 - io_poll_associate_fd(int poll_fd, TP_file_handle fd, void *data, void *opt)
 Associate file descriptor with io poll descriptor
 On Linux : epoll_ctl(..EPOLL_CTL_ADD))

 - io_poll_disassociate_fd(TP_file_handle pollfd, TP_file_handle fd)
  Dissociate file descriptor from io poll descriptor
  On Linux: epoll_ctl(..EPOLL_CTL_DEL)

 - io_poll_start_read(int poll_fd,int fd, void *data, void *opt)
 The same as io_poll_associate_fd(), but cannot be used before
 io_poll_associate_fd() was called.
 On Linux : epoll_ctl(..EPOLL_CTL_MOD)

 - io_poll_wait (TP_file_handle pollfd, native_event *native_events, int maxevents,
   int timeout_ms)

 Wait until one or more descriptors added with io_poll_associate_fd()
 or io_poll_start_read() become readable. Data associated with the
 descriptors can be retrieved from the native_events array, using
 the native_event_get_userdata() function.

 On Linux: epoll_wait()
*/
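
/*
  As a minimal standalone sketch (illustration only, not used by the server),
  the one-shot, edge-triggered pattern described above looks like this on
  Linux. The names conn, sock_fd and on_readable() are hypothetical
  placeholders:

    #include <sys/epoll.h>

    int pollfd= epoll_create(1);

    // Register once (io_poll_associate_fd equivalent)
    struct epoll_event ev;
    ev.data.ptr= conn;                         // user data
    ev.events= EPOLLIN|EPOLLET|EPOLLONESHOT;
    epoll_ctl(pollfd, EPOLL_CTL_ADD, sock_fd, &ev);

    // Event loop (io_poll_wait equivalent)
    struct epoll_event out[16];
    int cnt= epoll_wait(pollfd, out, 16, -1);
    for (int i= 0; i < cnt; i++)
      on_readable(out[i].data.ptr);            // handle event

    // EPOLLONESHOT disarmed the fd on delivery; re-arm it after the
    // command has been processed (io_poll_start_read equivalent):
    epoll_ctl(pollfd, EPOLL_CTL_MOD, sock_fd, &ev);
*/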

#if defined (__linux__)
#ifndef EPOLLRDHUP
/* Early 2.6 kernels did not have EPOLLRDHUP */
#define EPOLLRDHUP 0
#endif
static TP_file_handle io_poll_create()
{
  return epoll_create(1);
}



int io_poll_associate_fd(TP_file_handle pollfd, TP_file_handle fd, void *data, void*)
{
  struct epoll_event ev;
  ev.data.u64= 0; /* Keep valgrind happy */
  ev.data.ptr= data;
  ev.events=  EPOLLIN|EPOLLET|EPOLLERR|EPOLLRDHUP|EPOLLONESHOT;
  return epoll_ctl(pollfd, EPOLL_CTL_ADD,  fd, &ev);
}



int io_poll_start_read(TP_file_handle pollfd, TP_file_handle fd, void *data, void *)
{
  struct epoll_event ev;
  ev.data.u64= 0; /* Keep valgrind happy */
  ev.data.ptr= data;
  ev.events=  EPOLLIN|EPOLLET|EPOLLERR|EPOLLRDHUP|EPOLLONESHOT;
  return epoll_ctl(pollfd, EPOLL_CTL_MOD, fd, &ev);
}

int io_poll_disassociate_fd(TP_file_handle pollfd, TP_file_handle fd)
{
  struct epoll_event ev;
  return epoll_ctl(pollfd, EPOLL_CTL_DEL,  fd, &ev);
}


/*
 Wrapper around epoll_wait.
 NOTE - in case of EINTR, it restarts with the original timeout. Since we use
 either infinite or 0 timeouts, this is not critical.
*/
int io_poll_wait(TP_file_handle pollfd, native_event *native_events, int maxevents,
              int timeout_ms)
{
  int ret;
  do
  {
    ret = epoll_wait(pollfd, native_events, maxevents, timeout_ms);
  }
  while(ret == -1 && errno == EINTR);
  return ret;
}


static void *native_event_get_userdata(native_event *event)
{
  return event->data.ptr;
}

#elif defined(HAVE_KQUEUE)

/*
  NetBSD prior to 9.99.17 is incompatible with other BSDs: the last parameter
  of the EV_SET macro (udata, user data) needs to be intptr_t, whereas it
  needs to be void* everywhere else.
*/

#ifdef __NetBSD__
#include <sys/param.h>
#  if !__NetBSD_Prereq__(9,99,17)
#define MY_EV_SET(a, b, c, d, e, f, g) EV_SET(a, b, c, d, e, f, (intptr_t)g)
#  endif
#endif

#ifndef MY_EV_SET
#define MY_EV_SET(a, b, c, d, e, f, g) EV_SET(a, b, c, d, e, f, g)
#endif


TP_file_handle io_poll_create()
{
  return kqueue();
}

int io_poll_start_read(TP_file_handle pollfd, TP_file_handle fd, void *data,void *)
{
  struct kevent ke;
  MY_EV_SET(&ke, fd, EVFILT_READ, EV_ADD|EV_ONESHOT,
         0, 0, data);
  return kevent(pollfd, &ke, 1, 0, 0, 0);
}


int io_poll_associate_fd(TP_file_handle pollfd, TP_file_handle fd, void *data,void *)
{
  return io_poll_start_read(pollfd,fd, data, 0);
}


int io_poll_disassociate_fd(TP_file_handle pollfd, TP_file_handle fd)
{
  struct kevent ke;
  MY_EV_SET(&ke,fd, EVFILT_READ, EV_DELETE, 0, 0, 0);
  return kevent(pollfd, &ke, 1, 0, 0, 0);
}


int io_poll_wait(TP_file_handle pollfd, struct kevent *events, int maxevents, int timeout_ms)
{
  struct timespec ts;
  int ret;
  if (timeout_ms >= 0)
  {
    ts.tv_sec= timeout_ms/1000;
    ts.tv_nsec= (timeout_ms%1000)*1000000;
  }
  do
  {
    ret= kevent(pollfd, 0, 0, events, maxevents,
               (timeout_ms >= 0)?&ts:NULL);
  }
  while (ret == -1 && errno == EINTR);
  return ret;
}

static void* native_event_get_userdata(native_event *event)
{
  return (void *)event->udata;
}

#elif defined (__sun)

static TP_file_handle io_poll_create()
{
  return port_create();
}

int io_poll_start_read(TP_file_handle pollfd, TP_file_handle fd, void *data, void *)
{
  return port_associate(pollfd, PORT_SOURCE_FD, fd, POLLIN, data);
}

static int io_poll_associate_fd(TP_file_handle pollfd, TP_file_handle fd, void *data, void *)
{
  return io_poll_start_read(pollfd, fd, data, 0);
}

int io_poll_disassociate_fd(TP_file_handle pollfd, TP_file_handle fd)
{
  return port_dissociate(pollfd, PORT_SOURCE_FD, fd);
}

int io_poll_wait(TP_file_handle pollfd, native_event *events, int maxevents, int timeout_ms)
{
  struct timespec ts;
  int ret;
  uint_t nget= 1;
  if (timeout_ms >= 0)
  {
    ts.tv_sec= timeout_ms/1000;
    ts.tv_nsec= (timeout_ms%1000)*1000000;
  }
  do
  {
    ret= port_getn(pollfd, events, maxevents, &nget,
            (timeout_ms >= 0)?&ts:NULL);
  }
  while (ret == -1 && errno == EINTR);
  DBUG_ASSERT(nget < INT_MAX);
  return (int)nget;
}

static void* native_event_get_userdata(native_event *event)
{
  return event->portev_user;
}

#elif defined(_WIN32)


static TP_file_handle io_poll_create()
{
  return CreateIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 0);
}


int io_poll_start_read(TP_file_handle pollfd, TP_file_handle fd, void *, void *opt)
{
  auto c= (TP_connection_generic *) opt;
  return (int) c->win_sock.begin_read();
}


static int io_poll_associate_fd(TP_file_handle pollfd, TP_file_handle fd, void *data, void *opt)
{
  HANDLE h= CreateIoCompletionPort(fd, pollfd, (ULONG_PTR)data, 0);
  if (!h)
    return -1;
  return io_poll_start_read(pollfd,fd, 0, opt);
}


typedef LONG NTSTATUS;

typedef struct _IO_STATUS_BLOCK {
  union {
    NTSTATUS Status;
    PVOID Pointer;
  };
  ULONG_PTR Information;
} IO_STATUS_BLOCK, * PIO_STATUS_BLOCK;

struct FILE_COMPLETION_INFORMATION {
  HANDLE Port;
  PVOID Key;
};

enum FILE_INFORMATION_CLASS {
  FileReplaceCompletionInformation = 0x3D
};


typedef NTSTATUS(WINAPI* pNtSetInformationFile)(HANDLE, PIO_STATUS_BLOCK, PVOID, ULONG, FILE_INFORMATION_CLASS);

int io_poll_disassociate_fd(TP_file_handle pollfd, TP_file_handle fd)
{
  static pNtSetInformationFile my_NtSetInformationFile = (pNtSetInformationFile)
    GetProcAddress(GetModuleHandle("ntdll"), "NtSetInformationFile");
  if (!my_NtSetInformationFile)
    return -1; /* unexpected, we only support Windows 8.1+ */
  IO_STATUS_BLOCK iosb{};
  FILE_COMPLETION_INFORMATION fci{};
  if (my_NtSetInformationFile(fd,&iosb,&fci,sizeof(fci),FileReplaceCompletionInformation))
    return -1;
  return 0;
}


static void *native_event_get_userdata(native_event *event)
{
  return (void *) event->lpCompletionKey;
}

int io_poll_wait(TP_file_handle pollfd, native_event *events, int maxevents,
                 int timeout_ms)
{
  ULONG n;
  if (!GetQueuedCompletionStatusEx(pollfd, events, maxevents, &n, timeout_ms, FALSE))
    return -1;

  /* Update win_sock with the number of bytes read. */
  for (ULONG i= 0; i < n; i++)
  {
    auto ev= &events[i];
    auto c= (TP_connection_generic *) native_event_get_userdata(ev);
    /* NULL userdata means shutdown (see PostQueuedCompletionStatus() usage) */
    if (c)
    {
      c->win_sock.end_read(ev->dwNumberOfBytesTransferred, 0);
    }
  }

  return (int) n;
}

#endif


/* Dequeue element from a workqueue */

static TP_connection_generic *queue_get(thread_group_t *thread_group)
{
  DBUG_ENTER("queue_get");
  thread_group->queue_event_count++;
  TP_connection_generic *c;
  for (int i=0; i < NQUEUES;i++)
  {
    c= thread_group->queues[i].pop_front();
    if (c)
      DBUG_RETURN(c);
  }
  DBUG_RETURN(0);
}

static TP_connection_generic* queue_get(thread_group_t* group, operation_origin origin)
{
  auto ret = queue_get(group);
  if (ret)
  {
    TP_INCREMENT_GROUP_COUNTER(group, dequeues[(int)origin]);
  }
  return ret;
}

static bool is_queue_empty(thread_group_t *thread_group)
{
  for (int i=0; i < NQUEUES; i++)
  {
    if (!thread_group->queues[i].is_empty())
      return false;
  }
  return true;
}


static void queue_init(thread_group_t *thread_group)
{
  for (int i=0; i < NQUEUES; i++)
  {
    thread_group->queues[i].empty();
  }
}

static void queue_put(thread_group_t *thread_group, native_event *ev, int cnt)
{
  ulonglong now= threadpool_exact_stats?microsecond_interval_timer():pool_timer.current_microtime;
  for(int i=0; i < cnt; i++)
  {
    TP_connection_generic *c = (TP_connection_generic *)native_event_get_userdata(&ev[i]);
    c->enqueue_time= now;
    thread_group->queues[c->priority].push_back(c);
  }
}

/*
  Handle wait timeout:
  Find connections that have been idle for too long and kill them.
  Also, recalculate the time when the next timeout check should run.
*/

static my_bool timeout_check(THD *thd, pool_timer_t *timer)
{
  DBUG_ENTER("timeout_check");
  if (thd->net.reading_or_writing == 1)
  {
    TP_connection_generic *connection= (TP_connection_generic *)thd->event_scheduler.data;
    if (!connection || connection->state != TP_STATE_IDLE)
    {
      /*
        Connection does not have scheduler data, or is not in the idle
        state. The former happens for example if the THD belongs to a
        different scheduler, that is listening to extra_port.
      */
      DBUG_RETURN(0);
    }

    if (connection->abs_wait_timeout < timer->current_microtime)
    {
      tp_timeout_handler(connection);
    }
    else
    {
      set_next_timeout_check(connection->abs_wait_timeout);
    }
  }
  DBUG_RETURN(0);
}


/*
 Timer thread.

  Periodically, check if one of the thread groups is stalled. Stalls happen
  if events are not being dequeued from the queue or from the network. The
  primary reason for a stall can be a lengthy executing non-blocking request.
  It could also happen that a thread is waiting but wait_begin/wait_end was
  forgotten by the storage engine. The timer thread will create a new thread
  in a group in case of a stall.

  Besides checking for stalls, the timer thread is also responsible for
  terminating clients that have been idle for longer than wait_timeout
  seconds.

  TODO: Let the timer sleep for a long time if there is no work to be done.
  Currently it wakes up rather often on an idle server.
*/

static void* timer_thread(void *param)
{
  uint i;
  pool_timer_t* timer=(pool_timer_t *)param;

  my_thread_init();
  DBUG_ENTER("timer_thread");
  timer->next_timeout_check.store(std::numeric_limits<uint64_t>::max(),
                                  std::memory_order_relaxed);
  timer->current_microtime= microsecond_interval_timer();

  for(;;)
  {
    struct timespec ts;
    int err;

    set_timespec_nsec(ts, timer->tick_interval*1000000LL);
    mysql_mutex_lock(&timer->mutex);
    err= mysql_cond_timedwait(&timer->cond, &timer->mutex, &ts);
    if (timer->shutdown)
    {
      mysql_mutex_unlock(&timer->mutex);
      break;
    }
    if (err == ETIMEDOUT)
    {
      timer->current_microtime= microsecond_interval_timer();

      /* Check stalls in thread groups */
      for (i= 0; i < threadpool_max_size; i++)
      {
        if(all_groups[i].connection_count)
           check_stall(&all_groups[i]);
      }

      /* Check if any client exceeded wait_timeout */
      if (timer->next_timeout_check.load(std::memory_order_relaxed) <=
          timer->current_microtime)
      {
        /* Reset next timeout check, it will be recalculated below */
        timer->next_timeout_check.store(std::numeric_limits<uint64_t>::max(),
                                        std::memory_order_relaxed);
        server_threads.iterate(timeout_check, timer);
      }
    }
    mysql_mutex_unlock(&timer->mutex);
  }

  mysql_mutex_destroy(&timer->mutex);
  my_thread_end();
  return NULL;
}




void check_stall(thread_group_t *thread_group)
{
  mysql_mutex_lock(&thread_group->mutex);

  /*
   Bump priority for the low priority connections that spent too much
   time in low prio queue.
  */
  TP_connection_generic *c;
  for (;;)
  {
    c= thread_group->queues[TP_PRIORITY_LOW].front();
    if (c && pool_timer.current_microtime - c->enqueue_time > 1000ULL * threadpool_prio_kickup_timer)
    {
      thread_group->queues[TP_PRIORITY_LOW].remove(c);
      thread_group->queues[TP_PRIORITY_HIGH].push_back(c);
    }
    else
      break;
  }

  /*
    Check if a listener is present. If not, check whether any IO
    events were dequeued since last time. If not, this means the
    listener is either in a tight loop or thd_wait_begin()
    was forgotten. Create a new worker (it will make itself the listener).
  */
  if (!thread_group->listener && !thread_group->io_event_count)
  {
    wake_or_create_thread(thread_group, true);
    mysql_mutex_unlock(&thread_group->mutex);
    return;
  }

  /*  Reset io event count */
  thread_group->io_event_count= 0;

  /*
    Check whether requests from the workqueue are being dequeued.

    The stall detection and resolution works as follows:

    1. There is a counter thread_group->queue_event_count for the number of
       events removed from the queue. The timer resets the counter to 0 on
       each run.
    2. The timer determines a stall if this counter remains 0 since the last
       check and the queue is not empty.
    3. Once the timer has determined a stall, it sets the
       thread_group->stalled flag and wakes an idle worker (or creates a new
       one, subject to throttling).
    4. The stalled flag is reset when an event is dequeued.

    Q : Will this handling lead to an unbounded growth of threads, if the
    queue stalls permanently?
    A : No. If the queue stalls permanently, it is an indication of many very
    long simultaneous queries. The maximum number of simultaneous queries is
    max_connections; further, we have the threadpool_max_threads limit, upon
    which no worker threads are created. So in case there is a flood of very
    long queries, the threadpool would slowly approach thread-per-connection
    behavior.
    NOTE:
    If long queries never wait, creation of the new threads is done by the
    timer, so it is slower than in real thread-per-connection. However, if
    long queries do wait and indicate that via thd_wait_begin/end callbacks,
    thread creation will be faster.
  */
  if (!is_queue_empty(thread_group) && !thread_group->queue_event_count)
  {
    thread_group->stalled= true;
    TP_INCREMENT_GROUP_COUNTER(thread_group,stalls);
    wake_or_create_thread(thread_group,true);
  }

  /* Reset queue event count */
  thread_group->queue_event_count= 0;

  mysql_mutex_unlock(&thread_group->mutex);
}
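
/*
  A worked example of the stall logic above (illustration only; the numbers
  are hypothetical): with threadpool_stall_limit=500 the timer ticks every
  500 ms. Suppose a group's only worker picks up a query that runs for two
  seconds without ever waiting. On the next tick queue_event_count is still 0
  while the queue holds newly arrived events, so the group is declared
  stalled: the stalled flag is set and wake_or_create_thread() adds a worker,
  which drains the queue. Dequeuing an event then resets the stalled flag.
*/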


static void start_timer(pool_timer_t* timer)
{
  DBUG_ENTER("start_timer");
  mysql_mutex_init(key_timer_mutex,&timer->mutex, NULL);
  mysql_cond_init(key_timer_cond, &timer->cond, NULL);
  timer->shutdown = false;
  mysql_thread_create(key_timer_thread, &timer->timer_thread_id, NULL,
                      timer_thread, timer);
  DBUG_VOID_RETURN;
}


static void stop_timer(pool_timer_t *timer)
{
  DBUG_ENTER("stop_timer");
  mysql_mutex_lock(&timer->mutex);
  timer->shutdown = true;
  mysql_cond_signal(&timer->cond);
  mysql_mutex_unlock(&timer->mutex);
  pthread_join(timer->timer_thread_id, NULL);
  DBUG_VOID_RETURN;
}


/**
  Poll for socket events and distribute them to worker threads.
  In many cases the current thread will handle a single event itself.

  @return a ready connection, or NULL on shutdown
*/
static TP_connection_generic * listener(worker_thread_t *current_thread,
                               thread_group_t *thread_group)
{
  DBUG_ENTER("listener");
  TP_connection_generic *retval= NULL;

  for(;;)
  {
    native_event ev[MAX_EVENTS];
    int cnt;

    if (thread_group->shutdown)
      break;

    cnt = io_poll_wait(thread_group->pollfd, ev, MAX_EVENTS, -1);
    TP_INCREMENT_GROUP_COUNTER(thread_group, polls[(int)operation_origin::LISTENER]);
    if (cnt <=0)
    {
      DBUG_ASSERT(thread_group->shutdown);
      break;
    }

    mysql_mutex_lock(&thread_group->mutex);

    if (thread_group->shutdown)
    {
      mysql_mutex_unlock(&thread_group->mutex);
      break;
    }

    thread_group->io_event_count += cnt;

    /*
     We got some network events and need to make decisions: whether the
     listener should handle events itself, and whether to wake any worker
     threads so they can handle events.

     Q1 : Should the listener handle an event itself, or put all events into
     the queue and let workers handle the events?

     Solution :
     Generally, a listener that handles events itself is preferable. We do
     not want the listener thread to change its state from waiting to running
     too often; since the listener has just woken from poll, it better uses
     its time slice and does some work. Besides, not handling events means
     they go to the queue, and often another worker must wake up to handle
     the event. This is not good, as we want to avoid wakeups.

     The downside of a listener that also handles queries is that we can
     potentially leave the thread group for a long time without picking up
     new network events. It is not a major problem, because this stall will
     be detected sooner or later by the timer thread. Still, relying on the
     timer is not always good, because it may "tick" too slowly (large
     timer_interval).

     We use the following strategy to solve this problem - if the queue was
     not empty, we suspect a flood of network events and the listener stays.
     Otherwise, it handles a query.

     Q2: If the queue is not empty, how many workers to wake?

     Solution:
     We generally try to keep one thread per group active (threads handling
     queries are considered active, unless they are stuck inside some
     "wait"). Thus, we will wake only one worker, and only if there are no
     active threads currently, and the listener is not going to handle a
     query. When we don't wake, we hope that currently active threads will
     finish fast and handle the queue. If this does not happen, the timer
     thread will detect a stall and wake a worker.

     NOTE: Currently nothing is done to detect or prevent long queuing times.
     A solution for the future would be to give up the "one active thread per
     group" principle, if events stay in the queue for too long, and just
     wake more workers. The decision table below summarizes this logic.
    */
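
    /*
      Summarized (an illustrative restatement of the code that follows):

        queue empty && !threadpool_dedicated_listener
            -> listener dequeues one event and handles it itself
        queue not empty, active_thread_count > 0
            -> just enqueue; an active worker will pick events up
        queue not empty, active_thread_count == 0, idle worker exists
            -> wake one idle worker
        queue not empty, no idle worker, thread_count == 1 (listener only)
            -> create a new worker immediately
    */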

    bool listener_picks_event=is_queue_empty(thread_group) && !threadpool_dedicated_listener;
    queue_put(thread_group, ev, cnt);
    if (listener_picks_event)
    {
      /* Handle the first event. */
      retval= queue_get(thread_group, operation_origin::LISTENER);
      mysql_mutex_unlock(&thread_group->mutex);
      break;
    }

    if(thread_group->active_thread_count==0)
    {
      /* We added some work items to the queue, now wake a worker. */
      if(wake_thread(thread_group, false))
      {
        /*
          Wake failed, hence the group has no idle threads. Now check if
          there are any threads in the group except the listener.
        */
        if(thread_group->thread_count == 1)
        {
           /*
             Currently there is no worker thread in the group, as indicated
             by thread_count == 1 (this means the listener is the only thread
             in the group).
             The queue is not empty, and the listener is not going to handle
             events. In order to drain the queue, we create a worker here.
             Alternatively, we could just rely on the timer to detect the
             stall and create a thread, but waiting for the timer would be an
             inefficient and pointless delay.
           */
           create_worker(thread_group, false);
        }
      }
    }
    mysql_mutex_unlock(&thread_group->mutex);
  }

  DBUG_RETURN(retval);
}

/**
  Adjust thread counters in the group or globally
  whenever a thread is created or is about to exit

  @param thread_group
  @param count -  1, when a new thread is created
                 -1, when a thread is about to exit
*/

static void add_thread_count(thread_group_t *thread_group, int32 count)
{
  thread_group->thread_count += count;
  /* a worker starts out and ends in the "active" state */
  thread_group->active_thread_count += count;
  tp_stats.num_worker_threads+= count;
}


/**
  Creates a new worker thread.
  thread_mutex must be held when calling this function

  NOTE: in rare cases, the number of threads can exceed
  threadpool_max_threads, because we need at least 2 threads
  per group to prevent deadlocks (one listener + one worker)
*/

static int create_worker(thread_group_t *thread_group, bool due_to_stall)
{
  pthread_t thread_id;
  bool max_threads_reached= false;
  int err;

  DBUG_ENTER("create_worker");
  if (tp_stats.num_worker_threads >= threadpool_max_threads
     && thread_group->thread_count >= 2)
  {
    err= 1;
    max_threads_reached= true;
    goto end;
  }

  err= mysql_thread_create(key_worker_thread, &thread_id,
         thread_group->pthread_attr, worker_main, thread_group);
  if (!err)
  {
    thread_group->last_thread_creation_time=microsecond_interval_timer();
    statistic_increment(thread_created,&LOCK_status);
    add_thread_count(thread_group, 1);
    TP_INCREMENT_GROUP_COUNTER(thread_group,thread_creations);
    if(due_to_stall)
    {
      TP_INCREMENT_GROUP_COUNTER(thread_group, thread_creations_due_to_stall);
    }
  }
  else
  {
    my_errno= errno;
  }

end:
  if (err)
    print_pool_blocked_message(max_threads_reached);
  else
    pool_block_start= 0; /* Reset pool blocked timer, if it was set */

  DBUG_RETURN(err);
}


/**
 Calculate the microseconds throttling delay for thread creation.

 The value depends on how many threads are already in the group:
 a small number of threads means no delay; the more threads there are,
 the larger the delay.

 The actual values were not calculated using any scientific methods.
 They just look right, and behave well in practice.
*/

#define THROTTLING_FACTOR (threadpool_stall_limit/std::max(DEFAULT_THREADPOOL_STALL_LIMIT,threadpool_stall_limit))

static ulonglong microsecond_throttling_interval(thread_group_t *thread_group)
{
  int count= thread_group->thread_count;

  if (count < 1+ (int)threadpool_oversubscribe)
    return 0;

  if (count < 8)
    return 50*1000*THROTTLING_FACTOR;

  if(count < 16)
    return 100*1000*THROTTLING_FACTOR;

  return 200*1000*THROTTLING_FACTOR;
}
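
/*
  Worked example (hypothetical values; assumes a default
  threadpool_stall_limit, so THROTTLING_FACTOR == 1, and the default
  threadpool_oversubscribe of 3): a group with fewer than 4 threads gets no
  delay; with 4-7 threads a new thread is created at most every 50 ms; with
  8-15 threads at most every 100 ms; with 16 or more threads at most every
  200 ms since the last thread creation.
*/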


/**
  Wakes a worker thread, or creates a new one.

  Worker creation is throttled, so that we avoid creating too many threads
  during a short period of time.
*/
static int wake_or_create_thread(thread_group_t *thread_group, bool due_to_stall)
{
  DBUG_ENTER("wake_or_create_thread");

  if (thread_group->shutdown)
   DBUG_RETURN(0);

  if (wake_thread(thread_group, due_to_stall) == 0)
  {
    DBUG_RETURN(0);
  }

  if (thread_group->thread_count > thread_group->connection_count)
    DBUG_RETURN(-1);


  if (thread_group->active_thread_count == 0)
  {
    /*
     We're better off creating a new thread here with no delay: either there
     are no workers at all, or they are all blocked, and there was no idle
     thread to wake up. This smells like a potential deadlock or very slowly
     executing requests, e.g. sleeps or user locks.
    */
    DBUG_RETURN(create_worker(thread_group, due_to_stall));
  }

  ulonglong now = microsecond_interval_timer();
  ulonglong time_since_last_thread_created =
    (now - thread_group->last_thread_creation_time);

  /* Throttle thread creation. */
  if (time_since_last_thread_created >
       microsecond_throttling_interval(thread_group))
  {
    DBUG_RETURN(create_worker(thread_group, due_to_stall));
  }

  TP_INCREMENT_GROUP_COUNTER(thread_group,throttles);
  DBUG_RETURN(-1);
}




int thread_group_init(thread_group_t *thread_group, pthread_attr_t* thread_attr)
{
  DBUG_ENTER("thread_group_init");
  thread_group->pthread_attr = thread_attr;
  mysql_mutex_init(key_group_mutex, &thread_group->mutex, NULL);
  thread_group->pollfd= INVALID_HANDLE_VALUE;
  thread_group->shutdown_pipe[0]= -1;
  thread_group->shutdown_pipe[1]= -1;
  queue_init(thread_group);
  DBUG_RETURN(0);
}


void thread_group_destroy(thread_group_t *thread_group)
{
  mysql_mutex_destroy(&thread_group->mutex);
  if (thread_group->pollfd != INVALID_HANDLE_VALUE)
  {
    io_poll_close(thread_group->pollfd);
    thread_group->pollfd= INVALID_HANDLE_VALUE;
  }
#ifndef _WIN32
  for(int i=0; i < 2; i++)
  {
    if(thread_group->shutdown_pipe[i] != -1)
    {
      close(thread_group->shutdown_pipe[i]);
      thread_group->shutdown_pipe[i]= -1;
    }
  }
#endif

  if (!--shutdown_group_count)
  {
    my_free(all_groups);
    all_groups= 0;
  }
}

/**
  Wake sleeping thread from waiting list
*/

static int wake_thread(thread_group_t *thread_group,bool due_to_stall)
{
  DBUG_ENTER("wake_thread");
  worker_thread_t *thread = thread_group->waiting_threads.front();
  if(thread)
  {
    thread->woken= true;
    thread_group->waiting_threads.remove(thread);
    mysql_cond_signal(&thread->cond);
    TP_INCREMENT_GROUP_COUNTER(thread_group, wakes);
    if (due_to_stall)
    {
      TP_INCREMENT_GROUP_COUNTER(thread_group, wakes_due_to_stall);
    }
    DBUG_RETURN(0);
  }
  DBUG_RETURN(1); /* no thread in waiter list => missed wakeup */
}

/*
   Wake the listener thread (during shutdown).
   The self-pipe trick is used in most cases, except with IOCP.
*/
static int wake_listener(thread_group_t *thread_group)
{
#ifndef _WIN32
  if (pipe(thread_group->shutdown_pipe))
  {
    return -1;
  }

  /* Wake listener */
  if (io_poll_associate_fd(thread_group->pollfd,
    thread_group->shutdown_pipe[0], NULL, NULL))
  {
    return -1;
  }
  char c= 0;
  if (write(thread_group->shutdown_pipe[1], &c, 1) < 0)
    return -1;
#else
  PostQueuedCompletionStatus(thread_group->pollfd, 0, 0, 0);
#endif
  return 0;
}
/**
  Initiate shutdown for the thread group.

  The shutdown is asynchronous: we only care to wake all threads here, so
  they can finish. We do not wait here until the threads terminate. Final
  cleanup of the group (thread_group_destroy) will be done by the last
  exiting thread.
*/

static void thread_group_close(thread_group_t *thread_group)
{
  DBUG_ENTER("thread_group_close");

  mysql_mutex_lock(&thread_group->mutex);
  if (thread_group->thread_count == 0)
  {
    mysql_mutex_unlock(&thread_group->mutex);
    thread_group_destroy(thread_group);
    DBUG_VOID_RETURN;
  }

  thread_group->shutdown= true;
  thread_group->listener= NULL;

  wake_listener(thread_group);

  /* Wake all workers. */
  while(wake_thread(thread_group, false) == 0)
  {
  }

  mysql_mutex_unlock(&thread_group->mutex);

  DBUG_VOID_RETURN;
}


/*
  Add work to the queue. Maybe wake a worker if they are all sleeping.

  Currently, this function is only used when new connections need to
  perform login (this is done in worker threads).

*/

static void queue_put(thread_group_t *thread_group, TP_connection_generic *connection)
{
  DBUG_ENTER("queue_put");

  connection->enqueue_time= threadpool_exact_stats?microsecond_interval_timer():pool_timer.current_microtime;
  thread_group->queues[connection->priority].push_back(connection);

  if (thread_group->active_thread_count == 0)
    wake_or_create_thread(thread_group);

  DBUG_VOID_RETURN;
}


/*
  Prevent too many threads from executing at the same time, if the workload
  is not CPU bound.
*/

static bool too_many_threads(thread_group_t *thread_group)
{
  return (thread_group->active_thread_count >= 1+(int)threadpool_oversubscribe
   && !thread_group->stalled);
}


/**
  Retrieve a connection with a pending event.

  A pending event in our case means that there is either a pending login
  request (if the connection is not yet logged in), or there are unread
  bytes on the socket.

  If there are no pending events currently, the thread will wait.
  If the timeout specified in the abstime parameter passes, the function
  returns NULL.

  @param current_thread - current worker thread
  @param thread_group - current thread group
  @param abstime - absolute wait timeout

  @return
  connection with pending event.
  NULL is returned if the timeout has expired, or on shutdown.
*/

TP_connection_generic *get_event(worker_thread_t *current_thread,
  thread_group_t *thread_group,  struct timespec *abstime)
{
  DBUG_ENTER("get_event");
  TP_connection_generic *connection = NULL;


  mysql_mutex_lock(&thread_group->mutex);
  DBUG_ASSERT(thread_group->active_thread_count >= 0);

  for(;;)
  {
    int err=0;
    bool oversubscribed = too_many_threads(thread_group);
    if (thread_group->shutdown)
     break;

    /* Check if queue is not empty */
    if (!oversubscribed)
    {
      connection = queue_get(thread_group, operation_origin::WORKER);
      if(connection)
      {
        break;
      }
    }

    /* If there is currently no listener in the group, become one. */
    if(!thread_group->listener)
    {
      thread_group->listener= current_thread;
      thread_group->active_thread_count--;
      mysql_mutex_unlock(&thread_group->mutex);

      connection = listener(current_thread, thread_group);

      mysql_mutex_lock(&thread_group->mutex);
      thread_group->active_thread_count++;
      /* There is no listener anymore, it just returned. */
      thread_group->listener= NULL;
      break;
    }


    /*
      The last thing we try before going to sleep is a
      non-blocking event poll, i.e. with timeout = 0.
      If this returns events, pick one.
    */
    if (!oversubscribed && !threadpool_dedicated_listener)
    {
      native_event ev[MAX_EVENTS];
      int cnt = io_poll_wait(thread_group->pollfd, ev, MAX_EVENTS, 0);
      TP_INCREMENT_GROUP_COUNTER(thread_group, polls[(int)operation_origin::WORKER]);
      if (cnt > 0)
      {
        queue_put(thread_group, ev, cnt);
        connection= queue_get(thread_group,operation_origin::WORKER);
        break;
      }
    }


    /* And now, finally sleep */
    current_thread->woken = false; /* wake() sets this to true */

    /*
      Add the current thread to the head of the waiting list and wait.
      It is important to add the thread to the head rather than the tail,
      as it ensures LIFO wakeup order (hot caches, a working inactivity
      timeout).
    */
    thread_group->waiting_threads.push_front(current_thread);

    thread_group->active_thread_count--;
    if (abstime)
    {
      err = mysql_cond_timedwait(&current_thread->cond, &thread_group->mutex,
                                 abstime);
    }
    else
    {
      err = mysql_cond_wait(&current_thread->cond, &thread_group->mutex);
    }
    thread_group->active_thread_count++;

    if (!current_thread->woken)
    {
      /*
        The thread was not signalled by wake(); it might be a spurious
        wakeup or a timeout. Anyhow, we need to remove ourselves from the
        list now. If the thread was explicitly woken, then the caller
        removed us from the list.
      */
      thread_group->waiting_threads.remove(current_thread);
    }

    if (err)
      break;
  }

  thread_group->stalled= false;

  mysql_mutex_unlock(&thread_group->mutex);

  DBUG_RETURN(connection);
}



/**
  Tells the pool that a worker starts waiting on IO, lock, condition,
  sleep() or similar.
*/

void wait_begin(thread_group_t *thread_group)
{
  DBUG_ENTER("wait_begin");
  mysql_mutex_lock(&thread_group->mutex);
  thread_group->active_thread_count--;

  DBUG_ASSERT(thread_group->active_thread_count >=0);
  DBUG_ASSERT(thread_group->connection_count > 0);

  if ((thread_group->active_thread_count == 0) &&
     (!is_queue_empty(thread_group) || !thread_group->listener))
  {
    /*
      Group might stall while this thread waits, thus wake
      or create a worker to prevent stall.
    */
    wake_or_create_thread(thread_group);
  }

  mysql_mutex_unlock(&thread_group->mutex);
  DBUG_VOID_RETURN;
}

/**
  Tells the pool that a worker has finished waiting.
*/

void wait_end(thread_group_t *thread_group)
{
  DBUG_ENTER("wait_end");
  mysql_mutex_lock(&thread_group->mutex);
  thread_group->active_thread_count++;
  mysql_mutex_unlock(&thread_group->mutex);
  DBUG_VOID_RETURN;
}


TP_connection * TP_pool_generic::new_connection(CONNECT *c)
{
  return new (std::nothrow) TP_connection_generic(c);
}

/**
  Add a new connection to thread pool
*/

void TP_pool_generic::add(TP_connection *c)
{
  DBUG_ENTER("tp_add_connection");

  TP_connection_generic *connection=(TP_connection_generic *)c;
  thread_group_t *thread_group= connection->thread_group;
  /*
    Add the connection to the work queue. Actual logon
    will be done by a worker thread.
  */
  mysql_mutex_lock(&thread_group->mutex);
  queue_put(thread_group, connection);
  mysql_mutex_unlock(&thread_group->mutex);
  DBUG_VOID_RETURN;
}

void TP_pool_generic::resume(TP_connection* c)
{
  add(c);
}

/**
  MySQL scheduler callback: wait begin
*/

void TP_connection_generic::wait_begin(int type)
{
  DBUG_ENTER("wait_begin");

  DBUG_ASSERT(!waiting);
  waiting++;
  if (waiting == 1)
    ::wait_begin(thread_group);
  DBUG_VOID_RETURN;
}


/**
  MySQL scheduler callback: wait end
*/

void TP_connection_generic::wait_end()
{
  DBUG_ENTER("wait_end");
  DBUG_ASSERT(waiting);
  waiting--;
  if (waiting == 0)
    ::wait_end(thread_group);
  DBUG_VOID_RETURN;
}


static void set_next_timeout_check(ulonglong abstime)
{
  auto old= pool_timer.next_timeout_check.load(std::memory_order_relaxed);
  DBUG_ENTER("set_next_timeout_check");
  while (abstime < old)
  {
    if (pool_timer.next_timeout_check.
                   compare_exchange_weak(old, abstime,
                                         std::memory_order_relaxed,
                                         std::memory_order_relaxed))
     break;
  }
  DBUG_VOID_RETURN;
}

static size_t get_group_id(my_thread_id tid)
{
  return size_t(tid % group_count);
}


TP_connection_generic::TP_connection_generic(CONNECT *c):
  TP_connection(c),
  thread_group(0),
  next_in_queue(0),
  prev_in_queue(0),
  abs_wait_timeout(ULONGLONG_MAX),
  bound_to_poll_descriptor(false),
  waiting(false),
  fix_group(false)
{
  DBUG_ASSERT(c->vio_type != VIO_CLOSED);

#ifdef _WIN32
  fd= (c->vio_type == VIO_TYPE_NAMEDPIPE) ?
    c->pipe: (TP_file_handle) mysql_socket_getfd(c->sock);
#else
  fd= mysql_socket_getfd(c->sock);
#endif

  /* Assign connection to a group. */
  thread_group_t *group=
    &all_groups[get_group_id(c->thread_id)];
  thread_group=group;

  mysql_mutex_lock(&group->mutex);
  group->connection_count++;
  mysql_mutex_unlock(&group->mutex);
}

TP_connection_generic::~TP_connection_generic()
{
  mysql_mutex_lock(&thread_group->mutex);
  thread_group->connection_count--;
  mysql_mutex_unlock(&thread_group->mutex);
}

/**
  Set wait timeout for connection.
*/

void TP_connection_generic::set_io_timeout(int timeout_sec)
{
  DBUG_ENTER("set_wait_timeout");
  /*
    Calculate wait deadline for this connection.
    Instead of using microsecond_interval_timer() which has a syscall
    overhead, use pool_timer.current_microtime and take
    into account that its value could be off by at most
    one tick interval.
  */

  abs_wait_timeout= pool_timer.current_microtime +
    1000LL*pool_timer.tick_interval +
    1000000LL*timeout_sec;

  set_next_timeout_check(abs_wait_timeout);
  DBUG_VOID_RETURN;
}
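
/*
  Worked example of the deadline arithmetic above (hypothetical numbers):
  with tick_interval= 500 (milliseconds) and timeout_sec= 28800 (the default
  wait_timeout), the deadline is current_microtime + 500,000 +
  28,800,000,000 microseconds. The extra tick_interval compensates for
  current_microtime being updated only once per timer tick, so a connection
  is never killed earlier than requested, only up to one tick later.
*/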


/**
  Handle a (rare) special case, where a connection needs to
  migrate to a different group because group_count has changed
  after a thread_pool_size change.
*/

static int change_group(TP_connection_generic *c,
 thread_group_t *old_group,
 thread_group_t *new_group)
{
  int ret= 0;

  DBUG_ASSERT(c->thread_group == old_group);

  /* Remove connection from the old group. */
  mysql_mutex_lock(&old_group->mutex);
  if (c->bound_to_poll_descriptor)
  {
    io_poll_disassociate_fd(old_group->pollfd,c->fd);
    c->bound_to_poll_descriptor= false;
  }
  c->thread_group->connection_count--;
  mysql_mutex_unlock(&old_group->mutex);

  /* Add connection to the new group. */
  mysql_mutex_lock(&new_group->mutex);
  c->thread_group= new_group;
  new_group->connection_count++;
  /* Ensure that there is a listener in the new group. */
  if (!new_group->thread_count)
    ret= create_worker(new_group, false);
  mysql_mutex_unlock(&new_group->mutex);
  return ret;
}


int TP_connection_generic::start_io()
{
  /*
    Usually, a connection will stay in the same group for the entire
    connection's life. However, we do allow group_count to change at
    runtime, which means that in rare cases, when it changes, a connection
    may need to migrate to another group, to keep the load between groups
    equal.

    So we recalculate in which group the connection should be, based
    on thread_id and current group count, and migrate if necessary.
  */
  if (fix_group)
  {
    fix_group = false;
    thread_group_t *new_group= &all_groups[get_group_id(thd->thread_id)];

    if (new_group != thread_group)
    {
      if (change_group(this, thread_group, new_group))
        return -1;
    }
  }

  /*
    Bind to poll descriptor if not yet done.
  */
  if (!bound_to_poll_descriptor)
  {
    bound_to_poll_descriptor= true;
    return io_poll_associate_fd(thread_group->pollfd, fd, this, OPTIONAL_IO_POLL_READ_PARAM);
  }

  return io_poll_start_read(thread_group->pollfd, fd, this, OPTIONAL_IO_POLL_READ_PARAM);
}




/**
  Worker thread's main
*/

static void *worker_main(void *param)
{

  worker_thread_t this_thread;
  pthread_detach_this_thread();
  my_thread_init();

  DBUG_ENTER("worker_main");

  thread_group_t *thread_group = (thread_group_t *)param;

  /* Init per-thread structure */
  mysql_cond_init(key_worker_cond, &this_thread.cond, NULL);
  this_thread.thread_group= thread_group;
  this_thread.event_count=0;

  /* Run event loop */
  for(;;)
  {
    TP_connection_generic *connection;
    struct timespec ts;
    set_timespec(ts,threadpool_idle_timeout);
    connection = get_event(&this_thread, thread_group, &ts);
    if (!connection)
      break;
    this_thread.event_count++;
    tp_callback(connection);
  }

  /* Thread shutdown: cleanup per-worker-thread structure. */
  mysql_cond_destroy(&this_thread.cond);

  bool last_thread;                    /* last thread in group exits */
  mysql_mutex_lock(&thread_group->mutex);
  add_thread_count(thread_group, -1);
  last_thread= ((thread_group->thread_count == 0) && thread_group->shutdown);
  mysql_mutex_unlock(&thread_group->mutex);

  /* Last thread in group exits and pool is terminating, destroy group. */
  if (last_thread)
    thread_group_destroy(thread_group);

  my_thread_end();
  return NULL;
}


TP_pool_generic::TP_pool_generic() = default;

int TP_pool_generic::init()
{
  DBUG_ENTER("TP_pool_generic::TP_pool_generic");
  threadpool_max_size= MY_MAX(threadpool_size, 128);
  all_groups= (thread_group_t *)
    my_malloc(PSI_INSTRUMENT_ME,
              sizeof(thread_group_t) * threadpool_max_size, MYF(MY_WME|MY_ZEROFILL));
  if (!all_groups)
  {
    threadpool_max_size= 0;
    sql_print_error("Allocation failed");
    DBUG_RETURN(-1);
  }
  PSI_register(mutex);
  PSI_register(cond);
  PSI_register(thread);
  scheduler_init();
  threadpool_started= true;
  for (uint i= 0; i < threadpool_max_size; i++)
  {
    thread_group_init(&all_groups[i], get_connection_attrib());
  }
  set_pool_size(threadpool_size);
  if(group_count == 0)
  {
    /* Something went wrong */
    sql_print_error("Can't set threadpool size to %d",threadpool_size);
    DBUG_RETURN(-1);
  }
  pool_timer.tick_interval= threadpool_stall_limit;
  start_timer(&pool_timer);
  DBUG_RETURN(0);
}

TP_pool_generic::~TP_pool_generic()
{
  DBUG_ENTER("tp_end");

  if (!threadpool_started)
    DBUG_VOID_RETURN;

  stop_timer(&pool_timer);
  shutdown_group_count= threadpool_max_size;
  for (uint i= 0; i < threadpool_max_size; i++)
  {
    thread_group_close(&all_groups[i]);
  }

  /*
    Wait until memory occupied by all_groups is freed.
  */
  int timeout_ms=5000;
  while(all_groups && timeout_ms--)
    my_sleep(1000);

  threadpool_started= false;
  DBUG_VOID_RETURN;
}



static my_bool thd_reset_group(THD* thd, void*)
{
  auto c= (TP_connection_generic*)thd->event_scheduler.data;
  if(c)
    c->fix_group= true;
  return FALSE;
}

/** Ensure that poll descriptors are created when threadpool_size changes */
int TP_pool_generic::set_pool_size(uint size)
{
  bool success= true;

  for(uint i=0; i< size; i++)
  {
    thread_group_t *group= &all_groups[i];
    mysql_mutex_lock(&group->mutex);
    if (group->pollfd == INVALID_HANDLE_VALUE)
    {
      group->pollfd= io_poll_create();
      success= (group->pollfd != INVALID_HANDLE_VALUE);
      if(!success)
      {
        sql_print_error("io_poll_create() failed, errno=%d", errno);
      }
    }
    mysql_mutex_unlock(&group->mutex);
    if (!success)
    {
      group_count= i;
      return -1;
    }
  }
  group_count= size;
  server_threads.iterate(thd_reset_group);
  return 0;
}

int TP_pool_generic::set_stall_limit(uint limit)
{
  mysql_mutex_lock(&(pool_timer.mutex));
  pool_timer.tick_interval= limit;
  mysql_mutex_unlock(&(pool_timer.mutex));
  mysql_cond_signal(&(pool_timer.cond));
  return 0;
}


/**
 Calculate number of idle/waiting threads in the pool.

 Sum idle threads over all groups.
 Don't do any locking, it is not required for stats.
*/

int TP_pool_generic::get_idle_thread_count()
{
  int sum=0;
  for (uint i= 0; i < threadpool_max_size && all_groups[i].pollfd != INVALID_HANDLE_VALUE; i++)
  {
    sum+= (all_groups[i].thread_count - all_groups[i].active_thread_count);
  }
  return sum;
}


/* Report threadpool problems */

/**
   Delay in microseconds, after which "pool blocked" message is printed.
   (30 sec == 30 Mio usec)
*/
#define BLOCK_MSG_DELAY (30*1000000)

#define MAX_THREADS_REACHED_MSG \
"Threadpool could not create additional thread to handle queries, because the \
number of allowed threads was reached. Increasing 'thread_pool_max_threads' \
parameter can help in this situation.\n \
If 'extra_port' parameter is set, you can still connect to the database with \
superuser account (it must be TCP connection using extra_port as TCP port) \
and troubleshoot the situation. \
A likely cause of pool blocks are clients that lock resources for a long time. \
'show processlist' or 'show engine innodb status' can give additional hints."

#define CREATE_THREAD_ERROR_MSG "Can't create threads in threadpool (errno=%d)."

/**
 Write a message when a blocking situation in the threadpool occurs.
 The message is written only when the pool blocks for BLOCK_MSG_DELAY (30)
 seconds. It will be just a single message for each blocking situation (to
 prevent log flood).
*/

static void print_pool_blocked_message(bool max_threads_reached)
{
  ulonglong now;
  static bool msg_written;

  now= microsecond_interval_timer();
  if (pool_block_start == 0)
  {
    pool_block_start= now;
    msg_written = false;
    return;
  }

  if (now > pool_block_start + BLOCK_MSG_DELAY && !msg_written)
  {
    if (max_threads_reached)
      sql_print_warning(MAX_THREADS_REACHED_MSG);
    else
      sql_print_warning(CREATE_THREAD_ERROR_MSG, my_errno);

    sql_print_information("Threadpool has been blocked for %u seconds\n",
      (uint)((now- pool_block_start)/1000000));
    /* avoid repeated messages for the same blocking situation */
    msg_written= true;
  }
}

#endif /* HAVE_POOL_OF_THREADS */