mirror of
https://github.com/MariaDB/server.git
synced 2025-01-17 12:32:27 +01:00
Merge tulin@bk-internal.mysql.com:/home/bk/mysql-5.1-new-ndb
into whalegate.ndb.mysql.com:/home/tomas/mysql-5.1-new-ndb
This commit is contained in:
commit
cfe7e01aca
10 changed files with 467 additions and 136 deletions
|
@ -6,7 +6,7 @@ Next DBTUP 4029
|
|||
Next DBLQH 5045
|
||||
Next DBDICT 6008
|
||||
Next DBDIH 7186
|
||||
Next DBTC 8053
|
||||
Next DBTC 8054
|
||||
Next CMVMI 9000
|
||||
Next BACKUP 10038
|
||||
Next DBUTIL 11002
|
||||
|
@ -248,6 +248,8 @@ Delay execution of ABORTCONF signal 2 seconds to generate time-out.
|
|||
|
||||
8050: Send ZABORT_TIMEOUT_BREAK delayed
|
||||
|
||||
8053: Crash in timeOutFoundLab, state CS_WAIT_COMMIT_CONF
|
||||
|
||||
ERROR CODES FOR TESTING TIME-OUT HANDLING IN DBTC
|
||||
-------------------------------------------------
|
||||
|
||||
|
|
|
@ -1124,6 +1124,38 @@ Cmvmi::execDUMP_STATE_ORD(Signal* signal)
|
|||
}
|
||||
#endif
|
||||
#endif
|
||||
|
||||
if (arg == 9999)
|
||||
{
|
||||
Uint32 delay = 1000;
|
||||
switch(signal->getLength()){
|
||||
case 1:
|
||||
break;
|
||||
case 2:
|
||||
delay = signal->theData[1];
|
||||
break;
|
||||
default:{
|
||||
Uint32 dmin = signal->theData[1];
|
||||
Uint32 dmax = signal->theData[2];
|
||||
delay = dmin + (rand() % (dmax - dmin));
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
signal->theData[0] = 9999;
|
||||
if (delay == 0)
|
||||
{
|
||||
execNDB_TAMPER(signal);
|
||||
}
|
||||
else if (delay < 10)
|
||||
{
|
||||
sendSignal(reference(), GSN_NDB_TAMPER, signal, 1, JBB);
|
||||
}
|
||||
else
|
||||
{
|
||||
sendSignalWithDelay(reference(), GSN_NDB_TAMPER, signal, delay, 1);
|
||||
}
|
||||
}
|
||||
}//Cmvmi::execDUMP_STATE_ORD()
|
||||
|
||||
void
|
||||
|
|
|
@ -6483,6 +6483,7 @@ void Dbtc::timeOutFoundLab(Signal* signal, Uint32 TapiConPtr, Uint32 errCode)
|
|||
return;
|
||||
case CS_WAIT_COMMIT_CONF:
|
||||
jam();
|
||||
CRASH_INSERTION(8053);
|
||||
tcConnectptr.i = apiConnectptr.p->currentTcConnect;
|
||||
ptrCheckGuard(tcConnectptr, ctcConnectFilesize, tcConnectRecord);
|
||||
arrGuard(apiConnectptr.p->currentReplicaNo, MAX_REPLICAS);
|
||||
|
|
|
@ -238,6 +238,13 @@ Pgman::execCONTINUEB(Signal* signal)
|
|||
}
|
||||
else
|
||||
{
|
||||
if (ERROR_INSERTED(11007))
|
||||
{
|
||||
ndbout << "No more writes..." << endl;
|
||||
SET_ERROR_INSERT_VALUE(11008);
|
||||
signal->theData[0] = 9999;
|
||||
sendSignalWithDelay(CMVMI_REF, GSN_NDB_TAMPER, signal, 10000, 1);
|
||||
}
|
||||
signal->theData[0] = m_end_lcp_req.senderData;
|
||||
sendSignal(m_end_lcp_req.senderRef, GSN_END_LCP_CONF, signal, 1, JBB);
|
||||
}
|
||||
|
@ -1301,6 +1308,13 @@ Pgman::process_lcp(Signal* signal)
|
|||
}
|
||||
else
|
||||
{
|
||||
if (ERROR_INSERTED(11007))
|
||||
{
|
||||
ndbout << "No more writes..." << endl;
|
||||
signal->theData[0] = 9999;
|
||||
sendSignalWithDelay(CMVMI_REF, GSN_NDB_TAMPER, signal, 10000, 1);
|
||||
SET_ERROR_INSERT_VALUE(11008);
|
||||
}
|
||||
signal->theData[0] = m_end_lcp_req.senderData;
|
||||
sendSignal(m_end_lcp_req.senderRef, GSN_END_LCP_CONF, signal, 1, JBB);
|
||||
}
|
||||
|
@ -1588,8 +1602,11 @@ Pgman::fswritereq(Signal* signal, Ptr<Page_entry> ptr)
|
|||
}
|
||||
#endif
|
||||
|
||||
sendSignal(NDBFS_REF, GSN_FSWRITEREQ, signal,
|
||||
FsReadWriteReq::FixedLength + 1, JBA);
|
||||
if (!ERROR_INSERTED(11008))
|
||||
{
|
||||
sendSignal(NDBFS_REF, GSN_FSWRITEREQ, signal,
|
||||
FsReadWriteReq::FixedLength + 1, JBA);
|
||||
}
|
||||
}
|
||||
|
||||
void
|
||||
|
@ -2452,6 +2469,11 @@ Pgman::execDUMP_STATE_ORD(Signal* signal)
|
|||
{
|
||||
SET_ERROR_INSERT_VALUE(11006);
|
||||
}
|
||||
|
||||
if (signal->theData[0] == 11007)
|
||||
{
|
||||
SET_ERROR_INSERT_VALUE(11007);
|
||||
}
|
||||
}
|
||||
|
||||
// page cache client
|
||||
|
|
|
@ -36,6 +36,16 @@ public:
|
|||
int updateValue = 0,
|
||||
bool abort = false);
|
||||
|
||||
int loadTableStartFrom(Ndb*,
|
||||
int startFrom,
|
||||
int records,
|
||||
int batch = 512,
|
||||
bool allowConstraintViolation = true,
|
||||
int doSleep = 0,
|
||||
bool oneTrans = false,
|
||||
int updateValue = 0,
|
||||
bool abort = false);
|
||||
|
||||
int scanReadRecords(Ndb*,
|
||||
int records,
|
||||
int abort = 0,
|
||||
|
@ -56,6 +66,11 @@ public:
|
|||
int batchsize = 1,
|
||||
NdbOperation::LockMode = NdbOperation::LM_Read);
|
||||
|
||||
int scanUpdateRecords(Ndb*, NdbScanOperation::ScanFlag,
|
||||
int records,
|
||||
int abort = 0,
|
||||
int parallelism = 0);
|
||||
|
||||
int scanUpdateRecords(Ndb*,
|
||||
int records,
|
||||
int abort = 0,
|
||||
|
@ -90,9 +105,12 @@ public:
|
|||
int records,
|
||||
int percentToLock = 1,
|
||||
int lockTime = 1000);
|
||||
|
||||
int fillTable(Ndb*,
|
||||
int batch=512);
|
||||
|
||||
int fillTableStartFrom(Ndb*, int startFrom, int batch=512);
|
||||
|
||||
/**
|
||||
* Reading using UniqHashIndex with key = pk
|
||||
*/
|
||||
|
|
|
@ -29,6 +29,11 @@ public:
|
|||
|
||||
int closeTransaction(Ndb*);
|
||||
|
||||
int clearTable(Ndb*,
|
||||
NdbScanOperation::ScanFlag,
|
||||
int records = 0,
|
||||
int parallelism = 0);
|
||||
|
||||
int clearTable(Ndb*,
|
||||
int records = 0,
|
||||
int parallelism = 0);
|
||||
|
|
|
@ -20,6 +20,7 @@
|
|||
#include <NdbRestarter.hpp>
|
||||
#include <Vector.hpp>
|
||||
#include <signaldata/DumpStateOrd.hpp>
|
||||
#include <NdbBackup.hpp>
|
||||
|
||||
int runLoadTable(NDBT_Context* ctx, NDBT_Step* step){
|
||||
|
||||
|
@ -1293,6 +1294,158 @@ runBug28770(NDBT_Context* ctx, NDBT_Step* step) {
|
|||
return result;
|
||||
}
|
||||
|
||||
int runSR_DD_1(NDBT_Context* ctx, NDBT_Step* step)
|
||||
{
|
||||
Ndb* pNdb = GETNDB(step);
|
||||
int result = NDBT_OK;
|
||||
Uint32 loops = ctx->getNumLoops();
|
||||
int count;
|
||||
NdbRestarter restarter;
|
||||
NdbBackup backup(GETNDB(step)->getNodeId()+1);
|
||||
bool lcploop = ctx->getProperty("LCP", (unsigned)0);
|
||||
|
||||
Uint32 i = 1;
|
||||
Uint32 backupId;
|
||||
|
||||
int val[] = { DumpStateOrd::CmvmiSetRestartOnErrorInsert, 1 };
|
||||
int lcp = DumpStateOrd::DihMinTimeBetweenLCP;
|
||||
|
||||
int startFrom = 0;
|
||||
|
||||
HugoTransactions hugoTrans(*ctx->getTab());
|
||||
while(i<=loops && result != NDBT_FAILED)
|
||||
{
|
||||
|
||||
if (lcploop)
|
||||
{
|
||||
CHECK(restarter.dumpStateAllNodes(&lcp, 1) == 0);
|
||||
}
|
||||
|
||||
int nodeId = restarter.getDbNodeId(rand() % restarter.getNumDbNodes());
|
||||
//CHECK(restarter.dumpStateAllNodes(&val, 1) == 0);
|
||||
|
||||
ndbout << "Loop " << i << "/"<< loops <<" started" << endl;
|
||||
ndbout << "Loading records..." << startFrom << endl;
|
||||
CHECK(hugoTrans.loadTable(pNdb, startFrom) == 0);
|
||||
|
||||
ndbout << "Making " << nodeId << " crash" << endl;
|
||||
int kill[] = { 9999, 1000, 3000 };
|
||||
CHECK(restarter.dumpStateOneNode(nodeId, val, 2) == 0);
|
||||
CHECK(restarter.dumpStateOneNode(nodeId, kill, 3) == 0);
|
||||
|
||||
Uint64 end = NdbTick_CurrentMillisecond() + 4000;
|
||||
Uint32 row = startFrom;
|
||||
do {
|
||||
ndbout << "Loading from " << row << " to " << row + 1000 << endl;
|
||||
if (hugoTrans.loadTableStartFrom(pNdb, row, 1000) != 0)
|
||||
break;
|
||||
row += 1000;
|
||||
} while (NdbTick_CurrentMillisecond() < end);
|
||||
|
||||
ndbout << "Waiting for " << nodeId << " to restart" << endl;
|
||||
CHECK(restarter.waitNodesNoStart(&nodeId, 1) == 0);
|
||||
|
||||
ndbout << "Restarting cluster" << endl;
|
||||
CHECK(restarter.restartAll(false, true, true) == 0);
|
||||
CHECK(restarter.waitClusterNoStart() == 0);
|
||||
CHECK(restarter.startAll() == 0);
|
||||
CHECK(restarter.waitClusterStarted() == 0);
|
||||
|
||||
ndbout << "Starting backup..." << flush;
|
||||
CHECK(backup.start(backupId) == 0);
|
||||
ndbout << "done" << endl;
|
||||
|
||||
int cnt = 0;
|
||||
CHECK(hugoTrans.selectCount(pNdb, 0, &cnt) == 0);
|
||||
ndbout << "Found " << cnt << " records..." << endl;
|
||||
ndbout << "Clearing..." << endl;
|
||||
CHECK(hugoTrans.clearTable(pNdb,
|
||||
NdbScanOperation::SF_TupScan, cnt) == 0);
|
||||
|
||||
if (cnt > startFrom)
|
||||
{
|
||||
startFrom = cnt;
|
||||
}
|
||||
startFrom += 1000;
|
||||
i++;
|
||||
}
|
||||
|
||||
ndbout << "runSR_DD_1 finished" << endl;
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
int runSR_DD_2(NDBT_Context* ctx, NDBT_Step* step)
|
||||
{
|
||||
Ndb* pNdb = GETNDB(step);
|
||||
int result = NDBT_OK;
|
||||
Uint32 loops = ctx->getNumLoops();
|
||||
Uint32 rows = ctx->getNumRecords();
|
||||
int count;
|
||||
NdbRestarter restarter;
|
||||
NdbBackup backup(GETNDB(step)->getNodeId()+1);
|
||||
bool lcploop = ctx->getProperty("LCP", (unsigned)0);
|
||||
|
||||
Uint32 i = 1;
|
||||
Uint32 backupId;
|
||||
|
||||
int val[] = { DumpStateOrd::CmvmiSetRestartOnErrorInsert, 1 };
|
||||
int lcp = DumpStateOrd::DihMinTimeBetweenLCP;
|
||||
|
||||
int startFrom = 0;
|
||||
|
||||
HugoTransactions hugoTrans(*ctx->getTab());
|
||||
while(i<=loops && result != NDBT_FAILED)
|
||||
{
|
||||
|
||||
if (lcploop)
|
||||
{
|
||||
CHECK(restarter.dumpStateAllNodes(&lcp, 1) == 0);
|
||||
}
|
||||
|
||||
int nodeId = restarter.getDbNodeId(rand() % restarter.getNumDbNodes());
|
||||
|
||||
ndbout << "Making " << nodeId << " crash" << endl;
|
||||
int kill[] = { 9999, 3000, 10000 };
|
||||
CHECK(restarter.dumpStateOneNode(nodeId, val, 2) == 0);
|
||||
CHECK(restarter.dumpStateOneNode(nodeId, kill, 3) == 0);
|
||||
|
||||
Uint64 end = NdbTick_CurrentMillisecond() + 11000;
|
||||
Uint32 row = startFrom;
|
||||
do {
|
||||
if (hugoTrans.loadTable(pNdb, rows) != 0)
|
||||
break;
|
||||
|
||||
if (hugoTrans.clearTable(pNdb, NdbScanOperation::SF_TupScan, rows) != 0)
|
||||
break;
|
||||
} while (NdbTick_CurrentMillisecond() < end);
|
||||
|
||||
ndbout << "Waiting for " << nodeId << " to restart" << endl;
|
||||
CHECK(restarter.waitNodesNoStart(&nodeId, 1) == 0);
|
||||
|
||||
ndbout << "Restarting cluster" << endl;
|
||||
CHECK(restarter.restartAll(false, true, true) == 0);
|
||||
CHECK(restarter.waitClusterNoStart() == 0);
|
||||
CHECK(restarter.startAll() == 0);
|
||||
CHECK(restarter.waitClusterStarted() == 0);
|
||||
|
||||
ndbout << "Starting backup..." << flush;
|
||||
CHECK(backup.start(backupId) == 0);
|
||||
ndbout << "done" << endl;
|
||||
|
||||
int cnt = 0;
|
||||
CHECK(hugoTrans.selectCount(pNdb, 0, &cnt) == 0);
|
||||
ndbout << "Found " << cnt << " records..." << endl;
|
||||
ndbout << "Clearing..." << endl;
|
||||
CHECK(hugoTrans.clearTable(pNdb,
|
||||
NdbScanOperation::SF_TupScan, cnt) == 0);
|
||||
i++;
|
||||
}
|
||||
|
||||
ndbout << "runSR_DD_2 finished" << endl;
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
NDBT_TESTSUITE(testSystemRestart);
|
||||
TESTCASE("SR1",
|
||||
|
@ -1474,6 +1627,32 @@ TESTCASE("Bug24664",
|
|||
STEP(runBug24664);
|
||||
FINALIZER(runClearTable);
|
||||
}
|
||||
TESTCASE("SR_DD_1", "")
|
||||
{
|
||||
INITIALIZER(runWaitStarted);
|
||||
STEP(runSR_DD_1);
|
||||
FINALIZER(runClearTable);
|
||||
}
|
||||
TESTCASE("SR_DD_1_LCP", "")
|
||||
{
|
||||
TC_PROPERTY("LCP", 1);
|
||||
INITIALIZER(runWaitStarted);
|
||||
STEP(runSR_DD_1);
|
||||
FINALIZER(runClearTable);
|
||||
}
|
||||
TESTCASE("SR_DD_2", "")
|
||||
{
|
||||
INITIALIZER(runWaitStarted);
|
||||
STEP(runSR_DD_2);
|
||||
FINALIZER(runClearTable);
|
||||
}
|
||||
TESTCASE("SR_DD_2_LCP", "")
|
||||
{
|
||||
TC_PROPERTY("LCP", 1);
|
||||
INITIALIZER(runWaitStarted);
|
||||
STEP(runSR_DD_2);
|
||||
FINALIZER(runClearTable);
|
||||
}
|
||||
TESTCASE("Bug29167", "")
|
||||
{
|
||||
INITIALIZER(runWaitStarted);
|
||||
|
|
|
@ -945,3 +945,36 @@ args: -n Bug28804 T1 T3
|
|||
max-time: 180
|
||||
cmd: testIndex
|
||||
args: -n Bug28804_ATTRINFO T1 T3
|
||||
|
||||
max-time: 1500
|
||||
cmd: testSystemRestart
|
||||
args: -n SR_DD_1 D1
|
||||
|
||||
max-time: 1500
|
||||
cmd: testSystemRestart
|
||||
args: -n SR_DD_1 D2
|
||||
|
||||
max-time: 1500
|
||||
cmd: testSystemRestart
|
||||
args: -n SR_DD_1_LCP D1
|
||||
|
||||
max-time: 1500
|
||||
cmd: testSystemRestart
|
||||
args: -n SR_DD_1_LCP D2
|
||||
|
||||
max-time: 1500
|
||||
cmd: testSystemRestart
|
||||
args: -n SR_DD_2 D1
|
||||
|
||||
max-time: 1500
|
||||
cmd: testSystemRestart
|
||||
args: -n SR_DD_2 D2
|
||||
|
||||
max-time: 1500
|
||||
cmd: testSystemRestart
|
||||
args: -n SR_DD_2_LCP D1
|
||||
|
||||
max-time: 1500
|
||||
cmd: testSystemRestart
|
||||
args: -n SR_DD_2_LCP D2
|
||||
|
||||
|
|
|
@ -341,50 +341,14 @@ HugoTransactions::scanReadRecords(Ndb* pNdb,
|
|||
|
||||
int
|
||||
HugoTransactions::scanUpdateRecords(Ndb* pNdb,
|
||||
int records,
|
||||
int abortPercent,
|
||||
int parallelism){
|
||||
if(m_defaultScanUpdateMethod == 1){
|
||||
return scanUpdateRecords1(pNdb, records, abortPercent, parallelism);
|
||||
} else if(m_defaultScanUpdateMethod == 2){
|
||||
return scanUpdateRecords2(pNdb, records, abortPercent, parallelism);
|
||||
} else {
|
||||
return scanUpdateRecords3(pNdb, records, abortPercent, parallelism);
|
||||
}
|
||||
}
|
||||
|
||||
// Scan all records exclusive and update
|
||||
// them one by one
|
||||
int
|
||||
HugoTransactions::scanUpdateRecords1(Ndb* pNdb,
|
||||
int records,
|
||||
int abortPercent,
|
||||
int parallelism){
|
||||
return scanUpdateRecords3(pNdb, records, abortPercent, 1);
|
||||
}
|
||||
|
||||
// Scan all records exclusive and update
|
||||
// them batched by asking nextScanResult to
|
||||
// give us all cached records before fetching new
|
||||
// records from db
|
||||
int
|
||||
HugoTransactions::scanUpdateRecords2(Ndb* pNdb,
|
||||
int records,
|
||||
int abortPercent,
|
||||
int parallelism){
|
||||
return scanUpdateRecords3(pNdb, records, abortPercent, parallelism);
|
||||
}
|
||||
|
||||
int
|
||||
HugoTransactions::scanUpdateRecords3(Ndb* pNdb,
|
||||
int records,
|
||||
int abortPercent,
|
||||
int parallelism){
|
||||
int retryAttempt = 0;
|
||||
NdbScanOperation::ScanFlag flags,
|
||||
int records,
|
||||
int abortPercent,
|
||||
int parallelism){
|
||||
int retryAttempt = 0;
|
||||
int check, a;
|
||||
NdbScanOperation *pOp;
|
||||
|
||||
|
||||
while (true){
|
||||
restart:
|
||||
if (retryAttempt++ >= m_retryMax){
|
||||
|
@ -411,8 +375,9 @@ restart:
|
|||
return NDBT_FAILED;
|
||||
}
|
||||
|
||||
if( pOp->readTuplesExclusive(parallelism) ) {
|
||||
ERR(pTrans->getNdbError());
|
||||
if( pOp->readTuples(NdbOperation::LM_Exclusive, flags,
|
||||
parallelism))
|
||||
{
|
||||
closeTransaction(pNdb);
|
||||
return NDBT_FAILED;
|
||||
}
|
||||
|
@ -429,15 +394,18 @@ restart:
|
|||
check = pTrans->execute(NoCommit, AbortOnError);
|
||||
if( check == -1 ) {
|
||||
const NdbError err = pTrans->getNdbError();
|
||||
ERR(err);
|
||||
closeTransaction(pNdb);
|
||||
if (err.status == NdbError::TemporaryError){
|
||||
ERR(err);
|
||||
closeTransaction(pNdb);
|
||||
NdbSleep_MilliSleep(50);
|
||||
retryAttempt++;
|
||||
continue;
|
||||
}
|
||||
ERR(err);
|
||||
closeTransaction(pNdb);
|
||||
return NDBT_FAILED;
|
||||
}
|
||||
|
||||
|
||||
// Abort after 1-100 or 1-records rows
|
||||
int ranVal = rand();
|
||||
int abortCount = ranVal % (records == 0 ? 100 : records);
|
||||
|
@ -448,74 +416,113 @@ restart:
|
|||
abortTrans = true;
|
||||
}
|
||||
|
||||
int eof;
|
||||
int rows = 0;
|
||||
while((check = pOp->nextResult(true)) == 0){
|
||||
do {
|
||||
rows++;
|
||||
NdbOperation* pUp = pOp->updateCurrentTuple();
|
||||
if(pUp == 0){
|
||||
while((eof = pOp->nextResult(true)) == 0){
|
||||
rows++;
|
||||
if (calc.verifyRowValues(&row) != 0){
|
||||
closeTransaction(pNdb);
|
||||
return NDBT_FAILED;
|
||||
}
|
||||
|
||||
if (abortCount == rows && abortTrans == true){
|
||||
ndbout << "Scan is aborted" << endl;
|
||||
g_info << "Scan is aborted" << endl;
|
||||
pOp->close();
|
||||
if( check == -1 ) {
|
||||
ERR(pTrans->getNdbError());
|
||||
closeTransaction(pNdb);
|
||||
return NDBT_FAILED;
|
||||
}
|
||||
const int updates = calc.getUpdatesValue(&row) + 1;
|
||||
const int r = calc.getIdValue(&row);
|
||||
for(a = 0; a<tab.getNoOfColumns(); a++){
|
||||
if (tab.getColumn(a)->getPrimaryKey() == false){
|
||||
if(setValueForAttr(pUp, a, r, updates ) != 0){
|
||||
ERR(pTrans->getNdbError());
|
||||
closeTransaction(pNdb);
|
||||
return NDBT_FAILED;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (rows == abortCount && abortTrans == true){
|
||||
g_info << "Scan is aborted" << endl;
|
||||
// This scan should be aborted
|
||||
closeTransaction(pNdb);
|
||||
return NDBT_OK;
|
||||
}
|
||||
} while((check = pOp->nextResult(false)) == 0);
|
||||
|
||||
if(check != -1){
|
||||
check = pTrans->execute(Commit, AbortOnError);
|
||||
if(check != -1)
|
||||
m_latest_gci = pTrans->getGCI();
|
||||
pTrans->restart();
|
||||
}
|
||||
|
||||
const NdbError err = pTrans->getNdbError();
|
||||
if( check == -1 ) {
|
||||
|
||||
closeTransaction(pNdb);
|
||||
ERR(err);
|
||||
if (err.status == NdbError::TemporaryError){
|
||||
NdbSleep_MilliSleep(50);
|
||||
goto restart;
|
||||
}
|
||||
return NDBT_FAILED;
|
||||
return NDBT_OK;
|
||||
}
|
||||
}
|
||||
|
||||
const NdbError err = pTrans->getNdbError();
|
||||
if( check == -1 ) {
|
||||
closeTransaction(pNdb);
|
||||
ERR(err);
|
||||
if (eof == -1) {
|
||||
const NdbError err = pTrans->getNdbError();
|
||||
|
||||
if (err.status == NdbError::TemporaryError){
|
||||
ERR_INFO(err);
|
||||
closeTransaction(pNdb);
|
||||
NdbSleep_MilliSleep(50);
|
||||
goto restart;
|
||||
switch (err.code){
|
||||
case 488:
|
||||
case 245:
|
||||
case 490:
|
||||
// Too many active scans, no limit on number of retry attempts
|
||||
break;
|
||||
default:
|
||||
retryAttempt++;
|
||||
}
|
||||
continue;
|
||||
}
|
||||
ERR(err);
|
||||
closeTransaction(pNdb);
|
||||
return NDBT_FAILED;
|
||||
}
|
||||
|
||||
closeTransaction(pNdb);
|
||||
|
||||
g_info << rows << " rows have been read" << endl;
|
||||
if (records != 0 && rows != records){
|
||||
g_err << "Check expected number of records failed" << endl
|
||||
<< " expected=" << records <<", " << endl
|
||||
<< " read=" << rows << endl;
|
||||
return NDBT_FAILED;
|
||||
}
|
||||
|
||||
closeTransaction(pNdb);
|
||||
|
||||
g_info << rows << " rows have been updated" << endl;
|
||||
return NDBT_OK;
|
||||
}
|
||||
return NDBT_FAILED;
|
||||
}
|
||||
|
||||
int
|
||||
HugoTransactions::scanUpdateRecords(Ndb* pNdb,
|
||||
int records,
|
||||
int abortPercent,
|
||||
int parallelism){
|
||||
|
||||
return scanUpdateRecords(pNdb,
|
||||
(NdbScanOperation::ScanFlag)0,
|
||||
records, abortPercent, parallelism);
|
||||
}
|
||||
|
||||
// Scan all records exclusive and update
|
||||
// them one by one
|
||||
int
|
||||
HugoTransactions::scanUpdateRecords1(Ndb* pNdb,
|
||||
int records,
|
||||
int abortPercent,
|
||||
int parallelism){
|
||||
return scanUpdateRecords(pNdb,
|
||||
(NdbScanOperation::ScanFlag)0,
|
||||
records, abortPercent, 1);
|
||||
}
|
||||
|
||||
// Scan all records exclusive and update
|
||||
// them batched by asking nextScanResult to
|
||||
// give us all cached records before fetching new
|
||||
// records from db
|
||||
int
|
||||
HugoTransactions::scanUpdateRecords2(Ndb* pNdb,
|
||||
int records,
|
||||
int abortPercent,
|
||||
int parallelism){
|
||||
return scanUpdateRecords(pNdb, (NdbScanOperation::ScanFlag)0,
|
||||
records, abortPercent, parallelism);
|
||||
}
|
||||
|
||||
int
|
||||
HugoTransactions::scanUpdateRecords3(Ndb* pNdb,
|
||||
int records,
|
||||
int abortPercent,
|
||||
int parallelism)
|
||||
{
|
||||
return scanUpdateRecords(pNdb, (NdbScanOperation::ScanFlag)0,
|
||||
records, abortPercent, parallelism);
|
||||
}
|
||||
|
||||
int
|
||||
HugoTransactions::loadTable(Ndb* pNdb,
|
||||
int records,
|
||||
|
@ -524,7 +531,22 @@ HugoTransactions::loadTable(Ndb* pNdb,
|
|||
int doSleep,
|
||||
bool oneTrans,
|
||||
int value,
|
||||
bool abort){
|
||||
bool abort)
|
||||
{
|
||||
return loadTableStartFrom(pNdb, 0, records, batch, allowConstraintViolation,
|
||||
doSleep, oneTrans, value, abort);
|
||||
}
|
||||
|
||||
int
|
||||
HugoTransactions::loadTableStartFrom(Ndb* pNdb,
|
||||
int startFrom,
|
||||
int records,
|
||||
int batch,
|
||||
bool allowConstraintViolation,
|
||||
int doSleep,
|
||||
bool oneTrans,
|
||||
int value,
|
||||
bool abort){
|
||||
int check;
|
||||
int retryAttempt = 0;
|
||||
int retryMax = 5;
|
||||
|
@ -543,8 +565,9 @@ HugoTransactions::loadTable(Ndb* pNdb,
|
|||
<< " -> rows/commit = " << batch << endl;
|
||||
}
|
||||
|
||||
Uint32 orgbatch = batch;
|
||||
g_info << "|- Inserting records..." << endl;
|
||||
for (int c=0 ; c<records ; ){
|
||||
for (int c=0 ; c<records; ){
|
||||
bool closeTrans = true;
|
||||
|
||||
if(c + batch > records)
|
||||
|
@ -578,7 +601,7 @@ HugoTransactions::loadTable(Ndb* pNdb,
|
|||
}
|
||||
}
|
||||
|
||||
if(pkInsertRecord(pNdb, c, batch, value) != NDBT_OK)
|
||||
if(pkInsertRecord(pNdb, c + startFrom, batch, value) != NDBT_OK)
|
||||
{
|
||||
ERR(pTrans->getNdbError());
|
||||
closeTransaction(pNdb);
|
||||
|
@ -625,6 +648,7 @@ HugoTransactions::loadTable(Ndb* pNdb,
|
|||
ERR(err);
|
||||
NdbSleep_MilliSleep(50);
|
||||
retryAttempt++;
|
||||
batch = 1;
|
||||
continue;
|
||||
break;
|
||||
|
||||
|
@ -670,7 +694,14 @@ HugoTransactions::loadTable(Ndb* pNdb,
|
|||
|
||||
int
|
||||
HugoTransactions::fillTable(Ndb* pNdb,
|
||||
int batch){
|
||||
int batch){
|
||||
return fillTableStartFrom(pNdb, 0, batch);
|
||||
}
|
||||
|
||||
int
|
||||
HugoTransactions::fillTableStartFrom(Ndb* pNdb,
|
||||
int startFrom,
|
||||
int batch){
|
||||
int check;
|
||||
int retryAttempt = 0;
|
||||
int retryMax = 5;
|
||||
|
@ -688,7 +719,7 @@ HugoTransactions::fillTable(Ndb* pNdb,
|
|||
<< " -> rows/commit = " << batch << endl;
|
||||
}
|
||||
|
||||
for (int c=0 ; ; ){
|
||||
for (int c=startFrom ; ; ){
|
||||
|
||||
if (retryAttempt >= retryMax){
|
||||
g_info << "Record " << c << " could not be inserted, has retried "
|
||||
|
|
|
@ -42,38 +42,9 @@ UtilTransactions::UtilTransactions(Ndb* ndb,
|
|||
|
||||
int
|
||||
UtilTransactions::clearTable(Ndb* pNdb,
|
||||
int records,
|
||||
int parallelism){
|
||||
if(m_defaultClearMethod == 1){
|
||||
return clearTable1(pNdb, records, parallelism);
|
||||
} else if(m_defaultClearMethod == 2){
|
||||
return clearTable2(pNdb, records, parallelism);
|
||||
} else {
|
||||
return clearTable3(pNdb, records, parallelism);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
int
|
||||
UtilTransactions::clearTable1(Ndb* pNdb,
|
||||
int records,
|
||||
int parallelism)
|
||||
{
|
||||
return clearTable3(pNdb, records, 1);
|
||||
}
|
||||
|
||||
int
|
||||
UtilTransactions::clearTable2(Ndb* pNdb,
|
||||
int records,
|
||||
int parallelism)
|
||||
{
|
||||
return clearTable3(pNdb, records, parallelism);
|
||||
}
|
||||
|
||||
int
|
||||
UtilTransactions::clearTable3(Ndb* pNdb,
|
||||
int records,
|
||||
int parallelism){
|
||||
NdbScanOperation::ScanFlag flags,
|
||||
int records,
|
||||
int parallelism){
|
||||
// Scan all records exclusive and delete
|
||||
// them one by one
|
||||
int retryAttempt = 0;
|
||||
|
@ -116,7 +87,7 @@ UtilTransactions::clearTable3(Ndb* pNdb,
|
|||
goto failed;
|
||||
}
|
||||
|
||||
if( pOp->readTuplesExclusive(par) ) {
|
||||
if( pOp->readTuples(NdbOperation::LM_Exclusive, flags, par) ) {
|
||||
err = pTrans->getNdbError();
|
||||
goto failed;
|
||||
}
|
||||
|
@ -179,6 +150,43 @@ UtilTransactions::clearTable3(Ndb* pNdb,
|
|||
return (err.code != 0 ? err.code : NDBT_FAILED);
|
||||
}
|
||||
|
||||
int
|
||||
UtilTransactions::clearTable(Ndb* pNdb,
|
||||
int records,
|
||||
int parallelism){
|
||||
|
||||
return clearTable(pNdb, (NdbScanOperation::ScanFlag)0,
|
||||
records, parallelism);
|
||||
}
|
||||
|
||||
|
||||
int
|
||||
UtilTransactions::clearTable1(Ndb* pNdb,
|
||||
int records,
|
||||
int parallelism)
|
||||
{
|
||||
return clearTable(pNdb, (NdbScanOperation::ScanFlag)0,
|
||||
records, 1);
|
||||
}
|
||||
|
||||
int
|
||||
UtilTransactions::clearTable2(Ndb* pNdb,
|
||||
int records,
|
||||
int parallelism)
|
||||
{
|
||||
return clearTable(pNdb, (NdbScanOperation::ScanFlag)0,
|
||||
records, parallelism);
|
||||
}
|
||||
|
||||
int
|
||||
UtilTransactions::clearTable3(Ndb* pNdb,
|
||||
int records,
|
||||
int parallelism)
|
||||
{
|
||||
return clearTable(pNdb, (NdbScanOperation::ScanFlag)0,
|
||||
records, parallelism);
|
||||
}
|
||||
|
||||
int
|
||||
UtilTransactions::copyTableData(Ndb* pNdb,
|
||||
const char* destName){
|
||||
|
|
Loading…
Reference in a new issue