diff --git a/storage/ndb/src/kernel/blocks/dbacc/Dbacc.hpp b/storage/ndb/src/kernel/blocks/dbacc/Dbacc.hpp index b1638599dcc..f0c3dbc2866 100644 --- a/storage/ndb/src/kernel/blocks/dbacc/Dbacc.hpp +++ b/storage/ndb/src/kernel/blocks/dbacc/Dbacc.hpp @@ -533,7 +533,6 @@ struct Operationrec { ,OP_STATE_WAITING = 0x00000 ,OP_STATE_RUNNING = 0x10000 ,OP_STATE_EXECUTED = 0x30000 - ,OP_STATE_RUNNING_ABORT = 0x20000 ,OP_EXECUTED_DIRTY_READ = 0x3050F ,OP_INITIAL = ~(Uint32)0 diff --git a/storage/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp b/storage/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp index 5337c7c015d..62a786d9f0e 100644 --- a/storage/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp +++ b/storage/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp @@ -1181,6 +1181,7 @@ void Dbacc::execACCKEYREQ(Signal* signal) void Dbacc::execACCKEY_ORD(Signal* signal, Uint32 opPtrI) { + jamEntry(); OperationrecPtr lastOp; lastOp.i = opPtrI; ptrCheckGuard(lastOp, coprecsize, operationrec); @@ -1200,9 +1201,6 @@ Dbacc::execACCKEY_ORD(Signal* signal, Uint32 opPtrI) startNext(signal, lastOp); return; } - else if (opstate == Operationrec::OP_STATE_RUNNING_ABORT) - { - } else { } @@ -1240,15 +1238,14 @@ Dbacc::startNext(Signal* signal, OperationrecPtr lastOp) { jam(); ptrCheckGuard(loPtr, coprecsize, operationrec); - nextOp.i = loPtr.p->nextSerialQue; } else { jam(); - nextOp.i = loPtr.i; loPtr = lastOp; } + nextOp.i = loPtr.p->nextSerialQue; ndbassert(loPtr.p->m_op_bits & Operationrec::OP_LOCK_OWNER); if (nextOp.i == RNIL) @@ -1411,6 +1408,9 @@ conf: else { jam(); + fragrecptr.i = nextOp.p->fragptr; + ptrCheckGuard(fragrecptr, cfragmentsize, fragmentrec); + sendAcckeyconf(signal); sendSignal(nextOp.p->userblockref, GSN_ACCKEYCONF, signal, 6, JBA); @@ -1680,8 +1680,7 @@ Dbacc::validate_lock_queue(OperationrecPtr opPtr) bool running = false; { Uint32 opstate = loPtr.p->m_op_bits & Operationrec::OP_STATE_MASK; - if (opstate == Operationrec::OP_STATE_RUNNING || - opstate == Operationrec::OP_STATE_RUNNING_ABORT) + if (opstate == Operationrec::OP_STATE_RUNNING) running = true; else { @@ -1719,8 +1718,7 @@ Dbacc::validate_lock_queue(OperationrecPtr opPtr) } else { - if (opstate == Operationrec::OP_STATE_RUNNING || - opstate == Operationrec::OP_STATE_RUNNING_ABORT) + if (opstate == Operationrec::OP_STATE_RUNNING) running = true; else vlqrequire(opstate == Operationrec::OP_STATE_EXECUTED); @@ -1830,8 +1828,6 @@ operator<<(NdbOut & out, Dbacc::OperationrecPtr ptr) out << " RUNNING "; break; case Dbacc::Operationrec::OP_STATE_EXECUTED: out << " EXECUTED "; break; - case Dbacc::Operationrec::OP_STATE_RUNNING_ABORT: - out << " RUNNIG_ABORT "; break; case Dbacc::Operationrec::OP_STATE_IDLE: out << " IDLE "; break; default: @@ -1857,7 +1853,6 @@ operator<<(NdbOut & out, Dbacc::OperationrecPtr ptr) ,OP_STATE_WAITING = 0x0000 ,OP_STATE_RUNNING = 0x1000 ,OP_STATE_EXECUTED = 0x3000 - ,OP_STATE_RUNNING_ABORT = 0x2000 }; */ if (opbits & Dbacc::Operationrec::OP_LOCK_OWNER) @@ -3950,6 +3945,7 @@ void Dbacc::checkoverfreelist(Signal* signal) void Dbacc::abortParallelQueueOperation(Signal* signal, OperationrecPtr opPtr) { + jam(); OperationrecPtr nextP; OperationrecPtr prevP; OperationrecPtr loPtr; @@ -3992,13 +3988,21 @@ Dbacc::abortParallelQueueOperation(Signal* signal, OperationrecPtr opPtr) else { jam(); + /** + * P0 - P1 + * + * Abort P1, check start next + */ ndbassert(prevP.p->m_op_bits & Operationrec::OP_LOCK_OWNER); prevP.p->m_lo_last_parallel_op_ptr_i = RNIL; startNext(signal, prevP); validate_lock_queue(prevP); return; } - + + /** + * Abort P1/P2 + */ if (opbits & Operationrec::OP_LOCK_MODE) { Uint32 nextbits = nextP.p->m_op_bits; @@ -4024,12 +4028,23 @@ Dbacc::abortParallelQueueOperation(Signal* signal, OperationrecPtr opPtr) /** * Abort P1, P2 */ + if (opstate == Operationrec::OP_STATE_RUNNING) + { + jam(); + startNext(signal, prevP); + validate_lock_queue(prevP); + return; + } + + ndbassert(opstate == Operationrec::OP_STATE_EXECUTED || + opstate == Operationrec::OP_STATE_WAITING); /** * Scan to last of run queue */ while (nextP.p->nextParallelQue != RNIL) { + jam(); nextP.i = nextP.p->nextParallelQue; ptrCheckGuard(nextP, coprecsize, operationrec); } @@ -4049,6 +4064,7 @@ Dbacc::abortParallelQueueOperation(Signal* signal, OperationrecPtr opPtr) void Dbacc::abortSerieQueueOperation(Signal* signal, OperationrecPtr opPtr) { + jam(); OperationrecPtr prevS, nextS; OperationrecPtr prevP, nextP; OperationrecPtr loPtr; @@ -4620,7 +4636,25 @@ Dbacc::release_lockowner(Signal* signal, OperationrecPtr opPtr, bool commit) * Aborting an operation can *always* lead to lock upgrade */ action = CHECK_LOCK_UPGRADE; - + Uint32 opstate = opbits & Operationrec::OP_STATE_MASK; + if (opstate != Operationrec::OP_STATE_EXECUTED) + { + ndbassert(opstate == Operationrec::OP_STATE_RUNNING); + if (opbits & Operationrec::OP_ELEMENT_DISAPPEARED) + { + jam(); + report_dealloc(signal, opPtr.p); + newOwner.p->localdata[0] = ~0; + } + else + { + jam(); + newOwner.p->localdata[0] = opPtr.p->localdata[0]; + newOwner.p->localdata[1] = opPtr.p->localdata[1]; + } + action = START_NEW; + } + /** * Update ACC_LOCK_MODE */ diff --git a/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp b/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp index 18e5e6cd585..89d275c6625 100644 --- a/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp +++ b/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp @@ -2604,11 +2604,6 @@ void Dblqh::execTUPKEYREF(Signal* signal) ndbassert(regTcPtr->transactionState == TcConnectionrec::WAIT_TUP || regTcPtr->transactionState ==TcConnectionrec::WAIT_TUP_TO_ABORT); } - else if (getNodeState().startLevel == NodeState::SL_STARTED) - { - if (terrorCode == 899) - ndbout << "899: " << regTcPtr->m_row_id << endl; - } switch (tcConnectptr.p->transactionState) { case TcConnectionrec::WAIT_TUP: @@ -9095,6 +9090,7 @@ void Dblqh::scanTupkeyConfLab(Signal* signal) if (accOpPtr != (Uint32)-1) { c_acc->execACCKEY_ORD(signal, accOpPtr); + jamEntry(); } else { @@ -9419,7 +9415,7 @@ Uint32 Dblqh::initScanrec(const ScanFragReq* scanFragReq) const Uint32 readCommitted = ScanFragReq::getReadCommittedFlag(reqinfo); const Uint32 rangeScan = ScanFragReq::getRangeScanFlag(reqinfo); const Uint32 descending = ScanFragReq::getDescendingFlag(reqinfo); - const Uint32 tupScan = ScanFragReq::getTupScanFlag(reqinfo); + Uint32 tupScan = ScanFragReq::getTupScanFlag(reqinfo); const Uint32 attrLen = ScanFragReq::getAttrLen(reqinfo); const Uint32 scanPrio = ScanFragReq::getScanPrio(reqinfo); @@ -9458,7 +9454,7 @@ Uint32 Dblqh::initScanrec(const ScanFragReq* scanFragReq) scanptr.p->scanApiOpPtr = scanFragReq->clientOpPtr; scanptr.p->m_last_row = 0; scanptr.p->scanStoredProcId = RNIL; - + scanptr.p->copyPtr = RNIL; if (max_rows == 0 || (max_bytes > 0 && max_rows > max_bytes)){ jam(); return ScanFragRef::ZWRONG_BATCH_SIZE; @@ -9479,8 +9475,10 @@ Uint32 Dblqh::initScanrec(const ScanFragReq* scanFragReq) * !idx uses 1 - (MAX_PARALLEL_SCANS_PER_FRAG - 1) = 1-11 * idx uses from MAX_PARALLEL_SCANS_PER_FRAG - MAX = 12-42) */ - Uint32 start = (rangeScan || tupScan ? MAX_PARALLEL_SCANS_PER_FRAG : 1 ); - Uint32 stop = (rangeScan || tupScan ? MAX_PARALLEL_INDEX_SCANS_PER_FRAG : MAX_PARALLEL_SCANS_PER_FRAG - 1); + tupScan = 0; // Make sure that close tup scan does not start acc scan incorrectly + Uint32 start = (rangeScan || tupScan) ? MAX_PARALLEL_SCANS_PER_FRAG : 1 ; + Uint32 stop = (rangeScan || tupScan) ? MAX_PARALLEL_INDEX_SCANS_PER_FRAG : + MAX_PARALLEL_SCANS_PER_FRAG - 1; stop += start; Uint32 free = tFragPtr.p->m_scanNumberMask.find(start); diff --git a/storage/ndb/src/kernel/blocks/dbtup/DbtupScan.cpp b/storage/ndb/src/kernel/blocks/dbtup/DbtupScan.cpp index 0d2ffe0927f..d9e94e63726 100644 --- a/storage/ndb/src/kernel/blocks/dbtup/DbtupScan.cpp +++ b/storage/ndb/src/kernel/blocks/dbtup/DbtupScan.cpp @@ -156,7 +156,7 @@ Dbtup::execNEXT_SCANREQ(Signal* signal) conf->scanPtr = scan.m_userPtr; unsigned signalLength = 1; sendSignal(scanPtr.p->m_userRef, GSN_NEXT_SCANCONF, - signal, signalLength, JBB); + signal, signalLength, JBB); return; } break; @@ -171,7 +171,7 @@ Dbtup::execNEXT_SCANREQ(Signal* signal) lockReq->requestInfo = AccLockReq::AbortWithConf; lockReq->accOpPtr = scan.m_accLockOp; EXECUTE_DIRECT(DBACC, GSN_ACC_LOCKREQ, - signal, AccLockReq::UndoSignalLength); + signal, AccLockReq::UndoSignalLength); jamEntry(); ndbrequire(lockReq->returnCode == AccLockReq::Success); scan.m_state = ScanOp::Aborting; @@ -182,10 +182,10 @@ Dbtup::execNEXT_SCANREQ(Signal* signal) ndbrequire(scan.m_accLockOp != RNIL); AccLockReq* const lockReq = (AccLockReq*)signal->getDataPtrSend(); lockReq->returnCode = RNIL; - lockReq->requestInfo = AccLockReq::Unlock; + lockReq->requestInfo = AccLockReq::Abort; lockReq->accOpPtr = scan.m_accLockOp; EXECUTE_DIRECT(DBACC, GSN_ACC_LOCKREQ, - signal, AccLockReq::UndoSignalLength); + signal, AccLockReq::UndoSignalLength); jamEntry(); ndbrequire(lockReq->returnCode == AccLockReq::Success); scan.m_accLockOp = RNIL; @@ -433,7 +433,7 @@ Dbtup::execACCKEYCONF(Signal* signal) jam(); AccLockReq* const lockReq = (AccLockReq*)signal->getDataPtrSend(); lockReq->returnCode = RNIL; - lockReq->requestInfo = AccLockReq::Unlock; + lockReq->requestInfo = AccLockReq::Abort; lockReq->accOpPtr = scan.m_accLockOp; EXECUTE_DIRECT(DBACC, GSN_ACC_LOCKREQ, signal, AccLockReq::UndoSignalLength); jamEntry(); diff --git a/storage/ndb/src/kernel/blocks/dbtux/DbtuxScan.cpp b/storage/ndb/src/kernel/blocks/dbtux/DbtuxScan.cpp index 3c0b2c4ed3f..55315806635 100644 --- a/storage/ndb/src/kernel/blocks/dbtux/DbtuxScan.cpp +++ b/storage/ndb/src/kernel/blocks/dbtux/DbtuxScan.cpp @@ -321,7 +321,7 @@ Dbtux::execNEXT_SCANREQ(Signal* signal) conf->scanPtr = scan.m_userPtr; unsigned signalLength = 1; sendSignal(scanPtr.p->m_userRef, GSN_NEXT_SCANCONF, - signal, signalLength, JBB); + signal, signalLength, JBB); return; } break; @@ -344,7 +344,8 @@ Dbtux::execNEXT_SCANREQ(Signal* signal) lockReq->returnCode = RNIL; lockReq->requestInfo = AccLockReq::AbortWithConf; lockReq->accOpPtr = scan.m_accLockOp; - EXECUTE_DIRECT(DBACC, GSN_ACC_LOCKREQ, signal, AccLockReq::UndoSignalLength); + EXECUTE_DIRECT(DBACC, GSN_ACC_LOCKREQ, signal, + AccLockReq::UndoSignalLength); jamEntry(); ndbrequire(lockReq->returnCode == AccLockReq::Success); scan.m_state = ScanOp::Aborting; @@ -355,9 +356,10 @@ Dbtux::execNEXT_SCANREQ(Signal* signal) ndbrequire(scan.m_accLockOp != RNIL); AccLockReq* const lockReq = (AccLockReq*)signal->getDataPtrSend(); lockReq->returnCode = RNIL; - lockReq->requestInfo = AccLockReq::Unlock; + lockReq->requestInfo = AccLockReq::Abort; lockReq->accOpPtr = scan.m_accLockOp; - EXECUTE_DIRECT(DBACC, GSN_ACC_LOCKREQ, signal, AccLockReq::UndoSignalLength); + EXECUTE_DIRECT(DBACC, GSN_ACC_LOCKREQ, signal, + AccLockReq::UndoSignalLength); jamEntry(); ndbrequire(lockReq->returnCode == AccLockReq::Success); scan.m_accLockOp = RNIL; @@ -612,7 +614,7 @@ Dbtux::execACCKEYCONF(Signal* signal) jam(); AccLockReq* const lockReq = (AccLockReq*)signal->getDataPtrSend(); lockReq->returnCode = RNIL; - lockReq->requestInfo = AccLockReq::Unlock; + lockReq->requestInfo = AccLockReq::Abort; lockReq->accOpPtr = scan.m_accLockOp; EXECUTE_DIRECT(DBACC, GSN_ACC_LOCKREQ, signal, AccLockReq::UndoSignalLength); jamEntry(); diff --git a/storage/ndb/test/include/HugoOperations.hpp b/storage/ndb/test/include/HugoOperations.hpp index c6ecb4c574e..995463e5056 100644 --- a/storage/ndb/test/include/HugoOperations.hpp +++ b/storage/ndb/test/include/HugoOperations.hpp @@ -108,6 +108,8 @@ public: NDBT_ResultRow& get_row(Uint32 idx) { return *rows[idx];} int execute_async(Ndb*, NdbTransaction::ExecType, NdbTransaction::AbortOption = NdbTransaction::AbortOnError); + int execute_async_prepare(Ndb*, NdbTransaction::ExecType, NdbTransaction::AbortOption = NdbTransaction::AbortOnError); + int wait_async(Ndb*, int timeout = -1); protected: diff --git a/storage/ndb/test/ndbapi/testBasic.cpp b/storage/ndb/test/ndbapi/testBasic.cpp index d45a8ecb7b1..f6a4bf37478 100644 --- a/storage/ndb/test/ndbapi/testBasic.cpp +++ b/storage/ndb/test/ndbapi/testBasic.cpp @@ -1090,11 +1090,6 @@ runMassiveRollback4(NDBT_Context* ctx, NDBT_Step* step){ ok = false; break; } - if (hugoOps.execute_NoCommit(pNdb) != 0) - { - ok = false; - break; - } } hugoOps.execute_Rollback(pNdb); CHECK(hugoOps.closeTransaction(pNdb) == 0); @@ -1199,6 +1194,61 @@ runTupErrors(NDBT_Context* ctx, NDBT_Step* step){ return NDBT_OK; } +int +runInsertError(NDBT_Context* ctx, NDBT_Step* step){ + + int result = NDBT_OK; + HugoOperations hugoOp1(*ctx->getTab()); + HugoOperations hugoOp2(*ctx->getTab()); + Ndb* pNdb = GETNDB(step); + + NdbRestarter restarter; + restarter.insertErrorInAllNodes(4017); + const Uint32 LOOPS = 10; + for (Uint32 i = 0; igetTab()); + Ndb* pNdb = GETNDB(step); + + NdbRestarter restarter; + restarter.insertErrorInAllNodes(4017); + + const Uint32 LOOPS = 1; + for (Uint32 i = 0; igetNumRecords(); int parallelism = ctx->getProperty("Parallelism", 240); int abort = ctx->getProperty("AbortProb", 5); + int tupscan = ctx->getProperty("TupScan", (Uint32)0); int i = 0; HugoTransactions hugoTrans(*ctx->getTab()); while (iisTestStopped()) { g_info << i << ": "; NdbOperation::LockMode lm = (NdbOperation::LockMode)(rand() % 3); + int scan_flags = 0; + + if (tupscan == 1) + scan_flags |= NdbScanOperation::SF_TupScan; + else if (tupscan == 2 && ((rand() & 0x800))) + { + scan_flags |= NdbScanOperation::SF_TupScan; + } + if (hugoTrans.scanReadRecords(GETNDB(step), records, abort, parallelism, - lm) != 0){ + lm, + scan_flags) != 0){ return NDBT_FAILED; } i++; @@ -1320,6 +1331,16 @@ TESTCASE("ScanRead488", STEPS(runRandScanRead, 70); FINALIZER(runClearTable); } +TESTCASE("ScanRead488T", + "Verify scan requirement: It's only possible to have 11 concurrent "\ + "scans per fragment running in Ndb kernel at the same time. "\ + "When this limit is exceeded the scan will be aborted with errorcode "\ + "488."){ + TC_PROPERTY("TupScan", 1); + INITIALIZER(runLoadTable); + STEPS(runRandScanRead, 70); + FINALIZER(runClearTable); +} TESTCASE("ScanRead488O", "Verify scan requirement: It's only possible to have 11 concurrent "\ "scans per fragment running in Ndb kernel at the same time. "\ @@ -1336,6 +1357,7 @@ TESTCASE("ScanRead488_Mixed", "scans per fragment running in Ndb kernel at the same time. "\ "When this limit is exceeded the scan will be aborted with errorcode "\ "488."){ + TC_PROPERTY("TupScan", 2); INITIALIZER(createOrderedPkIndex); INITIALIZER(runLoadTable); STEPS(runRandScanRead, 50); diff --git a/storage/ndb/test/run-test/daily-basic-tests.txt b/storage/ndb/test/run-test/daily-basic-tests.txt index 3fead45533f..a251c4db305 100644 --- a/storage/ndb/test/run-test/daily-basic-tests.txt +++ b/storage/ndb/test/run-test/daily-basic-tests.txt @@ -219,6 +219,14 @@ max-time: 500 cmd: testBasic args: -n TupError +max-time: 500 +cmd: testBasic +args: -n InsertError T1 + +max-time: 500 +cmd: testBasic +args: -n InsertError2 T1 + max-time: 500 cmd: testTimeout args: T1 @@ -273,6 +281,10 @@ max-time: 500 cmd: testScan args: -n ScanRead488O -l 10 T6 D1 D2 +max-time: 1000 +cmd: testScan +args: -n ScanRead488T -l 10 T6 D1 D2 + max-time: 1000 cmd: testScan args: -n ScanRead488_Mixed -l 10 T6 D1 D2 diff --git a/storage/ndb/test/src/HugoOperations.cpp b/storage/ndb/test/src/HugoOperations.cpp index 84ea88388dc..2903cb8810e 100644 --- a/storage/ndb/test/src/HugoOperations.cpp +++ b/storage/ndb/test/src/HugoOperations.cpp @@ -471,16 +471,33 @@ HugoOperations::execute_async(Ndb* pNdb, NdbTransaction::ExecType et, return NDBT_OK; } +int +HugoOperations::execute_async_prepare(Ndb* pNdb, NdbTransaction::ExecType et, + NdbTransaction::AbortOption eao){ + + m_async_reply= 0; + pTrans->executeAsynchPrepare(et, + HugoOperations_async_callback, + this, + eao); + + return NDBT_OK; +} + int HugoOperations::wait_async(Ndb* pNdb, int timeout) { - pNdb->pollNdb(1000); - - if(m_async_reply) + volatile int * wait = &m_async_reply; + while (!* wait) { - if(m_async_return) - ndbout << "ERROR: " << pNdb->getNdbError(m_async_return) << endl; - return m_async_return; + pNdb->sendPollNdb(1000); + + if(* wait) + { + if(m_async_return) + ndbout << "ERROR: " << pNdb->getNdbError(m_async_return) << endl; + return m_async_return; + } } ndbout_c("wait returned nothing..."); return -1;