MDEV-23691 S3 storage engine: delayed slave can drop the table

This commit fixes the problems with S3 after the "DROP TABLE FORCE" changes.
It also fixes all failing replication S3 tests.

A slave is delayed if it is still executing replicated queries against a
table that the master has already converted to S3 later in the binlog.

Fixes for replication events on S3 tables for delayed slaves:
- INSERT, INSERT ... SELECT and CREATE TABLE are ignored but still
  written to the binary log (see the sketch below). UPDATE and DELETE
  will be fixed in a future commit.

Other things:
- On slaves with --s3-slave-ignore-updates set, allow S3 tables to be
  opened in read-write mode. This was done to be able to
  ignore-but-replicate queries like INSERT. Without this change, any
  open of an S3 table failed with 'Table is read only', which happens
  too early for the original query to be written to the binary log.
- Errors are now printed if the handler::extra() call fails in
  wait_while_table_is_used().
- The error returned for row changes is changed from HA_ERR_WRONG_COMMAND
  to HA_ERR_TABLE_READONLY.
- Disabled some maria_extra() calls for S3 tables, as these calls could
  cause S3 tables to fail in some cases.
- Added missing thr_lock_delete() to ma_open() in case of failure.
- Removed the no longer needed 'table' argument from mysql_prepare_insert().
Monty 2020-09-13 15:45:41 +03:00
parent 5902d5e0eb
commit 71d263a198
24 changed files with 539 additions and 82 deletions
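A minimal, hypothetical C++ sketch of the ignore-but-replicate decision described above. This is not MariaDB source: Session, Handlerton, updates_are_ignored() and prepare_insert() are simplified stand-ins for THD, handlerton, ha_check_if_updates_are_ignored() and mysql_prepare_insert(). It mirrors the HTON_IGNORE_UPDATES check and the new 0 / >0 / <0 return convention visible in the diffs below.

#include <cstdio>

struct Handlerton { unsigned flags; };
constexpr unsigned HTON_IGNORE_UPDATES= 1U << 0;    /* set by the S3 engine */

struct Session                                      /* stand-in for THD */
{
  bool slave_thread;        /* true when executing a replicated event */
};

/*
  Mirrors ha_check_if_updates_are_ignored(): updates are ignored only when
  a slave thread touches an engine flagged with HTON_IGNORE_UPDATES
  (i.e. --s3-slave-ignore-updates is in effect on the slave).
*/
static bool updates_are_ignored(const Session &thd, const Handlerton *hton)
{
  return thd.slave_thread && hton && (hton->flags & HTON_IGNORE_UPDATES);
}

/*
  Mirrors the new mysql_prepare_insert() return convention:
    0  ok, execute the insert
   >0  error
   <0  insert should be ignored (but the statement is still binlogged)
*/
static int prepare_insert(const Session &thd, const Handlerton *hton)
{
  return updates_are_ignored(thd, hton) ? -1 : 0;
}

int main()
{
  Handlerton s3= { HTON_IGNORE_UPDATES };
  Session slave= { true };

  int res= prepare_insert(slave, &s3);
  if (res < 0)
  {
    /*
      Ignore-but-replicate: skip the row change but write the original
      statement to the binary log (binlog_current_query_unfiltered() in
      the real code) so later slaves still receive it.
    */
    std::printf("insert ignored, statement written to binary log\n");
  }
  else if (res > 0)
    std::printf("error\n");
  else
    std::printf("insert executed\n");
  return 0;
}

The key point is that the negative return does not raise an error to the client; the row change is silently skipped and only the original statement is forced into the binary log, which is exactly what a delayed slave needs when the master has already converted the table to S3.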

View file

@ -25,6 +25,14 @@ t1 CREATE TABLE `t1` (
`a` int(11) DEFAULT NULL,
`b` int(11) DEFAULT NULL
) ENGINE=Aria DEFAULT CHARSET=latin1 PAGE_CHECKSUM=1 COMMENT='hello'
alter table t1 engine=s3;
alter table t1 engine=innodb;
show create table t1;
Table Create Table
t1 CREATE TABLE `t1` (
`a` int(11) DEFAULT NULL,
`b` int(11) DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=latin1 PAGE_CHECKSUM=1 COMMENT='hello'
select count(*), sum(a), sum(b) from t1;
count(*) sum(a) sum(b)
1000 500500 510500

View file

@ -1,5 +1,6 @@
--source include/have_s3.inc
--source include/have_sequence.inc
--source include/have_innodb.inc
#
# Create unique database for running the tests
@ -21,6 +22,9 @@ alter table t1 comment="hello";
show create table t1;
alter table t1 engine=aria;
show create table t1;
alter table t1 engine=s3;
alter table t1 engine=innodb;
show create table t1;
select count(*), sum(a), sum(b) from t1;
drop table t1;

View file

@ -63,7 +63,7 @@ count(*)
ALTER TABLE t2 CHECK PARTITION p3;
Table Op Msg_type Msg_text
s3.t2 check error Subpartition p3sp0 returned error
s3.t2 check error Unknown - internal error 131 during operation
s3.t2 check error Unknown - internal error 165 during operation
SELECT count(*) FROM t2;
count(*)
6

View file

@ -20,6 +20,8 @@ connection master;
create table t1 (a int, b int) engine=aria;
insert into t1 select seq,seq+10 from seq_1_to_10;
sync_slave_with_master;
connection master;
alter table t1 engine=s3;
show create table t1;
@ -116,12 +118,18 @@ connection slave;
stop slave;
connection master;
rename table t1 to t2;
create table t1 (a int, b int) engine=aria;
# Check the different create options with the table
create table t1 (a int) engine=aria;
drop table t1;
create table if not exists t1 (a int, b int) engine=aria;
drop table t1;
create or replace table t1 (a int, b int, c int) engine=aria;
alter table t1 engine=s3;
connection slave;
start slave;
connection master;
sync_slave_with_master;
show create table t1;
select * from t1 limit 2;
select * from t2 limit 2;
connection master;

View file

@ -0,0 +1,3 @@
!include ../rpl/my.cnf
!include ./my.cnf
!include ./slave.cnf

View file

@ -0,0 +1,124 @@
include/master-slave.inc
[connection master]
set binlog_format=mixed;
RESET MASTER;
connection slave;
set binlog_format=mixed;
RESET MASTER;
connection master;
connection slave;
use database;
connection master;
#
# MDEV-23691 S3 storage engine: delayed slave can drop the table
#
connection slave;
stop slave;
connection master;
create /*or replace*/ table t100 (
pk varchar(100)
) engine = 'innodb';
insert into t100 values ('old data');
alter table t100 engine=s3;
drop table t100;
create /*or replace*/ table t100 (
pk varchar(100)
) engine= innodb;
insert into t100 select 'new data' from seq_1_to_10;
alter table t100 engine=s3;
select count(*), 'before slave start' from t100;
count(*) before slave start
10 before slave start
connection slave;
start slave;
connection master;
connection slave;
connection master;
flush tables;
select count(*), 'after slave start' from t100;
count(*) after slave start
10 after slave start
show create table t100;
Table Create Table
t100 CREATE TABLE `t100` (
`pk` varchar(100) DEFAULT NULL
) ENGINE=S3 DEFAULT CHARSET=latin1 PAGE_CHECKSUM=1
connection slave;
select count(*) from t100;
count(*)
10
connection master;
drop table t100;
#
# Test delayed slave with inserts
#
connection slave;
stop slave;
connection master;
create table t1 (a int) engine=innodb;
insert into t1 values (1),(2),(3);
insert into t1 select * from seq_4_to_6;
alter table t1 engine=s3;
connection slave;
start slave;
connection master;
connection slave;
select * from t1;
a
1
2
3
4
5
6
connection master;
drop table t1;
#
# Check slave binary log
#
connection slave;
include/show_binlog_events.inc
Log_name Pos Event_type Server_id End_log_pos Info
slave-bin.000001 # Gtid # # GTID #-#-#
slave-bin.000001 # Query # # create database database
slave-bin.000001 # Gtid # # GTID #-#-#
slave-bin.000001 # Query # # use `database`; create /*or replace*/ table t100 (
pk varchar(100)
) engine = 'innodb'
slave-bin.000001 # Gtid # # BEGIN GTID #-#-#
slave-bin.000001 # Query # # use `database`; insert into t100 values ('old data')
slave-bin.000001 # Query # # COMMIT
slave-bin.000001 # Gtid # # GTID #-#-#
slave-bin.000001 # Query # # use `database`; alter table t100 engine=s3
slave-bin.000001 # Gtid # # GTID #-#-#
slave-bin.000001 # Query # # use `database`; DROP TABLE IF EXISTS `t100` /* generated by server */
slave-bin.000001 # Gtid # # GTID #-#-#
slave-bin.000001 # Query # # use `database`; create /*or replace*/ table t100 (
pk varchar(100)
) engine= innodb
slave-bin.000001 # Gtid # # BEGIN GTID #-#-#
slave-bin.000001 # Query # # use `database`; insert into t100 select 'new data' from seq_1_to_10
slave-bin.000001 # Query # # COMMIT
slave-bin.000001 # Gtid # # GTID #-#-#
slave-bin.000001 # Query # # use `database`; alter table t100 engine=s3
slave-bin.000001 # Gtid # # GTID #-#-#
slave-bin.000001 # Query # # use `database`; flush tables
slave-bin.000001 # Gtid # # GTID #-#-#
slave-bin.000001 # Query # # use `database`; DROP TABLE IF EXISTS `t100` /* generated by server */
slave-bin.000001 # Gtid # # GTID #-#-#
slave-bin.000001 # Query # # use `database`; create table t1 (a int) engine=innodb
slave-bin.000001 # Gtid # # BEGIN GTID #-#-#
slave-bin.000001 # Query # # use `database`; insert into t1 values (1),(2),(3)
slave-bin.000001 # Query # # COMMIT
slave-bin.000001 # Gtid # # BEGIN GTID #-#-#
slave-bin.000001 # Query # # use `database`; insert into t1 select * from seq_4_to_6
slave-bin.000001 # Query # # COMMIT
slave-bin.000001 # Gtid # # GTID #-#-#
slave-bin.000001 # Query # # use `database`; alter table t1 engine=s3
slave-bin.000001 # Gtid # # GTID #-#-#
slave-bin.000001 # Query # # use `database`; DROP TABLE IF EXISTS `t1` /* generated by server */
connection master;
#
# clean up
#
include/rpl_end.inc

View file

@ -0,0 +1,115 @@
--source include/have_s3.inc
--source include/have_innodb.inc
--source include/have_binlog_format_mixed.inc
--source include/master-slave.inc
--source include/have_sequence.inc
# First clear the binlog
set binlog_format=mixed;
RESET MASTER;
connection slave;
set binlog_format=mixed;
RESET MASTER;
connection master;
#
# Create unique database for running the tests
#
--source create_database.inc
sync_slave_with_master;
--replace_result $database database
--eval use $database
connection master;
--echo #
--echo # MDEV-23691 S3 storage engine: delayed slave can drop the table
--echo #
connection slave;
stop slave;
connection master;
#
# Create version 1 of the table
#
create /*or replace*/ table t100 (
pk varchar(100)
) engine = 'innodb';
insert into t100 values ('old data');
alter table t100 engine=s3;
#
# Create version 2 of the table
#
drop table t100;
create /*or replace*/ table t100 (
pk varchar(100)
) engine= innodb;
insert into t100 select 'new data' from seq_1_to_10;
alter table t100 engine=s3;
select count(*), 'before slave start' from t100;
#
# Now, start the slave
#
connection slave;
start slave;
connection master;
sync_slave_with_master;
#select count(*) from t100;
connection master;
flush tables;
select count(*), 'after slave start' from t100;
show create table t100;
connection slave;
select count(*) from t100;
connection master;
drop table t100;
--echo #
--echo # Test delayed slave with inserts
--echo #
# Stop slave
connection slave;
stop slave;
connection master;
# Create tables with data while slave is stopped
create table t1 (a int) engine=innodb;
insert into t1 values (1),(2),(3);
insert into t1 select * from seq_4_to_6;
alter table t1 engine=s3;
connection slave;
start slave;
connection master;
sync_slave_with_master;
select * from t1;
connection master;
drop table t1;
--echo #
--echo # Check slave binary log
--echo #
sync_slave_with_master;
--let $binlog_database=$database
--source include/show_binlog_events.inc
connection master;
--echo #
--echo # clean up
--echo #
--source drop_database.inc
--source include/rpl_end.inc

View file

@ -13,6 +13,8 @@ connection master;
#
create table t1 (a int, b int) engine=aria;
insert into t1 select seq,seq+10 from seq_1_to_10;
connection slave;
connection master;
alter table t1 engine=s3;
show create table t1;
Table Create Table
@ -134,14 +136,25 @@ connection slave;
stop slave;
connection master;
rename table t1 to t2;
create table t1 (a int, b int) engine=aria;
create table t1 (a int) engine=aria;
drop table t1;
create table if not exists t1 (a int, b int) engine=aria;
drop table t1;
create or replace table t1 (a int, b int, c int) engine=aria;
alter table t1 engine=s3;
connection slave;
start slave;
connection master;
connection slave;
show create table t1;
Table Create Table
t1 CREATE TABLE `t1` (
`a` int(11) DEFAULT NULL,
`b` int(11) DEFAULT NULL,
`c` int(11) DEFAULT NULL
) ENGINE=S3 DEFAULT CHARSET=latin1 PAGE_CHECKSUM=1
select * from t1 limit 2;
a b
a b c
select * from t2 limit 2;
a b f
1 11 NULL
@ -238,7 +251,15 @@ slave-bin.000001 # Query # # use `database`; set @@sql_if_exists=1; alter table
slave-bin.000001 # Gtid # # GTID #-#-#
slave-bin.000001 # Query # # use `database`; set @@sql_if_exists=1; rename table t1 to t2
slave-bin.000001 # Gtid # # GTID #-#-#
slave-bin.000001 # Query # # use `database`; create table t1 (a int, b int) engine=aria
slave-bin.000001 # Query # # use `database`; create table t1 (a int) engine=aria
slave-bin.000001 # Gtid # # GTID #-#-#
slave-bin.000001 # Query # # use `database`; DROP TABLE IF EXISTS `t1` /* generated by server */
slave-bin.000001 # Gtid # # GTID #-#-#
slave-bin.000001 # Query # # use `database`; create table if not exists t1 (a int, b int) engine=aria
slave-bin.000001 # Gtid # # GTID #-#-#
slave-bin.000001 # Query # # use `database`; DROP TABLE IF EXISTS `t1` /* generated by server */
slave-bin.000001 # Gtid # # GTID #-#-#
slave-bin.000001 # Query # # use `database`; create or replace table t1 (a int, b int, c int) engine=aria
slave-bin.000001 # Gtid # # GTID #-#-#
slave-bin.000001 # Query # # use `database`; alter table t1 engine=s3
slave-bin.000001 # Gtid # # GTID #-#-#

View file

@ -13,6 +13,8 @@ connection master;
#
create table t1 (a int, b int) engine=aria;
insert into t1 select seq,seq+10 from seq_1_to_10;
connection slave;
connection master;
alter table t1 engine=s3;
show create table t1;
Table Create Table
@ -134,14 +136,25 @@ connection slave;
stop slave;
connection master;
rename table t1 to t2;
create table t1 (a int, b int) engine=aria;
create table t1 (a int) engine=aria;
drop table t1;
create table if not exists t1 (a int, b int) engine=aria;
drop table t1;
create or replace table t1 (a int, b int, c int) engine=aria;
alter table t1 engine=s3;
connection slave;
start slave;
connection master;
connection slave;
show create table t1;
Table Create Table
t1 CREATE TABLE `t1` (
`a` int(11) DEFAULT NULL,
`b` int(11) DEFAULT NULL,
`c` int(11) DEFAULT NULL
) ENGINE=S3 DEFAULT CHARSET=latin1 PAGE_CHECKSUM=1
select * from t1 limit 2;
a b
a b c
select * from t2 limit 2;
a b f
1 11 NULL
@ -238,7 +251,15 @@ slave-bin.000001 # Query # # use `database`; set @@sql_if_exists=1; alter table
slave-bin.000001 # Gtid # # GTID #-#-#
slave-bin.000001 # Query # # use `database`; set @@sql_if_exists=1; rename table t1 to t2
slave-bin.000001 # Gtid # # GTID #-#-#
slave-bin.000001 # Query # # use `database`; create table t1 (a int, b int) engine=aria
slave-bin.000001 # Query # # use `database`; create table t1 (a int) engine=aria
slave-bin.000001 # Gtid # # GTID #-#-#
slave-bin.000001 # Query # # use `database`; DROP TABLE IF EXISTS `t1` /* generated by server */
slave-bin.000001 # Gtid # # GTID #-#-#
slave-bin.000001 # Query # # use `database`; create table if not exists t1 (a int, b int) engine=aria
slave-bin.000001 # Gtid # # GTID #-#-#
slave-bin.000001 # Query # # use `database`; DROP TABLE IF EXISTS `t1` /* generated by server */
slave-bin.000001 # Gtid # # GTID #-#-#
slave-bin.000001 # Query # # use `database`; create or replace table t1 (a int, b int, c int) engine=aria
slave-bin.000001 # Gtid # # GTID #-#-#
slave-bin.000001 # Query # # use `database`; alter table t1 engine=s3
slave-bin.000001 # Gtid # # GTID #-#-#

View file

@ -2764,6 +2764,9 @@ int ha_delete_table(THD *thd, handlerton *hton, const char *path,
if (hton == NULL || hton == view_pseudo_hton)
DBUG_RETURN(0);
if (ha_check_if_updates_are_ignored(thd, hton, "DROP"))
DBUG_RETURN(0);
error= hton->drop_table(hton, path);
if (error > 0)
{
@ -2801,7 +2804,8 @@ int ha_delete_table(THD *thd, handlerton *hton, const char *path,
error= -1;
}
}
if (error)
DBUG_PRINT("exit", ("error: %d", error));
DBUG_RETURN(error);
}
@ -5008,7 +5012,8 @@ static my_bool delete_table_force(THD *thd, plugin_ref plugin, void *arg)
handlerton *hton = plugin_hton(plugin);
st_force_drop_table_params *param = (st_force_drop_table_params *)arg;
if (param->discovering == (hton->discover_table != NULL))
if (param->discovering == (hton->discover_table != NULL) &&
!(thd->slave_thread && (hton->flags & HTON_IGNORE_UPDATES)))
{
int error;
error= ha_delete_table(thd, hton, param->path, param->db, param->alias, 0);

View file

@ -2151,6 +2151,12 @@ static int binlog_commit(handlerton *hton, THD *thd, bool all)
DBUG_RETURN(0);
}
/*
This is true if we are doing an alter table that is replicated as
CREATE TABLE ... SELECT
*/
if (thd->variables.option_bits & OPTION_BIN_COMMIT_OFF)
DBUG_RETURN(0);
DBUG_PRINT("debug",
("all: %d, in_transaction: %s, all.modified_non_trans_table: %s, stmt.modified_non_trans_table: %s",

View file

@ -1316,7 +1316,12 @@ bool wait_while_table_is_used(THD *thd, TABLE *table,
table->s->tdc->flush(thd, true);
/* extra() call must come only after all instances above are closed */
if (function != HA_EXTRA_NOT_USED)
DBUG_RETURN(table->file->extra(function));
{
int error= table->file->extra(function);
if (error)
table->file->print_error(error, MYF(0));
DBUG_RETURN(error);
}
DBUG_RETURN(FALSE);
}
@ -3907,7 +3912,8 @@ static bool upgrade_lock_if_not_exists(THD *thd,
{
DEBUG_SYNC(thd,"create_table_before_check_if_exists");
if (!create_info.or_replace() &&
ha_table_exists(thd, &create_table->db, &create_table->table_name))
ha_table_exists(thd, &create_table->db, &create_table->table_name,
&create_table->db_type))
{
if (create_info.if_not_exists())
{
@ -3955,7 +3961,8 @@ static bool upgrade_lock_if_not_exists(THD *thd,
Note that for CREATE TABLE IF EXISTS we only generate a warning
but still return TRUE (to abort the calling open_table() function).
One must check THD->is_error() if one wants to distinguish between warning
and error.
and error. If table existed, tables_start->db_type is set to the handlerton
for the found table.
*/
bool

View file

@ -7477,6 +7477,38 @@ int THD::binlog_query(THD::enum_binlog_query_type qtype, char const *query_arg,
DBUG_RETURN(0);
}
/**
Binlog current query as a statement, ignoring the binlog filter setting.
The filter is in decide_logging_format() to mark queries to not be stored
in the binary log, for example by a shared distributed engine like S3.
This function resets the filter to ensure that the query is logged if
the binlog is active.
Note that 'direct' is set to false, which means that the query will
not be directly written to the binary log but instead to the cache.
@retval false ok
@retval true error
*/
bool THD::binlog_current_query_unfiltered()
{
if (!mysql_bin_log.is_open())
return 0;
reset_binlog_local_stmt_filter();
clear_binlog_local_stmt_filter();
return binlog_query(THD::STMT_QUERY_TYPE, query(), query_length(),
/* is_trans */ FALSE,
/* direct */ FALSE,
/* suppress_use */ FALSE,
/* Error */ 0) > 0;
}
void
THD::wait_for_wakeup_ready()
{

View file

@ -3491,6 +3491,7 @@ public:
char const *query, ulong query_len, bool is_trans,
bool direct, bool suppress_use,
int errcode);
bool binlog_current_query_unfiltered();
#endif
inline void

View file

@ -724,6 +724,7 @@ bool mysql_insert(THD *thd, TABLE_LIST *table_list,
Item *unused_conds= 0;
DBUG_ENTER("mysql_insert");
bzero((char*) &info,sizeof(info));
create_explain_query(thd->lex, thd->mem_root);
/*
Upgrade lock type if the requested lock is incompatible with
@ -764,16 +765,28 @@ bool mysql_insert(THD *thd, TABLE_LIST *table_list,
DBUG_RETURN(TRUE);
value_count= values->elements;
if (mysql_prepare_insert(thd, table_list, table, fields, values,
update_fields, update_values, duplic,
&unused_conds, FALSE))
if ((res= mysql_prepare_insert(thd, table_list, fields, values,
update_fields, update_values, duplic,
&unused_conds, FALSE)))
{
retval= thd->is_error();
if (res < 0)
{
/*
Insert should be ignored but we have to log the query in statement
format in the binary log
*/
if (thd->binlog_current_query_unfiltered())
retval= 1;
}
goto abort;
}
/* mysql_prepare_insert sets table_list->table if it was not set */
table= table_list->table;
/* Prepares LEX::returning_list if it is not empty */
if (returning)
result->prepare(returning->item_list, NULL);
/* mysql_prepare_insert sets table_list->table if it was not set */
table= table_list->table;
context= &thd->lex->first_select_lex()->context;
/*
@ -828,7 +841,6 @@ bool mysql_insert(THD *thd, TABLE_LIST *table_list,
/*
Fill in the given fields and dump it to the table file
*/
bzero((char*) &info,sizeof(info));
info.ignore= ignore;
info.handle_duplicates=duplic;
info.update_fields= &update_fields;
@ -1534,9 +1546,6 @@ static void prepare_for_positional_update(TABLE *table, TABLE_LIST *tables)
mysql_prepare_insert()
thd Thread handler
table_list Global/local table list
table Table to insert into
(can be NULL if table should
be taken from table_list->table)
where Where clause (for insert ... select)
select_insert TRUE if INSERT ... SELECT statement
@ -1551,15 +1560,16 @@ static void prepare_for_positional_update(TABLE *table, TABLE_LIST *tables)
before releasing the table object.
RETURN VALUE
FALSE OK
TRUE error
0 OK
>0 error
<0 insert should be ignored
*/
bool mysql_prepare_insert(THD *thd, TABLE_LIST *table_list,
TABLE *table, List<Item> &fields, List_item *values,
List<Item> &update_fields, List<Item> &update_values,
enum_duplicates duplic, COND **where,
bool select_insert)
int mysql_prepare_insert(THD *thd, TABLE_LIST *table_list,
List<Item> &fields, List_item *values,
List<Item> &update_fields, List<Item> &update_values,
enum_duplicates duplic, COND **where,
bool select_insert)
{
SELECT_LEX *select_lex= thd->lex->first_select_lex();
Name_resolution_context *context= &select_lex->context;
@ -1567,29 +1577,34 @@ bool mysql_prepare_insert(THD *thd, TABLE_LIST *table_list,
bool insert_into_view= (table_list->view != 0);
bool res= 0;
table_map map= 0;
TABLE *table;
DBUG_ENTER("mysql_prepare_insert");
DBUG_PRINT("enter", ("table_list: %p table: %p view: %d",
table_list, table,
(int)insert_into_view));
DBUG_PRINT("enter", ("table_list: %p view: %d",
table_list, (int) insert_into_view));
/* INSERT should have a SELECT or VALUES clause */
DBUG_ASSERT (!select_insert || !values);
if (mysql_handle_derived(thd->lex, DT_INIT))
DBUG_RETURN(TRUE);
DBUG_RETURN(1);
if (table_list->handle_derived(thd->lex, DT_MERGE_FOR_INSERT))
DBUG_RETURN(TRUE);
DBUG_RETURN(1);
if (thd->lex->handle_list_of_derived(table_list, DT_PREPARE))
DBUG_RETURN(TRUE);
DBUG_RETURN(1);
if (duplic == DUP_UPDATE)
{
/* it should be allocated before Item::fix_fields() */
if (table_list->set_insert_values(thd->mem_root))
DBUG_RETURN(TRUE);
DBUG_RETURN(1);
}
table= table_list->table;
if (table->file->check_if_updates_are_ignored("INSERT"))
DBUG_RETURN(-1);
if (mysql_prepare_insert_check_table(thd, table_list, fields, select_insert))
DBUG_RETURN(TRUE);
DBUG_RETURN(1);
/* Prepare the fields in the statement. */
if (values)
@ -1632,9 +1647,6 @@ bool mysql_prepare_insert(THD *thd, TABLE_LIST *table_list,
if (res)
DBUG_RETURN(res);
if (!table)
table= table_list->table;
if (check_duplic_insert_without_overlaps(thd, table, duplic) != 0)
DBUG_RETURN(true);
@ -1642,7 +1654,7 @@ bool mysql_prepare_insert(THD *thd, TABLE_LIST *table_list,
{
// Additional memory may be required to create historical items.
if (table_list->set_insert_values(thd->mem_root))
DBUG_RETURN(TRUE);
DBUG_RETURN(1);
}
if (!select_insert)
@ -1653,7 +1665,7 @@ bool mysql_prepare_insert(THD *thd, TABLE_LIST *table_list,
CHECK_DUP_ALLOW_DIFFERENT_ALIAS)))
{
update_non_unique_table_error(table_list, "INSERT", duplicate);
DBUG_RETURN(TRUE);
DBUG_RETURN(1);
}
select_lex->fix_prepare_information(thd, &fake_conds, &fake_conds);
}
@ -1663,7 +1675,7 @@ bool mysql_prepare_insert(THD *thd, TABLE_LIST *table_list,
*/
if (duplic == DUP_UPDATE || duplic == DUP_REPLACE)
prepare_for_positional_update(table, table_list);
DBUG_RETURN(FALSE);
DBUG_RETURN(0);
}
@ -3703,12 +3715,14 @@ bool Delayed_insert::handle_inserts(void)
thd thread handler
RETURN
FALSE OK
TRUE Error
0 OK
> 0 Error
< 0 Ok, ignore insert
*/
bool mysql_insert_select_prepare(THD *thd, select_result *sel_res)
int mysql_insert_select_prepare(THD *thd, select_result *sel_res)
{
int res;
LEX *lex= thd->lex;
SELECT_LEX *select_lex= lex->first_select_lex();
DBUG_ENTER("mysql_insert_select_prepare");
@ -3718,11 +3732,11 @@ bool mysql_insert_select_prepare(THD *thd, select_result *sel_res)
clause if table is VIEW
*/
if (mysql_prepare_insert(thd, lex->query_tables,
lex->query_tables->table, lex->field_list, 0,
lex->update_list, lex->value_list, lex->duplicates,
&select_lex->where, TRUE))
DBUG_RETURN(TRUE);
if ((res= mysql_prepare_insert(thd, lex->query_tables, lex->field_list, 0,
lex->update_list, lex->value_list,
lex->duplicates,
&select_lex->where, TRUE)))
DBUG_RETURN(res);
/*
If sel_res is not empty, it means we have items in returning_list.
@ -3763,7 +3777,7 @@ bool mysql_insert_select_prepare(THD *thd, select_result *sel_res)
while ((table= ti++) && insert_tables--)
ti.remove();
DBUG_RETURN(FALSE);
DBUG_RETURN(0);
}

View file

@ -23,11 +23,11 @@
typedef List<Item> List_item;
typedef struct st_copy_info COPY_INFO;
bool mysql_prepare_insert(THD *thd, TABLE_LIST *table_list, TABLE *table,
List<Item> &fields, List_item *values,
List<Item> &update_fields,
List<Item> &update_values, enum_duplicates duplic,
COND **where, bool select_insert);
int mysql_prepare_insert(THD *thd, TABLE_LIST *table_list,
List<Item> &fields, List_item *values,
List<Item> &update_fields,
List<Item> &update_values, enum_duplicates duplic,
COND **where, bool select_insert);
bool mysql_insert(THD *thd,TABLE_LIST *table,List<Item> &fields,
List<List_item> &values, List<Item> &update_fields,
List<Item> &update_values, enum_duplicates flag,

View file

@ -4717,6 +4717,14 @@ mysql_execute_command(THD *thd)
}
delete sel_result;
}
else if (res < 0)
{
/*
Insert should be ignored but we have to log the query in statement
format in the binary log
*/
res= thd->binlog_current_query_unfiltered();
}
delete result;
if (save_protocol)
{

View file

@ -41,7 +41,7 @@ bool multi_update_precheck(THD *thd, TABLE_LIST *tables);
bool multi_delete_precheck(THD *thd, TABLE_LIST *tables);
int mysql_multi_update_prepare(THD *thd);
int mysql_multi_delete_prepare(THD *thd);
bool mysql_insert_select_prepare(THD *thd,select_result *sel_res);
int mysql_insert_select_prepare(THD *thd,select_result *sel_res);
bool update_precheck(THD *thd, TABLE_LIST *tables);
bool delete_precheck(THD *thd, TABLE_LIST *tables);
bool insert_precheck(THD *thd, TABLE_LIST *tables);

View file

@ -1243,9 +1243,8 @@ static bool mysql_test_insert(Prepared_statement *stmt,
table_list->table->insert_values=(uchar *)1;
}
if (mysql_prepare_insert(thd, table_list, table_list->table,
fields, values, update_fields, update_values,
duplic, &unused_conds, FALSE))
if (mysql_prepare_insert(thd, table_list, fields, values, update_fields,
update_values, duplic, &unused_conds, FALSE))
goto error;
value_count= values->elements;

View file

@ -5079,6 +5079,7 @@ int create_table_impl(THD *thd, const LEX_CSTRING &orig_db,
int error= 1;
bool frm_only= create_table_mode == C_ALTER_TABLE_FRM_ONLY;
bool internal_tmp_table= create_table_mode == C_ALTER_TABLE || frm_only;
handlerton *exists_hton;
DBUG_ENTER("mysql_create_table_no_lock");
DBUG_PRINT("enter", ("db: '%s' table: '%s' tmp: %d path: %s",
db.str, table_name.str, internal_tmp_table, path));
@ -5167,8 +5168,16 @@ int create_table_impl(THD *thd, const LEX_CSTRING &orig_db,
goto err;
}
if (!internal_tmp_table && ha_table_exists(thd, &db, &table_name))
if (!internal_tmp_table && ha_table_exists(thd, &db, &table_name,
&exists_hton))
{
if (ha_check_if_updates_are_ignored(thd, exists_hton, "CREATE"))
{
/* Don't create table. CREATE will still be logged in binary log */
error= 0;
goto err;
}
if (options.or_replace())
{
(void) delete_statistics_for_table(thd, &db, &table_name);
@ -5206,7 +5215,20 @@ int create_table_impl(THD *thd, const LEX_CSTRING &orig_db,
goto err;
}
else if (options.if_not_exists())
{
/*
We never come here as part of normal create table as table existence
is checked in open_and_lock_tables(). We may come here as part of
ALTER TABLE when converting a table from a distributed engine to
a local one.
*/
/* Log CREATE IF NOT EXISTS on slave for distributed engines */
if (thd->slave_thread && (exists_hton && exists_hton->flags &
HTON_IGNORE_UPDATES))
thd->log_current_statement= 1;
goto warn;
}
else
{
my_error(ER_TABLE_EXISTS_ERROR, MYF(0), table_name.str);
@ -5437,12 +5459,21 @@ bool mysql_create_table(THD *thd, TABLE_LIST *create_table,
thd->lex->create_info.options|= create_info->options;
/* Open or obtain an exclusive metadata lock on table being created */
create_table->db_type= 0;
result= open_and_lock_tables(thd, *create_info, create_table, FALSE, 0);
thd->lex->create_info.options= save_thd_create_info_options;
if (result)
{
if (thd->slave_thread &&
!thd->is_error() && create_table->db_type &&
(create_table->db_type->flags & HTON_IGNORE_UPDATES))
{
/* Table existed in distributed engine. Log query to binary log */
result= 0;
goto err;
}
/* is_error() may be 0 if table existed and we generated a warning */
DBUG_RETURN(thd->is_error());
}
@ -9777,7 +9808,7 @@ bool mysql_alter_table(THD *thd, const LEX_CSTRING *new_db,
mysql_prepare_create_table().
*/
bool varchar= create_info->varchar, table_creation_was_logged= 0;
bool binlog_done= 0, log_if_exists= 0;
bool binlog_as_create_select= 0, log_if_exists= 0;
uint tables_opened;
handlerton *new_db_type, *old_db_type;
ha_rows copied=0, deleted=0;
@ -10699,7 +10730,6 @@ do_continue:;
thd->variables.option_bits|= OPTION_BIN_COMMIT_OFF;
res= (binlog_drop_table(thd, table) ||
binlog_create_table(thd, new_table, 1));
thd->variables.option_bits&= ~OPTION_BIN_COMMIT_OFF;
new_table->s->tmp_table= org_tmp_table;
if (res)
goto err_new_table_cleanup;
@ -10707,7 +10737,7 @@ do_continue:;
ha_write_row() will log inserted rows in copy_data_between_tables().
No additional logging of query is needed
*/
binlog_done= 1;
binlog_as_create_select= 1;
DBUG_ASSERT(new_table->file->row_logging);
new_table->mark_columns_needed_for_insert();
thd->binlog_write_table_map(new_table, 1);
@ -10763,10 +10793,21 @@ do_continue:;
if (thd->rename_temporary_table(new_table, &alter_ctx.new_db,
&alter_ctx.new_name))
goto err_new_table_cleanup;
if (binlog_as_create_select)
{
/*
The original table is now deleted. Copy the
DROP + CREATE + data statement to the binary log
*/
thd->variables.option_bits&= ~OPTION_BIN_COMMIT_OFF;
(binlog_hton->commit)(binlog_hton, thd, 1);
}
/* We don't replicate alter table statement on temporary tables */
if (!thd->is_current_stmt_binlog_format_row() &&
table_creation_was_logged &&
!binlog_done &&
!binlog_as_create_select &&
write_bin_log_with_if_exists(thd, true, false, log_if_exists))
DBUG_RETURN(true);
my_free(const_cast<uchar*>(frm.str));
@ -10926,6 +10967,15 @@ do_continue:;
NO_FRM_RENAME |
(engine_changed ? 0 : FN_IS_TMP));
}
if (binlog_as_create_select)
{
/*
The original table is now deleted. Copy the
DROP + CREATE + data statement to the binary log
*/
thd->variables.option_bits&= ~OPTION_BIN_COMMIT_OFF;
binlog_hton->commit(binlog_hton, thd, 1);
}
if (error)
{
@ -10938,6 +10988,7 @@ do_continue:;
}
end_inplace:
thd->variables.option_bits&= ~OPTION_BIN_COMMIT_OFF;
if (thd->locked_tables_list.reopen_tables(thd, false))
goto err_with_mdl_after_alter;
@ -10949,7 +11000,7 @@ end_inplace:
DBUG_ASSERT(!(mysql_bin_log.is_open() &&
thd->is_current_stmt_binlog_format_row() &&
(create_info->tmp_table())));
if (!binlog_done)
if (!binlog_as_create_select)
{
if (write_bin_log_with_if_exists(thd, true, false, log_if_exists))
DBUG_RETURN(true);
@ -10967,6 +11018,8 @@ end_inplace:
}
end_temporary:
thd->variables.option_bits&= ~OPTION_BIN_COMMIT_OFF;
my_snprintf(alter_ctx.tmp_buff, sizeof(alter_ctx.tmp_buff),
ER_THD(thd, ER_INSERT_INFO),
(ulong) (copied + deleted), (ulong) deleted,
@ -10977,6 +11030,8 @@ end_temporary:
err_new_table_cleanup:
DBUG_PRINT("error", ("err_new_table_cleanup"));
thd->variables.option_bits&= ~OPTION_BIN_COMMIT_OFF;
my_free(const_cast<uchar*>(frm.str));
/*
No default value was provided for a DATE/DATETIME field, the
@ -11014,7 +11069,7 @@ err_with_mdl_after_alter:
We can't reset error as we will return 'true' below and the server
expects that error is set
*/
if (!binlog_done)
if (!binlog_as_create_select)
write_bin_log_with_if_exists(thd, FALSE, FALSE, log_if_exists);
err_with_mdl:

View file

@ -269,7 +269,7 @@ int ha_s3::write_row(const uchar *buf)
DBUG_ENTER("ha_s3::write_row");
if (in_alter_table)
DBUG_RETURN(ha_maria::write_row(buf));
DBUG_RETURN(HA_ERR_WRONG_COMMAND);
DBUG_RETURN(HA_ERR_TABLE_READONLY);
}
/* Return true if S3 can be used */
@ -576,7 +576,25 @@ int ha_s3::open(const char *name, int mode, uint open_flags)
if (!s3_usable())
DBUG_RETURN(HA_ERR_UNSUPPORTED);
if (mode != O_RDONLY && !(open_flags & HA_OPEN_FOR_CREATE))
/*
On slaves with s3_slave_ignore_updates set we allow tables to be
opened in write mode to be able to ignore queries that modify
the table through handler::check_if_updates_are_ignored().
This is needed for the slave to be able to handle
CREATE TABLE t1...
INSERT INTO TABLE t1 ....
ALTER TABLE t1 ENGINE=S3
If this is not done, the insert will fail on the slave if the
master has already executed the ALTER TABLE.
We also have to allow open for create, as part of
ALTER TABLE ... ENGINE=S3.
Otherwise we only allow the table to be opened in read mode
*/
if (mode != O_RDONLY && !(open_flags & HA_OPEN_FOR_CREATE) &&
!s3_slave_ignore_updates)
DBUG_RETURN(EACCES);
open_args= 0;

View file

@ -37,32 +37,32 @@ public:
int update_row(const uchar * old_data, const uchar * new_data)
{
DBUG_ENTER("update_row");
DBUG_RETURN(HA_ERR_WRONG_COMMAND);
DBUG_RETURN(HA_ERR_TABLE_READONLY);
}
int delete_row(const uchar * buf)
{
DBUG_ENTER("delete_row");
DBUG_RETURN(HA_ERR_WRONG_COMMAND);
DBUG_RETURN(HA_ERR_TABLE_READONLY);
}
int check(THD * thd, HA_CHECK_OPT * check_opt)
{
DBUG_ENTER("delete_row");
DBUG_RETURN(HA_ERR_WRONG_COMMAND);
DBUG_RETURN(HA_ERR_TABLE_READONLY);
}
int analyze(THD * thd, HA_CHECK_OPT * check_opt)
{
DBUG_ENTER("analyze");
DBUG_RETURN(HA_ERR_WRONG_COMMAND);
DBUG_RETURN(HA_ERR_TABLE_READONLY);
}
int repair(THD * thd, HA_CHECK_OPT * check_opt)
{
DBUG_ENTER("repair");
DBUG_RETURN(HA_ERR_WRONG_COMMAND);
DBUG_RETURN(HA_ERR_TABLE_READONLY);
}
int preload_keys(THD * thd, HA_CHECK_OPT * check_opt)
{
DBUG_ENTER("preload_keys");
DBUG_RETURN(HA_ERR_WRONG_COMMAND);
DBUG_RETURN(HA_ERR_TABLE_READONLY);
}
int external_lock(THD * thd, int lock_type);
/*

View file

@ -229,6 +229,9 @@ int maria_extra(MARIA_HA *info, enum ha_extra_function function,
info->lock_wait= MY_SHORT_WAIT;
break;
case HA_EXTRA_NO_KEYS:
if (share->s3_path) /* Not supported with S3 */
break;
/* we're going to modify pieces of the state, stall Checkpoint */
mysql_mutex_lock(&share->intern_lock);
if (info->lock_type == F_UNLCK)
@ -369,16 +372,16 @@ int maria_extra(MARIA_HA *info, enum ha_extra_function function,
if (end_io_cache(&info->rec_cache))
error= 1;
}
if (share->kfile.file >= 0)
if (share->kfile.file >= 0 && share->s3_path == 0)
{
if (do_flush)
{
/* Save the state so that others can find it from disk. */
if ((share->changed &&
_ma_state_info_write(share,
if (share->changed &&
(_ma_state_info_write(share,
MA_STATE_INFO_WRITE_DONT_MOVE_OFFSET |
MA_STATE_INFO_WRITE_FULL_INFO)) ||
mysql_file_sync(share->kfile.file, MYF(0)))
MA_STATE_INFO_WRITE_FULL_INFO) ||
mysql_file_sync(share->kfile.file, MYF(0))))
error= my_errno;
}
else
@ -390,7 +393,7 @@ int maria_extra(MARIA_HA *info, enum ha_extra_function function,
}
}
if (share->data_file_type == BLOCK_RECORD &&
share->bitmap.file.file >= 0)
share->bitmap.file.file >= 0 && share->s3_path == 0)
{
DBUG_ASSERT(share->bitmap.non_flushable == 0 &&
share->bitmap.changed == 0);

View file

@ -369,6 +369,7 @@ MARIA_HA *maria_open(const char *name, int mode, uint open_flags,
#ifdef WITH_S3_STORAGE_ENGINE
else
{
open_mode= mode;
errpos= 1;
if (s3f.set_database_and_table_from_path(s3, name_buff))
{
@ -1050,6 +1051,7 @@ MARIA_HA *maria_open(const char *name, int mode, uint open_flags,
share->state_history->next= 0;
}
}
errpos= 7;
thr_lock_init(&share->lock);
mysql_mutex_init(key_SHARE_intern_lock,
&share->intern_lock, MY_MUTEX_INIT_FAST);
@ -1193,6 +1195,9 @@ err:
_ma_report_error(save_errno, &tmp_name);
}
switch (errpos) {
case 7:
thr_lock_delete(&share->lock);
/* fall through */
case 6:
/* Avoid mutex test in _ma_bitmap_end() */
share->internal_table= 1;