mirror of
https://github.com/MariaDB/server.git
synced 2025-01-18 04:53:01 +01:00
756 lines
20 KiB
C++
756 lines
20 KiB
C++
/* Copyright (C) 2007 MySQL AB
|
|
|
|
This program is free software; you can redistribute it and/or modify
|
|
it under the terms of the GNU General Public License as published by
|
|
the Free Software Foundation; version 2 of the License.
|
|
|
|
This program is distributed in the hope that it will be useful,
|
|
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
GNU General Public License for more details.
|
|
|
|
You should have received a copy of the GNU General Public License
|
|
along with this program; if not, write to the Free Software
|
|
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
|
|
|
|
/*
|
|
Implementation for the thread scheduler
|
|
*/
|
|
|
|
#ifdef USE_PRAGMA_INTERFACE
|
|
#pragma implementation
|
|
#endif
|
|
|
|
#include <mysql_priv.h>
|
|
#if MYSQL_VERSION_ID >= 60000
|
|
#include "sql_audit.h"
|
|
#endif
|
|
|
|
/*
|
|
'Dummy' functions to be used when we don't need any handling for a scheduler
|
|
event
|
|
*/
|
|
|
|
static bool init_dummy(void) {return 0;}
|
|
static void post_kill_dummy(THD *thd) {}
|
|
static void end_dummy(void) {}
|
|
static bool end_thread_dummy(THD *thd, bool cache_thread) { return 0; }
|
|
|
|
/*
|
|
Initialize default scheduler with dummy functions so that setup functions
|
|
only need to declare those that are relvant for their usage
|
|
*/
|
|
|
|
scheduler_functions::scheduler_functions()
|
|
:init(init_dummy),
|
|
init_new_connection_thread(init_new_connection_handler_thread),
|
|
add_connection(0), // Must be defined
|
|
post_kill_notification(post_kill_dummy),
|
|
end_thread(end_thread_dummy), end(end_dummy)
|
|
{}
|
|
|
|
|
|
/*
|
|
End connection, in case when we are using 'no-threads'
|
|
*/
|
|
|
|
static bool no_threads_end(THD *thd, bool put_in_cache)
|
|
{
|
|
unlink_thd(thd);
|
|
pthread_mutex_unlock(&LOCK_thread_count);
|
|
return 1; // Abort handle_one_connection
|
|
}
|
|
|
|
|
|
/*
|
|
Initailize scheduler for --thread-handling=no-threads
|
|
*/
|
|
|
|
void one_thread_scheduler(scheduler_functions *func)
|
|
{
|
|
func->max_threads= 1;
|
|
max_connections= 1;
|
|
func->max_connections= &max_connections;
|
|
func->connection_count= &connection_count;
|
|
#ifndef EMBEDDED_LIBRARY
|
|
func->add_connection= handle_connection_in_main_thread;
|
|
#endif
|
|
func->init_new_connection_thread= init_dummy;
|
|
func->end_thread= no_threads_end;
|
|
}
|
|
|
|
|
|
/*
|
|
Initialize scheduler for --thread-handling=one-thread-per-connection
|
|
*/
|
|
|
|
#ifndef EMBEDDED_LIBRARY
|
|
void one_thread_per_connection_scheduler(scheduler_functions *func,
|
|
ulong *arg_max_connections,
|
|
uint *arg_connection_count)
|
|
{
|
|
func->max_threads= *arg_max_connections + 1;
|
|
func->max_connections= arg_max_connections;
|
|
func->connection_count= arg_connection_count;
|
|
func->add_connection= create_thread_to_handle_connection;
|
|
func->end_thread= one_thread_per_connection_end;
|
|
}
|
|
#endif /* EMBEDDED_LIBRARY */
|
|
|
|
|
|
#if defined(HAVE_LIBEVENT) && HAVE_POOL_OF_THREADS == 1
|
|
|
|
#include "event.h"
|
|
|
|
static struct event_base *base;
|
|
|
|
static uint created_threads, killed_threads;
|
|
static bool kill_pool_threads;
|
|
|
|
static struct event thd_add_event;
|
|
static struct event thd_kill_event;
|
|
|
|
static pthread_mutex_t LOCK_thd_add; /* protects thds_need_adding */
|
|
static LIST *thds_need_adding; /* list of thds to add to libevent queue */
|
|
|
|
static int thd_add_pair[2]; /* pipe to signal add a connection to libevent*/
|
|
static int thd_kill_pair[2]; /* pipe to signal kill a connection in libevent */
|
|
|
|
/*
|
|
LOCK_event_loop protects the non-thread safe libevent calls (event_add and
|
|
event_del) and thds_need_processing and thds_waiting_for_io.
|
|
*/
|
|
static pthread_mutex_t LOCK_event_loop;
|
|
static LIST *thds_need_processing; /* list of thds that needs some processing */
|
|
static LIST *thds_waiting_for_io; /* list of thds with added events */
|
|
|
|
pthread_handler_t libevent_thread_proc(void *arg);
|
|
static void libevent_end();
|
|
static bool libevent_needs_immediate_processing(THD *thd);
|
|
static void libevent_connection_close(THD *thd);
|
|
static bool libevent_should_close_connection(THD* thd);
|
|
static void libevent_thd_add(THD* thd);
|
|
void libevent_io_callback(int Fd, short Operation, void *ctx);
|
|
void libevent_add_thd_callback(int Fd, short Operation, void *ctx);
|
|
void libevent_kill_thd_callback(int Fd, short Operation, void *ctx);
|
|
|
|
|
|
/*
|
|
Create a pipe and set to non-blocking.
|
|
Returns TRUE if there is an error.
|
|
*/
|
|
|
|
static bool init_socketpair(int sock_pair[])
|
|
{
|
|
sock_pair[0]= sock_pair[1]= -1;
|
|
return (evutil_socketpair(AF_UNIX, SOCK_STREAM, 0, sock_pair) < 0 ||
|
|
evutil_make_socket_nonblocking(sock_pair[0]) == -1 ||
|
|
evutil_make_socket_nonblocking(sock_pair[1]) == -1);
|
|
}
|
|
|
|
static void close_socketpair(int sock_pair[])
|
|
{
|
|
if (sock_pair[0] != -1)
|
|
EVUTIL_CLOSESOCKET(sock_pair[0]);
|
|
if (sock_pair[1] != -1)
|
|
EVUTIL_CLOSESOCKET(sock_pair[1]);
|
|
}
|
|
|
|
/*
|
|
thd_scheduler keeps the link between THD and events.
|
|
It's embedded in the THD class.
|
|
*/
|
|
|
|
thd_scheduler::thd_scheduler()
|
|
: logged_in(FALSE), io_event(NULL), thread_attached(FALSE)
|
|
{
|
|
#ifndef DBUG_OFF
|
|
dbug_explain[0]= '\0';
|
|
set_explain= FALSE;
|
|
#endif
|
|
}
|
|
|
|
|
|
thd_scheduler::~thd_scheduler()
|
|
{
|
|
my_free(io_event, MYF(MY_ALLOW_ZERO_PTR));
|
|
}
|
|
|
|
|
|
bool thd_scheduler::init(THD *parent_thd)
|
|
{
|
|
io_event=
|
|
(struct event*)my_malloc(sizeof(*io_event),MYF(MY_ZEROFILL|MY_WME));
|
|
|
|
if (!io_event)
|
|
{
|
|
sql_print_error("Memory allocation error in thd_scheduler::init\n");
|
|
return TRUE;
|
|
}
|
|
|
|
event_set(io_event, (int)parent_thd->net.vio->sd, EV_READ,
|
|
libevent_io_callback, (void*)parent_thd);
|
|
|
|
list.data= parent_thd;
|
|
|
|
return FALSE;
|
|
}
|
|
|
|
|
|
/*
|
|
Attach/associate the connection with the OS thread, for command processing.
|
|
*/
|
|
|
|
bool thd_scheduler::thread_attach()
|
|
{
|
|
DBUG_ASSERT(!thread_attached);
|
|
THD* thd = (THD*)list.data;
|
|
if (libevent_should_close_connection(thd) ||
|
|
setup_connection_thread_globals(thd))
|
|
{
|
|
return TRUE;
|
|
}
|
|
my_errno= 0;
|
|
thd->mysys_var->abort= 0;
|
|
thread_attached= TRUE;
|
|
#ifndef DBUG_OFF
|
|
/*
|
|
When we attach the thread for a connection for the first time,
|
|
we know that there is no session value set yet. Thus
|
|
the initial setting of set_explain to FALSE is OK.
|
|
*/
|
|
if (set_explain)
|
|
DBUG_SET(dbug_explain);
|
|
#endif
|
|
return FALSE;
|
|
}
|
|
|
|
|
|
/*
|
|
Detach/disassociate the connection with the OS thread.
|
|
*/
|
|
|
|
void thd_scheduler::thread_detach()
|
|
{
|
|
if (thread_attached)
|
|
{
|
|
THD* thd = (THD*)list.data;
|
|
thd->reset_globals();
|
|
thread_attached= FALSE;
|
|
#ifndef DBUG_OFF
|
|
/*
|
|
If during the session @@session.dbug was assigned, the
|
|
dbug options/state has been pushed. Check if this is the
|
|
case, to be able to restore the state when we attach this
|
|
logical connection to a physical thread.
|
|
*/
|
|
if (_db_is_pushed_())
|
|
{
|
|
set_explain= TRUE;
|
|
if (DBUG_EXPLAIN(dbug_explain, sizeof(dbug_explain)))
|
|
sql_print_error("thd_scheduler: DBUG_EXPLAIN buffer is too small");
|
|
}
|
|
/* DBUG_POP() is a no-op in case there is no session state */
|
|
DBUG_POP();
|
|
#endif
|
|
}
|
|
}
|
|
|
|
/**
|
|
Create all threads for the thread pool
|
|
|
|
NOTES
|
|
After threads are created we wait until all threads has signaled that
|
|
they have started before we return
|
|
|
|
RETURN
|
|
0 ok
|
|
1 We got an error creating the thread pool
|
|
In this case we will abort all created threads
|
|
*/
|
|
|
|
static bool libevent_init(void)
|
|
{
|
|
uint i;
|
|
DBUG_ENTER("libevent_init");
|
|
|
|
base= (struct event_base *) event_init();
|
|
|
|
created_threads= 0;
|
|
killed_threads= 0;
|
|
kill_pool_threads= FALSE;
|
|
|
|
pthread_mutex_init(&LOCK_event_loop, MY_MUTEX_INIT_FAST);
|
|
pthread_mutex_init(&LOCK_thd_add, MY_MUTEX_INIT_FAST);
|
|
|
|
/* set up sockets used to add new thds to the event pool */
|
|
if (init_socketpair(thd_add_pair))
|
|
{
|
|
sql_print_error("init_socketpair(thd_add_spair) error in libevent_init");
|
|
DBUG_RETURN(1);
|
|
}
|
|
/* set up sockets used to kill thds in the event queue */
|
|
if (init_socketpair(thd_kill_pair))
|
|
{
|
|
sql_print_error("init_socketpair(thd_kill_pair) error in libevent_init");
|
|
close_socketpair(thd_add_pair);
|
|
DBUG_RETURN(1);
|
|
}
|
|
event_set(&thd_add_event, thd_add_pair[0], EV_READ|EV_PERSIST,
|
|
libevent_add_thd_callback, NULL);
|
|
event_set(&thd_kill_event, thd_kill_pair[0], EV_READ|EV_PERSIST,
|
|
libevent_kill_thd_callback, NULL);
|
|
|
|
if (event_add(&thd_add_event, NULL) || event_add(&thd_kill_event, NULL))
|
|
{
|
|
sql_print_error("thd_add_event event_add error in libevent_init");
|
|
libevent_end();
|
|
DBUG_RETURN(1);
|
|
}
|
|
/* Set up the thread pool */
|
|
created_threads= killed_threads= 0;
|
|
pthread_mutex_lock(&LOCK_thread_count);
|
|
|
|
for (i= 0; i < thread_pool_size; i++)
|
|
{
|
|
pthread_t thread;
|
|
int error;
|
|
if ((error= pthread_create(&thread, &connection_attrib,
|
|
libevent_thread_proc, 0)))
|
|
{
|
|
sql_print_error("Can't create completion port thread (error %d)",
|
|
error);
|
|
pthread_mutex_unlock(&LOCK_thread_count);
|
|
libevent_end(); // Cleanup
|
|
DBUG_RETURN(TRUE);
|
|
}
|
|
}
|
|
|
|
/* Wait until all threads are created */
|
|
while (created_threads != thread_pool_size)
|
|
pthread_cond_wait(&COND_thread_count,&LOCK_thread_count);
|
|
pthread_mutex_unlock(&LOCK_thread_count);
|
|
|
|
DBUG_PRINT("info", ("%u threads created", (uint) thread_pool_size));
|
|
DBUG_RETURN(FALSE);
|
|
}
|
|
|
|
|
|
/*
|
|
This is called when data is ready on the socket.
|
|
|
|
NOTES
|
|
This is only called by the thread that owns LOCK_event_loop.
|
|
|
|
We add the thd that got the data to thds_need_processing, and
|
|
cause the libevent event_loop() to terminate. Then this same thread will
|
|
return from event_loop and pick the thd value back up for processing.
|
|
*/
|
|
|
|
void libevent_io_callback(int, short, void *ctx)
|
|
{
|
|
safe_mutex_assert_owner(&LOCK_event_loop);
|
|
THD *thd= (THD*)ctx;
|
|
thds_waiting_for_io= list_delete(thds_waiting_for_io, &thd->event_scheduler.list);
|
|
thds_need_processing= list_add(thds_need_processing, &thd->event_scheduler.list);
|
|
}
|
|
|
|
/*
|
|
This is called when we have a thread we want to be killed.
|
|
|
|
NOTES
|
|
This is only called by the thread that owns LOCK_event_loop.
|
|
*/
|
|
|
|
void libevent_kill_thd_callback(int Fd, short, void*)
|
|
{
|
|
safe_mutex_assert_owner(&LOCK_event_loop);
|
|
|
|
/* clear the pending events */
|
|
char c;
|
|
while (recv(Fd, &c, sizeof(c), 0) == sizeof(c))
|
|
{}
|
|
|
|
LIST* list= thds_waiting_for_io;
|
|
while (list)
|
|
{
|
|
THD *thd= (THD*)list->data;
|
|
list= list_rest(list);
|
|
if ((int) thd->killed >= (int) KILL_CONNECTION)
|
|
{
|
|
/*
|
|
Delete from libevent and add to the processing queue.
|
|
*/
|
|
event_del(thd->event_scheduler.io_event);
|
|
thds_waiting_for_io= list_delete(thds_waiting_for_io,
|
|
&thd->event_scheduler.list);
|
|
thds_need_processing= list_add(thds_need_processing,
|
|
&thd->event_scheduler.list);
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
/*
|
|
This is used to add connections to the pool. This callback is invoked from
|
|
the libevent event_loop() call whenever the thd_add_pair[1] has a byte
|
|
written to it.
|
|
|
|
NOTES
|
|
This is only called by the thread that owns LOCK_event_loop.
|
|
*/
|
|
|
|
void libevent_add_thd_callback(int Fd, short, void *)
|
|
{
|
|
safe_mutex_assert_owner(&LOCK_event_loop);
|
|
|
|
/* clear the pending events */
|
|
char c;
|
|
while (recv(Fd, &c, sizeof(c), 0) == sizeof(c))
|
|
{}
|
|
|
|
pthread_mutex_lock(&LOCK_thd_add);
|
|
while (thds_need_adding)
|
|
{
|
|
/* pop the first thd off the list */
|
|
THD* thd= (THD*)thds_need_adding->data;
|
|
thds_need_adding= list_delete(thds_need_adding, thds_need_adding);
|
|
|
|
pthread_mutex_unlock(&LOCK_thd_add);
|
|
|
|
if (!thd->event_scheduler.logged_in || libevent_should_close_connection(thd))
|
|
{
|
|
/*
|
|
Add thd to thds_need_processing list. If it needs closing we'll close
|
|
it outside of event_loop().
|
|
*/
|
|
thds_need_processing= list_add(thds_need_processing,
|
|
&thd->event_scheduler.list);
|
|
}
|
|
else
|
|
{
|
|
/* Add to libevent */
|
|
if (event_add(thd->event_scheduler.io_event, NULL))
|
|
{
|
|
sql_print_error("event_add error in libevent_add_thd_callback");
|
|
libevent_connection_close(thd);
|
|
}
|
|
else
|
|
{
|
|
thds_waiting_for_io= list_add(thds_waiting_for_io,
|
|
&thd->event_scheduler.list);
|
|
}
|
|
}
|
|
pthread_mutex_lock(&LOCK_thd_add);
|
|
}
|
|
pthread_mutex_unlock(&LOCK_thd_add);
|
|
}
|
|
|
|
|
|
/**
|
|
Notify the thread pool about a new connection
|
|
|
|
NOTES
|
|
LOCK_thread_count is locked on entry. This function MUST unlock it!
|
|
*/
|
|
|
|
static void libevent_add_connection(THD *thd)
|
|
{
|
|
DBUG_ENTER("libevent_add_connection");
|
|
DBUG_PRINT("enter", ("thd: %p thread_id: %lu",
|
|
thd, thd->thread_id));
|
|
|
|
if (thd->event_scheduler.init(thd))
|
|
{
|
|
sql_print_error("Scheduler init error in libevent_add_new_connection");
|
|
pthread_mutex_unlock(&LOCK_thread_count);
|
|
libevent_connection_close(thd);
|
|
DBUG_VOID_RETURN;
|
|
}
|
|
threads.append(thd);
|
|
libevent_thd_add(thd);
|
|
|
|
pthread_mutex_unlock(&LOCK_thread_count);
|
|
DBUG_VOID_RETURN;
|
|
}
|
|
|
|
|
|
/**
|
|
@brief Signal a waiting connection it's time to die.
|
|
|
|
@details This function will signal libevent the THD should be killed.
|
|
Either the global LOCK_thd_count or the THD's LOCK_thd_data must be locked
|
|
upon entry.
|
|
|
|
@param[in] thd The connection to kill
|
|
*/
|
|
|
|
static void libevent_post_kill_notification(THD *)
|
|
{
|
|
/*
|
|
Note, we just wake up libevent with an event that a THD should be killed,
|
|
It will search its list of thds for thd->killed == KILL_CONNECTION to
|
|
find the THDs it should kill.
|
|
|
|
So we don't actually tell it which one and we don't actually use the
|
|
THD being passed to us, but that's just a design detail that could change
|
|
later.
|
|
*/
|
|
char c= 0;
|
|
send(thd_kill_pair[1], &c, sizeof(c), 0);
|
|
}
|
|
|
|
|
|
/*
|
|
Close and delete a connection.
|
|
*/
|
|
|
|
static void libevent_connection_close(THD *thd)
|
|
{
|
|
DBUG_ENTER("libevent_connection_close");
|
|
DBUG_PRINT("enter", ("thd: %p", thd));
|
|
|
|
thd->killed= KILL_CONNECTION; // Avoid error messages
|
|
|
|
if (thd->net.vio->type != VIO_CLOSED) // not already closed
|
|
{
|
|
end_connection(thd);
|
|
close_connection(thd, 0, 1);
|
|
}
|
|
thd->event_scheduler.thread_detach();
|
|
unlink_thd(thd); /* locks LOCK_thread_count and deletes thd */
|
|
pthread_mutex_unlock(&LOCK_thread_count);
|
|
|
|
DBUG_VOID_RETURN;
|
|
}
|
|
|
|
|
|
/*
|
|
Returns true if we should close and delete a THD connection.
|
|
*/
|
|
|
|
static bool libevent_should_close_connection(THD* thd)
|
|
{
|
|
return (thd->net.error ||
|
|
thd->net.vio == 0 ||
|
|
(int) thd->killed >= (int) KILL_CONNECTION);
|
|
}
|
|
|
|
|
|
/*
|
|
libevent_thread_proc is the outer loop of each thread in the thread pool.
|
|
These procs only return/terminate on shutdown (kill_pool_threads == true).
|
|
*/
|
|
|
|
pthread_handler_t libevent_thread_proc(void *arg)
|
|
{
|
|
if (init_new_connection_handler_thread())
|
|
{
|
|
my_thread_global_end();
|
|
sql_print_error("libevent_thread_proc: my_thread_init() failed");
|
|
exit(1);
|
|
}
|
|
DBUG_ENTER("libevent_thread_proc");
|
|
|
|
/*
|
|
Signal libevent_init() when all threads has been created and are ready to
|
|
receive events.
|
|
*/
|
|
(void) pthread_mutex_lock(&LOCK_thread_count);
|
|
created_threads++;
|
|
thread_created++;
|
|
if (created_threads == thread_pool_size)
|
|
(void) pthread_cond_signal(&COND_thread_count);
|
|
(void) pthread_mutex_unlock(&LOCK_thread_count);
|
|
|
|
for (;;)
|
|
{
|
|
THD *thd= NULL;
|
|
(void) pthread_mutex_lock(&LOCK_event_loop);
|
|
|
|
/* get thd(s) to process */
|
|
while (!thds_need_processing)
|
|
{
|
|
if (kill_pool_threads)
|
|
{
|
|
/* the flag that we should die has been set */
|
|
(void) pthread_mutex_unlock(&LOCK_event_loop);
|
|
goto thread_exit;
|
|
}
|
|
event_loop(EVLOOP_ONCE);
|
|
}
|
|
|
|
/* pop the first thd off the list */
|
|
thd= (THD*)thds_need_processing->data;
|
|
thds_need_processing= list_delete(thds_need_processing,
|
|
thds_need_processing);
|
|
|
|
(void) pthread_mutex_unlock(&LOCK_event_loop);
|
|
|
|
/* now we process the connection (thd) */
|
|
|
|
/* set up the thd<->thread links. */
|
|
thd->thread_stack= (char*) &thd;
|
|
|
|
if (thd->event_scheduler.thread_attach())
|
|
{
|
|
libevent_connection_close(thd);
|
|
continue;
|
|
}
|
|
|
|
/* is the connection logged in yet? */
|
|
if (!thd->event_scheduler.logged_in)
|
|
{
|
|
DBUG_PRINT("info", ("init new connection. sd: %d",
|
|
thd->net.vio->sd));
|
|
lex_start(thd);
|
|
if (login_connection(thd))
|
|
{
|
|
/* Failed to log in */
|
|
libevent_connection_close(thd);
|
|
continue;
|
|
}
|
|
else
|
|
{
|
|
/* login successful */
|
|
#if MYSQL_VERSION_ID >= 60000
|
|
MYSQL_CONNECTION_START(thd->thread_id, thd->security_ctx->priv_user,
|
|
(char *) thd->security_ctx->host_or_ip);
|
|
#endif
|
|
thd->event_scheduler.logged_in= TRUE;
|
|
prepare_new_connection_state(thd);
|
|
if (!libevent_needs_immediate_processing(thd))
|
|
continue; /* New connection is now waiting for data in libevent*/
|
|
}
|
|
}
|
|
|
|
do
|
|
{
|
|
/* Process a query */
|
|
if (do_command(thd))
|
|
{
|
|
libevent_connection_close(thd);
|
|
break;
|
|
}
|
|
} while (libevent_needs_immediate_processing(thd));
|
|
}
|
|
|
|
thread_exit:
|
|
DBUG_PRINT("exit", ("ending thread"));
|
|
(void) pthread_mutex_lock(&LOCK_thread_count);
|
|
killed_threads++;
|
|
pthread_cond_broadcast(&COND_thread_count);
|
|
(void) pthread_mutex_unlock(&LOCK_thread_count);
|
|
my_thread_end();
|
|
pthread_exit(0);
|
|
DBUG_RETURN(0); /* purify: deadcode */
|
|
}
|
|
|
|
|
|
/*
|
|
Returns TRUE if the connection needs immediate processing and FALSE if
|
|
instead it's queued for libevent processing or closed,
|
|
*/
|
|
|
|
static bool libevent_needs_immediate_processing(THD *thd)
|
|
{
|
|
if (libevent_should_close_connection(thd))
|
|
{
|
|
libevent_connection_close(thd);
|
|
return FALSE;
|
|
}
|
|
/*
|
|
If more data in the socket buffer, return TRUE to process another command.
|
|
|
|
Note: we cannot add for event processing because the whole request might
|
|
already be buffered and we wouldn't receive an event.
|
|
*/
|
|
if (vio_pending(thd->net.vio) > 0)
|
|
return TRUE;
|
|
|
|
thd->event_scheduler.thread_detach();
|
|
libevent_thd_add(thd);
|
|
return FALSE;
|
|
}
|
|
|
|
|
|
/*
|
|
Adds a THD to queued for libevent processing.
|
|
|
|
This call does not actually register the event with libevent.
|
|
Instead, it places the THD onto a queue and signals libevent by writing
|
|
a byte into thd_add_pair, which will cause our libevent_add_thd_callback to
|
|
be invoked which will find the THD on the queue and add it to libevent.
|
|
*/
|
|
|
|
static void libevent_thd_add(THD* thd)
|
|
{
|
|
char c= 0;
|
|
/* release any audit resources, this thd is going to sleep */
|
|
#if MYSQL_VERSION_ID >= 60000
|
|
mysql_audit_release(thd);
|
|
#endif
|
|
pthread_mutex_lock(&LOCK_thd_add);
|
|
/* queue for libevent */
|
|
thds_need_adding= list_add(thds_need_adding, &thd->event_scheduler.list);
|
|
/* notify libevent */
|
|
send(thd_add_pair[1], &c, sizeof(c), 0);
|
|
pthread_mutex_unlock(&LOCK_thd_add);
|
|
}
|
|
|
|
|
|
/**
|
|
Wait until all pool threads have been deleted for clean shutdown
|
|
*/
|
|
|
|
static void libevent_end()
|
|
{
|
|
DBUG_ENTER("libevent_end");
|
|
DBUG_PRINT("enter", ("created_threads: %d killed_threads: %u",
|
|
created_threads, killed_threads));
|
|
|
|
/*
|
|
check if initialized. This may not be the case if get an error at
|
|
startup
|
|
*/
|
|
if (!base)
|
|
DBUG_VOID_RETURN;
|
|
|
|
(void) pthread_mutex_lock(&LOCK_thread_count);
|
|
|
|
|
|
kill_pool_threads= TRUE;
|
|
while (killed_threads != created_threads)
|
|
{
|
|
/* wake up the event loop */
|
|
char c= 0;
|
|
send(thd_add_pair[1], &c, sizeof(c), 0);
|
|
pthread_cond_wait(&COND_thread_count, &LOCK_thread_count);
|
|
}
|
|
(void) pthread_mutex_unlock(&LOCK_thread_count);
|
|
|
|
event_del(&thd_add_event);
|
|
close_socketpair(thd_add_pair);
|
|
event_del(&thd_kill_event);
|
|
close_socketpair(thd_kill_pair);
|
|
event_base_free(base);
|
|
base= 0;
|
|
|
|
(void) pthread_mutex_destroy(&LOCK_event_loop);
|
|
(void) pthread_mutex_destroy(&LOCK_thd_add);
|
|
DBUG_VOID_RETURN;
|
|
}
|
|
|
|
|
|
void pool_of_threads_scheduler(scheduler_functions* func)
|
|
{
|
|
func->max_threads= thread_pool_size;
|
|
func->max_connections= &max_connections;
|
|
func->connection_count= &connection_count;
|
|
func->init= libevent_init;
|
|
func->end= libevent_end;
|
|
func->post_kill_notification= libevent_post_kill_notification;
|
|
func->add_connection= libevent_add_connection;
|
|
}
|
|
|
|
#endif
|