- Move DataPath from the MYCAT catalog to the ha_connect handler. Indeed

it belongs to each tables and the catalog being share between several
  instances of CONNECT, when a query implied several tables belonging to
  different databases, some where pointing on the wrong database. This
  fix bugs occuring in queries such as:
  INSERT into db1.t1 select * from db2.t2;
  Where the t1 data file was made in db2.
modified:
  storage/connect/catalog.h
  storage/connect/connect.cc
  storage/connect/filamdbf.cpp
  storage/connect/filamdbf.h
  storage/connect/ha_connect.cc
  storage/connect/ha_connect.h
  storage/connect/mycat.cc
  storage/connect/mycat.h
  storage/connect/plgdbsem.h
  storage/connect/plgdbutl.cpp
  storage/connect/reldef.cpp
  storage/connect/reldef.h
  storage/connect/tabfix.h
  storage/connect/tabfmt.cpp
  storage/connect/tabfmt.h
  storage/connect/tabmul.cpp
This commit is contained in:
Olivier Bertrand 2014-08-23 19:17:15 +02:00
parent f930f4eda9
commit 74a4672622
16 changed files with 113 additions and 40 deletions

View file

@ -1,7 +1,7 @@
/*************** Catalog H Declares Source Code File (.H) **************/
/* Name: CATALOG.H Version 3.2 */
/* Name: CATALOG.H Version 3.3 */
/* */
/* (C) Copyright to the author Olivier BERTRAND 2000-2012 */
/* (C) Copyright to the author Olivier BERTRAND 2000-2014 */
/* */
/* This file contains the CATALOG PlugDB classes definitions. */
/***********************************************************************/
@ -68,11 +68,11 @@ class DllExport CATALOG {
bool GetDefHuge(void) {return DefHuge;}
void SetDefHuge(bool b) {DefHuge = b;}
char *GetCbuf(void) {return Cbuf;}
char *GetDataPath(void) {return (char*)DataPath;}
//char *GetDataPath(void) {return (char*)DataPath;}
// Methods
virtual void Reset(void) {}
virtual void SetDataPath(PGLOBAL g, const char *path) {}
//virtual void SetDataPath(PGLOBAL g, const char *path) {}
virtual bool CheckName(PGLOBAL g, char *name) {return true;}
virtual bool ClearName(PGLOBAL g, PSZ name) {return true;}
virtual PRELDEF MakeOneTableDesc(PGLOBAL g, LPCSTR name, LPCSTR am) {return NULL;}
@ -106,7 +106,7 @@ class DllExport CATALOG {
int Cblen; /* Length of suballoc. buffer */
CURTAB Ctb; /* Used to enumerate tables */
bool DefHuge; /* true: tables default to huge */
LPCSTR DataPath; /* Is the Path of DB data dir */
//LPCSTR DataPath; /* Is the Path of DB data dir */
}; // end of class CATALOG
#endif // __CATALOG__H

View file

@ -122,9 +122,12 @@ bool CntCheckDB(PGLOBAL g, PHC handler, const char *pathname)
(dbuserp->Catalog) ? ((MYCAT*)dbuserp->Catalog)->GetHandler() : NULL,
handler);
// Set the database path for this table
handler->SetDataPath(g, pathname);
if (dbuserp->Catalog) {
// ((MYCAT *)dbuserp->Catalog)->SetHandler(handler); done later
((MYCAT *)dbuserp->Catalog)->SetDataPath(g, pathname);
// ((MYCAT *)dbuserp->Catalog)->SetDataPath(g, pathname);
return false; // Nothing else to do
} // endif Catalog
@ -141,7 +144,7 @@ bool CntCheckDB(PGLOBAL g, PHC handler, const char *pathname)
if (!(dbuserp->Catalog= new MYCAT(handler)))
return true;
((MYCAT *)dbuserp->Catalog)->SetDataPath(g, pathname);
//((MYCAT *)dbuserp->Catalog)->SetDataPath(g, pathname);
//dbuserp->UseTemp= TMP_AUTO;
/*********************************************************************/

