mariadb/storage/connect/tabtbl.cpp
Olivier Bertrand 4fd74200dd - Adding 3 new table types:
PROXY table base on another table. Used by several other types.
  XCOL  proxy on a table having a colummn containing a list of values
  OCCUR proxy on a table having several columns containing the same type
        of values that can be put in a unique column and several rows.
  TBL   Not new but now internally using the PROXY table class.
- Fix 2 bugs in add_field:
        Change '=' to ' ' after the COMMENT keyword.
        Quote column names between '`' in the SQL string.
- Update xml test result to the CONNECT version

added:
  storage/connect/taboccur.cpp
  storage/connect/taboccur.h
  storage/connect/tabutil.cpp
  storage/connect/tabutil.h
  storage/connect/tabxcl.cpp
  storage/connect/tabxcl.h
modified:
  storage/connect/CMakeLists.txt
  storage/connect/ha_connect.cc
  storage/connect/ha_connect.h
  storage/connect/mycat.cc
  storage/connect/myconn.cpp
  storage/connect/mysql-test/connect/r/xml.result
  storage/connect/plgdbsem.h
  storage/connect/tabmysql.cpp
  storage/connect/tabtbl.cpp
  storage/connect/tabtbl.h
  storage/connect/valblk.cpp
  storage/connect/valblk.h
2013-04-29 13:50:20 +02:00

789 lines
26 KiB
C++

