mirror of
				https://github.com/MariaDB/server.git
				synced 2025-10-31 10:56:12 +01:00 
			
		
		
		
	
		
			
				
	
	
		
			839 lines
		
	
	
	
		
			23 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
			
		
		
	
	
			839 lines
		
	
	
	
		
			23 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
| /* Copyright (c) 2006, 2019, Oracle and/or its affiliates.
 | |
|    Copyright (c) 2009, 2020, MariaDB Corporation
 | |
| 
 | |
|    This program is free software; you can redistribute it and/or modify
 | |
|    it under the terms of the GNU General Public License as published by
 | |
|    the Free Software Foundation; version 2 of the License.
 | |
| 
 | |
|    This program is distributed in the hope that it will be useful,
 | |
|    but WITHOUT ANY WARRANTY; without even the implied warranty of
 | |
|    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 | |
|    GNU General Public License for more details.
 | |
| 
 | |
|    You should have received a copy of the GNU General Public License
 | |
|    along with this program; if not, write to the Free Software
 | |
|    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1335  USA */
 | |
| 
 | |
| #include "mariadb.h"
 | |
| #include "sql_priv.h"
 | |
| #include "unireg.h"
 | |
| #include "event_scheduler.h"
 | |
| #include "events.h"
 | |
| #include "event_data_objects.h"
 | |
| #include "event_queue.h"
 | |
| #include "event_db_repository.h"
 | |
| #include "sql_connect.h"         // init_new_connection_handler_thread
 | |
| #include "sql_class.h"
 | |
| 
 | |
| /**
 | |
|   @addtogroup Event_Scheduler
 | |
|   @{
 | |
| */
 | |
| 
 | |
| #ifdef __GNUC__
 | |
| #if __GNUC__ >= 2
 | |
| #define SCHED_FUNC __FUNCTION__
 | |
| #endif
 | |
| #else
 | |
| #define SCHED_FUNC "<unknown>"
 | |
| #endif
 | |
| 
 | |
| #define LOCK_DATA()       lock_data(SCHED_FUNC, __LINE__)
 | |
| #define UNLOCK_DATA()     unlock_data(SCHED_FUNC, __LINE__)
 | |
| #define COND_STATE_WAIT(mythd, abstime, stage) \
 | |
|         cond_wait(mythd, abstime, stage, SCHED_FUNC, __FILE__, __LINE__)
 | |
| 
 | |
| extern pthread_attr_t connection_attrib;
 | |
| extern ulong event_executed;
 | |
| 
 | |
| Event_db_repository *Event_worker_thread::db_repository;
 | |
| 
 | |
| 
 | |
| static
 | |
| const LEX_CSTRING scheduler_states_names[] =
 | |
| {
 | |
|   { STRING_WITH_LEN("INITIALIZED") },
 | |
|   { STRING_WITH_LEN("RUNNING") },
 | |
|   { STRING_WITH_LEN("STOPPING") }
 | |
| };
 | |
| 
 | |
| struct scheduler_param {
 | |
|   THD *thd;
 | |
|   Event_scheduler *scheduler;
 | |
| };
 | |
| 
 | |
| 
 | |
| /*
 | |
|   Prints the stack of infos, warnings, errors from thd to
 | |
|   the console so it can be fetched by the logs-into-tables and
 | |
|   checked later.
 | |
| 
 | |
|   SYNOPSIS
 | |
|     evex_print_warnings
 | |
|       thd  Thread used during the execution of the event
 | |
|       et   The event itself
 | |
| */
 | |
| 
 | |
| void
 | |
| Event_worker_thread::print_warnings(THD *thd, Event_job_data *et)
 | |
| {
 | |
|   const Sql_condition *err;
 | |
|   DBUG_ENTER("evex_print_warnings");
 | |
|   if (thd->get_stmt_da()->is_warning_info_empty())
 | |
|     DBUG_VOID_RETURN;
 | |
| 
 | |
|   char msg_buf[10 * STRING_BUFFER_USUAL_SIZE];
 | |
|   char prefix_buf[5 * STRING_BUFFER_USUAL_SIZE];
 | |
|   String prefix(prefix_buf, sizeof(prefix_buf), system_charset_info);
 | |
|   prefix.length(0);
 | |
|   prefix.append(STRING_WITH_LEN("Event Scheduler: ["));
 | |
| 
 | |
|   prefix.append(et->definer.str, et->definer.length, system_charset_info);
 | |
|   prefix.append(STRING_WITH_LEN("]["));
 | |
|   prefix.append(et->dbname.str, et->dbname.length, system_charset_info);
 | |
|   prefix.append('.');
 | |
|   prefix.append(et->name.str, et->name.length, system_charset_info);
 | |
|   prefix.append(STRING_WITH_LEN("] "));
 | |
| 
 | |
|   Diagnostics_area::Sql_condition_iterator it=
 | |
|     thd->get_stmt_da()->sql_conditions();
 | |
|   while ((err= it++))
 | |
|   {
 | |
|     String err_msg(msg_buf, sizeof(msg_buf), system_charset_info);
 | |
|     /* set it to 0 or we start adding at the end. That's the trick ;) */
 | |
|     err_msg.length(0);
 | |
|     err_msg.append(prefix);
 | |
|     err_msg.append(err->get_message_text(),
 | |
|                    err->get_message_octet_length(), system_charset_info);
 | |
|     DBUG_ASSERT(err->get_level() < 3);
 | |
|     (sql_print_message_handlers[err->get_level()])("%*s", err_msg.length(),
 | |
|                                                    err_msg.c_ptr_safe());
 | |
|   }
 | |
|   DBUG_VOID_RETURN;
 | |
| }
 | |
