MDEV-18543 IMPORT TABLESPACE fails after instant DROP COLUMN

ALTER TABLE IMPORT doesn't properly handle instant alter metadata.
This patch makes IMPORT read, parse and apply instant alter metadata at the
very beginning of operation. So, cases when source table has some metadata
and destination table doesn't have it now works fine.

DISCARD already removes instant metadata so importing normal table into
instant table worked fine before this patch.

decrypt_decompress(): decrypts and decompresses page if needed

handle_instant_metadata(): this should be the first thing to read source
table. Basically, it applies instant metadata to a destination
dict_table_t object. This is the first thing to read FSP flags so
all possible checks of it were moved to this function.

PageConverter::update_index_page(): it doesn't now read instant metadata.
This logic were moved into handle_instant_metadata()

row_import::match_flags(): this is a first part row_import::match_schema().
As a separate function it's used by handle_instant_metadata().

fil_space_t::is_full_crc32_compressed(): added convenient function

ha_innobase::discard_or_import_tablespace(): do not reload table definition
to read instant metadata because handle_instant_metadata() does it better.
The reverted code was originally added in
4e7ee166a9

ANONYMOUS_VAR: this is a handy thing to use along with make_scope_exit()

full_crc32_import.test shows different results, because no
dict_table_close() and dict_table_open_on_id() happens.
Thus, SHOW CREATE TABLE shows a little bit older table definition.
This commit is contained in:
Eugene Kosov 2021-08-10 00:54:12 +06:00
parent 36f8cca6f3
commit d74d95961a
15 changed files with 693 additions and 160 deletions

View file

@ -64,3 +64,11 @@ make_scope_exit(Callable &&f)
return detail::scope_exit<typename std::decay<Callable>::type>( return detail::scope_exit<typename std::decay<Callable>::type>(
std::forward<Callable>(f)); std::forward<Callable>(f));
} }
#define CONCAT_IMPL(x, y) x##y
#define CONCAT(x, y) CONCAT_IMPL(x, y)
#define ANONYMOUS_VARIABLE CONCAT(_anonymous_variable, __LINE__)
#define SCOPE_EXIT auto ANONYMOUS_VARIABLE= make_scope_exit

View file

@ -0,0 +1,24 @@
SET GLOBAL innodb_encrypt_tables = ON;
SET GLOBAL innodb_encryption_threads = 4;
CREATE TABLE t1 (id INT PRIMARY KEY AUTO_INCREMENT, i2 INT, i1 INT)
ENGINE=INNODB ENCRYPTED=YES ENCRYPTION_KEY_ID=4;
INSERT INTO t1 (i2) SELECT 4 FROM seq_1_to_1024;
ALTER TABLE t1 DROP COLUMN i2, ALGORITHM=INSTANT;
CREATE TABLE t2 LIKE t1;
ALTER TABLE t2 DISCARD TABLESPACE;
FLUSH TABLE t1 FOR EXPORT;
UNLOCK TABLES;
ALTER TABLE t2 IMPORT TABLESPACE;
DROP TABLE t2, t1;
CREATE TABLE t1 (id INT PRIMARY KEY AUTO_INCREMENT, i2 INT, i1 INT)
ENGINE=INNODB ENCRYPTED=YES ENCRYPTION_KEY_ID=4 PAGE_COMPRESSED=1;
INSERT INTO t1 (i2) SELECT 4 FROM seq_1_to_1024;
ALTER TABLE t1 DROP COLUMN i2, ALGORITHM=INSTANT;
CREATE TABLE t2 LIKE t1;
ALTER TABLE t2 DISCARD TABLESPACE;
FLUSH TABLE t1 FOR EXPORT;
UNLOCK TABLES;
ALTER TABLE t2 IMPORT TABLESPACE;
DROP TABLE t2, t1;
SET GLOBAL innodb_encrypt_tables = OFF;
SET GLOBAL innodb_encryption_threads = 0;

View file

@ -0,0 +1,52 @@
--source include/have_innodb.inc
--source include/have_sequence.inc
--source include/have_file_key_management_plugin.inc
SET GLOBAL innodb_encrypt_tables = ON;
SET GLOBAL innodb_encryption_threads = 4;
--let $MYSQLD_DATADIR= `SELECT @@datadir`
CREATE TABLE t1 (id INT PRIMARY KEY AUTO_INCREMENT, i2 INT, i1 INT)
ENGINE=INNODB ENCRYPTED=YES ENCRYPTION_KEY_ID=4;
INSERT INTO t1 (i2) SELECT 4 FROM seq_1_to_1024;
ALTER TABLE t1 DROP COLUMN i2, ALGORITHM=INSTANT;
CREATE TABLE t2 LIKE t1;
ALTER TABLE t2 DISCARD TABLESPACE;
FLUSH TABLE t1 FOR EXPORT;
--copy_file $MYSQLD_DATADIR/test/t1.ibd $MYSQLD_DATADIR/test/t2.ibd
--copy_file $MYSQLD_DATADIR/test/t1.cfg $MYSQLD_DATADIR/test/t2.cfg
UNLOCK TABLES;
ALTER TABLE t2 IMPORT TABLESPACE;
DROP TABLE t2, t1;
CREATE TABLE t1 (id INT PRIMARY KEY AUTO_INCREMENT, i2 INT, i1 INT)
ENGINE=INNODB ENCRYPTED=YES ENCRYPTION_KEY_ID=4 PAGE_COMPRESSED=1;
INSERT INTO t1 (i2) SELECT 4 FROM seq_1_to_1024;
ALTER TABLE t1 DROP COLUMN i2, ALGORITHM=INSTANT;
CREATE TABLE t2 LIKE t1;
ALTER TABLE t2 DISCARD TABLESPACE;
FLUSH TABLE t1 FOR EXPORT;
--copy_file $MYSQLD_DATADIR/test/t1.ibd $MYSQLD_DATADIR/test/t2.ibd
--copy_file $MYSQLD_DATADIR/test/t1.cfg $MYSQLD_DATADIR/test/t2.cfg
UNLOCK TABLES;
ALTER TABLE t2 IMPORT TABLESPACE;
DROP TABLE t2, t1;
SET GLOBAL innodb_encrypt_tables = OFF;
SET GLOBAL innodb_encryption_threads = 0;

View file

