mirror of
				https://github.com/MariaDB/server.git
				synced 2025-10-31 19:06:14 +01:00 
			
		
		
		
	
		
			
				
	
	
		
			392 lines
		
	
	
	
		
			10 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
			
		
		
	
	
			392 lines
		
	
	
	
		
			10 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
| /* Copyright (c) 2014, Oracle and/or its affiliates. All rights reserved.
 | |
| 
 | |
|    This program is free software; you can redistribute it and/or modify
 | |
|    it under the terms of the GNU General Public License as published by
 | |
|    the Free Software Foundation; version 2 of the License.
 | |
| 
 | |
|    This program is distributed in the hope that it will be useful,
 | |
|    but WITHOUT ANY WARRANTY; without even the implied warranty of
 | |
|    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 | |
|    GNU General Public License for more details.
 | |
| 
 | |
|    You should have received a copy of the GNU General Public License
 | |
|    along with this program; if not, write to the Free Software
 | |
|    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA */
 | |
| 
 | |
| #include <my_global.h>
 | |
| #include "semisync_master.h"
 | |
| #include "semisync_master_ack_receiver.h"
 | |
| 
 | |
| #ifdef HAVE_PSI_MUTEX_INTERFACE
 | |
| extern PSI_mutex_key key_LOCK_ack_receiver;
 | |
| extern PSI_cond_key key_COND_ack_receiver;
 | |
| #endif
 | |
| #ifdef HAVE_PSI_THREAD_INTERFACE
 | |
| extern PSI_thread_key key_thread_ack_receiver;
 | |
| #endif
 | |
| 
 | |
/*
  File-scope socket descriptor, initialised to -1.
  NOTE(review): nothing else in this section reads or writes it; presumably
  it is the wake-up descriptor used by signal_listener() / the socket
  listener -- confirm against semisync_master_ack_receiver.h.
*/
| my_socket global_ack_signal_fd= -1;
 | |
| 
 | |
| /* Callback function of ack receive thread */
 | |
| pthread_handler_t ack_receive_handler(void *arg)
 | |
| {
 | |
|   Ack_receiver *recv= reinterpret_cast<Ack_receiver *>(arg);
 | |
| 
 | |
|   my_thread_init();
 | |
|   my_thread_set_name("Ack_receiver");
 | |
|   recv->run();
 | |
|   my_thread_end();
 | |
| 
 | |
|   return NULL;
 | |
| }
 | |
| 
 | |
| Ack_receiver::Ack_receiver()
 | |
| {
 | |
|   DBUG_ENTER("Ack_receiver::Ack_receiver");
 | |
| 
 | |
|   m_status= ST_DOWN;
 | |
|   mysql_mutex_init(key_LOCK_ack_receiver, &m_mutex, NULL);
 | |
|   mysql_cond_init(key_COND_ack_receiver, &m_cond, NULL);
 | |
|   mysql_cond_init(key_COND_ack_receiver, &m_cond_reply, NULL);
 | |
|   m_pid= 0;
 | |
| 
 | |
|   DBUG_VOID_RETURN;
 | |
| }
 | |
| 
 | |
| void Ack_receiver::cleanup()
 | |
| {
 | |
|   DBUG_ENTER("Ack_receiver::~Ack_receiver");
 | |
| 
 | |
|   stop();
 | |
|   mysql_mutex_destroy(&m_mutex);
 | |
|   mysql_cond_destroy(&m_cond);
 | |
|   mysql_cond_destroy(&m_cond_reply);
 | |
| 
 | |
|   DBUG_VOID_RETURN;
 | |
| }
 | |
| 
 | |
| bool Ack_receiver::start()
 | |
| {
 | |
|   DBUG_ENTER("Ack_receiver::start");
 | |
| 
 | |
|   mysql_mutex_lock(&m_mutex);
 | |
|   if(m_status == ST_DOWN)
 | |
|   {
 | |
|     pthread_attr_t attr;
 | |
| 
 | |
|     m_status= ST_UP;
 | |
| 
 | |
|     if (DBUG_IF("rpl_semisync_simulate_create_thread_failure") ||
 | |
|         pthread_attr_init(&attr) != 0 ||
 | |
|         pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE) != 0 ||
 | |
| #ifndef _WIN32
 | |
|         pthread_attr_setscope(&attr, PTHREAD_SCOPE_SYSTEM) != 0 ||
 | |
| #endif
 | |
|         mysql_thread_create(key_thread_ack_receiver, &m_pid,
 | |
|                             &attr, ack_receive_handler, this))
 | |
|     {
 | |
|       sql_print_error("Failed to start semi-sync ACK receiver thread, "
 | |
|                       " could not create thread(errno:%d)", errno);
 | |
| 
 | |
|       m_status= ST_DOWN;
 | |
|       mysql_mutex_unlock(&m_mutex);
 | |
| 
 | |
|       DBUG_RETURN(true);
 | |
|     }
 | |
|     (void) pthread_attr_destroy(&attr);
 | |
|   }
 | |
