MDEV-15990 REPLACE on a precise-versioned table returns ER_DUP_ENTRY

We had protection against it: a versioned delete was allowed only if
trx->id != table->vers_start_id()

For replace this check fails: replace calls ha_delete_row(record[2]), but
table->vers_start_id() returns the value from record[0], which is irrelevant.

The same problem hits Field::is_max, which may have checked the wrong record.

Fix:
* Refactor Field::is_max to optionally accept a pointer as an argument.
* Refactor vers_start_id and vers_end_id to always accept a pointer to the
record. The difference from is_max is that is_max accepts a pointer to the
field data, rather than to the record.

Refactoring the val_int() method to accept the argument would require too
much effort, so instead the value is fetched directly from the record, as is
done in Field_longlong.
This commit is contained in:
Nikita Malyavin 2025-01-07 19:37:58 +01:00 committed by Aleksey Midenkov
commit aeb25743af
12 changed files with 79 additions and 40 deletions

View file

@ -63,9 +63,11 @@ connection default;
replace into t1 values (1),(2);
connection con1;
replace into t1 values (1),(2);
ERROR 23000: Duplicate entry '1' for key 'PRIMARY'
drop table t1;
#
# MDEV-14794 Limitations which the row end as a part of PK imposes due to
# CURRENT_TIMESTAMP behavior and otherwise
#
# MDEV-15330 Server crash or assertion `table->insert_values' failure in write_record upon LOAD DATA
#
create table t1 (a int, b int, c int, vc int as (c), unique(a), unique(b)) with system versioning;
@ -88,3 +90,18 @@ Note 1831 Duplicate index `data_2`. This is deprecated and will be disallowed in
alter table t1 add system versioning;
replace into t1 values ('o'), ('o');
drop table t1;
#
# MDEV-15990 REPLACE on a precise-versioned table returns duplicate key
# error (ER_DUP_ENTRY)
#
create or replace table t1 (
pk int primary key, i int,
row_start bigint unsigned as row start,
row_end bigint unsigned as row end,
period for system_time(row_start, row_end)
) engine=InnoDB with system versioning;
replace into t1 (pk,i) values (1,10),(1,100),(1,1000);
select pk, i, row_end from t1 for system_time all;
pk i row_end
1 1000 18446744073709551615
drop table t1;

View file

@ -116,4 +116,20 @@ alter table t1 add system versioning;
replace into t1 values ('o'), ('o');
drop table t1;
--echo #
--echo # MDEV-15990 REPLACE on a precise-versioned table returns duplicate key
--echo # error (ER_DUP_ENTRY)
--echo #
create or replace table t1 (
pk int primary key, i int,
row_start bigint unsigned as row start,
row_end bigint unsigned as row end,
period for system_time(row_start, row_end)
) engine=InnoDB with system versioning;
replace into t1 (pk,i) values (1,10),(1,100),(1,1000);
select pk, i, row_end from t1 for system_time all;
drop table t1;
--source suite/versioning/common_finish.inc

View file