| 
 | |
| 
 | |
| /*
 | |
|   Performs post initialization of structures in a new thread.
 | |
| 
 | |
|   SYNOPSIS
 | |
|     post_init_event_thread()
 | |
|       thd  Thread
 | |
| 
 | |
|   NOTES
 | |
|       Before this is called, one should not do any DBUG_XXX() calls.
 | |
| 
 | |
| */
 | |
| 
 | |
| bool
 | |
| post_init_event_thread(THD *thd)
 | |
| {
 | |
|   (void) init_new_connection_handler_thread();
 | |
|   if (init_thr_lock())
 | |
|   {
 | |
|     thd->cleanup();
 | |
|     return TRUE;
 | |
|   }
 | |
|   thd->store_globals();
 | |
|   return FALSE;
 | |
| }
 | |
| 
 | |
| 
 | |
| /*
 | |
|   Cleans up the THD and the threaded environment of the thread.
 | |
| 
 | |
|   SYNOPSIS
 | |
|     deinit_event_thread()
 | |
|       thd  Thread
 | |
| */
 | |
| 
 | |
| void
 | |
| deinit_event_thread(THD *thd)
 | |
| {
 | |
|   thd->proc_info= "Clearing";
 | |
|   DBUG_PRINT("exit", ("Event thread finishing"));
 | |
|   server_threads.erase(thd);
 | |
|   delete thd;
 | |
| }
 | |
| 
 | |
| 
 | |
| /*
 | |
|   Performs pre- mysql_thread_create() initialisation of THD. Do this
 | |
|   in the thread that will pass THD to the child thread. In the
 | |
|   child thread call post_init_event_thread().
 | |
| 
 | |
|   SYNOPSIS
 | |
|     pre_init_event_thread()
 | |
|       thd  The THD of the thread. Has to be allocated by the caller.
 | |
| 
 | |
|   NOTES
 | |
|     1. The host of the thread is my_localhost
 | |
|     2. thd->net is initted with NULL - no communication.
 | |
| */
 | |
| 
 | |
| void
 | |
| pre_init_event_thread(THD* thd)
 | |
| {
 | |
|   THD *orig_thd= current_thd;
 | |
|   DBUG_ENTER("pre_init_event_thread");
 | |
| 
 | |
|   set_current_thd(thd);
 | |
|   thd->client_capabilities= 0;
 | |
|   thd->security_ctx->master_access= NO_ACL;
 | |
|   thd->security_ctx->db_access= NO_ACL;
 | |
|   thd->security_ctx->host_or_ip= (char*)my_localhost;
 | |
|   my_net_init(&thd->net, NULL, thd, MYF(MY_THREAD_SPECIFIC));
 | |
|   thd->security_ctx->set_user((char*)"event_scheduler");
 | |
|   thd->net.read_timeout= slave_net_timeout;
 | |
|   thd->variables.option_bits|= OPTION_AUTO_IS_NULL;
 | |
|   thd->client_capabilities|= CLIENT_MULTI_RESULTS;
 | |
|   server_threads.insert(thd);
 | |
| 
 | |
|   /*
 | |
|     Guarantees that we will see the thread in SHOW PROCESSLIST though its
 | |
|     vio is NULL.
 | |
|   */
 | |
| 
 | |
|   thd->proc_info= "Initialized";
 | |
|   thd->set_time();
 | |
| 
 | |
|   /* Do not use user-supplied timeout value for system threads. */
 | |
|   thd->variables.lock_wait_timeout= LONG_TIMEOUT;
 | |
| 
 | |
|   set_current_thd(orig_thd);
 | |
|   DBUG_VOID_RETURN;
 | |
| }
 | |
| 
 | |
