MDEV-34703 LOAD DATA INFILE using InnoDB bulk load aborts

Problem:
========
During a LOAD DATA statement, the InnoDB bulk insert operation
relies on the temporary directory and crashes when tmpdir is
exhausted.

Solution:
========
During bulk insert, the LOAD statement now builds the clustered
index one record at a time instead of one page at a time. By doing
this, InnoDB
1) Avoids creating a temporary file for the clustered index.
2) Writes the undo log for the first insert operation only.
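
As a minimal SQL sketch of the path this fix changes (mirroring the
new test below; the table name and file path are illustrative and
assume the outfile already exists):

SET unique_checks=0, foreign_key_checks=0;
CREATE TABLE t2(f1 INT NOT NULL PRIMARY KEY, f2 INT NOT NULL) ENGINE=InnoDB;
-- With unique_checks=0 and foreign_key_checks=0, the first LOAD into the
-- empty table takes InnoDB's bulk-insert path; after this fix the clustered
-- index is built one record at a time, so no temporary file is created
-- for it and an exhausted tmpdir can no longer abort the load.
LOAD DATA INFILE '/tmp/t1.outfile' INTO TABLE t2;
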
Thirunarayanan Balathandayuthapani 2025-01-15 13:57:27 +05:30
parent 4469540d39
commit 33a37042c3
10 changed files with 177 additions and 13 deletions


@@ -0,0 +1,50 @@
CREATE TABLE t1(f1 INT NOT NULL,f2 INT NOT NULL)ENGINE=InnoDB;
INSERT INTO t1 SELECT seq, seq from seq_1_to_131072;
INSERT INTO t1 VALUES(131073, 131073), (131074, 131073);
SELECT * INTO OUTFILE "VARDIR/tmp/t1.outfile" FROM t1;
# successful load statement using bulk insert
CREATE TABLE t2(f1 INT NOT NULL PRIMARY KEY,
f2 INT NOT NULL)ENGINE=InnoDB;
SET unique_checks=0, foreign_key_checks=0;
LOAD DATA INFILE 'VARDIR/tmp/t1.outfile' INTO TABLE t2;
SELECT COUNT(*) FROM t2;
COUNT(*)
131074
CHECK TABLE t2 EXTENDED;
Table Op Msg_type Msg_text
test.t2 check status OK
DROP TABLE t2;
CREATE TABLE t2(f1 INT NOT NULL, PRIMARY KEY(f1 DESC),
f2 INT NOT NULL)ENGINE=InnoDB;
LOAD DATA INFILE 'VARDIR/tmp/t1.outfile' INTO TABLE t2;
SELECT COUNT(*) FROM t2;
COUNT(*)
131074
CHECK TABLE t2 EXTENDED;
Table Op Msg_type Msg_text
test.t2 check status OK
DROP TABLE t2;
# load statement using bulk insert fails during secondary index
CREATE TABLE t2(f1 INT NOT NULL PRIMARY KEY,
f2 INT NOT NULL UNIQUE KEY)ENGINE=InnoDB;
LOAD DATA INFILE 'VARDIR/tmp/t1.outfile' INTO TABLE t2;
ERROR HY000: Got error 1 "Operation not permitted" during COMMIT
SELECT COUNT(*) FROM t2;
COUNT(*)
0
CHECK TABLE t2 EXTENDED;
Table Op Msg_type Msg_text
test.t2 check status OK
DROP TABLE t2;
# load statement using bulk insert fails during primary index
CREATE TABLE t2(f1 INT NOT NULL,
f2 INT NOT NULL PRIMARY KEY)ENGINE=InnoDB;
LOAD DATA INFILE 'VARDIR/tmp/t1.outfile' INTO TABLE t2;
ERROR 23000: Duplicate entry '131073' for key 'PRIMARY'
SELECT COUNT(*) FROM t2;
COUNT(*)
0
CHECK TABLE t2 EXTENDED;
Table Op Msg_type Msg_text
test.t2 check status OK
DROP TABLE t2, t1;


@@ -0,0 +1 @@
--innodb_sort_buffer_size=65536


@@ -0,0 +1,52 @@
--source include/have_innodb.inc
--source include/have_sequence.inc
--source include/big_test.inc
CREATE TABLE t1(f1 INT NOT NULL,f2 INT NOT NULL)ENGINE=InnoDB;
INSERT INTO t1 SELECT seq, seq from seq_1_to_131072;
INSERT INTO t1 VALUES(131073, 131073), (131074, 131073);
--replace_result $MYSQLTEST_VARDIR VARDIR
--disable_cursor_protocol
--disable_ps2_protocol
eval SELECT * INTO OUTFILE "$MYSQLTEST_VARDIR/tmp/t1.outfile" FROM t1;
--enable_ps2_protocol
--enable_cursor_protocol
--echo # successful load statement using bulk insert
CREATE TABLE t2(f1 INT NOT NULL PRIMARY KEY,
f2 INT NOT NULL)ENGINE=InnoDB;
SET unique_checks=0, foreign_key_checks=0;
--replace_result $MYSQLTEST_VARDIR VARDIR
eval LOAD DATA INFILE '$MYSQLTEST_VARDIR/tmp/t1.outfile' INTO TABLE t2;
SELECT COUNT(*) FROM t2;
CHECK TABLE t2 EXTENDED;
DROP TABLE t2;
CREATE TABLE t2(f1 INT NOT NULL, PRIMARY KEY(f1 DESC),
f2 INT NOT NULL)ENGINE=InnoDB;
--replace_result $MYSQLTEST_VARDIR VARDIR
eval LOAD DATA INFILE '$MYSQLTEST_VARDIR/tmp/t1.outfile' INTO TABLE t2;
SELECT COUNT(*) FROM t2;
CHECK TABLE t2 EXTENDED;
DROP TABLE t2;
--echo # load statement using bulk insert fails during secondary index
CREATE TABLE t2(f1 INT NOT NULL PRIMARY KEY,
f2 INT NOT NULL UNIQUE KEY)ENGINE=InnoDB;
--replace_result $MYSQLTEST_VARDIR VARDIR
--error ER_ERROR_DURING_COMMIT
eval LOAD DATA INFILE '$MYSQLTEST_VARDIR/tmp/t1.outfile' INTO TABLE t2;
SELECT COUNT(*) FROM t2;
CHECK TABLE t2 EXTENDED;
DROP TABLE t2;
--echo # load statement using bulk insert fails during primary index
CREATE TABLE t2(f1 INT NOT NULL,
f2 INT NOT NULL PRIMARY KEY)ENGINE=InnoDB;
--replace_result $MYSQLTEST_VARDIR VARDIR
--error ER_DUP_ENTRY
eval LOAD DATA INFILE '$MYSQLTEST_VARDIR/tmp/t1.outfile' INTO TABLE t2;
SELECT COUNT(*) FROM t2;
CHECK TABLE t2 EXTENDED;
--remove_file $MYSQLTEST_VARDIR/tmp/t1.outfile
DROP TABLE t2, t1;


@@ -725,7 +725,15 @@ int mysql_load(THD *thd, const sql_exchange *ex, TABLE_LIST *table_list,
       table->file->print_error(my_errno, MYF(0));
       error= 1;
     }
-    table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
+    if (!error)
+    {
+      int err= table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
+      if (err == HA_ERR_FOUND_DUPP_KEY)
+      {
+        error= 1;
+        my_error(ER_ERROR_DURING_COMMIT, MYF(0), 1);
+      }
+    }
     table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
     table->next_number_field=0;
   }


@@ -15851,7 +15851,8 @@ ha_innobase::extra(
 		/* Allow a subsequent INSERT into an empty table
 		if !unique_checks && !foreign_key_checks. */
 		if (dberr_t err = trx->bulk_insert_apply()) {
-			return err;
+			return convert_error_code_to_mysql(
+				err, 0, trx->mysql_thd);
 		}
 		break;
 	}


@@ -447,12 +447,15 @@ class row_merge_bulk_t
   /** Block for encryption */
   row_merge_block_t *m_crypt_block= nullptr;
 public:
+  /** load statement */
+  bool m_load= false;
   /** Constructor.
   Create all merge files, merge buffer for all the table indexes
   expect fts indexes.
   Create a merge block which is used to write IO operation
-  @param table    table which undergoes bulk insert operation */
-  row_merge_bulk_t(dict_table_t *table);
+  @param table    table which undergoes bulk insert operation
+  @param load_sql load statement */
+  row_merge_bulk_t(dict_table_t *table, bool load_sql);
   /** Destructor.
   Remove all merge files, merge buffer for all table indexes. */


@@ -451,12 +451,13 @@ public:
   }
   /** Notify the start of a bulk insert operation
-  @param table    table to do bulk operation */
-  void start_bulk_insert(dict_table_t *table)
+  @param table    table to do bulk operation
+  @param sql_load load data statement */
+  void start_bulk_insert(dict_table_t *table, bool sql_load)
   {
     first|= BULK;
     if (!table->is_temporary())
-      bulk_store= new row_merge_bulk_t(table);
+      bulk_store= new row_merge_bulk_t(table, sql_load);
   }
   /** Notify the end of a bulk insert operation */
@@ -511,6 +512,12 @@ public:
     return bulk_store && is_bulk_insert();
   }
+  /** @return whether buffered insert load statement is being executed */
+  bool load_stmt() const
+  {
+    return bulk_store && bulk_store->m_load;
+  }
   /** Free bulk insert operation */
   void clear_bulk_buffer()
   {
@@ -1154,15 +1161,18 @@ public:
   /** @return logical modification time of a table only
   if the table has bulk buffer exist in the transaction */
-  trx_mod_table_time_t *check_bulk_buffer(dict_table_t *table)
+  trx_mod_table_time_t *use_bulk_buffer(dict_index_t *index)
   {
     if (UNIV_LIKELY(!bulk_insert))
       return nullptr;
-    ut_ad(table->skip_alter_undo || !check_unique_secondary);
-    ut_ad(table->skip_alter_undo || !check_foreigns);
-    auto it= mod_tables.find(table);
+    ut_ad(index->table->skip_alter_undo || !check_unique_secondary);
+    ut_ad(index->table->skip_alter_undo || !check_foreigns);
+    auto it= mod_tables.find(index->table);
     if (it == mod_tables.end() || !it->second.bulk_buffer_exist())
       return nullptr;
+    /* Avoid using bulk buffer for load statement */
+    if (index->is_clust() && it->second.load_stmt())
+      return nullptr;
     return &it->second;
   }


@@ -2811,7 +2811,7 @@ avoid_bulk:
 		trx_start_if_not_started(trx, true);
 		trx->bulk_insert = true;
 		auto m = trx->mod_tables.emplace(index->table, 0);
-		m.first->second.start_bulk_insert(index->table);
+		m.first->second.start_bulk_insert(index->table, false);
 		err = m.first->second.bulk_insert_buffered(
 			*entry, *index, trx);
 		goto err_exit;
@@ -3430,7 +3430,7 @@ row_ins_index_entry(
 			return(DB_LOCK_WAIT);});
 
 	if (index->is_btree()) {
-		if (auto t= trx->check_bulk_buffer(index->table)) {
+		if (auto t= trx->use_bulk_buffer(index)) {
 			/* MDEV-25036 FIXME:
 			row_ins_check_foreign_constraint() check
 			should be done before buffering the insert


@@ -5053,8 +5053,10 @@ dberr_t row_merge_bulk_t::alloc_block()
   return DB_SUCCESS;
 }
 
-row_merge_bulk_t::row_merge_bulk_t(dict_table_t *table)
+row_merge_bulk_t::row_merge_bulk_t(dict_table_t *table,
+                                   bool load_sql):m_load(load_sql)
 {
+  m_load= load_sql;
   ulint n_index= 0;
   for (dict_index_t *index= UT_LIST_GET_FIRST(table->indexes);
        index; index= UT_LIST_GET_NEXT(indexes, index))
@@ -5254,6 +5256,31 @@ add_to_buf:
   }
 
 func_exit:
+  if (m_load && ind.is_clust())
+  {
+    /* During bulk insert load operation, InnoDB is building the
+    clustered index one record at a time. InnoDB uses bulk insert
+    operation for the first insert statement only. Consecutive
+    insert operation does follow normal insert codepath and
+    avoids writing undo log records */
+    dict_index_t *index= const_cast<dict_index_t*>(&ind);
+    BtrBulk btr_bulk(index, trx);
+    err= row_merge_insert_index_tuples(index, index->table,
+                                       OS_FILE_CLOSED, nullptr,
+                                       &m_merge_buf[0], &btr_bulk,
+                                       0, 0, 0, nullptr,
+                                       index->table->space_id,
+                                       nullptr,
+                                       m_blob_file.fd == OS_FILE_CLOSED
+                                       ? nullptr : &m_blob_file);
+    if (err != DB_SUCCESS)
+      trx->error_info= index;
+    else if (index->table->persistent_autoinc)
+      btr_write_autoinc(index, 1);
+    err= btr_bulk.finish(err);
+    if (err == DB_SUCCESS && index->is_clust())
+      index->table->stat_n_rows= 1;
+  }
   if (large_tuple_heap)
     mem_heap_free(large_tuple_heap);
   return err;
@@ -5332,6 +5359,15 @@ dberr_t row_merge_bulk_t::write_to_table(dict_table_t *table, trx_t *trx)
     if (!index->is_btree())
       continue;
 
+    /** During load bulk, InnoDB does build the clustered index
+    by one record at a time and doesn't use bulk buffer.
+    So skip the clustered index while applying the buffered
+    bulk operation */
+    if (index->is_clust() && m_load)
+    {
+      i++;
+      continue;
+    }
     dberr_t err= write_to_index(i, trx);
     switch (err) {
     default:
@@ -5347,6 +5383,7 @@ dberr_t row_merge_bulk_t::write_to_table(dict_table_t *table, trx_t *trx)
     i++;
   }
+  m_load= false;
   return DB_SUCCESS;
 }


@@ -1865,7 +1865,9 @@ trx_undo_report_row_operation(
 	} else if (index->table->is_temporary()) {
 	} else if (trx_has_lock_x(*trx, *index->table)
 		   && index->table->bulk_trx_id == trx->id) {
-		m.first->second.start_bulk_insert(index->table);
+		m.first->second.start_bulk_insert(
+			index->table,
+			thd_sql_command(trx->mysql_thd) == SQLCOM_LOAD);
 
 		if (dberr_t err = m.first->second.bulk_insert_buffered(
 			    *clust_entry, *index, trx)) {