mirror of
https://github.com/MariaDB/server.git
synced 2025-04-16 20:25:33 +02:00
Merge MDEV-5754, MDEV-5769, and MDEV-5764 into 10.0
This commit is contained in:
commit
5ec49e6452
12 changed files with 292 additions and 66 deletions
BIN
mysql-test/std_data/mariadb-5.5-binlog.000001
Normal file
BIN
mysql-test/std_data/mariadb-5.5-binlog.000001
Normal file
Binary file not shown.
|
@ -62,6 +62,32 @@ slave-relay-bin.000007 # Query # # # Dummy ev
|
|||
slave-relay-bin.000007 # Table_map # # table_id: # (test.t1)
|
||||
slave-relay-bin.000007 # Write_rows_v1 # # table_id: # flags: STMT_END_F
|
||||
slave-relay-bin.000007 # Query # # COMMIT
|
||||
*** MDEV-5754: MySQL 5.5 slaves cannot replicate from MariaDB 10.0 ***
|
||||
CREATE TABLE t2 (a INT PRIMARY KEY) ENGINE=InnoDB;
|
||||
SET debug_sync='commit_after_release_LOCK_prepare_ordered SIGNAL master_queued1 WAIT_FOR master_cont1';
|
||||
INSERT INTO t2 VALUES (1);
|
||||
SET debug_sync='now WAIT_FOR master_queued1';
|
||||
SET debug_sync='commit_after_release_LOCK_prepare_ordered SIGNAL master_queued2';
|
||||
INSERT INTO t2 VALUES (2);
|
||||
SET debug_sync='now WAIT_FOR master_queued2';
|
||||
SET debug_sync='now SIGNAL master_cont1';
|
||||
SET debug_sync='RESET';
|
||||
SET debug_sync='RESET';
|
||||
SET debug_sync='RESET';
|
||||
include/show_binlog_events.inc
|
||||
Log_name Pos Event_type Server_id End_log_pos Info
|
||||
master-bin.000003 # Gtid # # BEGIN GTID #-#-# cid=#
|
||||
master-bin.000003 # Table_map # # table_id: # (test.t2)
|
||||
master-bin.000003 # Write_rows_v1 # # table_id: # flags: STMT_END_F
|
||||
master-bin.000003 # Xid # # COMMIT /* XID */
|
||||
master-bin.000003 # Gtid # # BEGIN GTID #-#-# cid=#
|
||||
master-bin.000003 # Table_map # # table_id: # (test.t2)
|
||||
master-bin.000003 # Write_rows_v1 # # table_id: # flags: STMT_END_F
|
||||
master-bin.000003 # Xid # # COMMIT /* XID */
|
||||
SELECT * FROM t2 ORDER BY a;
|
||||
a
|
||||
1
|
||||
2
|
||||
# Test that slave which cannot tolerate holes in binlog stream but
|
||||
# knows the event does not get dummy event
|
||||
include/stop_slave.inc
|
||||
|
@ -95,5 +121,5 @@ select @@global.replicate_annotate_row_events;
|
|||
set @@global.debug_dbug= @old_slave_dbug;
|
||||
Clean up.
|
||||
set @@global.binlog_checksum = @old_master_binlog_checksum;
|
||||
DROP TABLE t1;
|
||||
DROP TABLE t1, t2;
|
||||
include/rpl_end.inc
|
||||
|
|
27
mysql-test/suite/rpl/r/rpl_old_master.result
Normal file
27
mysql-test/suite/rpl/r/rpl_old_master.result
Normal file
|
@ -0,0 +1,27 @@
|
|||
include/master-slave.inc
|
||||
[connection master]
|
||||
include/stop_slave.inc
|
||||
include/rpl_stop_server.inc [server_number=1]
|
||||
include/rpl_start_server.inc [server_number=1]
|
||||
SET @old_parallel= @@GLOBAL.slave_parallel_threads;
|
||||
SET GLOBAL slave_parallel_threads=10;
|
||||
CHANGE MASTER TO master_host='127.0.0.1', master_port=SERVER_MYPORT_1, master_user='root', master_log_file='master-bin.000001', master_log_pos=4;
|
||||
include/start_slave.inc
|
||||
CREATE TABLE t2 (a INT PRIMARY KEY) ENGINE=InnoDB;
|
||||
INSERT INTO t2 VALUES (1);
|
||||
SELECT * FROM t1 ORDER BY a;
|
||||
a b
|
||||
1 1
|
||||
2 2
|
||||
3 4
|
||||
4 8
|
||||
5 16
|
||||
SELECT * FROM t2;
|
||||
a
|
||||
1
|
||||
include/stop_slave.inc
|
||||
SET GLOBAL slave_parallel_threads=@old_parallel;
|
||||
DROP TABLE t1;
|
||||
include/start_slave.inc
|
||||
DROP TABLE t2;
|
||||
include/rpl_end.inc
|
|
@ -1,6 +1,8 @@
|
|||
--source include/master-slave.inc
|
||||
--source include/have_debug.inc
|
||||
--source include/have_debug_sync.inc
|
||||
--source include/have_binlog_format_row.inc
|
||||
--source include/have_innodb.inc
|
||||
|
||||
connection master;
|
||||
|
||||
|
@ -71,6 +73,52 @@ let $binlog_start= 0;
|
|||
let $binlog_limit=7,5;
|
||||
--source include/show_relaylog_events.inc
|
||||
|
||||
|
||||
--echo *** MDEV-5754: MySQL 5.5 slaves cannot replicate from MariaDB 10.0 ***
|
||||
|
||||
# The problem was that for a group commit, we get commit id into the
|
||||
# GTID event, and there was a bug in the code that replaces GTID with
|
||||
# dummy that failed when commit id was present.
|
||||
#
|
||||
# So setup a group commit in InnoDB.
|
||||
|
||||
--connection master
|
||||
CREATE TABLE t2 (a INT PRIMARY KEY) ENGINE=InnoDB;
|
||||
let $binlog_file= query_get_value(SHOW MASTER STATUS, File, 1);
|
||||
let $binlog_start= query_get_value(SHOW MASTER STATUS, Position, 1);
|
||||
|
||||
--connect (con1,127.0.0.1,root,,test,$SERVER_MYPORT_1,)
|
||||
SET debug_sync='commit_after_release_LOCK_prepare_ordered SIGNAL master_queued1 WAIT_FOR master_cont1';
|
||||
send INSERT INTO t2 VALUES (1);
|
||||
|
||||
--connection master
|
||||
SET debug_sync='now WAIT_FOR master_queued1';
|
||||
|
||||
--connect (con2,127.0.0.1,root,,test,$SERVER_MYPORT_1,)
|
||||
SET debug_sync='commit_after_release_LOCK_prepare_ordered SIGNAL master_queued2';
|
||||
send INSERT INTO t2 VALUES (2);
|
||||
|
||||
--connection master
|
||||
SET debug_sync='now WAIT_FOR master_queued2';
|
||||
SET debug_sync='now SIGNAL master_cont1';
|
||||
|
||||
--connection con1
|
||||
REAP;
|
||||
SET debug_sync='RESET';
|
||||
--connection con2
|
||||
REAP;
|
||||
SET debug_sync='RESET';
|
||||
--connection master
|
||||
SET debug_sync='RESET';
|
||||
let $binlog_limit= 0, 8;
|
||||
--source include/show_binlog_events.inc
|
||||
--save_master_pos
|
||||
|
||||
--connection slave
|
||||
--sync_with_master
|
||||
SELECT * FROM t2 ORDER BY a;
|
||||
|
||||
|
||||
--echo # Test that slave which cannot tolerate holes in binlog stream but
|
||||
--echo # knows the event does not get dummy event
|
||||
|
||||
|
@ -106,6 +154,6 @@ set @@global.debug_dbug= @old_slave_dbug;
|
|||
--echo Clean up.
|
||||
connection master;
|
||||
set @@global.binlog_checksum = @old_master_binlog_checksum;
|
||||
DROP TABLE t1;
|
||||
DROP TABLE t1, t2;
|
||||
sync_slave_with_master;
|
||||
--source include/rpl_end.inc
|
||||
|
|
49
mysql-test/suite/rpl/t/rpl_old_master.test
Normal file
49
mysql-test/suite/rpl/t/rpl_old_master.test
Normal file
|
@ -0,0 +1,49 @@
|
|||
# Test replicating off old master.
|
||||
# We simulate old master by copying in pre-generated binlog files from earlier
|
||||
# server versions.
|
||||
|
||||
--source include/have_innodb.inc
|
||||
--source include/master-slave.inc
|
||||
|
||||
--connection slave
|
||||
--source include/stop_slave.inc
|
||||
|
||||
--connection master
|
||||
--let $datadir= `SELECT @@datadir`
|
||||
|
||||
--let $rpl_server_number= 1
|
||||
--source include/rpl_stop_server.inc
|
||||
|
||||
--remove_file $datadir/master-bin.000001
|
||||
--copy_file $MYSQL_TEST_DIR/std_data/mariadb-5.5-binlog.000001 $datadir/master-bin.000001
|
||||
|
||||
--let $rpl_server_number= 1
|
||||
--source include/rpl_start_server.inc
|
||||
|
||||
--source include/wait_until_connected_again.inc
|
||||
|
||||
--connection slave
|
||||
SET @old_parallel= @@GLOBAL.slave_parallel_threads;
|
||||
SET GLOBAL slave_parallel_threads=10;
|
||||
--replace_result $SERVER_MYPORT_1 SERVER_MYPORT_1
|
||||
eval CHANGE MASTER TO master_host='127.0.0.1', master_port=$SERVER_MYPORT_1, master_user='root', master_log_file='master-bin.000001', master_log_pos=4;
|
||||
--source include/start_slave.inc
|
||||
|
||||
--connection master
|
||||
CREATE TABLE t2 (a INT PRIMARY KEY) ENGINE=InnoDB;
|
||||
INSERT INTO t2 VALUES (1);
|
||||
--save_master_pos
|
||||
|
||||
--connection slave
|
||||
--sync_with_master
|
||||
SELECT * FROM t1 ORDER BY a;
|
||||
SELECT * FROM t2;
|
||||
|
||||
--source include/stop_slave.inc
|
||||
SET GLOBAL slave_parallel_threads=@old_parallel;
|
||||
DROP TABLE t1;
|
||||
--source include/start_slave.inc
|
||||
|
||||
--connection master
|
||||
DROP TABLE t2;
|
||||
--source include/rpl_end.inc
|
|
@ -3736,9 +3736,14 @@ Query_log_event::begin_event(String *packet, ulong ev_offset,
|
|||
DBUG_ASSERT(checksum_alg == BINLOG_CHECKSUM_ALG_UNDEF ||
|
||||
checksum_alg == BINLOG_CHECKSUM_ALG_OFF);
|
||||
|
||||
/* Currently we only need to replace GTID event. */
|
||||
DBUG_ASSERT(data_len == LOG_EVENT_HEADER_LEN + GTID_HEADER_LEN);
|
||||
if (data_len != LOG_EVENT_HEADER_LEN + GTID_HEADER_LEN)
|
||||
/*
|
||||
Currently we only need to replace GTID event.
|
||||
The length of GTID differs depending on whether it contains commit id.
|
||||
*/
|
||||
DBUG_ASSERT(data_len == LOG_EVENT_HEADER_LEN + GTID_HEADER_LEN ||
|
||||
data_len == LOG_EVENT_HEADER_LEN + GTID_HEADER_LEN + 2);
|
||||
if (data_len != LOG_EVENT_HEADER_LEN + GTID_HEADER_LEN &&
|
||||
data_len != LOG_EVENT_HEADER_LEN + GTID_HEADER_LEN + 2)
|
||||
return 1;
|
||||
|
||||
flags= uint2korr(p + FLAGS_OFFSET);
|
||||
|
@ -3751,9 +3756,22 @@ Query_log_event::begin_event(String *packet, ulong ev_offset,
|
|||
int4store(q + Q_EXEC_TIME_OFFSET, 0);
|
||||
q[Q_DB_LEN_OFFSET]= 0;
|
||||
int2store(q + Q_ERR_CODE_OFFSET, 0);
|
||||
int2store(q + Q_STATUS_VARS_LEN_OFFSET, 0);
|
||||
q[Q_DATA_OFFSET]= 0; /* Zero terminator for empty db */
|
||||
q+= Q_DATA_OFFSET + 1;
|
||||
if (data_len == LOG_EVENT_HEADER_LEN + GTID_HEADER_LEN)
|
||||
{
|
||||
int2store(q + Q_STATUS_VARS_LEN_OFFSET, 0);
|
||||
q[Q_DATA_OFFSET]= 0; /* Zero terminator for empty db */
|
||||
q+= Q_DATA_OFFSET + 1;
|
||||
}
|
||||
else
|
||||
{
|
||||
DBUG_ASSERT(data_len == LOG_EVENT_HEADER_LEN + GTID_HEADER_LEN + 2);
|
||||
/* Put in an empty time_zone_str to take up the extra 2 bytes. */
|
||||
int2store(q + Q_STATUS_VARS_LEN_OFFSET, 2);
|
||||
q[Q_DATA_OFFSET]= Q_TIME_ZONE_CODE;
|
||||
q[Q_DATA_OFFSET+1]= 0; /* Zero length for empty time_zone_str */
|
||||
q[Q_DATA_OFFSET+2]= 0; /* Zero terminator for empty db */
|
||||
q+= Q_DATA_OFFSET + 3;
|
||||
}
|
||||
memcpy(q, "BEGIN", 5);
|
||||
|
||||
if (checksum_alg == BINLOG_CHECKSUM_ALG_CRC32)
|
||||
|
@ -6779,7 +6797,7 @@ Gtid_list_log_event::write(IO_CACHE *file)
|
|||
int
|
||||
Gtid_list_log_event::do_apply_event(rpl_group_info *rgi)
|
||||
{
|
||||
Relay_log_info const *rli= rgi->rli;
|
||||
Relay_log_info *rli= const_cast<Relay_log_info*>(rgi->rli);
|
||||
int ret;
|
||||
if (gl_flags & FLAG_IGN_GTIDS)
|
||||
{
|
||||
|
@ -6799,10 +6817,11 @@ Gtid_list_log_event::do_apply_event(rpl_group_info *rgi)
|
|||
{
|
||||
char str_buf[128];
|
||||
String str(str_buf, sizeof(str_buf), system_charset_info);
|
||||
const_cast<Relay_log_info*>(rli)->until_gtid_pos.to_string(&str);
|
||||
rli->until_gtid_pos.to_string(&str);
|
||||
sql_print_information("Slave SQL thread stops because it reached its"
|
||||
" UNTIL master_gtid_pos %s", str.c_ptr_safe());
|
||||
const_cast<Relay_log_info*>(rli)->abort_slave= true;
|
||||
rli->abort_slave= true;
|
||||
rli->stop_for_until= true;
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
|
|
@ -3123,12 +3123,15 @@ public:
|
|||
<td>flags</td>
|
||||
<td>1 byte bitfield</td>
|
||||
<td>Bit 0 set indicates stand-alone event (no terminating COMMIT)</td>
|
||||
<td>Bit 1 set indicates group commit, and that commit id exists</td>
|
||||
</tr>
|
||||
|
||||
<tr>
|
||||
<td>Reserved</td>
|
||||
<td>6 bytes</td>
|
||||
<td>Reserved bytes, set to 0. Maybe be used for future expansion.</td>
|
||||
<td>Reserved (no group commit) / commit id (group commit) (see flags bit 1)</td>
|
||||
<td>6 bytes / 8 bytes</td>
|
||||
<td>Reserved bytes, set to 0. Maybe be used for future expansion (no
|
||||
group commit). OR commit id, same for all GTIDs in the same group
|
||||
commit (see flags bit 1).</td>
|
||||
</tr>
|
||||
</table>
|
||||
|
||||
|
|
|
@ -173,6 +173,7 @@ signal_error_to_sql_driver_thread(THD *thd, rpl_group_info *rgi)
|
|||
rgi->is_error= true;
|
||||
rgi->cleanup_context(thd, true);
|
||||
rgi->rli->abort_slave= true;
|
||||
rgi->rli->stop_for_until= false;
|
||||
mysql_mutex_lock(rgi->rli->relay_log.get_log_lock());
|
||||
mysql_mutex_unlock(rgi->rli->relay_log.get_log_lock());
|
||||
rgi->rli->relay_log.signal_update();
|
||||
|
@ -1122,7 +1123,7 @@ rpl_parallel::find(uint32 domain_id)
|
|||
|
||||
|
||||
void
|
||||
rpl_parallel::wait_for_done(THD *thd)
|
||||
rpl_parallel::wait_for_done(THD *thd, Relay_log_info *rli)
|
||||
{
|
||||
struct rpl_parallel_entry *e;
|
||||
rpl_parallel_thread *rpt;
|
||||
|
@ -1152,9 +1153,13 @@ rpl_parallel::wait_for_done(THD *thd)
|
|||
started executing yet. So we set e->stop_count here and use it to
|
||||
decide in the worker threads whether to continue executing an event
|
||||
group or whether to skip it, when force_abort is set.
|
||||
|
||||
If we stop due to reaching the START SLAVE UNTIL condition, then we
|
||||
need to continue executing any queued events up to that point.
|
||||
*/
|
||||
e->force_abort= true;
|
||||
e->stop_count= e->count_committing_event_groups;
|
||||
e->stop_count= rli->stop_for_until ?
|
||||
e->count_queued_event_groups : e->count_committing_event_groups;
|
||||
mysql_mutex_unlock(&e->LOCK_parallel_entry);
|
||||
for (j= 0; j < e->rpl_thread_max; ++j)
|
||||
{
|
||||
|
@ -1190,6 +1195,30 @@ rpl_parallel::wait_for_done(THD *thd)
|
|||
}
|
||||
|
||||
|
||||
/*
|
||||
This function handles the case where the SQL driver thread reached the
|
||||
START SLAVE UNTIL position; we stop queueing more events but continue
|
||||
processing remaining, already queued events; then use executes manual
|
||||
STOP SLAVE; then this function signals to worker threads that they
|
||||
should stop the processing of any remaining queued events.
|
||||
*/
|
||||
void
|
||||
rpl_parallel::stop_during_until()
|
||||
{
|
||||
struct rpl_parallel_entry *e;
|
||||
uint32 i;
|
||||
|
||||
for (i= 0; i < domain_hash.records; ++i)
|
||||
{
|
||||
e= (struct rpl_parallel_entry *)my_hash_element(&domain_hash, i);
|
||||
mysql_mutex_lock(&e->LOCK_parallel_entry);
|
||||
if (e->force_abort)
|
||||
e->stop_count= e->count_committing_event_groups;
|
||||
mysql_mutex_unlock(&e->LOCK_parallel_entry);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
bool
|
||||
rpl_parallel::workers_idle()
|
||||
{
|
||||
|
@ -1230,11 +1259,12 @@ abandon_worker_thread(THD *thd, rpl_parallel_thread *cur_thread,
|
|||
do_event() is executed by the sql_driver_thd thread.
|
||||
It's main purpose is to find a thread that can execute the query.
|
||||
|
||||
@retval false ok, event was accepted
|
||||
@retval true error
|
||||
@retval 0 ok, event was accepted
|
||||
@retval 1 error
|
||||
@retval -1 event should be executed serially, in the sql driver thread
|
||||
*/
|
||||
|
||||
bool
|
||||
int
|
||||
rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
|
||||
ulonglong event_size)
|
||||
{
|
||||
|
@ -1248,6 +1278,32 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
|
|||
bool did_enter_cond= false;
|
||||
PSI_stage_info old_stage;
|
||||
|
||||
/* Handle master log name change, seen in Rotate_log_event. */
|
||||
typ= ev->get_type_code();
|
||||
if (unlikely(typ == ROTATE_EVENT))
|
||||
{
|
||||
Rotate_log_event *rev= static_cast<Rotate_log_event *>(ev);
|
||||
if ((rev->server_id != global_system_variables.server_id ||
|
||||
rli->replicate_same_server_id) &&
|
||||
!rev->is_relay_log_event() &&
|
||||
!rli->is_in_group())
|
||||
{
|
||||
memcpy(rli->future_event_master_log_name,
|
||||
rev->new_log_ident, rev->ident_len+1);
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
Execute queries non-parallel if slave_skip_counter is set, as it's is
|
||||
easier to skip queries in single threaded mode.
|
||||
*/
|
||||
if (rli->slave_skip_counter)
|
||||
return -1;
|
||||
|
||||
/* Execute pre-10.0 event, which have no GTID, in single-threaded mode. */
|
||||
if (unlikely(!current) && typ != GTID_EVENT)
|
||||
return -1;
|
||||
|
||||
/* ToDo: what to do with this lock?!? */
|
||||
mysql_mutex_unlock(&rli->data_lock);
|
||||
|
||||
|
@ -1259,21 +1315,20 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
|
|||
been partially queued, but after that we will just ignore any further
|
||||
events the SQL driver thread may try to queue, and eventually it will stop.
|
||||
*/
|
||||
if (((typ= ev->get_type_code()) == GTID_EVENT ||
|
||||
!(is_group_event= Log_event::is_group_event(typ))) &&
|
||||
rli->abort_slave)
|
||||
is_group_event= Log_event::is_group_event(typ);
|
||||
if ((typ == GTID_EVENT || !is_group_event) && rli->abort_slave)
|
||||
sql_thread_stopping= true;
|
||||
if (sql_thread_stopping)
|
||||
{
|
||||
delete ev;
|
||||
/*
|
||||
Return false ("no error"); normal stop is not an error, and otherwise the
|
||||
error has already been recorded.
|
||||
Return "no error"; normal stop is not an error, and otherwise the error
|
||||
has already been recorded.
|
||||
*/
|
||||
return false;
|
||||
return 0;
|
||||
}
|
||||
|
||||
if (typ == GTID_EVENT || unlikely(!current))
|
||||
if (typ == GTID_EVENT)
|
||||
{
|
||||
uint32 domain_id;
|
||||
if (likely(typ == GTID_EVENT))
|
||||
|
@ -1288,7 +1343,7 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
|
|||
{
|
||||
my_error(ER_OUT_OF_RESOURCES, MYF(MY_WME));
|
||||
delete ev;
|
||||
return true;
|
||||
return 1;
|
||||
}
|
||||
current= e;
|
||||
}
|
||||
|
@ -1307,7 +1362,7 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
|
|||
{
|
||||
/* This means we were killed. The error is already signalled. */
|
||||
delete ev;
|
||||
return true;
|
||||
return 1;
|
||||
}
|
||||
|
||||
if (!(qev= cur_thread->get_qev(ev, event_size, rli)))
|
||||
|
@ -1315,7 +1370,7 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
|
|||
abandon_worker_thread(rli->sql_driver_thd, cur_thread,
|
||||
&did_enter_cond, &old_stage);
|
||||
delete ev;
|
||||
return true;
|
||||
return 1;
|
||||
}
|
||||
|
||||
if (typ == GTID_EVENT)
|
||||
|
@ -1328,7 +1383,7 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
|
|||
abandon_worker_thread(rli->sql_driver_thd, cur_thread,
|
||||
&did_enter_cond, &old_stage);
|
||||
delete ev;
|
||||
return true;
|
||||
return 1;
|
||||
}
|
||||
|
||||
/*
|
||||
|
@ -1366,7 +1421,7 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
|
|||
abandon_worker_thread(rli->sql_driver_thd, cur_thread,
|
||||
&did_enter_cond, &old_stage);
|
||||
delete ev;
|
||||
return true;
|
||||
return 1;
|
||||
}
|
||||
e->current_gco= rgi->gco= gco;
|
||||
}
|
||||
|
@ -1380,7 +1435,7 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
|
|||
e->current_sub_id= rgi->gtid_sub_id;
|
||||
++e->count_queued_event_groups;
|
||||
}
|
||||
else if (!is_group_event || !e)
|
||||
else if (!is_group_event)
|
||||
{
|
||||
my_off_t log_pos;
|
||||
int err;
|
||||
|
@ -1389,38 +1444,22 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
|
|||
Events like ROTATE and FORMAT_DESCRIPTION. Do not run in worker thread.
|
||||
Same for events not preceeded by GTID (we should not see those normally,
|
||||
but they might be from an old master).
|
||||
|
||||
The variable `e' is NULL for the case where the master did not
|
||||
have GTID, like a MariaDB 5.5 or MySQL master.
|
||||
*/
|
||||
qev->rgi= serial_rgi;
|
||||
/* Handle master log name change, seen in Rotate_log_event. */
|
||||
if (typ == ROTATE_EVENT)
|
||||
{
|
||||
Rotate_log_event *rev= static_cast<Rotate_log_event *>(qev->ev);
|
||||
if ((rev->server_id != global_system_variables.server_id ||
|
||||
rli->replicate_same_server_id) &&
|
||||
!rev->is_relay_log_event() &&
|
||||
!rli->is_in_group())
|
||||
{
|
||||
memcpy(rli->future_event_master_log_name,
|
||||
rev->new_log_ident, rev->ident_len+1);
|
||||
}
|
||||
}
|
||||
|
||||
tmp= serial_rgi->is_parallel_exec;
|
||||
serial_rgi->is_parallel_exec= true;
|
||||
err= rpt_handle_event(qev, NULL);
|
||||
serial_rgi->is_parallel_exec= tmp;
|
||||
log_pos= qev->ev->log_pos;
|
||||
delete_or_keep_event_post_apply(serial_rgi, typ, qev->ev);
|
||||
log_pos= ev->log_pos;
|
||||
delete_or_keep_event_post_apply(serial_rgi, typ, ev);
|
||||
|
||||
if (err)
|
||||
{
|
||||
cur_thread->free_qev(qev);
|
||||
abandon_worker_thread(rli->sql_driver_thd, cur_thread,
|
||||
&did_enter_cond, &old_stage);
|
||||
return true;
|
||||
return 1;
|
||||
}
|
||||
/*
|
||||
Queue an empty event, so that the position will be updated in a
|
||||
|
@ -1451,5 +1490,5 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
|
|||
&did_enter_cond, &old_stage);
|
||||
mysql_cond_signal(&cur_thread->COND_rpl_thread);
|
||||
|
||||
return false;
|
||||
return 0;
|
||||
}
|
||||
|
|
|
@ -222,10 +222,10 @@ struct rpl_parallel {
|
|||
~rpl_parallel();
|
||||
void reset();
|
||||
rpl_parallel_entry *find(uint32 domain_id);
|
||||
void wait_for_done(THD *thd);
|
||||
void wait_for_done(THD *thd, Relay_log_info *rli);
|
||||
void stop_during_until();
|
||||
bool workers_idle();
|
||||
bool do_event(rpl_group_info *serial_rgi, Log_event *ev,
|
||||
ulonglong event_size);
|
||||
int do_event(rpl_group_info *serial_rgi, Log_event *ev, ulonglong event_size);
|
||||
};
|
||||
|
||||
|
||||
|
|
|
@ -60,7 +60,8 @@ Relay_log_info::Relay_log_info(bool is_slave_recovery)
|
|||
group_master_log_pos(0), log_space_total(0), ignore_log_space_limit(0),
|
||||
last_master_timestamp(0), sql_thread_caught_up(true), slave_skip_counter(0),
|
||||
abort_pos_wait(0), slave_run_id(0), sql_driver_thd(),
|
||||
inited(0), abort_slave(0), slave_running(0), until_condition(UNTIL_NONE),
|
||||
inited(0), abort_slave(0), stop_for_until(0),
|
||||
slave_running(0), until_condition(UNTIL_NONE),
|
||||
until_log_pos(0), retried_trans(0), executed_entries(0),
|
||||
m_flags(0)
|
||||
{
|
||||
|
|
|
@ -262,6 +262,7 @@ public:
|
|||
*/
|
||||
volatile bool inited;
|
||||
volatile bool abort_slave;
|
||||
volatile bool stop_for_until;
|
||||
volatile uint slave_running;
|
||||
|
||||
/*
|
||||
|
|
33
sql/slave.cc
33
sql/slave.cc
|
@ -615,7 +615,14 @@ int terminate_slave_threads(Master_info* mi,int thread_mask,bool skip_lock)
|
|||
if (thread_mask & (SLAVE_SQL|SLAVE_FORCE_ALL))
|
||||
{
|
||||
DBUG_PRINT("info",("Terminating SQL thread"));
|
||||
mi->rli.abort_slave=1;
|
||||
if (opt_slave_parallel_threads > 0 &&
|
||||
mi->rli.abort_slave && mi->rli.stop_for_until)
|
||||
{
|
||||
mi->rli.stop_for_until= false;
|
||||
mi->rli.parallel.stop_during_until();
|
||||
}
|
||||
else
|
||||
mi->rli.abort_slave=1;
|
||||
if ((error=terminate_slave_thread(mi->rli.sql_driver_thd, sql_lock,
|
||||
&mi->rli.stop_cond,
|
||||
&mi->rli.slave_running,
|
||||
|
@ -3427,6 +3434,7 @@ static int exec_relay_log_event(THD* thd, Relay_log_info* rli,
|
|||
message about error in query execution to be printed.
|
||||
*/
|
||||
rli->abort_slave= 1;
|
||||
rli->stop_for_until= true;
|
||||
mysql_mutex_unlock(&rli->data_lock);
|
||||
delete ev;
|
||||
DBUG_RETURN(1);
|
||||
|
@ -3454,13 +3462,17 @@ static int exec_relay_log_event(THD* thd, Relay_log_info* rli,
|
|||
|
||||
update_state_of_relay_log(rli, ev);
|
||||
|
||||
/*
|
||||
Execute queries in parallel, except if slave_skip_counter is set,
|
||||
as it's is easier to skip queries in single threaded mode.
|
||||
*/
|
||||
|
||||
if (opt_slave_parallel_threads > 0 && rli->slave_skip_counter == 0)
|
||||
DBUG_RETURN(rli->parallel.do_event(serial_rgi, ev, event_size));
|
||||
if (opt_slave_parallel_threads > 0)
|
||||
{
|
||||
int res= rli->parallel.do_event(serial_rgi, ev, event_size);
|
||||
if (res >= 0)
|
||||
DBUG_RETURN(res);
|
||||
/*
|
||||
Else we proceed to execute the event non-parallel.
|
||||
This is the case for pre-10.0 events without GTID, and for handling
|
||||
slave_skip_counter.
|
||||
*/
|
||||
}
|
||||
|
||||
/*
|
||||
For GTID, allocate a new sub_id for the given domain_id.
|
||||
|
@ -4371,6 +4383,7 @@ pthread_handler_t handle_slave_sql(void *arg)
|
|||
Seconds_Behind_Master grows. No big deal.
|
||||
*/
|
||||
rli->abort_slave = 0;
|
||||
rli->stop_for_until= false;
|
||||
mysql_mutex_unlock(&rli->run_lock);
|
||||
mysql_cond_broadcast(&rli->start_cond);
|
||||
|
||||
|
@ -4542,7 +4555,7 @@ log '%s' at position %s, relay log '%s' position: %s%s", RPL_LOG_NAME,
|
|||
}
|
||||
|
||||
if (opt_slave_parallel_threads > 0)
|
||||
rli->parallel.wait_for_done(thd);
|
||||
rli->parallel.wait_for_done(thd, rli);
|
||||
|
||||
/* Thread stopped. Print the current replication position to the log */
|
||||
{
|
||||
|
@ -4568,7 +4581,7 @@ log '%s' at position %s, relay log '%s' position: %s%s", RPL_LOG_NAME,
|
|||
get the correct position printed.)
|
||||
*/
|
||||
if (opt_slave_parallel_threads > 0)
|
||||
rli->parallel.wait_for_done(thd);
|
||||
rli->parallel.wait_for_done(thd, rli);
|
||||
|
||||
/*
|
||||
Some events set some playgrounds, which won't be cleared because thread
|
||||
|
|
Loading…
Add table
Reference in a new issue