| 
 | |
| /*
 | |
|   Function that executes the scheduler,
 | |
| 
 | |
|   SYNOPSIS
 | |
|     event_scheduler_thread()
 | |
|       arg  Pointer to `struct scheduler_param`
 | |
| 
 | |
|   RETURN VALUE
 | |
|     0  OK
 | |
| */
 | |
| 
 | |
| pthread_handler_t
 | |
| event_scheduler_thread(void *arg)
 | |
| {
 | |
|   THD *thd= (THD *) ((struct scheduler_param *) arg)->thd;
 | |
|   Event_scheduler *scheduler= ((struct scheduler_param *) arg)->scheduler;
 | |
|   bool res;
 | |
|   thd->reset_stack();
 | |
| 
 | |
|   mysql_thread_set_psi_id(thd->thread_id);
 | |
|   my_thread_set_name("event_scheduler");
 | |
| 
 | |
|   res= post_init_event_thread(thd);
 | |
| 
 | |
|   DBUG_ENTER("event_scheduler_thread");
 | |
|   my_free(arg);
 | |
|   if (!res)
 | |
|     scheduler->run(thd);
 | |
| 
 | |
|   deinit_event_thread(thd);
 | |
|   DBUG_LEAVE;                               // Against gcc warnings
 | |
|   my_thread_end();
 | |
|   return 0;
 | |
| }
 | |
| 
 | |
| 
 | |
| /**
 | |
|   Function that executes an event in a child thread. Setups the
 | |
|   environment for the event execution and cleans after that.
 | |
| 
 | |
|   SYNOPSIS
 | |
|     event_worker_thread()
 | |
|       arg  The Event_job_data object to be processed
 | |
| 
 | |
|   RETURN VALUE
 | |
|     0  OK
 | |
| */
 | |
| 
 | |
| pthread_handler_t
 | |
| event_worker_thread(void *arg)
 | |
| {
 | |
|   THD *thd;
 | |
|   Event_queue_element_for_exec *event= (Event_queue_element_for_exec *)arg;
 | |
| 
 | |
|   thd= event->thd;
 | |
| 
 | |
|   mysql_thread_set_psi_id(thd->thread_id);
 | |
|   my_thread_set_name("event_worker");
 | |
| 
 | |
|   Event_worker_thread worker_thread;
 | |
|   worker_thread.run(thd, event);
 | |
| 
 | |
|   my_thread_end();
 | |
|   return 0;                                     // Can't return anything here
 | |
| }
 | |
| 
 | |
| 
 | |
| /**
 | |
|   Function that executes an event in a child thread. Setups the
 | |
|   environment for the event execution and cleans after that.
 | |
| 
 | |
|   SYNOPSIS
 | |
|     Event_worker_thread::run()
 | |
|       thd    Thread context
 | |
|       event  The Event_queue_element_for_exec object to be processed
 | |
| */
 | |
| 
 | |
| void
 | |
| Event_worker_thread::run(THD *thd, Event_queue_element_for_exec *event)
 | |
| {
 | |
|   Event_job_data job_data;
 | |
|   bool res;
 | |
| 
 | |
|   DBUG_ASSERT(thd->m_digest == NULL);
 | |
|   DBUG_ASSERT(thd->m_statement_psi == NULL);
 | |
| 
 | |
| #ifdef HAVE_PSI_STATEMENT_INTERFACE
 | |
|   PSI_statement_locker_state state;
 | |
|   thd->m_statement_psi= MYSQL_START_STATEMENT(& state,
 | |
|                                               event->get_psi_info()->m_key,
 | |
|                                               event->dbname.str,
 | |
|                                               event->dbname.length,
 | |
|                                               thd->charset(), NULL);
 | |
| #endif
 | |
| 
 | |
|   res= post_init_event_thread(thd);
 | |
| 
 | |
|   DBUG_ENTER("Event_worker_thread::run");
 | |
|   DBUG_PRINT("info", ("Time is %u, THD: %p", (uint)my_time(0), thd));
 | |
| 
 | |
|   if (res)
 | |
|     goto end;
 | |
| 
 | |
|   if ((res= db_repository->load_named_event(thd, &event->dbname, &event->name,
 | |
|                                             &job_data)))
 | |
|   {
 | |
|     DBUG_PRINT("error", ("Got error from load_named_event"));
 | |
|     goto end;
 | |
|   }
 | |
| 
 | |
|   thd->enable_slow_log= TRUE;
 | |
| 
 | |
|   res= job_data.execute(thd, event->dropped);
 | |
| 
 | |
|   print_warnings(thd, &job_data);
 | |
| 
 | |
|   if (res)
 | |
|     sql_print_information("Event Scheduler: "
 | |
|                           "[%s].[%s.%s] event execution failed.",
 | |
|                           job_data.definer.str,
 | |
|                           job_data.dbname.str, job_data.name.str);
 | |
| end:
 | |
| #ifdef HAVE_PSI_STATEMENT_INTERFACE
 | |
|   MYSQL_END_STATEMENT(thd->m_statement_psi, thd->get_stmt_da());
 | |
|   thd->m_statement_psi= NULL;
 | |
| #endif
 | |
|   DBUG_ASSERT(thd->m_digest == NULL);
 | |
|   DBUG_PRINT("info", ("Done with Event %s.%s", event->dbname.str,
 | |
|              event->name.str));
 | |
| 
 | |
|   delete event;
 | |
|   deinit_event_thread(thd);
 | |
| 
 | |
|   DBUG_VOID_RETURN;
 | |
| }
 | |
