mirror of
https://github.com/MariaDB/server.git
synced 2026-05-06 15:15:34 +02:00
Propagating DictTabInfo in signal SubTableData for event TE_ALTER
This commit is contained in:
parent
68c1d22b87
commit
667c0e93a4
20 changed files with 302 additions and 127 deletions
|
|
@ -754,7 +754,7 @@ extern const GlobalSignalNumber NO_OF_SIGNAL_NAMES;
|
|||
#define GSN_SUB_SYNC_REQ 582
|
||||
#define GSN_SUB_SYNC_REF 583
|
||||
#define GSN_SUB_SYNC_CONF 584
|
||||
#define GSN_SUB_META_DATA 585
|
||||
/* 585 unused */
|
||||
#define GSN_SUB_TABLE_DATA 586
|
||||
|
||||
#define GSN_CREATE_TABLE_REQ 587
|
||||
|
|
|
|||
|
|
@ -33,6 +33,7 @@ class AlterTabReq {
|
|||
friend class Dbdih;
|
||||
friend class Dbtc;
|
||||
friend class Dblqh;
|
||||
friend class Suma;
|
||||
|
||||
/**
|
||||
* For printing
|
||||
|
|
@ -103,7 +104,6 @@ class AlterTabConf {
|
|||
friend class Dbtc;
|
||||
friend class Dblqh;
|
||||
friend class Dbtup;
|
||||
friend class Suma;
|
||||
|
||||
/**
|
||||
* For printing
|
||||
|
|
|
|||
|
|
@ -36,6 +36,7 @@ class AlterTableReq {
|
|||
* Sender(s) / Reciver(s)
|
||||
*/
|
||||
friend class NdbTableImpl;
|
||||
friend class NdbEventOperationImpl;
|
||||
friend class NdbDictInterface;
|
||||
friend class Dbdict;
|
||||
|
||||
|
|
|
|||
|
|
@ -277,22 +277,6 @@ struct SubSyncConf {
|
|||
Uint32 senderData;
|
||||
};
|
||||
|
||||
struct SubMetaData {
|
||||
/**
|
||||
* Sender(s)/Reciver(s)
|
||||
*/
|
||||
friend struct SumaParticipant;
|
||||
friend struct Grep;
|
||||
|
||||
friend bool printSUB_META_DATA(FILE *, const Uint32 *, Uint32, Uint16);
|
||||
STATIC_CONST( SignalLength = 3 );
|
||||
SECTION( DICT_TAB_INFO = 0 );
|
||||
|
||||
Uint32 gci;
|
||||
Uint32 senderData;
|
||||
Uint32 tableId;
|
||||
};
|
||||
|
||||
struct SubTableData {
|
||||
/**
|
||||
* Sender(s)/Reciver(s)
|
||||
|
|
@ -301,7 +285,11 @@ struct SubTableData {
|
|||
friend struct Grep;
|
||||
|
||||
friend bool printSUB_TABLE_DATA(FILE *, const Uint32 *, Uint32, Uint16);
|
||||
STATIC_CONST( SignalLength = 5 );
|
||||
STATIC_CONST( SignalLength = 7 );
|
||||
SECTION( DICT_TAB_INFO = 0 );
|
||||
SECTION( ATTR_INFO = 0 );
|
||||
SECTION( AFTER_VALUES = 1 );
|
||||
SECTION( BEFORE_VALUES = 2 );
|
||||
|
||||
enum LogType {
|
||||
SCAN = 1,
|
||||
|
|
@ -317,6 +305,8 @@ struct SubTableData {
|
|||
Uint8 ndbd_nodeid;
|
||||
Uint8 not_used3;
|
||||
Uint32 logType;
|
||||
Uint32 changeMask;
|
||||
Uint32 totalLen;
|
||||
};
|
||||
|
||||
struct SubSyncContinueReq {
|
||||
|
|
|
|||
|
|
@ -162,6 +162,7 @@ public:
|
|||
|
||||
class Table; // forward declaration
|
||||
class Tablespace; // forward declaration
|
||||
// class NdbEventOperation; // forward declaration
|
||||
|
||||
/**
|
||||
* @class Column
|
||||
|
|
@ -884,6 +885,7 @@ public:
|
|||
private:
|
||||
#ifndef DOXYGEN_SHOULD_SKIP_INTERNAL
|
||||
friend class NdbTableImpl;
|
||||
friend class NdbEventOperationImpl;
|
||||
#endif
|
||||
class NdbTableImpl & m_impl;
|
||||
Table(NdbTableImpl&);
|
||||
|
|
@ -1181,6 +1183,12 @@ public:
|
|||
* Get unique identifier for the event
|
||||
*/
|
||||
const char *getName() const;
|
||||
/**
|
||||
* Get table that the event is defined on
|
||||
*
|
||||
* @return pointer to table or NULL if no table has been defined
|
||||
*/
|
||||
const NdbDictionary::Table * getTable() const;
|
||||
/**
|
||||
* Define table on which events should be detected
|
||||
*
|
||||
|
|
|
|||
|
|
@ -168,6 +168,26 @@ public:
|
|||
*/
|
||||
NdbDictionary::Event::TableEvent getEventType() const;
|
||||
|
||||
/**
|
||||
* Check if table name has changed, for event TE_ALTER
|
||||
*/
|
||||
const bool tableNameChanged() const;
|
||||
|
||||
/**
|
||||
* Check if table frm has changed, for event TE_ALTER
|
||||
*/
|
||||
const bool tableFrmChanged() const;
|
||||
|
||||
/**
|
||||
* Check if table fragmentation has changed, for event TE_ALTER
|
||||
*/
|
||||
const bool tableFragmentationChanged() const;
|
||||
|
||||
/**
|
||||
* Check if table range partition list name has changed, for event TE_ALTER
|
||||
*/
|
||||
const bool tableRangeListChanged() const;
|
||||
|
||||
/**
|
||||
* Retrieve the GCI of the latest retrieved event
|
||||
*
|
||||
|
|
@ -192,14 +212,13 @@ public:
|
|||
|
||||
#ifndef DOXYGEN_SHOULD_SKIP_INTERNAL
|
||||
/** these are subject to change at any time */
|
||||
const NdbDictionary::Table *getTable() const;
|
||||
const NdbDictionary::Event *getEvent() const;
|
||||
const NdbRecAttr *getFirstPkAttr() const;
|
||||
const NdbRecAttr *getFirstPkPreAttr() const;
|
||||
const NdbRecAttr *getFirstDataAttr() const;
|
||||
const NdbRecAttr *getFirstDataPreAttr() const;
|
||||
|
||||
bool validateTable(NdbDictionary::Table &table) const;
|
||||
// bool validateTable(NdbDictionary::Table &table) const;
|
||||
|
||||
void setCustomData(void * data);
|
||||
void * getCustomData() const;
|
||||
|
|
|
|||
|
|
@ -166,7 +166,6 @@ SignalDataPrintFunctions[] = {
|
|||
{ GSN_SUB_SYNC_REQ, printSUB_SYNC_REQ },
|
||||
{ GSN_SUB_SYNC_REF, printSUB_SYNC_REF },
|
||||
{ GSN_SUB_SYNC_CONF, printSUB_SYNC_CONF },
|
||||
{ GSN_SUB_META_DATA, printSUB_META_DATA },
|
||||
{ GSN_SUB_TABLE_DATA, printSUB_TABLE_DATA },
|
||||
{ GSN_SUB_SYNC_CONTINUE_REQ, printSUB_SYNC_CONTINUE_REQ },
|
||||
{ GSN_SUB_SYNC_CONTINUE_REF, printSUB_SYNC_CONTINUE_REF },
|
||||
|
|
|
|||
|
|
@ -540,7 +540,6 @@ const GsnName SignalNames [] = {
|
|||
,{ GSN_SUB_SYNC_REQ, "SUB_SYNC_REQ" }
|
||||
,{ GSN_SUB_SYNC_REF, "SUB_SYNC_REF" }
|
||||
,{ GSN_SUB_SYNC_CONF, "SUB_SYNC_CONF" }
|
||||
,{ GSN_SUB_META_DATA, "SUB_META_DATA" }
|
||||
,{ GSN_SUB_TABLE_DATA, "SUB_TABLE_DATA" }
|
||||
,{ GSN_SUB_SYNC_CONTINUE_REQ, "SUB_SYNC_CONTINUE_REQ" }
|
||||
,{ GSN_SUB_SYNC_CONTINUE_REF, "SUB_SYNC_CONTINUE_REF" }
|
||||
|
|
|
|||
|
|
@ -169,17 +169,6 @@ printSUB_SYNC_CONF(FILE * output, const Uint32 * theData,
|
|||
return false;
|
||||
}
|
||||
|
||||
bool
|
||||
printSUB_META_DATA(FILE * output, const Uint32 * theData,
|
||||
Uint32 len, Uint16 receiverBlockNo) {
|
||||
const SubMetaData * const sig = (SubMetaData *)theData;
|
||||
fprintf(output, " gci: %x\n", sig->gci);
|
||||
fprintf(output, " senderData: %x\n", sig->senderData);
|
||||
fprintf(output, " senderData: %x\n", sig->senderData);
|
||||
fprintf(output, " tableId: %x\n", sig->tableId);
|
||||
return false;
|
||||
}
|
||||
|
||||
bool
|
||||
printSUB_TABLE_DATA(FILE * output, const Uint32 * theData,
|
||||
Uint32 len, Uint16 receiverBlockNo) {
|
||||
|
|
|
|||
|
|
@ -4630,12 +4630,7 @@ Dbdict::alterTab_writeSchemaConf(Signal* signal,
|
|||
|
||||
SegmentedSectionPtr tabInfoPtr;
|
||||
getSection(tabInfoPtr, alterTabPtr.p->m_tabInfoPtrI);
|
||||
|
||||
writeTableFile(signal, tableId, tabInfoPtr, &callback);
|
||||
|
||||
alterTabPtr.p->m_tabInfoPtrI = RNIL;
|
||||
signal->setSection(tabInfoPtr, 0);
|
||||
releaseSections(signal);
|
||||
}
|
||||
|
||||
void
|
||||
|
|
@ -4649,8 +4644,32 @@ Dbdict::alterTab_writeTableConf(Signal* signal,
|
|||
Uint32 coordinatorRef = alterTabPtr.p->m_coordinatorRef;
|
||||
TableRecordPtr tabPtr;
|
||||
c_tableRecordPool.getPtr(tabPtr, alterTabPtr.p->m_alterTableId);
|
||||
|
||||
// Alter table commit request handled successfully
|
||||
// Inform Suma so it can send events to any subscribers of the table
|
||||
AlterTabReq * req = (AlterTabReq*)signal->getDataPtrSend();
|
||||
if (coordinatorRef == reference())
|
||||
req->senderRef = alterTabPtr.p->m_senderRef;
|
||||
else
|
||||
req->senderRef = 0;
|
||||
req->senderData = callbackData;
|
||||
req->tableId = tabPtr.p->tableId;
|
||||
req->tableVersion = tabPtr.p->tableVersion;
|
||||
req->gci = tabPtr.p->gciTableCreated;
|
||||
req->requestType = AlterTabReq::AlterTableCommit;
|
||||
req->changeMask = alterTabPtr.p->m_changeMask;
|
||||
SegmentedSectionPtr tabInfoPtr;
|
||||
getSection(tabInfoPtr, alterTabPtr.p->m_tabInfoPtrI);
|
||||
signal->setSection(tabInfoPtr, AlterTabReq::DICT_TAB_INFO);
|
||||
#ifndef DBUG_OFF
|
||||
ndbout_c("DICT_TAB_INFO in DICT");
|
||||
SimplePropertiesSectionReader reader(tabInfoPtr, getSectionSegmentPool());
|
||||
reader.printAll(ndbout);
|
||||
#endif
|
||||
EXECUTE_DIRECT(SUMA, GSN_ALTER_TAB_REQ, signal,
|
||||
AlterTabReq::SignalLength);
|
||||
releaseSections(signal);
|
||||
alterTabPtr.p->m_tabInfoPtrI = RNIL;
|
||||
jamEntry();
|
||||
AlterTabConf * conf = (AlterTabConf*)signal->getDataPtrSend();
|
||||
conf->senderRef = reference();
|
||||
conf->senderData = callbackData;
|
||||
|
|
@ -4658,17 +4677,7 @@ Dbdict::alterTab_writeTableConf(Signal* signal,
|
|||
conf->tableVersion = tabPtr.p->tableVersion;
|
||||
conf->gci = tabPtr.p->gciTableCreated;
|
||||
conf->requestType = AlterTabReq::AlterTableCommit;
|
||||
{
|
||||
AlterTabConf tmp= *conf;
|
||||
if (coordinatorRef == reference())
|
||||
conf->senderRef = alterTabPtr.p->m_senderRef;
|
||||
else
|
||||
conf->senderRef = 0;
|
||||
EXECUTE_DIRECT(SUMA, GSN_ALTER_TAB_CONF, signal,
|
||||
AlterTabConf::SignalLength);
|
||||
jamEntry();
|
||||
*conf= tmp;
|
||||
}
|
||||
conf->changeMask = alterTabPtr.p->m_changeMask;
|
||||
sendSignal(coordinatorRef, GSN_ALTER_TAB_CONF, signal,
|
||||
AlterTabConf::SignalLength, JBB);
|
||||
|
||||
|
|
|
|||
|
|
@ -2548,8 +2548,9 @@ Suma::reportAllSubscribers(Signal *signal,
|
|||
data->gci = m_last_complete_gci + 1;
|
||||
data->tableId = subPtr.p->m_tableId;
|
||||
data->operation = table_event;
|
||||
data->logType = 0;
|
||||
data->ndbd_nodeid = refToNode(reference());
|
||||
data->changeMask = 0;
|
||||
data->totalLen = 0;
|
||||
|
||||
TablePtr tabPtr;
|
||||
c_tables.getPtr(tabPtr, subPtr.p->m_table_ptrI);
|
||||
|
|
@ -3166,7 +3167,9 @@ Suma::execFIRE_TRIG_ORD(Signal* signal)
|
|||
LinearSectionPtr ptr[3];
|
||||
const Uint32 nptr= reformat(signal, ptr,
|
||||
f_buffer, sz, b_buffer, b_trigBufferSize);
|
||||
|
||||
Uint32 ptrLen= 0;
|
||||
for(Uint32 i =0; i < nptr; i++)
|
||||
ptrLen+= ptr[i].sz;
|
||||
/**
|
||||
* Signal to subscriber(s)
|
||||
*/
|
||||
|
|
@ -3177,6 +3180,8 @@ Suma::execFIRE_TRIG_ORD(Signal* signal)
|
|||
data->tableId = tabPtr.p->m_tableId;
|
||||
data->operation = event;
|
||||
data->logType = 0;
|
||||
data->changeMask = 0;
|
||||
data->totalLen = ptrLen;
|
||||
|
||||
{
|
||||
LocalDLList<Subscriber> list(c_subscriberPool,tabPtr.p->c_subscribers);
|
||||
|
|
@ -3424,17 +3429,19 @@ Suma::execDROP_TAB_CONF(Signal *signal)
|
|||
DBUG_VOID_RETURN;
|
||||
}
|
||||
|
||||
static Uint32 b_dti_buf[10000];
|
||||
|
||||
void
|
||||
Suma::execALTER_TAB_CONF(Signal *signal)
|
||||
Suma::execALTER_TAB_REQ(Signal *signal)
|
||||
{
|
||||
jamEntry();
|
||||
DBUG_ENTER("Suma::execALTER_TAB_CONF");
|
||||
ndbassert(signal->getNoOfSections() == 0);
|
||||
|
||||
AlterTabConf * const conf = (AlterTabConf*)signal->getDataPtr();
|
||||
Uint32 senderRef= conf->senderRef;
|
||||
Uint32 tableId= conf->tableId;
|
||||
DBUG_ENTER("Suma::execALTER_TAB_REQ");
|
||||
ndbassert(signal->getNoOfSections() == 1);
|
||||
|
||||
AlterTabReq * const req = (AlterTabReq*)signal->getDataPtr();
|
||||
Uint32 senderRef= req->senderRef;
|
||||
Uint32 tableId= req->tableId;
|
||||
Uint32 changeMask= req->changeMask;
|
||||
TablePtr tabPtr;
|
||||
if (!c_tables.find(tabPtr, tableId) ||
|
||||
tabPtr.p->m_state == Table::DROPPED ||
|
||||
|
|
@ -3453,13 +3460,30 @@ Suma::execALTER_TAB_CONF(Signal *signal)
|
|||
DBUG_VOID_RETURN;
|
||||
}
|
||||
// dict coordinator sends info to API
|
||||
|
||||
|
||||
// Copy DICT_TAB_INFO to local buffer
|
||||
SegmentedSectionPtr tabInfoPtr;
|
||||
signal->getSection(tabInfoPtr, AlterTabReq::DICT_TAB_INFO);
|
||||
#ifndef DBUG_OFF
|
||||
ndbout_c("DICT_TAB_INFO in SUMA, tabInfoPtr.sz = %d", tabInfoPtr.sz);
|
||||
SimplePropertiesSectionReader reader(tabInfoPtr, getSectionSegmentPool());
|
||||
reader.printAll(ndbout);
|
||||
#endif
|
||||
copy(b_dti_buf, tabInfoPtr);
|
||||
LinearSectionPtr ptr[3];
|
||||
ptr[0].p = b_dti_buf;
|
||||
ptr[0].sz = tabInfoPtr.sz;
|
||||
|
||||
releaseSections(signal);
|
||||
|
||||
SubTableData * data = (SubTableData*)signal->getDataPtrSend();
|
||||
data->gci = m_last_complete_gci+1;
|
||||
data->tableId = tableId;
|
||||
data->operation = NdbDictionary::Event::_TE_ALTER;
|
||||
data->req_nodeid = refToNode(senderRef);
|
||||
|
||||
data->logType = 0;
|
||||
data->changeMask = changeMask;
|
||||
data->totalLen = tabInfoPtr.sz;
|
||||
{
|
||||
LocalDLList<Subscriber> subbs(c_subscriberPool,tabPtr.p->c_subscribers);
|
||||
SubscriberPtr subbPtr;
|
||||
|
|
@ -3479,8 +3503,9 @@ Suma::execALTER_TAB_CONF(Signal *signal)
|
|||
}
|
||||
|
||||
data->senderData= subbPtr.p->m_senderData;
|
||||
sendSignal(subbPtr.p->m_senderRef, GSN_SUB_TABLE_DATA, signal,
|
||||
SubTableData::SignalLength, JBB);
|
||||
Callback c = { 0, 0 };
|
||||
sendFragmentedSignal(subbPtr.p->m_senderRef, GSN_SUB_TABLE_DATA, signal,
|
||||
SubTableData::SignalLength, JBB, ptr, 1, c);
|
||||
DBUG_PRINT("info",("sent to subscriber %d", subbPtr.i));
|
||||
}
|
||||
}
|
||||
|
|
@ -4607,7 +4632,9 @@ Suma::resend_bucket(Signal* signal, Uint32 buck, Uint32 min_gci,
|
|||
const Uint32 nptr= reformat(signal, ptr,
|
||||
src, sz_1,
|
||||
src + sz_1, sz - 2 - sz_1);
|
||||
|
||||
Uint32 ptrLen= 0;
|
||||
for(Uint32 i =0; i < nptr; i++)
|
||||
ptrLen+= ptr[i].sz;
|
||||
/**
|
||||
* Signal to subscriber(s)
|
||||
*/
|
||||
|
|
@ -4619,6 +4646,8 @@ Suma::resend_bucket(Signal* signal, Uint32 buck, Uint32 min_gci,
|
|||
data->tableId = tabPtr.p->m_tableId;
|
||||
data->operation = event;
|
||||
data->logType = 0;
|
||||
data->changeMask = 0;
|
||||
data->totalLen = ptrLen;
|
||||
|
||||
{
|
||||
LocalDLList<Subscriber> list(c_subscriberPool,tabPtr.p->c_subscribers);
|
||||
|
|
|
|||
|
|
@ -70,7 +70,7 @@ public:
|
|||
void execGET_TABLEID_REF(Signal* signal);
|
||||
|
||||
void execDROP_TAB_CONF(Signal* signal);
|
||||
void execALTER_TAB_CONF(Signal* signal);
|
||||
void execALTER_TAB_REQ(Signal* signal);
|
||||
void execCREATE_TAB_CONF(Signal* signal);
|
||||
/**
|
||||
* Scan interface
|
||||
|
|
|
|||
|
|
@ -76,7 +76,7 @@ Suma::Suma(const Configuration & conf) :
|
|||
* Dict interface
|
||||
*/
|
||||
addRecSignal(GSN_DROP_TAB_CONF, &Suma::execDROP_TAB_CONF);
|
||||
addRecSignal(GSN_ALTER_TAB_CONF, &Suma::execALTER_TAB_CONF);
|
||||
addRecSignal(GSN_ALTER_TAB_REQ, &Suma::execALTER_TAB_REQ);
|
||||
addRecSignal(GSN_CREATE_TAB_CONF, &Suma::execCREATE_TAB_CONF);
|
||||
|
||||
#if 0
|
||||
|
|
|
|||
|
|
@ -827,6 +827,12 @@ NdbDictionary::Event::setTable(const Table& table)
|
|||
m_impl.setTable(table);
|
||||
}
|
||||
|
||||
const NdbDictionary::Table *
|
||||
NdbDictionary::Event::getTable() const
|
||||
{
|
||||
return m_impl.getTable();
|
||||
}
|
||||
|
||||
void
|
||||
NdbDictionary::Event::setTable(const char * table)
|
||||
{
|
||||
|
|
|
|||
|
|
@ -1069,7 +1069,6 @@ void NdbEventImpl::init()
|
|||
{
|
||||
m_eventId= RNIL;
|
||||
m_eventKey= RNIL;
|
||||
m_tableId= RNIL;
|
||||
mi_type= 0;
|
||||
m_dur= NdbDictionary::Event::ED_UNDEFINED;
|
||||
m_tableImpl= NULL;
|
||||
|
|
@ -1080,6 +1079,8 @@ NdbEventImpl::~NdbEventImpl()
|
|||
{
|
||||
for (unsigned i = 0; i < m_columns.size(); i++)
|
||||
delete m_columns[i];
|
||||
if (m_tableImpl)
|
||||
delete m_tableImpl;
|
||||
}
|
||||
|
||||
void NdbEventImpl::setName(const char * name)
|
||||
|
|
@ -1095,10 +1096,29 @@ const char *NdbEventImpl::getName() const
|
|||
void
|
||||
NdbEventImpl::setTable(const NdbDictionary::Table& table)
|
||||
{
|
||||
m_tableImpl= &NdbTableImpl::getImpl(table);
|
||||
setTable(&NdbTableImpl::getImpl(table));
|
||||
m_tableName.assign(m_tableImpl->getName());
|
||||
}
|
||||
|
||||
void
|
||||
NdbEventImpl::setTable(NdbTableImpl *tableImpl)
|
||||
{
|
||||
DBUG_ASSERT(tableImpl->m_status != NdbDictionary::Object::Invalid);
|
||||
if (!m_tableImpl)
|
||||
m_tableImpl = new NdbTableImpl();
|
||||
// Copy table, since event might be accessed from different threads
|
||||
m_tableImpl->assign(*tableImpl);
|
||||
}
|
||||
|
||||
const NdbDictionary::Table *
|
||||
NdbEventImpl::getTable() const
|
||||
{
|
||||
if (m_tableImpl)
|
||||
return m_tableImpl->m_facade;
|
||||
else
|
||||
return NULL;
|
||||
}
|
||||
|
||||
void
|
||||
NdbEventImpl::setTable(const char * table)
|
||||
{
|
||||
|
|
@ -1247,6 +1267,21 @@ NdbDictionaryImpl::fetchGlobalTableImpl(const BaseString& internalTableName)
|
|||
return info;
|
||||
}
|
||||
|
||||
void
|
||||
NdbDictionaryImpl::putTable(NdbTableImpl *impl)
|
||||
{
|
||||
m_globalHash->lock();
|
||||
m_globalHash->put(impl->m_internalName.c_str(), impl);
|
||||
m_globalHash->unlock();
|
||||
Ndb_local_table_info *info=
|
||||
Ndb_local_table_info::create(impl, m_local_table_data_size);
|
||||
|
||||
m_localHash.put(impl->m_internalName.c_str(), info);
|
||||
|
||||
m_ndb.theFirstTupleId[impl->getTableId()] = ~0;
|
||||
m_ndb.theLastTupleId[impl->getTableId()] = ~0;
|
||||
}
|
||||
|
||||
#if 0
|
||||
bool
|
||||
NdbDictionaryImpl::setTransporter(class TransporterFacade * tf)
|
||||
|
|
@ -3074,13 +3109,11 @@ NdbDictionaryImpl::createEvent(NdbEventImpl & evnt)
|
|||
evnt.getTableName()));
|
||||
DBUG_RETURN(-1);
|
||||
}
|
||||
evnt.setTable(tab);
|
||||
}
|
||||
|
||||
DBUG_PRINT("info",("Table: id: %d version: %d", tab->m_id, tab->m_version));
|
||||
|
||||
evnt.m_tableId = tab->m_id;
|
||||
evnt.m_tableVersion = tab->m_version;
|
||||
evnt.m_tableImpl = tab;
|
||||
NdbTableImpl &table = *evnt.m_tableImpl;
|
||||
|
||||
int attributeList_sz = evnt.m_attrIds.size();
|
||||
|
|
@ -3102,7 +3135,7 @@ NdbDictionaryImpl::createEvent(NdbEventImpl & evnt)
|
|||
attributeList_sz = evnt.m_columns.size();
|
||||
|
||||
DBUG_PRINT("info",("Event on tableId=%d, tableVersion=%d, event name %s, no of columns %d",
|
||||
evnt.m_tableId, evnt.m_tableVersion,
|
||||
table.m_id, table.m_version,
|
||||
evnt.m_name.c_str(),
|
||||
evnt.m_columns.size()));
|
||||
|
||||
|
|
@ -3180,11 +3213,12 @@ NdbDictInterface::createEvent(class Ndb & ndb,
|
|||
req->setRequestType(CreateEvntReq::RT_USER_GET);
|
||||
} else {
|
||||
DBUG_PRINT("info",("tableId: %u tableVersion: %u",
|
||||
evnt.m_tableId, evnt.m_tableVersion));
|
||||
evnt.m_tableImpl->m_id,
|
||||
evnt.m_tableImpl->m_version));
|
||||
// creating event in Dictionary
|
||||
req->setRequestType(CreateEvntReq::RT_USER_CREATE);
|
||||
req->setTableId(evnt.m_tableId);
|
||||
req->setTableVersion(evnt.m_tableVersion);
|
||||
req->setTableId(evnt.m_tableImpl->m_id);
|
||||
req->setTableVersion(evnt.m_tableImpl->m_version);
|
||||
req->setAttrListBitmask(evnt.m_attrListBitmask);
|
||||
req->setEventType(evnt.mi_type);
|
||||
req->clearFlags();
|
||||
|
|
@ -3235,14 +3269,12 @@ NdbDictInterface::createEvent(class Ndb & ndb,
|
|||
// NdbEventImpl *evntImpl = (NdbEventImpl *)evntConf->getUserData();
|
||||
|
||||
if (getFlag) {
|
||||
evnt.m_tableId = evntConf->getTableId();
|
||||
evnt.m_tableVersion = evntConf->getTableVersion();
|
||||
evnt.m_attrListBitmask = evntConf->getAttrListBitmask();
|
||||
evnt.mi_type = evntConf->getEventType();
|
||||
evnt.setTable(dataPtr);
|
||||
} else {
|
||||
if (evnt.m_tableId != evntConf->getTableId() ||
|
||||
evnt.m_tableVersion != evntConf->getTableVersion() ||
|
||||
if (evnt.m_tableImpl->m_id != evntConf->getTableId() ||
|
||||
evnt.m_tableImpl->m_version != evntConf->getTableVersion() ||
|
||||
//evnt.m_attrListBitmask != evntConf->getAttrListBitmask() ||
|
||||
evnt.mi_type != evntConf->getEventType()) {
|
||||
ndbout_c("ERROR*************");
|
||||
|
|
@ -3336,7 +3368,7 @@ NdbDictionaryImpl::getEvent(const char * eventName)
|
|||
DBUG_ENTER("NdbDictionaryImpl::getEvent");
|
||||
DBUG_PRINT("enter",("eventName= %s", eventName));
|
||||
|
||||
NdbEventImpl *ev = new NdbEventImpl();
|
||||
NdbEventImpl *ev = new NdbEventImpl();
|
||||
if (ev == NULL) {
|
||||
DBUG_RETURN(NULL);
|
||||
}
|
||||
|
|
@ -3353,48 +3385,29 @@ NdbDictionaryImpl::getEvent(const char * eventName)
|
|||
// We only have the table name with internal name
|
||||
DBUG_PRINT("info",("table %s", ev->getTableName()));
|
||||
Ndb_local_table_info *info;
|
||||
int retry= 0;
|
||||
while (1)
|
||||
info= get_local_table_info(ev->getTableName(), true);
|
||||
if (info == 0)
|
||||
{
|
||||
info= get_local_table_info(ev->getTableName(), true);
|
||||
if (info == 0)
|
||||
{
|
||||
DBUG_PRINT("error",("unable to find table %s", ev->getTableName()));
|
||||
delete ev;
|
||||
DBUG_RETURN(NULL);
|
||||
}
|
||||
|
||||
if (ev->m_tableId == info->m_table_impl->m_id &&
|
||||
ev->m_tableVersion == info->m_table_impl->m_version)
|
||||
break;
|
||||
if (retry)
|
||||
{
|
||||
m_error.code= 241;
|
||||
DBUG_PRINT("error",("%s: table version mismatch, event: [%u,%u] table: [%u,%u]",
|
||||
ev->getTableName(), ev->m_tableId, ev->m_tableVersion,
|
||||
info->m_table_impl->m_id, info->m_table_impl->m_version));
|
||||
delete ev;
|
||||
DBUG_RETURN(NULL);
|
||||
}
|
||||
invalidateObject(*info->m_table_impl);
|
||||
retry++;
|
||||
DBUG_PRINT("error",("unable to find table %s", ev->getTableName()));
|
||||
delete ev;
|
||||
DBUG_RETURN(NULL);
|
||||
}
|
||||
|
||||
ev->m_tableImpl= info->m_table_impl;
|
||||
ev->setTable(m_ndb.externalizeTableName(ev->getTableName()));
|
||||
|
||||
ev->setTable(info->m_table_impl);
|
||||
ev->setTable(m_ndb.externalizeTableName(ev->getTableName()));
|
||||
// get the columns from the attrListBitmask
|
||||
|
||||
NdbTableImpl &table = *ev->m_tableImpl;
|
||||
AttributeMask & mask = ev->m_attrListBitmask;
|
||||
unsigned attributeList_sz = mask.count();
|
||||
|
||||
DBUG_PRINT("info",("Table: id: %d version: %d", table.m_id, table.m_version));
|
||||
DBUG_PRINT("info",("Table: id: %d version: %d",
|
||||
table.m_id, table.m_version));
|
||||
|
||||
#ifndef DBUG_OFF
|
||||
char buf[128] = {0};
|
||||
mask.getText(buf);
|
||||
DBUG_PRINT("info",("attributeList_sz= %d, mask= %s", attributeList_sz, buf));
|
||||
DBUG_PRINT("info",("attributeList_sz= %d, mask= %s",
|
||||
attributeList_sz, buf));
|
||||
#endif
|
||||
|
||||
|
||||
|
|
|
|||
|
|
@ -261,6 +261,12 @@ public:
|
|||
};
|
||||
|
||||
class NdbEventImpl : public NdbDictionary::Event, public NdbDictObjectImpl {
|
||||
friend class NdbDictInterface;
|
||||
friend class NdbDictionaryImpl;
|
||||
friend class NdbEventOperation;
|
||||
friend class NdbEventOperationImpl;
|
||||
friend class NdbEventBuffer;
|
||||
friend class EventBufData_hash;
|
||||
public:
|
||||
NdbEventImpl();
|
||||
NdbEventImpl(NdbDictionary::Event &);
|
||||
|
|
@ -270,6 +276,7 @@ public:
|
|||
void setName(const char * name);
|
||||
const char * getName() const;
|
||||
void setTable(const NdbDictionary::Table& table);
|
||||
const NdbDictionary::Table * getTable() const;
|
||||
void setTable(const char * table);
|
||||
const char * getTableName() const;
|
||||
void addTableEvent(const NdbDictionary::Event::TableEvent t);
|
||||
|
|
@ -288,15 +295,12 @@ public:
|
|||
|
||||
Uint32 m_eventId;
|
||||
Uint32 m_eventKey;
|
||||
Uint32 m_tableId;
|
||||
Uint32 m_tableVersion;
|
||||
AttributeMask m_attrListBitmask;
|
||||
BaseString m_name;
|
||||
Uint32 mi_type;
|
||||
NdbDictionary::Event::EventDurability m_dur;
|
||||
NdbDictionary::Event::EventReport m_rep;
|
||||
|
||||
NdbTableImpl *m_tableImpl;
|
||||
BaseString m_tableName;
|
||||
Vector<NdbColumnImpl *> m_columns;
|
||||
Vector<unsigned> m_attrIds;
|
||||
|
|
@ -304,6 +308,9 @@ public:
|
|||
static NdbEventImpl & getImpl(NdbDictionary::Event & t);
|
||||
static NdbEventImpl & getImpl(const NdbDictionary::Event & t);
|
||||
NdbDictionary::Event * m_facade;
|
||||
private:
|
||||
NdbTableImpl *m_tableImpl;
|
||||
void setTable(NdbTableImpl *tableImpl);
|
||||
};
|
||||
|
||||
struct NdbFilegroupImpl : public NdbDictObjectImpl {
|
||||
|
|
@ -558,6 +565,7 @@ public:
|
|||
int listIndexes(List& list, Uint32 indexId);
|
||||
|
||||
NdbTableImpl * getTable(const char * tableName, void **data= 0);
|
||||
void putTable(NdbTableImpl *impl);
|
||||
Ndb_local_table_info* get_local_table_info(
|
||||
const BaseString& internalTableName, bool do_add_blob_tables);
|
||||
NdbIndexImpl * getIndex(const char * indexName,
|
||||
|
|
|
|||
|
|
@ -85,6 +85,26 @@ NdbEventOperation::hasError() const
|
|||
return m_impl.m_has_error;
|
||||
}
|
||||
|
||||
const bool NdbEventOperation::tableNameChanged() const
|
||||
{
|
||||
return m_impl.tableNameChanged();
|
||||
}
|
||||
|
||||
const bool NdbEventOperation::tableFrmChanged() const
|
||||
{
|
||||
return m_impl.tableFrmChanged();
|
||||
}
|
||||
|
||||
const bool NdbEventOperation::tableFragmentationChanged() const
|
||||
{
|
||||
return m_impl.tableFragmentationChanged();
|
||||
}
|
||||
|
||||
const bool NdbEventOperation::tableRangeListChanged() const
|
||||
{
|
||||
return m_impl.tableRangeListChanged();
|
||||
}
|
||||
|
||||
Uint64
|
||||
NdbEventOperation::getGCI() const
|
||||
{
|
||||
|
|
@ -112,10 +132,6 @@ NdbEventOperation::print()
|
|||
/*
|
||||
* Internal for the mysql server
|
||||
*/
|
||||
const NdbDictionary::Table *NdbEventOperation::getTable() const
|
||||
{
|
||||
return m_impl.m_eventImpl->m_tableImpl->m_facade;
|
||||
}
|
||||
const NdbDictionary::Event *NdbEventOperation::getEvent() const
|
||||
{
|
||||
return m_impl.m_eventImpl->m_facade;
|
||||
|
|
@ -136,6 +152,7 @@ const NdbRecAttr* NdbEventOperation::getFirstDataPreAttr() const
|
|||
{
|
||||
return m_impl.theFirstDataAttrs[1];
|
||||
}
|
||||
/*
|
||||
bool NdbEventOperation::validateTable(NdbDictionary::Table &table) const
|
||||
{
|
||||
DBUG_ENTER("NdbEventOperation::validateTable");
|
||||
|
|
@ -147,7 +164,7 @@ bool NdbEventOperation::validateTable(NdbDictionary::Table &table) const
|
|||
}
|
||||
DBUG_RETURN(res);
|
||||
}
|
||||
|
||||
*/
|
||||
void NdbEventOperation::setCustomData(void * data)
|
||||
{
|
||||
m_impl.m_custom_data= data;
|
||||
|
|
|
|||
|
|
@ -40,6 +40,7 @@
|
|||
#include <NdbRecAttr.hpp>
|
||||
#include <NdbEventOperation.hpp>
|
||||
#include "NdbEventOperationImpl.hpp"
|
||||
#include <signaldata/AlterTable.hpp>
|
||||
|
||||
#include <EventLogger.hpp>
|
||||
extern EventLogger g_eventLogger;
|
||||
|
|
@ -77,6 +78,7 @@ NdbEventOperationImpl::NdbEventOperationImpl(NdbEventOperation &N,
|
|||
const char* eventName)
|
||||
: NdbEventOperation(*this), m_facade(&N), m_magic_number(0),
|
||||
m_ndb(theNdb), m_state(EO_ERROR), mi_type(0), m_oid(~(Uint32)0),
|
||||
m_change_mask(0),
|
||||
#ifdef VM_TRACE
|
||||
m_data_done_count(0), m_data_count(0),
|
||||
#endif
|
||||
|
|
@ -341,6 +343,26 @@ NdbEventOperationImpl::stop()
|
|||
DBUG_RETURN(r);
|
||||
}
|
||||
|
||||
const bool NdbEventOperationImpl::tableNameChanged() const
|
||||
{
|
||||
return (bool)AlterTableReq::getNameFlag(m_change_mask);
|
||||
}
|
||||
|
||||
const bool NdbEventOperationImpl::tableFrmChanged() const
|
||||
{
|
||||
return (bool)AlterTableReq::getFrmFlag(m_change_mask);
|
||||
}
|
||||
|
||||
const bool NdbEventOperationImpl::tableFragmentationChanged() const
|
||||
{
|
||||
return (bool)AlterTableReq::getFragDataFlag(m_change_mask);
|
||||
}
|
||||
|
||||
const bool NdbEventOperationImpl::tableRangeListChanged() const
|
||||
{
|
||||
return (bool)AlterTableReq::getRangeListFlag(m_change_mask);
|
||||
}
|
||||
|
||||
Uint64
|
||||
NdbEventOperationImpl::getGCI()
|
||||
{
|
||||
|
|
@ -353,6 +375,35 @@ NdbEventOperationImpl::getLatestGCI()
|
|||
return m_ndb->theEventBuffer->getLatestGCI();
|
||||
}
|
||||
|
||||
bool
|
||||
NdbEventOperationImpl::execSUB_TABLE_DATA(NdbApiSignal * signal,
|
||||
LinearSectionPtr ptr[3])
|
||||
{
|
||||
DBUG_ENTER("NdbEventOperationImpl::execSUB_TABLE_DATA");
|
||||
const SubTableData * const sdata=
|
||||
CAST_CONSTPTR(SubTableData, signal->getDataPtr());
|
||||
|
||||
if(signal->isFirstFragment()){
|
||||
m_fragmentId = signal->getFragmentId();
|
||||
m_buffer.grow(4 * sdata->totalLen);
|
||||
} else {
|
||||
if(m_fragmentId != signal->getFragmentId()){
|
||||
abort();
|
||||
}
|
||||
}
|
||||
const Uint32 i = SubTableData::DICT_TAB_INFO;
|
||||
DBUG_PRINT("info", ("Accumulated %u bytes for fragment %u",
|
||||
4 * ptr[i].sz, m_fragmentId));
|
||||
m_buffer.append(ptr[i].p, 4 * ptr[i].sz);
|
||||
|
||||
if(!signal->isLastFragment()){
|
||||
DBUG_RETURN(FALSE);
|
||||
}
|
||||
|
||||
DBUG_RETURN(TRUE);
|
||||
}
|
||||
|
||||
|
||||
int
|
||||
NdbEventOperationImpl::receive_event()
|
||||
{
|
||||
|
|
@ -361,6 +412,26 @@ NdbEventOperationImpl::receive_event()
|
|||
Uint32 operation= (Uint32)m_data_item->sdata->operation;
|
||||
DBUG_PRINT_EVENT("info",("sdata->operation %u",operation));
|
||||
|
||||
if (operation == NdbDictionary::Event::_TE_ALTER)
|
||||
{
|
||||
// Parse the new table definition and
|
||||
// create a table object
|
||||
NdbDictionary::Dictionary *myDict = m_ndb->getDictionary();
|
||||
NdbDictionaryImpl *dict = & NdbDictionaryImpl::getImpl(*myDict);
|
||||
NdbError error;
|
||||
NdbDictInterface dif(error);
|
||||
NdbTableImpl *at;
|
||||
m_change_mask = m_data_item->sdata->changeMask;
|
||||
error.code = dif.parseTableInfo(&at,
|
||||
(Uint32*)m_buffer.get_data(),
|
||||
m_buffer.length() / 4,
|
||||
true);
|
||||
m_buffer.clear();
|
||||
if ( m_eventImpl->m_tableImpl)
|
||||
delete m_eventImpl->m_tableImpl;
|
||||
m_eventImpl->m_tableImpl = at;
|
||||
}
|
||||
|
||||
if (unlikely(operation >= NdbDictionary::Event::_TE_FIRST_NON_DATA_EVENT))
|
||||
{
|
||||
DBUG_RETURN_EVENT(1);
|
||||
|
|
@ -1178,7 +1249,7 @@ NdbEventBuffer::insertDataL(NdbEventOperationImpl *op,
|
|||
*/
|
||||
DBUG_RETURN_EVENT(0);
|
||||
}
|
||||
|
||||
|
||||
bool use_hash =
|
||||
op->m_mergeEvents &&
|
||||
sdata->operation < NdbDictionary::Event::_TE_FIRST_NON_DATA_EVENT;
|
||||
|
|
|
|||
|
|
@ -21,6 +21,7 @@
|
|||
#include <signaldata/SumaImpl.hpp>
|
||||
#include <transporter/TransporterDefinitions.hpp>
|
||||
#include <NdbRecAttr.hpp>
|
||||
#include <UtilBuffer.hpp>
|
||||
|
||||
#define NDB_EVENT_OP_MAGIC_NUMBER 0xA9F301B4
|
||||
|
||||
|
|
@ -177,8 +178,14 @@ public:
|
|||
NdbRecAttr *getValue(const char *colName, char *aValue, int n);
|
||||
NdbRecAttr *getValue(const NdbColumnImpl *, char *aValue, int n);
|
||||
int receive_event();
|
||||
const bool tableNameChanged() const;
|
||||
const bool tableFrmChanged() const;
|
||||
const bool tableFragmentationChanged() const;
|
||||
const bool tableRangeListChanged() const;
|
||||
Uint64 getGCI();
|
||||
Uint64 getLatestGCI();
|
||||
bool execSUB_TABLE_DATA(NdbApiSignal * signal,
|
||||
LinearSectionPtr ptr[3]);
|
||||
|
||||
NdbDictionary::Event::TableEvent getEventType();
|
||||
|
||||
|
|
@ -213,6 +220,12 @@ public:
|
|||
void *m_custom_data;
|
||||
int m_has_error;
|
||||
|
||||
Uint32 m_fragmentId;
|
||||
UtilBuffer m_buffer;
|
||||
|
||||
// Bit mask for what has changed in a table (for TE_ALTER event)
|
||||
Uint32 m_change_mask;
|
||||
|
||||
#ifdef VM_TRACE
|
||||
Uint32 m_data_done_count;
|
||||
Uint32 m_data_count;
|
||||
|
|
|
|||
|
|
@ -703,7 +703,6 @@ Ndb::handleReceivedSignal(NdbApiSignal* aSignal, LinearSectionPtr ptr[3])
|
|||
aSignal, ptr);
|
||||
return;
|
||||
|
||||
case GSN_SUB_META_DATA:
|
||||
case GSN_SUB_REMOVE_CONF:
|
||||
case GSN_SUB_REMOVE_REF:
|
||||
return; // ignore these signals
|
||||
|
|
@ -726,6 +725,16 @@ Ndb::handleReceivedSignal(NdbApiSignal* aSignal, LinearSectionPtr ptr[3])
|
|||
const SubTableData * const sdata=
|
||||
CAST_CONSTPTR(SubTableData, aSignal->getDataPtr());
|
||||
const Uint32 oid = sdata->senderData;
|
||||
NdbEventOperationImpl *op= (NdbEventOperationImpl*)int2void(oid);
|
||||
|
||||
if (op->m_magic_number != NDB_EVENT_OP_MAGIC_NUMBER)
|
||||
g_eventLogger.error("dropped GSN_SUB_TABLE_DATA due to wrong magic "
|
||||
"number");
|
||||
|
||||
// Accumulate DIC_TAB_INFO for TE_ALTER events
|
||||
if (sdata->operation == NdbDictionary::Event::_TE_ALTER &&
|
||||
!op->execSUB_TABLE_DATA(aSignal, ptr))
|
||||
return;
|
||||
|
||||
for (int i= aSignal->m_noOfSections;i < 3; i++) {
|
||||
ptr[i].p = NULL;
|
||||
|
|
@ -736,12 +745,7 @@ Ndb::handleReceivedSignal(NdbApiSignal* aSignal, LinearSectionPtr ptr[3])
|
|||
sdata->senderData, sdata->gci, sdata->operation,
|
||||
sdata->tableId));
|
||||
|
||||
NdbEventOperationImpl *op= (NdbEventOperationImpl*)int2void(oid);
|
||||
if (op->m_magic_number == NDB_EVENT_OP_MAGIC_NUMBER)
|
||||
theEventBuffer->insertDataL(op,sdata, ptr);
|
||||
else
|
||||
g_eventLogger.error("dropped GSN_SUB_TABLE_DATA due to wrong magic "
|
||||
"number");
|
||||
theEventBuffer->insertDataL(op,sdata, ptr);
|
||||
return;
|
||||
}
|
||||
case GSN_DIHNDBTAMPER:
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue