MDEV-15250 UPSERT during ALTER TABLE results in 'Duplicate entry' error for alter

- InnoDB should avoid bulk insert operation when table has active
DDL. Because bulk insert writes only one undo log as TRX_UNDO_EMPTY
and logging of concurrent DML happens at commit time uses undo log
record to parse and get the value and operation.

- Removed ROW_T_EMPTY, ROW_OP_EMPTY and their associated functions
and also the test case which tries to log the ROW_OP_EMPTY
when table has active DDL.
This commit is contained in:
Thirunarayanan Balathandayuthapani 2022-04-26 16:18:45 +05:30
parent cad792c686
commit 7c0b9c6020
5 changed files with 19 additions and 158 deletions

View file

@ -27,35 +27,6 @@ t1 CREATE TABLE `t1` (
`f2` int(11) NOT NULL, `f2` int(11) NOT NULL,
PRIMARY KEY (`f1`) PRIMARY KEY (`f1`)
) ENGINE=InnoDB DEFAULT CHARSET=latin1 ) ENGINE=InnoDB DEFAULT CHARSET=latin1
INSERT INTO t1 VALUES(1, 1);
connection con1;
START TRANSACTION WITH CONSISTENT SNAPSHOT;
connect con2,localhost,root,,,;
DELETE FROM t1;
connection default;
SET DEBUG_SYNC="innodb_inplace_alter_table_enter SIGNAL purge_resume WAIT_FOR dml_commit";
ALTER TABLE t1 ADD INDEX(f2, f1);
connection con1;
COMMIT;
connection con2;
SET GLOBAL innodb_purge_rseg_truncate_frequency=1;
InnoDB 1 transactions not purged
SET unique_checks=0, foreign_key_checks=0;
BEGIN;
INSERT INTO t1 VALUES(2, 2);
ROLLBACK;
SET DEBUG_SYNC="now SIGNAL dml_commit";
connection default;
SHOW CREATE TABLE t1;
Table Create Table
t1 CREATE TABLE `t1` (
`f1` int(11) NOT NULL,
`f2` int(11) NOT NULL,
PRIMARY KEY (`f1`),
KEY `f2` (`f2`,`f1`)
) ENGINE=InnoDB DEFAULT CHARSET=latin1
disconnect con1; disconnect con1;
disconnect con2;
DROP TABLE t1; DROP TABLE t1;
SET DEBUG_SYNC=RESET; SET DEBUG_SYNC=RESET;
SET GLOBAL innodb_purge_rseg_truncate_frequency=default;

View file

@ -31,33 +31,6 @@ ROLLBACK;
connection default; connection default;
SELECT * FROM t1; SELECT * FROM t1;
SHOW CREATE TABLE t1; SHOW CREATE TABLE t1;
# Online alter logs ROW_LOG_EMPTY when table does bulk insert
INSERT INTO t1 VALUES(1, 1);
connection con1;
START TRANSACTION WITH CONSISTENT SNAPSHOT;
connect(con2,localhost,root,,,);
DELETE FROM t1;
connection default;
SET DEBUG_SYNC="innodb_inplace_alter_table_enter SIGNAL purge_resume WAIT_FOR dml_commit";
send ALTER TABLE t1 ADD INDEX(f2, f1);
connection con1;
COMMIT;
connection con2;
let $wait_all_purged=1;
SET GLOBAL innodb_purge_rseg_truncate_frequency=1;
--source include/wait_all_purged.inc
SET unique_checks=0, foreign_key_checks=0;
BEGIN;
INSERT INTO t1 VALUES(2, 2);
ROLLBACK;
SET DEBUG_SYNC="now SIGNAL dml_commit";
connection default;
reap;
SHOW CREATE TABLE t1;
disconnect con1; disconnect con1;
disconnect con2;
DROP TABLE t1; DROP TABLE t1;
SET DEBUG_SYNC=RESET; SET DEBUG_SYNC=RESET;
SET GLOBAL innodb_purge_rseg_truncate_frequency=default;

View file

@ -85,7 +85,7 @@ inline void row_log_abort_sec(dict_index_t *index)
/** Logs an operation to a secondary index that is (or was) being created. /** Logs an operation to a secondary index that is (or was) being created.
@param index index, S or X latched @param index index, S or X latched
@param tuple index tuple (NULL= empty the index) @param tuple index tuple
@param trx_id transaction ID for insert, or 0 for delete @param trx_id transaction ID for insert, or 0 for delete
@retval false if row_log_apply() failure happens @retval false if row_log_apply() failure happens
or true otherwise */ or true otherwise */

View file