| 
 | |
| 
 | |
| Event_scheduler::Event_scheduler(Event_queue *queue_arg)
 | |
|   :state(INITIALIZED),
 | |
|   scheduler_thd(NULL),
 | |
|   queue(queue_arg),
 | |
|   mutex_last_locked_at_line(0),
 | |
|   mutex_last_unlocked_at_line(0),
 | |
|   mutex_last_locked_in_func("n/a"),
 | |
|   mutex_last_unlocked_in_func("n/a"),
 | |
|   mutex_scheduler_data_locked(FALSE),
 | |
|   waiting_on_cond(FALSE),
 | |
|   started_events(0)
 | |
| {
 | |
|   mysql_mutex_init(key_event_scheduler_LOCK_scheduler_state,
 | |
|                    &LOCK_scheduler_state, MY_MUTEX_INIT_FAST);
 | |
|   mysql_cond_init(key_event_scheduler_COND_state, &COND_state, NULL);
 | |
|   mysql_mutex_record_order(&LOCK_scheduler_state, &LOCK_global_system_variables);
 | |
| }
 | |
| 
 | |
| 
 | |
| Event_scheduler::~Event_scheduler()
 | |
| {
 | |
|   stop();                                    /* does nothing if not running */
 | |
|   mysql_mutex_destroy(&LOCK_scheduler_state);
 | |
|   mysql_cond_destroy(&COND_state);
 | |
| }
 | |
| 
 | |
| 
 | |
| /**
 | |
|   Starts the scheduler (again). Creates a new THD and passes it to
 | |
|   a forked thread. Does not wait for acknowledgement from the new
 | |
|   thread that it has started. Asynchronous starting. Most of the
 | |
|   needed initializations are done in the current thread to minimize
 | |
|   the chance of failure in the spawned thread.
 | |
| 
 | |
|   @param[out] err_no - errno indicating type of error which caused
 | |
|                        failure to start scheduler thread.
 | |
| 
 | |
|   @return
 | |
|     @retval false Success.
 | |
|     @retval true  Error.
 | |
| */
 | |
| 
 | |
| bool
 | |
| Event_scheduler::start(int *err_no)
 | |
| {
 | |
|   THD *new_thd= NULL;
 | |
|   bool ret= false;
 | |
|   pthread_t th;
 | |
|   struct scheduler_param *scheduler_param_value;
 | |
|   DBUG_ENTER("Event_scheduler::start");
 | |
| 
 | |
|   LOCK_DATA();
 | |
|   DBUG_PRINT("info", ("state before action %s", scheduler_states_names[state].str));
 | |
|   if (state > INITIALIZED)
 | |
|     goto end;
 | |
| 
 | |
|   if (!(new_thd= new THD(next_thread_id())))
 | |
|   {
 | |
|     sql_print_error("Event Scheduler: Cannot initialize the scheduler thread");
 | |
|     ret= true;
 | |
|     goto end;
 | |
|   }
 | |
| 
 | |
|   pre_init_event_thread(new_thd);
 | |
|   new_thd->system_thread= SYSTEM_THREAD_EVENT_SCHEDULER;
 | |
|   new_thd->set_command(COM_DAEMON);
 | |
| 
 | |
|   /*
 | |
|     We should run the event scheduler thread under the super-user privileges.
 | |
|     In particular, this is needed to be able to lock the mysql.event table
 | |
|     for writing when the server is running in the read-only mode.
 | |
| 
 | |
|     Same goes for transaction access mode. Set it to read-write for this thd.
 | |
|   */
 | |
|   new_thd->security_ctx->master_access |= PRIV_IGNORE_READ_ONLY;
 | |
|   new_thd->variables.tx_read_only= false;
 | |
|   new_thd->tx_read_only= false;
 | |
| 
 | |
|   /* This should not be marked with MY_THREAD_SPECIFIC */
 | |
|   scheduler_param_value=
 | |
|     (struct scheduler_param *)my_malloc(key_memory_Event_scheduler_scheduler_param,
 | |
|                                         sizeof(struct scheduler_param), MYF(0));
 | |
|   scheduler_param_value->thd= new_thd;
 | |
|   scheduler_param_value->scheduler= this;
 | |
| 
 | |
|   scheduler_thd= new_thd;
 | |
|   DBUG_PRINT("info", ("Setting state go RUNNING"));
 | |
|   state= RUNNING;
 | |
|   DBUG_PRINT("info", ("Forking new thread for scheduler. THD: %p", new_thd));
 | |
|   if ((*err_no= mysql_thread_create(key_thread_event_scheduler,
 | |
|                                     &th, &connection_attrib,
 | |
|                                     event_scheduler_thread,
 | |
|                                     (void*)scheduler_param_value)))
 | |
|   {
 | |
|     DBUG_PRINT("error", ("cannot create a new thread"));
 | |
|     sql_print_error("Event scheduler: Failed to start scheduler,"
 | |
|                     " Can not create thread for event scheduler (errno=%d)",
 | |
|                     *err_no);
 | |
| 
 | |
|     state= INITIALIZED;
 | |
|     scheduler_thd= NULL;
 | |
|     deinit_event_thread(new_thd);
 | |
| 
 | |
|     my_free(scheduler_param_value);
 | |
|     ret= true;
 | |
|   }
 | |
| 
 | |
| end:
 | |
|   UNLOCK_DATA();
 | |
|   DBUG_RETURN(ret);
 | |
| }
 | |