@ -1,3 +1,4 @@
call mtr.add_suppression("Index for table 'tab' is corrupt; try to repair it");
SET @row_format = @@GLOBAL.innodb_default_row_format; SET @row_format = @@GLOBAL.innodb_default_row_format;
# ########################################################### # ###########################################################
# Check with Import/Export tablespace with Default_row_format # Check with Import/Export tablespace with Default_row_format
@ -38,7 +39,7 @@ tab InnoDB # Compact # # # # # # NULL # NULL NULL latin1_swedish_ci NULL 0 N
ALTER TABLE tab DISCARD TABLESPACE; ALTER TABLE tab DISCARD TABLESPACE;
call mtr.add_suppression("InnoDB: Tried to read .* bytes at offset 0"); call mtr.add_suppression("InnoDB: Tried to read .* bytes at offset 0");
ALTER TABLE tab IMPORT TABLESPACE; ALTER TABLE tab IMPORT TABLESPACE;
ERROR HY000: Internal error: Cannot reset LSNs in table `test`.`tab` : I/O error ERROR HY000: Index for table 'tab' is corrupt; try to repair it
ALTER TABLE tab IMPORT TABLESPACE; ALTER TABLE tab IMPORT TABLESPACE;
SELECT * FROM tab; SELECT * FROM tab;
a a

View file

@ -46,7 +46,7 @@ t1 CREATE TABLE `t1` (
`b` blob DEFAULT NULL, `b` blob DEFAULT NULL,
`c` blob DEFAULT NULL, `c` blob DEFAULT NULL,
PRIMARY KEY (`a`) PRIMARY KEY (`a`)
) ENGINE=InnoDB AUTO_INCREMENT=46 DEFAULT CHARSET=latin1 ROW_FORMAT=DYNAMIC ) ENGINE=InnoDB AUTO_INCREMENT=57 DEFAULT CHARSET=latin1 ROW_FORMAT=DYNAMIC
UPDATE t1 set b = repeat("de", 100) where b = repeat("cd", 200); UPDATE t1 set b = repeat("de", 100) where b = repeat("cd", 200);
explain SELECT a FROM t1 where b = repeat("de", 100); explain SELECT a FROM t1 where b = repeat("de", 100);
id select_type table type possible_keys key key_len ref rows Extra id select_type table type possible_keys key key_len ref rows Extra
@ -126,7 +126,7 @@ t1 CREATE TABLE `t1` (
`c2` point NOT NULL, `c2` point NOT NULL,
`c3` linestring NOT NULL, `c3` linestring NOT NULL,
PRIMARY KEY (`c1`) PRIMARY KEY (`c1`)
) ENGINE=InnoDB AUTO_INCREMENT=14325 DEFAULT CHARSET=latin1 ROW_FORMAT=DYNAMIC ) ENGINE=InnoDB AUTO_INCREMENT=16372 DEFAULT CHARSET=latin1 ROW_FORMAT=DYNAMIC
UPDATE t1 SET C2 = ST_GeomFromText('POINT(0 0)'); UPDATE t1 SET C2 = ST_GeomFromText('POINT(0 0)');
SELECT COUNT(*) FROM t1; SELECT COUNT(*) FROM t1;
COUNT(*) COUNT(*)

View file

@ -951,7 +951,7 @@ ERROR HY000: Tablespace has been discarded for table `t1`
restore: t1 .ibd and .cfg files restore: t1 .ibd and .cfg files
SET SESSION debug_dbug="+d,fsp_flags_is_valid_failure"; SET SESSION debug_dbug="+d,fsp_flags_is_valid_failure";
ALTER TABLE t1 IMPORT TABLESPACE; ALTER TABLE t1 IMPORT TABLESPACE;
ERROR HY000: Internal error: Cannot reset LSNs in table `test`.`t1` : Data structure corruption ERROR HY000: Index for table 't1' is corrupt; try to repair it
SET SESSION debug_dbug=@saved_debug_dbug; SET SESSION debug_dbug=@saved_debug_dbug;
DROP TABLE t1; DROP TABLE t1;
unlink: t1.ibd unlink: t1.ibd

View file

