mirror of
https://github.com/MariaDB/server.git
synced 2025-01-31 11:01:52 +01:00
simple extend of listen_event to do apply on remote cluster
This commit is contained in:
parent
d019e94aed
commit
f24ada7fe6
1 changed files with 184 additions and 4 deletions
|
@ -22,6 +22,128 @@
|
|||
#include <getarg.h>
|
||||
|
||||
|
||||
#define BATCH_SIZE 128
|
||||
struct Table_info
|
||||
{
|
||||
Uint32 id;
|
||||
};
|
||||
|
||||
struct Trans_arg
|
||||
{
|
||||
Ndb *ndb;
|
||||
NdbTransaction *trans;
|
||||
Uint32 bytes_batched;
|
||||
};
|
||||
|
||||
Vector< Vector<NdbRecAttr*> > event_values;
|
||||
Vector< Vector<NdbRecAttr*> > event_pre_values;
|
||||
Vector<struct Table_info> table_infos;
|
||||
|
||||
static void do_begin(Ndb *ndb, struct Trans_arg &trans_arg)
|
||||
{
|
||||
trans_arg.ndb = ndb;
|
||||
trans_arg.trans = ndb->startTransaction();
|
||||
trans_arg.bytes_batched = 0;
|
||||
}
|
||||
|
||||
static void do_equal(NdbOperation *op,
|
||||
NdbEventOperation *pOp)
|
||||
{
|
||||
struct Table_info *ti = (struct Table_info *)pOp->getCustomData();
|
||||
Vector<NdbRecAttr*> &ev = event_values[ti->id];
|
||||
const NdbDictionary::Table *tab= pOp->getTable();
|
||||
unsigned i, n_columns = tab->getNoOfColumns();
|
||||
for (i= 0; i < n_columns; i++)
|
||||
{
|
||||
if (tab->getColumn(i)->getPrimaryKey() &&
|
||||
op->equal(i, ev[i]->aRef()))
|
||||
{
|
||||
abort();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
static void do_set_value(NdbOperation *op,
|
||||
NdbEventOperation *pOp)
|
||||
{
|
||||
struct Table_info *ti = (struct Table_info *)pOp->getCustomData();
|
||||
Vector<NdbRecAttr*> &ev = event_values[ti->id];
|
||||
const NdbDictionary::Table *tab= pOp->getTable();
|
||||
unsigned i, n_columns = tab->getNoOfColumns();
|
||||
for (i= 0; i < n_columns; i++)
|
||||
{
|
||||
if (!tab->getColumn(i)->getPrimaryKey() &&
|
||||
op->setValue(i, ev[i]->aRef()))
|
||||
{
|
||||
abort();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
static void do_insert(struct Trans_arg &trans_arg, NdbEventOperation *pOp)
|
||||
{
|
||||
if (!trans_arg.trans)
|
||||
return;
|
||||
|
||||
NdbOperation *op =
|
||||
trans_arg.trans->getNdbOperation(pOp->getEvent()->getTableName());
|
||||
op->writeTuple();
|
||||
|
||||
do_equal(op, pOp);
|
||||
do_set_value(op, pOp);
|
||||
|
||||
trans_arg.bytes_batched++;
|
||||
if (trans_arg.bytes_batched > BATCH_SIZE)
|
||||
{
|
||||
trans_arg.trans->execute(NdbTransaction::NoCommit);
|
||||
trans_arg.bytes_batched = 0;
|
||||
}
|
||||
}
|
||||
static void do_update(struct Trans_arg &trans_arg, NdbEventOperation *pOp)
|
||||
{
|
||||
if (!trans_arg.trans)
|
||||
return;
|
||||
|
||||
NdbOperation *op =
|
||||
trans_arg.trans->getNdbOperation(pOp->getEvent()->getTableName());
|
||||
op->writeTuple();
|
||||
|
||||
do_equal(op, pOp);
|
||||
do_set_value(op, pOp);
|
||||
|
||||
trans_arg.bytes_batched++;
|
||||
if (trans_arg.bytes_batched > BATCH_SIZE)
|
||||
{
|
||||
trans_arg.trans->execute(NdbTransaction::NoCommit);
|
||||
trans_arg.bytes_batched = 0;
|
||||
}
|
||||
}
|
||||
static void do_delete(struct Trans_arg &trans_arg, NdbEventOperation *pOp)
|
||||
{
|
||||
if (!trans_arg.trans)
|
||||
return;
|
||||
|
||||
NdbOperation *op =
|
||||
trans_arg.trans->getNdbOperation(pOp->getEvent()->getTableName());
|
||||
op->deleteTuple();
|
||||
|
||||
do_equal(op, pOp);
|
||||
|
||||
trans_arg.bytes_batched++;
|
||||
if (trans_arg.bytes_batched > BATCH_SIZE)
|
||||
{
|
||||
trans_arg.trans->execute(NdbTransaction::NoCommit);
|
||||
trans_arg.bytes_batched = 0;
|
||||
}
|
||||
}
|
||||
static void do_commit(struct Trans_arg &trans_arg)
|
||||
{
|
||||
if (!trans_arg.trans)
|
||||
return;
|
||||
trans_arg.trans->execute(NdbTransaction::Commit);
|
||||
trans_arg.ndb->closeTransaction(trans_arg.trans);
|
||||
}
|
||||
|
||||
int
|
||||
main(int argc, const char** argv){
|
||||
ndb_init();
|
||||
|
@ -29,8 +151,14 @@ main(int argc, const char** argv){
|
|||
|
||||
int _help = 0;
|
||||
const char* db = 0;
|
||||
const char* connectstring1 = 0;
|
||||
const char* connectstring2 = 0;
|
||||
|
||||
struct getargs args[] = {
|
||||
{ "connectstring1", 'c',
|
||||
arg_string, &connectstring1, "connectstring1", "" },
|
||||
{ "connectstring2", 'C',
|
||||
arg_string, &connectstring2, "connectstring2", "" },
|
||||
{ "database", 'd', arg_string, &db, "Database", "" },
|
||||
{ "usage", '?', arg_flag, &_help, "Print help", "" }
|
||||
};
|
||||
|
@ -46,7 +174,7 @@ main(int argc, const char** argv){
|
|||
}
|
||||
|
||||
// Connect to Ndb
|
||||
Ndb_cluster_connection con;
|
||||
Ndb_cluster_connection con(connectstring1);
|
||||
if(con.connect(12, 5, 1) != 0)
|
||||
{
|
||||
return NDBT_ProgramExit(NDBT_FAILED);
|
||||
|
@ -61,12 +189,35 @@ main(int argc, const char** argv){
|
|||
// Connect to Ndb and wait for it to become ready
|
||||
while(MyNdb.waitUntilReady() != 0)
|
||||
ndbout << "Waiting for ndb to become ready..." << endl;
|
||||
|
||||
|
||||
Ndb_cluster_connection *con2 = NULL;
|
||||
Ndb *ndb2 = NULL;
|
||||
if (connectstring2)
|
||||
{
|
||||
con2 = new Ndb_cluster_connection(connectstring2);
|
||||
|
||||
if(con2->connect(12, 5, 1) != 0)
|
||||
{
|
||||
return NDBT_ProgramExit(NDBT_FAILED);
|
||||
}
|
||||
ndb2 = new Ndb( con2, db ? db : "TEST_DB" );
|
||||
|
||||
if(ndb2->init() != 0){
|
||||
ERR(ndb2->getNdbError());
|
||||
return NDBT_ProgramExit(NDBT_FAILED);
|
||||
}
|
||||
|
||||
// Connect to Ndb and wait for it to become ready
|
||||
while(ndb2->waitUntilReady() != 0)
|
||||
ndbout << "Waiting for ndb to become ready..." << endl;
|
||||
}
|
||||
|
||||
int result = 0;
|
||||
|
||||
NdbDictionary::Dictionary *myDict = MyNdb.getDictionary();
|
||||
Vector<NdbDictionary::Event*> events;
|
||||
Vector<NdbEventOperation*> event_ops;
|
||||
int sz = 0;
|
||||
for(i= optind; i<argc; i++)
|
||||
{
|
||||
const NdbDictionary::Table* table= myDict->getTable(argv[i]);
|
||||
|
@ -121,12 +272,23 @@ main(int argc, const char** argv){
|
|||
goto end;
|
||||
}
|
||||
|
||||
event_values.push_back(Vector<NdbRecAttr *>());
|
||||
event_pre_values.push_back(Vector<NdbRecAttr *>());
|
||||
for (int a = 0; a < table->getNoOfColumns(); a++)
|
||||
{
|
||||
pOp->getValue(table->getColumn(a)->getName());
|
||||
pOp->getPreValue(table->getColumn(a)->getName());
|
||||
event_values[sz].
|
||||
push_back(pOp->getValue(table->getColumn(a)->getName()));
|
||||
event_pre_values[sz].
|
||||
push_back(pOp->getPreValue(table->getColumn(a)->getName()));
|
||||
}
|
||||
event_ops.push_back(pOp);
|
||||
{
|
||||
struct Table_info ti;
|
||||
ti.id = sz;
|
||||
table_infos.push_back(ti);
|
||||
}
|
||||
pOp->setCustomData((void *)&table_infos[sz]);
|
||||
sz++;
|
||||
}
|
||||
|
||||
for(i= 0; i<(int)event_ops.size(); i++)
|
||||
|
@ -140,6 +302,7 @@ main(int argc, const char** argv){
|
|||
}
|
||||
}
|
||||
|
||||
struct Trans_arg trans_arg;
|
||||
while(true)
|
||||
{
|
||||
while(MyNdb.pollEvents(100) == 0);
|
||||
|
@ -149,18 +312,26 @@ main(int argc, const char** argv){
|
|||
{
|
||||
Uint64 gci= pOp->getGCI();
|
||||
Uint64 cnt_i= 0, cnt_u= 0, cnt_d= 0;
|
||||
if (ndb2)
|
||||
do_begin(ndb2, trans_arg);
|
||||
do
|
||||
{
|
||||
switch(pOp->getEventType())
|
||||
{
|
||||
case NdbDictionary::Event::TE_INSERT:
|
||||
cnt_i++;
|
||||
if (ndb2)
|
||||
do_insert(trans_arg, pOp);
|
||||
break;
|
||||
case NdbDictionary::Event::TE_DELETE:
|
||||
cnt_d++;
|
||||
if (ndb2)
|
||||
do_delete(trans_arg, pOp);
|
||||
break;
|
||||
case NdbDictionary::Event::TE_UPDATE:
|
||||
cnt_u++;
|
||||
if (ndb2)
|
||||
do_update(trans_arg, pOp);
|
||||
break;
|
||||
case NdbDictionary::Event::TE_CLUSTER_FAILURE:
|
||||
break;
|
||||
|
@ -180,12 +351,21 @@ main(int argc, const char** argv){
|
|||
abort();
|
||||
}
|
||||
} while ((pOp= MyNdb.nextEvent()) && gci == pOp->getGCI());
|
||||
if (ndb2)
|
||||
do_commit(trans_arg);
|
||||
ndbout_c("GCI: %lld events: %lld(I) %lld(U) %lld(D)", gci, cnt_i, cnt_u, cnt_d);
|
||||
}
|
||||
}
|
||||
end:
|
||||
if (ndb2)
|
||||
delete ndb2;
|
||||
if (con2)
|
||||
delete con2;
|
||||
return NDBT_ProgramExit(NDBT_OK);
|
||||
}
|
||||
|
||||
template class Vector<struct Table_info>;
|
||||
template class Vector<NdbRecAttr*>;
|
||||
template class Vector< Vector<NdbRecAttr*> >;
|
||||
template class Vector<NdbDictionary::Event*>;
|
||||
template class Vector<NdbEventOperation*>;
|
||||
|
|
Loading…
Add table
Reference in a new issue