@ -4811,17 +4811,17 @@ void Field_longlong::set_max()
int8store(ptr, unsigned_flag ? ULONGLONG_MAX : LONGLONG_MAX);
}
bool Field_longlong::is_max()
bool Field_longlong::is_max(const uchar *ptr_arg) const
{
DBUG_ASSERT(marked_for_read());
if (unsigned_flag)
{
ulonglong j;
j= uint8korr(ptr);
j= uint8korr(ptr_arg);
return j == ULONGLONG_MAX;
}
longlong j;
j= sint8korr(ptr);
j= sint8korr(ptr_arg);
return j == LONGLONG_MAX;
}
@ -5873,16 +5873,16 @@ void Field_timestampf::set_max()
DBUG_VOID_RETURN;
}
bool Field_timestampf::is_max()
bool Field_timestampf::is_max(const uchar *ptr_arg) const
{
longlong timestamp= mi_uint4korr(ptr);
longlong timestamp= mi_uint4korr(ptr_arg);
DBUG_ENTER("Field_timestampf::is_max");
DBUG_ASSERT(marked_for_read());
/* Allow old max value and new max value */
DBUG_RETURN((timestamp == TIMESTAMP_MAX_VALUE ||
timestamp == INT_MAX32) &&
mi_uint3korr(ptr + 4) == TIME_MAX_SECOND_PART);
mi_uint3korr(ptr_arg + 4) == TIME_MAX_SECOND_PART);
}
my_time_t Field_timestampf::get_timestamp(const uchar *pos,

View file

@ -801,8 +801,9 @@ public:
*/
virtual void set_max()
{ DBUG_ASSERT(0); }
virtual bool is_max()
virtual bool is_max(const uchar *ptr_arg) const
{ DBUG_ASSERT(0); return false; }
bool is_max() const { return is_max(ptr); }
uchar *ptr; // Position to field in record
@ -2910,7 +2911,7 @@ public:
return unpack_int64(to, from, from_end);
}
void set_max() override;
bool is_max() override;
bool is_max(const uchar *ptr_arg) const override;
ulonglong get_max_int_value() const override
{
return unsigned_flag ? 0xFFFFFFFFFFFFFFFFULL : 0x7FFFFFFFFFFFFFFFULL;
@ -3490,7 +3491,7 @@ public:
return memcmp(a_ptr, b_ptr, pack_length());
}
void set_max() override;
bool is_max() override;
bool is_max(const uchar *ptr_arg) const override;
my_time_t get_timestamp(const uchar *pos, ulong *sec_part) const override;
bool val_native(Native *to) override;
uint size_of() const override { return sizeof *this; }
@ -6040,17 +6041,21 @@ bool check_expression(Virtual_column_info *vcol, const Lex_ident_column &name,
#define f_visibility(x) (static_cast<field_visibility_t> ((x) & INVISIBLE_MAX_BITS))
inline
ulonglong TABLE::vers_end_id() const
ulonglong TABLE::vers_end_id(const uchar *record_arg) const
{
DBUG_ASSERT(versioned(VERS_TRX_ID));
return static_cast<ulonglong>(vers_end_field()->val_int());
DBUG_ASSERT(dynamic_cast<Field_longlong*>(vers_end_field()));
const uchar *ptr= vers_end_field()->ptr_in_record(record_arg);
return static_cast<ulonglong>(sint8korr(ptr));
}
inline
ulonglong TABLE::vers_start_id() const
ulonglong TABLE::vers_start_id(const uchar *record_arg) const
{
DBUG_ASSERT(versioned(VERS_TRX_ID));
return static_cast<ulonglong>(vers_start_field()->val_int());
DBUG_ASSERT(dynamic_cast<Field_longlong*>(vers_start_field()));
const uchar *ptr= vers_start_field()->ptr_in_record(record_arg);
return static_cast<ulonglong>(sint8korr(ptr));
}
inline

View file

@ -7999,8 +7999,12 @@ int handler::ha_check_overlaps(const uchar *old_data, const uchar* new_data)
DBUG_ASSERT(this == table->file);
if (!table_share->period.unique_keys)
return 0;
if (table->versioned() && !table->vers_end_field()->is_max())
return 0;
if (table->versioned())
{
Field *end= table->vers_end_field();
if (!end->is_max(end->ptr_in_record(new_data)))
return 0;
}
const bool after_write= ha_table_flags() & HA_CHECK_UNIQUE_AFTER_WRITE;
const bool is_update= !after_write && old_data;

View file

@ -7935,8 +7935,6 @@ Update_rows_log_event::do_exec_row(rpl_group_info *rgi)
return error;
}
const bool history_change= m_table->versioned() ?
!m_table->vers_end_field()->is_max() : false;
TABLE_LIST *tl= m_table->pos_in_table_list;
uint8 trg_event_map_save= tl->trg_event_map;
@ -7997,8 +7995,12 @@ Update_rows_log_event::do_exec_row(rpl_group_info *rgi)
m_table->vers_update_fields();
m_table->vers_fix_old_timestamp(rgi);
}
if (!history_change && !m_table->vers_end_field()->is_max())
Field *end= m_table->vers_end_field();
const uchar *old_ptr= end->ptr_in_record(m_table->record[1]);
if (end->is_max(old_ptr) && !end->is_max())
{
// This is a versioned delete, and we'll have to invoke ON DELETE actions
tl->trg_event_map|= trg2bit(TRG_EVENT_DELETE);
}
}