@ -2626,6 +2626,7 @@ commit_exit:
&& block->page.id().page_no() == index->page && block->page.id().page_no() == index->page
&& !index->table->skip_alter_undo && !index->table->skip_alter_undo
&& !index->table->n_rec_locks && !index->table->n_rec_locks
&& !index->table->is_active_ddl()
&& !trx->is_wsrep() /* FIXME: MDEV-24623 */ && !trx->is_wsrep() /* FIXME: MDEV-24623 */
&& !thd_is_slave(trx->mysql_thd) /* FIXME: MDEV-24622 */) { && !thd_is_slave(trx->mysql_thd) /* FIXME: MDEV-24622 */) {
DEBUG_SYNC_C("empty_root_page_insert"); DEBUG_SYNC_C("empty_root_page_insert");

View file

@ -54,9 +54,7 @@ enum row_tab_op {
/** Update a record in place */ /** Update a record in place */
ROW_T_UPDATE, ROW_T_UPDATE,
/** Delete (purge) a record */ /** Delete (purge) a record */
ROW_T_DELETE, ROW_T_DELETE
/** Empty the table */
ROW_T_EMPTY
}; };
/** Index record modification operations during online index creation */ /** Index record modification operations during online index creation */
@ -64,9 +62,7 @@ enum row_op {
/** Insert a record */ /** Insert a record */
ROW_OP_INSERT = 0x61, ROW_OP_INSERT = 0x61,
/** Delete a record */ /** Delete a record */
ROW_OP_DELETE, ROW_OP_DELETE
/** Empy the index */
ROW_OP_EMPTY
}; };
/** Size of the modification log entry header, in bytes */ /** Size of the modification log entry header, in bytes */
@ -254,26 +250,9 @@ row_log_block_free(
DBUG_VOID_RETURN; DBUG_VOID_RETURN;
} }
/** Empty the online log.
@param index index log to be cleared */
static void row_log_empty(dict_index_t *index)
{
ut_ad(index->lock.have_s());
row_log_t *log= index->online_log;
mysql_mutex_lock(&log->mutex);
row_log_block_free(log->tail);
row_log_block_free(log->head);
row_merge_file_destroy_low(log->fd);
log->fd= OS_FILE_CLOSED;
log->tail.total= log->tail.blocks= log->tail.bytes= 0;
log->head.total= log->head.blocks= log->head.bytes= 0;
mysql_mutex_unlock(&log->mutex);
}
/** Logs an operation to a secondary index that is (or was) being created. /** Logs an operation to a secondary index that is (or was) being created.
@param index index, S or X latched @param index index, S or X latched
@param tuple index tuple (NULL= empty the index) @param tuple index tuple
@param trx_id transaction ID for insert, or 0 for delete @param trx_id transaction ID for insert, or 0 for delete
@retval false if row_log_apply() failure happens @retval false if row_log_apply() failure happens
or true otherwise */ or true otherwise */
@ -288,8 +267,8 @@ bool row_log_online_op(dict_index_t *index, const dtuple_t *tuple,
row_log_t* log; row_log_t* log;
bool success= true; bool success= true;
ut_ad(!tuple || dtuple_validate(tuple)); ut_ad(dtuple_validate(tuple));
ut_ad(!tuple || dtuple_get_n_fields(tuple) == dict_index_get_n_fields(index)); ut_ad(dtuple_get_n_fields(tuple) == dict_index_get_n_fields(index));
ut_ad(index->lock.have_x() || index->lock.have_s()); ut_ad(index->lock.have_x() || index->lock.have_s());
if (index->is_corrupted()) { if (index->is_corrupted()) {
@ -304,21 +283,14 @@ bool row_log_online_op(dict_index_t *index, const dtuple_t *tuple,
row_merge_buf_encode(), because here we do not encode row_merge_buf_encode(), because here we do not encode
extra_size+1 (and reserve 0 as the end-of-chunk marker). */ extra_size+1 (and reserve 0 as the end-of-chunk marker). */
if (!tuple) { size = rec_get_converted_size_temp<false>(
row_log_empty(index); index, tuple->fields, tuple->n_fields, &extra_size);
mrec_size = 4; ut_ad(size >= extra_size);
extra_size = 0; ut_ad(size <= sizeof log->tail.buf);
size = 2;
} else {
size = rec_get_converted_size_temp<false>(
index, tuple->fields, tuple->n_fields, &extra_size);
ut_ad(size >= extra_size);
ut_ad(size <= sizeof log->tail.buf);
mrec_size = ROW_LOG_HEADER_SIZE mrec_size = ROW_LOG_HEADER_SIZE
+ (extra_size >= 0x80) + size + (extra_size >= 0x80) + size
+ (trx_id ? DATA_TRX_ID_LEN : 0); + (trx_id ? DATA_TRX_ID_LEN : 0);
}
log = index->online_log; log = index->online_log;
mysql_mutex_lock(&log->mutex); mysql_mutex_lock(&log->mutex);
@ -348,8 +320,6 @@ start_log:
*b++ = ROW_OP_INSERT; *b++ = ROW_OP_INSERT;
trx_write_trx_id(b, trx_id); trx_write_trx_id(b, trx_id);
b += DATA_TRX_ID_LEN; b += DATA_TRX_ID_LEN;
} else if (!tuple) {
*b++ = ROW_OP_EMPTY;
} else { } else {
*b++ = ROW_OP_DELETE; *b++ = ROW_OP_DELETE;
} }
@ -362,13 +332,8 @@ start_log:
*b++ = (byte) extra_size; *b++ = (byte) extra_size;
} }
if (tuple) { rec_convert_dtuple_to_temp<false>(
rec_convert_dtuple_to_temp<false>( b + extra_size, index, tuple->fields, tuple->n_fields);
b + extra_size, index, tuple->fields,
tuple->n_fields);
} else {
memset(b, 0, 2);
}
b += size; b += size;
@ -2416,11 +2381,6 @@ row_log_table_apply_op(
thr, new_trx_id_col, thr, new_trx_id_col,
mrec, offsets, offsets_heap, heap, dup, old_pk); mrec, offsets, offsets_heap, heap, dup, old_pk);
break; break;
case ROW_T_EMPTY:
dup->index->online_log->table->clear(thr);
log->head.total += 1;
next_mrec = mrec;
break;
} }
ut_ad(log->head.total <= log->tail.total); ut_ad(log->head.total <= log->tail.total);
@ -3210,9 +3170,6 @@ row_log_apply_op_low(
} }
goto duplicate; goto duplicate;
case ROW_OP_EMPTY:
ut_ad(0);
break;
} }
} else { } else {
switch (op) { switch (op) {
@ -3283,9 +3240,6 @@ insert_the_rec:
0, NULL, &mtr); 0, NULL, &mtr);
ut_ad(!big_rec); ut_ad(!big_rec);
break; break;
case ROW_OP_EMPTY:
ut_ad(0);
break;
} }
mem_heap_empty(offsets_heap); mem_heap_empty(offsets_heap);
} }
@ -3360,15 +3314,6 @@ row_log_apply_op(
op = static_cast<enum row_op>(*mrec++); op = static_cast<enum row_op>(*mrec++);
trx_id = 0; trx_id = 0;
break; break;
case ROW_OP_EMPTY:
{
mem_heap_t* heap = mem_heap_create(512);
que_fork_t* fork = que_fork_create(heap);
que_thr_t* thr = que_thr_create(fork, heap, nullptr);
index->clear(thr);
mem_heap_free(heap);
return mrec + 4;
}
default: default:
corrupted: corrupted:
ut_ad(0); ut_ad(0);
@ -3828,21 +3773,6 @@ unsigned row_log_get_n_core_fields(const dict_index_t *index)
return index->online_log->n_core_fields; return index->online_log->n_core_fields;
} }
/** Notify that the table was emptied by concurrent rollback or purge.
@param index clustered index */
static void row_log_table_empty(dict_index_t *index)
{
ut_ad(index->lock.have_s());
row_log_empty(index);
row_log_t* log= index->online_log;
ulint avail_size;
if (byte *b= row_log_table_open(log, 1, &avail_size))
{
*b++ = ROW_T_EMPTY;
row_log_table_close(index, b, 1, avail_size);
}
}
dberr_t row_log_get_error(const dict_index_t *index) dberr_t row_log_get_error(const dict_index_t *index)
{ {
ut_ad(index->online_log); ut_ad(index->online_log);
@ -3851,7 +3781,6 @@ dberr_t row_log_get_error(const dict_index_t *index)
void dict_table_t::clear(que_thr_t *thr) void dict_table_t::clear(que_thr_t *thr)
{ {
bool rebuild= false;
for (dict_index_t *index= UT_LIST_GET_FIRST(indexes); index; for (dict_index_t *index= UT_LIST_GET_FIRST(indexes); index;
index= UT_LIST_GET_NEXT(indexes, index)) index= UT_LIST_GET_NEXT(indexes, index))
{ {
@ -3862,26 +3791,13 @@ void dict_table_t::clear(que_thr_t *thr)
case ONLINE_INDEX_ABORTED: case ONLINE_INDEX_ABORTED:
case ONLINE_INDEX_ABORTED_DROPPED: case ONLINE_INDEX_ABORTED_DROPPED:
continue; continue;
case ONLINE_INDEX_COMPLETE: case ONLINE_INDEX_COMPLETE:
break; break;
case ONLINE_INDEX_CREATION: case ONLINE_INDEX_CREATION:
index->lock.s_lock(SRW_LOCK_CALL); ut_ad("invalid type" == 0);
if (dict_index_get_online_status(index) == ONLINE_INDEX_CREATION) MY_ASSERT_UNREACHABLE();
{ break;
if (index->is_clust())
{
row_log_table_empty(index);
rebuild= true;
}
else if (!rebuild)
row_log_online_op(index, nullptr, 0);
}
index->lock.s_unlock();
} }
index->clear(thr); index->clear(thr);
} }
} }