| 
 | |
| 
 | |
| /*
 | |
|   The main loop of the scheduler.
 | |
| 
 | |
|   SYNOPSIS
 | |
|     Event_scheduler::run()
 | |
|       thd  Thread
 | |
| 
 | |
|   RETURN VALUE
 | |
|     FALSE  OK
 | |
|     TRUE   Error (Serious error)
 | |
| */
 | |
| 
 | |
| bool
 | |
| Event_scheduler::run(THD *thd)
 | |
| {
 | |
|   int res= FALSE;
 | |
|   DBUG_ENTER("Event_scheduler::run");
 | |
| 
 | |
|   sql_print_information("Event Scheduler: scheduler thread started with id %lu",
 | |
|                         (ulong) thd->thread_id);
 | |
|   /*
 | |
|     Recalculate the values in the queue because there could have been stops
 | |
|     in executions of the scheduler and some times could have passed by.
 | |
|   */
 | |
|   queue->recalculate_activation_times(thd);
 | |
| 
 | |
|   while (is_running())
 | |
|   {
 | |
|     Event_queue_element_for_exec *event_name;
 | |
| 
 | |
|     /* Gets a minimized version */
 | |
|     if (queue->get_top_for_execution_if_time(thd, &event_name))
 | |
|     {
 | |
|       sql_print_information("Event Scheduler: "
 | |
|                             "Serious error during getting next "
 | |
|                             "event to execute. Stopping");
 | |
|       break;
 | |
|     }
 | |
| 
 | |
|     DBUG_PRINT("info", ("get_top_for_execution_if_time returned "
 | |
|                         "event_name=%p", event_name));
 | |
|     if (event_name)
 | |
|     {
 | |
|       if ((res= execute_top(event_name)))
 | |
|         break;
 | |
|     }
 | |
|     else
 | |
|     {
 | |
|       DBUG_ASSERT(thd->killed);
 | |
|       DBUG_PRINT("info", ("job_data is NULL, the thread was killed"));
 | |
|     }
 | |
|     DBUG_PRINT("info", ("state=%s", scheduler_states_names[state].str));
 | |
|     free_root(thd->mem_root, MYF(0));
 | |
|     /* Ensure we don't have any open tables or table locks */
 | |
|     DBUG_ASSERT(thd->lock == 0);
 | |
|   }
 | |
| 
 | |
|   LOCK_DATA();
 | |
|   scheduler_thd= NULL;
 | |
|   state= INITIALIZED;
 | |
|   DBUG_PRINT("info", ("Broadcasting COND_state back to the stoppers"));
 | |
|   mysql_cond_broadcast(&COND_state);
 | |
|   UNLOCK_DATA();
 | |
| 
 | |
|   DBUG_RETURN(res);
 | |
| }
 | |
| 
 | |
| 
 | |
| /*
 | |
|   Creates a new THD instance and then forks a new thread, while passing
 | |
|   the THD pointer and job_data to it.
 | |
| 
 | |
|   SYNOPSIS
 | |
|     Event_scheduler::execute_top()
 | |
| 
 | |
|   RETURN VALUE
 | |
|     FALSE  OK
 | |
|     TRUE   Error (Serious error)
 | |
| */
 | |
