mirror of
https://github.com/MariaDB/server.git
synced 2025-01-27 01:04:19 +01:00
636 lines
15 KiB
C++
636 lines
15 KiB
C++
/* Copyright (C) 2012, 2020, MariaDB
|
|
|
|
This program is free software; you can redistribute it and/or modify
|
|
it under the terms of the GNU General Public License as published by
|
|
the Free Software Foundation; version 2 of the License.
|
|
|
|
This program is distributed in the hope that it will be useful,
|
|
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
GNU General Public License for more details.
|
|
|
|
You should have received a copy of the GNU General Public License
|
|
along with this program; if not, write to the Free Software
|
|
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA */
|
|
|
|
#include "mariadb.h"
|
|
#include <violite.h>
|
|
#include <sql_priv.h>
|
|
#include <sql_class.h>
|
|
#include <my_pthread.h>
|
|
#include <scheduler.h>
|
|
#include <sql_connect.h>
|
|
#include <sql_audit.h>
|
|
#include <debug_sync.h>
|
|
#include <threadpool.h>
|
|
#include <sql_class.h>
|
|
#include <sql_parse.h>
|
|
|
|
#ifdef WITH_WSREP
|
|
#include "wsrep_trans_observer.h"
|
|
#endif /* WITH_WSREP */
|
|
|
|
#ifdef _WIN32
|
|
#include "threadpool_winsockets.h"
|
|
#endif
|
|
|
|
/* Threadpool parameters */
|
|
|
|
uint threadpool_min_threads;
|
|
uint threadpool_idle_timeout;
|
|
uint threadpool_size;
|
|
uint threadpool_max_size;
|
|
uint threadpool_stall_limit;
|
|
uint threadpool_max_threads;
|
|
uint threadpool_oversubscribe;
|
|
uint threadpool_mode;
|
|
uint threadpool_prio_kickup_timer;
|
|
my_bool threadpool_exact_stats;
|
|
my_bool threadpool_dedicated_listener;
|
|
|
|
/* Stats */
|
|
TP_STATISTICS tp_stats;
|
|
|
|
|
|
static void threadpool_remove_connection(THD *thd);
|
|
static dispatch_command_return threadpool_process_request(THD *thd);
|
|
static THD* threadpool_add_connection(CONNECT *connect, TP_connection *c);
|
|
|
|
extern bool do_command(THD*);
|
|
|
|
/*
  Return the threadpool connection that was stored in the THD's
  event_scheduler.data slot (see threadpool_add_connection()).
*/
static inline TP_connection *get_TP_connection(THD *thd)
{
  auto *scheduler_data= thd->event_scheduler.data;
  return (TP_connection *) scheduler_data;
}
|
|
|
|
/*
|
|
Worker threads contexts, and THD contexts.
|
|
=========================================
|
|
|
|
Both worker threads and connections have their sets of thread local variables
|
|
At the moment it is mysys_var (this has specific data for dbug, my_error and
|
|
similar goodies), and PSI per-client structure.
|
|
|
|
Whenever a query is executed, the following needs to be done:
|
|
|
|
1. Save worker thread context.
|
|
2. Change TLS variables to connection specific ones using thread_attach(THD*).
|
|
This function does some additional work, e.g. setting up
|
|
thread_stack/thread_ends_here pointers.
|
|
3. Process query
|
|
4. Restore worker thread context.
|
|
|
|
Connection login and termination follow a similar scheme w.r.t. saving and
|
|
restoring contexts.
|
|
|
|
For both worker thread, and for the connection, mysys variables are created
|
|
using my_thread_init() and freed with my_thread_end().
|
|
|
|
*/
|
|
struct Worker_thread_context
|
|
{
|
|
PSI_thread *psi_thread;
|
|
st_my_thread_var* mysys_var;
|
|
|
|
Worker_thread_context()
|
|
{
|
|
psi_thread= PSI_CALL_get_thread();
|
|
mysys_var= my_thread_var;
|
|
}
|
|
|
|
~Worker_thread_context()
|
|
{
|
|
PSI_CALL_set_thread(psi_thread);
|
|
set_mysys_var(mysys_var);
|
|
set_current_thd(nullptr);
|
|
}
|
|
};
|
|
|
|
|
|
#ifdef HAVE_PSI_INTERFACE

