Mirror of https://github.com/MariaDB/server.git, synced 2025-01-29 10:14:19 +01:00
WL#3337 (Event scheduler new architecture)

event_scheduler_ng.cc/h is no more.

parent 0d517461f0
commit 3f4e1f5c69
10 changed files with 992 additions and 1028 deletions
@@ -1,6 +1,5 @@
prepare stmt1 from ' show full processlist ';
execute stmt1;
Id User Host db Command Time State Info
number event_scheduler localhost NULL Connect time Suspended NULL
number root localhost test Query time NULL show full processlist
deallocate prepare stmt1;
@@ -67,7 +67,7 @@ noinst_HEADERS = item.h item_func.h item_sum.h item_cmpfunc.h \
sql_array.h sql_cursor.h events.h \
sql_plugin.h authors.h sql_partition.h event_data_objects.h \
event_queue.h event_db_repository.h \
partition_info.h partition_element.h event_scheduler_ng.h \
partition_info.h partition_element.h event_scheduler.h \
contributors.h
mysqld_SOURCES = sql_lex.cc sql_handler.cc sql_partition.cc \
item.cc item_sum.cc item_buff.cc item_func.cc \

@@ -105,7 +105,7 @@ mysqld_SOURCES = sql_lex.cc sql_handler.cc sql_partition.cc \
tztime.cc my_time.c my_user.c my_decimal.cc\
sp_head.cc sp_pcontext.cc sp_rcontext.cc sp.cc \
sp_cache.cc parse_file.cc sql_trigger.cc event_scheduler.cc\
event_scheduler_ng.cc events.cc event_data_objects.cc \
events.cc event_data_objects.cc \
event_queue.cc event_db_repository.cc \
sql_plugin.cc sql_binlog.cc \
sql_builtin.cc sql_tablespace.cc partition_info.cc
@@ -18,7 +18,7 @@
#include "event_queue.h"
#include "event_data_objects.h"
#include "event_db_repository.h"
#include "event_scheduler_ng.h"
#include "event_scheduler.h"

#define EVENT_QUEUE_INITIAL_SIZE 30

@@ -123,7 +123,7 @@ Event_queue::deinit_mutexes()
*/

bool
Event_queue::init_queue(Event_db_repository *db_repo, Event_scheduler_ng *sched)
Event_queue::init_queue(Event_db_repository *db_repo, Event_scheduler *sched)
{
  int i= 0;
  bool ret= FALSE;
@@ -22,7 +22,7 @@ class Event_job_data;
class Event_queue_element;

class THD;
class Event_scheduler_ng;
class Event_scheduler;

class Event_queue
{
@@ -36,7 +36,7 @@ public:
  deinit_mutexes();

  bool
  init_queue(Event_db_repository *db_repo, Event_scheduler_ng *sched);
  init_queue(Event_db_repository *db_repo, Event_scheduler *sched);

  void
  deinit_queue();

@@ -109,7 +109,7 @@ protected:
  void
  dbug_dump_queue(time_t now);

  Event_scheduler_ng *scheduler;
  Event_scheduler *scheduler;

  /* The sorted queue with the Event_job_data objects */
  QUEUE queue;
@@ -14,3 +14,868 @@
|
|||
along with this program; if not, write to the Free Software
|
||||
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
|
||||
|
||||
#include "mysql_priv.h"
|
||||
#include "events.h"
|
||||
#include "event_data_objects.h"
|
||||
#include "event_scheduler.h"
|
||||
#include "event_queue.h"
|
||||
|
||||
#ifdef __GNUC__
|
||||
#if __GNUC__ >= 2
|
||||
#define SCHED_FUNC __FUNCTION__
|
||||
#endif
|
||||
#else
|
||||
#define SCHED_FUNC "<unknown>"
|
||||
#endif
|
||||
|
||||
#define LOCK_SCHEDULER_DATA() lock_data(SCHED_FUNC, __LINE__)
|
||||
#define UNLOCK_SCHEDULER_DATA() unlock_data(SCHED_FUNC, __LINE__)
|
||||
#define COND_STATE_WAIT(timer) cond_wait(timer, SCHED_FUNC, __LINE__)
|
||||
|
||||
extern pthread_attr_t connection_attrib;
|
||||
|
||||
struct scheduler_param
|
||||
{
|
||||
THD *thd;
|
||||
Event_scheduler *scheduler;
|
||||
};
|
||||
|
||||
struct scheduler_param scheduler_param_value;
|
||||
|
||||
|
||||
|
||||
static
|
||||
LEX_STRING scheduler_states_names[] =
|
||||
{
|
||||
{ C_STRING_WITH_LEN("INITIALIZED")},
|
||||
{ C_STRING_WITH_LEN("RUNNING")},
|
||||
{ C_STRING_WITH_LEN("STOPPING")}
|
||||
};
|
||||
|
||||
|
||||
/*
|
||||
Prints the stack of infos, warnings, errors from thd to
|
||||
the console so it can be fetched by the logs-into-tables and
|
||||
checked later.
|
||||
|
||||
SYNOPSIS
|
||||
evex_print_warnings
|
||||
thd Thread used during the execution of the event
|
||||
et The event itself
|
||||
*/
|
||||
|
||||
static void
|
||||
evex_print_warnings(THD *thd, Event_job_data *et)
|
||||
{
|
||||
MYSQL_ERROR *err;
|
||||
DBUG_ENTER("evex_print_warnings");
|
||||
if (!thd->warn_list.elements)
|
||||
DBUG_VOID_RETURN;
|
||||
|
||||
char msg_buf[10 * STRING_BUFFER_USUAL_SIZE];
|
||||
char prefix_buf[5 * STRING_BUFFER_USUAL_SIZE];
|
||||
String prefix(prefix_buf, sizeof(prefix_buf), system_charset_info);
|
||||
prefix.length(0);
|
||||
prefix.append("SCHEDULER: [");
|
||||
|
||||
append_identifier(thd, &prefix, et->definer.str, et->definer.length);
|
||||
prefix.append("][", 2);
|
||||
append_identifier(thd,&prefix, et->dbname.str, et->dbname.length);
|
||||
prefix.append('.');
|
||||
append_identifier(thd,&prefix, et->name.str, et->name.length);
|
||||
prefix.append("] ", 2);
|
||||
|
||||
List_iterator_fast<MYSQL_ERROR> it(thd->warn_list);
|
||||
while ((err= it++))
|
||||
{
|
||||
String err_msg(msg_buf, sizeof(msg_buf), system_charset_info);
|
||||
/* set it to 0 or we start adding at the end. That's the trick ;) */
|
||||
err_msg.length(0);
|
||||
err_msg.append(prefix);
|
||||
err_msg.append(err->msg, strlen(err->msg), system_charset_info);
|
||||
err_msg.append("]");
|
||||
DBUG_ASSERT(err->level < 3);
|
||||
(sql_print_message_handlers[err->level])("%*s", err_msg.length(),
|
||||
err_msg.c_ptr());
|
||||
}
|
||||
DBUG_VOID_RETURN;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
Initializes a scheduler thread handler, both for the main scheduler thread and for a worker
|
||||
|
||||
SYNOPSIS
|
||||
init_event_thread()
|
||||
thd - the THD of the thread. Has to be allocated by the caller.
|
||||
|
||||
NOTES
|
||||
1. The host of the thread is my_localhost
|
||||
2. thd->net is initialized with NULL - no communication.
|
||||
|
||||
RETURN VALUE
|
||||
0 OK
|
||||
-1 Error
|
||||
*/
|
||||
|
||||
static int
|
||||
init_scheduler_thread(THD* thd)
|
||||
{
|
||||
DBUG_ENTER("init_event_thread");
|
||||
thd->client_capabilities= 0;
|
||||
thd->security_ctx->master_access= 0;
|
||||
thd->security_ctx->db_access= 0;
|
||||
thd->security_ctx->host_or_ip= (char*)my_localhost;
|
||||
thd->security_ctx->set_user((char*)"event_scheduler");
|
||||
my_net_init(&thd->net, NULL);
|
||||
thd->net.read_timeout= slave_net_timeout;
|
||||
thd->slave_thread= 0;
|
||||
thd->options|= OPTION_AUTO_IS_NULL;
|
||||
thd->client_capabilities|= CLIENT_MULTI_RESULTS;
|
||||
pthread_mutex_lock(&LOCK_thread_count);
|
||||
thd->thread_id= thread_id++;
|
||||
threads.append(thd);
|
||||
thread_count++;
|
||||
thread_running++;
|
||||
pthread_mutex_unlock(&LOCK_thread_count);
|
||||
|
||||
/*
|
||||
Guarantees that we will see the thread in SHOW PROCESSLIST even though its
|
||||
vio is NULL.
|
||||
*/
|
||||
|
||||
thd->proc_info= "Initialized";
|
||||
thd->version= refresh_version;
|
||||
thd->set_time();
|
||||
|
||||
DBUG_RETURN(0);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
Cleans up the THD and the threaded environment of the thread.
|
||||
|
||||
SYNOPSIS
|
||||
deinit_event_thread()
|
||||
thd Thread
|
||||
*/
|
||||
|
||||
static void
|
||||
deinit_event_thread(THD *thd)
|
||||
{
|
||||
thd->proc_info= "Clearing";
|
||||
DBUG_ASSERT(thd->net.buff != 0);
|
||||
net_end(&thd->net);
|
||||
DBUG_PRINT("exit", ("Scheduler thread finishing"));
|
||||
pthread_mutex_lock(&LOCK_thread_count);
|
||||
thread_count--;
|
||||
thread_running--;
|
||||
delete thd;
|
||||
pthread_mutex_unlock(&LOCK_thread_count);
|
||||
|
||||
my_thread_end();
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
Function that executes the scheduler.
|
||||
|
||||
SYNOPSIS
|
||||
event_scheduler_thread()
|
||||
arg Pointer to `struct scheduler_param`
|
||||
|
||||
RETURN VALUE
|
||||
0 OK
|
||||
*/
|
||||
|
||||
pthread_handler_t
|
||||
event_scheduler_thread(void *arg)
|
||||
{
|
||||
/* needs to be first for thread_stack */
|
||||
THD *thd= (THD *)(*(struct scheduler_param *) arg).thd;
|
||||
|
||||
thd->thread_stack= (char *)&thd; // remember where our stack is
|
||||
DBUG_ENTER("event_scheduler_thread");
|
||||
|
||||
my_thread_init();
|
||||
pthread_detach_this_thread();
|
||||
thd->real_id=pthread_self();
|
||||
if (init_thr_lock() || thd->store_globals())
|
||||
{
|
||||
thd->cleanup();
|
||||
goto end;
|
||||
}
|
||||
|
||||
#if !defined(__WIN__) && !defined(OS2) && !defined(__NETWARE__)
|
||||
sigset_t set;
|
||||
VOID(sigemptyset(&set)); // Get mask in use
|
||||
VOID(pthread_sigmask(SIG_UNBLOCK,&set,&thd->block_signals));
|
||||
#endif
|
||||
|
||||
((struct scheduler_param *) arg)->scheduler->run(thd);
|
||||
|
||||
end:
|
||||
deinit_event_thread(thd);
|
||||
|
||||
DBUG_RETURN(0); // Against gcc warnings
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
Function that executes an event in a child thread. Sets up the
|
||||
environment for the event execution and cleans up after that.
|
||||
|
||||
SYNOPSIS
|
||||
event_worker_thread()
|
||||
arg The Event_job_data object to be processed
|
||||
|
||||
RETURN VALUE
|
||||
0 OK
|
||||
*/
|
||||
|
||||
pthread_handler_t
|
||||
event_worker_thread(void *arg)
|
||||
{
|
||||
/* needs to be first for thread_stack */
|
||||
THD *thd;
|
||||
Event_job_data *event= (Event_job_data *)arg;
|
||||
int ret;
|
||||
|
||||
thd= event->thd;
|
||||
thd->thread_stack= (char *) &thd;
|
||||
|
||||
|
||||
my_thread_init();
|
||||
pthread_detach_this_thread();
|
||||
thd->real_id=pthread_self();
|
||||
if (init_thr_lock() || thd->store_globals())
|
||||
{
|
||||
thd->cleanup();
|
||||
goto end;
|
||||
}
|
||||
|
||||
|
||||
#if !defined(__WIN__) && !defined(OS2) && !defined(__NETWARE__)
|
||||
sigset_t set;
|
||||
VOID(sigemptyset(&set)); // Get mask in use
|
||||
VOID(pthread_sigmask(SIG_UNBLOCK, &set, &thd->block_signals));
|
||||
#endif
|
||||
thd->init_for_queries();
|
||||
|
||||
DBUG_ENTER("event_worker_thread");
|
||||
DBUG_PRINT("info", ("Baikonur, time is %d, BURAN reporting and operational."
|
||||
"THD=0x%lx", time(NULL), thd));
|
||||
|
||||
sql_print_information("SCHEDULER: [%s.%s of %s] executing in thread %lu",
|
||||
event->dbname.str, event->name.str,
|
||||
event->definer.str, thd->thread_id);
|
||||
|
||||
thd->enable_slow_log= TRUE;
|
||||
|
||||
ret= event->execute(thd);
|
||||
|
||||
evex_print_warnings(thd, event);
|
||||
|
||||
sql_print_information("SCHEDULER: [%s.%s of %s] executed "
                      "in thread %lu. RetCode=%d",
|
||||
event->dbname.str, event->name.str,
|
||||
event->definer.str, thd->thread_id, ret);
|
||||
if (ret == EVEX_COMPILE_ERROR)
|
||||
sql_print_information("SCHEDULER: COMPILE ERROR for event %s.%s of %s",
|
||||
event->dbname.str, event->name.str,
|
||||
event->definer.str);
|
||||
else if (ret == EVEX_MICROSECOND_UNSUP)
|
||||
sql_print_information("SCHEDULER: MICROSECOND is not supported");
|
||||
|
||||
end:
|
||||
DBUG_PRINT("info", ("BURAN %s.%s is landing!", event->dbname.str,
|
||||
event->name.str));
|
||||
delete event;
|
||||
|
||||
deinit_event_thread(thd);
|
||||
|
||||
DBUG_RETURN(0); // Against gcc warnings
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
Performs initialization of the scheduler data, outside of the
|
||||
threading primitives.
|
||||
|
||||
SYNOPSIS
|
||||
Event_scheduler::init_scheduler()
|
||||
*/
|
||||
|
||||
bool
|
||||
Event_scheduler::init_scheduler(Event_queue *q)
|
||||
{
|
||||
LOCK_SCHEDULER_DATA();
|
||||
thread_id= 0;
|
||||
state= INITIALIZED;
|
||||
queue= q;
|
||||
started_events= 0;
|
||||
UNLOCK_SCHEDULER_DATA();
|
||||
|
||||
return FALSE;
|
||||
}
|
||||
|
||||
|
||||
void
|
||||
Event_scheduler::deinit_scheduler() {}
|
||||
|
||||
|
||||
/*
|
||||
Inits scheduler's threading primitives.
|
||||
|
||||
SYNOPSIS
|
||||
Event_scheduler::init_mutexes()
|
||||
*/
|
||||
|
||||
void
|
||||
Event_scheduler::init_mutexes()
|
||||
{
|
||||
pthread_mutex_init(&LOCK_scheduler_state, MY_MUTEX_INIT_FAST);
|
||||
pthread_cond_init(&COND_state, NULL);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
Deinits scheduler's threading primitives.
|
||||
|
||||
SYNOPSIS
|
||||
Event_scheduler::deinit_mutexes()
|
||||
*/
|
||||
|
||||
void
|
||||
Event_scheduler::deinit_mutexes()
|
||||
{
|
||||
pthread_mutex_destroy(&LOCK_scheduler_state);
|
||||
pthread_cond_destroy(&COND_state);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
Starts the scheduler (again). Creates a new THD and passes it to
|
||||
a forked thread. Does not wait for acknowledgement from the new
|
||||
thread that it has started. Asynchronous starting. Most of the
|
||||
needed initializations are done in the current thread to minimize
|
||||
the chance of failure in the spawned thread.
|
||||
|
||||
SYNOPSIS
|
||||
Event_scheduler::start()
|
||||
|
||||
RETURN VALUE
|
||||
FALSE OK
|
||||
TRUE Error (not reported)
|
||||
*/
|
||||
|
||||
bool
|
||||
Event_scheduler::start()
|
||||
{
|
||||
THD *new_thd= NULL;
|
||||
bool ret= FALSE;
|
||||
pthread_t th;
|
||||
DBUG_ENTER("Event_scheduler::start");
|
||||
|
||||
LOCK_SCHEDULER_DATA();
|
||||
DBUG_PRINT("info", ("state before action %s", scheduler_states_names[state].str));
|
||||
if (state > INITIALIZED)
|
||||
goto end;
|
||||
|
||||
if (!(new_thd= new THD) || init_scheduler_thread(new_thd))
|
||||
{
|
||||
sql_print_error("SCHEDULER: Cannot init manager event thread.");
|
||||
ret= TRUE;
|
||||
goto end;
|
||||
}
|
||||
new_thd->system_thread= SYSTEM_THREAD_EVENT_SCHEDULER;
|
||||
new_thd->command= COM_DAEMON;
|
||||
|
||||
scheduler_param_value.thd= new_thd;
|
||||
scheduler_param_value.scheduler= this;
|
||||
|
||||
DBUG_PRINT("info", ("Forking new thread for scheduler. THD=0x%lx", new_thd));
|
||||
if (pthread_create(&th, &connection_attrib, event_scheduler_thread,
|
||||
(void*)&scheduler_param_value))
|
||||
{
|
||||
DBUG_PRINT("error", ("cannot create a new thread"));
|
||||
state= INITIALIZED;
|
||||
ret= TRUE;
|
||||
}
|
||||
DBUG_PRINT("info", ("Setting state to RUNNING"));
|
||||
state= RUNNING;
|
||||
end:
|
||||
UNLOCK_SCHEDULER_DATA();
|
||||
|
||||
if (ret && new_thd)
|
||||
{
|
||||
DBUG_PRINT("info", ("There was an error during THD creation. Clean up"));
|
||||
new_thd->proc_info= "Clearing";
|
||||
DBUG_ASSERT(new_thd->net.buff != 0);
|
||||
net_end(&new_thd->net);
|
||||
pthread_mutex_lock(&LOCK_thread_count);
|
||||
thread_count--;
|
||||
thread_running--;
|
||||
delete new_thd;
|
||||
pthread_mutex_unlock(&LOCK_thread_count);
|
||||
}
|
||||
DBUG_RETURN(ret);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
Stops the scheduler (again). Waits for acknowledgement from the
|
||||
scheduler that it has stopped - synchronous stopping.
|
||||
|
||||
SYNOPSIS
|
||||
Event_scheduler::stop()
|
||||
|
||||
RETURN VALUE
|
||||
FALSE OK
|
||||
TRUE Error (not reported)
|
||||
*/
|
||||
|
||||
bool
|
||||
Event_scheduler::stop()
|
||||
{
|
||||
THD *thd= current_thd;
|
||||
DBUG_ENTER("Event_scheduler::stop");
|
||||
DBUG_PRINT("enter", ("thd=0x%lx", current_thd));
|
||||
|
||||
LOCK_SCHEDULER_DATA();
|
||||
DBUG_PRINT("info", ("state before action %s", scheduler_states_names[state].str));
|
||||
if (state != RUNNING)
|
||||
goto end;
|
||||
|
||||
state= STOPPING;
|
||||
|
||||
DBUG_PRINT("info", ("Manager thread has id %d", thread_id));
|
||||
sql_print_information("SCHEDULER: Killing manager thread %lu", thread_id);
|
||||
|
||||
pthread_cond_signal(&COND_state);
|
||||
|
||||
/* Guarantee we don't catch spurious signals */
|
||||
sql_print_information("SCHEDULER: Waiting for the manager thread to reply");
|
||||
do {
|
||||
DBUG_PRINT("info", ("Waiting for COND_started_or_stopped from the manager "
|
||||
"thread. Current value of state is %s . "
|
||||
"workers count=%d", scheduler_states_names[state].str,
|
||||
workers_count()));
|
||||
/* thd could be 0x0, when shutting down */
|
||||
COND_STATE_WAIT(NULL);
|
||||
} while (state == STOPPING);
|
||||
DBUG_PRINT("info", ("Manager thread has cleaned up. Set state to INIT"));
|
||||
|
||||
thread_id= 0;
|
||||
end:
|
||||
UNLOCK_SCHEDULER_DATA();
|
||||
DBUG_RETURN(FALSE);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
The main loop of the scheduler.
|
||||
|
||||
SYNOPSIS
|
||||
Event_scheduler::run()
|
||||
thd Thread
|
||||
|
||||
RETURN VALUE
|
||||
FALSE OK
|
||||
TRUE Error (Serious error)
|
||||
*/
|
||||
|
||||
bool
|
||||
Event_scheduler::run(THD *thd)
|
||||
{
|
||||
int res;
|
||||
struct timespec abstime;
|
||||
Event_job_data *job_data;
|
||||
DBUG_ENTER("Event_scheduler::run");
|
||||
|
||||
LOCK_SCHEDULER_DATA();
|
||||
|
||||
thread_id= thd->thread_id;
|
||||
sql_print_information("SCHEDULER: Manager thread started with id %lu",
|
||||
thread_id);
|
||||
/*
|
||||
Recalculate the values in the queue because there could have been stops
|
||||
in the scheduler's execution and some activation times could have passed by.
|
||||
*/
|
||||
queue->recalculate_activation_times(thd);
|
||||
while (state == RUNNING)
|
||||
{
|
||||
thd->end_time();
|
||||
/* Gets a minimized version */
|
||||
if (queue->get_top_for_execution_if_time(thd, thd->query_start(),
|
||||
&job_data, &abstime))
|
||||
{
|
||||
sql_print_information("SCHEDULER: Serious error during getting next"
|
||||
" event to execute. Stopping.");
|
||||
break;
|
||||
}
|
||||
|
||||
DBUG_PRINT("info", ("get_top returned job_data=0x%lx now=%d "
|
||||
"abs_time.tv_sec=%d",
|
||||
job_data, thd->query_start(), abstime.tv_sec));
|
||||
if (!job_data && !abstime.tv_sec)
|
||||
{
|
||||
DBUG_PRINT("info", ("The queue is empty. Going to sleep"));
|
||||
thd->enter_cond(&COND_state, &LOCK_scheduler_state,
|
||||
"Waiting on empty queue");
|
||||
COND_STATE_WAIT(NULL);
|
||||
thd->exit_cond("");
|
||||
DBUG_PRINT("info", ("Woke up. Got COND_state"));
|
||||
LOCK_SCHEDULER_DATA();
|
||||
}
|
||||
else if (abstime.tv_sec)
|
||||
{
|
||||
DBUG_PRINT("info", ("Have to sleep %u seconds till %u",
|
||||
abstime.tv_sec - thd->query_start(), abstime.tv_sec));
|
||||
|
||||
thd->enter_cond(&COND_state, &LOCK_scheduler_state,
|
||||
"Waiting for next activation");
|
||||
COND_STATE_WAIT(&abstime);
|
||||
/*
|
||||
If we get a signal we should recalculate whether it's the right time
|
||||
because there could be :
|
||||
1. Spurious wake-up
|
||||
2. The top of the queue was changed (new top because of create/update)
|
||||
*/
|
||||
/* This will do implicit UNLOCK_SCHEDULER_DATA() */
|
||||
thd->exit_cond("");
|
||||
DBUG_PRINT("info", ("Woke up. Got COND_state or time for execution."));
|
||||
LOCK_SCHEDULER_DATA();
|
||||
}
|
||||
else
|
||||
{
|
||||
UNLOCK_SCHEDULER_DATA();
|
||||
res= execute_top(thd, job_data);
|
||||
LOCK_SCHEDULER_DATA();
|
||||
if (res)
|
||||
break;
|
||||
++started_events;
|
||||
}
|
||||
DBUG_PRINT("info", ("state=%s", scheduler_states_names[state].str));
|
||||
}
|
||||
DBUG_PRINT("info", ("Signalling back to the stopper COND_state"));
|
||||
pthread_cond_signal(&COND_state);
|
||||
error:
|
||||
state= INITIALIZED;
|
||||
UNLOCK_SCHEDULER_DATA();
|
||||
sql_print_information("SCHEDULER: Stopped");
|
||||
|
||||
DBUG_RETURN(res);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
Creates a new THD instance and then forks a new thread, while passing
|
||||
the THD pointer and job_data to it.
|
||||
|
||||
SYNOPSIS
|
||||
Event_scheduler::execute_top()
|
||||
|
||||
RETURN VALUE
|
||||
FALSE OK
|
||||
TRUE Error (Serious error)
|
||||
*/
|
||||
|
||||
bool
|
||||
Event_scheduler::execute_top(THD *thd, Event_job_data *job_data)
|
||||
{
|
||||
THD *new_thd;
|
||||
pthread_t th;
|
||||
int res= 0;
|
||||
DBUG_ENTER("Event_scheduler::execute_top");
|
||||
if (!(new_thd= new THD) || init_scheduler_thread(new_thd))
|
||||
goto error;
|
||||
|
||||
new_thd->system_thread= SYSTEM_THREAD_EVENT_WORKER;
|
||||
job_data->thd= new_thd;
|
||||
DBUG_PRINT("info", ("BURAN %s@%s ready for start t-3..2..1..0..ignition",
|
||||
job_data->dbname.str, job_data->name.str));
|
||||
|
||||
/* Major failure */
|
||||
if ((res= pthread_create(&th, &connection_attrib, event_worker_thread,
|
||||
job_data)))
|
||||
goto error;
|
||||
|
||||
DBUG_PRINT("info", ("Launch succeeded. BURAN is in THD=0x%lx", new_thd));
|
||||
DBUG_RETURN(FALSE);
|
||||
|
||||
error:
|
||||
DBUG_PRINT("error", ("Baikonur, we have a problem! res=%d", res));
|
||||
if (new_thd)
|
||||
{
|
||||
new_thd->proc_info= "Clearing";
|
||||
DBUG_ASSERT(new_thd->net.buff != 0);
|
||||
net_end(&new_thd->net);
|
||||
pthread_mutex_lock(&LOCK_thread_count);
|
||||
thread_count--;
|
||||
thread_running--;
|
||||
delete new_thd;
|
||||
pthread_mutex_unlock(&LOCK_thread_count);
|
||||
}
|
||||
delete job_data;
|
||||
DBUG_RETURN(TRUE);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
Returns the current state of the scheduler
|
||||
|
||||
SYNOPSIS
|
||||
Event_scheduler::get_state()
|
||||
|
||||
RETURN VALUE
|
||||
The state of the scheduler (INITIALIZED | RUNNING | STOPPING)
|
||||
*/
|
||||
|
||||
enum Event_scheduler::enum_state
|
||||
Event_scheduler::get_state()
|
||||
{
|
||||
enum Event_scheduler::enum_state ret;
|
||||
LOCK_SCHEDULER_DATA();
|
||||
ret= state;
|
||||
UNLOCK_SCHEDULER_DATA();
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
Returns the number of living event worker threads.
|
||||
|
||||
SYNOPSIS
|
||||
Event_scheduler::workers_count()
|
||||
*/
|
||||
|
||||
uint
|
||||
Event_scheduler::workers_count()
|
||||
{
|
||||
THD *tmp;
|
||||
uint count= 0;
|
||||
|
||||
DBUG_ENTER("Event_scheduler::workers_count");
|
||||
pthread_mutex_lock(&LOCK_thread_count); // For unlink from list
|
||||
I_List_iterator<THD> it(threads);
|
||||
while ((tmp=it++))
|
||||
{
|
||||
if (tmp->command == COM_DAEMON)
|
||||
continue;
|
||||
if (tmp->system_thread == SYSTEM_THREAD_EVENT_WORKER)
|
||||
++count;
|
||||
}
|
||||
pthread_mutex_unlock(&LOCK_thread_count);
|
||||
DBUG_PRINT("exit", ("%d", count));
|
||||
DBUG_RETURN(count);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
Signals the main scheduler thread that the queue has changed
|
||||
its state.
|
||||
|
||||
SYNOPSIS
|
||||
Event_scheduler::queue_changed()
|
||||
*/
|
||||
|
||||
void
|
||||
Event_scheduler::queue_changed()
|
||||
{
|
||||
DBUG_ENTER("Event_scheduler::queue_changed");
|
||||
DBUG_PRINT("info", ("Sending COND_state. state (read wo lock)=%s ",
|
||||
scheduler_states_names[state].str));
|
||||
pthread_cond_signal(&COND_state);
|
||||
DBUG_VOID_RETURN;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
Auxiliary function for locking LOCK_scheduler_state. Used
|
||||
by the LOCK_SCHEDULER_DATA macro.
|
||||
|
||||
SYNOPSIS
|
||||
Event_scheduler::lock_data()
|
||||
func Which function is requesting mutex lock
|
||||
line On which line mutex lock is requested
|
||||
*/
|
||||
|
||||
void
|
||||
Event_scheduler::lock_data(const char *func, uint line)
|
||||
{
|
||||
DBUG_ENTER("Event_scheduler::lock_data");
|
||||
DBUG_PRINT("enter", ("func=%s line=%u", func, line));
|
||||
pthread_mutex_lock(&LOCK_scheduler_state);
|
||||
mutex_last_locked_in_func= func;
|
||||
mutex_last_locked_at_line= line;
|
||||
mutex_scheduler_data_locked= TRUE;
|
||||
DBUG_VOID_RETURN;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
Auxiliary function for unlocking LOCK_scheduler_state. Used
|
||||
by the UNLOCK_SCHEDULER_DATA macro.
|
||||
|
||||
SYNOPSIS
|
||||
Event_scheduler::unlock_data()
|
||||
func Which function is requesting mutex unlock
|
||||
line On which line mutex unlock is requested
|
||||
*/
|
||||
|
||||
void
|
||||
Event_scheduler::unlock_data(const char *func, uint line)
|
||||
{
|
||||
DBUG_ENTER("Event_scheduler::unlock_data");
|
||||
DBUG_PRINT("enter", ("func=%s line=%u", func, line));
|
||||
mutex_last_unlocked_at_line= line;
|
||||
mutex_scheduler_data_locked= FALSE;
|
||||
mutex_last_unlocked_in_func= func;
|
||||
pthread_mutex_unlock(&LOCK_scheduler_state);
|
||||
DBUG_VOID_RETURN;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
Wrapper for pthread_cond_wait/timedwait
|
||||
|
||||
SYNOPSIS
|
||||
Event_scheduler::cond_wait()
|
||||
cond Conditional to wait for
|
||||
mutex Mutex of the conditional
|
||||
|
||||
RETURN VALUE
|
||||
Error code of pthread_cond_wait()
|
||||
*/
|
||||
|
||||
void
|
||||
Event_scheduler::cond_wait(struct timespec *abstime,
|
||||
const char *func, uint line)
|
||||
{
|
||||
DBUG_ENTER("Event_scheduler::cond_wait");
|
||||
waiting_on_cond= TRUE;
|
||||
mutex_last_unlocked_at_line= line;
|
||||
mutex_scheduler_data_locked= FALSE;
|
||||
mutex_last_unlocked_in_func= func;
|
||||
|
||||
if (abstime)
|
||||
pthread_cond_timedwait(&COND_state, &LOCK_scheduler_state, abstime);
|
||||
else
|
||||
pthread_cond_wait(&COND_state, &LOCK_scheduler_state);
|
||||
|
||||
mutex_last_locked_in_func= func;
|
||||
mutex_last_locked_at_line= line;
|
||||
mutex_scheduler_data_locked= TRUE;
|
||||
waiting_on_cond= FALSE;
|
||||
|
||||
DBUG_VOID_RETURN;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
Dumps the internal status of the scheduler
|
||||
|
||||
SYNOPSIS
|
||||
Event_scheduler::dump_internal_status()
|
||||
thd Thread
|
||||
|
||||
RETURN VALUE
|
||||
FALSE OK
|
||||
TRUE Error
|
||||
*/
|
||||
|
||||
bool
|
||||
Event_scheduler::dump_internal_status(THD *thd)
|
||||
{
|
||||
int ret= 0;
|
||||
DBUG_ENTER("Event_scheduler::dump_internal_status");
|
||||
|
||||
#ifndef DBUG_OFF
|
||||
CHARSET_INFO *scs= system_charset_info;
|
||||
Protocol *protocol= thd->protocol;
|
||||
char tmp_buff[5*STRING_BUFFER_USUAL_SIZE];
|
||||
char int_buff[STRING_BUFFER_USUAL_SIZE];
|
||||
String tmp_string(tmp_buff, sizeof(tmp_buff), scs);
|
||||
String int_string(int_buff, sizeof(int_buff), scs);
|
||||
tmp_string.length(0);
|
||||
int_string.length(0);
|
||||
|
||||
protocol->prepare_for_resend();
|
||||
protocol->store(STRING_WITH_LEN("scheduler state"), scs);
|
||||
protocol->store(scheduler_states_names[state].str,
|
||||
scheduler_states_names[state].length, scs);
|
||||
|
||||
if ((ret= protocol->write()))
|
||||
goto end;
|
||||
|
||||
/* thread_id */
|
||||
protocol->prepare_for_resend();
|
||||
protocol->store(STRING_WITH_LEN("thread_id"), scs);
|
||||
if (thread_id)
|
||||
{
|
||||
int_string.set((longlong) thread_id, scs);
|
||||
protocol->store(&int_string);
|
||||
}
|
||||
else
|
||||
protocol->store_null();
|
||||
if ((ret= protocol->write()))
|
||||
goto end;
|
||||
|
||||
/* last locked at*/
|
||||
protocol->prepare_for_resend();
|
||||
protocol->store(STRING_WITH_LEN("scheduler last locked at"), scs);
|
||||
tmp_string.length(scs->cset->snprintf(scs, (char*) tmp_string.ptr(),
|
||||
tmp_string.alloced_length(), "%s::%d",
|
||||
mutex_last_locked_in_func,
|
||||
mutex_last_locked_at_line));
|
||||
protocol->store(&tmp_string);
|
||||
if ((ret= protocol->write()))
|
||||
goto end;
|
||||
|
||||
/* last unlocked at*/
|
||||
protocol->prepare_for_resend();
|
||||
protocol->store(STRING_WITH_LEN("scheduler last unlocked at"), scs);
|
||||
tmp_string.length(scs->cset->snprintf(scs, (char*) tmp_string.ptr(),
|
||||
tmp_string.alloced_length(), "%s::%d",
|
||||
mutex_last_unlocked_in_func,
|
||||
mutex_last_unlocked_at_line));
|
||||
protocol->store(&tmp_string);
|
||||
if ((ret= protocol->write()))
|
||||
goto end;
|
||||
|
||||
/* waiting on */
|
||||
protocol->prepare_for_resend();
|
||||
protocol->store(STRING_WITH_LEN("scheduler waiting on condition"), scs);
|
||||
int_string.set((longlong) waiting_on_cond, scs);
|
||||
protocol->store(&int_string);
|
||||
if ((ret= protocol->write()))
|
||||
goto end;
|
||||
|
||||
/* workers_count */
|
||||
protocol->prepare_for_resend();
|
||||
protocol->store(STRING_WITH_LEN("scheduler workers count"), scs);
|
||||
int_string.set((longlong) workers_count(), scs);
|
||||
protocol->store(&int_string);
|
||||
if ((ret= protocol->write()))
|
||||
goto end;
|
||||
|
||||
/* workers_count */
|
||||
protocol->prepare_for_resend();
|
||||
protocol->store(STRING_WITH_LEN("scheduler executed events"), scs);
|
||||
int_string.set((longlong) started_events, scs);
|
||||
protocol->store(&int_string);
|
||||
if ((ret= protocol->write()))
|
||||
goto end;
|
||||
|
||||
/* scheduler_data_locked */
|
||||
protocol->prepare_for_resend();
|
||||
protocol->store(STRING_WITH_LEN("scheduler data locked"), scs);
|
||||
int_string.set((longlong) mutex_scheduler_data_locked, scs);
|
||||
protocol->store(&int_string);
|
||||
ret= protocol->write();
|
||||
end:
|
||||
#endif
|
||||
|
||||
DBUG_RETURN(ret);
|
||||
}
|
||||
|
|
|
@@ -16,4 +16,108 @@
  along with this program; if not, write to the Free Software
  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */

class Event_queue;
class Event_job_data;

class Event_scheduler
{
public:
  Event_scheduler(){}
  ~Event_scheduler(){}

  enum enum_state
  {
    INITIALIZED = 0,
    RUNNING,
    STOPPING
  };

  /* State changing methods follow */

  bool
  start();

  bool
  stop();

  /*
    Needs to be public because it has to be called from the function
    passed to pthread_create.
  */
  bool
  run(THD *thd);

  bool
  init_scheduler(Event_queue *queue);

  void
  deinit_scheduler();

  void
  init_mutexes();

  void
  deinit_mutexes();

  /* Information retrieving methods follow */

  enum enum_state
  get_state();

  void
  queue_changed();

  bool
  dump_internal_status(THD *thd);

private:
  uint
  workers_count();

  /* helper functions */
  bool
  execute_top(THD *thd, Event_job_data *job_data);

  /* helper functions for working with mutexes & conditionals */
  void
  lock_data(const char *func, uint line);

  void
  unlock_data(const char *func, uint line);

  void
  cond_wait(struct timespec *abstime, const char *func, uint line);

  pthread_mutex_t LOCK_scheduler_state;

  /* This is the current status of the life-cycle of the scheduler. */
  enum enum_state state;

  /*
    Holds the thread id of the executor thread or 0 if the scheduler is not
    running. It is used by ::shutdown() to know which thread to kill with
    kill_one_thread(). The latter wakes up a thread if it is waiting on a
    condition variable and sets thd->killed to non-zero.
  */
  ulong thread_id;

  pthread_cond_t COND_state;

  Event_queue *queue;

  uint mutex_last_locked_at_line;
  uint mutex_last_unlocked_at_line;
  const char* mutex_last_locked_in_func;
  const char* mutex_last_unlocked_in_func;
  bool mutex_scheduler_data_locked;
  bool waiting_on_cond;

  ulonglong started_events;

private:
  /* Prevent use of these */
  Event_scheduler(const Event_scheduler &);
  void operator=(Event_scheduler &);
};

#endif /* _EVENT_SCHEDULER_H_ */
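For orientation, below is a minimal sketch of how the renamed Event_scheduler and the Event_queue from the hunks above fit together. The wrapper functions (start_event_subsystem/stop_event_subsystem) and the global objects are hypothetical, and the claim that the queue notifies the scheduler through queue_changed() is an assumption based on the declared API; the real wiring lives in events.cc, which is not shown in this diff.

/* Hypothetical glue code, not part of this commit -- it only chains the
   public methods declared in event_queue.h and event_scheduler.h above. */
#include "event_db_repository.h"
#include "event_queue.h"
#include "event_scheduler.h"

static Event_db_repository db_repository;
static Event_queue event_queue;
static Event_scheduler scheduler;

static bool start_event_subsystem()
{
  scheduler.init_mutexes();
  /* The queue keeps the scheduler pointer so it can signal it (presumably
     via queue_changed()) when the top of the queue moves. */
  if (event_queue.init_queue(&db_repository, &scheduler))
    return TRUE;
  if (scheduler.init_scheduler(&event_queue))
    return TRUE;
  return scheduler.start();     /* asynchronous: forks the manager thread */
}

static void stop_event_subsystem()
{
  scheduler.stop();             /* synchronous: waits for the manager thread */
  scheduler.deinit_scheduler();
  scheduler.deinit_mutexes();
}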
|
|
@@ -1,881 +0,0 @@
|
|||
/* Copyright (C) 2004-2006 MySQL AB
|
||||
|
||||
This program is free software; you can redistribute it and/or modify
|
||||
it under the terms of the GNU General Public License as published by
|
||||
the Free Software Foundation; either version 2 of the License, or
|
||||
(at your option) any later version.
|
||||
|
||||
This program is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU General Public License
|
||||
along with this program; if not, write to the Free Software
|
||||
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
|
||||
|
||||
#include "mysql_priv.h"
|
||||
#include "events.h"
|
||||
#include "event_data_objects.h"
|
||||
#include "event_scheduler_ng.h"
|
||||
#include "event_queue.h"
|
||||
|
||||
#ifdef __GNUC__
|
||||
#if __GNUC__ >= 2
|
||||
#define SCHED_FUNC __FUNCTION__
|
||||
#endif
|
||||
#else
|
||||
#define SCHED_FUNC "<unknown>"
|
||||
#endif
|
||||
|
||||
#define LOCK_SCHEDULER_DATA() lock_data(SCHED_FUNC, __LINE__)
|
||||
#define UNLOCK_SCHEDULER_DATA() unlock_data(SCHED_FUNC, __LINE__)
|
||||
#define COND_STATE_WAIT(timer) cond_wait(timer, SCHED_FUNC, __LINE__)
|
||||
|
||||
extern pthread_attr_t connection_attrib;
|
||||
|
||||
struct scheduler_param
|
||||
{
|
||||
THD *thd;
|
||||
Event_scheduler_ng *scheduler;
|
||||
};
|
||||
|
||||
struct scheduler_param scheduler_param_value;
|
||||
|
||||
|
||||
|
||||
static
|
||||
LEX_STRING scheduler_states_names[] =
|
||||
{
|
||||
{ C_STRING_WITH_LEN("INITIALIZED")},
|
||||
{ C_STRING_WITH_LEN("RUNNING")},
|
||||
{ C_STRING_WITH_LEN("STOPPING")}
|
||||
};
|
||||
|
||||
|
||||
/*
|
||||
Prints the stack of infos, warnings, errors from thd to
|
||||
the console so it can be fetched by the logs-into-tables and
|
||||
checked later.
|
||||
|
||||
SYNOPSIS
|
||||
evex_print_warnings
|
||||
thd Thread used during the execution of the event
|
||||
et The event itself
|
||||
*/
|
||||
|
||||
static void
|
||||
evex_print_warnings(THD *thd, Event_job_data *et)
|
||||
{
|
||||
MYSQL_ERROR *err;
|
||||
DBUG_ENTER("evex_print_warnings");
|
||||
if (!thd->warn_list.elements)
|
||||
DBUG_VOID_RETURN;
|
||||
|
||||
char msg_buf[10 * STRING_BUFFER_USUAL_SIZE];
|
||||
char prefix_buf[5 * STRING_BUFFER_USUAL_SIZE];
|
||||
String prefix(prefix_buf, sizeof(prefix_buf), system_charset_info);
|
||||
prefix.length(0);
|
||||
prefix.append("SCHEDULER: [");
|
||||
|
||||
append_identifier(thd, &prefix, et->definer.str, et->definer.length);
|
||||
prefix.append("][", 2);
|
||||
append_identifier(thd,&prefix, et->dbname.str, et->dbname.length);
|
||||
prefix.append('.');
|
||||
append_identifier(thd,&prefix, et->name.str, et->name.length);
|
||||
prefix.append("] ", 2);
|
||||
|
||||
List_iterator_fast<MYSQL_ERROR> it(thd->warn_list);
|
||||
while ((err= it++))
|
||||
{
|
||||
String err_msg(msg_buf, sizeof(msg_buf), system_charset_info);
|
||||
/* set it to 0 or we start adding at the end. That's the trick ;) */
|
||||
err_msg.length(0);
|
||||
err_msg.append(prefix);
|
||||
err_msg.append(err->msg, strlen(err->msg), system_charset_info);
|
||||
err_msg.append("]");
|
||||
DBUG_ASSERT(err->level < 3);
|
||||
(sql_print_message_handlers[err->level])("%*s", err_msg.length(),
|
||||
err_msg.c_ptr());
|
||||
}
|
||||
DBUG_VOID_RETURN;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
Inits an scheduler thread handler, both the main and a worker
|
||||
|
||||
SYNOPSIS
|
||||
init_event_thread()
|
||||
thd - the THD of the thread. Has to be allocated by the caller.
|
||||
|
||||
NOTES
|
||||
1. The host of the thead is my_localhost
|
||||
2. thd->net is initted with NULL - no communication.
|
||||
|
||||
RETURN VALUE
|
||||
0 OK
|
||||
-1 Error
|
||||
*/
|
||||
|
||||
static int
|
||||
init_scheduler_thread(THD* thd)
|
||||
{
|
||||
DBUG_ENTER("init_event_thread");
|
||||
thd->client_capabilities= 0;
|
||||
thd->security_ctx->master_access= 0;
|
||||
thd->security_ctx->db_access= 0;
|
||||
thd->security_ctx->host_or_ip= (char*)my_localhost;
|
||||
thd->security_ctx->set_user((char*)"event_scheduler");
|
||||
my_net_init(&thd->net, NULL);
|
||||
thd->net.read_timeout= slave_net_timeout;
|
||||
thd->slave_thread= 0;
|
||||
thd->options|= OPTION_AUTO_IS_NULL;
|
||||
thd->client_capabilities|= CLIENT_MULTI_RESULTS;
|
||||
pthread_mutex_lock(&LOCK_thread_count);
|
||||
thd->thread_id= thread_id++;
|
||||
threads.append(thd);
|
||||
thread_count++;
|
||||
thread_running++;
|
||||
pthread_mutex_unlock(&LOCK_thread_count);
|
||||
|
||||
/*
|
||||
Guarantees that we will see the thread in SHOW PROCESSLIST though its
|
||||
vio is NULL.
|
||||
*/
|
||||
|
||||
thd->proc_info= "Initialized";
|
||||
thd->version= refresh_version;
|
||||
thd->set_time();
|
||||
|
||||
DBUG_RETURN(0);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
Cleans up the THD and the threaded environment of the thread.
|
||||
|
||||
SYNOPSIS
|
||||
deinit_event_thread()
|
||||
thd Thread
|
||||
*/
|
||||
|
||||
static void
|
||||
deinit_event_thread(THD *thd)
|
||||
{
|
||||
thd->proc_info= "Clearing";
|
||||
DBUG_ASSERT(thd->net.buff != 0);
|
||||
net_end(&thd->net);
|
||||
DBUG_PRINT("exit", ("Scheduler thread finishing"));
|
||||
pthread_mutex_lock(&LOCK_thread_count);
|
||||
thread_count--;
|
||||
thread_running--;
|
||||
delete thd;
|
||||
pthread_mutex_unlock(&LOCK_thread_count);
|
||||
|
||||
my_thread_end();
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
Function that executes the scheduler,
|
||||
|
||||
SYNOPSIS
|
||||
event_scheduler_ng_thread()
|
||||
arg Pointer to `struct scheduler_param`
|
||||
|
||||
RETURN VALUE
|
||||
0 OK
|
||||
*/
|
||||
|
||||
pthread_handler_t
|
||||
event_scheduler_ng_thread(void *arg)
|
||||
{
|
||||
/* needs to be first for thread_stack */
|
||||
THD *thd= (THD *)(*(struct scheduler_param *) arg).thd;
|
||||
|
||||
thd->thread_stack= (char *)&thd; // remember where our stack is
|
||||
DBUG_ENTER("event_scheduler_ng_thread");
|
||||
|
||||
my_thread_init();
|
||||
pthread_detach_this_thread();
|
||||
thd->real_id=pthread_self();
|
||||
if (init_thr_lock() || thd->store_globals())
|
||||
{
|
||||
thd->cleanup();
|
||||
goto end;
|
||||
}
|
||||
|
||||
#if !defined(__WIN__) && !defined(OS2) && !defined(__NETWARE__)
|
||||
sigset_t set;
|
||||
VOID(sigemptyset(&set)); // Get mask in use
|
||||
VOID(pthread_sigmask(SIG_UNBLOCK,&set,&thd->block_signals));
|
||||
#endif
|
||||
|
||||
((struct scheduler_param *) arg)->scheduler->run(thd);
|
||||
|
||||
end:
|
||||
deinit_event_thread(thd);
|
||||
|
||||
DBUG_RETURN(0); // Against gcc warnings
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
Function that executes an event in a child thread. Setups the
|
||||
environment for the event execution and cleans after that.
|
||||
|
||||
SYNOPSIS
|
||||
event_worker_ng_thread()
|
||||
arg The Event_job_data object to be processed
|
||||
|
||||
RETURN VALUE
|
||||
0 OK
|
||||
*/
|
||||
|
||||
pthread_handler_t
|
||||
event_worker_ng_thread(void *arg)
|
||||
{
|
||||
/* needs to be first for thread_stack */
|
||||
THD *thd;
|
||||
Event_job_data *event= (Event_job_data *)arg;
|
||||
int ret;
|
||||
|
||||
thd= event->thd;
|
||||
thd->thread_stack= (char *) &thd;
|
||||
|
||||
|
||||
my_thread_init();
|
||||
pthread_detach_this_thread();
|
||||
thd->real_id=pthread_self();
|
||||
if (init_thr_lock() || thd->store_globals())
|
||||
{
|
||||
thd->cleanup();
|
||||
goto end;
|
||||
}
|
||||
|
||||
|
||||
#if !defined(__WIN__) && !defined(OS2) && !defined(__NETWARE__)
|
||||
sigset_t set;
|
||||
VOID(sigemptyset(&set)); // Get mask in use
|
||||
VOID(pthread_sigmask(SIG_UNBLOCK, &set, &thd->block_signals));
|
||||
#endif
|
||||
thd->init_for_queries();
|
||||
|
||||
DBUG_ENTER("event_worker_ng_thread");
|
||||
DBUG_PRINT("info", ("Baikonur, time is %d, BURAN reporting and operational."
|
||||
"THD=0x%lx", time(NULL), thd));
|
||||
|
||||
sql_print_information("SCHEDULER: [%s.%s of %s] executing in thread %lu",
|
||||
event->dbname.str, event->name.str,
|
||||
event->definer.str, thd->thread_id);
|
||||
|
||||
thd->enable_slow_log= TRUE;
|
||||
|
||||
ret= event->execute(thd);
|
||||
|
||||
evex_print_warnings(thd, event);
|
||||
|
||||
sql_print_information("SCHEDULER: [%s.%s of %s] executed "
|
||||
" in thread thread %lu. RetCode=%d",
|
||||
event->dbname.str, event->name.str,
|
||||
event->definer.str, thd->thread_id, ret);
|
||||
if (ret == EVEX_COMPILE_ERROR)
|
||||
sql_print_information("SCHEDULER: COMPILE ERROR for event %s.%s of %s",
|
||||
event->dbname.str, event->name.str,
|
||||
event->definer.str);
|
||||
else if (ret == EVEX_MICROSECOND_UNSUP)
|
||||
sql_print_information("SCHEDULER: MICROSECOND is not supported");
|
||||
|
||||
end:
|
||||
DBUG_PRINT("info", ("BURAN %s.%s is landing!", event->dbname.str,
|
||||
event->name.str));
|
||||
delete event;
|
||||
|
||||
deinit_event_thread(thd);
|
||||
|
||||
DBUG_RETURN(0); // Against gcc warnings
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
Performs initialization of the scheduler data, outside of the
|
||||
threading primitives.
|
||||
|
||||
SYNOPSIS
|
||||
Event_scheduler_ng::init_scheduler()
|
||||
*/
|
||||
|
||||
bool
|
||||
Event_scheduler_ng::init_scheduler(Event_queue *q)
|
||||
{
|
||||
LOCK_SCHEDULER_DATA();
|
||||
thread_id= 0;
|
||||
state= INITIALIZED;
|
||||
queue= q;
|
||||
started_events= 0;
|
||||
UNLOCK_SCHEDULER_DATA();
|
||||
|
||||
return FALSE;
|
||||
}
|
||||
|
||||
|
||||
void
|
||||
Event_scheduler_ng::deinit_scheduler() {}
|
||||
|
||||
|
||||
/*
|
||||
Inits scheduler's threading primitives.
|
||||
|
||||
SYNOPSIS
|
||||
Event_scheduler_ng::init_mutexes()
|
||||
*/
|
||||
|
||||
void
|
||||
Event_scheduler_ng::init_mutexes()
|
||||
{
|
||||
pthread_mutex_init(&LOCK_scheduler_state, MY_MUTEX_INIT_FAST);
|
||||
pthread_cond_init(&COND_state, NULL);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
Deinits scheduler's threading primitives.
|
||||
|
||||
SYNOPSIS
|
||||
Event_scheduler_ng::deinit_mutexes()
|
||||
*/
|
||||
|
||||
void
|
||||
Event_scheduler_ng::deinit_mutexes()
|
||||
{
|
||||
pthread_mutex_destroy(&LOCK_scheduler_state);
|
||||
pthread_cond_destroy(&COND_state);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
Starts the scheduler (again). Creates a new THD and passes it to
|
||||
a forked thread. Does not wait for acknowledgement from the new
|
||||
thread that it has started. Asynchronous starting. Most of the
|
||||
needed initializations are done in the current thread to minimize
|
||||
the chance of failure in the spawned thread.
|
||||
|
||||
SYNOPSIS
|
||||
Event_scheduler_ng::start()
|
||||
|
||||
RETURN VALUE
|
||||
FALSE OK
|
||||
TRUE Error (not reported)
|
||||
*/
|
||||
|
||||
bool
|
||||
Event_scheduler_ng::start()
|
||||
{
|
||||
THD *new_thd= NULL;
|
||||
bool ret= FALSE;
|
||||
pthread_t th;
|
||||
DBUG_ENTER("Event_scheduler_ng::start");
|
||||
|
||||
LOCK_SCHEDULER_DATA();
|
||||
DBUG_PRINT("info", ("state before action %s", scheduler_states_names[state]));
|
||||
if (state > INITIALIZED)
|
||||
goto end;
|
||||
|
||||
if (!(new_thd= new THD) || init_scheduler_thread(new_thd))
|
||||
{
|
||||
sql_print_error("SCHEDULER: Cannot init manager event thread.");
|
||||
ret= TRUE;
|
||||
goto end;
|
||||
}
|
||||
new_thd->system_thread= SYSTEM_THREAD_EVENT_SCHEDULER;
|
||||
new_thd->command= COM_DAEMON;
|
||||
|
||||
scheduler_param_value.thd= new_thd;
|
||||
scheduler_param_value.scheduler= this;
|
||||
|
||||
DBUG_PRINT("info", ("Forking new thread for scheduduler. THD=0x%lx", new_thd));
|
||||
if (pthread_create(&th, &connection_attrib, event_scheduler_ng_thread,
|
||||
(void*)&scheduler_param_value))
|
||||
{
|
||||
DBUG_PRINT("error", ("cannot create a new thread"));
|
||||
state= INITIALIZED;
|
||||
ret= TRUE;
|
||||
}
|
||||
DBUG_PRINT("info", ("Setting state go RUNNING"));
|
||||
state= RUNNING;
|
||||
end:
|
||||
UNLOCK_SCHEDULER_DATA();
|
||||
|
||||
if (ret && new_thd)
|
||||
{
|
||||
DBUG_PRINT("info", ("There was an error during THD creation. Clean up"));
|
||||
new_thd->proc_info= "Clearing";
|
||||
DBUG_ASSERT(new_thd->net.buff != 0);
|
||||
net_end(&new_thd->net);
|
||||
pthread_mutex_lock(&LOCK_thread_count);
|
||||
thread_count--;
|
||||
thread_running--;
|
||||
delete new_thd;
|
||||
pthread_mutex_unlock(&LOCK_thread_count);
|
||||
}
|
||||
DBUG_RETURN(ret);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
Stops the scheduler (again). Waits for acknowledgement from the
|
||||
scheduler that it has stopped - synchronous stopping.
|
||||
|
||||
SYNOPSIS
|
||||
Event_scheduler_ng::stop()
|
||||
|
||||
RETURN VALUE
|
||||
FALSE OK
|
||||
TRUE Error (not reported)
|
||||
*/
|
||||
|
||||
bool
|
||||
Event_scheduler_ng::stop()
|
||||
{
|
||||
THD *thd= current_thd;
|
||||
DBUG_ENTER("Event_scheduler_ng::stop");
|
||||
DBUG_PRINT("enter", ("thd=0x%lx", current_thd));
|
||||
|
||||
LOCK_SCHEDULER_DATA();
|
||||
DBUG_PRINT("info", ("state before action %s", scheduler_states_names[state]));
|
||||
if (state != RUNNING)
|
||||
goto end;
|
||||
|
||||
state= STOPPING;
|
||||
|
||||
DBUG_PRINT("info", ("Manager thread has id %d", thread_id));
|
||||
sql_print_information("SCHEDULER: Killing manager thread %lu", thread_id);
|
||||
|
||||
pthread_cond_signal(&COND_state);
|
||||
|
||||
/* Guarantee we don't catch spurious signals */
|
||||
sql_print_information("SCHEDULER: Waiting the manager thread to reply");
|
||||
do {
|
||||
DBUG_PRINT("info", ("Waiting for COND_started_or_stopped from the manager "
|
||||
"thread. Current value of state is %s . "
|
||||
"workers count=%d", scheduler_states_names[state].str,
|
||||
workers_count()));
|
||||
/* thd could be 0x0, when shutting down */
|
||||
COND_STATE_WAIT(NULL);
|
||||
} while (state == STOPPING);
|
||||
DBUG_PRINT("info", ("Manager thread has cleaned up. Set state to INIT"));
|
||||
|
||||
thread_id= 0;
|
||||
end:
|
||||
UNLOCK_SCHEDULER_DATA();
|
||||
DBUG_RETURN(FALSE);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
The main loop of the scheduler.
|
||||
|
||||
SYNOPSIS
|
||||
Event_scheduler_ng::run()
|
||||
thd Thread
|
||||
|
||||
RETURN VALUE
|
||||
FALSE OK
|
||||
TRUE Error (Serious error)
|
||||
*/
|
||||
|
||||
bool
|
||||
Event_scheduler_ng::run(THD *thd)
|
||||
{
|
||||
int res;
|
||||
struct timespec abstime;
|
||||
Event_job_data *job_data;
|
||||
DBUG_ENTER("Event_scheduler_ng::run");
|
||||
|
||||
LOCK_SCHEDULER_DATA();
|
||||
|
||||
thread_id= thd->thread_id;
|
||||
sql_print_information("SCHEDULER: Manager thread started with id %lu",
|
||||
thread_id);
|
||||
/*
|
||||
Recalculate the values in the queue because there could have been stops
|
||||
in executions of the scheduler and some times could have passed by.
|
||||
*/
|
||||
queue->recalculate_activation_times(thd);
|
||||
while (state == RUNNING)
|
||||
{
|
||||
thd->end_time();
|
||||
/* Gets a minimized version */
|
||||
if (queue->get_top_for_execution_if_time(thd, thd->query_start(),
|
||||
&job_data, &abstime))
|
||||
{
|
||||
sql_print_information("SCHEDULER: Serious error during getting next"
|
||||
" event to execute. Stopping.");
|
||||
break;
|
||||
}
|
||||
|
||||
DBUG_PRINT("info", ("get_top returned job_data=0x%lx now=%d "
|
||||
"abs_time.tv_sec=%d",
|
||||
job_data, thd->query_start(), abstime.tv_sec));
|
||||
if (!job_data && !abstime.tv_sec)
|
||||
{
|
||||
DBUG_PRINT("info", ("The queue is empty. Going to sleep"));
|
||||
thd->enter_cond(&COND_state, &LOCK_scheduler_state,
|
||||
"Waiting on empty queue");
|
||||
COND_STATE_WAIT(NULL);
|
||||
thd->exit_cond("");
|
||||
DBUG_PRINT("info", ("Woke up. Got COND_state"));
|
||||
LOCK_SCHEDULER_DATA();
|
||||
}
|
||||
else if (abstime.tv_sec)
|
||||
{
|
||||
DBUG_PRINT("info", ("Have to sleep some time %u till",
|
||||
abstime.tv_sec - thd->query_start(), abstime.tv_sec));
|
||||
|
||||
thd->enter_cond(&COND_state, &LOCK_scheduler_state,
|
||||
"Waiting for next activation");
|
||||
COND_STATE_WAIT(&abstime);
|
||||
/*
|
||||
If we get signal we should recalculate the whether it's the right time
|
||||
because there could be :
|
||||
1. Spurious wake-up
|
||||
2. The top of the queue was changed (new one becase of create/update)
|
||||
*/
|
||||
/* This will do implicit UNLOCK_SCHEDULER_DATA() */
|
||||
thd->exit_cond("");
|
||||
DBUG_PRINT("info", ("Woke up. Got COND_stat or time for execution."));
|
||||
LOCK_SCHEDULER_DATA();
|
||||
}
|
||||
else
|
||||
{
|
||||
UNLOCK_SCHEDULER_DATA();
|
||||
res= execute_top(thd, job_data);
|
||||
LOCK_SCHEDULER_DATA();
|
||||
if (res)
|
||||
break;
|
||||
++started_events;
|
||||
}
|
||||
DBUG_PRINT("info", ("state=%s", scheduler_states_names[state].str));
|
||||
}
|
||||
DBUG_PRINT("info", ("Signalling back to the stopper COND_state"));
|
||||
pthread_cond_signal(&COND_state);
|
||||
error:
|
||||
state= INITIALIZED;
|
||||
UNLOCK_SCHEDULER_DATA();
|
||||
sql_print_information("SCHEDULER: Stopped");
|
||||
|
||||
DBUG_RETURN(res);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
Creates a new THD instance and then forks a new thread, while passing
|
||||
the THD pointer and job_data to it.
|
||||
|
||||
SYNOPSIS
|
||||
Event_scheduler_ng::execute_top()
|
||||
|
||||
RETURN VALUE
|
||||
FALSE OK
|
||||
TRUE Error (Serious error)
|
||||
*/
|
||||
|
||||
bool
|
||||
Event_scheduler_ng::execute_top(THD *thd, Event_job_data *job_data)
|
||||
{
|
||||
THD *new_thd;
|
||||
pthread_t th;
|
||||
int res= 0;
|
||||
DBUG_ENTER("Event_scheduler_ng::execute_top");
|
||||
if (!(new_thd= new THD) || init_scheduler_thread(new_thd))
|
||||
goto error;
|
||||
|
||||
new_thd->system_thread= SYSTEM_THREAD_EVENT_WORKER;
|
||||
job_data->thd= new_thd;
|
||||
DBUG_PRINT("info", ("BURAN %s@%s ready for start t-3..2..1..0..ignition",
|
||||
job_data->dbname.str, job_data->name.str));
|
||||
|
||||
/* Major failure */
|
||||
if ((res= pthread_create(&th, &connection_attrib, event_worker_ng_thread,
|
||||
job_data)))
|
||||
goto error;
|
||||
|
||||
DBUG_PRINT("info", ("Launch succeeded. BURAN is in THD=0x%lx", new_thd));
|
||||
DBUG_RETURN(FALSE);
|
||||
|
||||
error:
|
||||
DBUG_PRINT("error", ("Baikonur, we have a problem! res=%d", res));
|
||||
if (new_thd)
|
||||
{
|
||||
new_thd->proc_info= "Clearing";
|
||||
DBUG_ASSERT(new_thd->net.buff != 0);
|
||||
net_end(&new_thd->net);
|
||||
pthread_mutex_lock(&LOCK_thread_count);
|
||||
thread_count--;
|
||||
thread_running--;
|
||||
delete new_thd;
|
||||
pthread_mutex_unlock(&LOCK_thread_count);
|
||||
}
|
||||
delete job_data;
|
||||
DBUG_RETURN(TRUE);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
Returns the current state of the scheduler
|
||||
|
||||
SYNOPSIS
|
||||
Event_scheduler_ng::get_state()
|
||||
|
||||
RETURN VALUE
|
||||
The state of the scheduler (INITIALIZED | RUNNING | STOPPING)
|
||||
*/
|
||||
|
||||
enum Event_scheduler_ng::enum_state
|
||||
Event_scheduler_ng::get_state()
|
||||
{
|
||||
enum Event_scheduler_ng::enum_state ret;
|
||||
LOCK_SCHEDULER_DATA();
|
||||
ret= state;
|
||||
UNLOCK_SCHEDULER_DATA();
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
Returns the number of living event worker threads.
|
||||
|
||||
SYNOPSIS
|
||||
Event_scheduler_ng::workers_count()
|
||||
*/
|
||||
|
||||
uint
|
||||
Event_scheduler_ng::workers_count()
|
||||
{
|
||||
THD *tmp;
|
||||
uint count= 0;
|
||||
|
||||
DBUG_ENTER("Event_scheduler_ng::workers_count");
|
||||
pthread_mutex_lock(&LOCK_thread_count); // For unlink from list
|
||||
I_List_iterator<THD> it(threads);
|
||||
while ((tmp=it++))
|
||||
{
|
||||
if (tmp->command == COM_DAEMON)
|
||||
continue;
|
||||
if (tmp->system_thread == SYSTEM_THREAD_EVENT_WORKER)
|
||||
++count;
|
||||
}
|
||||
pthread_mutex_unlock(&LOCK_thread_count);
|
||||
DBUG_PRINT("exit", ("%d", count));
|
||||
DBUG_RETURN(count);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
Signals the main scheduler thread that the queue has changed
|
||||
its state.
|
||||
|
||||
SYNOPSIS
|
||||
Event_scheduler_ng::queue_changed()
|
||||
*/
|
||||
|
||||
void
|
||||
Event_scheduler_ng::queue_changed()
|
||||
{
|
||||
DBUG_ENTER("Event_scheduler_ng::queue_changed");
|
||||
DBUG_PRINT("info", ("Sending COND_state. state (read wo lock)=%s ",
|
||||
scheduler_states_names[state].str));
|
||||
pthread_cond_signal(&COND_state);
|
||||
DBUG_VOID_RETURN;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
Auxiliary function for locking LOCK_scheduler_state. Used
|
||||
by the LOCK_SCHEDULER_DATA macro.
|
||||
|
||||
SYNOPSIS
|
||||
Event_scheduler_ng::lock_data()
|
||||
func Which function is requesting mutex lock
|
||||
line On which line mutex lock is requested
|
||||
*/
|
||||
|
||||
void
|
||||
Event_scheduler_ng::lock_data(const char *func, uint line)
|
||||
{
|
||||
DBUG_ENTER("Event_scheduler_ng::lock_data");
|
||||
DBUG_PRINT("enter", ("func=%s line=%u", func, line));
|
||||
pthread_mutex_lock(&LOCK_scheduler_state);
|
||||
mutex_last_locked_in_func= func;
|
||||
mutex_last_locked_at_line= line;
|
||||
mutex_scheduler_data_locked= TRUE;
|
||||
DBUG_VOID_RETURN;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
Auxiliary function for unlocking LOCK_scheduler_state. Used
|
||||
by the UNLOCK_SCHEDULER_DATA macro.
|
||||
|
||||
SYNOPSIS
|
||||
Event_scheduler_ng::unlock_data()
|
||||
func Which function is requesting mutex unlock
|
||||
line On which line mutex unlock is requested
|
||||
*/
|
||||
|
||||
void
|
||||
Event_scheduler_ng::unlock_data(const char *func, uint line)
|
||||
{
|
||||
DBUG_ENTER("Event_scheduler_ng::unlock_data");
|
||||
DBUG_PRINT("enter", ("func=%s line=%u", func, line));
|
||||
mutex_last_unlocked_at_line= line;
|
||||
mutex_scheduler_data_locked= FALSE;
|
||||
mutex_last_unlocked_in_func= func;
|
||||
pthread_mutex_unlock(&LOCK_scheduler_state);
|
||||
DBUG_VOID_RETURN;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
Wrapper for pthread_cond_wait/timedwait
|
||||
|
||||
SYNOPSIS
|
||||
Event_scheduler_ng::cond_wait()
|
||||
cond Conditional to wait for
|
||||
mutex Mutex of the conditional
|
||||
|
||||
RETURN VALUE
|
||||
Error code of pthread_cond_wait()
|
||||
*/
|
||||
|
||||
void
|
||||
Event_scheduler_ng::cond_wait(struct timespec *abstime,
|
||||
const char *func, uint line)
|
||||
{
|
||||
DBUG_ENTER("Event_scheduler_ng::cond_wait");
|
||||
waiting_on_cond= TRUE;
|
||||
mutex_last_unlocked_at_line= line;
|
||||
mutex_scheduler_data_locked= FALSE;
|
||||
mutex_last_unlocked_in_func= func;
|
||||
|
||||
if (abstime)
|
||||
pthread_cond_timedwait(&COND_state, &LOCK_scheduler_state, abstime);
|
||||
else
|
||||
pthread_cond_wait(&COND_state, &LOCK_scheduler_state);
|
||||
|
||||
mutex_last_locked_in_func= func;
|
||||
mutex_last_locked_at_line= line;
|
||||
mutex_scheduler_data_locked= TRUE;
|
||||
waiting_on_cond= FALSE;
|
||||
|
||||
DBUG_VOID_RETURN;
|
||||
}


/*
  Dumps the internal status of the scheduler

  SYNOPSIS
    Event_scheduler_ng::dump_internal_status()
      thd  Thread

  RETURN VALUE
    FALSE  OK
    TRUE   Error
*/

bool
Event_scheduler_ng::dump_internal_status(THD *thd)
{
  int ret= 0;
  DBUG_ENTER("Event_scheduler_ng::dump_internal_status");

#ifndef DBUG_OFF
  CHARSET_INFO *scs= system_charset_info;
  Protocol *protocol= thd->protocol;
  char tmp_buff[5*STRING_BUFFER_USUAL_SIZE];
  char int_buff[STRING_BUFFER_USUAL_SIZE];
  String tmp_string(tmp_buff, sizeof(tmp_buff), scs);
  String int_string(int_buff, sizeof(int_buff), scs);
  tmp_string.length(0);
  int_string.length(0);

  protocol->prepare_for_resend();
  protocol->store(STRING_WITH_LEN("scheduler state"), scs);
  protocol->store(scheduler_states_names[state].str,
                  scheduler_states_names[state].length, scs);

  if ((ret= protocol->write()))
    goto end;

  /* thread_id */
  protocol->prepare_for_resend();
  protocol->store(STRING_WITH_LEN("thread_id"), scs);
  if (thread_id)
  {
    int_string.set((longlong) thread_id, scs);
    protocol->store(&int_string);
  }
  else
    protocol->store_null();
  if ((ret= protocol->write()))
    goto end;

  /* last locked at */
  protocol->prepare_for_resend();
  protocol->store(STRING_WITH_LEN("scheduler last locked at"), scs);
  tmp_string.length(scs->cset->snprintf(scs, (char*) tmp_string.ptr(),
                                        tmp_string.alloced_length(), "%s::%d",
                                        mutex_last_locked_in_func,
                                        mutex_last_locked_at_line));
  protocol->store(&tmp_string);
  if ((ret= protocol->write()))
    goto end;

  /* last unlocked at */
  protocol->prepare_for_resend();
  protocol->store(STRING_WITH_LEN("scheduler last unlocked at"), scs);
  tmp_string.length(scs->cset->snprintf(scs, (char*) tmp_string.ptr(),
                                        tmp_string.alloced_length(), "%s::%d",
                                        mutex_last_unlocked_in_func,
                                        mutex_last_unlocked_at_line));
  protocol->store(&tmp_string);
  if ((ret= protocol->write()))
    goto end;

  /* waiting on */
  protocol->prepare_for_resend();
  protocol->store(STRING_WITH_LEN("scheduler waiting on condition"), scs);
  int_string.set((longlong) waiting_on_cond, scs);
  protocol->store(&int_string);
  if ((ret= protocol->write()))
    goto end;

  /* workers_count */
  protocol->prepare_for_resend();
  protocol->store(STRING_WITH_LEN("scheduler workers count"), scs);
  int_string.set((longlong) workers_count(), scs);
  protocol->store(&int_string);
  if ((ret= protocol->write()))
    goto end;

  /* started_events */
  protocol->prepare_for_resend();
  protocol->store(STRING_WITH_LEN("scheduler executed events"), scs);
  int_string.set((longlong) started_events, scs);
  protocol->store(&int_string);
  if ((ret= protocol->write()))
    goto end;

  /* scheduler_data_locked */
  protocol->prepare_for_resend();
  protocol->store(STRING_WITH_LEN("scheduler data locked"), scs);
  int_string.set((longlong) mutex_scheduler_data_locked, scs);
  protocol->store(&int_string);
  ret= protocol->write();
end:
#endif

  DBUG_RETURN(ret);
}

@@ -1,123 +0,0 @@
#ifndef _EVENT_SCHEDULER_NG_H_
#define _EVENT_SCHEDULER_NG_H_
/* Copyright (C) 2004-2006 MySQL AB

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 2 of the License, or
   (at your option) any later version.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */

class Event_queue;
class Event_job_data;

class Event_scheduler_ng
{
public:
  Event_scheduler_ng(){}
  ~Event_scheduler_ng(){}

  enum enum_state
  {
    INITIALIZED = 0,
    RUNNING,
    STOPPING
  };

  /* State changing methods follow */

  bool
  start();

  bool
  stop();

  /*
    Needs to be public because it has to be called from the function
    passed to pthread_create.
  */
  bool
  run(THD *thd);

  bool
  init_scheduler(Event_queue *queue);

  void
  deinit_scheduler();

  void
  init_mutexes();

  void
  deinit_mutexes();

  /* Information retrieving methods follow */

  enum enum_state
  get_state();

  void
  queue_changed();

  bool
  dump_internal_status(THD *thd);

private:
  uint
  workers_count();

  /* helper functions */
  bool
  execute_top(THD *thd, Event_job_data *job_data);

  /* helper functions for working with mutexes & conditionals */
  void
  lock_data(const char *func, uint line);

  void
  unlock_data(const char *func, uint line);

  void
  cond_wait(struct timespec *abstime, const char *func, uint line);

  pthread_mutex_t LOCK_scheduler_state;

  /* This is the current status of the life-cycle of the scheduler. */
  enum enum_state state;

  /*
    Holds the thread id of the executor thread or 0 if the scheduler is not
    running. It is used by ::shutdown() to know which thread to kill with
    kill_one_thread(). The latter wakes up a thread if it is waiting on a
    condition variable and sets thd->killed to non-zero.
  */
  ulong thread_id;

  pthread_cond_t COND_state;

  Event_queue *queue;

  uint mutex_last_locked_at_line;
  uint mutex_last_unlocked_at_line;
  const char* mutex_last_locked_in_func;
  const char* mutex_last_unlocked_in_func;
  bool mutex_scheduler_data_locked;
  bool waiting_on_cond;

  ulonglong started_events;

private:
  /* Prevent use of these */
  Event_scheduler_ng(const Event_scheduler_ng &);
  void operator=(Event_scheduler_ng &);
};

#endif /* _EVENT_SCHEDULER_NG_H_ */
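
As a quick orientation aid, the following standalone sketch (stub types only, not the server classes; names such as StubQueue and StubScheduler are hypothetical) mirrors the call order that this interface and the Events:: hunks below imply: mutexes are created first, the scheduler is initialised with its queue, started, and later torn down in reverse order. The real wiring is visible in the Events::init()/deinit()/init_mutexes()/destroy_mutexes() hunks that follow.

#include <cstdio>

struct StubQueue {};                     // stands in for Event_queue

class StubScheduler                      // stands in for Event_scheduler_ng
{
public:
  void init_mutexes()                { std::puts("init_mutexes"); }
  bool init_scheduler(StubQueue *q)  { queue= q; std::puts("init_scheduler"); return false; }
  bool start()                       { std::puts("start"); return false; }
  bool stop()                        { std::puts("stop");  return false; }
  void deinit_scheduler()            { std::puts("deinit_scheduler"); }
  void deinit_mutexes()              { std::puts("deinit_mutexes"); }
private:
  StubQueue *queue;
};

int main()
{
  StubQueue queue;
  StubScheduler scheduler;

  scheduler.init_mutexes();            // cf. Events::init_mutexes()
  scheduler.init_scheduler(&queue);    // cf. Events::init()
  if (scheduler.start())               // false means success in this sketch
    return 1;

  scheduler.stop();                    // cf. Events::deinit()
  scheduler.deinit_scheduler();
  scheduler.deinit_mutexes();          // cf. Events::destroy_mutexes()
  return 0;
}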

@@ -19,7 +19,7 @@
#include "event_data_objects.h"
#include "event_db_repository.h"
#include "event_queue.h"
#include "event_scheduler_ng.h"
#include "event_scheduler.h"
#include "sp_head.h"

/*

@@ -560,15 +560,15 @@ int
Events::init()
{
  DBUG_ENTER("Events::init");
  event_queue->init_queue(db_repository, scheduler_ng);
  scheduler_ng->init_scheduler(event_queue);
  event_queue->init_queue(db_repository, scheduler);
  scheduler->init_scheduler(event_queue);

  /* it should be an assignment! */
  if (opt_event_scheduler)
  {
    DBUG_ASSERT(opt_event_scheduler == 1 || opt_event_scheduler == 2);
    if (opt_event_scheduler == 1)
      DBUG_RETURN(scheduler_ng->start());
      DBUG_RETURN(scheduler->start());
  }

  DBUG_RETURN(0);

@@ -590,8 +590,8 @@ Events::deinit()
{
  DBUG_ENTER("Events::deinit");

  scheduler_ng->stop();
  scheduler_ng->deinit_scheduler();
  scheduler->stop();
  scheduler->deinit_scheduler();

  event_queue->deinit_queue();

@@ -617,8 +617,8 @@ Events::init_mutexes()
  event_queue= new Event_queue;
  event_queue->init_mutexes();

  scheduler_ng= new Event_scheduler_ng();
  scheduler_ng->init_mutexes();
  scheduler= new Event_scheduler();
  scheduler->init_mutexes();
}

@@ -633,9 +633,9 @@ void
Events::destroy_mutexes()
{
  event_queue->deinit_mutexes();
  scheduler_ng->deinit_mutexes();
  scheduler->deinit_mutexes();

  delete scheduler_ng;
  delete scheduler;
  delete db_repository;

  pthread_mutex_destroy(&LOCK_event_metadata);

@@ -670,7 +670,7 @@ Events::dump_internal_status(THD *thd)
                           Protocol::SEND_EOF))
    DBUG_RETURN(TRUE);

  if (scheduler_ng->dump_internal_status(thd) ||
  if (scheduler->dump_internal_status(thd) ||
      event_queue->dump_internal_status(thd))
    DBUG_RETURN(TRUE);

@@ -694,7 +694,7 @@ bool
Events::start_execution_of_events()
{
  DBUG_ENTER("Events::start_execution_of_events");
  DBUG_RETURN(scheduler_ng->start());
  DBUG_RETURN(scheduler->start());
}

@@ -715,7 +715,7 @@ bool
Events::stop_execution_of_events()
{
  DBUG_ENTER("Events::stop_execution_of_events");
  DBUG_RETURN(scheduler_ng->stop());
  DBUG_RETURN(scheduler->stop());
}

@@ -734,5 +734,5 @@ bool
Events::is_started()
{
  DBUG_ENTER("Events::is_started");
  DBUG_RETURN(scheduler_ng->get_state() == Event_scheduler_ng::RUNNING);
  DBUG_RETURN(scheduler->get_state() == Event_scheduler::RUNNING);
}

@@ -21,7 +21,7 @@ class Event_parse_data;
class Event_db_repository;
class Event_queue;
class Event_queue_element;
class Event_scheduler_ng;
class Event_scheduler;

/* Return codes */
enum enum_events_error_code

@@ -117,7 +117,7 @@ private:
  static Events singleton;

  Event_queue *event_queue;
  Event_scheduler_ng *scheduler_ng;
  Event_scheduler *scheduler;
  Event_db_repository *db_repository;

  pthread_mutex_t LOCK_event_metadata;