|   mysql_mutex_unlock(&m_mutex);
 | |
| 
 | |
|   DBUG_RETURN(false);
 | |
| }
 | |
| 
 | |
| void Ack_receiver::stop()
 | |
| {
 | |
|   DBUG_ENTER("Ack_receiver::stop");
 | |
| 
 | |
|   mysql_mutex_lock(&m_mutex);
 | |
|   if (m_status == ST_UP)
 | |
|   {
 | |
|     m_status= ST_STOPPING;
 | |
|     signal_listener();            // Signal listener thread to stop
 | |
|     mysql_cond_broadcast(&m_cond);
 | |
| 
 | |
|     while (m_status == ST_STOPPING)
 | |
|       mysql_cond_wait(&m_cond, &m_mutex);
 | |
| 
 | |
|     DBUG_ASSERT(m_status == ST_DOWN);
 | |
| 
 | |
|     m_pid= 0;
 | |
|   }
 | |
|   mysql_mutex_unlock(&m_mutex);
 | |
| 
 | |
|   DBUG_VOID_RETURN;
 | |
| }
 | |
| 
 | |
| #ifndef DBUG_OFF
 | |
| void static dbug_verify_no_duplicate_slaves(Slave_ilist *m_slaves, THD *thd)
 | |
| {
 | |
|   I_List_iterator<Slave> it(*m_slaves);
 | |
|   Slave *slave;
 | |
|   while ((slave= it++))
 | |
|   {
 | |
|     DBUG_ASSERT(slave->thd->variables.server_id != thd->variables.server_id);
 | |
|   }
 | |
| }
 | |
| #else
 | |
| #define dbug_verify_no_duplicate_slaves(A,B) do {} while(0)
 | |
| #endif
 | |
| 
 | |
| 
 | |
| bool Ack_receiver::add_slave(THD *thd)
 | |
| {
 | |
|   Slave *slave;
 | |
|   DBUG_ENTER("Ack_receiver::add_slave");
 | |
| 
 | |
|   if (!(slave= new Slave))
 | |
|     DBUG_RETURN(true);
 | |
| 
 | |
|   slave->active= 0;
 | |
|   slave->thd= thd;
 | |
|   slave->vio= *thd->net.vio;
 | |
|   slave->vio.mysql_socket.m_psi= NULL;
 | |
|   slave->vio.read_timeout= 1;                   // 1 ms
 | |
| 
 | |
|   mysql_mutex_lock(&m_mutex);
 | |
| 
 | |
|   dbug_verify_no_duplicate_slaves(&m_slaves, thd);
 | |
| 
 | |
|   m_slaves.push_back(slave);
 | |
|   m_slaves_changed= true;
 | |
|   mysql_cond_broadcast(&m_cond);
 | |
|   mysql_mutex_unlock(&m_mutex);
 | |
| 
 | |
|   signal_listener();  // Inform listener that there are new slaves
 | |
| 
 | |
|   DBUG_RETURN(false);
 | |
| }
 | |
| 
 | |
| void Ack_receiver::remove_slave(THD *thd)
 | |
| {
 | |
|   I_List_iterator<Slave> it(m_slaves);
 | |
|   Slave *slave;
 | |
|   bool slaves_changed= 0;
 | |
|   DBUG_ENTER("Ack_receiver::remove_slave");
 | |
| 
 | |
|   mysql_mutex_lock(&m_mutex);
 | |
| 
 | |
|   while ((slave= it++))
 | |
|   {
 | |
|     if (slave->thd == thd)
 | |
|     {
 | |
|       delete slave;
 | |
|       slaves_changed= true;
 | |
|       break;
 | |
|     }
 | |
|   }
 | |
|   if (slaves_changed)
 | |
|   {
 | |
|     m_slaves_changed= true;
 | |
|     mysql_cond_broadcast(&m_cond);
 | |
|     /*
 | |
|       Wait until Ack_receiver::run() acknowledges remove of slave
 | |
|       As this is only sent under the mutex and after listeners has
 | |
|       been collected, we know that listener has ignored the found
 | |
|       slave.
 | |
|     */
 | |
|     if (m_status != ST_DOWN)
 | |
|       mysql_cond_wait(&m_cond_reply, &m_mutex);
 | |
|   }
 | |
|   mysql_mutex_unlock(&m_mutex);
 | |
| 
 | |
|   DBUG_VOID_RETURN;
 | |
| }
 | |