| 
 | |
| bool
 | |
| Event_scheduler::execute_top(Event_queue_element_for_exec *event_name)
 | |
| {
 | |
|   THD *new_thd;
 | |
|   pthread_t th;
 | |
|   int res= 0;
 | |
|   DBUG_ENTER("Event_scheduler::execute_top");
 | |
| 
 | |
|   if (!(new_thd= new THD(next_thread_id())))
 | |
|     goto error;
 | |
| 
 | |
|   pre_init_event_thread(new_thd);
 | |
|   new_thd->system_thread= SYSTEM_THREAD_EVENT_WORKER;
 | |
|   event_name->thd= new_thd;
 | |
|   DBUG_PRINT("info", ("Event %s@%s ready for start",
 | |
|              event_name->dbname.str, event_name->name.str));
 | |
| 
 | |
|   /*
 | |
|     TODO: should use thread pool here, preferably with an upper limit
 | |
|     on number of threads: if too many events are scheduled for the
 | |
|     same time, starting all of them at once won't help them run truly
 | |
|     in parallel (because of the great amount of synchronization), so
 | |
|     we may as well execute them in sequence, keeping concurrency at a
 | |
|     reasonable level.
 | |
|   */
 | |
|   /* Major failure */
 | |
|   if ((res= mysql_thread_create(key_thread_event_worker,
 | |
|                                 &th, &connection_attrib, event_worker_thread,
 | |
|                                 event_name)))
 | |
|   {
 | |
|     mysql_mutex_lock(&LOCK_global_system_variables);
 | |
|     Events::opt_event_scheduler= Events::EVENTS_OFF;
 | |
|     mysql_mutex_unlock(&LOCK_global_system_variables);
 | |
| 
 | |
|     sql_print_error("Event_scheduler::execute_top: Can not create event worker"
 | |
|                     " thread (errno=%d). Stopping event scheduler", res);
 | |
| 
 | |
|     deinit_event_thread(new_thd);
 | |
|     goto error;
 | |
|   }
 | |
| 
 | |
|   started_events++;
 | |
|   executed_events++;                            // For SHOW STATUS
 | |
| 
 | |
|   DBUG_PRINT("info", ("Event is in THD: %p", new_thd));
 | |
|   DBUG_RETURN(FALSE);
 | |
| 
 | |
| error:
 | |
|   DBUG_PRINT("error", ("Event_scheduler::execute_top() res: %d", res));
 | |
|   delete event_name;
 | |
|   DBUG_RETURN(TRUE);
 | |
| }
 | |
| 
 | |
| 
 | |
| /*
 | |
|   Checks whether the state of the scheduler is RUNNING
 | |
| 
 | |
|   SYNOPSIS
 | |
|     Event_scheduler::is_running()
 | |
| 
 | |
|   RETURN VALUE
 | |
|     TRUE   RUNNING
 | |
|     FALSE  Not RUNNING
 | |
| */
 | |
| 
 | |
| bool
 | |
| Event_scheduler::is_running()
 | |
| {
 | |
|   LOCK_DATA();
 | |
|   bool ret= (state == RUNNING);
 | |
|   UNLOCK_DATA();
 | |
|   return ret;
 | |
| }
 | |
| 
 | |
| 
 | |
| /**
 | |
|   Stops the scheduler (again). Waits for acknowledgement from the
 | |
|   scheduler that it has stopped - synchronous stopping.
 | |
| 
 | |
|   Already running events will not be stopped. If the user needs
 | |
|   them stopped manual intervention is needed.
 | |
| 
 | |
|   SYNOPSIS
 | |
|     Event_scheduler::stop()
 | |
| 
 | |
|   RETURN VALUE
 | |
|     FALSE  OK
 | |
|     TRUE   Error (not reported)
 | |
| */
 | |
| 
 | |
| bool
 | |
| Event_scheduler::stop()
 | |
