/* Copyright (c) 2014, Oracle and/or its affiliates. All rights reserved. This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; version 2 of the License. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ #include #include "semisync_master.h" #include "semisync_master_ack_receiver.h" #ifdef HAVE_PSI_MUTEX_INTERFACE extern PSI_mutex_key key_LOCK_ack_receiver; extern PSI_cond_key key_COND_ack_receiver; #endif #ifdef HAVE_PSI_THREAD_INTERFACE extern PSI_thread_key key_thread_ack_receiver; #endif extern Repl_semi_sync_master repl_semisync; /* Callback function of ack receive thread */ pthread_handler_t ack_receive_handler(void *arg) { Ack_receiver *recv= reinterpret_cast(arg); my_thread_init(); recv->run(); my_thread_end(); return NULL; } Ack_receiver::Ack_receiver() { DBUG_ENTER("Ack_receiver::Ack_receiver"); m_status= ST_DOWN; mysql_mutex_init(key_LOCK_ack_receiver, &m_mutex, MY_MUTEX_INIT_FAST); mysql_cond_init(key_COND_ack_receiver, &m_cond, NULL); m_pid= 0; DBUG_VOID_RETURN; } void Ack_receiver::cleanup() { DBUG_ENTER("Ack_receiver::~Ack_receiver"); stop(); mysql_mutex_destroy(&m_mutex); mysql_cond_destroy(&m_cond); DBUG_VOID_RETURN; } bool Ack_receiver::start() { DBUG_ENTER("Ack_receiver::start"); mysql_mutex_lock(&m_mutex); if(m_status == ST_DOWN) { pthread_attr_t attr; m_status= ST_UP; if (DBUG_EVALUATE_IF("rpl_semisync_simulate_create_thread_failure", 1, 0) || pthread_attr_init(&attr) != 0 || pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE) != 0 || #ifndef _WIN32 pthread_attr_setscope(&attr, PTHREAD_SCOPE_SYSTEM) != 0 || #endif mysql_thread_create(key_thread_ack_receiver, &m_pid, &attr, ack_receive_handler, this)) { sql_print_error("Failed to start semi-sync ACK receiver thread, " " could not create thread(errno:%d)", errno); m_status= ST_DOWN; mysql_mutex_unlock(&m_mutex); DBUG_RETURN(true); } (void) pthread_attr_destroy(&attr); } mysql_mutex_unlock(&m_mutex); DBUG_RETURN(false); } void Ack_receiver::stop() { DBUG_ENTER("Ack_receiver::stop"); mysql_mutex_lock(&m_mutex); if (m_status == ST_UP) { m_status= ST_STOPPING; mysql_cond_broadcast(&m_cond); while (m_status == ST_STOPPING) mysql_cond_wait(&m_cond, &m_mutex); DBUG_ASSERT(m_status == ST_DOWN); m_pid= 0; } mysql_mutex_unlock(&m_mutex); DBUG_VOID_RETURN; } bool Ack_receiver::add_slave(THD *thd) { Slave *slave; DBUG_ENTER("Ack_receiver::add_slave"); if (!(slave= new Slave)) DBUG_RETURN(true); slave->thd= thd; slave->vio= *thd->net.vio; slave->vio.mysql_socket.m_psi= NULL; slave->vio.read_timeout= 1; mysql_mutex_lock(&m_mutex); m_slaves.push_back(slave); m_slaves_changed= true; mysql_cond_broadcast(&m_cond); mysql_mutex_unlock(&m_mutex); DBUG_RETURN(false); } void Ack_receiver::remove_slave(THD *thd) { I_List_iterator it(m_slaves); Slave *slave; DBUG_ENTER("Ack_receiver::remove_slave"); mysql_mutex_lock(&m_mutex); while ((slave= it++)) { if (slave->thd == thd) { delete slave; m_slaves_changed= true; break; } } mysql_mutex_unlock(&m_mutex); DBUG_VOID_RETURN; } inline void Ack_receiver::set_stage_info(const PSI_stage_info &stage) { MYSQL_SET_STAGE(stage.m_key, __FILE__, __LINE__); } inline void Ack_receiver::wait_for_slave_connection() { set_stage_info(stage_waiting_for_semi_sync_slave); mysql_cond_wait(&m_cond, &m_mutex); } my_socket Ack_receiver::get_slave_sockets(fd_set *fds, uint *count) { my_socket max_fd= INVALID_SOCKET; Slave *slave; I_List_iterator it(m_slaves); *count= 0; FD_ZERO(fds); while ((slave= it++)) { (*count)++; my_socket fd= slave->sock_fd(); max_fd= (fd > max_fd ? fd : max_fd); FD_SET(fd, fds); } return max_fd; } /* Auxilary function to initialize a NET object with given net buffer. */ static void init_net(NET *net, unsigned char *buff, unsigned int buff_len) { memset(net, 0, sizeof(NET)); net->max_packet= buff_len; net->buff= buff; net->buff_end= buff + buff_len; net->read_pos= net->buff; } void Ack_receiver::run() { // skip LOCK_global_system_variables due to the 3rd arg THD *thd= new THD(next_thread_id(), false, true); NET net; unsigned char net_buff[REPLY_MESSAGE_MAX_LENGTH]; fd_set read_fds; my_socket max_fd= INVALID_SOCKET; Slave *slave; my_thread_init(); DBUG_ENTER("Ack_receiver::run"); sql_print_information("Starting ack receiver thread"); thd->system_thread= SYSTEM_THREAD_SEMISYNC_MASTER_BACKGROUND; thd->thread_stack= (char*) &thd; thd->store_globals(); thd->security_ctx->skip_grants(); thread_safe_increment32(&service_thread_count); thd->set_command(COM_DAEMON); init_net(&net, net_buff, REPLY_MESSAGE_MAX_LENGTH); mysql_mutex_lock(&m_mutex); m_slaves_changed= true; mysql_mutex_unlock(&m_mutex); while (1) { fd_set fds; int ret; uint slave_count; mysql_mutex_lock(&m_mutex); if (unlikely(m_status == ST_STOPPING)) goto end; set_stage_info(stage_waiting_for_semi_sync_ack_from_slave); if (unlikely(m_slaves_changed)) { if (unlikely(m_slaves.is_empty())) { wait_for_slave_connection(); mysql_mutex_unlock(&m_mutex); continue; } max_fd= get_slave_sockets(&read_fds, &slave_count); m_slaves_changed= false; DBUG_PRINT("info", ("fd count %u, max_fd %d", slave_count,(int) max_fd)); } struct timeval tv= {1, 0}; fds= read_fds; /* select requires max fd + 1 for the first argument */ ret= select((int)(max_fd+1), &fds, NULL, NULL, &tv); if (ret <= 0) { mysql_mutex_unlock(&m_mutex); ret= DBUG_EVALUATE_IF("rpl_semisync_simulate_select_error", -1, ret); if (ret == -1) sql_print_information("Failed to select() on semi-sync dump sockets, " "error: errno=%d", socket_errno); /* Sleep 1us, so other threads can catch the m_mutex easily. */ my_sleep(1); continue; } set_stage_info(stage_reading_semi_sync_ack); I_List_iterator it(m_slaves); while ((slave= it++)) { if (FD_ISSET(slave->sock_fd(), &fds)) { ulong len; net_clear(&net, 0); net.vio= &slave->vio; len= my_net_read(&net); if (likely(len != packet_error)) repl_semisync_master.report_reply_packet(slave->server_id(), net.read_pos, len); else if (net.last_errno == ER_NET_READ_ERROR) FD_CLR(slave->sock_fd(), &read_fds); } } mysql_mutex_unlock(&m_mutex); } end: sql_print_information("Stopping ack receiver thread"); m_status= ST_DOWN; delete thd; thread_safe_decrement32(&service_thread_count); signal_thd_deleted(); mysql_cond_broadcast(&m_cond); mysql_mutex_unlock(&m_mutex); DBUG_VOID_RETURN; }