mariadb/sql/scheduler.cc

756 lines
20 KiB
C++

/* Copyright (C) 2007 MySQL AB
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; version 2 of the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
/*
Implementation for the thread scheduler
*/
#ifdef USE_PRAGMA_INTERFACE
#pragma implementation
#endif
#include <mysql_priv.h>
#if MYSQL_VERSION_ID >= 60000
#include "sql_audit.h"
#endif
/*
'Dummy' functions to be used when we don't need any handling for a scheduler
event
*/
static bool init_dummy(void) {return 0;}
static void post_kill_dummy(THD *thd) {}
static void end_dummy(void) {}
static bool end_thread_dummy(THD *thd, bool cache_thread) { return 0; }
/*
Initialize default scheduler with dummy functions so that setup functions
only need to declare those that are relvant for their usage
*/
scheduler_functions::scheduler_functions()
:init(init_dummy),
init_new_connection_thread(init_new_connection_handler_thread),
add_connection(0), // Must be defined
post_kill_notification(post_kill_dummy),
end_thread(end_thread_dummy), end(end_dummy)
{}
/*
End connection, in case when we are using 'no-threads'
*/
static bool no_threads_end(THD *thd, bool put_in_cache)
{
unlink_thd(thd);
pthread_mutex_unlock(&LOCK_thread_count);
return 1; // Abort handle_one_connection
}
/*
Initailize scheduler for --thread-handling=no-threads
*/
void one_thread_scheduler(scheduler_functions *func)
{
func->max_threads= 1;
max_connections= 1;
func->max_connections= &max_connections;
func->connection_count= &connection_count;
#ifndef EMBEDDED_LIBRARY
func->add_connection= handle_connection_in_main_thread;
#endif
func->init_new_connection_thread= init_dummy;
func->end_thread= no_threads_end;
}
/*
Initialize scheduler for --thread-handling=one-thread-per-connection
*/
#ifndef EMBEDDED_LIBRARY
void one_thread_per_connection_scheduler(scheduler_functions *func,
ulong *arg_max_connections,
uint *arg_connection_count)
{
func->max_threads= *arg_max_connections + 1;
func->max_connections= arg_max_connections;
func->connection_count= arg_connection_count;
func->add_connection= create_thread_to_handle_connection;
func->end_thread= one_thread_per_connection_end;
}
#endif /* EMBEDDED_LIBRARY */
#if defined(HAVE_LIBEVENT) && HAVE_POOL_OF_THREADS == 1
#include "event.h"
static struct event_base *base;
static uint created_threads, killed_threads;
static bool kill_pool_threads;
static struct event thd_add_event;
static struct event thd_kill_event;
static pthread_mutex_t LOCK_thd_add; /* protects thds_need_adding */
static LIST *thds_need_adding; /* list of thds to add to libevent queue */
static int thd_add_pair[2]; /* pipe to signal add a connection to libevent*/
static int thd_kill_pair[2]; /* pipe to signal kill a connection in libevent */
/*
LOCK_event_loop protects the non-thread safe libevent calls (event_add and
event_del) and thds_need_processing and thds_waiting_for_io.
*/
static pthread_mutex_t LOCK_event_loop;
static LIST *thds_need_processing; /* list of thds that needs some processing */
static LIST *thds_waiting_for_io; /* list of thds with added events */
pthread_handler_t libevent_thread_proc(void *arg);
static void libevent_end();
static bool libevent_needs_immediate_processing(THD *thd);
static void libevent_connection_close(THD *thd);
static bool libevent_should_close_connection(THD* thd);
static void libevent_thd_add(THD* thd);
void libevent_io_callback(int Fd, short Operation, void *ctx);
void libevent_add_thd_callback(int Fd, short Operation, void *ctx);
void libevent_kill_thd_callback(int Fd, short Operation, void *ctx);
/*
Create a pipe and set to non-blocking.
Returns TRUE if there is an error.
*/
static bool init_socketpair(int sock_pair[])
{
sock_pair[0]= sock_pair[1]= -1;
return (evutil_socketpair(AF_UNIX, SOCK_STREAM, 0, sock_pair) < 0 ||
evutil_make_socket_nonblocking(sock_pair[0]) == -1 ||
evutil_make_socket_nonblocking(sock_pair[1]) == -1);
}
static void close_socketpair(int sock_pair[])
{
if (sock_pair[0] != -1)
EVUTIL_CLOSESOCKET(sock_pair[0]);
if (sock_pair[1] != -1)
EVUTIL_CLOSESOCKET(sock_pair[1]);
}
/*
thd_scheduler keeps the link between THD and events.
It's embedded in the THD class.
*/
thd_scheduler::thd_scheduler()
: logged_in(FALSE), io_event(NULL), thread_attached(FALSE)
{
#ifndef DBUG_OFF
dbug_explain[0]= '\0';
set_explain= FALSE;
#endif
}
thd_scheduler::~thd_scheduler()
{
my_free(io_event, MYF(MY_ALLOW_ZERO_PTR));
}
bool thd_scheduler::init(THD *parent_thd)
{
io_event=
(struct event*)my_malloc(sizeof(*io_event),MYF(MY_ZEROFILL|MY_WME));
if (!io_event)
{
sql_print_error("Memory allocation error in thd_scheduler::init\n");
return TRUE;
}
event_set(io_event, (int)parent_thd->net.vio->sd, EV_READ,
libevent_io_callback, (void*)parent_thd);
list.data= parent_thd;
return FALSE;
}
/*
Attach/associate the connection with the OS thread, for command processing.
*/
bool thd_scheduler::thread_attach()
{
DBUG_ASSERT(!thread_attached);
THD* thd = (THD*)list.data;
if (libevent_should_close_connection(thd) ||
setup_connection_thread_globals(thd))
{
return TRUE;
}
my_errno= 0;
thd->mysys_var->abort= 0;
thread_attached= TRUE;
#ifndef DBUG_OFF
/*
When we attach the thread for a connection for the first time,
we know that there is no session value set yet. Thus
the initial setting of set_explain to FALSE is OK.
*/
if (set_explain)
DBUG_SET(dbug_explain);
#endif
return FALSE;
}
/*
Detach/disassociate the connection with the OS thread.
*/
void thd_scheduler::thread_detach()
{
if (thread_attached)
{
THD* thd = (THD*)list.data;
thd->reset_globals();
thread_attached= FALSE;
#ifndef DBUG_OFF
/*
If during the session @@session.dbug was assigned, the
dbug options/state has been pushed. Check if this is the
case, to be able to restore the state when we attach this
logical connection to a physical thread.
*/
if (_db_is_pushed_())
{
set_explain= TRUE;
if (DBUG_EXPLAIN(dbug_explain, sizeof(dbug_explain)))
sql_print_error("thd_scheduler: DBUG_EXPLAIN buffer is too small");
}
/* DBUG_POP() is a no-op in case there is no session state */
DBUG_POP();
#endif
}
}
/**
Create all threads for the thread pool
NOTES
After threads are created we wait until all threads has signaled that
they have started before we return
RETURN
0 ok
1 We got an error creating the thread pool
In this case we will abort all created threads
*/
static bool libevent_init(void)
{
uint i;
DBUG_ENTER("libevent_init");
base= (struct event_base *) event_init();
created_threads= 0;
killed_threads= 0;
kill_pool_threads= FALSE;
pthread_mutex_init(&LOCK_event_loop, MY_MUTEX_INIT_FAST);
pthread_mutex_init(&LOCK_thd_add, MY_MUTEX_INIT_FAST);
/* set up sockets used to add new thds to the event pool */
if (init_socketpair(thd_add_pair))
{
sql_print_error("init_socketpair(thd_add_spair) error in libevent_init");
DBUG_RETURN(1);
}
/* set up sockets used to kill thds in the event queue */
if (init_socketpair(thd_kill_pair))
{
sql_print_error("init_socketpair(thd_kill_pair) error in libevent_init");
close_socketpair(thd_add_pair);
DBUG_RETURN(1);
}
event_set(&thd_add_event, thd_add_pair[0], EV_READ|EV_PERSIST,
libevent_add_thd_callback, NULL);
event_set(&thd_kill_event, thd_kill_pair[0], EV_READ|EV_PERSIST,
libevent_kill_thd_callback, NULL);
if (event_add(&thd_add_event, NULL) || event_add(&thd_kill_event, NULL))
{
sql_print_error("thd_add_event event_add error in libevent_init");
libevent_end();
DBUG_RETURN(1);
}
/* Set up the thread pool */
created_threads= killed_threads= 0;
pthread_mutex_lock(&LOCK_thread_count);
for (i= 0; i < thread_pool_size; i++)
{
pthread_t thread;
int error;
if ((error= pthread_create(&thread, &connection_attrib,
libevent_thread_proc, 0)))
{
sql_print_error("Can't create completion port thread (error %d)",
error);
pthread_mutex_unlock(&LOCK_thread_count);
libevent_end(); // Cleanup
DBUG_RETURN(TRUE);
}
}
/* Wait until all threads are created */
while (created_threads != thread_pool_size)
pthread_cond_wait(&COND_thread_count,&LOCK_thread_count);
pthread_mutex_unlock(&LOCK_thread_count);
DBUG_PRINT("info", ("%u threads created", (uint) thread_pool_size));
DBUG_RETURN(FALSE);
}
/*
This is called when data is ready on the socket.
NOTES
This is only called by the thread that owns LOCK_event_loop.
We add the thd that got the data to thds_need_processing, and
cause the libevent event_loop() to terminate. Then this same thread will
return from event_loop and pick the thd value back up for processing.
*/
void libevent_io_callback(int, short, void *ctx)
{
safe_mutex_assert_owner(&LOCK_event_loop);
THD *thd= (THD*)ctx;
thds_waiting_for_io= list_delete(thds_waiting_for_io, &thd->event_scheduler.list);
thds_need_processing= list_add(thds_need_processing, &thd->event_scheduler.list);
}
/*
This is called when we have a thread we want to be killed.
NOTES
This is only called by the thread that owns LOCK_event_loop.
*/
void libevent_kill_thd_callback(int Fd, short, void*)
{
safe_mutex_assert_owner(&LOCK_event_loop);
/* clear the pending events */
char c;
while (recv(Fd, &c, sizeof(c), 0) == sizeof(c))
{}
LIST* list= thds_waiting_for_io;
while (list)
{
THD *thd= (THD*)list->data;
list= list_rest(list);
if ((int) thd->killed >= (int) KILL_CONNECTION)
{
/*
Delete from libevent and add to the processing queue.
*/
event_del(thd->event_scheduler.io_event);
thds_waiting_for_io= list_delete(thds_waiting_for_io,
&thd->event_scheduler.list);
thds_need_processing= list_add(thds_need_processing,
&thd->event_scheduler.list);
}
}
}
/*
This is used to add connections to the pool. This callback is invoked from
the libevent event_loop() call whenever the thd_add_pair[1] has a byte
written to it.
NOTES
This is only called by the thread that owns LOCK_event_loop.
*/
void libevent_add_thd_callback(int Fd, short, void *)
{
safe_mutex_assert_owner(&LOCK_event_loop);
/* clear the pending events */
char c;
while (recv(Fd, &c, sizeof(c), 0) == sizeof(c))
{}
pthread_mutex_lock(&LOCK_thd_add);
while (thds_need_adding)
{
/* pop the first thd off the list */
THD* thd= (THD*)thds_need_adding->data;
thds_need_adding= list_delete(thds_need_adding, thds_need_adding);
pthread_mutex_unlock(&LOCK_thd_add);
if (!thd->event_scheduler.logged_in || libevent_should_close_connection(thd))
{
/*
Add thd to thds_need_processing list. If it needs closing we'll close
it outside of event_loop().
*/
thds_need_processing= list_add(thds_need_processing,
&thd->event_scheduler.list);
}
else
{
/* Add to libevent */
if (event_add(thd->event_scheduler.io_event, NULL))
{
sql_print_error("event_add error in libevent_add_thd_callback");
libevent_connection_close(thd);
}
else
{
thds_waiting_for_io= list_add(thds_waiting_for_io,
&thd->event_scheduler.list);
}
}
pthread_mutex_lock(&LOCK_thd_add);
}
pthread_mutex_unlock(&LOCK_thd_add);
}
/**
Notify the thread pool about a new connection
NOTES
LOCK_thread_count is locked on entry. This function MUST unlock it!
*/
static void libevent_add_connection(THD *thd)
{
DBUG_ENTER("libevent_add_connection");
DBUG_PRINT("enter", ("thd: %p thread_id: %lu",
thd, thd->thread_id));
if (thd->event_scheduler.init(thd))
{
sql_print_error("Scheduler init error in libevent_add_new_connection");
pthread_mutex_unlock(&LOCK_thread_count);
libevent_connection_close(thd);
DBUG_VOID_RETURN;
}
threads.append(thd);
libevent_thd_add(thd);
pthread_mutex_unlock(&LOCK_thread_count);
DBUG_VOID_RETURN;
}
/**
@brief Signal a waiting connection it's time to die.
@details This function will signal libevent the THD should be killed.
Either the global LOCK_thd_count or the THD's LOCK_thd_data must be locked
upon entry.
@param[in] thd The connection to kill
*/
static void libevent_post_kill_notification(THD *)
{
/*
Note, we just wake up libevent with an event that a THD should be killed,
It will search its list of thds for thd->killed == KILL_CONNECTION to
find the THDs it should kill.
So we don't actually tell it which one and we don't actually use the
THD being passed to us, but that's just a design detail that could change
later.
*/
char c= 0;
send(thd_kill_pair[1], &c, sizeof(c), 0);
}
/*
Close and delete a connection.
*/
static void libevent_connection_close(THD *thd)
{
DBUG_ENTER("libevent_connection_close");
DBUG_PRINT("enter", ("thd: %p", thd));
thd->killed= KILL_CONNECTION; // Avoid error messages
if (thd->net.vio->type != VIO_CLOSED) // not already closed
{
end_connection(thd);
close_connection(thd, 0, 1);
}
thd->event_scheduler.thread_detach();
unlink_thd(thd); /* locks LOCK_thread_count and deletes thd */
pthread_mutex_unlock(&LOCK_thread_count);
DBUG_VOID_RETURN;
}
/*
Returns true if we should close and delete a THD connection.
*/
static bool libevent_should_close_connection(THD* thd)
{
return (thd->net.error ||
thd->net.vio == 0 ||
(int) thd->killed >= (int) KILL_CONNECTION);
}
/*
libevent_thread_proc is the outer loop of each thread in the thread pool.
These procs only return/terminate on shutdown (kill_pool_threads == true).
*/
pthread_handler_t libevent_thread_proc(void *arg)
{
if (init_new_connection_handler_thread())
{
my_thread_global_end();
sql_print_error("libevent_thread_proc: my_thread_init() failed");
exit(1);
}
DBUG_ENTER("libevent_thread_proc");
/*
Signal libevent_init() when all threads has been created and are ready to
receive events.
*/
(void) pthread_mutex_lock(&LOCK_thread_count);
created_threads++;
thread_created++;
if (created_threads == thread_pool_size)
(void) pthread_cond_signal(&COND_thread_count);
(void) pthread_mutex_unlock(&LOCK_thread_count);
for (;;)
{
THD *thd= NULL;
(void) pthread_mutex_lock(&LOCK_event_loop);
/* get thd(s) to process */
while (!thds_need_processing)
{
if (kill_pool_threads)
{
/* the flag that we should die has been set */
(void) pthread_mutex_unlock(&LOCK_event_loop);
goto thread_exit;
}
event_loop(EVLOOP_ONCE);
}
/* pop the first thd off the list */
thd= (THD*)thds_need_processing->data;
thds_need_processing= list_delete(thds_need_processing,
thds_need_processing);
(void) pthread_mutex_unlock(&LOCK_event_loop);
/* now we process the connection (thd) */
/* set up the thd<->thread links. */
thd->thread_stack= (char*) &thd;
if (thd->event_scheduler.thread_attach())
{
libevent_connection_close(thd);
continue;
}
/* is the connection logged in yet? */
if (!thd->event_scheduler.logged_in)
{
DBUG_PRINT("info", ("init new connection. sd: %d",
thd->net.vio->sd));
lex_start(thd);
if (login_connection(thd))
{
/* Failed to log in */
libevent_connection_close(thd);
continue;
}
else
{
/* login successful */
#if MYSQL_VERSION_ID >= 60000
MYSQL_CONNECTION_START(thd->thread_id, thd->security_ctx->priv_user,
(char *) thd->security_ctx->host_or_ip);
#endif
thd->event_scheduler.logged_in= TRUE;
prepare_new_connection_state(thd);
if (!libevent_needs_immediate_processing(thd))
continue; /* New connection is now waiting for data in libevent*/
}
}
do
{
/* Process a query */
if (do_command(thd))
{
libevent_connection_close(thd);
break;
}
} while (libevent_needs_immediate_processing(thd));
}
thread_exit:
DBUG_PRINT("exit", ("ending thread"));
(void) pthread_mutex_lock(&LOCK_thread_count);
killed_threads++;
pthread_cond_broadcast(&COND_thread_count);
(void) pthread_mutex_unlock(&LOCK_thread_count);
my_thread_end();
pthread_exit(0);
DBUG_RETURN(0); /* purify: deadcode */
}
/*
Returns TRUE if the connection needs immediate processing and FALSE if
instead it's queued for libevent processing or closed,
*/
static bool libevent_needs_immediate_processing(THD *thd)
{
if (libevent_should_close_connection(thd))
{
libevent_connection_close(thd);
return FALSE;
}
/*
If more data in the socket buffer, return TRUE to process another command.
Note: we cannot add for event processing because the whole request might
already be buffered and we wouldn't receive an event.
*/
if (vio_pending(thd->net.vio) > 0)
return TRUE;
thd->event_scheduler.thread_detach();
libevent_thd_add(thd);
return FALSE;
}
/*
Adds a THD to queued for libevent processing.
This call does not actually register the event with libevent.
Instead, it places the THD onto a queue and signals libevent by writing
a byte into thd_add_pair, which will cause our libevent_add_thd_callback to
be invoked which will find the THD on the queue and add it to libevent.
*/
static void libevent_thd_add(THD* thd)
{
char c= 0;
/* release any audit resources, this thd is going to sleep */
#if MYSQL_VERSION_ID >= 60000
mysql_audit_release(thd);
#endif
pthread_mutex_lock(&LOCK_thd_add);
/* queue for libevent */
thds_need_adding= list_add(thds_need_adding, &thd->event_scheduler.list);
/* notify libevent */
send(thd_add_pair[1], &c, sizeof(c), 0);
pthread_mutex_unlock(&LOCK_thd_add);
}
/**
Wait until all pool threads have been deleted for clean shutdown
*/
static void libevent_end()
{
DBUG_ENTER("libevent_end");
DBUG_PRINT("enter", ("created_threads: %d killed_threads: %u",
created_threads, killed_threads));
/*
check if initialized. This may not be the case if get an error at
startup
*/
if (!base)
DBUG_VOID_RETURN;
(void) pthread_mutex_lock(&LOCK_thread_count);
kill_pool_threads= TRUE;
while (killed_threads != created_threads)
{
/* wake up the event loop */
char c= 0;
send(thd_add_pair[1], &c, sizeof(c), 0);
pthread_cond_wait(&COND_thread_count, &LOCK_thread_count);
}
(void) pthread_mutex_unlock(&LOCK_thread_count);
event_del(&thd_add_event);
close_socketpair(thd_add_pair);
event_del(&thd_kill_event);
close_socketpair(thd_kill_pair);
event_base_free(base);
base= 0;
(void) pthread_mutex_destroy(&LOCK_event_loop);
(void) pthread_mutex_destroy(&LOCK_thd_add);
DBUG_VOID_RETURN;
}
void pool_of_threads_scheduler(scheduler_functions* func)
{
func->max_threads= thread_pool_size;
func->max_connections= &max_connections;
func->connection_count= &connection_count;
func->init= libevent_init;
func->end= libevent_end;
func->post_kill_notification= libevent_post_kill_notification;
func->add_connection= libevent_add_connection;
}
#endif