MDEV-4955 discover of table non-existance on CREATE

Fix ha_table_exists() to take discovery into account correctly.
It must be able to discover both table existence (when no frm is
found) and table non-existance (when frm was found).
This commit is contained in:
Sergei Golubchik 2014-02-27 22:43:42 +01:00
commit 8d0238a6d8
6 changed files with 73 additions and 85 deletions

View file

@ -386,12 +386,13 @@ static int ha_finish_errors(void)
static volatile int32 need_full_discover_for_existence= 0;
static volatile int32 engines_with_discover_table_names= 0;
static volatile int32 engines_with_discover= 0;
static int full_discover_for_existence(handlerton *, const char *, const char *)
{ return 1; }
{ return 0; }
static int ext_based_existence(handlerton *, const char *, const char *)
{ return 1; }
{ return 0; }
static int hton_ext_based_table_discovery(handlerton *hton, LEX_STRING *db,
MY_DIR *dir, handlerton::discovered_list *result)
@ -411,6 +412,9 @@ static void update_discovery_counters(handlerton *hton, int val)
if (hton->discover_table_names)
my_atomic_add32(&engines_with_discover_table_names, val);
if (hton->discover_table)
my_atomic_add32(&engines_with_discover, val);
}
int ha_finalize_handlerton(st_plugin_int *plugin)
@ -4787,7 +4791,9 @@ int ha_discover_table(THD *thd, TABLE_SHARE *share)
DBUG_ASSERT(share->error == OPEN_FRM_OPEN_ERROR); // share is not OK yet
if (share->db_plugin)
if (!engines_with_discover)
found= FALSE;
else if (share->db_plugin)
found= discover_handlerton(thd, share->db_plugin, share);
else
found= plugin_foreach(thd, discover_handlerton,
@ -4811,6 +4817,7 @@ struct st_discover_existence_args
size_t path_len;
const char *db, *table_name;
handlerton *hton;
bool frm_exists;
};
static my_bool discover_existence(THD *thd, plugin_ref plugin,
@ -4819,7 +4826,7 @@ static my_bool discover_existence(THD *thd, plugin_ref plugin,
st_discover_existence_args *args= (st_discover_existence_args*)arg;
handlerton *ht= plugin_hton(plugin);
if (ht->state != SHOW_OPTION_YES || !ht->discover_table_existence)
return FALSE;
return args->frm_exists;
args->hton= ht;
@ -4874,17 +4881,80 @@ private:
If the 'hton' is not NULL, it's set to the handlerton of the storage engine
of this table, or to view_pseudo_hton if the frm belongs to a view.
This function takes discovery correctly into account. If frm is found,
it discovers the table to make sure it really exists in the engine.
If no frm is found it discovers the table, in case it still exists in
the engine.
While it tries to cut corners (don't open .frm if no discovering engine is
enabled, no full discovery if all discovering engines support
discover_table_existence, etc), it still *may* be quite expensive
and must be used sparingly.
@retval true Table exists (even if the error occurred, like bad frm)
@retval false Table does not exist (one can do CREATE TABLE table_name)
@note if frm exists and the table in engine doesn't, *hton will be set,
but the return value will be false.
@note if frm file exists, but the table cannot be opened (engine not
loaded, frm is invalid), the return value will be true, but
*hton will be NULL.
*/
bool ha_table_exists(THD *thd, const char *db, const char *table_name,
handlerton **hton)
{
handlerton *dummy;
DBUG_ENTER("ha_table_exists");
if (hton)
*hton= 0;
else if (engines_with_discover)
hton= &dummy;
TABLE_SHARE *share= tdc_lock_share(db, table_name);
if (share)
{
if (hton)
*hton= share->db_type();
tdc_unlock_share(share);
DBUG_RETURN(TRUE);
}
char path[FN_REFLEN + 1];
size_t path_len = build_table_filename(path, sizeof(path) - 1,
db, table_name, "", 0);
st_discover_existence_args args= {path, path_len, db, table_name, 0, true};
if (file_ext_exists(path, path_len, reg_ext))
{
bool exists= true;
if (hton)
{
enum legacy_db_type db_type;
if (dd_frm_type(thd, path, &db_type) != FRMTYPE_VIEW)
{
handlerton *ht= ha_resolve_by_legacy_type(thd, db_type);
if ((*hton= ht))
// verify that the table really exists
exists= discover_existence(thd,
plugin_int_to_ref(hton2plugin[ht->slot]), &args);
}
else
*hton= view_pseudo_hton;
}
DBUG_RETURN(exists);
}
args.frm_exists= false;
if (plugin_foreach(thd, discover_existence, MYSQL_STORAGE_ENGINE_PLUGIN,
&args))
{
if (hton)
*hton= args.hton;
DBUG_RETURN(TRUE);
}
if (need_full_discover_for_existence)
{
@ -4909,42 +4979,6 @@ bool ha_table_exists(THD *thd, const char *db, const char *table_name,
DBUG_RETURN(!no_such_table_handler.safely_trapped_errors());
}
TABLE_SHARE *share= tdc_lock_share(db, table_name);
if (share)
{
if (hton)
*hton= share->db_type();
tdc_unlock_share(share);
DBUG_RETURN(TRUE);
}
char path[FN_REFLEN + 1];
size_t path_len = build_table_filename(path, sizeof(path) - 1,
db, table_name, "", 0);
if (file_ext_exists(path, path_len, reg_ext))
{
if (hton)
{
enum legacy_db_type db_type;
if (dd_frm_type(thd, path, &db_type) != FRMTYPE_VIEW)
*hton= ha_resolve_by_legacy_type(thd, db_type);
else
*hton= view_pseudo_hton;
}
DBUG_RETURN(TRUE);
}
st_discover_existence_args args= {path, path_len, db, table_name, 0};
if (plugin_foreach(thd, discover_existence, MYSQL_STORAGE_ENGINE_PLUGIN,
&args))
{
if (hton)
*hton= args.hton;
DBUG_RETURN(TRUE);
}
DBUG_RETURN(FALSE);
}