| 
 | |
| inline void Ack_receiver::set_stage_info(const PSI_stage_info &stage)
 | |
| {
 | |
|   (void)MYSQL_SET_STAGE(stage.m_key, __FILE__, __LINE__);
 | |
| }
 | |
| 
 | |
| void Ack_receiver::wait_for_slave_connection(THD *thd)
 | |
| {
 | |
|   thd->enter_cond(&m_cond, &m_mutex, &stage_waiting_for_semi_sync_slave,
 | |
|                   0, __func__, __FILE__, __LINE__);
 | |
| 
 | |
|   while (m_status == ST_UP && m_slaves.is_empty())
 | |
|     mysql_cond_wait(&m_cond, &m_mutex);
 | |
| 
 | |
|   thd->exit_cond(0, __func__, __FILE__, __LINE__);
 | |
| }
 | |
| 
 | |
| /* Auxilary function to initialize a NET object with given net buffer. */
 | |
| static void init_net(NET *net, unsigned char *buff, unsigned int buff_len)
 | |
| {
 | |
|   memset(net, 0, sizeof(NET));
 | |
|   net->max_packet= buff_len;
 | |
|   net->buff= buff;
 | |
|   net->buff_end= buff + buff_len;
 | |
|   net->read_pos= net->buff;
 | |
| }
 | |
| 
 | |
| void Ack_receiver::run()
 | |
