MDEV-9114: Bulk operations (Array binding)

(+ default values)
This commit is contained in:
Oleksandr Byelkin 2016-06-29 20:03:06 +02:00
parent c6713f651f
commit e2d6912609
19 changed files with 619 additions and 176 deletions

View file

@ -11,11 +11,17 @@ enum enum_server_command
COM_STMT_RESET, COM_SET_OPTION, COM_STMT_FETCH, COM_DAEMON,
COM_MDB_GAP_BEG,
COM_MDB_GAP_END=250,
COM_SLAVE_WORKER,
COM_SLAVE_IO,
COM_SLAVE_SQL,
COM_MULTI,
COM_END
COM_SLAVE_WORKER=251,
COM_SLAVE_IO=252,
COM_SLAVE_SQL=253,
COM_MULTI=254,
COM_END=255
};
enum enum_indicator_type
{
STMT_INDICATOR_NONE= 0,
STMT_INDICATOR_NULL,
STMT_INDICATOR_DEFAULT
};
struct st_vio;
typedef struct st_vio Vio;

View file

@ -114,12 +114,23 @@ enum enum_server_command
/* don't forget to update const char *command_name[] in sql_parse.cc */
COM_MDB_GAP_BEG,
COM_MDB_GAP_END=250,
COM_SLAVE_WORKER,
COM_SLAVE_IO,
COM_SLAVE_SQL,
COM_MULTI,
COM_SLAVE_WORKER=251,
COM_SLAVE_IO=252,
COM_SLAVE_SQL=253,
COM_MULTI=254,
/* Must be last */
COM_END
COM_END=255
};
/*
Bulk PS protocol indicator value:
*/
enum enum_indicator_type
{
STMT_INDICATOR_NONE= 0,
STMT_INDICATOR_NULL,
STMT_INDICATOR_DEFAULT
};
/* sql type stored in .frm files for virtual fields */
@ -256,6 +267,8 @@ enum enum_server_command
#define MARIADB_CLIENT_PROGRESS (1ULL << 32)
/* support COM_MULTI */
#define MARIADB_CLIENT_COM_MULTI (1ULL << 33)
/* support of array binding */
#define MARIADB_CLIENT_STMT_BULK_OPERATIONS (1UL << 34)
#ifdef HAVE_COMPRESS
#define CAN_CLIENT_COMPRESS CLIENT_COMPRESS
@ -295,7 +308,8 @@ enum enum_server_command
CLIENT_SESSION_TRACK |\
CLIENT_DEPRECATE_EOF |\
CLIENT_CONNECT_ATTRS |\
MARIADB_CLIENT_COM_MULTI)
MARIADB_CLIENT_COM_MULTI |\
MARIADB_CLIENT_STMT_BULK_OPERATIONS)
/*
To be added later:

View file

@ -10821,3 +10821,43 @@ bool Field::validate_value_in_record_with_warn(THD *thd, const uchar *record)
dbug_tmp_restore_column_map(table->read_set, old_map);
return rc;
}
bool Field::save_in_field_default_value(bool view_error_processing)
{
THD *thd= table->in_use;
if (flags & NO_DEFAULT_VALUE_FLAG &&
real_type() != MYSQL_TYPE_ENUM)
{
if (reset())
{
my_message(ER_CANT_CREATE_GEOMETRY_OBJECT,
ER_THD(thd, ER_CANT_CREATE_GEOMETRY_OBJECT), MYF(0));
return -1;
}
if (view_error_processing)
{
TABLE_LIST *view= table->pos_in_table_list->top_table();
push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN,
ER_NO_DEFAULT_FOR_VIEW_FIELD,
ER_THD(thd, ER_NO_DEFAULT_FOR_VIEW_FIELD),
view->view_db.str,
view->view_name.str);
}
else
{
push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN,
ER_NO_DEFAULT_FOR_FIELD,
ER_THD(thd, ER_NO_DEFAULT_FOR_FIELD),
field_name);
}
return 1;
}
set_default();
return
!is_null() &&
validate_value_in_record_with_warn(thd, table->record[0]) &&
thd->is_error() ? -1 : 0;
}

View file

@ -1451,6 +1451,9 @@ public:
// Exactly the same rules with REF access
return can_optimize_keypart_ref(cond, item);
}
bool save_in_field_default_value(bool view_eror_processing);
friend int cre_myisam(char * name, register TABLE *form, uint options,
ulonglong auto_increment_value);
friend class Copy_field;

View file

@ -3236,6 +3236,7 @@ Item_param::Item_param(THD *thd, uint pos_in_query_arg):
Rewritable_query_parameter(pos_in_query_arg, 1),
Type_handler_hybrid_field_type(MYSQL_TYPE_VARCHAR),
state(NO_VALUE),
indicators(0), indicator(STMT_INDICATOR_NONE),
/* Don't pretend to be a literal unless value for this item is set. */
item_type(PARAM_ITEM),
set_param_func(default_set_param_func),
@ -3600,6 +3601,10 @@ int Item_param::save_in_field(Field *field, bool no_conversions)
str_value.charset());
case NULL_VALUE:
return set_field_to_null_with_conversions(field, no_conversions);
case DEFAULT_VALUE:
return field->save_in_field_default_value(field->table->pos_in_table_list->
top_table() !=
field->table->pos_in_table_list);
case NO_VALUE:
default:
DBUG_ASSERT(0);
@ -3645,6 +3650,9 @@ double Item_param::val_real()
return TIME_to_double(&value.time);
case NULL_VALUE:
return 0.0;
case DEFAULT_VALUE:
my_message(ER_INVALID_DEFAULT_PARAM,
ER_THD(current_thd, ER_INVALID_DEFAULT_PARAM), MYF(0));
default:
DBUG_ASSERT(0);
}
@ -3672,6 +3680,9 @@ longlong Item_param::val_int()
}
case TIME_VALUE:
return (longlong) TIME_to_ulonglong(&value.time);
case DEFAULT_VALUE:
my_message(ER_INVALID_DEFAULT_PARAM,
ER_THD(current_thd, ER_INVALID_DEFAULT_PARAM), MYF(0));
case NULL_VALUE:
return 0;
default:
@ -3699,6 +3710,9 @@ my_decimal *Item_param::val_decimal(my_decimal *dec)
{
return TIME_to_my_decimal(&value.time, dec);
}
case DEFAULT_VALUE:
my_message(ER_INVALID_DEFAULT_PARAM,
ER_THD(current_thd, ER_INVALID_DEFAULT_PARAM), MYF(0));
case NULL_VALUE:
return 0;
default:
@ -3734,6 +3748,9 @@ String *Item_param::val_str(String* str)
str->set_charset(&my_charset_bin);
return str;
}
case DEFAULT_VALUE:
my_message(ER_INVALID_DEFAULT_PARAM,
ER_THD(current_thd, ER_INVALID_DEFAULT_PARAM), MYF(0));
case NULL_VALUE:
return NULL;
default:
@ -3812,6 +3829,9 @@ const String *Item_param::query_val_str(THD *thd, String* str) const
thd->variables.sql_mode & MODE_NO_BACKSLASH_ESCAPES);
break;
}
case DEFAULT_VALUE:
my_message(ER_INVALID_DEFAULT_PARAM,
ER_THD(current_thd, ER_INVALID_DEFAULT_PARAM), MYF(0));
case NULL_VALUE:
return &my_null_string;
default:
@ -3862,6 +3882,9 @@ Item_param::clone_item(THD *thd)
{
MEM_ROOT *mem_root= thd->mem_root;
switch (state) {
case DEFAULT_VALUE:
my_message(ER_INVALID_DEFAULT_PARAM,
ER_THD(current_thd, ER_INVALID_DEFAULT_PARAM), MYF(0));
case NULL_VALUE:
return new (mem_root) Item_null(thd, name);
case INT_VALUE:
@ -3894,6 +3917,9 @@ Item_param::eq(const Item *item, bool binary_cmp) const
return FALSE;
switch (state) {
case DEFAULT_VALUE:
my_message(ER_INVALID_DEFAULT_PARAM,
ER_THD(current_thd, ER_INVALID_DEFAULT_PARAM), MYF(0));
case NULL_VALUE:
return null_eq(item);
case INT_VALUE:
@ -3917,6 +3943,10 @@ void Item_param::print(String *str, enum_query_type query_type)
{
str->append('?');
}
else if (state == DEFAULT_VALUE)
{
str->append("default");
}
else
{
char buffer[STRING_BUFFER_USUAL_SIZE];
@ -3968,6 +3998,11 @@ Item_param::set_param_type_and_swap_value(Item_param *src)
}
void Item_param::set_default()
{
state= DEFAULT_VALUE;
}
/**
This operation is intended to store some item value in Item_param to be
used later.
@ -8579,42 +8614,8 @@ int Item_default_value::save_in_field(Field *field_arg, bool no_conversions)
calculate();
else
{
TABLE *table= field_arg->table;
THD *thd= table->in_use;
if (field_arg->flags & NO_DEFAULT_VALUE_FLAG &&
field_arg->real_type() != MYSQL_TYPE_ENUM)
{
if (field_arg->reset())
{
my_message(ER_CANT_CREATE_GEOMETRY_OBJECT,
ER_THD(thd, ER_CANT_CREATE_GEOMETRY_OBJECT), MYF(0));
return -1;
}
if (context->error_processor == &view_error_processor)
{
TABLE_LIST *view= table->pos_in_table_list->top_table();
push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN,
ER_NO_DEFAULT_FOR_VIEW_FIELD,
ER_THD(thd, ER_NO_DEFAULT_FOR_VIEW_FIELD),
view->view_db.str,
view->view_name.str);
}
else
{
push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN,
ER_NO_DEFAULT_FOR_FIELD,
ER_THD(thd, ER_NO_DEFAULT_FOR_FIELD),
field_arg->field_name);
}
return 1;
}
field_arg->set_default();
return
!field_arg->is_null() &&
field_arg->validate_value_in_record_with_warn(thd, table->record[0]) &&
thd->is_error() ? -1 : 0;
return field_arg->save_in_field_default_value(context->error_processor ==
&view_error_processor);
}
return Item_field::save_in_field(field_arg, no_conversions);
}

View file

@ -2798,7 +2798,7 @@ public:
{
NO_VALUE, NULL_VALUE, INT_VALUE, REAL_VALUE,
STRING_VALUE, TIME_VALUE, LONG_DATA_VALUE,
DECIMAL_VALUE
DECIMAL_VALUE, DEFAULT_VALUE
} state;
struct CONVERSION_INFO
@ -2842,6 +2842,13 @@ public:
}
};
/*
Used for bulk protocol. Indicates if we should expect
indicators byte before value of the parameter
*/
my_bool indicators;
enum enum_indicator_type indicator;
/*
A buffer for string and long data values. Historically all allocated
values returned from val_str() were treated as eligible to
@ -2882,6 +2889,7 @@ public:
bool get_date(MYSQL_TIME *tm, ulonglong fuzzydate);
int save_in_field(Field *field, bool no_conversions);
void set_default();
void set_null();
void set_int(longlong i, uint32 max_length_arg);
void set_double(double i);
@ -5102,6 +5110,10 @@ public:
:Item_field(thd, context_arg, (const char *)NULL, (const char *)NULL,
(const char *)NULL),
arg(a) {}
Item_default_value(THD *thd, Name_resolution_context *context_arg, Field *a)
:Item_field(thd, context_arg, (const char *)NULL, (const char *)NULL,
(const char *)NULL),
arg(NULL) {}
enum Type type() const { return DEFAULT_VALUE_ITEM; }
bool eq(const Item *item, bool binary_cmp) const;
bool fix_fields(THD *, Item **);

View file

@ -572,6 +572,7 @@ void Protocol::end_statement()
thd->get_stmt_da()->statement_warn_count());
break;
case Diagnostics_area::DA_OK:
case Diagnostics_area::DA_OK_BULK:
error= send_ok(thd->server_status,
thd->get_stmt_da()->statement_warn_count(),
thd->get_stmt_da()->affected_rows(),

View file

@ -7232,3 +7232,8 @@ ER_PARTITION_DEFAULT_ERROR
ukr "Припустимо мати тільки один DEFAULT розділ"
ER_REFERENCED_TRG_DOES_NOT_EXIST
eng "Referenced trigger '%s' for the given action time and event type does not exist"
ER_INVALID_DEFAULT_PARAM
eng "Default value is not supported for such parameter usage"
ukr "Значення за замовчуванням не підтримано для цього випадку використання параьетра"
ER_BINLOG_NON_SUPPORTED_BULK
eng "Only row based replication supported for bulk operations"

View file

@ -7810,9 +7810,10 @@ fill_record(THD *thd, TABLE *table_arg, List<Item> &fields, List<Item> &values,
if (table->next_number_field &&
rfield->field_index == table->next_number_field->field_index)
table->auto_increment_field_not_null= TRUE;
if (rfield->vcol_info &&
value->type() != Item::DEFAULT_VALUE_ITEM &&
value->type() != Item::NULL_ITEM &&
Item::Type type= value->type();
if (rfield->vcol_info &&
type != Item::DEFAULT_VALUE_ITEM &&
type != Item::NULL_ITEM &&
table->s->table_category != TABLE_CATEGORY_TEMPORARY)
{
push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN,
@ -8060,15 +8061,18 @@ fill_record(THD *thd, TABLE *table, Field **ptr, List<Item> &values,
value=v++;
if (field->field_index == autoinc_index)
table->auto_increment_field_not_null= TRUE;
if (field->vcol_info &&
value->type() != Item::DEFAULT_VALUE_ITEM &&
value->type() != Item::NULL_ITEM &&
table->s->table_category != TABLE_CATEGORY_TEMPORARY)
if (field->vcol_info)
{
push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN,
ER_WARNING_NON_DEFAULT_VALUE_FOR_VIRTUAL_COLUMN,
ER_THD(thd, ER_WARNING_NON_DEFAULT_VALUE_FOR_VIRTUAL_COLUMN),
field->field_name, table->s->table_name.str);
Item::Type type= value->type();
if (type != Item::DEFAULT_VALUE_ITEM &&
type != Item::NULL_ITEM &&
table->s->table_category != TABLE_CATEGORY_TEMPORARY)
{
push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN,
ER_WARNING_NON_DEFAULT_VALUE_FOR_VIRTUAL_COLUMN,
ER_THD(thd, ER_WARNING_NON_DEFAULT_VALUE_FOR_VIRTUAL_COLUMN),
field->field_name, table->s->table_name.str);
}
}
if (use_value)

View file

@ -854,6 +854,7 @@ THD::THD(my_thread_id id, bool is_wsrep_applier)
in_sub_stmt(0), log_all_errors(0),
binlog_unsafe_warning_flags(0),
binlog_table_maps(0),
bulk_param(0),
table_map_for_update(0),
m_examined_row_count(0),
accessed_rows_and_keys(0),
@ -5701,6 +5702,17 @@ int THD::decide_logging_format(TABLE_LIST *tables)
!(wsrep_binlog_format() == BINLOG_FORMAT_STMT &&
!binlog_filter->db_ok(db)))
{
if (is_bulk_op())
{
if (wsrep_binlog_format() == BINLOG_FORMAT_STMT)
{
my_error(ER_BINLOG_NON_SUPPORTED_BULK, MYF(0));
DBUG_PRINT("info",
("decision: no logging since an error was generated"));
DBUG_RETURN(-1);
}
}
/*
Compute one bit field with the union of all the engine
capabilities, and one with the intersection of all the engine
@ -5959,7 +5971,7 @@ int THD::decide_logging_format(TABLE_LIST *tables)
*/
my_error((error= ER_BINLOG_ROW_INJECTION_AND_STMT_ENGINE), MYF(0));
}
else if (wsrep_binlog_format() == BINLOG_FORMAT_ROW &&
else if ((wsrep_binlog_format() == BINLOG_FORMAT_ROW || is_bulk_op()) &&
sqlcom_can_generate_row_events(this))
{
/*
@ -6032,7 +6044,8 @@ int THD::decide_logging_format(TABLE_LIST *tables)
else
{
if (lex->is_stmt_unsafe() || lex->is_stmt_row_injection()
|| (flags_write_all_set & HA_BINLOG_STMT_CAPABLE) == 0)
|| (flags_write_all_set & HA_BINLOG_STMT_CAPABLE) == 0 ||
is_bulk_op())
{
/* log in row format! */
set_current_stmt_binlog_format_row_if_mixed();

View file

@ -2463,6 +2463,8 @@ public:
*/
Query_arena *stmt_arena;
void *bulk_param;
/*
map for tables that will be updated for a multi-table update query
statement, for other query statements, this will be zero.
@ -3438,6 +3440,12 @@ public:
To raise this flag, use my_error().
*/
inline bool is_error() const { return m_stmt_da->is_error(); }
void set_bulk_execution(void *bulk)
{
bulk_param= bulk;
m_stmt_da->set_bulk_execution(MY_TEST(bulk));
}
bool is_bulk_op() const { return MY_TEST(bulk_param); }
/// Returns Diagnostics-area for the current statement.
Diagnostics_area *get_stmt_da()
@ -5510,6 +5518,15 @@ public:
*/
#define CF_UPDATES_DATA (1U << 18)
/**
SP Bulk execution safe
*/
#define CF_SP_BULK_SAFE (1U << 19)
/**
SP Bulk execution optimized
*/
#define CF_SP_BULK_OPTIMIZED (1U << 20)
/* Bits in server_command_flags */
/**

View file

@ -320,7 +320,7 @@ Sql_condition::set_sqlstate(const char* sqlstate)
}
Diagnostics_area::Diagnostics_area(bool initialize)
: m_main_wi(0, false, initialize)
: is_bulk_execution(0), m_main_wi(0, false, initialize)
{
push_warning_info(&m_main_wi);
@ -330,7 +330,8 @@ Diagnostics_area::Diagnostics_area(bool initialize)
Diagnostics_area::Diagnostics_area(ulonglong warning_info_id,
bool allow_unlimited_warnings,
bool initialize)
: m_main_wi(warning_info_id, allow_unlimited_warnings, initialize)
: is_bulk_execution(0),
m_main_wi(warning_info_id, allow_unlimited_warnings, initialize)
{
push_warning_info(&m_main_wi);
@ -376,22 +377,33 @@ Diagnostics_area::set_ok_status(ulonglong affected_rows,
const char *message)
{
DBUG_ENTER("set_ok_status");
DBUG_ASSERT(! is_set());
DBUG_ASSERT(!is_set() || (m_status == DA_OK_BULK && is_bulk_op()));
/*
In production, refuse to overwrite an error or a custom response
with an OK packet.
*/
if (is_error() || is_disabled())
return;
m_statement_warn_count= current_statement_warn_count();
m_affected_rows= affected_rows;
/*
When running a bulk operation, m_status will be DA_OK for the first
operation and set to DA_OK_BULK for all following operations.
*/
if (m_status == DA_OK_BULK)
{
m_statement_warn_count+= current_statement_warn_count();
m_affected_rows+= affected_rows;
}
else
{
m_statement_warn_count= current_statement_warn_count();
m_affected_rows= affected_rows;
m_status= (is_bulk_op() ? DA_OK_BULK : DA_OK);
}
m_last_insert_id= last_insert_id;
if (message)
strmake_buf(m_message, message);
else
m_message[0]= '\0';
m_status= DA_OK;
DBUG_VOID_RETURN;
}

View file

@ -658,6 +658,8 @@ public:
DA_OK,
/** Set whenever one calls my_eof(). */
DA_EOF,
/** Set whenever one calls my_ok() in PS bulk mode. */
DA_OK_BULK,
/** Set whenever one calls my_error() or my_message(). */
DA_ERROR,
/** Set in case of a custom response, such as one from COM_STMT_PREPARE. */
@ -699,13 +701,21 @@ public:
bool is_disabled() const { return m_status == DA_DISABLED; }
void set_bulk_execution(bool bulk) { is_bulk_execution= bulk; }
bool is_bulk_op() const { return is_bulk_execution; }
enum_diagnostics_status status() const { return m_status; }
const char *message() const
{ DBUG_ASSERT(m_status == DA_ERROR || m_status == DA_OK); return m_message; }
{ DBUG_ASSERT(m_status == DA_ERROR || m_status == DA_OK ||
m_status == DA_OK_BULK); return m_message; }
bool skip_flush() const
{ DBUG_ASSERT(m_status == DA_OK); return m_skip_flush; }
{
DBUG_ASSERT(m_status == DA_OK || m_status == DA_OK_BULK);
return m_skip_flush;
}
void set_skip_flush()
{ m_skip_flush= TRUE; }
@ -717,14 +727,21 @@ public:
{ DBUG_ASSERT(m_status == DA_ERROR); return m_sqlstate; }
ulonglong affected_rows() const
{ DBUG_ASSERT(m_status == DA_OK); return m_affected_rows; }
{
DBUG_ASSERT(m_status == DA_OK || m_status == DA_OK_BULK);
return m_affected_rows;
}
ulonglong last_insert_id() const
{ DBUG_ASSERT(m_status == DA_OK); return m_last_insert_id; }
{
DBUG_ASSERT(m_status == DA_OK || m_status == DA_OK_BULK);
return m_last_insert_id;
}
uint statement_warn_count() const
{
DBUG_ASSERT(m_status == DA_OK || m_status == DA_EOF);
DBUG_ASSERT(m_status == DA_OK || m_status == DA_OK_BULK ||
m_status == DA_EOF);
return m_statement_warn_count;
}
@ -907,6 +924,8 @@ private:
enum_diagnostics_status m_status;
my_bool is_bulk_execution;
Warning_info m_main_wi;
Warning_info_list m_wi_stack;

View file

@ -77,6 +77,7 @@
#include "transaction.h"
#include "sql_audit.h"
#include "sql_derived.h" // mysql_handle_derived
#include "sql_prepare.h"
#include "debug_sync.h"
@ -661,7 +662,9 @@ bool mysql_insert(THD *thd,TABLE_LIST *table_list,
bool using_bulk_insert= 0;
uint value_count;
ulong counter = 1;
ulong iteration= 0;
ulonglong id;
ulong bulk_iterations= bulk_parameters_iterations(thd);
COPY_INFO info;
TABLE *table= 0;
List_iterator_fast<List_item> its(values_list);
@ -725,8 +728,11 @@ bool mysql_insert(THD *thd,TABLE_LIST *table_list,
THD_STAGE_INFO(thd, stage_init);
thd->lex->used_tables=0;
values= its++;
if (bulk_parameters_set(thd))
DBUG_RETURN(TRUE);
value_count= values->elements;
DBUG_ASSERT(bulk_iterations > 0);
if (mysql_prepare_insert(thd, table_list, table, fields, values,
update_fields, update_values, duplic, &unused_conds,
FALSE,
@ -885,106 +891,114 @@ bool mysql_insert(THD *thd,TABLE_LIST *table_list,
goto values_loop_end;
}
}
while ((values= its++))
do
{
if (fields.elements || !value_count)
{
/*
There are possibly some default values:
INSERT INTO t1 (fields) VALUES ...
INSERT INTO t1 VALUES ()
*/
restore_record(table,s->default_values); // Get empty record
table->reset_default_fields();
if (fill_record_n_invoke_before_triggers(thd, table, fields, *values, 0,
TRG_EVENT_INSERT))
{
if (values_list.elements != 1 && ! thd->is_error())
{
info.records++;
continue;
}
/*
TODO: set thd->abort_on_warning if values_list.elements == 1
and check that all items return warning in case of problem with
storing field.
*/
error=1;
break;
}
}
else
{
/*
No field list, all fields are set explicitly:
INSERT INTO t1 VALUES (values)
*/
if (thd->lex->used_tables) // Column used in values()
restore_record(table,s->default_values); // Get empty record
else
{
TABLE_SHARE *share= table->s;
if (iteration && bulk_parameters_set(thd))
goto abort;
while ((values= its++))
{
if (fields.elements || !value_count)
{
/*
Fix delete marker. No need to restore rest of record since it will
be overwritten by fill_record() anyway (and fill_record() does not
use default values in this case).
There are possibly some default values:
INSERT INTO t1 (fields) VALUES ...
INSERT INTO t1 VALUES ()
*/
#ifdef HAVE_valgrind
if (table->file->ha_table_flags() && HA_RECORD_MUST_BE_CLEAN_ON_WRITE)
restore_record(table,s->default_values); // Get empty record
else
#endif
table->record[0][0]= share->default_values[0];
/* Fix undefined null_bits. */
if (share->null_bytes > 1 && share->last_null_bit_pos)
restore_record(table,s->default_values); // Get empty record
table->reset_default_fields();
if (fill_record_n_invoke_before_triggers(thd, table, fields, *values, 0,
TRG_EVENT_INSERT))
{
table->record[0][share->null_bytes - 1]=
share->default_values[share->null_bytes - 1];
if (values_list.elements != 1 && ! thd->is_error())
{
info.records++;
continue;
}
/*
TODO: set thd->abort_on_warning if values_list.elements == 1
and check that all items return warning in case of problem with
storing field.
*/
error=1;
break;
}
}
if (fill_record_n_invoke_before_triggers(thd, table, table->field_to_fill(),
*values, 0, TRG_EVENT_INSERT))
else
{
if (values_list.elements != 1 && ! thd->is_error())
{
info.records++;
continue;
}
error=1;
break;
}
}
/*
No field list, all fields are set explicitly:
INSERT INTO t1 VALUES (values)
*/
if (thd->lex->used_tables) // Column used in values()
restore_record(table,s->default_values); // Get empty record
else
{
TABLE_SHARE *share= table->s;
if ((res= table_list->view_check_option(thd,
(values_list.elements == 1 ?
0 :
ignore))) ==
VIEW_CHECK_SKIP)
continue;
else if (res == VIEW_CHECK_ERROR)
{
error= 1;
break;
}
#ifndef EMBEDDED_LIBRARY
if (lock_type == TL_WRITE_DELAYED)
{
LEX_STRING const st_query = { query, thd->query_length() };
DEBUG_SYNC(thd, "before_write_delayed");
error=write_delayed(thd, table, duplic, st_query, ignore, log_on);
DEBUG_SYNC(thd, "after_write_delayed");
query=0;
}
else
/*
Fix delete marker. No need to restore rest of record since it will
be overwritten by fill_record() anyway (and fill_record() does not
use default values in this case).
*/
#ifdef HAVE_valgrind
if (table->file->ha_table_flags() && HA_RECORD_MUST_BE_CLEAN_ON_WRITE)
restore_record(table,s->default_values); // Get empty record
else
#endif
error=write_record(thd, table ,&info);
if (error)
break;
thd->get_stmt_da()->inc_current_row_for_warning();
}
table->record[0][0]= share->default_values[0];
/* Fix undefined null_bits. */
if (share->null_bytes > 1 && share->last_null_bit_pos)
{
table->record[0][share->null_bytes - 1]=
share->default_values[share->null_bytes - 1];
}
}
if (fill_record_n_invoke_before_triggers(thd, table,
table->field_to_fill(),
*values, 0, TRG_EVENT_INSERT))
{
if (values_list.elements != 1 && ! thd->is_error())
{
info.records++;
continue;
}
error=1;
break;
}
}
if ((res= table_list->view_check_option(thd,
(values_list.elements == 1 ?
0 :
ignore))) ==
VIEW_CHECK_SKIP)
continue;
else if (res == VIEW_CHECK_ERROR)
{
error= 1;
break;
}
#ifndef EMBEDDED_LIBRARY
if (lock_type == TL_WRITE_DELAYED)
{
LEX_STRING const st_query = { query, thd->query_length() };
DEBUG_SYNC(thd, "before_write_delayed");
error=write_delayed(thd, table, duplic, st_query, ignore, log_on);
DEBUG_SYNC(thd, "after_write_delayed");
query=0;
}
else
#endif
error=write_record(thd, table ,&info);
if (error)
break;
thd->get_stmt_da()->inc_current_row_for_warning();
}
its.rewind();
iteration++;
} while (iteration < bulk_iterations);
values_loop_end:
free_underlaid_joins(thd, &thd->lex->select_lex);
@ -1131,7 +1145,7 @@ values_loop_end:
retval= thd->lex->explain->send_explain(thd);
goto abort;
}
if (values_list.elements == 1 && (!(thd->variables.option_bits & OPTION_WARNINGS) ||
if ((bulk_iterations * values_list.elements) == 1 && (!(thd->variables.option_bits & OPTION_WARNINGS) ||
!thd->cuted_fields))
{
my_ok(thd, info.copied + info.deleted +

View file

@ -570,17 +570,19 @@ void init_update_queries(void)
CF_CAN_GENERATE_ROW_EVENTS |
CF_OPTIMIZER_TRACE |
CF_CAN_BE_EXPLAINED |
CF_UPDATES_DATA;
CF_UPDATES_DATA | CF_SP_BULK_SAFE;
sql_command_flags[SQLCOM_UPDATE_MULTI]= CF_CHANGES_DATA | CF_REEXECUTION_FRAGILE |
CF_CAN_GENERATE_ROW_EVENTS |
CF_OPTIMIZER_TRACE |
CF_CAN_BE_EXPLAINED |
CF_UPDATES_DATA;
CF_UPDATES_DATA | CF_SP_BULK_SAFE;
sql_command_flags[SQLCOM_INSERT]= CF_CHANGES_DATA | CF_REEXECUTION_FRAGILE |
CF_CAN_GENERATE_ROW_EVENTS |
CF_OPTIMIZER_TRACE |
CF_CAN_BE_EXPLAINED |
CF_INSERTS_DATA;
CF_INSERTS_DATA |
CF_SP_BULK_SAFE |
CF_SP_BULK_OPTIMIZED;
sql_command_flags[SQLCOM_INSERT_SELECT]= CF_CHANGES_DATA | CF_REEXECUTION_FRAGILE |
CF_CAN_GENERATE_ROW_EVENTS |
CF_OPTIMIZER_TRACE |
@ -598,7 +600,7 @@ void init_update_queries(void)
CF_CAN_GENERATE_ROW_EVENTS |
CF_OPTIMIZER_TRACE |
CF_CAN_BE_EXPLAINED |
CF_INSERTS_DATA;;
CF_INSERTS_DATA | CF_SP_BULK_SAFE;
sql_command_flags[SQLCOM_REPLACE_SELECT]= CF_CHANGES_DATA | CF_REEXECUTION_FRAGILE |
CF_CAN_GENERATE_ROW_EVENTS |
CF_OPTIMIZER_TRACE |

View file

@ -162,6 +162,9 @@ public:
Select_fetch_protocol_binary result;
Item_param **param_array;
Server_side_cursor *cursor;
uchar *packet;
uchar *packet_end;
ulong iterations;
uint param_count;
uint last_errno;
uint flags;
@ -180,11 +183,15 @@ public:
*/
uint select_number_after_prepare;
char last_error[MYSQL_ERRMSG_SIZE];
my_bool start_param;
#ifndef EMBEDDED_LIBRARY
bool (*set_params)(Prepared_statement *st, uchar *data, uchar *data_end,
uchar *read_pos, String *expanded_query);
bool (*set_bulk_params)(Prepared_statement *st,
uchar **read_pos, uchar *data_end, bool reset);
#else
bool (*set_params_data)(Prepared_statement *st, String *expanded_query);
/*TODO: add bulk support for builtin server */
#endif
bool (*set_params_from_actual_params)(Prepared_statement *stmt,
List<Item> &list,
@ -204,7 +211,13 @@ public:
bool execute_loop(String *expanded_query,
bool open_cursor,
uchar *packet_arg, uchar *packet_end_arg);
bool execute_bulk_loop(String *expanded_query,
bool open_cursor,
uchar *packet_arg, uchar *packet_end_arg,
ulong iterations);
bool execute_server_runnable(Server_runnable *server_runnable);
my_bool set_bulk_parameters(bool reset);
ulong bulk_iterations();
/* Destroy this statement */
void deallocate();
bool execute_immediate(const char *query, uint query_length);
@ -962,11 +975,59 @@ static bool insert_params(Prepared_statement *stmt, uchar *null_array,
}
static bool insert_bulk_params(Prepared_statement *stmt,
uchar **read_pos, uchar *data_end,
bool reset)
{
Item_param **begin= stmt->param_array;
Item_param **end= begin + stmt->param_count;
DBUG_ENTER("insert_params");
for (Item_param **it= begin; it < end; ++it)
{
Item_param *param= *it;
if (reset)
param->reset();
if (param->state != Item_param::LONG_DATA_VALUE)
{
if (param->indicators)
param->indicator= (enum_indicator_type) *((*read_pos)++);
else
param->indicator= STMT_INDICATOR_NONE;
if ((*read_pos) > data_end)
DBUG_RETURN(1);
switch (param->indicator)
{
case STMT_INDICATOR_NONE:
if ((*read_pos) >= data_end)
DBUG_RETURN(1);
param->set_param_func(param, read_pos, (uint) (data_end - (*read_pos)));
if (param->state == Item_param::NO_VALUE)
DBUG_RETURN(1);
break;
case STMT_INDICATOR_NULL:
param->set_null();
break;
case STMT_INDICATOR_DEFAULT:
param->set_default();
break;
}
}
else
DBUG_RETURN(1); // long is not supported here
}
DBUG_RETURN(0);
}
static bool setup_conversion_functions(Prepared_statement *stmt,
uchar **data, uchar *data_end)
uchar **data, uchar *data_end,
bool bulk_protocol= 0)
{
/* skip null bits */
uchar *read_pos= *data + (stmt->param_count+7) / 8;
uchar *read_pos= *data;
if (!bulk_protocol)
read_pos+= (stmt->param_count+7) / 8;
DBUG_ENTER("setup_conversion_functions");
@ -983,6 +1044,7 @@ static bool setup_conversion_functions(Prepared_statement *stmt,
{
ushort typecode;
const uint signed_bit= 1 << 15;
const uint indicators_bit= 1 << 14;
if (read_pos >= data_end)
DBUG_RETURN(1);
@ -990,7 +1052,10 @@ static bool setup_conversion_functions(Prepared_statement *stmt,
typecode= sint2korr(read_pos);
read_pos+= 2;
(**it).unsigned_flag= MY_TEST(typecode & signed_bit);
setup_one_conversion_function(thd, *it, (uchar) (typecode & ~signed_bit));
if (bulk_protocol)
(**it).indicators= MY_TEST(typecode & indicators_bit);
setup_one_conversion_function(thd, *it,
(uchar) (typecode & 0xff));
}
}
*data= read_pos;
@ -999,6 +1064,8 @@ static bool setup_conversion_functions(Prepared_statement *stmt,
#else
//TODO: support bulk parameters
/**
Embedded counterparts of parameter assignment routines.
@ -2996,6 +3063,7 @@ void mysqld_stmt_execute(THD *thd, char *packet_arg, uint packet_length)
uchar *packet= (uchar*)packet_arg; // GCC 4.0.1 workaround
ulong stmt_id= uint4korr(packet);
ulong flags= (ulong) packet[4];
ulong iterations= uint4korr(packet + 5);
/* Query text for binary, general or slow log, if any of them is open */
String expanded_query;
uchar *packet_end= packet + packet_length;
@ -3021,12 +3089,16 @@ void mysqld_stmt_execute(THD *thd, char *packet_arg, uint packet_length)
thd->profiling.set_query_source(stmt->query(), stmt->query_length());
#endif
DBUG_PRINT("exec_query", ("%s", stmt->query()));
DBUG_PRINT("info",("stmt: 0x%lx", (long) stmt));
DBUG_PRINT("info",("stmt: 0x%p iterations: %lu", stmt, iterations));
open_cursor= MY_TEST(flags & (ulong) CURSOR_TYPE_READ_ONLY);
thd->protocol= &thd->protocol_binary;
stmt->execute_loop(&expanded_query, open_cursor, packet, packet_end);
if (iterations <= 1)
stmt->execute_loop(&expanded_query, open_cursor, packet, packet_end);
else
stmt->execute_bulk_loop(&expanded_query, open_cursor, packet, packet_end,
iterations);
thd->protocol= save_protocol;
sp_cache_enforce_limit(thd->sp_proc_cache, stored_program_cache_size);
@ -3531,9 +3603,13 @@ Prepared_statement::Prepared_statement(THD *thd_arg)
result(thd_arg),
param_array(0),
cursor(0),
packet(0),
packet_end(0),
iterations(0),
param_count(0),
last_errno(0),
flags((uint) IS_IN_USE)
flags((uint) IS_IN_USE),
start_param(0)
{
init_sql_alloc(&main_mem_root, thd_arg->variables.query_alloc_block_size,
thd_arg->variables.query_prealloc_size, MYF(MY_THREAD_SPECIFIC));
@ -3569,7 +3645,9 @@ void Prepared_statement::setup_set_params()
set_params_from_actual_params= insert_params_from_actual_params_with_log;
#ifndef EMBEDDED_LIBRARY
set_params= insert_params_with_log;
set_bulk_params= insert_bulk_params; // TODO: add binlog support
#else
//TODO: add bulk support for bulk parameters
set_params_data= emb_insert_params_with_log;
#endif
}
@ -3578,7 +3656,9 @@ void Prepared_statement::setup_set_params()
set_params_from_actual_params= insert_params_from_actual_params;
#ifndef EMBEDDED_LIBRARY
set_params= insert_params;
set_bulk_params= insert_bulk_params;
#else
//TODO: add bulk support for bulk parameters
set_params_data= emb_insert_params;
#endif
}
@ -3935,6 +4015,7 @@ Prepared_statement::set_parameters(String *expanded_query,
@retval FALSE successfully executed the statement, perhaps
after having reprepared it a few times.
*/
const static int MAX_REPREPARE_ATTEMPTS= 3;
bool
Prepared_statement::execute_loop(String *expanded_query,
@ -3942,10 +4023,10 @@ Prepared_statement::execute_loop(String *expanded_query,
uchar *packet,
uchar *packet_end)
{
const int MAX_REPREPARE_ATTEMPTS= 3;
Reprepare_observer reprepare_observer;
bool error;
int reprepare_attempt= 0;
iterations= 0;
#ifndef DBUG_OFF
Item *free_list_state= thd->free_list;
#endif
@ -4037,6 +4118,199 @@ reexecute:
return error;
}
my_bool bulk_parameters_set(THD *thd)
{
DBUG_ENTER("bulk_parameters_set");
Prepared_statement *stmt= (Prepared_statement *) thd->bulk_param;
if (stmt && stmt->set_bulk_parameters(FALSE))
DBUG_RETURN(TRUE);
DBUG_RETURN(FALSE);
}
ulong bulk_parameters_iterations(THD *thd)
{
Prepared_statement *stmt= (Prepared_statement *) thd->bulk_param;
if (!stmt)
return 1;
return stmt->bulk_iterations();
}
my_bool Prepared_statement::set_bulk_parameters(bool reset)
{
DBUG_ENTER("Prepared_statement::set_bulk_parameters");
DBUG_PRINT("info", ("iteration: %lu", iterations));
if (iterations)
{
#ifndef EMBEDDED_LIBRARY
if ((*set_bulk_params)(this, &packet, packet_end, reset))
#else
// bulk parameters are not supported for embedded, so it will an error
#endif
{
my_error(ER_WRONG_ARGUMENTS, MYF(0),
"mysqld_stmt_bulk_execute");
reset_stmt_params(this);
DBUG_RETURN(true);
}
iterations--;
}
start_param= 0;
DBUG_RETURN(false);
}
ulong Prepared_statement::bulk_iterations()
{
if (iterations)
return iterations;
return start_param ? 1 : 0;
}
bool
Prepared_statement::execute_bulk_loop(String *expanded_query,
bool open_cursor,
uchar *packet_arg,
uchar *packet_end_arg,
ulong iterations_arg)
{
Reprepare_observer reprepare_observer;
bool error= 0;
packet= packet_arg;
packet_end= packet_end_arg;
iterations= iterations_arg;
start_param= true;
#ifndef DBUG_OFF
Item *free_list_state= thd->free_list;
#endif
thd->select_number= select_number_after_prepare;
thd->set_bulk_execution((void *)this);
/* Check if we got an error when sending long data */
if (state == Query_arena::STMT_ERROR)
{
my_message(last_errno, last_error, MYF(0));
thd->set_bulk_execution(0);
return TRUE;
}
if (!(sql_command_flags[lex->sql_command] & CF_SP_BULK_SAFE))
{
my_error(ER_UNSUPPORTED_PS, MYF(0));
thd->set_bulk_execution(0);
return TRUE;
}
#ifndef EMBEDDED_LIBRARY
if (setup_conversion_functions(this, &packet, packet_end, TRUE))
#else
// bulk parameters are not supported for embedded, so it will an error
#endif
{
my_error(ER_WRONG_ARGUMENTS, MYF(0),
"mysqld_stmt_bulk_execute");
reset_stmt_params(this);
thd->set_bulk_execution(0);
return true;
}
#ifdef NOT_YET_FROM_MYSQL_5_6
if (unlikely(thd->security_ctx->password_expired &&
!lex->is_change_password))
{
my_error(ER_MUST_CHANGE_PASSWORD, MYF(0));
thd->set_bulk_execution(0);
return true;
}
#endif
// iterations changed by set_bulk_parameters
while ((iterations || start_param) && !error && !thd->is_error())
{
int reprepare_attempt= 0;
/*
Here we set parameters for not optimized commands,
optimized commands do it inside thier internal loop.
*/
if (!(sql_command_flags[lex->sql_command] & CF_SP_BULK_OPTIMIZED))
{
if (set_bulk_parameters(TRUE))
{
thd->set_bulk_execution(0);
return true;
}
}
reexecute:
/*
If the free_list is not empty, we'll wrongly free some externally
allocated items when cleaning up after validation of the prepared
statement.
*/
DBUG_ASSERT(thd->free_list == free_list_state);
/*
Install the metadata observer. If some metadata version is
different from prepare time and an observer is installed,
the observer method will be invoked to push an error into
the error stack.
*/
if (sql_command_flags[lex->sql_command] & CF_REEXECUTION_FRAGILE)
{
reprepare_observer.reset_reprepare_observer();
DBUG_ASSERT(thd->m_reprepare_observer == NULL);
thd->m_reprepare_observer= &reprepare_observer;
}
error= execute(expanded_query, open_cursor) || thd->is_error();
thd->m_reprepare_observer= NULL;
#ifdef WITH_WSREP
if (WSREP_ON)
{
mysql_mutex_lock(&thd->LOCK_wsrep_thd);
switch (thd->wsrep_conflict_state)
{
case CERT_FAILURE:
WSREP_DEBUG("PS execute fail for CERT_FAILURE: thd: %lld err: %d",
(longlong) thd->thread_id,
thd->get_stmt_da()->sql_errno() );
thd->wsrep_conflict_state = NO_CONFLICT;
break;
case MUST_REPLAY:
(void) wsrep_replay_transaction(thd);
break;
default:
break;
}
mysql_mutex_unlock(&thd->LOCK_wsrep_thd);
}
#endif /* WITH_WSREP */
if ((sql_command_flags[lex->sql_command] & CF_REEXECUTION_FRAGILE) &&
error && !thd->is_fatal_error && !thd->killed &&
reprepare_observer.is_invalidated() &&
reprepare_attempt++ < MAX_REPREPARE_ATTEMPTS)
{
DBUG_ASSERT(thd->get_stmt_da()->sql_errno() == ER_NEED_REPREPARE);
thd->clear_error();
error= reprepare();
if (! error) /* Success */
goto reexecute;
}
}
reset_stmt_params(this);
thd->set_bulk_execution(0);
return error;
}
bool
Prepared_statement::execute_server_runnable(Server_runnable *server_runnable)

View file

@ -72,6 +72,7 @@ private:
void mysqld_stmt_prepare(THD *thd, const char *packet, uint packet_length);
void mysqld_stmt_execute(THD *thd, char *packet, uint packet_length);
void mysqld_stmt_bulk_execute(THD *thd, char *packet, uint packet_length);
void mysqld_stmt_close(THD *thd, char *packet);
void mysql_sql_stmt_prepare(THD *thd);
void mysql_sql_stmt_execute(THD *thd);
@ -82,6 +83,8 @@ void mysqld_stmt_reset(THD *thd, char *packet);
void mysql_stmt_get_longdata(THD *thd, char *pos, ulong packet_length);
void reinit_stmt_before_use(THD *thd, LEX *lex);
ulong bulk_parameters_iterations(THD *thd);
my_bool bulk_parameters_set(THD *thd);
/**
Execute a fragment of server code in an isolated context, so that
it doesn't leave any effect on THD. THD must have no open tables.

View file

@ -265,7 +265,8 @@ void wsrep_replay_transaction(THD *thd)
}
else if (thd->get_stmt_da()->is_set())
{
if (thd->get_stmt_da()->status() != Diagnostics_area::DA_OK)
if (thd->get_stmt_da()->status() != Diagnostics_area::DA_OK &&
thd->get_stmt_da()->status() != Diagnostics_area::DA_OK_BULK)
{
WSREP_WARN("replay ok, thd has error status %d",
thd->get_stmt_da()->status());

View file

@ -4827,6 +4827,7 @@ static void end_statement_v1(PSI_statement_locker *locker, void *stmt_da)
switch(da->status())
{
case Diagnostics_area::DA_OK_BULK:
case Diagnostics_area::DA_EMPTY:
break;
case Diagnostics_area::DA_OK:
@ -4960,6 +4961,7 @@ static void end_statement_v1(PSI_statement_locker *locker, void *stmt_da)
switch (da->status())
{
case Diagnostics_area::DA_OK_BULK:
case Diagnostics_area::DA_EMPTY:
break;
case Diagnostics_area::DA_OK: