mariadb/sql/threadpool_common.cc

/* Copyright (C) 2012, 2020, MariaDB
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; version 2 of the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA */
#include "mariadb.h"
#include <violite.h>
#include <sql_priv.h>
#include <sql_class.h>
#include <my_pthread.h>
#include <scheduler.h>
#include <sql_connect.h>
#include <sql_audit.h>
#include <debug_sync.h>
#include <threadpool.h>
#include <sql_class.h>
#include <sql_parse.h>
#ifdef WITH_WSREP
#include "wsrep_trans_observer.h"
#endif /* WITH_WSREP */
#ifdef _WIN32
#include "threadpool_winsockets.h"
#endif
/* Threadpool parameters */
uint threadpool_min_threads;
uint threadpool_idle_timeout;
uint threadpool_size;
uint threadpool_max_size;
uint threadpool_stall_limit;
uint threadpool_max_threads;
uint threadpool_oversubscribe;
uint threadpool_mode;
uint threadpool_prio_kickup_timer;
my_bool threadpool_exact_stats;
my_bool threadpool_dedicated_listener;
/* Stats */
TP_STATISTICS tp_stats;
static void threadpool_remove_connection(THD *thd);
static dispatch_command_return threadpool_process_request(THD *thd);
static THD* threadpool_add_connection(CONNECT *connect, TP_connection *c);
extern bool do_command(THD*);
static inline TP_connection *get_TP_connection(THD *thd)
{
return (TP_connection *)thd->event_scheduler.data;
}
/*
Worker thread contexts, and THD contexts.
=========================================

Both worker threads and connections have their own sets of thread-local
variables. At the moment these are mysys_var (which carries data for dbug,
my_error and similar goodies) and the PSI per-client structure.

Whenever a query is executed, the following needs to be done:

1. Save the worker thread context.
2. Change the TLS variables to connection-specific ones using
   thread_attach(THD*). This function also does some additional work,
   e.g. setting up the thread_stack/stack_ends_here pointers.
3. Process the query.
4. Restore the worker thread context.

Connection login and termination follow a similar schema w.r.t. saving and
restoring contexts.

For both the worker thread and the connection, mysys variables are created
with my_thread_init() and freed with my_thread_end().
*/
struct Worker_thread_context
{
PSI_thread *psi_thread;
st_my_thread_var* mysys_var;
Worker_thread_context()
{
psi_thread= PSI_CALL_get_thread();
mysys_var= my_thread_var;
}
~Worker_thread_context()
{
PSI_CALL_set_thread(psi_thread);
set_mysys_var(mysys_var);
set_current_thd(nullptr);
}
};
#ifdef HAVE_PSI_INTERFACE
/*
The following fixes PSI "idle" instrumentation.
The server assumes that the connection becomes idle
just before net_read_packet() and switches to active after it.
In our setup, the server becomes idle when async socket I/O is initiated.
*/
extern void net_before_header_psi(struct st_net *net, void *user_data, size_t);
static void dummy_before_header(struct st_net *, void *, size_t)
{
}
static void re_init_net_server_extension(THD *thd)
{
thd->m_net_server_extension.m_before_header = dummy_before_header;
}
#else
#define re_init_net_server_extension(thd)
#endif /* HAVE_PSI_INTERFACE */
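/*
Mark the connection as idle, i.e. waiting for the next client command.
Sets net.reading_or_writing as if a read was in progress, and fires the
PSI "idle" notification (see the comment above).
*/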
static inline void set_thd_idle(THD *thd)
{
thd->net.reading_or_writing= 1;
#ifdef HAVE_PSI_INTERFACE
net_before_header_psi(&thd->net, thd, 0);
#endif
}
/*
Per-OS-thread info (thread ID and pthread_self()),
cached in TLS to avoid repeated syscall overhead
(on Linux).
*/
struct OS_thread_info
{
pthread_t self;
ssize_t stack_size;
uint32_t thread_id;
inline bool initialized() { return stack_size != 0; }
void init(ssize_t ssize)
{
#if _WIN32
self= thread_id= GetCurrentThreadId();
#else
#ifdef __NR_gettid
thread_id= (uint32) syscall(__NR_gettid);
#else
thread_id= 0;
#endif
self= pthread_self();
#endif
stack_size= ssize;
}
};
static thread_local OS_thread_info os_thread_info;
static const OS_thread_info *get_os_thread_info()
{
auto *res= &os_thread_info;
if (!res->initialized())
res->init((ssize_t) (my_thread_stack_size * STACK_DIRECTION));
return res;
}
/*
Attach/associate the connection with the current OS thread.
*/
static void thread_attach(THD* thd)
{
#ifdef WITH_WSREP
/* Wait until possible background rollback has finished before
attaching the thd. */
wsrep_wait_rollback_complete_and_acquire_ownership(thd);
#endif /* WITH_WSREP */
set_mysys_var(thd->mysys_var);
thd->thread_stack=(char*)&thd;
set_current_thd(thd);
auto tinfo= get_os_thread_info();
thd->real_id= tinfo->self;
thd->os_thread_id= tinfo->thread_id;
DBUG_ASSERT(thd->mysys_var == my_thread_var);
thd->mysys_var->stack_ends_here= thd->thread_stack + tinfo->stack_size;
PSI_CALL_set_thread(thd->get_psi());
mysql_socket_set_thread_owner(thd->net.vio->mysql_socket);
}
/*
Determine connection priority, using the current
transaction state and the 'threadpool_priority' variable value.
*/
static TP_PRIORITY get_priority(TP_connection *c)
{
DBUG_ASSERT(c->thd == current_thd);
TP_PRIORITY prio= (TP_PRIORITY)c->thd->variables.threadpool_priority;
if (prio == TP_PRIORITY_AUTO)
prio= c->thd->transaction->is_active() ? TP_PRIORITY_HIGH : TP_PRIORITY_LOW;
return prio;
}
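/**
Threadpool worker callback, run whenever there is work for a connection:
the initial login (c->thd is still NULL), a resumed command, or data
ready to be read from the client socket.

On success the connection priority is recalculated, the network read
timeout is armed, and async I/O is restarted with start_io().
On failure the connection is removed and the TP_connection is deleted.
*/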
void tp_callback(TP_connection *c)
{
DBUG_ASSERT(c);
Worker_thread_context worker_context;
THD *thd= c->thd;
c->state = TP_STATE_RUNNING;
if (unlikely(!thd))
{
/* No THD, need to login first. */
DBUG_ASSERT(c->connect);
thd= c->thd= threadpool_add_connection(c->connect, c);
if (!thd)
{
/* Bail out on connect error.*/
goto error;
}
c->connect= 0;
}
else
{
retry:
switch(threadpool_process_request(thd))
{
case DISPATCH_COMMAND_WOULDBLOCK:
if (!thd->async_state.try_suspend())
{
/*
All async operations finished meanwhile, thus nobody will wake up
this THD. Therefore, we resume it "manually" here.
*/
thd->async_state.m_state = thd_async_state::enum_async_state::RESUMED;
goto retry;
}
return;
case DISPATCH_COMMAND_CLOSE_CONNECTION:
/* QUIT or an error occurred. */
goto error;
case DISPATCH_COMMAND_SUCCESS:
break;
}
thd->async_state.m_state= thd_async_state::enum_async_state::NONE;
}
/* Set priority */
c->priority= get_priority(c);
/* Read next command from client. */
c->set_io_timeout(thd->get_net_wait_timeout());
c->state= TP_STATE_IDLE;
if (c->start_io())
goto error;
return;
error:
c->thd= 0;
if (thd)
{
threadpool_remove_connection(thd);
}
delete c;
}
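/**
Handle the login phase of a new connection.

Creates the connection's mysys and PSI thread contexts, builds the THD
from the CONNECT object, registers it in server_threads and performs the
login via thd_prepare_connection().

Returns the new THD on success, or NULL on failure (all partially created
resources are cleaned up).
*/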
static THD *threadpool_add_connection(CONNECT *connect, TP_connection *c)
{
THD *thd= NULL;
/*
Create a new connection context: mysys_thread_var and PSI thread.
Store them in the THD.
*/
set_mysys_var(NULL);
my_thread_init();
st_my_thread_var* mysys_var= my_thread_var;
PSI_CALL_set_thread(PSI_CALL_new_thread(key_thread_one_connection, connect, 0));
if (!mysys_var ||!(thd= connect->create_thd(NULL)))
{
/* Out of memory? */
connect->close_and_delete();
if (mysys_var)
my_thread_end();
return NULL;
}
thd->event_scheduler.data= c;
server_threads.insert(thd); // Make THD visible in show processlist
delete connect; // must be after server_threads.insert, see close_connections()
thd->set_mysys_var(mysys_var);
/* Login. */
thread_attach(thd);
re_init_net_server_extension(thd);
ulonglong now= microsecond_interval_timer();
thd->prior_thr_create_utime= now;
thd->start_utime= now;
thd->thr_create_utime= now;
setup_connection_thread_globals(thd);
if (thd_prepare_connection(thd))
goto end;
c->init_vio(thd->net.vio);
/*
Check if the THD is OK, as prepare_new_connection_state()
can fail, for example if the init command failed.
*/
if (!thd_is_connection_alive(thd))
goto end;
thd->skip_wait_timeout= true;
set_thd_idle(thd);
return thd;
end:
threadpool_remove_connection(thd);
return NULL;
}
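/**
Terminate and free a connection.

Runs the usual end-of-connection processing (end_connection(),
close_connection(), unlink_thd()), then releases the PSI thread, the THD
and the per-connection mysys context.
*/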
static void threadpool_remove_connection(THD *thd)
{
thread_attach(thd);
thd->net.reading_or_writing = 0;
end_connection(thd);
close_connection(thd, 0);
unlink_thd(thd);
PSI_CALL_delete_current_thread(); // before THD is destroyed
delete thd;
/*
Free resources associated with this connection:
mysys thread_var and PSI thread.
*/
my_thread_end();
}
/*
Ensure that a proper error message is sent to the client,
and that an "aborted" message appears in the log in case of
a wait timeout.
See also timeout handling in net_serv.cc
*/
static void handle_wait_timeout(THD *thd)
{
thd->get_stmt_da()->reset_diagnostics_area();
thd->reset_killed();
my_error(ER_NET_READ_INTERRUPTED, MYF(0));
thd->net.last_errno= ER_NET_READ_INTERRUPTED;
thd->net.error= 2;
}
/** Check if some client data is cached in thd->net or thd->net.vio */
static bool has_unread_data(THD* thd)
{
NET *net= &thd->net;
if (net->compress && net->remain_in_buf)
return true;
Vio *vio= net->vio;
return vio->has_data(vio);
}
/**
Process a single client request or a single batch.
*/
static dispatch_command_return threadpool_process_request(THD *thd)
{
dispatch_command_return retval= DISPATCH_COMMAND_SUCCESS;
thread_attach(thd);
if(thd->async_state.m_state == thd_async_state::enum_async_state::RESUMED)
goto resume;
if (thd->killed >= KILL_CONNECTION)
{
/*
The killed flag was set by the timeout handler
or by a KILL command. Return an error.
*/
retval= DISPATCH_COMMAND_CLOSE_CONNECTION;
if(thd->killed == KILL_WAIT_TIMEOUT)
handle_wait_timeout(thd);
goto end;
}
/*
In the loop below, the flow is essentially a copy of the
thread-per-connection logic, see do_handle_one_connection() in sql_connect.cc.
The goal is to execute a single query, thus the loop is normally executed
only once. However, for SSL connections it can be executed multiple times
(SSL can preread and cache incoming data, and vio->has_data() checks whether
that was the case).
*/
for(;;)
{
thd->net.reading_or_writing= 0;
if (mysql_audit_release_required(thd))
mysql_audit_release(thd);
resume:
retval= do_command(thd, false);
switch(retval)
{
case DISPATCH_COMMAND_WOULDBLOCK:
case DISPATCH_COMMAND_CLOSE_CONNECTION:
goto end;
case DISPATCH_COMMAND_SUCCESS:
break;
}
if (!thd_is_connection_alive(thd))
{
retval=DISPATCH_COMMAND_CLOSE_CONNECTION;
goto end;
}
set_thd_idle(thd);
if (!has_unread_data(thd))
{
/* More info on this debug sync is in sql_parse.cc*/
DEBUG_SYNC(thd, "before_do_command_net_read");
goto end;
}
}
end:
return retval;
}
static TP_pool *pool;
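/*
Create and initialize the pool implementation.
On Windows, threadpool_mode selects either the native (TP_pool_win) or
the generic implementation; elsewhere the generic one is always used.
Returns true on failure.
*/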
static bool tp_init()
{
#ifdef _WIN32
if (threadpool_mode == TP_MODE_WINDOWS)
pool= new (std::nothrow) TP_pool_win;
else
pool= new (std::nothrow) TP_pool_generic;
#else
pool= new (std::nothrow) TP_pool_generic;
#endif
if (!pool)
return true;
if (pool->init())
{
delete pool;
pool= 0;
return true;
}
#ifdef _WIN32
init_win_aio_buffers(max_connections);
#endif
return false;
}
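/*
Scheduler "add_connection" callback: wrap the CONNECT into a TP_connection
and hand it over to the pool. If allocation fails, the connection is
closed and the CONNECT object is deleted.
*/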
static void tp_add_connection(CONNECT *connect)
{
TP_connection *c= pool->new_connection(connect);
DBUG_EXECUTE_IF("simulate_failed_connection_1", delete c ; c= 0;);
if (c)
pool->add(c);
else
connect->close_and_delete();
}
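/*
Thin wrappers forwarding status queries and dynamic variable updates to
the pool, tolerating the case where the pool has not been created.
*/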
int tp_get_idle_thread_count()
{
return pool? pool->get_idle_thread_count(): 0;
}
int tp_get_thread_count()
{
return pool ? pool->get_thread_count() : 0;
}
void tp_set_min_threads(uint val)
{
if (pool)
pool->set_min_threads(val);
}
void tp_set_max_threads(uint val)
{
if (pool)
pool->set_max_threads(val);
}
void tp_set_threadpool_size(uint val)
{
if (pool)
pool->set_pool_size(val);
}
void tp_set_threadpool_stall_limit(uint val)
{
if (pool)
pool->set_stall_limit(val);
}
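/*
Handle wait_timeout expiration for an idle connection.
If data arrived in the meantime, do not kill the connection but mark it
pending so it gets processed; otherwise set KILL_WAIT_TIMEOUT and post a
high-priority kill notification.
*/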
void tp_timeout_handler(TP_connection *c)
{
if (c->state != TP_STATE_IDLE)
return;
THD *thd= c->thd;
mysql_mutex_lock(&thd->LOCK_thd_kill);
Vio *vio= thd->net.vio;
if (vio && (vio_pending(vio) > 0 || vio->has_data(vio)) &&
c->state == TP_STATE_IDLE)
{
/*
There is some data on that connection, i.e.
there was no inactivity timeout.
Don't kill it.
*/
c->state= TP_STATE_PENDING;
}
else if (c->state == TP_STATE_IDLE)
{
thd->set_killed_no_mutex(KILL_WAIT_TIMEOUT);
c->priority= TP_PRIORITY_HIGH;
post_kill_notification(thd);
}
mysql_mutex_unlock(&thd->LOCK_thd_kill);
}
MY_ALIGNED(CPU_LEVEL1_DCACHE_LINESIZE) Atomic_counter<unsigned long long> tp_waits[THD_WAIT_LAST];
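/*
thd_wait_begin/thd_wait_end scheduler callbacks, invoked when a connection
thread is about to block and when it stops blocking. The wait type is
counted in tp_waits[] and passed on to TP_connection::wait_begin()/wait_end().
*/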
static void tp_wait_begin(THD *thd, int type)
{
TP_connection *c = get_TP_connection(thd);
if (c)
{
DBUG_ASSERT(type > 0 && type < THD_WAIT_LAST);
tp_waits[type]++;
c->wait_begin(type);
}
}
static void tp_wait_end(THD *thd)
{
TP_connection *c = get_TP_connection(thd);
if (c)
c->wait_end();
}
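/*
Scheduler shutdown: destroy the pool and, on Windows, free the async I/O
buffers.
*/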
static void tp_end()
{
delete pool;
#ifdef _WIN32
destroy_win_aio_buffers();
#endif
}
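/*
Kill notification: raise the connection priority so the kill is processed
promptly, then do the regular post_kill_notification().
*/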
static void tp_post_kill_notification(THD *thd)
{
TP_connection *c= get_TP_connection(thd);
if (c)
c->priority= TP_PRIORITY_HIGH;
post_kill_notification(thd);
}
/* Resume previously suspended THD */
static void tp_resume(THD* thd)
{
DBUG_ASSERT(thd->async_state.m_state == thd_async_state::enum_async_state::SUSPENDED);
thd->async_state.m_state = thd_async_state::enum_async_state::RESUMED;
TP_connection* c = get_TP_connection(thd);
pool->resume(c);
}
static scheduler_functions tp_scheduler_functions=
{
0, // max_threads
NULL,
NULL,
tp_init, // init
tp_add_connection, // add_connection
tp_wait_begin, // thd_wait_begin
tp_wait_end, // thd_wait_end
tp_post_kill_notification, // post kill notification
tp_end, // end
tp_resume
};
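/*
Install the thread pool scheduler: copy the callback table above and fill
in the limits and counters supplied by the caller.
*/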
void pool_of_threads_scheduler(struct scheduler_functions *func,
ulong *arg_max_connections,
Atomic_counter<uint> *arg_connection_count)
{
*func = tp_scheduler_functions;
func->max_threads= threadpool_max_threads;
func->max_connections= arg_max_connections;
func->connection_count= arg_connection_count;
scheduler_init();
}