mariadb/sql/semisync_master_ack_receiver.cc
Andrei Elkin e972125f11 MDEV-13073 This part merges the Ali semisync related changes
and specifically the ack receiving functionality.
Semisync is turned into a static (built-in) component instead of a plugin, so its functions
are invoked at the same points as RUN_HOOKS.
The RUN_HOOKS and the observer interface remain to be removed by a later
patch.

Todo:
  React to the killed status in repl_semisync_master.wait_after_sync(). Currently
  Repl_semi_sync_master::commit_trx does not check the killed status.

  There were a few bugfixes found that are present in MySQL, and it's unclear
  whether/how they are covered. Those include:

  Bug#15985893: GTID SKIPPED EVENTS ON MASTER CAUSE SEMI SYNC TIME-OUTS
  Bug#17932935 CALLING IS_SEMI_SYNC_SLAVE() IN EACH FUNCTION CALL
                 HAS BAD PERFORMANCE
  Bug#20574628: SEMI-SYNC REPLICATION PERFORMANCE DEGRADES WITH A HIGH NUMBER OF THREADS
2017-12-18 13:43:37 +02:00

308 lines
7.7 KiB
C++

/* Copyright (c) 2014, Oracle and/or its affiliates. All rights reserved.
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; version 2 of the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
#include <my_global.h>
#include "semisync_master.h"
#include "semisync_master_ack_receiver.h"
extern PSI_mutex_key key_ss_mutex_Ack_receiver_mutex;
extern PSI_cond_key key_ss_cond_Ack_receiver_cond;
extern PSI_thread_key key_ss_thread_Ack_receiver_thread;
extern ReplSemiSyncMaster repl_semisync;
/*
  Thread entry point of the ACK receiver, passed to mysql_thread_create()
  by Ack_receiver::start() with the Ack_receiver instance as `arg`.
  Wraps Ack_receiver::run() in per-thread mysys init/teardown.
*/
pthread_handler_t ack_receive_handler(void *arg)
{
  my_thread_init();
  static_cast<Ack_receiver *>(arg)->run();
  my_thread_end();

  return NULL;
}
/*
  Build a receiver in the stopped state: no thread id recorded
  (m_pid == 0), status ST_DOWN, and an initialised mutex/condition pair
  that start()/stop()/run() synchronise on.
*/
Ack_receiver::Ack_receiver()
{
  const char *kWho = "Ack_receiver::Ack_receiver";
  function_enter(kWho);

  m_pid= 0;
  m_status= ST_DOWN;

  mysql_mutex_init(key_ss_mutex_Ack_receiver_mutex, &m_mutex,
                   MY_MUTEX_INIT_FAST);
  mysql_cond_init(key_ss_cond_Ack_receiver_cond, &m_cond, NULL);

  function_exit(kWho);
}
/*
  Tear the receiver down: stop the ACK thread (if running) and then
  destroy the mutex and condition variable. The thread's run() loop uses
  m_mutex/m_cond, so it has to be stopped before they are destroyed.
*/
void Ack_receiver::cleanup()
{
  const char *kWho = "Ack_receiver::~Ack_receiver";
  function_enter(kWho);

  stop();                          /* must precede the destroys below */
  mysql_mutex_destroy(&m_mutex);
  mysql_cond_destroy(&m_cond);

  function_exit(kWho);
}
bool Ack_receiver::start()
{
const char *kWho = "Ack_receiver::start";
function_enter(kWho);
mysql_mutex_lock(&m_mutex);
if(m_status == ST_DOWN)
{
pthread_attr_t attr;
m_status= ST_UP;
if (DBUG_EVALUATE_IF("rpl_semisync_simulate_create_thread_failure", 1, 0) ||
pthread_attr_init(&attr) != 0 ||
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE) != 0 ||
#ifndef _WIN32
pthread_attr_setscope(&attr, PTHREAD_SCOPE_SYSTEM) != 0 ||
#endif
mysql_thread_create(key_ss_thread_Ack_receiver_thread, &m_pid,
&attr, ack_receive_handler, this))
{
sql_print_error("Failed to start semi-sync ACK receiver thread, "
" could not create thread(errno:%d)", errno);
m_status= ST_DOWN;
mysql_mutex_unlock(&m_mutex);
return function_exit(kWho, true);
}
(void) pthread_attr_destroy(&attr);
}
mysql_mutex_unlock(&m_mutex);
return function_exit(kWho, false);
}
void Ack_receiver::stop()
{
const char *kWho = "Ack_receiver::stop";
function_enter(kWho);
mysql_mutex_lock(&m_mutex);
if (m_status == ST_UP)
{
m_status= ST_STOPPING;
mysql_cond_broadcast(&m_cond);
while (m_status == ST_STOPPING)
mysql_cond_wait(&m_cond, &m_mutex);
DBUG_ASSERT(m_status == ST_DOWN);
m_pid= 0;
}
mysql_mutex_unlock(&m_mutex);
function_exit(kWho);
}
/*
  Register the dump thread `thd` as a slave whose ACKs run() should read.

  Stores a private copy of thd's Vio; run() reads replies through this
  copy. The copy's mysql_socket.m_psi is cleared, and read_timeout is set
  to 1 (NOTE(review): vio timeouts are presumably in milliseconds —
  confirm).

  Returns true if the Slave entry could not be allocated, false otherwise.
*/
bool Ack_receiver::add_slave(THD *thd)
{
  const char *kWho = "Ack_receiver::add_slave";
  function_enter(kWho);

  Slave *slave= new Slave;
  if (slave == NULL)
    return function_exit(kWho, true);

  slave->thd= thd;
  slave->vio= *thd->net.vio;
  slave->vio.mysql_socket.m_psi= NULL;
  slave->vio.read_timeout= 1;

  mysql_mutex_lock(&m_mutex);
  m_slaves.push_back(slave);
  m_slaves_changed= true;
  /* run() may be blocked waiting for its first slave; wake it up. */
  mysql_cond_broadcast(&m_cond);
  mysql_mutex_unlock(&m_mutex);

  return function_exit(kWho, false);
}
/*
  Unregister the slave served by dump thread `thd`.

  Deletes the first Slave entry whose thd matches. Deleting the node is
  what removes it from m_slaves (presumably via the ilink destructor —
  confirm). On a match, m_slaves_changed is set so run() rebuilds its
  socket set. If no entry matches, nothing changes.
*/
void Ack_receiver::remove_slave(THD *thd)
{
  const char *kWho = "Ack_receiver::remove_slave";
  function_enter(kWho);

  mysql_mutex_lock(&m_mutex);
  I_List_iterator<Slave> it(m_slaves);
  for (Slave *slave= it++; slave != NULL; slave= it++)
  {
    if (slave->thd != thd)
      continue;

    delete slave;
    m_slaves_changed= true;
    break;
  }
  mysql_mutex_unlock(&m_mutex);

  function_exit(kWho);
}
/*
  Report `stage` (by its PSI key) through MYSQL_SET_STAGE, presumably
  making it the calling thread's current performance-schema stage; the
  source location passed is this file/line. Used by run() and
  wait_for_slave_connection() to show what the ACK thread is doing.
*/
inline void Ack_receiver::set_stage_info(const PSI_stage_info &stage)
{
MYSQL_SET_STAGE(stage.m_key, __FILE__, __LINE__);
}
/*
  Park the ACK thread until m_cond is signalled: add_slave() and stop()
  broadcast it. The caller must hold m_mutex, which mysql_cond_wait()
  releases while sleeping and re-acquires before returning; run() calls
  this when the slave list is empty.
*/
inline void Ack_receiver::wait_for_slave_connection()
{
  set_stage_info(stage_waiting_for_semi_sync_slave);   /* advertise why */
  mysql_cond_wait(&m_cond, &m_mutex);                   /* then sleep */
}
/*
  Return true if `fd` can be stored in an fd_set used with select().

  On POSIX systems an fd_set is a bitmap of FD_SETSIZE bits, and
  FD_SET()/FD_ISSET()/FD_CLR() on a descriptor outside [0, FD_SETSIZE)
  access memory outside the set. Fortified glibc builds abort instead.
  A server with many open files or connections can easily hand out socket
  descriptors above FD_SETSIZE. On Windows an fd_set is a counted array of
  SOCKET handles, so the handle value itself is not limited.
*/
static inline bool ack_socket_fits_fd_set(my_socket fd)
{
#ifdef _WIN32
  (void) fd;
  return true;
#else
  return fd >= 0 && fd < FD_SETSIZE;
#endif
}