View file

@ -292,7 +292,8 @@ int update_portion_of_time(THD *thd, TABLE *table,
inline
int TABLE::delete_row()
{
if (!versioned(VERS_TIMESTAMP) || !vers_end_field()->is_max())
if (!versioned(VERS_TIMESTAMP) ||
!vers_end_field()->is_max())
return file->ha_delete_row(record[0]);
store_record(this, record[1]);

View file

@ -2141,25 +2141,12 @@ int Write_record::replace_row(ha_rows *inserted, ha_rows *deleted)
if (can_optimize && key_nr == last_unique_key)
{
DBUG_PRINT("info", ("Updating row using ha_update_row()"));
if (table->versioned(VERS_TRX_ID))
table->vers_start_field()->store(0, false);
error= table->file->ha_update_row(table->record[1], table->record[0]);
if (likely(!error))
++*deleted;
else if (error != HA_ERR_RECORD_IS_THE_SAME)
return on_ha_error(error);
if (versioned)
{
store_record(table, record[2]);
error= vers_insert_history_row(table);
restore_record(table, record[2]);
if (unlikely(error))
return on_ha_error(error);
}
break;
}
else

View file

@ -124,7 +124,8 @@ public:
bool has_delete_triggers= use_triggers &&
table->triggers->has_delete_triggers();
bool referenced_by_fk= table->file->referenced_by_foreign_key();
can_optimize= !referenced_by_fk && !has_delete_triggers;
can_optimize= !referenced_by_fk && !has_delete_triggers
&& !versioned && !table->versioned();
}
}
Write_record(THD *thd, TABLE *table, COPY_INFO *info,

View file

@ -7803,6 +7803,11 @@ static void do_mark_index_columns(TABLE *table, uint index,
table->s->primary_key != MAX_KEY && table->s->primary_key != index)
do_mark_index_columns(table, table->s->primary_key, bitmap, read);
if (table->versioned(VERS_TRX_ID))
{
table->vers_start_field()->register_field_in_read_map();
table->vers_end_field()->register_field_in_read_map();
}
}
/*
mark columns used by key, but don't reset other fields

View file

@ -1846,7 +1846,7 @@ public:
check_assignability_all_visible_fields(values, ignore);
}
bool insert_all_rows_into_tmp_table(THD *thd,
bool insert_all_rows_into_tmp_table(THD *thd,
TABLE *tmp_table,
TMP_TABLE_PARAM *tmp_table_param,
bool with_cleanup);
@ -1971,8 +1971,8 @@ public:
HA_DO_RANGE_FILTER_PUSHDOWN);
}
ulonglong vers_start_id() const;
ulonglong vers_end_id() const;
ulonglong vers_start_id(const uchar *ptr) const;
ulonglong vers_end_id(const uchar *ptr) const;
#ifdef WITH_PARTITION_STORAGE_ENGINE
bool vers_switch_partition(THD *thd, TABLE_LIST *table_list,
Open_table_context *ot_ctx);

View file

@ -8594,7 +8594,7 @@ ha_innobase::update_row(
if (error == DB_SUCCESS && m_prebuilt->versioned_write
/* Multiple UPDATE of same rows in single transaction create
historical rows only once. */
&& trx->id != table->vers_start_id()) {
&& trx->id != table->vers_start_id(new_row)) {
/* UPDATE is not used by ALTER TABLE. Just precaution
as we don't need history generation for ALTER TABLE. */
ut_ad(thd_sql_command(m_user_thd) != SQLCOM_ALTER_TABLE);
@ -8709,8 +8709,9 @@ ha_innobase::delete_row(
/* This is a delete */
m_prebuilt->upd_node->is_delete = table->versioned_write(VERS_TRX_ID)
&& table->vers_end_field()->is_max()
&& trx->id != table->vers_start_id()
&& table->vers_end_field()->is_max(
table->vers_end_field()->ptr_in_record(record))
&& trx->id != table->vers_start_id(record)
? VERSIONED_DELETE
: PLAIN_DELETE;
trx->fts_next_doc_id = 0;