mirror of
https://github.com/MariaDB/server.git
synced 2025-01-16 03:52:35 +01:00
Auto merge from 5.1-rep-semisync
This commit is contained in:
commit
0dcc854520
8 changed files with 219 additions and 88 deletions
|
@ -1849,11 +1849,13 @@ sub environment_setup {
|
|||
my $lib_semisync_master_plugin=
|
||||
mtr_file_exists(vs_config_dirs('plugin/semisync',$semisync_master_filename),
|
||||
"$basedir/plugin/semisync/.libs/" . $semisync_master_filename,
|
||||
"$basedir/lib/mysql/plugin/" . $semisync_master_filename);
|
||||
"$basedir/lib/mysql/plugin/" . $semisync_master_filename,
|
||||
"$basedir/lib/plugin/" . $semisync_master_filename);
|
||||
my $lib_semisync_slave_plugin=
|
||||
mtr_file_exists(vs_config_dirs('plugin/semisync',$semisync_slave_filename),
|
||||
"$basedir/plugin/semisync/.libs/" . $semisync_slave_filename,
|
||||
"$basedir/lib/mysql/plugin/" . $semisync_slave_filename);
|
||||
"$basedir/lib/mysql/plugin/" . $semisync_slave_filename,
|
||||
"$basedir/lib/plugin/" . $semisync_slave_filename);
|
||||
if ($lib_semisync_master_plugin && $lib_semisync_slave_plugin)
|
||||
{
|
||||
$ENV{'SEMISYNC_MASTER_PLUGIN'}= basename($lib_semisync_master_plugin);
|
||||
|
|
46
mysql-test/suite/rpl/r/rpl_semi_sync_event.result
Normal file
46
mysql-test/suite/rpl/r/rpl_semi_sync_event.result
Normal file
|
@ -0,0 +1,46 @@
|
|||
stop slave;
|
||||
drop table if exists t1,t2,t3,t4,t5,t6,t7,t8,t9;
|
||||
reset master;
|
||||
reset slave;
|
||||
drop table if exists t1,t2,t3,t4,t5,t6,t7,t8,t9;
|
||||
start slave;
|
||||
include/stop_slave.inc
|
||||
include/start_slave.inc
|
||||
SET GLOBAL event_scheduler = ON;
|
||||
CREATE TABLE t1 (i INT NOT NULL AUTO_INCREMENT PRIMARY KEY, f varchar(8)) ENGINE=ENGINE_TYPE;
|
||||
INSERT INTO t1 (f) VALUES ('a'),('a'),('a'),('a'),('a');
|
||||
INSERT INTO t1 SELECT i+5, f FROM t1;
|
||||
INSERT INTO t1 SELECT i+10, f FROM t1;
|
||||
CREATE EVENT ev1 ON SCHEDULE EVERY 1 SECOND
|
||||
DO INSERT INTO t1 VALUES (SLEEP(5),CONCAT('ev1_',CONNECTION_ID()));
|
||||
CREATE EVENT ev2 ON SCHEDULE EVERY 1 SECOND
|
||||
DO INSERT INTO t1 VALUES (SLEEP(5),CONCAT('ev2_',CONNECTION_ID()));
|
||||
STOP SLAVE IO_THREAD;
|
||||
UPDATE t1 SET f = CONCAT('up_',CONNECTION_ID()) WHERE i = 20;
|
||||
UPDATE t1 SET f = CONCAT('up_',CONNECTION_ID()) WHERE i = 19;
|
||||
UPDATE t1 SET f = CONCAT('up_',CONNECTION_ID()) WHERE i = 18;
|
||||
UPDATE t1 SET f = CONCAT('up_',CONNECTION_ID()) WHERE i = 17;
|
||||
UPDATE t1 SET f = CONCAT('up_',CONNECTION_ID()) WHERE i = 16;
|
||||
UPDATE t1 SET f = CONCAT('up_',CONNECTION_ID()) WHERE i = 15;
|
||||
UPDATE t1 SET f = CONCAT('up_',CONNECTION_ID()) WHERE i = 14;
|
||||
UPDATE t1 SET f = CONCAT('up_',CONNECTION_ID()) WHERE i = 13;
|
||||
UPDATE t1 SET f = CONCAT('up_',CONNECTION_ID()) WHERE i = 12;
|
||||
UPDATE t1 SET f = CONCAT('up_',CONNECTION_ID()) WHERE i = 11;
|
||||
UPDATE t1 SET f = CONCAT('up_',CONNECTION_ID()) WHERE i = 10;
|
||||
UPDATE t1 SET f = CONCAT('up_',CONNECTION_ID()) WHERE i = 9;
|
||||
UPDATE t1 SET f = CONCAT('up_',CONNECTION_ID()) WHERE i = 8;
|
||||
UPDATE t1 SET f = CONCAT('up_',CONNECTION_ID()) WHERE i = 7;
|
||||
UPDATE t1 SET f = CONCAT('up_',CONNECTION_ID()) WHERE i = 6;
|
||||
UPDATE t1 SET f = CONCAT('up_',CONNECTION_ID()) WHERE i = 5;
|
||||
UPDATE t1 SET f = CONCAT('up_',CONNECTION_ID()) WHERE i = 4;
|
||||
UPDATE t1 SET f = CONCAT('up_',CONNECTION_ID()) WHERE i = 3;
|
||||
UPDATE t1 SET f = CONCAT('up_',CONNECTION_ID()) WHERE i = 2;
|
||||
UPDATE t1 SET f = CONCAT('up_',CONNECTION_ID()) WHERE i = 1;
|
||||
SET GLOBAL event_scheduler = OFF;
|
||||
include/stop_slave.inc
|
||||
UNINSTALL PLUGIN rpl_semi_sync_slave;
|
||||
UNINSTALL PLUGIN rpl_semi_sync_master;
|
||||
include/start_slave.inc
|
||||
DROP EVENT ev1;
|
||||
DROP EVENT ev2;
|
||||
DROP TABLE t1;
|
1
mysql-test/suite/rpl/t/rpl_semi_sync_event-master.opt
Normal file
1
mysql-test/suite/rpl/t/rpl_semi_sync_event-master.opt
Normal file
|
@ -0,0 +1 @@
|
|||
$SEMISYNC_PLUGIN_OPT --max-connections=23
|
1
mysql-test/suite/rpl/t/rpl_semi_sync_event-slave.opt
Normal file
1
mysql-test/suite/rpl/t/rpl_semi_sync_event-slave.opt
Normal file
|
@ -0,0 +1 @@
|
|||
$SEMISYNC_PLUGIN_OPT
|
108
mysql-test/suite/rpl/t/rpl_semi_sync_event.test
Normal file
108
mysql-test/suite/rpl/t/rpl_semi_sync_event.test
Normal file
|
@ -0,0 +1,108 @@
|
|||
source include/have_semisync_plugin.inc;
|
||||
source include/not_embedded.inc;
|
||||
source include/master-slave.inc;
|
||||
source include/have_innodb.inc;
|
||||
|
||||
let $engine_type= InnoDB;
|
||||
|
||||
# Suppress warnings that might be generated during the test
|
||||
disable_query_log;
|
||||
connection master;
|
||||
call mtr.add_suppression("Timeout waiting for reply of binlog");
|
||||
call mtr.add_suppression("Semi-sync master .* waiting for slave reply");
|
||||
call mtr.add_suppression("Read semi-sync reply");
|
||||
connection slave;
|
||||
call mtr.add_suppression("Master server does not support semi-sync");
|
||||
call mtr.add_suppression("Semi-sync slave .* reply");
|
||||
enable_query_log;
|
||||
|
||||
connection master;
|
||||
disable_query_log;
|
||||
let $value = query_get_value(show variables like 'rpl_semi_sync_master_enabled', Value, 1);
|
||||
if (`select '$value' = 'No such row'`)
|
||||
{
|
||||
set sql_log_bin=0;
|
||||
eval INSTALL PLUGIN rpl_semi_sync_master SONAME '$SEMISYNC_MASTER_PLUGIN';
|
||||
SET GLOBAL rpl_semi_sync_master_enabled = 1;
|
||||
set sql_log_bin=1;
|
||||
}
|
||||
enable_query_log;
|
||||
|
||||
connection slave;
|
||||
source include/stop_slave.inc;
|
||||
|
||||
disable_query_log;
|
||||
let $value= query_get_value(show variables like 'rpl_semi_sync_slave_enabled', Value, 1);
|
||||
if (`select '$value' = 'No such row'`)
|
||||
{
|
||||
set sql_log_bin=0;
|
||||
eval INSTALL PLUGIN rpl_semi_sync_slave SONAME '$SEMISYNC_SLAVE_PLUGIN';
|
||||
SET GLOBAL rpl_semi_sync_slave_enabled = 1;
|
||||
set sql_log_bin=1;
|
||||
}
|
||||
enable_query_log;
|
||||
|
||||
source include/start_slave.inc;
|
||||
|
||||
connection master;
|
||||
SET GLOBAL event_scheduler = ON;
|
||||
|
||||
replace_result $engine_type ENGINE_TYPE;
|
||||
eval CREATE TABLE t1 (i INT NOT NULL AUTO_INCREMENT PRIMARY KEY, f varchar(8)) ENGINE=$engine_type;
|
||||
INSERT INTO t1 (f) VALUES ('a'),('a'),('a'),('a'),('a');
|
||||
INSERT INTO t1 SELECT i+5, f FROM t1;
|
||||
INSERT INTO t1 SELECT i+10, f FROM t1;
|
||||
|
||||
CREATE EVENT ev1 ON SCHEDULE EVERY 1 SECOND
|
||||
DO INSERT INTO t1 VALUES (SLEEP(5),CONCAT('ev1_',CONNECTION_ID()));
|
||||
CREATE EVENT ev2 ON SCHEDULE EVERY 1 SECOND
|
||||
DO INSERT INTO t1 VALUES (SLEEP(5),CONCAT('ev2_',CONNECTION_ID()));
|
||||
|
||||
connection slave;
|
||||
STOP SLAVE IO_THREAD;
|
||||
|
||||
connection master;
|
||||
let $run = 20;
|
||||
while ($run)
|
||||
{
|
||||
connect (m$run,localhost,root,,);
|
||||
connection m$run;
|
||||
send;
|
||||
eval UPDATE t1 SET f = CONCAT('up_',CONNECTION_ID()) WHERE i = $run;
|
||||
connection master;
|
||||
dec $run;
|
||||
}
|
||||
|
||||
connection master;
|
||||
SET GLOBAL event_scheduler = OFF;
|
||||
|
||||
let $run = 20;
|
||||
while ($run)
|
||||
{
|
||||
connection m$run;
|
||||
reap;
|
||||
disconnect m$run;
|
||||
dec $run;
|
||||
}
|
||||
|
||||
#
|
||||
# Clean up
|
||||
#
|
||||
connection slave;
|
||||
source include/stop_slave.inc;
|
||||
|
||||
disable_warnings;
|
||||
UNINSTALL PLUGIN rpl_semi_sync_slave;
|
||||
|
||||
connection master;
|
||||
UNINSTALL PLUGIN rpl_semi_sync_master;
|
||||
enable_warnings;
|
||||
|
||||
connection slave;
|
||||
source include/start_slave.inc;
|
||||
|
||||
connection master;
|
||||
DROP EVENT ev1;
|
||||
DROP EVENT ev2;
|
||||
DROP TABLE t1;
|
||||
sync_slave_with_master;
|
|
@ -63,29 +63,14 @@ static int gettimeofday(struct timeval *tv, void *tz)
|
|||
*
|
||||
******************************************************************************/
|
||||
|
||||
ActiveTranx::ActiveTranx(int max_connections,
|
||||
pthread_mutex_t *lock,
|
||||
ActiveTranx::ActiveTranx(pthread_mutex_t *lock,
|
||||
unsigned long trace_level)
|
||||
: Trace(trace_level), num_transactions_(max_connections),
|
||||
num_entries_(max_connections << 1),
|
||||
: Trace(trace_level),
|
||||
num_entries_(max_connections << 1), /* Transaction hash table size
|
||||
* is set to double the size
|
||||
* of max_connections */
|
||||
lock_(lock)
|
||||
{
|
||||
/* Allocate the memory for the array */
|
||||
node_array_ = new TranxNode[num_transactions_];
|
||||
for (int idx = 0; idx < num_transactions_; ++idx)
|
||||
{
|
||||
node_array_[idx].log_pos_ = 0;
|
||||
node_array_[idx].hash_next_ = NULL;
|
||||
node_array_[idx].next_ = node_array_ + idx + 1;
|
||||
|
||||
node_array_[idx].log_name_ = new char[FN_REFLEN];
|
||||
node_array_[idx].log_name_[0] = '\x0';
|
||||
}
|
||||
node_array_[num_transactions_-1].next_ = NULL;
|
||||
|
||||
/* All nodes in the array go to the pool initially. */
|
||||
free_pool_ = node_array_;
|
||||
|
||||
/* No transactions are in the list initially. */
|
||||
trx_front_ = NULL;
|
||||
trx_rear_ = NULL;
|
||||
|
@ -95,24 +80,13 @@ ActiveTranx::ActiveTranx(int max_connections,
|
|||
for (int idx = 0; idx < num_entries_; ++idx)
|
||||
trx_htb_[idx] = NULL;
|
||||
|
||||
sql_print_information("Semi-sync replication initialized for %d "
|
||||
"transactions.", num_transactions_);
|
||||
sql_print_information("Semi-sync replication initialized for transactions.");
|
||||
}
|
||||
|
||||
ActiveTranx::~ActiveTranx()
|
||||
{
|
||||
for (int idx = 0; idx < num_transactions_; ++idx)
|
||||
{
|
||||
delete [] node_array_[idx].log_name_;
|
||||
node_array_[idx].log_name_ = NULL;
|
||||
}
|
||||
|
||||
delete [] node_array_;
|
||||
delete [] trx_htb_;
|
||||
|
||||
node_array_ = NULL;
|
||||
trx_htb_ = NULL;
|
||||
num_transactions_ = 0;
|
||||
num_entries_ = 0;
|
||||
}
|
||||
|
||||
|
@ -143,26 +117,21 @@ unsigned int ActiveTranx::get_hash_value(const char *log_file_name,
|
|||
|
||||
ActiveTranx::TranxNode* ActiveTranx::alloc_tranx_node()
|
||||
{
|
||||
TranxNode *ptr = free_pool_;
|
||||
|
||||
if (free_pool_)
|
||||
MYSQL_THD thd= (MYSQL_THD)current_thd;
|
||||
/* The memory allocated for TranxNode will be automatically freed at
|
||||
the end of the command of current THD. And because
|
||||
ha_autocommit_or_rollback() will always be called before that, so
|
||||
we are sure that the node will be removed from the active list
|
||||
before it get freed. */
|
||||
TranxNode *trx_node = (TranxNode *)thd_alloc(thd, sizeof(TranxNode));
|
||||
if (trx_node)
|
||||
{
|
||||
free_pool_ = free_pool_->next_;
|
||||
ptr->next_ = NULL;
|
||||
ptr->hash_next_ = NULL;
|
||||
trx_node->log_name_[0] = '\0';
|
||||
trx_node->log_pos_= 0;
|
||||
trx_node->next_= 0;
|
||||
trx_node->hash_next_= 0;
|
||||
}
|
||||
else
|
||||
{
|
||||
/*
|
||||
free_pool should never be NULL here, because we have
|
||||
max_connections number of pre-allocated nodes.
|
||||
*/
|
||||
sql_print_error("You have encountered a semi-sync bug (free_pool == NULL), "
|
||||
"please report to http://bugs.mysql.com");
|
||||
assert(free_pool_);
|
||||
}
|
||||
|
||||
return ptr;
|
||||
return trx_node;
|
||||
}
|
||||
|
||||
int ActiveTranx::compare(const char *log_file_name1, my_off_t log_file_pos1,
|
||||
|
@ -306,14 +275,12 @@ int ActiveTranx::clear_active_tranx_nodes(const char *log_file_name,
|
|||
/* Clear the active transaction list. */
|
||||
if (trx_front_ != NULL)
|
||||
{
|
||||
trx_rear_->next_ = free_pool_;
|
||||
free_pool_ = trx_front_;
|
||||
trx_front_ = NULL;
|
||||
trx_rear_ = NULL;
|
||||
}
|
||||
|
||||
if (trace_level_ & kTraceDetail)
|
||||
sql_print_information("%s: free all nodes back to free list", kWho);
|
||||
sql_print_information("%s: cleared all nodes", kWho);
|
||||
}
|
||||
else if (new_front != trx_front_)
|
||||
{
|
||||
|
@ -325,10 +292,6 @@ int ActiveTranx::clear_active_tranx_nodes(const char *log_file_name,
|
|||
while (curr_node != new_front)
|
||||
{
|
||||
next_node = curr_node->next_;
|
||||
|
||||
/* Put the node in the memory pool. */
|
||||
curr_node->next_ = free_pool_;
|
||||
free_pool_ = curr_node;
|
||||
n_frees++;
|
||||
|
||||
/* Remove the node from the hash table. */
|
||||
|
@ -350,7 +313,7 @@ int ActiveTranx::clear_active_tranx_nodes(const char *log_file_name,
|
|||
trx_front_ = new_front;
|
||||
|
||||
if (trace_level_ & kTraceDetail)
|
||||
sql_print_information("%s: free %d nodes back until pos (%s, %lu)",
|
||||
sql_print_information("%s: cleared %d nodes back until pos (%s, %lu)",
|
||||
kWho, n_frees,
|
||||
trx_front_->log_name_, (unsigned long)trx_front_->log_pos_);
|
||||
}
|
||||
|
@ -391,8 +354,7 @@ ReplSemiSyncMaster::ReplSemiSyncMaster()
|
|||
wait_file_pos_(0),
|
||||
master_enabled_(false),
|
||||
wait_timeout_(0L),
|
||||
state_(0),
|
||||
max_transactions_(0L)
|
||||
state_(0)
|
||||
{
|
||||
strcpy(reply_file_name_, "");
|
||||
strcpy(wait_file_name_, "");
|
||||
|
@ -413,7 +375,6 @@ int ReplSemiSyncMaster::initObject()
|
|||
/* References to the parameter works after set_options(). */
|
||||
setWaitTimeout(rpl_semi_sync_master_timeout);
|
||||
setTraceLevel(rpl_semi_sync_master_trace_level);
|
||||
max_transactions_ = (int)max_connections;
|
||||
|
||||
/* Mutex initialization can only be done after MY_INIT(). */
|
||||
pthread_mutex_init(&LOCK_binlog_, MY_MUTEX_INIT_FAST);
|
||||
|
@ -436,9 +397,7 @@ int ReplSemiSyncMaster::enableMaster()
|
|||
|
||||
if (!getMasterEnabled())
|
||||
{
|
||||
active_tranxs_ = new ActiveTranx(max_connections,
|
||||
&LOCK_binlog_,
|
||||
trace_level_);
|
||||
active_tranxs_ = new ActiveTranx(&LOCK_binlog_, trace_level_);
|
||||
if (active_tranxs_ != NULL)
|
||||
{
|
||||
commit_file_name_inited_ = false;
|
||||
|
|
|
@ -23,31 +23,26 @@
|
|||
/**
|
||||
This class manages memory for active transaction list.
|
||||
|
||||
We record each active transaction with a TranxNode. Because each
|
||||
session can only have only one open transaction, the total active
|
||||
transaction nodes can not exceed the maximum sessions. Currently
|
||||
in MySQL, sessions are the same as connections.
|
||||
We record each active transaction with a TranxNode, each session
|
||||
can have only one open transaction. Because of EVENT, the total
|
||||
active transaction nodes can exceed the maximum allowed
|
||||
connections.
|
||||
*/
|
||||
class ActiveTranx
|
||||
:public Trace {
|
||||
private:
|
||||
struct TranxNode {
|
||||
char *log_name_;
|
||||
char log_name_[FN_REFLEN];
|
||||
my_off_t log_pos_;
|
||||
struct TranxNode *next_; /* the next node in the sorted list */
|
||||
struct TranxNode *hash_next_; /* the next node during hash collision */
|
||||
};
|
||||
|
||||
/* The following data structure maintains an active transaction list. */
|
||||
TranxNode *node_array_;
|
||||
TranxNode *free_pool_;
|
||||
|
||||
/* These two record the active transaction list in sort order. */
|
||||
TranxNode *trx_front_, *trx_rear_;
|
||||
|
||||
TranxNode **trx_htb_; /* A hash table on active transactions. */
|
||||
|
||||
int num_transactions_; /* maximum transactions */
|
||||
int num_entries_; /* maximum hash table entries */
|
||||
pthread_mutex_t *lock_; /* mutex lock */
|
||||
|
||||
|
@ -74,8 +69,7 @@ private:
|
|||
}
|
||||
|
||||
public:
|
||||
ActiveTranx(int max_connections, pthread_mutex_t *lock,
|
||||
unsigned long trace_level);
|
||||
ActiveTranx(pthread_mutex_t *lock, unsigned long trace_level);
|
||||
~ActiveTranx();
|
||||
|
||||
/* Insert an active transaction node with the specified position.
|
||||
|
@ -177,11 +171,6 @@ class ReplSemiSyncMaster
|
|||
|
||||
bool state_; /* whether semi-sync is switched */
|
||||
|
||||
/* The number of maximum active transactions. This should be the same as
|
||||
* maximum connections because MySQL does not do connection sharing now.
|
||||
*/
|
||||
int max_transactions_;
|
||||
|
||||
void lock();
|
||||
void unlock();
|
||||
void cond_broadcast();
|
||||
|
|
|
@ -137,28 +137,53 @@ void delegates_destroy()
|
|||
*/
|
||||
#define FOREACH_OBSERVER(r, f, thd, args) \
|
||||
param.server_id= thd->server_id; \
|
||||
/*
|
||||
Use a struct to make sure that they are allocated adjacent, check
|
||||
delete_dynamic().
|
||||
*/ \
|
||||
struct { \
|
||||
DYNAMIC_ARRAY plugins; \
|
||||
/* preallocate 8 slots */ \
|
||||
plugin_ref plugins_buffer[8]; \
|
||||
} s; \
|
||||
DYNAMIC_ARRAY *plugins= &s.plugins; \
|
||||
plugin_ref *plugins_buffer= s.plugins_buffer; \
|
||||
my_init_dynamic_array2(plugins, sizeof(plugin_ref), \
|
||||
plugins_buffer, 8, 8); \
|
||||
read_lock(); \
|
||||
Observer_info_iterator iter= observer_info_iter(); \
|
||||
Observer_info *info= iter++; \
|
||||
for (; info; info= iter++) \
|
||||
{ \
|
||||
plugin_ref plugin= \
|
||||
my_plugin_lock(thd, &info->plugin); \
|
||||
my_plugin_lock(0, &info->plugin); \
|
||||
if (!plugin) \
|
||||
{ \
|
||||
r= 1; \
|
||||
/* plugin is not intialized or deleted, this is not an error */ \
|
||||
r= 0; \
|
||||
break; \
|
||||
} \
|
||||
insert_dynamic(plugins, (uchar *)&plugin); \
|
||||
if (((Observer *)info->observer)->f \
|
||||
&& ((Observer *)info->observer)->f args) \
|
||||
{ \
|
||||
r= 1; \
|
||||
plugin_unlock(thd, plugin); \
|
||||
sql_print_error("Run function '" #f "' in plugin '%s' failed", \
|
||||
info->plugin_int->name.str); \
|
||||
break; \
|
||||
} \
|
||||
plugin_unlock(thd, plugin); \
|
||||
} \
|
||||
unlock()
|
||||
unlock(); \
|
||||
/*
|
||||
Unlock plugins should be done after we released the Delegate lock
|
||||
to avoid possible deadlock when this is the last user of the
|
||||
plugin, and when we unlock the plugin, it will try to
|
||||
deinitialize the plugin, which will try to lock the Delegate in
|
||||
order to remove the observers.
|
||||
*/ \
|
||||
plugin_unlock_list(0, (plugin_ref*)plugins->buffer, \
|
||||
plugins->elements); \
|
||||
delete_dynamic(plugins)
|
||||
|
||||
|
||||
int Trans_delegate::after_commit(THD *thd, bool all)
|
||||
|
|
Loading…
Reference in a new issue