@ -61,13 +61,10 @@ alter table t1 discard tablespace;
flush tables t2 for export; flush tables t2 for export;
unlock tables; unlock tables;
alter table t1 import tablespace; alter table t1 import tablespace;
ERROR HY000: Schema mismatch (Index field count 4 doesn't match tablespace metadata file value 5)
select * from t1; select * from t1;
ERROR HY000: Tablespace has been discarded for table `t1` z
alter table t1 import tablespace; 42
ERROR HY000: Internal error: Cannot reset LSNs in table `test`.`t1` : Unsupported 41
select * from t1;
ERROR HY000: Tablespace has been discarded for table `t1`
drop table t2; drop table t2;
drop table t1; drop table t1;
CREATE TABLE t1 (id INT PRIMARY KEY AUTO_INCREMENT, i1 INT) ENGINE=INNODB; CREATE TABLE t1 (id INT PRIMARY KEY AUTO_INCREMENT, i1 INT) ENGINE=INNODB;
@ -78,3 +75,46 @@ FLUSH TABLE t1 FOR EXPORT;
UNLOCK TABLES; UNLOCK TABLES;
ALTER TABLE t2 IMPORT TABLESPACE; ALTER TABLE t2 IMPORT TABLESPACE;
DROP TABLE t2, t1; DROP TABLE t2, t1;
CREATE TABLE t1 (id INT PRIMARY KEY, i2 INT, i1 INT) ENGINE=INNODB;
INSERT INTO t1 VALUES (1, 1, 1);
ALTER TABLE t1 MODIFY COLUMN i2 INT AFTER i1, ALGORITHM=INSTANT;
CREATE TABLE t2 LIKE t1;
ALTER TABLE t2 DISCARD TABLESPACE;
FLUSH TABLE t1 FOR EXPORT;
UNLOCK TABLES;
ALTER TABLE t2 IMPORT TABLESPACE;
SELECT * FROM t2;
id i1 i2
1 1 1
DROP TABLE t2, t1;
CREATE TABLE t1 (id INT PRIMARY KEY, i2 INT, i1 INT) ENGINE=INNODB;
INSERT INTO t1 VALUES (1, 1, 1);
ALTER TABLE t1 DROP COLUMN i2, ALGORITHM=INSTANT;
CREATE TABLE t2 LIKE t1;
ALTER TABLE t2 DISCARD TABLESPACE;
FLUSH TABLE t1 FOR EXPORT;
UNLOCK TABLES;
ALTER TABLE t2 IMPORT TABLESPACE;
SELECT * FROM t2;
id i1
1 1
DROP TABLE t2, t1;
CREATE TABLE t1 (id INT PRIMARY KEY, i2 INT, i1 INT)
ENGINE=INNODB PAGE_COMPRESSED=1;
INSERT INTO t1 VALUES (1, 1, 1);
ALTER TABLE t1 DROP COLUMN i2, ALGORITHM=INSTANT;
CREATE TABLE t2 LIKE t1;
ALTER TABLE t2 DISCARD TABLESPACE;
FLUSH TABLE t1 FOR EXPORT;
UNLOCK TABLES;
ALTER TABLE t2 IMPORT TABLESPACE;
DROP TABLE t2, t1;
CREATE TABLE t1 (id INT PRIMARY KEY AUTO_INCREMENT, i2 INT, i1 INT) ENGINE=INNODB;
INSERT INTO t1 (i2) SELECT 4 FROM seq_1_to_1024;
ALTER TABLE t1 DROP COLUMN i2, ALGORITHM=INSTANT;
CREATE TABLE t2 LIKE t1;
ALTER TABLE t2 DISCARD TABLESPACE;
FLUSH TABLE t1 FOR EXPORT;
UNLOCK TABLES;
ALTER TABLE t2 IMPORT TABLESPACE;
DROP TABLE t2, t1;

View file

@ -1,5 +1,8 @@
--source include/have_innodb.inc --source include/have_innodb.inc
call mtr.add_suppression("Index for table 'tab' is corrupt; try to repair it");
SET @row_format = @@GLOBAL.innodb_default_row_format; SET @row_format = @@GLOBAL.innodb_default_row_format;
# set the variables # set the variables
@ -79,7 +82,7 @@ ALTER TABLE tab DISCARD TABLESPACE;
call mtr.add_suppression("InnoDB: Tried to read .* bytes at offset 0"); call mtr.add_suppression("InnoDB: Tried to read .* bytes at offset 0");
--error ER_INTERNAL_ERROR --error ER_NOT_KEYFILE
ALTER TABLE tab IMPORT TABLESPACE; ALTER TABLE tab IMPORT TABLESPACE;
--remove_file $MYSQLD_DATADIR/test/tab.ibd --remove_file $MYSQLD_DATADIR/test/tab.ibd
--move_file $MYSQLD_DATADIR/tab.ibd $MYSQLD_DATADIR/test/tab.ibd --move_file $MYSQLD_DATADIR/tab.ibd $MYSQLD_DATADIR/test/tab.ibd

View file

@ -1375,7 +1375,7 @@ SET SESSION debug_dbug="+d,fsp_flags_is_valid_failure";
--replace_regex /'.*t1.cfg'/'t1.cfg'/ --replace_regex /'.*t1.cfg'/'t1.cfg'/
--error ER_INTERNAL_ERROR --error ER_NOT_KEYFILE
ALTER TABLE t1 IMPORT TABLESPACE; ALTER TABLE t1 IMPORT TABLESPACE;
SET SESSION debug_dbug=@saved_debug_dbug; SET SESSION debug_dbug=@saved_debug_dbug;

View file

@ -1,4 +1,7 @@
--source include/have_innodb.inc --source include/have_innodb.inc
--source include/have_sequence.inc
--source include/innodb_checksum_algorithm.inc
set default_storage_engine=innodb; set default_storage_engine=innodb;
--echo # --echo #
@ -70,14 +73,7 @@ flush tables t2 for export;
--copy_file $MYSQLD_DATADIR/test/t2.ibd $MYSQLD_DATADIR/test/t1.ibd --copy_file $MYSQLD_DATADIR/test/t2.ibd $MYSQLD_DATADIR/test/t1.ibd
unlock tables; unlock tables;
--error ER_TABLE_SCHEMA_MISMATCH
alter table t1 import tablespace; alter table t1 import tablespace;
--error ER_TABLESPACE_DISCARDED
select * from t1;
--remove_file $MYSQLD_DATADIR/test/t1.cfg
--error ER_INTERNAL_ERROR
alter table t1 import tablespace;
--error ER_TABLESPACE_DISCARDED
select * from t1; select * from t1;
drop table t2; drop table t2;
@ -101,3 +97,86 @@ UNLOCK TABLES;
ALTER TABLE t2 IMPORT TABLESPACE; ALTER TABLE t2 IMPORT TABLESPACE;
DROP TABLE t2, t1; DROP TABLE t2, t1;
CREATE TABLE t1 (id INT PRIMARY KEY, i2 INT, i1 INT) ENGINE=INNODB;
INSERT INTO t1 VALUES (1, 1, 1);
ALTER TABLE t1 MODIFY COLUMN i2 INT AFTER i1, ALGORITHM=INSTANT;
CREATE TABLE t2 LIKE t1;
ALTER TABLE t2 DISCARD TABLESPACE;
FLUSH TABLE t1 FOR EXPORT;
--copy_file $MYSQLD_DATADIR/test/t1.ibd $MYSQLD_DATADIR/test/t2.ibd
--copy_file $MYSQLD_DATADIR/test/t1.cfg $MYSQLD_DATADIR/test/t2.cfg
UNLOCK TABLES;
ALTER TABLE t2 IMPORT TABLESPACE;
SELECT * FROM t2;
DROP TABLE t2, t1;
CREATE TABLE t1 (id INT PRIMARY KEY, i2 INT, i1 INT) ENGINE=INNODB;
INSERT INTO t1 VALUES (1, 1, 1);
ALTER TABLE t1 DROP COLUMN i2, ALGORITHM=INSTANT;
CREATE TABLE t2 LIKE t1;
ALTER TABLE t2 DISCARD TABLESPACE;
FLUSH TABLE t1 FOR EXPORT;
--copy_file $MYSQLD_DATADIR/test/t1.ibd $MYSQLD_DATADIR/test/t2.ibd
--copy_file $MYSQLD_DATADIR/test/t1.cfg $MYSQLD_DATADIR/test/t2.cfg
UNLOCK TABLES;
ALTER TABLE t2 IMPORT TABLESPACE;
SELECT * FROM t2;
DROP TABLE t2, t1;
CREATE TABLE t1 (id INT PRIMARY KEY, i2 INT, i1 INT)
ENGINE=INNODB PAGE_COMPRESSED=1;
INSERT INTO t1 VALUES (1, 1, 1);
ALTER TABLE t1 DROP COLUMN i2, ALGORITHM=INSTANT;
CREATE TABLE t2 LIKE t1;
ALTER TABLE t2 DISCARD TABLESPACE;
FLUSH TABLE t1 FOR EXPORT;
--copy_file $MYSQLD_DATADIR/test/t1.ibd $MYSQLD_DATADIR/test/t2.ibd
--copy_file $MYSQLD_DATADIR/test/t1.cfg $MYSQLD_DATADIR/test/t2.cfg
UNLOCK TABLES;
ALTER TABLE t2 IMPORT TABLESPACE;
DROP TABLE t2, t1;
CREATE TABLE t1 (id INT PRIMARY KEY AUTO_INCREMENT, i2 INT, i1 INT) ENGINE=INNODB;
INSERT INTO t1 (i2) SELECT 4 FROM seq_1_to_1024;
ALTER TABLE t1 DROP COLUMN i2, ALGORITHM=INSTANT;
CREATE TABLE t2 LIKE t1;
ALTER TABLE t2 DISCARD TABLESPACE;
FLUSH TABLE t1 FOR EXPORT;
--copy_file $MYSQLD_DATADIR/test/t1.ibd $MYSQLD_DATADIR/test/t2.ibd
--copy_file $MYSQLD_DATADIR/test/t1.cfg $MYSQLD_DATADIR/test/t2.cfg
UNLOCK TABLES;
ALTER TABLE t2 IMPORT TABLESPACE;
DROP TABLE t2, t1;

View file

@ -407,7 +407,7 @@ ERROR HY000: Tablespace has been discarded for table `t1`
restore: t1 .ibd and .cfg files restore: t1 .ibd and .cfg files
SET SESSION debug_dbug="+d,fsp_flags_is_valid_failure"; SET SESSION debug_dbug="+d,fsp_flags_is_valid_failure";
ALTER TABLE t1 IMPORT TABLESPACE; ALTER TABLE t1 IMPORT TABLESPACE;
ERROR HY000: Internal error: Cannot reset LSNs in table `test`.`t1` : Data structure corruption ERROR HY000: Index for table 't1' is corrupt; try to repair it
SET SESSION debug_dbug=@saved_debug_dbug; SET SESSION debug_dbug=@saved_debug_dbug;
DROP TABLE t1; DROP TABLE t1;
unlink: t1.ibd unlink: t1.ibd

View file

@ -624,7 +624,7 @@ EOF
SET SESSION debug_dbug="+d,fsp_flags_is_valid_failure"; SET SESSION debug_dbug="+d,fsp_flags_is_valid_failure";
--error ER_INTERNAL_ERROR --error ER_NOT_KEYFILE
ALTER TABLE t1 IMPORT TABLESPACE; ALTER TABLE t1 IMPORT TABLESPACE;
SET SESSION debug_dbug=@saved_debug_dbug; SET SESSION debug_dbug=@saved_debug_dbug;

View file

@ -13117,25 +13117,6 @@ ha_innobase::discard_or_import_tablespace(
err, m_prebuilt->table->flags, NULL)); err, m_prebuilt->table->flags, NULL));
} }
/* Evict and reload the table definition in order to invoke
btr_cur_instant_init(). */
table_id_t id = m_prebuilt->table->id;
ut_ad(id);
mutex_enter(&dict_sys.mutex);
dict_table_close(m_prebuilt->table, TRUE, FALSE);
dict_sys.remove(m_prebuilt->table);
m_prebuilt->table = dict_table_open_on_id(id, TRUE,
DICT_TABLE_OP_NORMAL);
mutex_exit(&dict_sys.mutex);
if (!m_prebuilt->table) {
err = DB_TABLE_NOT_FOUND;
} else {
if (const Field* ai = table->found_next_number_field) {
initialize_auto_increment(m_prebuilt->table, ai);
}
dict_stats_init(m_prebuilt->table);
}
if (dict_stats_is_persistent_enabled(m_prebuilt->table)) { if (dict_stats_is_persistent_enabled(m_prebuilt->table)) {
dberr_t ret; dberr_t ret;

View file

@ -316,6 +316,18 @@ public:
static bool full_crc32(ulint flags) { static bool full_crc32(ulint flags) {
return flags & FSP_FLAGS_FCRC32_MASK_MARKER; return flags & FSP_FLAGS_FCRC32_MASK_MARKER;
} }
/** Determine if full_crc32 is used along with compression */
static bool is_full_crc32_compressed(ulint flags)
{
if (full_crc32(flags))
{
ulint algo= FSP_FLAGS_FCRC32_GET_COMPRESSED_ALGO(flags);
DBUG_ASSERT(algo <= PAGE_ALGORITHM_LAST);
return algo > 0;
}
return false;
}
/** @return whether innodb_checksum_algorithm=full_crc32 is active */ /** @return whether innodb_checksum_algorithm=full_crc32 is active */
bool full_crc32() const { return full_crc32(flags); } bool full_crc32() const { return full_crc32(flags); }
/** Determine the logical page size. /** Determine the logical page size.
@ -377,19 +389,13 @@ public:
unsigned zip_size() const { return zip_size(flags); } unsigned zip_size() const { return zip_size(flags); }
/** @return the physical page size */ /** @return the physical page size */
unsigned physical_size() const { return physical_size(flags); } unsigned physical_size() const { return physical_size(flags); }
/** Check whether the compression enabled in tablespace. /** Check whether the compression enabled in tablespace.
@param[in] flags tablespace flags */ @param[in] flags tablespace flags */
static bool is_compressed(ulint flags) { static bool is_compressed(ulint flags)
{
if (full_crc32(flags)) { return is_full_crc32_compressed(flags)
ulint algo = FSP_FLAGS_FCRC32_GET_COMPRESSED_ALGO( || FSP_FLAGS_HAS_PAGE_COMPRESSION(flags);
flags); }
DBUG_ASSERT(algo <= PAGE_ALGORITHM_LAST);
return algo > 0;
}
return FSP_FLAGS_HAS_PAGE_COMPRESSION(flags);
}
/** @return whether the compression enabled for the tablespace. */ /** @return whether the compression enabled for the tablespace. */
bool is_compressed() const { return is_compressed(flags); } bool is_compressed() const { return is_compressed(flags); }

View file

@ -40,6 +40,7 @@ Created 2012-02-08 by Sunny Bains.
#include "row0quiesce.h" #include "row0quiesce.h"
#include "fil0pagecompress.h" #include "fil0pagecompress.h"
#include "trx0undo.h" #include "trx0undo.h"
#include "row0row.h"
#ifdef HAVE_LZO #ifdef HAVE_LZO
#include "lzo/lzo1x.h" #include "lzo/lzo1x.h"
#endif #endif
@ -47,6 +48,8 @@ Created 2012-02-08 by Sunny Bains.
#include "snappy-c.h" #include "snappy-c.h"
#endif #endif
#include "scope.h"
#include <vector> #include <vector>
#ifdef HAVE_MY_AES_H #ifdef HAVE_MY_AES_H
@ -190,6 +193,9 @@ struct row_import {
dberr_t match_schema( dberr_t match_schema(
THD* thd) UNIV_NOTHROW; THD* thd) UNIV_NOTHROW;
dberr_t match_flags(THD *thd) const ;
dict_table_t* m_table; /*!< Table instance */ dict_table_t* m_table; /*!< Table instance */
ulint m_version; /*!< Version of config file */ ulint m_version; /*!< Version of config file */
@ -575,8 +581,6 @@ AbstractCallback::init(
if (!fil_space_t::is_valid_flags(m_space_flags, true)) { if (!fil_space_t::is_valid_flags(m_space_flags, true)) {
ulint cflags = fsp_flags_convert_from_101(m_space_flags); ulint cflags = fsp_flags_convert_from_101(m_space_flags);
if (cflags == ULINT_UNDEFINED) { if (cflags == ULINT_UNDEFINED) {
ib::error() << "Invalid FSP_SPACE_FLAGS="
<< ib::hex(m_space_flags);
return(DB_CORRUPTION); return(DB_CORRUPTION);
} }
m_space_flags = cflags; m_space_flags = cflags;
@ -1103,7 +1107,8 @@ row_import::match_index_columns(
for (ulint i = 0; i < index->n_fields; ++i, ++field, ++cfg_field) { for (ulint i = 0; i < index->n_fields; ++i, ++field, ++cfg_field) {
if (strcmp(field->name(), cfg_field->name()) != 0) { if (field->name() && cfg_field->name()
&& strcmp(field->name(), cfg_field->name()) != 0) {
ib_errf(thd, IB_LOG_LEVEL_ERROR, ib_errf(thd, IB_LOG_LEVEL_ERROR,
ER_TABLE_SCHEMA_MISMATCH, ER_TABLE_SCHEMA_MISMATCH,
"Index field name %s doesn't match" "Index field name %s doesn't match"
@ -1272,6 +1277,61 @@ row_import::match_table_columns(
return(err); return(err);
} }
dberr_t row_import::match_flags(THD *thd) const
{
ulint mismatch= (m_table->flags ^ m_flags) & ~DICT_TF_MASK_DATA_DIR;
if (!mismatch)
return DB_SUCCESS;
const char *msg;
if (mismatch & DICT_TF_MASK_ZIP_SSIZE)
{
if ((m_table->flags & DICT_TF_MASK_ZIP_SSIZE) &&
(m_flags & DICT_TF_MASK_ZIP_SSIZE))
{
switch (m_flags & DICT_TF_MASK_ZIP_SSIZE) {
case 0U << DICT_TF_POS_ZIP_SSIZE:
goto uncompressed;
case 1U << DICT_TF_POS_ZIP_SSIZE:
msg= "ROW_FORMAT=COMPRESSED KEY_BLOCK_SIZE=1";
break;
case 2U << DICT_TF_POS_ZIP_SSIZE:
msg= "ROW_FORMAT=COMPRESSED KEY_BLOCK_SIZE=2";
break;
case 3U << DICT_TF_POS_ZIP_SSIZE:
msg= "ROW_FORMAT=COMPRESSED KEY_BLOCK_SIZE=4";
break;
case 4U << DICT_TF_POS_ZIP_SSIZE:
msg= "ROW_FORMAT=COMPRESSED KEY_BLOCK_SIZE=8";
break;
case 5U << DICT_TF_POS_ZIP_SSIZE:
msg= "ROW_FORMAT=COMPRESSED KEY_BLOCK_SIZE=16";
break;
default:
msg= "strange KEY_BLOCK_SIZE";
}
}
else if (m_flags & DICT_TF_MASK_ZIP_SSIZE)
msg= "ROW_FORMAT=COMPRESSED";
else
goto uncompressed;
}
else
{
uncompressed:
msg= (m_flags & DICT_TF_MASK_ATOMIC_BLOBS) ? "ROW_FORMAT=DYNAMIC"
: (m_flags & DICT_TF_MASK_COMPACT) ? "ROW_FORMAT=COMPACT"
: "ROW_FORMAT=REDUNDANT";
}
ib_errf(thd, IB_LOG_LEVEL_ERROR, ER_TABLE_SCHEMA_MISMATCH,
"Table flags don't match, server table has 0x%x and the meta-data "
"file has 0x%zx; .cfg file uses %s",
m_table->flags, m_flags, msg);
return DB_ERROR;
}
/** Check if the table (and index) schema that was read from the .cfg file /** Check if the table (and index) schema that was read from the .cfg file
matches the in memory table definition. matches the in memory table definition.
@param thd MySQL session variable @param thd MySQL session variable
@ -1282,60 +1342,7 @@ row_import::match_schema(
{ {
/* Do some simple checks. */ /* Do some simple checks. */
if (ulint mismatch = (m_table->flags ^ m_flags) if (m_table->n_cols != m_n_cols) {
& ~DICT_TF_MASK_DATA_DIR) {
const char* msg;
if (mismatch & DICT_TF_MASK_ZIP_SSIZE) {
if ((m_table->flags & DICT_TF_MASK_ZIP_SSIZE)
&& (m_flags & DICT_TF_MASK_ZIP_SSIZE)) {
switch (m_flags & DICT_TF_MASK_ZIP_SSIZE) {
case 0U << DICT_TF_POS_ZIP_SSIZE:
goto uncompressed;
case 1U << DICT_TF_POS_ZIP_SSIZE:
msg = "ROW_FORMAT=COMPRESSED"
" KEY_BLOCK_SIZE=1";
break;
case 2U << DICT_TF_POS_ZIP_SSIZE:
msg = "ROW_FORMAT=COMPRESSED"
" KEY_BLOCK_SIZE=2";
break;
case 3U << DICT_TF_POS_ZIP_SSIZE:
msg = "ROW_FORMAT=COMPRESSED"
" KEY_BLOCK_SIZE=4";
break;
case 4U << DICT_TF_POS_ZIP_SSIZE:
msg = "ROW_FORMAT=COMPRESSED"
" KEY_BLOCK_SIZE=8";
break;
case 5U << DICT_TF_POS_ZIP_SSIZE:
msg = "ROW_FORMAT=COMPRESSED"
" KEY_BLOCK_SIZE=16";
break;
default:
msg = "strange KEY_BLOCK_SIZE";
}
} else if (m_flags & DICT_TF_MASK_ZIP_SSIZE) {
msg = "ROW_FORMAT=COMPRESSED";
} else {
goto uncompressed;
}
} else {
uncompressed:
msg = (m_flags & DICT_TF_MASK_ATOMIC_BLOBS)
? "ROW_FORMAT=DYNAMIC"
: (m_flags & DICT_TF_MASK_COMPACT)
? "ROW_FORMAT=COMPACT"
: "ROW_FORMAT=REDUNDANT";
}
ib_errf(thd, IB_LOG_LEVEL_ERROR, ER_TABLE_SCHEMA_MISMATCH,
"Table flags don't match, server table has 0x%x"
" and the meta-data file has 0x" ULINTPFx ";"
" .cfg file uses %s",
m_table->flags, m_flags, msg);
return(DB_ERROR);
} else if (m_table->n_cols != m_n_cols) {
ib_errf(thd, IB_LOG_LEVEL_ERROR, ER_TABLE_SCHEMA_MISMATCH, ib_errf(thd, IB_LOG_LEVEL_ERROR, ER_TABLE_SCHEMA_MISMATCH,
"Number of columns don't match, table has %u " "Number of columns don't match, table has %u "
"columns but the tablespace meta-data file has " "columns but the tablespace meta-data file has "
@ -1957,40 +1964,7 @@ PageConverter::update_index_page(
if (dict_index_is_clust(m_index->m_srv_index)) { if (dict_index_is_clust(m_index->m_srv_index)) {
dict_index_t* index = const_cast<dict_index_t*>( dict_index_t* index = const_cast<dict_index_t*>(
m_index->m_srv_index); m_index->m_srv_index);
if (block->page.id.page_no() == index->page) { if (block->page.id.page_no() != index->page) {
/* Preserve the PAGE_ROOT_AUTO_INC. */
if (index->table->supports_instant()) {
if (btr_cur_instant_root_init(index, page)) {
return(DB_CORRUPTION);
}
if (index->n_core_fields > index->n_fields) {
/* Some columns have been dropped.
Refuse to IMPORT TABLESPACE for now.
NOTE: This is not an accurate check.
Columns could have been both
added and dropped instantly.
For an accurate check, we must read
the metadata BLOB page pointed to
by the leftmost leaf page.
But we would have to read
those pages in a special way,
bypassing the buffer pool! */
return DB_UNSUPPORTED;
}
/* Provisionally set all instantly
added columns to be DEFAULT NULL. */
for (unsigned i = index->n_core_fields;
i < index->n_fields; i++) {
dict_col_t* col = index->fields[i].col;
col->def_val.len = UNIV_SQL_NULL;
col->def_val.data = NULL;
}
}
} else {
/* Clear PAGE_MAX_TRX_ID so that it can be /* Clear PAGE_MAX_TRX_ID so that it can be
used for other purposes in the future. IMPORT used for other purposes in the future. IMPORT
in MySQL 5.6, 5.7 and MariaDB 10.0 and 10.1 in MySQL 5.6, 5.7 and MariaDB 10.0 and 10.1
@ -3131,6 +3105,371 @@ row_import_read_meta_data(
return(DB_ERROR); return(DB_ERROR);
} }
#define BTR_BLOB_HDR_PART_LEN 0 /*!< BLOB part len on this page */
#define BTR_BLOB_HDR_NEXT_PAGE_NO 4 /*!< next BLOB part page no,
FIL_NULL if none */
#define BTR_BLOB_HDR_SIZE 8 /*!< Size of a BLOB part header, in bytes */
/* decrypt and decompress page if needed */
static dberr_t decrypt_decompress(fil_space_crypt_t *space_crypt,
size_t space_flags, span<byte> page,
size_t space_id, byte *page_compress_buf)
{
auto *data= page.data();
if (space_crypt && space_crypt->should_encrypt())
{
if (!buf_page_verify_crypt_checksum(data, space_flags))
return DB_CORRUPTION;
dberr_t err;
if (!fil_space_decrypt(space_id, space_crypt, data, page.size(),
space_flags, data, &err))
return err ? err : DB_CORRUPTION;
}
else if (fil_page_is_compressed_encrypted(data))
return DB_CORRUPTION;
const bool is_full_crc32_compressed=
fil_space_t::is_full_crc32_compressed(space_flags);
const bool page_actually_compressed=
(is_full_crc32_compressed &&
buf_page_is_compressed(data, space_flags)) ||
fil_page_is_compressed_encrypted(data) || fil_page_is_compressed(data);
if (page_actually_compressed)
{
if (!is_full_crc32_compressed && !fil_space_t::is_compressed(space_flags))
return DB_CORRUPTION;
auto compress_length=
fil_page_decompress(page_compress_buf, data, space_flags);
ut_ad(compress_length != srv_page_size);
if (compress_length == 0)
return DB_CORRUPTION;
}
return DB_SUCCESS;
}
static size_t get_buf_size()
{
return srv_page_size
#ifdef HAVE_LZO
+ LZO1X_1_15_MEM_COMPRESS
#elif defined HAVE_SNAPPY
+ snappy_max_compressed_length(srv_page_size)
#endif
;
}
/* find, parse instant metadata, performing variaous checks,
and apply it to dict_table_t
@return DB_SUCCESS or some error */
static dberr_t handle_instant_metadata(dict_table_t *table,
const row_import &cfg)
{
dict_get_and_save_data_dir_path(table, false);
char *filepath;
if (DICT_TF_HAS_DATA_DIR(table->flags))
{
ut_a(table->data_dir_path);
filepath=
fil_make_filepath(table->data_dir_path, table->name.m_name, IBD, true);
}
else
filepath= fil_make_filepath(nullptr, table->name.m_name, IBD, false);
if (!filepath)
return DB_OUT_OF_MEMORY;
SCOPE_EXIT([filepath]() { ut_free(filepath); });
bool success;
auto file= os_file_create_simple_no_error_handling(
innodb_data_file_key, filepath, OS_FILE_OPEN, OS_FILE_READ_WRITE, false,
&success);
if (!success)
return DB_IO_ERROR;
if (os_file_get_size(file) < srv_page_size * 4)
return DB_CORRUPTION;
SCOPE_EXIT([&file]() { os_file_close(file); });
std::unique_ptr<byte[], decltype(&aligned_free)> first_page(
static_cast<byte *>(aligned_malloc(srv_page_size, srv_page_size)),
&aligned_free);
if (dberr_t err= os_file_read_no_error_handling(IORequest(IORequest::READ),
file, first_page.get(), 0,
srv_page_size, nullptr))
return err;
auto space_flags= fsp_header_get_flags(first_page.get());
if (!fil_space_t::is_valid_flags(space_flags, true))
{
auto cflags= fsp_flags_convert_from_101(space_flags);
if (cflags == ULINT_UNDEFINED)
{
ib::error() << "Invalid FSP_SPACE_FLAGS=" << ib::hex(space_flags);
return DB_CORRUPTION;
}
space_flags= cflags;
}
if (!cfg.m_missing)
{
if (dberr_t err= cfg.match_flags(current_thd))
return err;
}
const auto zip_size= fil_space_t::zip_size(space_flags);
const uint64_t physical_size= zip_size ? zip_size : srv_page_size;
ut_ad(physical_size <= UNIV_PAGE_SIZE_MAX);
const auto space_id= page_get_space_id(first_page.get());
auto *space_crypt= fil_space_read_crypt_data(zip_size, first_page.get());
SCOPE_EXIT([&space_crypt]() {
if (space_crypt)
fil_space_destroy_crypt_data(&space_crypt);
});
std::unique_ptr<byte[], decltype(&aligned_free)> page(
static_cast<byte *>(
aligned_malloc(UNIV_PAGE_SIZE_MAX, UNIV_PAGE_SIZE_MAX)),
&aligned_free);
if (dberr_t err= os_file_read_no_error_handling(
IORequest(IORequest::READ), file, page.get(), 3 * physical_size,
physical_size, nullptr))
return err;
std::unique_ptr<byte[]> page_compress_buf(new byte[get_buf_size()]);
if (dberr_t err=
decrypt_decompress(space_crypt, space_flags,
{page.get(), static_cast<size_t>(physical_size)},
space_id, page_compress_buf.get()))
return err;
if (table->supports_instant())
{
dict_index_t *index= dict_table_get_first_index(table);
auto tmp1= table->space_id;
table->space_id= page_get_space_id(page.get());
SCOPE_EXIT([tmp1, table]() { table->space_id= tmp1; });
auto tmp2= index->page;
index->page= page_get_page_no(page.get());
SCOPE_EXIT([tmp2, index]() { index->page= tmp2; });
if (!page_is_comp(page.get()) != !dict_table_is_comp(table))
{
ib_errf(current_thd, IB_LOG_LEVEL_ERROR, ER_TABLE_SCHEMA_MISMATCH,
"ROW_FORMAT mismatch");
return DB_CORRUPTION;
}
if (btr_cur_instant_root_init(index, page.get()))
return DB_ERROR;
ut_ad(index->n_core_null_bytes != dict_index_t::NO_CORE_NULL_BYTES);
if (fil_page_get_type(page.get()) == FIL_PAGE_INDEX)
{
ut_ad(!index->is_instant());
return DB_SUCCESS;
}
mem_heap_t *heap= NULL;
SCOPE_EXIT([&heap]() { mem_heap_free(heap); });
while (btr_page_get_level(page.get()) != 0)
{
const rec_t *rec= page_rec_get_next(page_get_infimum_rec(page.get()));
/* Relax the assertion in rec_init_offsets(). */
ut_ad(!index->in_instant_init);
ut_d(index->in_instant_init= true);
rec_offs *offsets=
rec_get_offsets(rec, index, nullptr, 0, ULINT_UNDEFINED, &heap);
ut_d(index->in_instant_init= false);
uint64_t child_page_no= btr_node_ptr_get_child_page_no(rec, offsets);
if (dberr_t err= os_file_read_no_error_handling(
IORequest(IORequest::READ), file, page.get(),
child_page_no * physical_size, physical_size, nullptr))
return err;
if (dberr_t err= decrypt_decompress(
space_crypt, space_flags,
{page.get(), static_cast<size_t>(physical_size)}, space_id,
page_compress_buf.get()))
return err;
}
const auto *rec= page_rec_get_next(page_get_infimum_rec(page.get()));
const auto comp= dict_table_is_comp(index->table);
const auto info_bits= rec_get_info_bits(rec, comp);
if (page_rec_is_supremum(rec) || !(info_bits & REC_INFO_MIN_REC_FLAG))
{
ib::error() << "Table " << index->table->name
<< " is missing instant ALTER metadata";
index->table->corrupted= true;
return DB_CORRUPTION;
}
if ((info_bits & ~REC_INFO_DELETED_FLAG) != REC_INFO_MIN_REC_FLAG ||
(comp && rec_get_status(rec) != REC_STATUS_INSTANT))
{
incompatible:
ib::error() << "Table " << index->table->name
<< " contains unrecognizable instant ALTER metadata";
index->table->corrupted= true;
return DB_CORRUPTION;
}
if (info_bits & REC_INFO_DELETED_FLAG)
{
ulint trx_id_offset= index->trx_id_offset;
ut_ad(index->n_uniq);
if (trx_id_offset)
{
}
else if (index->table->not_redundant())
{
for (uint i= index->n_uniq; i--;)
trx_id_offset+= index->fields[i].fixed_len;
}
else if (rec_get_1byte_offs_flag(rec))
{
trx_id_offset= rec_1_get_field_end_info(rec, index->n_uniq - 1);
ut_ad(!(trx_id_offset & REC_1BYTE_SQL_NULL_MASK));
trx_id_offset&= ~REC_1BYTE_SQL_NULL_MASK;
}
else
{
trx_id_offset= rec_2_get_field_end_info(rec, index->n_uniq - 1);
ut_ad(!(trx_id_offset & REC_2BYTE_SQL_NULL_MASK));
trx_id_offset&= ~REC_2BYTE_SQL_NULL_MASK;
}
const byte *ptr=
rec + trx_id_offset + (DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN);
if (mach_read_from_4(ptr + BTR_EXTERN_LEN))
goto incompatible;
uint len= mach_read_from_4(ptr + BTR_EXTERN_LEN + 4);
if (!len || mach_read_from_4(ptr + BTR_EXTERN_OFFSET) != FIL_PAGE_DATA)
goto incompatible;
std::unique_ptr<byte[], decltype(&aligned_free)> second_page(
static_cast<byte *>(aligned_malloc(physical_size, physical_size)),
&aligned_free);
if (dberr_t err= os_file_read_no_error_handling(
IORequest(IORequest::READ), file, second_page.get(),
mach_read_from_4(ptr + BTR_EXTERN_PAGE_NO) * physical_size,
srv_page_size, nullptr))
return err;
if (dberr_t err= decrypt_decompress(
space_crypt, space_flags,
{second_page.get(), static_cast<size_t>(physical_size)},
space_id, page_compress_buf.get()))
return err;
if (fil_page_get_type(second_page.get()) != FIL_PAGE_TYPE_BLOB ||
mach_read_from_4(
&second_page[FIL_PAGE_DATA + BTR_BLOB_HDR_NEXT_PAGE_NO]) !=
FIL_NULL ||
mach_read_from_4(
&second_page[FIL_PAGE_DATA + BTR_BLOB_HDR_PART_LEN]) != len)
goto incompatible;
/* The unused part of the BLOB page should be zero-filled. */
for (const byte *
b= second_page.get() + (FIL_PAGE_DATA + BTR_BLOB_HDR_SIZE) +
len,
*const end= second_page.get() + srv_page_size - BTR_EXTERN_LEN;
b < end;)
{
if (*b++)
goto incompatible;
}
if (index->table->deserialise_columns(
&second_page[FIL_PAGE_DATA + BTR_BLOB_HDR_SIZE], len))
goto incompatible;
}
rec_offs *offsets= rec_get_offsets(
rec, index, nullptr, index->n_core_fields, ULINT_UNDEFINED, &heap);
if (rec_offs_any_default(offsets))
{
inconsistent:
goto incompatible;
}
/* In fact, because we only ever append fields to the metadata
record, it is also OK to perform READ UNCOMMITTED and
then ignore any extra fields, provided that
trx_sys.is_registered(DB_TRX_ID). */
if (rec_offs_n_fields(offsets) >
ulint(index->n_fields) + !!index->table->instant &&
!trx_sys.is_registered(current_trx(),
row_get_rec_trx_id(rec, index, offsets)))
goto inconsistent;
for (unsigned i= index->n_core_fields; i < index->n_fields; i++)
{
dict_col_t *col= index->fields[i].col;
const unsigned o= i + !!index->table->instant;
ulint len;
const byte *data= rec_get_nth_field(rec, offsets, o, &len);
ut_ad(!col->is_added());
ut_ad(!col->def_val.data);
col->def_val.len= len;
switch (len) {
case UNIV_SQL_NULL:
continue;
case 0:
col->def_val.data= field_ref_zero;
continue;
}
ut_ad(len != UNIV_SQL_DEFAULT);
if (!rec_offs_nth_extern(offsets, o))
col->def_val.data= mem_heap_dup(index->table->heap, data, len);
else if (len < BTR_EXTERN_FIELD_REF_SIZE ||
!memcmp(data + len - BTR_EXTERN_FIELD_REF_SIZE, field_ref_zero,
BTR_EXTERN_FIELD_REF_SIZE))
{
col->def_val.len= UNIV_SQL_DEFAULT;
goto inconsistent;
}
else
{
col->def_val.data= btr_copy_externally_stored_field(
&col->def_val.len, data, srv_page_size, len, index->table->heap);
}
}
}
return DB_SUCCESS;
}
/** /**
Read the contents of the <tablename>.cfg file. Read the contents of the <tablename>.cfg file.
@return DB_SUCCESS or error code. */ @return DB_SUCCESS or error code. */
@ -3441,14 +3780,7 @@ dberr_t FetchIndexRootPages::run(const fil_iterator_t& iter,
{ {
const unsigned zip_size= fil_space_t::zip_size(m_space_flags); const unsigned zip_size= fil_space_t::zip_size(m_space_flags);
const unsigned size= zip_size ? zip_size : unsigned(srv_page_size); const unsigned size= zip_size ? zip_size : unsigned(srv_page_size);
const ulint buf_size= byte* page_compress_buf= static_cast<byte*>(malloc(get_buf_size()));
#ifdef HAVE_LZO
LZO1X_1_15_MEM_COMPRESS+
#elif defined HAVE_SNAPPY
snappy_max_compressed_length(srv_page_size) +
#endif
srv_page_size;
byte* page_compress_buf = static_cast<byte*>(malloc(buf_size));
const bool full_crc32 = fil_space_t::full_crc32(m_space_flags); const bool full_crc32 = fil_space_t::full_crc32(m_space_flags);
bool skip_checksum_check = false; bool skip_checksum_check = false;
ut_ad(!srv_read_only_mode); ut_ad(!srv_read_only_mode);
@ -3541,14 +3873,7 @@ static dberr_t fil_iterate(
const ulint size = callback.physical_size(); const ulint size = callback.physical_size();
ulint n_bytes = iter.n_io_buffers * size; ulint n_bytes = iter.n_io_buffers * size;
const ulint buf_size = srv_page_size byte* page_compress_buf= static_cast<byte*>(malloc(get_buf_size()));
#ifdef HAVE_LZO
+ LZO1X_1_15_MEM_COMPRESS
#elif defined HAVE_SNAPPY
+ snappy_max_compressed_length(srv_page_size)
#endif
;
byte* page_compress_buf = static_cast<byte*>(malloc(buf_size));
ut_ad(!srv_read_only_mode); ut_ad(!srv_read_only_mode);
if (!page_compress_buf) { if (!page_compress_buf) {
@ -4122,6 +4447,11 @@ row_import_for_mysql(
if (err == DB_SUCCESS) { if (err == DB_SUCCESS) {
if (dberr_t err = handle_instant_metadata(table, cfg)) {
rw_lock_s_unlock(&dict_sys.latch);
return row_import_error(prebuilt, trx, err);
}
/* We have a schema file, try and match it with our /* We have a schema file, try and match it with our
data dictionary. */ data dictionary. */
@ -4179,6 +4509,15 @@ row_import_for_mysql(
if (err == DB_SUCCESS) { if (err == DB_SUCCESS) {
err = cfg.set_root_by_heuristic(); err = cfg.set_root_by_heuristic();
if (err == DB_SUCCESS) {
if (dberr_t err =
handle_instant_metadata(table,
cfg)) {
return row_import_error(
prebuilt, trx, err);
}
}
} }
} }