/*
  Fix-up for the PSI "idle" instrumentation.

  The server assumes that a connection becomes idle just before
  net_read_packet() and switches back to active right after it.
  In our setup the server becomes idle when the async socket I/O is made,
  so the "before header" hook is replaced with a no-op and the idle
  transition is signalled explicitly (see set_thd_idle()).
*/

extern void net_before_header_psi(struct st_net *net, void *user_data, size_t);

/* No-op replacement for the "before header" hook. */
static void dummy_before_header(struct st_net *, void *, size_t)
{
}

/* Point the THD's net server extension at the no-op hook. */
static void re_init_net_server_extension(THD *thd)
{
  auto &extension= thd->m_net_server_extension;
  extension.m_before_header= dummy_before_header;
}

#else

/* Without PSI there is nothing to re-initialize. */
#define re_init_net_server_extension(thd)

#endif /* HAVE_PSI_INTERFACE */
|
|
|
|
static inline bool has_unread_compressed_data(const NET *net)
|
|
{
|
|
return net->compress && net->remain_in_buf;
|
|
}
|
|
|
|
/*
  Mark the connection as waiting for client input.

  Sets net.reading_or_writing to 1. With PSI enabled, also calls
  net_before_header_psi(), unless unread compressed data is still buffered.
*/
static inline void set_thd_idle(THD *thd)
{
  NET *net= &thd->net;
  net->reading_or_writing= 1;
#ifdef HAVE_PSI_INTERFACE
  if (has_unread_compressed_data(net))
    return;
  net_before_header_psi(net, thd, 0);
#endif
}
|
|
|
|
/*
|
|
Per OS thread info (ID and pthread_self)
|
|
stored as TLS, because of syscall overhead
|
|
(on Linux)
|
|
*/
|
|
struct OS_thread_info
|
|
{
|
|
pthread_t self;
|
|
ssize_t stack_size;
|
|
uint32_t thread_id;
|
|
|
|
inline bool initialized() { return stack_size != 0; }
|
|
|
|
void init(ssize_t ssize)
|
|
{
|
|
#if _WIN32
|
|
self= thread_id= GetCurrentThreadId();
|
|
#else
|
|
#ifdef __NR_gettid
|
|
thread_id= (uint32) syscall(__NR_gettid);
|
|
#else
|
|
thread_id= 0;
|
|
#endif
|
|
self= pthread_self();
|
|
#endif
|
|
stack_size= ssize;
|
|
}
|
|
};
|
|
static thread_local OS_thread_info os_thread_info;
|
|
|
|
/*
  Return the calling thread's OS_thread_info, initializing it on first use
  with a stack size of my_thread_stack_size * STACK_DIRECTION.
*/
static const OS_thread_info *get_os_thread_info()
{
  OS_thread_info &info= os_thread_info;
  if (info.initialized())
    return &info;

  info.init((ssize_t) (my_thread_stack_size * STACK_DIRECTION));
  return &info;
}
|
|
|
|
/*
|
|
Attach/associate the connection with the OS thread.
|
|
*/
|
|
/*
  Make thd the connection served by the calling OS thread.

  Installs the THD's mysys variables, records the current stack position
  and the OS thread identifiers in the THD, sets the current THD, and
  switches the PSI thread to the one owned by the THD.
*/
static void thread_attach(THD* thd)
{
#ifdef WITH_WSREP
  /*
    A background rollback may still be in progress; wait for it to
    finish before the thd is attached.
  */
  wsrep_wait_rollback_complete_and_acquire_ownership(thd);
#endif /* WITH_WSREP */

  set_mysys_var(thd->mysys_var);

  /* The address of this function's own parameter marks the stack start. */
  char *stack_start= (char*) &thd;
  thd->thread_stack= stack_start;
  set_current_thd(thd);

  const OS_thread_info *os_info= get_os_thread_info();
  thd->real_id= os_info->self;
  thd->os_thread_id= os_info->thread_id;

  DBUG_ASSERT(thd->mysys_var == my_thread_var);
  thd->mysys_var->stack_ends_here= stack_start + os_info->stack_size;

  PSI_CALL_set_thread(thd->get_psi());
}
|
|
|
|
/*
|
|
Determine connection priority, using the current
|
|
transaction state and 'threadpool_priority' variable value.
|
|
*/
|
|
/*
  Return the priority for connection c.

  An explicit threadpool_priority setting is returned as is. With
  TP_PRIORITY_AUTO the result is TP_PRIORITY_HIGH while a transaction is
  active and TP_PRIORITY_LOW otherwise. Must be called with c->thd attached.
*/
static TP_PRIORITY get_priority(TP_connection *c)
{
  DBUG_ASSERT(c->thd == current_thd);

  THD *thd= c->thd;
  const TP_PRIORITY configured= (TP_PRIORITY) thd->variables.threadpool_priority;
  if (configured != TP_PRIORITY_AUTO)
    return configured;

  if (thd->transaction->is_active())
    return TP_PRIORITY_HIGH;
  return TP_PRIORITY_LOW;
}
|
|
|
|
|
|
/*
  Handle one unit of work for connection c.

  Without a THD the connection is logged in first; otherwise one request
  (or batch) is processed. On success the connection priority is
  refreshed and the next read is started. On failure the THD, if any, is
  removed and c is deleted.
*/
void tp_callback(TP_connection *c)
{
  DBUG_ASSERT(c);

  /* Captures the worker's context now and restores it on every exit. */
  Worker_thread_context saved_worker_context;

  THD *thd= c->thd;
  c->state= TP_STATE_RUNNING;

  if (unlikely(!thd))
  {
    /* There is no THD yet, so this is the login step. */
    DBUG_ASSERT(c->connect);
    thd= c->thd= threadpool_add_connection(c->connect, c);
    if (!thd)
    {
      /* Connect error, give up on this connection. */
      goto error;
    }
    c->connect= 0;
  }
  else
  {
    for (;;)
    {
      const dispatch_command_return rc= threadpool_process_request(thd);

      if (rc == DISPATCH_COMMAND_CLOSE_CONNECTION)
      {
        /* QUIT or an error occurred. */
        goto error;
      }
      if (rc != DISPATCH_COMMAND_WOULDBLOCK)
        break;

      if (thd->async_state.try_suspend())
        return;

      /*
        All async operations have finished in the meantime, so nobody
        will wake up this THD. Resume it "manually" and process again.
      */
      thd->async_state.m_state= thd_async_state::enum_async_state::RESUMED;
    }
    thd->async_state.m_state= thd_async_state::enum_async_state::NONE;
  }

  /* Set priority */
  c->priority= get_priority(c);

  /* Wait for the next command from the client. */
  c->set_io_timeout(thd->get_net_wait_timeout());
  c->state= TP_STATE_IDLE;
  if (c->start_io())
    goto error;
  return;

error:
  c->thd= 0;
  if (thd)
    threadpool_remove_connection(thd);
  delete c;
}
|
|
|
|
|
|
/*
  Create the THD for a new connection and run the login sequence.

  Returns the new THD, or NULL on failure. On failure everything created
  here has been released again. `connect` is consumed in both cases.
*/
static THD *threadpool_add_connection(CONNECT *connect, TP_connection *c)
{
  /*
    Build the connection's own context (mysys thread variable and PSI
    thread). Both end up in the THD.
  */
  set_mysys_var(NULL);
  my_thread_init();
  st_my_thread_var* mysys_var= my_thread_var;
  PSI_CALL_set_thread(PSI_CALL_new_thread(key_thread_one_connection, connect, 0));

  THD *thd= mysys_var ? connect->create_thd(NULL) : NULL;
  if (!thd)
  {
    /* Out of memory? */
    connect->close_and_delete();
    if (mysys_var)
      my_thread_end();
    return NULL;
  }

  thd->event_scheduler.data= c;
  server_threads.insert(thd); // Make THD visible in show processlist
  delete connect; // must be after server_threads.insert, see close_connections()
  thd->set_mysys_var(mysys_var);

  /* Login. */
  thread_attach(thd);
  mysql_socket_set_thread_owner(thd->net.vio->mysql_socket);
  re_init_net_server_extension(thd);

  const ulonglong now= microsecond_interval_timer();
  thd->prior_thr_create_utime= now;
  thd->start_utime= now;
  thd->thr_create_utime= now;

  setup_connection_thread_globals(thd);

  bool login_ok= !thd_prepare_connection(thd);
  if (login_ok)
  {
    c->init_vio(thd->net.vio);

    /*
      Re-check the THD: prepare_new_connection_state() can fail,
      for example if the init command failed.
    */
    login_ok= thd_is_connection_alive(thd);
  }

  if (!login_ok)
  {
    threadpool_remove_connection(thd);
    return NULL;
  }

  thd->skip_wait_timeout= true;
  set_thd_idle(thd);
  return thd;
}
|
|
|
|
|
|
/*
  Terminate the connection and destroy its THD together with the
  per-connection mysys and PSI state.
*/
static void threadpool_remove_connection(THD *thd)
{
  /* The teardown runs with the connection attached to this thread. */
  thread_attach(thd);

  NET &net= thd->net;
  net.reading_or_writing= 0;

  end_connection(thd);
  close_connection(thd, 0);
  unlink_thd(thd);

  /* The PSI thread has to be deleted before the THD is destroyed. */
  PSI_CALL_delete_current_thread();
  delete thd;

  /*
    Release what is left of the connection's resources:
    the mysys thread_var and the PSI thread.
  */
  my_thread_end();
}
|
|
|
|
|
|
/*
|
|
Ensure that proper error message is sent to client,
|
|
and "aborted" message appears in the log in case of
|
|
wait timeout.
|
|
|
|
See also timeout handling in net_serv.cc
|
|
*/
|
|
/*
  Turn a wait timeout into an ER_NET_READ_INTERRUPTED error: the
  diagnostics area and the killed flag are reset, the error is raised,
  and the NET is marked with that error code and error state 2.
*/
static void handle_wait_timeout(THD *thd)
{
  thd->get_stmt_da()->reset_diagnostics_area();
  thd->reset_killed();
  my_error(ER_NET_READ_INTERRUPTED, MYF(0));

  NET *net= &thd->net;
  net->last_errno= ER_NET_READ_INTERRUPTED;
  net->error= 2;
}
|
|
|
|
/** Check if some client data is cached in thd->net or thd->net.vio */
|
|
/*
  True when the vio reports buffered data, or when unread compressed data
  is left in thd->net.
*/
static bool has_unread_data(THD* thd)
{
  Vio *vio= thd->net.vio;
  if (vio->has_data(vio))
    return true;
  return has_unread_compressed_data(&thd->net);
}
|
|
|
|
|
|
/**
|
|
Process a single client request or a single batch.
|
|
*/
|
|
/*
  Attach thd to the current thread and run its pending command(s).

  Returns the result of the last do_command() call, or
  DISPATCH_COMMAND_CLOSE_CONNECTION when the THD was killed beforehand or
  the connection is no longer alive afterwards.
*/
static dispatch_command_return threadpool_process_request(THD *thd)
{
  thread_attach(thd);

  /*
    A resumed THD goes straight back into do_command(): both the killed
    check and the per-command preparation are skipped for that first pass.
  */
  bool resuming=
    thd->async_state.m_state == thd_async_state::enum_async_state::RESUMED;

  if (!resuming && thd->killed >= KILL_CONNECTION)
  {
    /*
      The killed flag was set by the timeout handler or by a KILL command.
      Report an error.
    */
    if (thd->killed == KILL_WAIT_TIMEOUT)
      handle_wait_timeout(thd);
    return DISPATCH_COMMAND_CLOSE_CONNECTION;
  }

  /*
    The loop below is essentially a copy of the thread-per-connection
    logic, see do_handle_one_connection() in sql_connect.cc.

    The goal is to execute a single query, so the loop normally runs only
    once. For SSL connections it can run several times: SSL can pre-read
    and cache incoming data, and vio->has_data() detects that case.
  */
  for (;;)
  {
    if (!resuming)
    {
      thd->net.reading_or_writing= 0;
      if (mysql_audit_release_required(thd))
        mysql_audit_release(thd);
    }
    resuming= false;

    const dispatch_command_return retval= do_command(thd, false);
    if (retval == DISPATCH_COMMAND_WOULDBLOCK ||
        retval == DISPATCH_COMMAND_CLOSE_CONNECTION)
      return retval;

    if (!thd_is_connection_alive(thd))
      return DISPATCH_COMMAND_CLOSE_CONNECTION;

    set_thd_idle(thd);

    if (!has_unread_data(thd))
    {
      /* More info on this debug sync is in sql_parse.cc */
      DEBUG_SYNC(thd, "before_do_command_net_read");
      return retval;
    }
  }
}
|
|
|
|
|
|
static TP_pool *pool;
|
|
|
|
static bool tp_init()
|
|
{
|
|
|
|
#ifdef _WIN32
|
|
if (threadpool_mode == TP_MODE_WINDOWS)
|
|
pool= new (std::nothrow) TP_pool_win;
|
|
else
|
|
pool= new (std::nothrow) TP_pool_generic;
|
|
#else
|
|
pool= new (std::nothrow) TP_pool_generic;
|
|
#endif
|
|
if (!pool)
|
|
return true;
|
|
if (pool->init())
|
|
{
|
|
delete pool;
|
|
pool= 0;
|
|
return true;
|
|
}
|
|
#ifdef _WIN32
|
|
init_win_aio_buffers(max_connections);
|
|
#endif
|
|
return false;
|
|
}
|
|
|
|
/*
  Hand a new client connection to the pool. If no TP_connection can be
  obtained for it, the connection is closed and deleted.
*/
static void tp_add_connection(CONNECT *connect)
{
  TP_connection *conn= pool->new_connection(connect);
  DBUG_EXECUTE_IF("simulate_failed_connection_1", delete conn ; conn= 0;);

  if (!conn)
  {
    connect->close_and_delete();
    return;
  }
  pool->add(conn);
}
|
|
|
|
int tp_get_idle_thread_count()
|
|
{
|
|
return pool? pool->get_idle_thread_count(): 0;
|
|
}
|
|
|
|
int tp_get_thread_count()
|
|
{
|
|
return pool ? pool->get_thread_count() : 0;
|
|
}
|
|
|
|
/* Forward a new minimum thread count to the pool, if one exists. */
void tp_set_min_threads(uint val)
{
  if (!pool)
    return;
  pool->set_min_threads(val);
}
|
|
|
|
|
|
/* Forward a new maximum thread count to the pool, if one exists. */
void tp_set_max_threads(uint val)
{
  if (!pool)
    return;
  pool->set_max_threads(val);
}
|
|
|
|
/* Forward a new pool size to the pool, if one exists. */
void tp_set_threadpool_size(uint val)
{
  if (!pool)
    return;
  pool->set_pool_size(val);
}
|
|
|
|
|
|
/* Forward a new stall limit to the pool, if one exists. */
void tp_set_threadpool_stall_limit(uint val)
{
  if (!pool)
    return;
  pool->set_stall_limit(val);
}
|
|
|
|
|
|
/*
  Handle an expired I/O timeout on connection c.

  Only idle connections are considered. If data is pending on the
  connection, its state becomes TP_STATE_PENDING; otherwise the THD is
  marked KILL_WAIT_TIMEOUT, the connection gets high priority and the
  kill notification is posted. The state is re-checked while holding
  LOCK_thd_kill.
*/
void tp_timeout_handler(TP_connection *c)
{
  if (c->state != TP_STATE_IDLE)
    return;

  THD *thd= c->thd;
  mysql_mutex_lock(&thd->LOCK_thd_kill);

  Vio *vio= thd->net.vio;
  const bool data_waiting=
    vio && (vio_pending(vio) > 0 || vio->has_data(vio));

  if (data_waiting && c->state == TP_STATE_IDLE)
  {
    /*
      There is some data on that connection, i.e. there was no
      inactivity timeout. Don't kill.
    */
    c->state= TP_STATE_PENDING;
  }
  else if (c->state == TP_STATE_IDLE)
  {
    thd->set_killed_no_mutex(KILL_WAIT_TIMEOUT);
    c->priority= TP_PRIORITY_HIGH;
    post_kill_notification(thd);
  }

  mysql_mutex_unlock(&thd->LOCK_thd_kill);
}
|
|
|
|
MY_ALIGNED(CPU_LEVEL1_DCACHE_LINESIZE) Atomic_counter<unsigned long long> tp_waits[THD_WAIT_LAST];
|
|
|
|
static void tp_wait_begin(THD *thd, int type)
|
|
{
|
|
TP_connection *c = get_TP_connection(thd);
|
|
if (c)
|
|
{
|
|
DBUG_ASSERT(type > 0 && type < THD_WAIT_LAST);
|
|
tp_waits[type]++;
|
|
c->wait_begin(type);
|
|
}
|
|
}
|
|
|
|
|
|
/*
  Scheduler hook: the wait announced with tp_wait_begin() is over.
  Does nothing for a THD without a TP_connection.
*/
static void tp_wait_end(THD *thd)
{
  if (TP_connection *conn= get_TP_connection(thd))
    conn->wait_end();
}
|
|
|
|
|
|
static void tp_end()
|
|
{
|
|
delete pool;
|
|
#ifdef _WIN32
|
|
destroy_win_aio_buffers();
|
|
#endif
|
|
}
|
|
|
|
/*
  Scheduler hook: thd was killed. Raise the priority of its connection,
  if it has one, and then run the common post_kill_notification().
*/
static void tp_post_kill_notification(THD *thd)
{
  if (TP_connection *conn= get_TP_connection(thd))
    conn->priority= TP_PRIORITY_HIGH;

  post_kill_notification(thd);
}
|
|
|
|
/* Resume previously suspended THD */
|
|
/*
  Scheduler hook: wake up a suspended THD. The async state is switched
  from SUSPENDED to RESUMED and the pool is asked to resume the
  THD's connection.
*/
static void tp_resume(THD* thd)
{
  DBUG_ASSERT(thd->async_state.m_state == thd_async_state::enum_async_state::SUSPENDED);

  thd->async_state.m_state= thd_async_state::enum_async_state::RESUMED;
  pool->resume(get_TP_connection(thd));
}
|
|
|
|
/*
  Function table of the threadpool scheduler.

  It is a template: pool_of_threads_scheduler() copies it and then fills in
  max_threads, max_connections and connection_count.
  NOTE(review): the two NULL slots are presumably connection_count and
  max_connections - confirm against the scheduler_functions declaration.
*/
static scheduler_functions tp_scheduler_functions=
{
  0,                                  // max_threads, set on copy
  NULL,                               // set on copy
  NULL,                               // set on copy
  tp_init,                            // init
  tp_add_connection,                  // add_connection
  tp_wait_begin,                      // thd_wait_begin
  tp_wait_end,                        // thd_wait_end
  tp_post_kill_notification,          // post kill notification
  tp_end,                             // end
  tp_resume                           // resume a suspended THD
};
|
|
|
|
void pool_of_threads_scheduler(struct scheduler_functions *func,
|
|
ulong *arg_max_connections,
|
|
Atomic_counter<uint> *arg_connection_count)
|
|
{
|
|
*func = tp_scheduler_functions;
|
|
func->max_threads= threadpool_max_threads;
|
|
func->max_connections= arg_max_connections;
|
|
func->connection_count= arg_connection_count;
|
|
scheduler_init();
|
|
}
|