| {
 | |
|   THD *thd= current_thd;
 | |
|   DBUG_ENTER("Event_scheduler::stop");
 | |
|   DBUG_PRINT("enter", ("thd: %p", thd));
 | |
| 
 | |
|   LOCK_DATA();
 | |
|   DBUG_PRINT("info", ("state before action %s", scheduler_states_names[state].str));
 | |
|   if (state != RUNNING)
 | |
|   {
 | |
|     /* Synchronously wait until the scheduler stops. */
 | |
|     while (state != INITIALIZED)
 | |
|       COND_STATE_WAIT(thd, NULL, &stage_waiting_for_scheduler_to_stop);
 | |
|     goto end;
 | |
|   }
 | |
| 
 | |
|   /* Guarantee we don't catch spurious signals */
 | |
|   do {
 | |
|     DBUG_PRINT("info", ("Waiting for COND_started_or_stopped from "
 | |
|                         "the scheduler thread.  Current value of state is %s . "
 | |
|                         "workers count=%d", scheduler_states_names[state].str,
 | |
|                         workers_count()));
 | |
|     /*
 | |
|       NOTE: We don't use kill_one_thread() because it can't kill COM_DEAMON
 | |
|       threads. In addition, kill_one_thread() requires THD but during shutdown
 | |
|       current_thd is NULL. Hence, if kill_one_thread should be used it has to
 | |
|       be modified to kill also daemons, by adding a flag, and also we have to
 | |
|       create artificial THD here. To save all this work, we just do what
 | |
|       kill_one_thread() does to kill a thread. See also sql_repl.cc for similar
 | |
|       usage.
 | |
|     */
 | |
| 
 | |
|     state= STOPPING;
 | |
|     DBUG_PRINT("info", ("Scheduler thread has id %lu",
 | |
|                         (ulong) scheduler_thd->thread_id));
 | |
|     /* This will wake up the thread if it waits on Queue's conditional */
 | |
|     sql_print_information("Event Scheduler: Killing the scheduler thread, "
 | |
|                           "thread id %lu",
 | |
|                           (ulong) scheduler_thd->thread_id);
 | |
|     scheduler_thd->awake(KILL_CONNECTION);
 | |
| 
 | |
|     /* thd could be 0x0, when shutting down */
 | |
|     sql_print_information("Event Scheduler: "
 | |
|                           "Waiting for the scheduler thread to reply");
 | |
| 
 | |
|     /*
 | |
|       Wait only 2 seconds, as there is a small chance the thread missed the
 | |
|       above awake() call and we may have to do it again
 | |
|     */
 | |
|     struct timespec top_time;
 | |
|     set_timespec(top_time, 2);
 | |
|     COND_STATE_WAIT(thd, &top_time, &stage_waiting_for_scheduler_to_stop);
 | |
|   } while (state == STOPPING);
 | |
|   DBUG_PRINT("info", ("Scheduler thread has cleaned up. Set state to INIT"));
 | |
|   sql_print_information("Event Scheduler: Stopped");
 | |
| end:
 | |
|   UNLOCK_DATA();
 | |
|   DBUG_RETURN(FALSE);
 | |
| }
 | |
| 
 | |
| 
 | |
| /*
 | |
|   Returns the number of living event worker threads.
 | |
| 
 | |
|   SYNOPSIS
 | |
|     Event_scheduler::workers_count()
 | |
| */
 | |
| 
 | |
| static my_bool workers_count_callback(THD *thd, uint32_t *count)
 | |
| {
 | |
|   if (thd->system_thread == SYSTEM_THREAD_EVENT_WORKER)
 | |
|     ++*count;
 | |
|   return 0;
 | |
| }
 | |
| 
 | |
| 
 | |
| uint
 | |
| Event_scheduler::workers_count()
 | |
| {
 | |
|   uint32_t count= 0;
 | |
|   DBUG_ENTER("Event_scheduler::workers_count");
 | |
|   server_threads.iterate(workers_count_callback, &count);
 | |
|   DBUG_RETURN(count);
 | |
| }
 | |
| 
 | |
| 
 | |
| /*
 | |
|   Auxiliary function for locking LOCK_scheduler_state. Used
 | |
|   by the LOCK_DATA macro.
 | |
| 
 | |
|   SYNOPSIS
 | |
|     Event_scheduler::lock_data()
 | |
|       func  Which function is requesting mutex lock
 | |
|       line  On which line mutex lock is requested
 | |
| */
 | |
| 
 | |
| void
 | |
| Event_scheduler::lock_data(const char *func, uint line)
 | |
| {
 | |
|   DBUG_ENTER("Event_scheduler::lock_data");
 | |
|   DBUG_PRINT("enter", ("func=%s line=%u", func, line));
 | |
|   mysql_mutex_lock(&LOCK_scheduler_state);
 | |
|   mutex_last_locked_in_func= func;
 | |
|   mutex_last_locked_at_line= line;
 | |
|   mutex_scheduler_data_locked= TRUE;
 | |
|   DBUG_VOID_RETURN;
 | |
| }
 | |
| 
 | |
| 
 | |
| /*
 | |
|   Auxiliary function for unlocking LOCK_scheduler_state. Used
 | |
|   by the UNLOCK_DATA macro.
 | |
| 
 | |
|   SYNOPSIS
 | |
|     Event_scheduler::unlock_data()
 | |
|       func  Which function is requesting mutex unlock
 | |
|       line  On which line mutex unlock is requested
 | |
| */
 | |