View file

@ -176,7 +176,7 @@ static int dbfhead(PGLOBAL g, FILE *file, PSZ fn, DBFHEADER *buf)
/* DBFColumns: constructs the result blocks containing the description */
/* of all the columns of a DBF file that will be retrieved by #GetData. */
/****************************************************************************/
PQRYRES DBFColumns(PGLOBAL g, const char *fn, BOOL info)
PQRYRES DBFColumns(PGLOBAL g, char *dp, const char *fn, BOOL info)
{
int buftyp[] = {TYPE_STRING, TYPE_SHORT, TYPE_STRING,
TYPE_INT, TYPE_INT, TYPE_SHORT};
@ -205,7 +205,7 @@ PQRYRES DBFColumns(PGLOBAL g, const char *fn, BOOL info)
/************************************************************************/
/* Open the input file. */
/************************************************************************/
PlugSetPath(filename, fn, PlgGetDataPath(g));
PlugSetPath(filename, fn, dp);
if (!(infile= global_fopen(g, MSGID_CANNOT_OPEN, filename, "rb")))
return NULL;

View file

@ -1,7 +1,7 @@
/***************** FilAmDbf H Declares Source Code File (.H) ****************/
/* Name: filamdbf.h Version 1.3 */
/* Name: filamdbf.h Version 1.4 */
/* */
/* (C) Copyright to the author Olivier BERTRAND 2005-2012 */
/* (C) Copyright to the author Olivier BERTRAND 2005-2014 */
/* */
/* This file contains the DBF file access method classes declares. */
/****************************************************************************/
@ -19,7 +19,7 @@ typedef class DBMFAM *PDBMFAM;
/****************************************************************************/
/* Functions used externally. */
/****************************************************************************/
PQRYRES DBFColumns(PGLOBAL g, const char *fn, BOOL info);
PQRYRES DBFColumns(PGLOBAL g, char *dp, const char *fn, BOOL info);
/****************************************************************************/
/* This is the base class for dBASE file access methods. */

View file

@ -563,6 +563,11 @@ ha_connect::ha_connect(handlerton *hton, TABLE_SHARE *table_arg)
xp= (table) ? GetUser(ha_thd(), NULL) : NULL;
if (xp)
xp->SetHandler(this);
#if defined(WIN32)
datapath= ".\\";
#else // !WIN32
datapath= "./";
#endif // !WIN32
tdbp= NULL;
sdvalin= NULL;
sdvalout= NULL;
@ -1402,6 +1407,14 @@ void ha_connect::AddColName(char *cp, Field *fp)
} // end of AddColName
#endif // 0
/***********************************************************************/
/* This function sets the current database path. */
/***********************************************************************/
void ha_connect::SetDataPath(PGLOBAL g, const char *path)
{
datapath= SetPath(g, path);
} // end of SetDataPath
/****************************************************************************/
/* Get the table description block of a CONNECT table. */
/****************************************************************************/
@ -3440,8 +3453,10 @@ int ha_connect::info(uint flag)
} // endif xmod
// This is necessary for getting file length
if (cat && table)
cat->SetDataPath(g, table->s->db.str);
// if (cat && table)
// cat->SetDataPath(g, table->s->db.str);
if (table)
SetDataPath(g, table->s->db.str);
else
DBUG_RETURN(HA_ERR_INTERNAL_ERROR); // Should never happen
@ -4764,7 +4779,7 @@ static int connect_assisted_discovery(handlerton *hton, THD* thd,
const char *fncn= "?";
const char *user, *fn, *db, *host, *pwd, *sep, *tbl, *src;
const char *col, *ocl, *rnk, *pic, *fcl, *skc;
char *tab, *dsn, *shm;
char *tab, *dsn, *shm, *dpath;
#if defined(WIN32)
char *nsp= NULL, *cls= NULL;
#endif // WIN32
@ -5009,10 +5024,12 @@ static int connect_assisted_discovery(handlerton *hton, THD* thd,
char *cnm, *rem, *dft, *xtra;
int i, len, prec, dec, typ, flg;
if (cat)
cat->SetDataPath(g, table_s->db.str);
else
return HA_ERR_INTERNAL_ERROR; // Should never happen
// if (cat)
// cat->SetDataPath(g, table_s->db.str);
// else
// return HA_ERR_INTERNAL_ERROR; // Should never happen
dpath= SetPath(g, table_s->db.str);
if (src && ttp != TAB_PIVOT && ttp != TAB_ODBC) {
qrp= SrcColumns(g, host, db, user, pwd, src, port);
@ -5025,7 +5042,7 @@ static int connect_assisted_discovery(handlerton *hton, THD* thd,
} else switch (ttp) {
case TAB_DBF:
qrp= DBFColumns(g, fn, fnc == FNC_COL);
qrp= DBFColumns(g, dpath, fn, fnc == FNC_COL);
break;
#if defined(ODBC_SUPPORT)
case TAB_ODBC:
@ -5062,7 +5079,7 @@ static int connect_assisted_discovery(handlerton *hton, THD* thd,
break;
#endif // MYSQL_SUPPORT
case TAB_CSV:
qrp= CSVColumns(g, fn, spc, qch, hdr, mxe, fnc == FNC_COL);
qrp= CSVColumns(g, dpath, fn, spc, qch, hdr, mxe, fnc == FNC_COL);
break;
#if defined(WIN32)
case TAB_WMI:
@ -5728,8 +5745,10 @@ int ha_connect::create(const char *name, TABLE *table_arg,
PDBUSER dup= PlgGetUser(g);
PCATLG cat= (dup) ? dup->Catalog : NULL;
SetDataPath(g, table_arg->s->db.str);
if (cat) {
cat->SetDataPath(g, table_arg->s->db.str);
// cat->SetDataPath(g, table_arg->s->db.str);
#if defined(WITH_PARTITION_STORAGE_ENGINE)
if (part_info)

View file

@ -210,7 +210,9 @@ public:
bool IsSameIndex(PIXDEF xp1, PIXDEF xp2);
bool IsPartitioned(void);
bool IsUnique(uint n);
char *GetDataPath(void) {return (char*)datapath;}
void SetDataPath(PGLOBAL g, const char *path);
PTDB GetTDB(PGLOBAL g);
int OpenTable(PGLOBAL g, bool del= false);
bool CheckColumnList(PGLOBAL g);
@ -521,6 +523,7 @@ protected:
ulong hnum; // The number of this handler
query_id_t valid_query_id; // The one when tdbp was allocated
query_id_t creat_query_id; // The one when handler was allocated
char *datapath; // Is the Path of DB data directory
PTDB tdbp; // To table class object
PVAL sdvalin; // Used to convert date values
PVAL sdvalout; // Used to convert date values

View file

@ -405,9 +405,9 @@ PQRYRES OEMColumns(PGLOBAL g, PTOS topt, char *tab, char *db, bool info)
CATALOG::CATALOG(void)
{
#if defined(WIN32)
DataPath= ".\\";
//DataPath= ".\\";
#else // !WIN32
DataPath= "./";
//DataPath= "./";
#endif // !WIN32
memset(&Ctb, 0, sizeof(CURTAB));
Cbuf= NULL;
@ -433,6 +433,7 @@ void MYCAT::Reset(void)
{
} // end of Reset
#if 0
/***********************************************************************/
/* This function sets the current database path. */
/***********************************************************************/
@ -463,6 +464,7 @@ void MYCAT::SetPath(PGLOBAL g, LPCSTR *datapath, const char *path)
} // endif path
} // end of SetDataPath
#endif // 0
/***********************************************************************/
/* GetTableDesc: retrieve a table descriptor. */
@ -560,7 +562,7 @@ PTDB MYCAT::GetTable(PGLOBAL g, PTABLE tablep, MODE mode, LPCSTR type)
printf("tdb=%p type=%s\n", tdp, tdp->GetType());
if (tablep->GetQualifier())
SetPath(g, &tdp->Database, tablep->GetQualifier());
tdp->Database = SetPath(g, tablep->GetQualifier());
tdbp= tdp->GetTable(g, mode);
} // endif tdp

View file

@ -56,8 +56,8 @@ class MYCAT : public CATALOG {
// Methods
void Reset(void);
void SetDataPath(PGLOBAL g, const char *path)
{SetPath(g, &DataPath, path);}
//void SetDataPath(PGLOBAL g, const char *path)
// {SetPath(g, &DataPath, path);}
bool StoreIndex(PGLOBAL g, PTABDEF defp) {return false;} // Temporary
PRELDEF GetTableDesc(PGLOBAL g, LPCSTR name,
LPCSTR type, PRELDEF *prp = NULL);
@ -67,7 +67,7 @@ class MYCAT : public CATALOG {
protected:
PRELDEF MakeTableDesc(PGLOBAL g, LPCSTR name, LPCSTR am);
void SetPath(PGLOBAL g, LPCSTR *datapath, const char *path);
//void SetPath(PGLOBAL g, LPCSTR *datapath, const char *path);
// Members
ha_connect *Hc; // The Connect handler

View file

@ -549,7 +549,8 @@ typedef struct _colres {
PPARM Vcolist(PGLOBAL, PTDB, PSZ, bool);
void PlugPutOut(PGLOBAL, FILE *, short, void *, uint);
void PlugLineDB(PGLOBAL, PSZ, short, void *, uint);
char *PlgGetDataPath(PGLOBAL g);
//ar *PlgGetDataPath(PGLOBAL g);
char *SetPath(PGLOBAL g, const char *path);
char *ExtractFromPath(PGLOBAL, char *, char *, OPVAL);
void AddPointer(PTABS, void *);
PDTP MakeDateFormat(PGLOBAL, PSZ, bool, bool, int);

View file

@ -373,6 +373,7 @@ PCATLG PlgGetCatalog(PGLOBAL g, bool jump)
return cat;
} // end of PlgGetCatalog
#if 0
/***********************************************************************/
/* PlgGetDataPath: returns the default data path. */
/***********************************************************************/
@ -382,6 +383,39 @@ char *PlgGetDataPath(PGLOBAL g)
return (cat) ? cat->GetDataPath() : NULL;
} // end of PlgGetDataPath
#endif // 0
/***********************************************************************/
/* This function returns a database path. */
/***********************************************************************/
char *SetPath(PGLOBAL g, const char *path)
{
char *buf= NULL;
if (path) {
size_t len= strlen(path) + (*path != '.' ? 4 : 1);
buf= (char*)PlugSubAlloc(g, NULL, len);
if (PlugIsAbsolutePath(path)) {
strcpy(buf, path);
return buf;
} // endif path
if (*path != '.') {
#if defined(WIN32)
char *s= "\\";
#else // !WIN32
char *s= "/";
#endif // !WIN32
strcat(strcat(strcat(strcpy(buf, "."), s), path), s);
} else
strcpy(buf, path);
} // endif path
return buf;
} // end of SetPath
/***********************************************************************/
/* Extract from a path name the required component. */

View file

@ -227,6 +227,14 @@ bool TABDEF::Define(PGLOBAL g, PCATLG cat, LPCSTR name, LPCSTR am)
return DefineAM(g, am, poff);
} // end of Define
/***********************************************************************/
/* This function returns the database data path. */
/***********************************************************************/
PSZ TABDEF::GetPath(void)
{
return (Database) ? (PSZ)Database : Hc->GetDataPath();
} // end of GetPath
/***********************************************************************/
/* This function returns column table information. */
/***********************************************************************/

View file

@ -1,5 +1,5 @@
/*************** RelDef H Declares Source Code File (.H) ***************/
/* Name: RELDEF.H Version 1.4 */
/* Name: RELDEF.H Version 1.5 */
/* */
/* (C) Copyright to the author Olivier BERTRAND 2004-2014 */
/* */
@ -79,8 +79,9 @@ class DllExport TABDEF : public RELDEF { /* Logical table descriptor */
void SetNext(PTABDEF tdfp) {Next = tdfp;}
int GetMultiple(void) {return Multiple;}
int GetPseudo(void) {return Pseudo;}
PSZ GetPath(void)
{return (Database) ? (PSZ)Database : Cat->GetDataPath();}
PSZ GetPath(void);
//PSZ GetPath(void)
// {return (Database) ? (PSZ)Database : Cat->GetDataPath();}
bool SepIndex(void) {return GetBoolCatInfo("SepIndex", false);}
bool IsReadOnly(void) {return Read_Only;}
virtual AMT GetDefType(void) {return TYPE_AM_TAB;}

View file

@ -91,7 +91,8 @@ class TDBDCL : public TDBCAT {
protected:
// Specific routines
virtual PQRYRES GetResult(PGLOBAL g) {return DBFColumns(g, Fn, false);}
virtual PQRYRES GetResult(PGLOBAL g)
{return DBFColumns(g, ((PTABDEF)To_Def)->GetPath(), Fn, false);}
// Members
char *Fn; // The DBF file (path) name

View file

@ -76,8 +76,8 @@ extern "C" USETEMP Use_Temp;
/* of types (TYPE_STRING < TYPE_DOUBLE < TYPE_INT) (1 < 2 < 7). */
/* If these values are changed, this will have to be revisited. */
/***********************************************************************/
PQRYRES CSVColumns(PGLOBAL g, const char *fn, char sep, char q,
int hdr, int mxr, bool info)
PQRYRES CSVColumns(PGLOBAL g, char *dp, const char *fn, char sep,
char q, int hdr, int mxr, bool info)
{
static int buftyp[] = {TYPE_STRING, TYPE_SHORT, TYPE_STRING,
TYPE_INT, TYPE_INT, TYPE_SHORT};
@ -131,7 +131,7 @@ PQRYRES CSVColumns(PGLOBAL g, const char *fn, char sep, char q,
/*********************************************************************/
/* Open the input file. */
/*********************************************************************/
PlugSetPath(filename, fn, PlgGetDataPath(g));
PlugSetPath(filename, fn, dp);
if (!(infile= global_fopen(g, MSGID_CANNOT_OPEN, filename, "r")))
return NULL;
@ -1471,7 +1471,8 @@ TDBCCL::TDBCCL(PCSVDEF tdp) : TDBCAT(tdp)
/***********************************************************************/
PQRYRES TDBCCL::GetResult(PGLOBAL g)
{
return CSVColumns(g, Fn, Sep, Qtd, Hdr, Mxr, false);
return CSVColumns(g, ((PTABDEF)To_Def)->GetPath(),
Fn, Sep, Qtd, Hdr, Mxr, false);
} // end of GetResult
/* ------------------------ End of TabFmt ---------------------------- */

View file

@ -13,8 +13,8 @@ typedef class TDBFMT *PTDBFMT;
/***********************************************************************/
/* Functions used externally. */
/***********************************************************************/
PQRYRES CSVColumns(PGLOBAL g, const char *fn, char sep, char q,
int hdr, int mxr, bool info);
PQRYRES CSVColumns(PGLOBAL g, char *dp, const char *fn, char sep,
char q, int hdr, int mxr, bool info);
/***********************************************************************/
/* CSV table. */

View file

@ -680,7 +680,7 @@ char* TDBDIR::Path(PGLOBAL g)
#if defined(WIN32)
if (!*Drive) {
PlugSetPath(Fpath, To_File, cat->GetDataPath());
PlugSetPath(Fpath, To_File, ((PTABDEF)To_Def)->GetPath());
_splitpath(Fpath, Drive, Direc, Fname, Ftype);
} else
_makepath(Fpath, Drive, Direc, Fname, Ftype); // Usefull ???