mirror of
https://github.com/MariaDB/server.git
synced 2025-01-16 03:52:35 +01:00
Merge whalegate.ndb.mysql.com:/home/tomas/mysql-5.1-telco-gca
into whalegate.ndb.mysql.com:/home/tomas/mysql-5.1-single-user BitKeeper/etc/ignore: auto-union storage/ndb/test/tools/Makefile.am: manual merge storage/ndb/test/tools/listen.cpp: manual merge
This commit is contained in:
commit
e79f14e152
4 changed files with 503 additions and 5 deletions
13
.bzrignore
13
.bzrignore
|
@ -2640,6 +2640,16 @@ storage/ndb/lib/libNEWTON_BASICTEST_COMMON.so
|
|||
storage/ndb/lib/libREP_API.so
|
||||
storage/ndb/lib/libndbclient.so
|
||||
storage/ndb/lib/libndbclient_extra.so
|
||||
storage/ndb/ndbapi-examples/mgmapi_logevent/mgmapi_logevent
|
||||
storage/ndb/ndbapi-examples/mgmapi_logevent2/mgmapi_logevent2
|
||||
storage/ndb/ndbapi-examples/ndbapi_async/ndbapi_async
|
||||
storage/ndb/ndbapi-examples/ndbapi_async1/ndbapi_async1
|
||||
storage/ndb/ndbapi-examples/ndbapi_event/ndbapi_event
|
||||
storage/ndb/ndbapi-examples/ndbapi_retries/ndbapi_retries
|
||||
storage/ndb/ndbapi-examples/ndbapi_scan/ndbapi_scan
|
||||
storage/ndb/ndbapi-examples/ndbapi_simple/ndbapi_simple
|
||||
storage/ndb/ndbapi-examples/ndbapi_simple_dual/ndbapi_simple_dual
|
||||
storage/ndb/ndbapi-examples/ndbapi_simple_index/ndbapi_simple_index
|
||||
storage/ndb/src/common/debugger/libtrace.dsp
|
||||
storage/ndb/src/common/debugger/signaldata/libsignaldataprint.dsp
|
||||
storage/ndb/src/common/logger/liblogger.dsp
|
||||
|
@ -2717,6 +2727,8 @@ storage/ndb/test/ndbapi/testDataBuffers
|
|||
storage/ndb/test/ndbapi/testDeadlock
|
||||
storage/ndb/test/ndbapi/testDict
|
||||
storage/ndb/test/ndbapi/testIndex
|
||||
storage/ndb/test/ndbapi/testIndexStat
|
||||
storage/ndb/test/ndbapi/testInterpreter
|
||||
storage/ndb/test/ndbapi/testLcp
|
||||
storage/ndb/test/ndbapi/testMgm
|
||||
storage/ndb/test/ndbapi/testNdbApi
|
||||
|
@ -2752,6 +2764,7 @@ storage/ndb/test/tools/hugoScanRead
|
|||
storage/ndb/test/tools/hugoScanUpdate
|
||||
storage/ndb/test/tools/listen_event
|
||||
storage/ndb/test/tools/ndb_cpcc
|
||||
storage/ndb/test/tools/rep_latency
|
||||
storage/ndb/test/tools/restart
|
||||
storage/ndb/test/tools/verify_index
|
||||
storage/ndb/tools/ndb_config
|
||||
|
|
|
@ -13,7 +13,7 @@
|
|||
# along with this program; if not, write to the Free Software
|
||||
# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
|
||||
|
||||
ndbtest_PROGRAMS = hugoLoad hugoFill hugoLockRecords hugoPkDelete hugoPkRead hugoPkReadRecord hugoPkUpdate hugoScanRead hugoScanUpdate restart verify_index copy_tab create_index ndb_cpcc listen_event eventlog
|
||||
ndbtest_PROGRAMS = hugoLoad hugoFill hugoLockRecords hugoPkDelete hugoPkRead hugoPkReadRecord hugoPkUpdate hugoScanRead hugoScanUpdate restart verify_index copy_tab create_index ndb_cpcc listen_event eventlog rep_latency
|
||||
|
||||
# transproxy
|
||||
|
||||
|
@ -34,6 +34,7 @@ create_index_SOURCES = create_index.cpp
|
|||
ndb_cpcc_SOURCES = cpcc.cpp
|
||||
listen_event_SOURCES = listen.cpp
|
||||
eventlog_SOURCES = log_listner.cpp
|
||||
rep_latency_SOURCES = rep_latency.cpp
|
||||
|
||||
include $(top_srcdir)/storage/ndb/config/common.mk.am
|
||||
include $(top_srcdir)/storage/ndb/config/type_ndbapitest.mk.am
|
||||
|
|
|
@ -22,6 +22,128 @@
|
|||
#include <getarg.h>
|
||||
|
||||
|
||||
#define BATCH_SIZE 128
|
||||
struct Table_info
|
||||
{
|
||||
Uint32 id;
|
||||
};
|
||||
|
||||
struct Trans_arg
|
||||
{
|
||||
Ndb *ndb;
|
||||
NdbTransaction *trans;
|
||||
Uint32 bytes_batched;
|
||||
};
|
||||
|
||||
Vector< Vector<NdbRecAttr*> > event_values;
|
||||
Vector< Vector<NdbRecAttr*> > event_pre_values;
|
||||
Vector<struct Table_info> table_infos;
|
||||
|
||||
static void do_begin(Ndb *ndb, struct Trans_arg &trans_arg)
|
||||
{
|
||||
trans_arg.ndb = ndb;
|
||||
trans_arg.trans = ndb->startTransaction();
|
||||
trans_arg.bytes_batched = 0;
|
||||
}
|
||||
|
||||
static void do_equal(NdbOperation *op,
|
||||
NdbEventOperation *pOp)
|
||||
{
|
||||
struct Table_info *ti = (struct Table_info *)pOp->getCustomData();
|
||||
Vector<NdbRecAttr*> &ev = event_values[ti->id];
|
||||
const NdbDictionary::Table *tab= pOp->getTable();
|
||||
unsigned i, n_columns = tab->getNoOfColumns();
|
||||
for (i= 0; i < n_columns; i++)
|
||||
{
|
||||
if (tab->getColumn(i)->getPrimaryKey() &&
|
||||
op->equal(i, ev[i]->aRef()))
|
||||
{
|
||||
abort();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
static void do_set_value(NdbOperation *op,
|
||||
NdbEventOperation *pOp)
|
||||
{
|
||||
struct Table_info *ti = (struct Table_info *)pOp->getCustomData();
|
||||
Vector<NdbRecAttr*> &ev = event_values[ti->id];
|
||||
const NdbDictionary::Table *tab= pOp->getTable();
|
||||
unsigned i, n_columns = tab->getNoOfColumns();
|
||||
for (i= 0; i < n_columns; i++)
|
||||
{
|
||||
if (!tab->getColumn(i)->getPrimaryKey() &&
|
||||
op->setValue(i, ev[i]->aRef()))
|
||||
{
|
||||
abort();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
static void do_insert(struct Trans_arg &trans_arg, NdbEventOperation *pOp)
|
||||
{
|
||||
if (!trans_arg.trans)
|
||||
return;
|
||||
|
||||
NdbOperation *op =
|
||||
trans_arg.trans->getNdbOperation(pOp->getEvent()->getTableName());
|
||||
op->writeTuple();
|
||||
|
||||
do_equal(op, pOp);
|
||||
do_set_value(op, pOp);
|
||||
|
||||
trans_arg.bytes_batched++;
|
||||
if (trans_arg.bytes_batched > BATCH_SIZE)
|
||||
{
|
||||
trans_arg.trans->execute(NdbTransaction::NoCommit);
|
||||
trans_arg.bytes_batched = 0;
|
||||
}
|
||||
}
|
||||
static void do_update(struct Trans_arg &trans_arg, NdbEventOperation *pOp)
|
||||
{
|
||||
if (!trans_arg.trans)
|
||||
return;
|
||||
|
||||
NdbOperation *op =
|
||||
trans_arg.trans->getNdbOperation(pOp->getEvent()->getTableName());
|
||||
op->writeTuple();
|
||||
|
||||
do_equal(op, pOp);
|
||||
do_set_value(op, pOp);
|
||||
|
||||
trans_arg.bytes_batched++;
|
||||
if (trans_arg.bytes_batched > BATCH_SIZE)
|
||||
{
|
||||
trans_arg.trans->execute(NdbTransaction::NoCommit);
|
||||
trans_arg.bytes_batched = 0;
|
||||
}
|
||||
}
|
||||
static void do_delete(struct Trans_arg &trans_arg, NdbEventOperation *pOp)
|
||||
{
|
||||
if (!trans_arg.trans)
|
||||
return;
|
||||
|
||||
NdbOperation *op =
|
||||
trans_arg.trans->getNdbOperation(pOp->getEvent()->getTableName());
|
||||
op->deleteTuple();
|
||||
|
||||
do_equal(op, pOp);
|
||||
|
||||
trans_arg.bytes_batched++;
|
||||
if (trans_arg.bytes_batched > BATCH_SIZE)
|
||||
{
|
||||
trans_arg.trans->execute(NdbTransaction::NoCommit);
|
||||
trans_arg.bytes_batched = 0;
|
||||
}
|
||||
}
|
||||
static void do_commit(struct Trans_arg &trans_arg)
|
||||
{
|
||||
if (!trans_arg.trans)
|
||||
return;
|
||||
trans_arg.trans->execute(NdbTransaction::Commit);
|
||||
trans_arg.ndb->closeTransaction(trans_arg.trans);
|
||||
}
|
||||
|
||||
int
|
||||
main(int argc, const char** argv){
|
||||
ndb_init();
|
||||
|
@ -29,8 +151,14 @@ main(int argc, const char** argv){
|
|||
|
||||
int _help = 0;
|
||||
const char* db = 0;
|
||||
const char* connectstring1 = 0;
|
||||
const char* connectstring2 = 0;
|
||||
|
||||
struct getargs args[] = {
|
||||
{ "connectstring1", 'c',
|
||||
arg_string, &connectstring1, "connectstring1", "" },
|
||||
{ "connectstring2", 'C',
|
||||
arg_string, &connectstring2, "connectstring2", "" },
|
||||
{ "database", 'd', arg_string, &db, "Database", "" },
|
||||
{ "usage", '?', arg_flag, &_help, "Print help", "" }
|
||||
};
|
||||
|
@ -46,7 +174,7 @@ main(int argc, const char** argv){
|
|||
}
|
||||
|
||||
// Connect to Ndb
|
||||
Ndb_cluster_connection con;
|
||||
Ndb_cluster_connection con(connectstring1);
|
||||
if(con.connect(12, 5, 1) != 0)
|
||||
{
|
||||
return NDBT_ProgramExit(NDBT_FAILED);
|
||||
|
@ -61,12 +189,35 @@ main(int argc, const char** argv){
|
|||
// Connect to Ndb and wait for it to become ready
|
||||
while(MyNdb.waitUntilReady() != 0)
|
||||
ndbout << "Waiting for ndb to become ready..." << endl;
|
||||
|
||||
|
||||
Ndb_cluster_connection *con2 = NULL;
|
||||
Ndb *ndb2 = NULL;
|
||||
if (connectstring2)
|
||||
{
|
||||
con2 = new Ndb_cluster_connection(connectstring2);
|
||||
|
||||
if(con2->connect(12, 5, 1) != 0)
|
||||
{
|
||||
return NDBT_ProgramExit(NDBT_FAILED);
|
||||
}
|
||||
ndb2 = new Ndb( con2, db ? db : "TEST_DB" );
|
||||
|
||||
if(ndb2->init() != 0){
|
||||
ERR(ndb2->getNdbError());
|
||||
return NDBT_ProgramExit(NDBT_FAILED);
|
||||
}
|
||||
|
||||
// Connect to Ndb and wait for it to become ready
|
||||
while(ndb2->waitUntilReady() != 0)
|
||||
ndbout << "Waiting for ndb to become ready..." << endl;
|
||||
}
|
||||
|
||||
int result = 0;
|
||||
|
||||
NdbDictionary::Dictionary *myDict = MyNdb.getDictionary();
|
||||
Vector<NdbDictionary::Event*> events;
|
||||
Vector<NdbEventOperation*> event_ops;
|
||||
int sz = 0;
|
||||
for(i= optind; i<argc; i++)
|
||||
{
|
||||
const NdbDictionary::Table* table= myDict->getTable(argv[i]);
|
||||
|
@ -121,12 +272,23 @@ main(int argc, const char** argv){
|
|||
goto end;
|
||||
}
|
||||
|
||||
event_values.push_back(Vector<NdbRecAttr *>());
|
||||
event_pre_values.push_back(Vector<NdbRecAttr *>());
|
||||
for (int a = 0; a < table->getNoOfColumns(); a++)
|
||||
{
|
||||
pOp->getValue(table->getColumn(a)->getName());
|
||||
pOp->getPreValue(table->getColumn(a)->getName());
|
||||
event_values[sz].
|
||||
push_back(pOp->getValue(table->getColumn(a)->getName()));
|
||||
event_pre_values[sz].
|
||||
push_back(pOp->getPreValue(table->getColumn(a)->getName()));
|
||||
}
|
||||
event_ops.push_back(pOp);
|
||||
{
|
||||
struct Table_info ti;
|
||||
ti.id = sz;
|
||||
table_infos.push_back(ti);
|
||||
}
|
||||
pOp->setCustomData((void *)&table_infos[sz]);
|
||||
sz++;
|
||||
}
|
||||
|
||||
for(i= 0; i<(int)event_ops.size(); i++)
|
||||
|
@ -140,6 +302,7 @@ main(int argc, const char** argv){
|
|||
}
|
||||
}
|
||||
|
||||
struct Trans_arg trans_arg;
|
||||
while(true)
|
||||
{
|
||||
while(MyNdb.pollEvents(100) == 0);
|
||||
|
@ -149,18 +312,26 @@ main(int argc, const char** argv){
|
|||
{
|
||||
Uint64 gci= pOp->getGCI();
|
||||
Uint64 cnt_i= 0, cnt_u= 0, cnt_d= 0;
|
||||
if (ndb2)
|
||||
do_begin(ndb2, trans_arg);
|
||||
do
|
||||
{
|
||||
switch(pOp->getEventType())
|
||||
{
|
||||
case NdbDictionary::Event::TE_INSERT:
|
||||
cnt_i++;
|
||||
if (ndb2)
|
||||
do_insert(trans_arg, pOp);
|
||||
break;
|
||||
case NdbDictionary::Event::TE_DELETE:
|
||||
cnt_d++;
|
||||
if (ndb2)
|
||||
do_delete(trans_arg, pOp);
|
||||
break;
|
||||
case NdbDictionary::Event::TE_UPDATE:
|
||||
cnt_u++;
|
||||
if (ndb2)
|
||||
do_update(trans_arg, pOp);
|
||||
break;
|
||||
case NdbDictionary::Event::TE_CLUSTER_FAILURE:
|
||||
break;
|
||||
|
@ -180,6 +351,8 @@ main(int argc, const char** argv){
|
|||
abort();
|
||||
}
|
||||
} while ((pOp= MyNdb.nextEvent()) && gci == pOp->getGCI());
|
||||
if (ndb2)
|
||||
do_commit(trans_arg);
|
||||
ndbout_c("GCI: %lld events: %lld(I) %lld(U) %lld(D)", gci, cnt_i, cnt_u, cnt_d);
|
||||
}
|
||||
}
|
||||
|
@ -187,8 +360,15 @@ end:
|
|||
for(i= 0; i<(int)event_ops.size(); i++)
|
||||
MyNdb.dropEventOperation(event_ops[i]);
|
||||
|
||||
if (ndb2)
|
||||
delete ndb2;
|
||||
if (con2)
|
||||
delete con2;
|
||||
return NDBT_ProgramExit(NDBT_OK);
|
||||
}
|
||||
|
||||
template class Vector<struct Table_info>;
|
||||
template class Vector<NdbRecAttr*>;
|
||||
template class Vector< Vector<NdbRecAttr*> >;
|
||||
template class Vector<NdbDictionary::Event*>;
|
||||
template class Vector<NdbEventOperation*>;
|
||||
|
|
304
storage/ndb/test/tools/rep_latency.cpp
Normal file
304
storage/ndb/test/tools/rep_latency.cpp
Normal file
|
@ -0,0 +1,304 @@
|
|||
/* Copyright (C) 2003 MySQL AB
|
||||
|
||||
This program is free software; you can redistribute it and/or modify
|
||||
it under the terms of the GNU General Public License as published by
|
||||
the Free Software Foundation; version 2 of the License.
|
||||
|
||||
This program is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU General Public License
|
||||
along with this program; if not, write to the Free Software
|
||||
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
|
||||
|
||||
/*
|
||||
* Update on master wait for update on slave
|
||||
*
|
||||
*/
|
||||
|
||||
#include <NdbApi.hpp>
|
||||
#include <NdbSleep.h>
|
||||
#include <sys/time.h>
|
||||
#include <NdbOut.hpp>
|
||||
#include <NDBT.hpp>
|
||||
|
||||
struct Xxx
|
||||
{
|
||||
Ndb *ndb;
|
||||
const NdbDictionary::Table *table;
|
||||
Uint32 pk_col;
|
||||
Uint32 col;
|
||||
};
|
||||
|
||||
struct XxxR
|
||||
{
|
||||
Uint32 pk_val;
|
||||
Uint32 val;
|
||||
struct timeval start_time;
|
||||
Uint32 latency;
|
||||
};
|
||||
|
||||
static int
|
||||
prepare_master_or_slave(Ndb &myNdb,
|
||||
const char* table,
|
||||
const char* pk,
|
||||
Uint32 pk_val,
|
||||
const char* col,
|
||||
struct Xxx &xxx,
|
||||
struct XxxR &xxxr);
|
||||
static void
|
||||
run_master_update(struct Xxx &xxx, struct XxxR &xxxr);
|
||||
static void
|
||||
run_slave_wait(struct Xxx &xxx, struct XxxR &xxxr);
|
||||
|
||||
#define PRINT_ERROR(code,msg) \
|
||||
g_err << "Error in " << __FILE__ << ", line: " << __LINE__ \
|
||||
<< ", code: " << code \
|
||||
<< ", msg: " << msg << ".\n"
|
||||
#define APIERROR(error) { \
|
||||
PRINT_ERROR((error).code, (error).message); \
|
||||
exit(-1); }
|
||||
|
||||
int main(int argc, char** argv)
|
||||
{
|
||||
if (argc != 8)
|
||||
{
|
||||
ndbout << "Arguments are <connect_string cluster 1> <connect_string cluster 2> <database> <table name> <primary key> <value of primary key> <attribute to update>.\n";
|
||||
exit(-1);
|
||||
}
|
||||
// ndb_init must be called first
|
||||
ndb_init();
|
||||
{
|
||||
const char *opt_connectstring1 = argv[1];
|
||||
const char *opt_connectstring2 = argv[2];
|
||||
const char *opt_db = argv[3];
|
||||
const char *opt_table = argv[4];
|
||||
const char *opt_pk = argv[5];
|
||||
const Uint32 opt_pk_val = atoi(argv[6]);
|
||||
const char *opt_col = argv[7];
|
||||
|
||||
// Object representing the cluster 1
|
||||
Ndb_cluster_connection cluster1_connection(opt_connectstring1);
|
||||
// Object representing the cluster 2
|
||||
Ndb_cluster_connection cluster2_connection(opt_connectstring2);
|
||||
|
||||
// connect cluster 1 and run application
|
||||
// Connect to cluster 1 management server (ndb_mgmd)
|
||||
if (cluster1_connection.connect(4 /* retries */,
|
||||
5 /* delay between retries */,
|
||||
1 /* verbose */))
|
||||
{
|
||||
g_err << "Cluster 1 management server was not ready within 30 secs.\n";
|
||||
exit(-1);
|
||||
}
|
||||
// Optionally connect and wait for the storage nodes (ndbd's)
|
||||
if (cluster1_connection.wait_until_ready(30,0) < 0)
|
||||
{
|
||||
g_err << "Cluster 1 was not ready within 30 secs.\n";
|
||||
exit(-1);
|
||||
}
|
||||
// connect cluster 2 and run application
|
||||
// Connect to cluster management server (ndb_mgmd)
|
||||
if (cluster2_connection.connect(4 /* retries */,
|
||||
5 /* delay between retries */,
|
||||
1 /* verbose */))
|
||||
{
|
||||
g_err << "Cluster 2 management server was not ready within 30 secs.\n";
|
||||
exit(-1);
|
||||
}
|
||||
// Optionally connect and wait for the storage nodes (ndbd's)
|
||||
if (cluster2_connection.wait_until_ready(30,0) < 0)
|
||||
{
|
||||
g_err << "Cluster 2 was not ready within 30 secs.\n";
|
||||
exit(-1);
|
||||
}
|
||||
// Object representing the database
|
||||
Ndb myNdb1(&cluster1_connection, opt_db);
|
||||
Ndb myNdb2(&cluster2_connection, opt_db);
|
||||
//
|
||||
struct Xxx xxx1;
|
||||
struct Xxx xxx2;
|
||||
struct XxxR xxxr;
|
||||
prepare_master_or_slave(myNdb1, opt_table, opt_pk, opt_pk_val, opt_col,
|
||||
xxx1, xxxr);
|
||||
prepare_master_or_slave(myNdb2, opt_table, opt_pk, opt_pk_val, opt_col,
|
||||
xxx2, xxxr);
|
||||
while (1)
|
||||
{
|
||||
// run the application code
|
||||
run_master_update(xxx1, xxxr);
|
||||
run_slave_wait(xxx2, xxxr);
|
||||
ndbout << "latency: " << xxxr.latency << endl;
|
||||
}
|
||||
}
|
||||
// Note: all connections must have been destroyed before calling ndb_end()
|
||||
ndb_end(0);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int
|
||||
prepare_master_or_slave(Ndb &myNdb,
|
||||
const char* table,
|
||||
const char* pk,
|
||||
Uint32 pk_val,
|
||||
const char* col,
|
||||
struct Xxx &xxx,
|
||||
struct XxxR &xxxr)
|
||||
{
|
||||
if (myNdb.init())
|
||||
APIERROR(myNdb.getNdbError());
|
||||
const NdbDictionary::Dictionary* myDict = myNdb.getDictionary();
|
||||
const NdbDictionary::Table *myTable = myDict->getTable(table);
|
||||
if (myTable == NULL)
|
||||
APIERROR(myDict->getNdbError());
|
||||
const NdbDictionary::Column *myPkCol = myTable->getColumn(pk);
|
||||
if (myPkCol == NULL)
|
||||
APIERROR(myDict->getNdbError());
|
||||
if (myPkCol->getType() != NdbDictionary::Column::Unsigned)
|
||||
{
|
||||
PRINT_ERROR(0, "Primary key column not of type unsigned");
|
||||
exit(-1);
|
||||
}
|
||||
const NdbDictionary::Column *myCol = myTable->getColumn(col);
|
||||
if (myCol == NULL)
|
||||
APIERROR(myDict->getNdbError());
|
||||
if (myCol->getType() != NdbDictionary::Column::Unsigned)
|
||||
{
|
||||
PRINT_ERROR(0, "Update column not of type unsigned");
|
||||
exit(-1);
|
||||
}
|
||||
|
||||
xxx.ndb = &myNdb;
|
||||
xxx.table = myTable;
|
||||
xxx.pk_col = myPkCol->getColumnNo();
|
||||
xxx.col = myCol->getColumnNo();
|
||||
|
||||
xxxr.pk_val = pk_val;
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static void run_master_update(struct Xxx &xxx, struct XxxR &xxxr)
|
||||
{
|
||||
Ndb *ndb = xxx.ndb;
|
||||
const NdbDictionary::Table *myTable = xxx.table;
|
||||
int retry_sleep= 10; /* 10 milliseconds */
|
||||
int retries= 100;
|
||||
while (1)
|
||||
{
|
||||
Uint32 val;
|
||||
NdbTransaction *trans = ndb->startTransaction();
|
||||
if (trans == NULL)
|
||||
goto err;
|
||||
{
|
||||
NdbOperation *op = trans->getNdbOperation(myTable);
|
||||
if (op == NULL)
|
||||
APIERROR(trans->getNdbError());
|
||||
op->readTupleExclusive();
|
||||
op->equal(xxx.pk_col, xxxr.pk_val);
|
||||
op->getValue(xxx.col, (char *)&val);
|
||||
}
|
||||
if (trans->execute(NdbTransaction::NoCommit))
|
||||
goto err;
|
||||
//fprintf(stderr, "read %u\n", val);
|
||||
xxxr.val = val + 1;
|
||||
{
|
||||
NdbOperation *op = trans->getNdbOperation(myTable);
|
||||
if (op == NULL)
|
||||
APIERROR(trans->getNdbError());
|
||||
op->updateTuple();
|
||||
op->equal(xxx.pk_col, xxxr.pk_val);
|
||||
op->setValue(xxx.col, xxxr.val);
|
||||
}
|
||||
if (trans->execute(NdbTransaction::Commit))
|
||||
goto err;
|
||||
ndb->closeTransaction(trans);
|
||||
//fprintf(stderr, "updated to %u\n", xxxr.val);
|
||||
break;
|
||||
err:
|
||||
const NdbError this_error= trans ?
|
||||
trans->getNdbError() : ndb->getNdbError();
|
||||
if (this_error.status == NdbError::TemporaryError)
|
||||
{
|
||||
if (retries--)
|
||||
{
|
||||
if (trans)
|
||||
ndb->closeTransaction(trans);
|
||||
NdbSleep_MilliSleep(retry_sleep);
|
||||
continue; // retry
|
||||
}
|
||||
}
|
||||
if (trans)
|
||||
ndb->closeTransaction(trans);
|
||||
APIERROR(this_error);
|
||||
}
|
||||
/* update done start timer */
|
||||
gettimeofday(&xxxr.start_time, 0);
|
||||
}
|
||||
|
||||
static void run_slave_wait(struct Xxx &xxx, struct XxxR &xxxr)
|
||||
{
|
||||
struct timeval old_end_time = xxxr.start_time, end_time;
|
||||
Ndb *ndb = xxx.ndb;
|
||||
const NdbDictionary::Table *myTable = xxx.table;
|
||||
int retry_sleep= 10; /* 10 milliseconds */
|
||||
int retries= 100;
|
||||
while (1)
|
||||
{
|
||||
Uint32 val;
|
||||
NdbTransaction *trans = ndb->startTransaction();
|
||||
if (trans == NULL)
|
||||
goto err;
|
||||
{
|
||||
NdbOperation *op = trans->getNdbOperation(myTable);
|
||||
if (op == NULL)
|
||||
APIERROR(trans->getNdbError());
|
||||
op->readTuple();
|
||||
op->equal(xxx.pk_col, xxxr.pk_val);
|
||||
op->getValue(xxx.col, (char *)&val);
|
||||
if (trans->execute(NdbTransaction::Commit))
|
||||
goto err;
|
||||
}
|
||||
/* read done, check time of read */
|
||||
gettimeofday(&end_time, 0);
|
||||
ndb->closeTransaction(trans);
|
||||
//fprintf(stderr, "read %u waiting for %u\n", val, xxxr.val);
|
||||
if (xxxr.val != val)
|
||||
{
|
||||
/* expected value not received yet */
|
||||
retries = 100;
|
||||
NdbSleep_MilliSleep(retry_sleep);
|
||||
old_end_time = end_time;
|
||||
continue;
|
||||
}
|
||||
break;
|
||||
err:
|
||||
const NdbError this_error= trans ?
|
||||
trans->getNdbError() : ndb->getNdbError();
|
||||
if (this_error.status == NdbError::TemporaryError)
|
||||
{
|
||||
if (retries--)
|
||||
{
|
||||
if (trans)
|
||||
ndb->closeTransaction(trans);
|
||||
NdbSleep_MilliSleep(retry_sleep);
|
||||
continue; // retry
|
||||
}
|
||||
}
|
||||
if (trans)
|
||||
ndb->closeTransaction(trans);
|
||||
APIERROR(this_error);
|
||||
}
|
||||
|
||||
Int64 elapsed_usec1 =
|
||||
((Int64)end_time.tv_sec - (Int64)xxxr.start_time.tv_sec)*1000*1000 +
|
||||
((Int64)end_time.tv_usec - (Int64)xxxr.start_time.tv_usec);
|
||||
Int64 elapsed_usec2 =
|
||||
((Int64)end_time.tv_sec - (Int64)old_end_time.tv_sec)*1000*1000 +
|
||||
((Int64)end_time.tv_usec - (Int64)old_end_time.tv_usec);
|
||||
xxxr.latency =
|
||||
((elapsed_usec1 - elapsed_usec2/2)+999)/1000;
|
||||
}
|
Loading…
Reference in a new issue