SQL: VTMD for SHOW CREATE fixes [related to #125]

This commit is contained in:
Aleksey Midenkov 2017-09-27 14:29:25 +03:00
parent 79e17b26fb
commit e9e3cb0f6e
5 changed files with 154 additions and 162 deletions

View file

@ -51,7 +51,7 @@ drop table tt_vtmd;
end~~
create table t (a int) with system versioning;
show create table t for system_time as of now;
ERROR 42S02: Table 'test.t_vtmd' doesn't exist
ERROR HY000: VTMD error: Table 'test.t_vtmd' doesn't exist
set versioning_alter_history=survive;
create or replace table t (a int) with system versioning;
show create table t for system_time between timestamp @tm1 and timestamp @tm1;
@ -59,9 +59,9 @@ ERROR HY000: SYSTEM_TIME range selector is prohibited
show create table t for system_time from timestamp @tm1 to timestamp @tm1;
ERROR HY000: SYSTEM_TIME range selector is prohibited
show create table t for system_time as of timestamp '01-01-1990';
ERROR HY000: VTMD error: failed to query VTMD table
ERROR HY000: VTMD error: Table 'test.t' doesn't exist
show create table t for system_time as of timestamp '01-01-2020';
ERROR HY000: VTMD error: failed to query VTMD table
ERROR HY000: VTMD error: Table 'test.t' doesn't exist
drop table t;
call drop_archives('t_vtmd');
drop table t_vtmd;

View file

@ -65,7 +65,7 @@ end~~
delimiter ;~~
create table t (a int) with system versioning;
--error ER_NO_SUCH_TABLE
--error ER_VERS_VTMD_ERROR
show create table t for system_time as of now;
set versioning_alter_history=survive;

View file

@ -1270,23 +1270,22 @@ mysqld_show_create(THD *thd, TABLE_LIST *table_list)
*/
MDL_savepoint mdl_savepoint= thd->mdl_context.mdl_savepoint();
TABLE_LIST tl;
bool versioned_query=
table_list->vers_conditions.type != FOR_SYSTEM_TIME_UNSPECIFIED;
String archive_name;
if (versioned_query)
TABLE_LIST archive;
bool versioned= table_list->vers_conditions;
if (versioned)
{
DBUG_ASSERT(table_list->vers_conditions.type == FOR_SYSTEM_TIME_AS_OF);
String archive_name;
DBUG_ASSERT(table_list->vers_conditions == FOR_SYSTEM_TIME_AS_OF);
VTMD_table vtmd(*table_list);
if (vtmd.find_archive_name(thd, archive_name))
goto exit;
tl.init_one_table(table_list->db, table_list->db_length, archive_name.ptr(),
archive.init_one_table(table_list->db, table_list->db_length, archive_name.ptr(),
archive_name.length(), archive_name.ptr(), TL_READ);
tl.alias= table_list->table_name;
tl.vers_force_alias= true;
table_list= &tl;
archive.alias= table_list->table_name;
archive.vers_force_alias= true;
table_list= &archive;
}
if (mysqld_show_create_get_fields(thd, table_list, &field_list, &buffer))
@ -1302,9 +1301,7 @@ mysqld_show_create(THD *thd, TABLE_LIST *table_list)
protocol->store(table_list->view_name.str, system_charset_info);
else
{
if (versioned_query)
protocol->store(tl.alias, system_charset_info);
else if (table_list->schema_table)
if (table_list->schema_table)
protocol->store(table_list->schema_table->table_name,
system_charset_info);
else
@ -1332,7 +1329,7 @@ mysqld_show_create(THD *thd, TABLE_LIST *table_list)
my_eof(thd);
exit:
if (versioned_query)
if (versioned)
{
/* If commit fails, we should be able to reset the OK status. */
thd->get_stmt_da()->set_overwrite_status(true);

View file

@ -54,17 +54,17 @@ VTMD_table::find_record(ulonglong sys_trx_end, bool &found)
key_buf_t key;
found= false;
DBUG_ASSERT(vtmd);
DBUG_ASSERT(vtmd.table);
if (key.allocate(vtmd->s->max_unique_length))
if (key.allocate(vtmd.table->s->max_unique_length))
return true;
DBUG_ASSERT(sys_trx_end);
vtmd->vers_end_field()->set_notnull();
vtmd->vers_end_field()->store(sys_trx_end, true);
key_copy(key, vtmd->record[0], vtmd->key_info + IDX_TRX_END, 0);
vtmd.table->vers_end_field()->set_notnull();
vtmd.table->vers_end_field()->store(sys_trx_end, true);
key_copy(key, vtmd.table->record[0], vtmd.table->key_info + IDX_TRX_END, 0);
error= vtmd->file->ha_index_read_idx_map(vtmd->record[1], IDX_TRX_END,
error= vtmd.table->file->ha_index_read_idx_map(vtmd.table->record[1], IDX_TRX_END,
key,
HA_WHOLE_KEY,
HA_READ_KEY_EXACT);
@ -72,27 +72,59 @@ VTMD_table::find_record(ulonglong sys_trx_end, bool &found)
{
if (error == HA_ERR_RECORD_DELETED || error == HA_ERR_KEY_NOT_FOUND)
return false;
vtmd->file->print_error(error, MYF(0));
vtmd.table->file->print_error(error, MYF(0));
return true;
}
restore_record(vtmd, record[1]);
restore_record(vtmd.table, record[1]);
found= true;
return false;
}
bool
VTMD_table::open(THD *thd, Local_da &local_da, bool *created)
{
if (created)
*created= false;
if (0 == vtmd_name.length() && about.vers_vtmd_name(vtmd_name))
return true;
while (true) // max 2 iterations
{
vtmd.init_one_table(
DB_WITH_LEN(about),
XSTRING_WITH_LEN(vtmd_name),
vtmd_name,
TL_WRITE_CONCURRENT_INSERT);
TABLE *res= open_log_table(thd, &vtmd, &open_tables_backup);
if (res)
return false;
if (created && !*created && local_da.is_error() && local_da.sql_errno() == ER_NO_SUCH_TABLE)
{
local_da.reset_diagnostics_area();
if (create(thd))
break;
*created= true;
}
else
break;
}
return true;
}
bool
VTMD_table::update(THD *thd, const char* archive_name)
{
TABLE_LIST vtmd_tl;
bool result= true;
bool close_log= false;
bool found= false;
bool created= false;
bool created;
int error;
size_t an_len= 0;
Open_tables_backup open_tables_backup;
ulonglong save_thd_options;
{
Local_da local_da(thd, ER_VERS_VTMD_ERROR);
@ -100,34 +132,10 @@ VTMD_table::update(THD *thd, const char* archive_name)
save_thd_options= thd->variables.option_bits;
thd->variables.option_bits&= ~OPTION_BIN_LOG;
if (about.vers_vtmd_name(vtmd_name))
goto quit;
if (open(thd, local_da, &created))
goto open_error;
while (true) // max 2 iterations
{
vtmd_tl.init_one_table(
DB_WITH_LEN(about),
XSTRING_WITH_LEN(vtmd_name),
vtmd_name,
TL_WRITE_CONCURRENT_INSERT);
vtmd= open_log_table(thd, &vtmd_tl, &open_tables_backup);
if (vtmd)
break;
if (!created && local_da.is_error() && local_da.sql_errno() == ER_NO_SUCH_TABLE)
{
local_da.reset_diagnostics_area();
if (create(thd))
goto quit;
created= true;
continue;
}
goto quit;
}
close_log= true;
if (!vtmd->versioned())
if (!vtmd.table->versioned())
{
my_message(ER_VERS_VTMD_ERROR, "VTMD is not versioned", MYF(0));
goto quit;
@ -136,64 +144,64 @@ VTMD_table::update(THD *thd, const char* archive_name)
if (!created && find_record(ULONGLONG_MAX, found))
goto quit;
if ((error= vtmd->file->extra(HA_EXTRA_MARK_AS_LOG_TABLE)))
if ((error= vtmd.table->file->extra(HA_EXTRA_MARK_AS_LOG_TABLE)))
{
vtmd->file->print_error(error, MYF(0));
vtmd.table->file->print_error(error, MYF(0));
goto quit;
}
/* Honor next number columns if present */
vtmd->next_number_field= vtmd->found_next_number_field;
vtmd.table->next_number_field= vtmd.table->found_next_number_field;
if (vtmd->s->fields != FIELD_COUNT)
if (vtmd.table->s->fields != FIELD_COUNT)
{
my_printf_error(ER_VERS_VTMD_ERROR, "`%s.%s` unexpected fields count: %d", MYF(0),
vtmd->s->db.str, vtmd->s->table_name.str, vtmd->s->fields);
vtmd.table->s->db.str, vtmd.table->s->table_name.str, vtmd.table->s->fields);
goto quit;
}
if (archive_name)
{
an_len= strlen(archive_name);
vtmd->field[FLD_ARCHIVE_NAME]->store(archive_name, an_len, table_alias_charset);
vtmd->field[FLD_ARCHIVE_NAME]->set_notnull();
vtmd.table->field[FLD_ARCHIVE_NAME]->store(archive_name, an_len, table_alias_charset);
vtmd.table->field[FLD_ARCHIVE_NAME]->set_notnull();
}
else
{
vtmd->field[FLD_ARCHIVE_NAME]->set_null();
vtmd.table->field[FLD_ARCHIVE_NAME]->set_null();
}
vtmd->field[FLD_COL_RENAMES]->set_null();
vtmd.table->field[FLD_COL_RENAMES]->set_null();
if (found)
{
if (thd->lex->sql_command == SQLCOM_CREATE_TABLE)
{
my_printf_error(ER_VERS_VTMD_ERROR, "`%s.%s` exists and not empty!", MYF(0),
vtmd->s->db.str, vtmd->s->table_name.str);
vtmd.table->s->db.str, vtmd.table->s->table_name.str);
goto quit;
}
vtmd->mark_columns_needed_for_update(); // not needed?
vtmd.table->mark_columns_needed_for_update(); // not needed?
if (archive_name)
{
vtmd->s->versioned= false;
error= vtmd->file->ha_update_row(vtmd->record[1], vtmd->record[0]);
vtmd->s->versioned= true;
vtmd.table->s->versioned= false;
error= vtmd.table->file->ha_update_row(vtmd.table->record[1], vtmd.table->record[0]);
vtmd.table->s->versioned= true;
if (!error)
{
if (thd->lex->sql_command == SQLCOM_DROP_TABLE)
{
error= vtmd->file->ha_delete_row(vtmd->record[0]);
error= vtmd.table->file->ha_delete_row(vtmd.table->record[0]);
}
else
{
DBUG_ASSERT(thd->lex->sql_command == SQLCOM_ALTER_TABLE);
ulonglong sys_trx_end= (ulonglong) vtmd->vers_start_field()->val_int();
store_record(vtmd, record[1]);
vtmd->field[FLD_NAME]->store(TABLE_NAME_WITH_LEN(about), system_charset_info);
vtmd->field[FLD_NAME]->set_notnull();
vtmd->field[FLD_ARCHIVE_NAME]->set_null();
error= vtmd->file->ha_update_row(vtmd->record[1], vtmd->record[0]);
ulonglong sys_trx_end= (ulonglong) vtmd.table->vers_start_field()->val_int();
store_record(vtmd.table, record[1]);
vtmd.table->field[FLD_NAME]->store(TABLE_NAME_WITH_LEN(about), system_charset_info);
vtmd.table->field[FLD_NAME]->set_notnull();
vtmd.table->field[FLD_ARCHIVE_NAME]->set_null();
error= vtmd.table->file->ha_update_row(vtmd.table->record[1], vtmd.table->record[0]);
if (error)
goto err;
@ -203,60 +211,58 @@ VTMD_table::update(THD *thd, const char* archive_name)
bool found;
if (find_record(sys_trx_end, found))
goto quit;
if (!found || !vtmd->field[FLD_ARCHIVE_NAME]->is_null())
if (!found || !vtmd.table->field[FLD_ARCHIVE_NAME]->is_null())
break;
store_record(vtmd, record[1]);
vtmd->field[FLD_ARCHIVE_NAME]->store(archive_name, an_len, table_alias_charset);
vtmd->field[FLD_ARCHIVE_NAME]->set_notnull();
vtmd->s->versioned= false;
error= vtmd->file->ha_update_row(vtmd->record[1], vtmd->record[0]);
vtmd->s->versioned= true;
store_record(vtmd.table, record[1]);
vtmd.table->field[FLD_ARCHIVE_NAME]->store(archive_name, an_len, table_alias_charset);
vtmd.table->field[FLD_ARCHIVE_NAME]->set_notnull();
vtmd.table->s->versioned= false;
error= vtmd.table->file->ha_update_row(vtmd.table->record[1], vtmd.table->record[0]);
vtmd.table->s->versioned= true;
if (error)
goto err;
sys_trx_end= (ulonglong) vtmd->vers_start_field()->val_int();
sys_trx_end= (ulonglong) vtmd.table->vers_start_field()->val_int();
} // while (true)
} // else (thd->lex->sql_command != SQLCOM_DROP_TABLE)
} // if (!error)
} // if (archive_name)
else
{
vtmd->field[FLD_NAME]->store(TABLE_NAME_WITH_LEN(about), system_charset_info);
vtmd->field[FLD_NAME]->set_notnull();
error= vtmd->file->ha_update_row(vtmd->record[1], vtmd->record[0]);
vtmd.table->field[FLD_NAME]->store(TABLE_NAME_WITH_LEN(about), system_charset_info);
vtmd.table->field[FLD_NAME]->set_notnull();
error= vtmd.table->file->ha_update_row(vtmd.table->record[1], vtmd.table->record[0]);
}
} // if (found)
else
{
vtmd->field[FLD_NAME]->store(TABLE_NAME_WITH_LEN(about), system_charset_info);
vtmd->field[FLD_NAME]->set_notnull();
vtmd->mark_columns_needed_for_insert(); // not needed?
error= vtmd->file->ha_write_row(vtmd->record[0]);
vtmd.table->field[FLD_NAME]->store(TABLE_NAME_WITH_LEN(about), system_charset_info);
vtmd.table->field[FLD_NAME]->set_notnull();
vtmd.table->mark_columns_needed_for_insert(); // not needed?
error= vtmd.table->file->ha_write_row(vtmd.table->record[0]);
}
if (error)
{
err:
vtmd->file->print_error(error, MYF(0));
vtmd.table->file->print_error(error, MYF(0));
}
else
result= local_da.is_error();
}
quit:
if (close_log)
close_log_table(thd, &open_tables_backup);
close_log_table(thd, &open_tables_backup);
open_error:
thd->variables.option_bits= save_thd_options;
return result;
}
bool
VTMD_rename::move_archives(THD *thd, LString &new_db)
{
TABLE_LIST vtmd_tl;
vtmd_tl.init_one_table(
vtmd.init_one_table(
DB_WITH_LEN(about),
XSTRING_WITH_LEN(vtmd_name),
vtmd_name,
@ -269,66 +275,66 @@ VTMD_rename::move_archives(THD *thd, LString &new_db)
Open_tables_backup open_tables_backup;
key_buf_t key;
vtmd= open_log_table(thd, &vtmd_tl, &open_tables_backup);
if (!vtmd)
TABLE *res= open_log_table(thd, &vtmd, &open_tables_backup);
if (!res)
return true;
if (key.allocate(vtmd->key_info[IDX_ARCHIVE_NAME].key_length))
if (key.allocate(vtmd.table->key_info[IDX_ARCHIVE_NAME].key_length))
{
close_log_table(thd, &open_tables_backup);
return true;
}
if ((error= vtmd->file->ha_start_keyread(IDX_ARCHIVE_NAME)))
if ((error= vtmd.table->file->ha_start_keyread(IDX_ARCHIVE_NAME)))
goto err;
end_keyread= true;
if ((error= vtmd->file->ha_index_init(IDX_ARCHIVE_NAME, true)))
if ((error= vtmd.table->file->ha_index_init(IDX_ARCHIVE_NAME, true)))
goto err;
index_end= true;
error= vtmd->file->ha_index_first(vtmd->record[0]);
error= vtmd.table->file->ha_index_first(vtmd.table->record[0]);
while (!error)
{
if (!vtmd->field[FLD_ARCHIVE_NAME]->is_null())
if (!vtmd.table->field[FLD_ARCHIVE_NAME]->is_null())
{
vtmd->field[FLD_ARCHIVE_NAME]->val_str(&archive);
vtmd.table->field[FLD_ARCHIVE_NAME]->val_str(&archive);
key_copy(key,
vtmd->record[0],
&vtmd->key_info[IDX_ARCHIVE_NAME],
vtmd->key_info[IDX_ARCHIVE_NAME].key_length,
vtmd.table->record[0],
&vtmd.table->key_info[IDX_ARCHIVE_NAME],
vtmd.table->key_info[IDX_ARCHIVE_NAME].key_length,
false);
error= vtmd->file->ha_index_read_map(
vtmd->record[0],
error= vtmd.table->file->ha_index_read_map(
vtmd.table->record[0],
key,
vtmd->key_info[IDX_ARCHIVE_NAME].ext_key_part_map,
vtmd.table->key_info[IDX_ARCHIVE_NAME].ext_key_part_map,
HA_READ_PREFIX_LAST);
if (!error)
{
if ((rc= move_table(thd, archive, new_db)))
break;
error= vtmd->file->ha_index_next(vtmd->record[0]);
error= vtmd.table->file->ha_index_next(vtmd.table->record[0]);
}
}
else
{
archive.length(0);
error= vtmd->file->ha_index_next(vtmd->record[0]);
error= vtmd.table->file->ha_index_next(vtmd.table->record[0]);
}
}
if (error && error != HA_ERR_END_OF_FILE)
{
err:
vtmd->file->print_error(error, MYF(0));
vtmd.table->file->print_error(error, MYF(0));
rc= true;
}
if (index_end)
vtmd->file->ha_index_end();
vtmd.table->file->ha_index_end();
if (end_keyread)
vtmd->file->ha_end_keyread();
vtmd.table->file->ha_end_keyread();
close_log_table(thd, &open_tables_backup);
return rc;
@ -501,80 +507,64 @@ VTMD_table::archive_name(
bool
VTMD_table::find_archive_name(THD *thd, String &out)
{
String vtmd_name;
if (about.vers_vtmd_name(vtmd_name))
return true;
READ_RECORD info;
int error= 0;
int error;
SQL_SELECT *select= NULL;
COND *conds= NULL;
List<TABLE_LIST> dummy;
SELECT_LEX &select_lex= thd->lex->select_lex;
TABLE_LIST tl;
tl.init_one_table(about.db, about.db_length, vtmd_name.ptr(),
vtmd_name.length(), vtmd_name.ptr(), TL_READ);
Open_tables_backup open_tables_backup;
if (!(vtmd= open_log_table(thd, &tl, &open_tables_backup)))
{
my_error(ER_VERS_VTMD_ERROR, MYF(0), "failed to open VTMD table");
Local_da local_da(thd, ER_VERS_VTMD_ERROR);
if (open(thd, local_da))
return true;
}
Name_resolution_context &ctx= thd->lex->select_lex.context;
TABLE_LIST *table_list= ctx.table_list;
TABLE_LIST *first_name_resolution_table= ctx.first_name_resolution_table;
table_map map = tl.table->map;
ctx.table_list= &tl;
ctx.first_name_resolution_table= &tl;
tl.table->map= 1;
table_map map = vtmd.table->map;
ctx.table_list= &vtmd;
ctx.first_name_resolution_table= &vtmd;
vtmd.table->map= 1;
tl.vers_conditions= about.vers_conditions;
if ((error= vers_setup_select(thd, &tl, &conds, &select_lex)) ||
(error= setup_conds(thd, &tl, dummy, &conds)))
{
my_error(ER_VERS_VTMD_ERROR, MYF(0),
"failed to setup conditions for querying VTMD table");
vtmd.vers_conditions= about.vers_conditions;
if ((error= vers_setup_select(thd, &vtmd, &conds, &select_lex)) ||
(error= setup_conds(thd, &vtmd, dummy, &conds)))
goto err;
}
select= make_select(tl.table, 0, 0, conds, NULL, 0, &error);
select= make_select(vtmd.table, 0, 0, conds, NULL, 0, &error);
if (error)
goto loc_err;
if ((error=
init_read_record(&info, thd, tl.table, select, NULL, 1, 1, false)))
error= init_read_record(&info, thd, vtmd.table, select, NULL,
1 /* use_record_cache */, true /* print_error */,
false /* disable_rr_cache */);
if (error)
goto loc_err;
while (!(error= info.read_record(&info)) && !thd->killed && !thd->is_error())
{
if (select->skip_record(thd) > 0)
{
tl.table->field[FLD_ARCHIVE_NAME]->val_str(&out);
if (out.length() == 0)
{
// Handle AS OF NOW or just RENAMEd case
out.set(about.table_name, about.table_name_length,
system_charset_info);
}
vtmd.table->field[FLD_ARCHIVE_NAME]->val_str(&out);
if (out.length() == 0) // Handle AS OF NOW or RENAME TABLE case
out.set(about.table_name, about.table_name_length, system_charset_info);
break;
}
}
loc_err:
if (error)
my_error(ER_VERS_VTMD_ERROR, MYF(0), "failed to query VTMD table");
if (error < 0)
my_error(ER_NO_SUCH_TABLE, MYF(0), about.db, about.alias);
loc_err:
end_read_record(&info);
err:
delete select;
ctx.table_list= table_list;
ctx.first_name_resolution_table= first_name_resolution_table;
tl.table->map= map;
vtmd.table->map= map;
close_log_table(thd, &open_tables_backup);
return error ? true : false;
DBUG_ASSERT(!error || local_da.is_error());
return error;
}
static

View file

@ -48,8 +48,10 @@ class THD;
class VTMD_table
{
Open_tables_backup open_tables_backup;
protected:
TABLE *vtmd;
TABLE_LIST vtmd;
const TABLE_LIST &about;
SString_t vtmd_name;
@ -72,13 +74,16 @@ public:
};
VTMD_table(TABLE_LIST &_about) :
vtmd(NULL),
about(_about)
{}
{
vtmd.table= NULL;
}
bool create(THD *thd);
bool find_record(ulonglong sys_trx_end, bool &found);
bool open(THD *thd, Local_da &local_da, bool *created= NULL);
bool update(THD *thd, const char* archive_name= NULL);
bool setup_select(THD *thd);
static void archive_name(THD *thd, const char *table_name, char *new_name, size_t new_name_size);
void archive_name(THD *thd, char *new_name, size_t new_name_size)