/************* TabTbl C++ Program Source Code File (.CPP) **************/
/* PROGRAM NAME: TABTBL */
/* ------------- */
/* Version 1.5 */
/* */
/* COPYRIGHT: */
/* ---------- */
/* (C) Copyright to PlugDB Software Development 2008-2013 */
/* Author: Olivier BERTRAND */
/* */
/* WHAT THIS PROGRAM DOES: */
/* ----------------------- */
/* This program are the TDBTBL class DB routines. */
/* */
/* WHAT YOU NEED TO COMPILE THIS PROGRAM: */
/* -------------------------------------- */
/* */
/* REQUIRED FILES: */
/* --------------- */
/* TABTBL.CPP - Source code */
/* PLGDBSEM.H - DB application declaration file */
/* TABDOS.H - TABDOS classes declaration file */
/* TABTBL.H - TABTBL classes declaration file */
/* GLOBAL.H - Global declaration file */
/* */
/* REQUIRED LIBRARIES: */
/* ------------------- */
/* Large model C library */
/* */
/* REQUIRED PROGRAMS: */
/* ------------------ */
/* IBM, Borland, GNU or Microsoft C++ Compiler and Linker */
/* */
/***********************************************************************/
/***********************************************************************/
/* Include relevant section of system dependant header files. */
/***********************************************************************/
//#include "sql_base.h"
#include "my_global.h"
#if defined(WIN32)
#include <stdlib.h>
#include <stdio.h>
#if defined(__BORLANDC__)
#define __MFC_COMPAT__ // To define min/max as macro
#endif
//#include <windows.h>
#else
#if defined(UNIX)
#include <fnmatch.h>
#include <errno.h>
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include "osutil.h"
#else
//#include <io.h>
#endif
//#include <fcntl.h>
#endif
/***********************************************************************/
/* Include application header files: */
/***********************************************************************/
#include "table.h" // MySQL table definitions
#include "global.h" // global declarations
#include "plgdbsem.h" // DB application declarations
#include "reldef.h" // DB definition declares
//#include "filter.h" // FILTER classes dcls
#include "filamtxt.h"
#include "tabcol.h"
#include "tabdos.h" // TDBDOS and DOSCOL class dcls
#include "tabtbl.h"
#if defined(MYSQL_SUPPORT)
#include "tabmysql.h"
#endif // MYSQL_SUPPORT
#include "ha_connect.h"
#include "mycat.h" // For GetHandler
extern "C" int trace;
/* ---------------------------- Class TBLDEF ---------------------------- */
/**************************************************************************/
/* Constructor. */
/**************************************************************************/
TBLDEF::TBLDEF(void)
{
//To_Tables = NULL;
Ntables = 0;
Pseudo = 3;
} // end of TBLDEF constructor
/**************************************************************************/
/* DefineAM: define specific AM block values from XDB file. */
/**************************************************************************/
bool TBLDEF::DefineAM(PGLOBAL g, LPCSTR am, int poff)
{
char *tablist, *dbname;
Desc = "Table list table";
tablist = Cat->GetStringCatInfo(g, "Tablist", "");
dbname = Cat->GetStringCatInfo(g, "Dbname", "*");
Ntables = 0;
if (*tablist) {
char *p, *pn, *pdb;
PTABLE tbl;
for (pdb = tablist; ;) {
if ((p = strchr(pdb, ',')))
*p = 0;
// Analyze the table name, it may have the format:
// [dbname.]tabname
if ((pn = strchr(pdb, '.'))) {
*pn++ = 0;
} else {
pn = pdb;
pdb = dbname;
} // endif p
// Allocate the TBLIST block for that table
tbl = new(g) XTAB(pn);
tbl->SetQualifier(pdb);
if (trace)
htrc("TBL: Name=%s db=%s\n", tbl->GetName(), tbl->GetQualifier());
// Link the blocks
if (Tablep)
Tablep->Link(tbl);
else
Tablep = tbl;
Ntables++;
if (p)
pdb = pn + strlen(pn) + 1;
else
break;
} // endfor pdb
Maxerr = Cat->GetIntCatInfo("Maxerr", 0);
Accept = (Cat->GetBoolCatInfo("Accept", 0) != 0);
} // endif fsec || tablist
return FALSE;
} // end of DefineAM
/***********************************************************************/
/* GetTable: makes a new Table Description Block. */
/***********************************************************************/
PTDB TBLDEF::GetTable(PGLOBAL g, MODE m)
{
if (Catfunc == FNC_COL)
return new(g) TDBTBC(this);
else
return new(g) TDBTBL(this);
} // end of GetTable
/* ------------------------- Class TDBTBL ---------------------------- */
/***********************************************************************/
/* TDBTBL constructors. */
/***********************************************************************/
TDBTBL::TDBTBL(PTBLDEF tdp) : TDBPRX(tdp)
{
Tablist = NULL;
CurTable = NULL;
//Tdbp = NULL;
Accept = tdp->Accept;
Maxerr = tdp->Maxerr;
Nbf = 0;
Rows = 0;
Crp = 0;
// NTables = 0;
// iTable = 0;
} // end of TDBTBL standard constructor
/***********************************************************************/
/* Allocate TBL column description block. */
/***********************************************************************/
PCOL TDBTBL::MakeCol(PGLOBAL g, PCOLDEF cdp, PCOL cprec, int n)
{
return new(g) PRXCOL(cdp, this, cprec, n);
} // end of MakeCol
/***********************************************************************/
/* InsertSpecialColumn: Put a special column ahead of the column list.*/
/***********************************************************************/
PCOL TDBTBL::InsertSpecialColumn(PGLOBAL g, PCOL scp)
{
PCOL colp;
if (!scp->IsSpecial())
return NULL;
if (scp->GetAmType() == TYPE_AM_TABID)
// This special column is handled locally
colp = new((TIDBLK*)scp) TBTBLK(scp->GetValue());
else // Other special columns are treated normally
colp = scp;
colp->SetNext(Columns);
Columns = colp;
return colp;
} // end of InsertSpecialColumn
#if 0
/***********************************************************************/
/* Get the PTDB of a table of the list. */
/***********************************************************************/
PTDB TDBTBL::GetSubTable(PGLOBAL g, PTBL tblp, PTABLE tabp)
{
//char *db;
bool mysql;
PTDB tdbp = NULL;
TABLE_SHARE *s;
PCATLG cat = To_Def->GetCat();
PHC hc = ((MYCAT*)cat)->GetHandler();
THD *thd = (hc->GetTable())->in_use;
if (!thd)
return NULL; // Should not happen anymore
//if (tblp->DB)
// db = tblp->DB;
//else
// db = (char*)hc->GetDBName(NULL);
//if (!(s = GetTableShare(g, thd, db, tblp->Name, mysql)))
if (!(s = GetTableShare(g, thd, tblp->DB, tblp->Name, mysql)))
return NULL;
if (mysql) {
#if defined(MYSQL_SUPPORT)
// Access sub-table via MySQL API
if (!(tdbp= cat->GetTable(g, tabp, MODE_READ, "MYSQL"))) {
sprintf(g->Message, "Cannot access %s.%s", tblp->DB, tblp->Name);
goto err;
} // endif Define
if (tabp->GetQualifier())
((PTDBMY)tdbp)->SetDatabase(tabp->GetQualifier());
#else // !MYSQL_SUPPORT
sprintf(g->Message, "%s.%s is not a CONNECT table",
db, tblp->Name);
goto err;
#endif // MYSQL_SUPPORT
} else {
// Sub-table is a CONNECT table
hc->tshp = s;
tdbp = cat->GetTable(g, tabp);
hc->tshp = NULL;
} // endif plugin
if (trace && tdbp)
htrc("Subtable %s in %s\n",
tblp->Name, SVP(((PTDBASE)tdbp)->GetDef()->GetDB()));
err:
free_table_share(s);
return tdbp;
} // end of GetSubTable
#endif // 0
/***********************************************************************/
/* Initializes the table table list. */
/***********************************************************************/
bool TDBTBL::InitTableList(PGLOBAL g)
{
char *colname;
int n, colpos;
PTABLE tp, tabp;
PTDB tdbp;
PCOL colp;
PTBLDEF tdp = (PTBLDEF)To_Def;
// PlugSetPath(filename, Tdbp->GetFile(g), Tdbp->GetPath());
for (n = 0, tp = tdp->Tablep; tp; tp = tp->GetNext()) {
if (TestFil(g, To_Filter, tp)) {
tabp = new(g) XTAB(tp);
// Get the table description block of this table
if (!(tdbp = GetSubTable(g, tabp))) {
if (++Nbf > Maxerr)
return TRUE; // Error return
else
continue; // Skip this table
} // endif tdbp
// We must allocate subtable columns before GetMaxSize is called
// because some (PLG, ODBC?) need to have their columns attached.
// Real initialization will be done later.
for (PCOL cp = Columns; cp; cp = cp->GetNext())
if (!cp->IsSpecial()) {
colname = cp->GetName();
colpos = ((PPRXCOL)cp)->Colnum;
// We try first to get the column by name
if (!(colp = tdbp->ColDB(g, colname, 0)) && colpos)
// When unsuccessful, if a column number was specified
// try to get the column by its position in the table
colp = tdbp->ColDB(g, NULL, colpos);
if (!colp) {
if (!Accept) {
sprintf(g->Message, MSG(NO_MATCHING_COL),
colname, tdbp->GetName());
return TRUE; // Error return
} // endif !Accept
} else // this is needed by some tables (which?)
colp->SetColUse(cp->GetColUse());
} // endif !special
if (Tablist)
Tablist->Link(tabp);
else
Tablist = tabp;
n++;
} // endif filp
} // endfor tblp
//NumTables = n;
To_Filter = NULL; // To avoid doing it several times
return FALSE;
} // end of InitTableList
/***********************************************************************/
/* Test the tablename against the pseudo "local" filter. */
/***********************************************************************/
bool TDBTBL::TestFil(PGLOBAL g, PFIL filp, PTABLE tabp)
{
char *fil, op[8], tn[NAME_LEN];
bool neg;
if (!filp)
return TRUE;
else if (strstr(filp, " OR ") || strstr(filp, " AND "))
return TRUE; // Not handled yet
else
fil = filp + (*filp == '(' ? 1 : 0);
if (sscanf(fil, "TABID %s", op) != 1)
return TRUE; // ignore invalid filter
if ((neg = !strcmp(op, "NOT")))
strcpy(op, "IN");
if (!strcmp(op, "=")) {
// Temporarily, filter must be "TABID = 'value'" only
if (sscanf(fil, "TABID = '%[^']'", tn) != 1)
return TRUE; // ignore invalid filter
return !stricmp(tn, tabp->GetName());
} else if (!strcmp(op, "IN")) {
char *p, *tnl = (char*)PlugSubAlloc(g, NULL, strlen(fil) - 10);
int n;
if (neg)
n = sscanf(fil, "TABID NOT IN (%[^)])", tnl);
else
n = sscanf(fil, "TABID IN (%[^)])", tnl);
if (n != 1)
return TRUE; // ignore invalid filter
while (tnl) {
if ((p = strchr(tnl, ',')))
*p++ = 0;
if (sscanf(tnl, "'%[^']'", tn) != 1)
return TRUE; // ignore invalid filter
else if (!stricmp(tn, tabp->GetName()))
return !neg; // Found
tnl = p;
} // endwhile
return neg; // Not found
} // endif op
return TRUE; // invalid operator
} // end of TestFil
#if 0
/***********************************************************************/
/* TBL GetProgMax: get the max value for progress information. */
/***********************************************************************/
int TDBTBL::GetProgMax(PGLOBAL g)
{
int n, pmx = 0;
if (!Tablist && InitTableList(g))
return -1;
for (PTABLE tabp = Tablist; tabp; tblp = tabp->GetNext())
if ((n = tabp->GetTo_Tdb()->GetProgMax(g)) > 0)
pmx += n;
return pmx;
} // end of GetProgMax
/***********************************************************************/
/* TBL GetProgCur: get the current value for progress information. */
/***********************************************************************/
int TDBTBL::GetProgCur(void)
{
return Crp + Tdbp->GetProgCur();
} // end of GetProgCur
/***********************************************************************/
/* TBL Cardinality: returns table cardinality in number of rows. */
/* This function can be called with a null argument to test the */
/* availability of Cardinality implementation (1 yes, 0 no). */
/* Can be used on Multiple FIX table only. */
/***********************************************************************/
int TDBTBL::Cardinality(PGLOBAL g)
{
if (!g)
return Tdbp->Cardinality(g);
if (!Tablist && InitTableList(g))
return -1;
int n, card = 0;
for (int i = 0; i < NumFiles; i++) {
Tdbp->SetFile(g, Filenames[i]);
Tdbp->ResetSize();
if ((n = Tdbp->Cardinality(g)) < 0) {
// strcpy(g->Message, MSG(BAD_CARDINALITY));
return -1;
} // endif n
card += n;
} // endfor i
return card;
} // end of Cardinality
#endif // 0
/***********************************************************************/
/* Sum up the sizes of all sub-tables. */
/***********************************************************************/
int TDBTBL::GetMaxSize(PGLOBAL g)
{
if (MaxSize < 0) {
int mxsz;
if (!Tablist && InitTableList(g))
return 0; // Cannot be calculated at this stage
// if (Use == USE_OPEN) {
// strcpy(g->Message, MSG(MAXSIZE_ERROR));
// return -1;
// } else
MaxSize = 0;
for (PTABLE tabp = Tablist; tabp; tabp = tabp->GetNext()) {
if ((mxsz = tabp->GetTo_Tdb()->GetMaxSize(g)) < 0) {
MaxSize = -1;
return mxsz;
} // endif mxsz
MaxSize += mxsz;
} // endfor i
} // endif MaxSize
return MaxSize;
} // end of GetMaxSize
/***********************************************************************/
/* Reset read/write position values. */
/***********************************************************************/
void TDBTBL::ResetDB(void)
{
for (PCOL colp = Columns; colp; colp = colp->GetNext())
if (colp->GetAmType() == TYPE_AM_TABID)
colp->COLBLK::Reset();
for (PTABLE tabp = Tablist; tabp; tabp = tabp->GetNext())
((PTDBASE)tabp->GetTo_Tdb())->ResetDB();
Tdbp = (PTDBASE)Tablist->GetTo_Tdb();
Crp = 0;
} // end of ResetDB
/***********************************************************************/
/* Returns RowId if b is false or Rownum if b is true. */
/***********************************************************************/
int TDBTBL::RowNumber(PGLOBAL g, bool b)
{
return Tdbp->RowNumber(g) + ((b) ? 0 : Rows);
} // end of RowNumber
/***********************************************************************/
/* TBL Access Method opening routine. */
/* Open first file, other will be opened sequencially when reading. */
/***********************************************************************/
bool TDBTBL::OpenDB(PGLOBAL g)
{
if (trace)
htrc("TBL OpenDB: tdbp=%p tdb=R%d use=%d key=%p mode=%d\n",
this, Tdb_No, Use, To_Key_Col, Mode);
if (Use == USE_OPEN) {
/*******************************************************************/
/* Table already open, replace it at its beginning. */
/*******************************************************************/
ResetDB();
return Tdbp->OpenDB(g); // Re-open fist table
} // endif use
#if 0
/*********************************************************************/
/* Direct access needed for join or sorting. */
/*********************************************************************/
if (NeedIndexing(g)) {
// Direct access of TBL tables is not implemented yet
strcpy(g->Message, MSG(NO_MUL_DIR_ACC));
return TRUE;
} // endif NeedIndexing
#endif // 0
/*********************************************************************/
/* When GetMaxsize was called, To_Filter was not set yet. */
/*********************************************************************/
if (To_Filter && Tablist) {
Tablist = NULL;
Nbf = 0;
} // endif To_Filter
/*********************************************************************/
/* Open the first table of the list. */
/*********************************************************************/
if (!Tablist && InitTableList(g)) // done in GetMaxSize
return TRUE;
if ((CurTable = Tablist)) {
Tdbp = (PTDBASE)CurTable->GetTo_Tdb();
Tdbp->SetMode(Mode);
// Tdbp->ResetDB();
// Tdbp->ResetSize();
// Check and initialize the subtable columns
for (PCOL cp = Columns; cp; cp = cp->GetNext())
if (cp->GetAmType() == TYPE_AM_TABID)
cp->COLBLK::Reset();
else if (((PPRXCOL)cp)->Init(g))
return TRUE;
if (trace)
htrc("Opening subtable %s\n", Tdbp->GetName());
// Now we can safely open the table
if (Tdbp->OpenDB(g))
return TRUE;
} // endif *Tablist
Use = USE_OPEN;
return FALSE;
} // end of OpenDB
/***********************************************************************/
/* ReadDB: Data Base read routine for MUL access method. */
/***********************************************************************/
int TDBTBL::ReadDB(PGLOBAL g)
{
int rc;
if (!CurTable)
return RC_EF;
else if (To_Kindex) {
/*******************************************************************/
/* Reading is by an index table. */
/*******************************************************************/
strcpy(g->Message, MSG(NO_INDEX_READ));
rc = RC_FX;
} else {
/*******************************************************************/
/* Now start the reading process. */
/*******************************************************************/
retry:
rc = Tdbp->ReadDB(g);
if (rc == RC_EF) {
// Total number of rows met so far
Rows += Tdbp->RowNumber(g) - 1;
Crp += Tdbp->GetProgMax(g);
if ((CurTable = CurTable->GetNext())) {
/***************************************************************/
/* Continue reading from next table file. */
/***************************************************************/
Tdbp->CloseDB(g);
Tdbp = (PTDBASE)CurTable->GetTo_Tdb();
// Check and initialize the subtable columns
for (PCOL cp = Columns; cp; cp = cp->GetNext())
if (cp->GetAmType() == TYPE_AM_TABID)
cp->COLBLK::Reset();
else if (((PPRXCOL)cp)->Init(g))
return RC_FX;
if (trace)
htrc("Opening subtable %s\n", Tdbp->GetName());
// Now we can safely open the table
if (Tdbp->OpenDB(g)) // Open next table
return RC_FX;
goto retry;
} // endif iFile
} else if (rc == RC_FX)
strcat(strcat(strcat(g->Message, " ("), Tdbp->GetName()), ")");
} // endif To_Kindex
return rc;
} // end of ReadDB
#if 0
/***********************************************************************/
/* Data Base write routine for MUL access method. */
/***********************************************************************/
int TDBTBL::WriteDB(PGLOBAL g)
{
strcpy(g->Message, MSG(TABMUL_READONLY));
return RC_FX; // NIY
} // end of WriteDB
/***********************************************************************/
/* Data Base delete line routine for MUL access method. */
/***********************************************************************/
int TDBTBL::DeleteDB(PGLOBAL g, int irc)
{
strcpy(g->Message, MSG(TABMUL_READONLY));
return RC_FX; // NIY
} // end of DeleteDB
/***********************************************************************/
/* Data Base close routine for MUL access method. */
/***********************************************************************/
void TDBTBL::CloseDB(PGLOBAL g)
{
if (Tdbp)
Tdbp->CloseDB(g);
} // end of CloseDB
#endif // 0
/* ---------------------------- TBLCOL ------------------------------- */
#if 0
/***********************************************************************/
/* TBLCOL public constructor. */
/***********************************************************************/
TBLCOL::TBLCOL(PCOLDEF cdp, PTDB tdbp, PCOL cprec, int i, PSZ am)
: COLBLK(cdp, tdbp, i)
{
if (cprec) {
Next = cprec->GetNext();
cprec->SetNext(this);
} else {
Next = tdbp->GetColumns();
tdbp->SetColumns(this);
} // endif cprec
// Set additional Dos access method information for column.
Long = cdp->GetLong(); // ???
//strcpy(F_Date, cdp->F_Date);
Colp = NULL;
To_Val = NULL;
Pseudo = FALSE;
Colnum = cdp->GetOffset(); // If columns are retrieved by number
if (trace)
htrc(" making new %sCOL C%d %s at %p\n", am, Index, Name, this);
} // end of TBLCOL constructor
/***********************************************************************/
/* TBLCOL public constructor. */
/***********************************************************************/
TBLCOL::TBLCOL(SPCBLK *scp, PTDB tdbp) : COLBLK(scp->GetName(), tdbp, 0)
{
// Set additional TBL access method information for pseudo column.
Is_Key = Was_Key = scp->IsKey();
Long = scp->GetLength();
Buf_Type = scp->GetResultType();
*Format.Type = (Buf_Type == TYPE_INT) ? 'N' : 'C';
Format.Length = Long;
Colp = NULL;
To_Val = NULL;
Pseudo = TRUE;
} // end of TBLCOL constructor
/***********************************************************************/
/* TBLCOL constructor used for copying columns. */
/* tdbp is the pointer to the new table descriptor. */
/***********************************************************************/
TBLCOL::TBLCOL(TBLCOL *col1, PTDB tdbp) : COLBLK(col1, tdbp)
{
Long = col1->Long;
Colp = col1->Colp;
To_Val = col1->To_Val;
Pseudo = col1->Pseudo;
} // end of TBLCOL copy constructor
/***********************************************************************/
/* TBLCOL initialization routine. */
/* Look for the matching column in the current table. */
/***********************************************************************/
bool TBLCOL::Init(PGLOBAL g)
{
PTDBTBL tdbp = (PTDBTBL)To_Tdb;
To_Val = NULL;
if (!(Colp = tdbp->Tdbp->ColDB(g, Name, 0)) && Colnum)
Colp = tdbp->Tdbp->ColDB(g, NULL, Colnum);
if (Colp) {
Colp->InitValue(g); // May not have been done elsewhere
To_Val = Colp->GetValue();
} else if (!tdbp->Accept) {
sprintf(g->Message, MSG(NO_MATCHING_COL), Name, tdbp->Tdbp->GetName());
return TRUE;
} else {
if (Nullable)
Value->SetNull(true);
Value->Reset();
} // endif's
return FALSE;
} // end of Init
/***********************************************************************/
/* ReadColumn: */
/***********************************************************************/
void TBLCOL::ReadColumn(PGLOBAL g)
{
if (trace)
htrc("TBL ReadColumn: name=%s\n", Name);
if (Colp) {
Colp->ReadColumn(g);
Value->SetValue_pval(To_Val);
// Set null when applicable
if (Nullable)
Value->SetNull(Value->IsNull());
} // endif Colp
} // end of ReadColumn
#endif // 0
/* ---------------------------- TBTBLK ------------------------------- */
/***********************************************************************/
/* ReadColumn: */
/***********************************************************************/
void TBTBLK::ReadColumn(PGLOBAL g)
{
if (trace)
htrc("TBT ReadColumn: name=%s\n", Name);
Value->SetValue_psz((char*)((PTDBTBL)To_Tdb)->Tdbp->GetName());
} // end of ReadColumn
/* ------------------------------------------------------------------- */