| {
 | |
|   THD *thd= new THD(next_thread_id());
 | |
|   NET net;
 | |
|   unsigned char net_buff[REPLY_MESSAGE_MAX_LENGTH];
 | |
|   DBUG_ENTER("Ack_receiver::run");
 | |
| 
 | |
|   my_thread_init();
 | |
| 
 | |
| #ifdef HAVE_POLL
 | |
|   Poll_socket_listener listener(m_slaves);
 | |
| #else
 | |
|   Select_socket_listener listener(m_slaves);
 | |
| #endif //HAVE_POLL
 | |
| 
 | |
|   if (listener.got_error())
 | |
|   {
 | |
|     sql_print_error("Got error %iE starting ack receiver thread",
 | |
|                     listener.got_error());
 | |
|     return;
 | |
|   }
 | |
| 
 | |
|   sql_print_information("Starting ack receiver thread");
 | |
|   thd->system_thread= SYSTEM_THREAD_SEMISYNC_MASTER_BACKGROUND;
 | |
|   thd->store_globals();
 | |
|   thd->security_ctx->skip_grants();
 | |
|   thd->set_command(COM_DAEMON);
 | |
|   init_net(&net, net_buff, REPLY_MESSAGE_MAX_LENGTH);
 | |
| 
 | |
|   /*
 | |
|     Mark that we have to setup the listener. Note that only this functions can
 | |
|     set m_slaves_changed to false
 | |
|   */
 | |
|   m_slaves_changed= true;
 | |
| 
 | |
|   while (1)
 | |
|   {
 | |
|     int ret, slave_count= 0;
 | |
|     Slave *slave;
 | |
| 
 | |
|     mysql_mutex_lock(&m_mutex);
 | |
|     if (unlikely(m_status != ST_UP))
 | |
|       goto end;
 | |
| 
 | |
|     if (unlikely(m_slaves_changed))
 | |
|     {
 | |
|       if (unlikely(m_slaves.is_empty()))
 | |
|       {
 | |
|         m_slaves_changed= false;
 | |
|         mysql_cond_broadcast(&m_cond_reply);      // Signal remove_slave
 | |
|         wait_for_slave_connection(thd);
 | |
|         /* Wait for slave unlocks m_mutex */
 | |
|         continue;
 | |
|       }
 | |
| 
 | |
|       set_stage_info(stage_waiting_for_semi_sync_ack_from_slave);
 | |
|       if ((slave_count= listener.init_slave_sockets()) == 0)
 | |
|       {
 | |
|         mysql_mutex_unlock(&m_mutex);
 | |
|         m_slaves_changed= true;
 | |
|         continue;                               // Retry
 | |
|       }
 | |
|       if (slave_count < 0)
 | |
|         goto end;
 | |
|       m_slaves_changed= false;
 | |
|       mysql_cond_broadcast(&m_cond_reply);      // Signal remove_slave
 | |
|     }
 | |
| 
 | |
| #ifdef HAVE_POLL
 | |
|       DBUG_PRINT("info", ("fd count %u", slave_count));
 | |
| #else     
 | |
|       DBUG_PRINT("info", ("fd count %u, max_fd %d", slave_count,
 | |
|                           (int) listener.get_max_fd()));
 | |
| #endif
 | |
| 
 | |
|     mysql_mutex_unlock(&m_mutex);
 | |
|     ret= listener.listen_on_sockets();
 | |
| 
 | |
|     if (ret <= 0)
 | |
|     {
 | |
| 
 | |
|       ret= DBUG_IF("rpl_semisync_simulate_select_error") ? -1 : ret;
 | |
| 
 | |
|       if (ret == -1 && errno != EINTR)
 | |
|         sql_print_information("Failed to wait on semi-sync sockets, "
 | |
|                               "error: errno=%d", socket_errno);
 | |
|       continue;
 | |
|     }
 | |
| 
 | |
|     listener.clear_signal();
 | |
|     mysql_mutex_lock(&m_mutex);
 | |
|     set_stage_info(stage_reading_semi_sync_ack);
 | |
|     Slave_ilist_iterator it(m_slaves);
 | |
|     while ((slave= it++))
 | |
|     {
 | |
|       if (slave->active &&
 | |
|           ((slave->vio.read_pos < slave->vio.read_end) ||
 | |
|            listener.is_socket_active(slave)))
 | |
|       {
 | |
|         ulong len;
 | |
| 
 | |
|         /* Semi-sync packets will always be sent with pkt_nr == 1 */
 | |
|         net_clear(&net, 0);
 | |
|         net.vio= &slave->vio;
 | |
|         /*
 | |
|           Set compress flag. This is needed to support
 | |
|           Slave_compress_protocol flag enabled Slaves
 | |
|         */
 | |
|         net.compress= slave->thd->net.compress;
 | |
| 
 | |
|         if (unlikely(listener.is_socket_hangup(slave)))
 | |
|         {
 | |
|           if (global_system_variables.log_warnings > 2)
 | |
|             sql_print_warning("Semisync ack receiver got hangup "
 | |
|                               "from slave server-id %d",
 | |
|                               slave->server_id());
 | |
|           it.remove();
 | |
|           m_slaves_changed= true;
 | |
|           continue;
 | |
|         }
 | |
| 
 | |
|         len= my_net_read(&net);
 | |
|         if (likely(len != packet_error))
 | |
|         {
 | |
|           int res;
 | |
|           res= repl_semisync_master.report_reply_packet(slave->server_id(),
 | |
|                                                         net.read_pos, len);
 | |
|           if (unlikely(res < 0))
 | |
|           {
 | |
|             /*
 | |
|               Slave has sent COM_QUIT or other failure.
 | |
|               Delete it from listener
 | |
|             */
 | |
|             it.remove();
 | |
|             m_slaves_changed= true;
 | |
|           }
 | |
|         }
 | |
|         else if (net.last_errno == ER_NET_READ_ERROR)
 | |
|         {
 | |
|           if (net.last_errno > 0 && global_system_variables.log_warnings > 2)
 | |
|             sql_print_warning("Semisync ack receiver got error %d \"%s\" "
 | |
|                               "from slave server-id %d",
 | |
|                               net.last_errno, ER_DEFAULT(net.last_errno),
 | |
|                               slave->server_id());
 | |
|           it.remove();
 | |
|           m_slaves_changed= true;
 | |
|         }
 | |
|       }
 | |
|     }
 | |
|     mysql_mutex_unlock(&m_mutex);
 | |
|   }
 | |
| 
 | |
| end:
 | |
|   sql_print_information("Stopping ack receiver thread");
 | |
|   m_status= ST_DOWN;
 | |
|   mysql_cond_broadcast(&m_cond);
 | |
|   mysql_cond_broadcast(&m_cond_reply);
 | |
|   mysql_mutex_unlock(&m_mutex);
 | |
| 
 | |
|   delete thd;
 | |
|   DBUG_VOID_RETURN;
 | |
| }
 | 