/*
  Build the select() read set from the registered slaves.

  Called by run() with m_mutex held (m_slaves is walked here). Fills `fds`
  with every slave socket select() can watch and stores how many were
  added in *count. Returns the largest such descriptor, or INVALID_SOCKET
  if none were added. Sockets that do not fit in an fd_set are skipped
  with an error, because this select()-based loop cannot receive their
  ACKs.
*/
my_socket Ack_receiver::get_slave_sockets(fd_set *fds, uint *count)
{
  my_socket max_fd= INVALID_SOCKET;
  Slave *slave;
  I_List_iterator<Slave> it(m_slaves);

  *count= 0;
  FD_ZERO(fds);
  while ((slave= it++))
  {
    my_socket fd= slave->sock_fd();
    if (!ack_socket_fits_fd_set(fd))
    {
      sql_print_error("Semi-sync slave socket fd %d cannot be watched by "
                      "select() (FD_SETSIZE is %d); ACKs from this slave "
                      "will not be received", (int) fd, (int) FD_SETSIZE);
      continue;
    }
    (*count)++;
    max_fd= (fd > max_fd ? fd : max_fd);
    FD_SET(fd, fds);
  }
  return max_fd;
}

/* Auxiliary function to initialize a NET object with given net buffer. */
static void init_net(NET *net, unsigned char *buff, unsigned int buff_len)
{
  memset(net, 0, sizeof(NET));
  net->max_packet= buff_len;
  net->buff= buff;
  net->buff_end= buff + buff_len;
  net->read_pos= net->buff;
}