| 
 | |
| void
 | |
| Event_scheduler::unlock_data(const char *func, uint line)
 | |
| {
 | |
|   DBUG_ENTER("Event_scheduler::unlock_data");
 | |
|   DBUG_PRINT("enter", ("func=%s line=%u", func, line));
 | |
|   mutex_last_unlocked_at_line= line;
 | |
|   mutex_scheduler_data_locked= FALSE;
 | |
|   mutex_last_unlocked_in_func= func;
 | |
|   mysql_mutex_unlock(&LOCK_scheduler_state);
 | |
|   DBUG_VOID_RETURN;
 | |
| }
 | |
| 
 | |
| 
 | |
| /*
 | |
|   Wrapper for mysql_cond_wait/timedwait
 | |
| 
 | |
|   SYNOPSIS
 | |
|     Event_scheduler::cond_wait()
 | |
|       thd     Thread (Could be NULL during shutdown procedure)
 | |
|       abstime If not null then call mysql_cond_timedwait()
 | |
|       msg     Message for thd->proc_info
 | |
|       func    Which function is requesting cond_wait
 | |
|       line    On which line cond_wait is requested
 | |
| */
 | |
| 
 | |
| void
 | |
| Event_scheduler::cond_wait(THD *thd, struct timespec *abstime, const PSI_stage_info *stage,
 | |
|                            const char *src_func, const char *src_file, uint src_line)
 | |
| {
 | |
|   DBUG_ENTER("Event_scheduler::cond_wait");
 | |
|   waiting_on_cond= TRUE;
 | |
|   mutex_last_unlocked_at_line= src_line;
 | |
|   mutex_scheduler_data_locked= FALSE;
 | |
|   mutex_last_unlocked_in_func= src_func;
 | |
|   if (thd)
 | |
|     thd->enter_cond(&COND_state, &LOCK_scheduler_state, stage,
 | |
|                     NULL, src_func, src_file, src_line);
 | |
| 
 | |
|   DBUG_PRINT("info", ("mysql_cond_%swait", abstime? "timed":""));
 | |
|   if (!abstime)
 | |
|     mysql_cond_wait(&COND_state, &LOCK_scheduler_state);
 | |
|   else
 | |
|     mysql_cond_timedwait(&COND_state, &LOCK_scheduler_state, abstime);
 | |
|   if (thd)
 | |
|   {
 | |
|     /*
 | |
|       This will free the lock so we need to relock. Not the best thing to
 | |
|       do but we need to obey cond_wait()
 | |
|     */
 | |
|     thd->exit_cond(NULL, src_func, src_file, src_line);
 | |
|     LOCK_DATA();
 | |
|   }
 | |
|   mutex_last_locked_in_func= src_func;
 | |
|   mutex_last_locked_at_line= src_line;
 | |
|   mutex_scheduler_data_locked= TRUE;
 | |
|   waiting_on_cond= FALSE;
 | |
|   DBUG_VOID_RETURN;
 | |
| }
 | |
| 
 | |
| 
 | |
| /*
 | |
|   Dumps the internal status of the scheduler
 | |
| 
 | |
|   SYNOPSIS
 | |
|     Event_scheduler::dump_internal_status()
 | |
| */
 | |
| 
 | |
| void
 | |
| Event_scheduler::dump_internal_status()
 | |
| {
 | |
|   DBUG_ENTER("Event_scheduler::dump_internal_status");
 | |
| 
 | |
|   puts("");
 | |
|   puts("Event scheduler status:");
 | |
|   printf("State      : %s\n", scheduler_states_names[state].str);
 | |
|   printf("Thread id  : %lu\n", scheduler_thd ?
 | |
|          (ulong) scheduler_thd->thread_id : (ulong) 0);
 | |
|   printf("LLA        : %s:%u\n", mutex_last_locked_in_func,
 | |
|                                  mutex_last_locked_at_line);
 | |
|   printf("LUA        : %s:%u\n", mutex_last_unlocked_in_func,
 | |
|                                  mutex_last_unlocked_at_line);
 | |
|   printf("WOC        : %s\n", waiting_on_cond? "YES":"NO");
 | |
|   printf("Workers    : %u\n", workers_count());
 | |
|   printf("Executed   : %lu\n", (ulong) started_events);
 | |
|   printf("Data locked: %s\n", mutex_scheduler_data_locked ? "YES":"NO");
 | |
| 
 | |
|   DBUG_VOID_RETURN;
 | |
| }
 | |
| 
 | |
| /**
 | |
|   @} (End of group Event_Scheduler)
 | |
| */
 | 