/*
  Body of the ACK receiver thread (entered via ack_receive_handler()).

  Sets up a THD for the thread, then loops. On each iteration, with
  m_mutex held, it:
    - rebuilds the select() set whenever m_slaves_changed is set;
    - waits up to one second for a slave socket to become readable;
    - passes each successfully read reply packet to
      repl_semisync_master.reportReplyPacket().
  The loop ends when stop() sets m_status to ST_STOPPING. On exit the
  thread sets ST_DOWN, deletes the THD and broadcasts m_cond so stop() can
  return.
*/
void Ack_receiver::run()
{
  // skip LOCK_global_system_variables due to the 3rd arg
  THD *thd= new THD(next_thread_id(), false, true);
  NET net;
  unsigned char net_buff[REPLY_MESSAGE_MAX_LENGTH];
  fd_set read_fds;
  my_socket max_fd= INVALID_SOCKET;
  Slave *slave;

  /*
    NOTE(review): ack_receive_handler() already called my_thread_init();
    this second call is presumably a no-op — confirm.
  */
  my_thread_init();

  DBUG_ENTER("Ack_receiver::run");

  sql_print_information("Starting ack receiver thread");
  thd->system_thread= SYSTEM_THREAD_SEMISYNC_MASTER_BACKGROUND;
  thd->thread_stack= (char*) &thd;
  thd->store_globals();
  thd->security_ctx->skip_grants();
  thread_safe_increment32(&service_thread_count);
  thd->set_command(COM_DAEMON);
  init_net(&net, net_buff, REPLY_MESSAGE_MAX_LENGTH);

  /* Force the first iteration to build read_fds from m_slaves. */
  mysql_mutex_lock(&m_mutex);
  m_slaves_changed= true;
  mysql_mutex_unlock(&m_mutex);

  while (1)
  {
    fd_set fds;
    int ret;
    uint slave_count;

    /*
      m_mutex stays held through select() and the reads below, so m_slaves
      cannot change while its sockets are in use. add_slave() and
      remove_slave() block until the mutex is released.
    */
    mysql_mutex_lock(&m_mutex);
    if (unlikely(m_status == ST_STOPPING))
      goto end;

    set_stage_info(stage_waiting_for_semi_sync_ack_from_slave);
    if (unlikely(m_slaves_changed))
    {
      if (unlikely(m_slaves.is_empty()))
      {
        wait_for_slave_connection();
        mysql_mutex_unlock(&m_mutex);
        continue;
      }

      max_fd= get_slave_sockets(&read_fds, &slave_count);
      m_slaves_changed= false;
      DBUG_PRINT("info", ("fd count %u, max_fd %d", slave_count, max_fd));
    }

    struct timeval tv= {1, 0};
    /* select() overwrites its set, so work on a copy of read_fds. */
    fds= read_fds;
    /* select requires max fd + 1 for the first argument */
    ret= select(max_fd+1, &fds, NULL, NULL, &tv);
    if (ret <= 0)
    {
      mysql_mutex_unlock(&m_mutex);

      ret= DBUG_EVALUATE_IF("rpl_semisync_simulate_select_error", -1, ret);

      if (ret == -1)
        sql_print_information("Failed to select() on semi-sync dump sockets, "
                              "error: errno=%d", socket_errno);
      /* Sleep 1us, so other threads can catch the m_mutex easily. */
      my_sleep(1);
      continue;
    }

    set_stage_info(stage_reading_semi_sync_ack);
    I_List_iterator<Slave> it(m_slaves);
    while ((slave= it++))
    {
      my_socket fd= slave->sock_fd();

      /*
        Sockets rejected by get_slave_sockets() are not in the set, and
        FD_ISSET() on them would read outside the fd_set.
      */
      if (!ack_socket_fits_fd_set(fd) || !FD_ISSET(fd, &fds))
        continue;

      ulong len;
      net_clear(&net, 0);
      net.vio= &slave->vio;
      len= my_net_read(&net);
      if (likely(len != packet_error))
        repl_semisync_master.reportReplyPacket(slave->server_id(),
                                               net.read_pos, len);
      else if (net.last_errno == ER_NET_READ_ERROR)
      {
        /* Stop watching this socket until the set is next rebuilt. */
        FD_CLR(fd, &read_fds);
      }
    }
    mysql_mutex_unlock(&m_mutex);
  }
end:
  /* Reached with m_mutex held. */
  sql_print_information("Stopping ack receiver thread");
  m_status= ST_DOWN;
  delete thd;
  thread_safe_decrement32(&service_thread_count);
  signal_thd_deleted();
  /* Wake stop(), which is waiting for m_status to leave ST_STOPPING. */
  mysql_cond_broadcast(&m_cond);
  mysql_mutex_unlock(&m_mutex);
  DBUG_VOID_RETURN;
}