diff --git a/ndb/include/ndbapi/NdbIndexScanOperation.hpp b/ndb/include/ndbapi/NdbIndexScanOperation.hpp index 638ecb17779..7ef66f9a30b 100644 --- a/ndb/include/ndbapi/NdbIndexScanOperation.hpp +++ b/ndb/include/ndbapi/NdbIndexScanOperation.hpp @@ -41,7 +41,9 @@ public: * @param parallel No of fragments to scan in parallel (0=max) */ virtual int readTuples(LockMode lock_mode = LM_Read, - Uint32 scan_flags = 0, Uint32 parallel = 0); + Uint32 scan_flags = 0, + Uint32 parallel = 0, + Uint32 batch = 0); #ifndef DOXYGEN_SHOULD_SKIP_INTERNAL /** @@ -69,7 +71,7 @@ public: (SF_ReadRangeNo & -(Int32)read_range_no) | (SF_KeyInfo & -(Int32)keyinfo); - return readTuples(lock_mode, scan_flags, parallel); + return readTuples(lock_mode, scan_flags, parallel, batch); } #endif diff --git a/ndb/include/ndbapi/NdbScanOperation.hpp b/ndb/include/ndbapi/NdbScanOperation.hpp index 5581e830f82..beaf9402b77 100644 --- a/ndb/include/ndbapi/NdbScanOperation.hpp +++ b/ndb/include/ndbapi/NdbScanOperation.hpp @@ -57,7 +57,9 @@ public: */ virtual int readTuples(LockMode lock_mode = LM_Read, - Uint32 scan_flags = 0, Uint32 parallel = 0); + Uint32 scan_flags = 0, + Uint32 parallel = 0, + Uint32 batch = 0); #ifndef DOXYGEN_SHOULD_SKIP_DEPRECATED /** diff --git a/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp b/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp index ecb67d04050..09834d39af4 100644 --- a/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp +++ b/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp @@ -7373,15 +7373,15 @@ void Dblqh::scanLockReleasedLab(Signal* signal) scanptr.p->m_curr_batch_size_rows = 0; scanptr.p->m_curr_batch_size_bytes = 0; closeScanLab(signal); + } else if (scanptr.p->m_last_row && !scanptr.p->scanLockHold) { + jam(); + closeScanLab(signal); + return; } else if (scanptr.p->check_scan_batch_completed() && scanptr.p->scanLockHold != ZTRUE) { jam(); scanptr.p->scanState = ScanRecord::WAIT_SCAN_NEXTREQ; sendScanFragConf(signal, ZFALSE); - } else if (scanptr.p->m_last_row && !scanptr.p->scanLockHold) { - jam(); - closeScanLab(signal); - return; } else { jam(); /* diff --git a/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp b/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp index 193e1fea08b..71f3aff05d4 100644 --- a/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp +++ b/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp @@ -7047,6 +7047,18 @@ void Dbtc::checkScanActiveInFailedLqh(Signal* signal, found = true; } } + + ScanFragList deliv(c_scan_frag_pool, scanptr.p->m_delivered_scan_frags); + for(deliv.first(ptr); !ptr.isNull(); deliv.next(ptr)) + { + jam(); + if (refToNode(ptr.p->lqhBlockref) == failedNodeId) + { + jam(); + found = true; + break; + } + } } if(found){ jam(); diff --git a/ndb/src/kernel/vm/RequestTracker.hpp b/ndb/src/kernel/vm/RequestTracker.hpp index 5fd1ae7255a..ac9ed85ae4b 100644 --- a/ndb/src/kernel/vm/RequestTracker.hpp +++ b/ndb/src/kernel/vm/RequestTracker.hpp @@ -26,12 +26,12 @@ public: void init() { m_confs.clear(); m_nRefs = 0; } template - void init(SafeCounterManager& mgr, + bool init(SafeCounterManager& mgr, NodeReceiverGroup rg, Uint16 GSN, Uint32 senderData) { init(); SafeCounter tmp(mgr, m_sc); - tmp.init(rg, GSN, senderData); + return tmp.init(rg, GSN, senderData); } bool ignoreRef(SafeCounterManager& mgr, Uint32 nodeId) diff --git a/ndb/src/kernel/vm/SafeCounter.hpp b/ndb/src/kernel/vm/SafeCounter.hpp index 3ee5e076ab8..917a67f2508 100644 --- a/ndb/src/kernel/vm/SafeCounter.hpp +++ b/ndb/src/kernel/vm/SafeCounter.hpp @@ -230,10 +230,13 @@ inline bool SafeCounter::init(NodeReceiverGroup rg, Uint16 GSN, Uint32 senderData){ - bool b = init(rg.m_block, GSN, senderData); - m_nodes = rg.m_nodes; - m_count = m_nodes.count(); - return b; + if (init(rg.m_block, GSN, senderData)) + { + m_nodes = rg.m_nodes; + m_count = m_nodes.count(); + return true; + } + return false; } template @@ -241,10 +244,13 @@ inline bool SafeCounter::init(NodeReceiverGroup rg, Uint32 senderData){ - bool b = init(rg.m_block, Ref::GSN, senderData); - m_nodes = rg.m_nodes; - m_count = m_nodes.count(); - return b; + if (init(rg.m_block, Ref::GSN, senderData)) + { + m_nodes = rg.m_nodes; + m_count = m_nodes.count(); + return true; + } + return false; } inline diff --git a/ndb/src/mgmsrv/ConfigInfo.cpp b/ndb/src/mgmsrv/ConfigInfo.cpp index 629ddf7a655..ab4f2b413b3 100644 --- a/ndb/src/mgmsrv/ConfigInfo.cpp +++ b/ndb/src/mgmsrv/ConfigInfo.cpp @@ -30,6 +30,7 @@ extern my_bool opt_core; #define MAX_LINE_LENGTH 255 #define KEY_INTERNAL 0 #define MAX_INT_RNIL 0xfffffeff +#define MAX_PORT_NO 65535 #define _STR_VALUE(x) #x #define STR_VALUE(x) _STR_VALUE(x) @@ -422,7 +423,7 @@ const ConfigInfo::ParamInfo ConfigInfo::m_ParamInfo[] = { ConfigInfo::CI_INT, UNDEFINED, "1", - STR_VALUE(MAX_INT_RNIL) }, + STR_VALUE(MAX_PORT_NO) }, { CFG_DB_NO_REPLICAS, @@ -1462,7 +1463,7 @@ const ConfigInfo::ParamInfo ConfigInfo::m_ParamInfo[] = { ConfigInfo::CI_INT, NDB_PORT, "0", - STR_VALUE(MAX_INT_RNIL) }, + STR_VALUE(MAX_PORT_NO) }, { KEY_INTERNAL, @@ -1474,7 +1475,7 @@ const ConfigInfo::ParamInfo ConfigInfo::m_ParamInfo[] = { ConfigInfo::CI_INT, UNDEFINED, "0", - STR_VALUE(MAX_INT_RNIL) }, + STR_VALUE(MAX_PORT_NO) }, { CFG_NODE_ARBIT_RANK, @@ -1616,7 +1617,7 @@ const ConfigInfo::ParamInfo ConfigInfo::m_ParamInfo[] = { ConfigInfo::CI_INT, MANDATORY, "0", - STR_VALUE(MAX_INT_RNIL) }, + STR_VALUE(MAX_PORT_NO) }, { CFG_TCP_SEND_BUFFER_SIZE, @@ -1722,7 +1723,7 @@ const ConfigInfo::ParamInfo ConfigInfo::m_ParamInfo[] = { ConfigInfo::CI_INT, MANDATORY, "0", - STR_VALUE(MAX_INT_RNIL) }, + STR_VALUE(MAX_PORT_NO) }, { CFG_SHM_SIGNUM, @@ -1944,7 +1945,7 @@ const ConfigInfo::ParamInfo ConfigInfo::m_ParamInfo[] = { ConfigInfo::CI_INT, MANDATORY, "0", - STR_VALUE(MAX_INT_RNIL) }, + STR_VALUE(MAX_PORT_NO) }, { CFG_SCI_HOST1_ID_0, diff --git a/ndb/src/ndbapi/NdbReceiver.cpp b/ndb/src/ndbapi/NdbReceiver.cpp index df16ae66915..62119880076 100644 --- a/ndb/src/ndbapi/NdbReceiver.cpp +++ b/ndb/src/ndbapi/NdbReceiver.cpp @@ -121,7 +121,15 @@ NdbReceiver::calculate_batch_size(Uint32 key_size, * no more than MAX_SCAN_BATCH_SIZE is sent from all nodes in total per * batch. */ - batch_byte_size= max_batch_byte_size; + if (batch_size == 0) + { + batch_byte_size= max_batch_byte_size; + } + else + { + batch_byte_size= batch_size * tot_size; + } + if (batch_byte_size * parallelism > max_scan_batch_size) { batch_byte_size= max_scan_batch_size / parallelism; } diff --git a/ndb/src/ndbapi/NdbScanOperation.cpp b/ndb/src/ndbapi/NdbScanOperation.cpp index 7d0712e117d..a36d47c3471 100644 --- a/ndb/src/ndbapi/NdbScanOperation.cpp +++ b/ndb/src/ndbapi/NdbScanOperation.cpp @@ -117,7 +117,8 @@ NdbScanOperation::init(const NdbTableImpl* tab, NdbTransaction* myConnection) int NdbScanOperation::readTuples(NdbScanOperation::LockMode lm, Uint32 scan_flags, - Uint32 parallel) + Uint32 parallel, + Uint32 batch) { m_ordered = m_descending = false; Uint32 fragCount = m_currentTable->m_fragmentCount; @@ -181,9 +182,12 @@ NdbScanOperation::readTuples(NdbScanOperation::LockMode lm, bool tupScan = (scan_flags & SF_TupScan); if (tupScan && rangeScan) tupScan = false; - - theParallelism = parallel; + if (rangeScan && (scan_flags & SF_OrderBy)) + parallel = fragCount; + + theParallelism = parallel; + if(fix_receivers(parallel) == -1){ setErrorCodeAbort(4000); return -1; @@ -202,6 +206,7 @@ NdbScanOperation::readTuples(NdbScanOperation::LockMode lm, req->tableSchemaVersion = m_accessTable->m_version; req->storedProcId = 0xFFFF; req->buddyConPtr = theNdbCon->theBuddyConPtr; + req->first_batch_size = batch; // Save user specified batch size Uint32 reqInfo = 0; ScanTabReq::setParallelism(reqInfo, parallel); @@ -750,13 +755,14 @@ int NdbScanOperation::prepareSendScan(Uint32 aTC_ConnectPtr, * The number of records sent by each LQH is calculated and the kernel * is informed of this number by updating the SCAN_TABREQ signal */ - Uint32 batch_size, batch_byte_size, first_batch_size; + ScanTabReq * req = CAST_PTR(ScanTabReq, theSCAN_TABREQ->getDataPtrSend()); + Uint32 batch_size = req->first_batch_size; // User specified + Uint32 batch_byte_size, first_batch_size; theReceiver.calculate_batch_size(key_size, theParallelism, batch_size, batch_byte_size, first_batch_size); - ScanTabReq * req = CAST_PTR(ScanTabReq, theSCAN_TABREQ->getDataPtrSend()); ScanTabReq::setScanBatch(req->requestInfo, batch_size); req->batch_byte_size= batch_byte_size; req->first_batch_size= first_batch_size; @@ -1216,13 +1222,14 @@ error: int NdbIndexScanOperation::readTuples(LockMode lm, Uint32 scan_flags, - Uint32 parallel) + Uint32 parallel, + Uint32 batch) { const bool order_by = scan_flags & SF_OrderBy; const bool order_desc = scan_flags & SF_Descending; const bool read_range_no = scan_flags & SF_ReadRangeNo; - - int res = NdbScanOperation::readTuples(lm, scan_flags, 0); + + int res = NdbScanOperation::readTuples(lm, scan_flags, parallel, batch); if(!res && read_range_no) { m_read_range_no = 1; @@ -1506,6 +1513,66 @@ NdbScanOperation::close_impl(TransporterFacade* tp, bool forceSend){ return -1; } + bool holdLock = false; + if (theSCAN_TABREQ) + { + ScanTabReq * req = CAST_PTR(ScanTabReq, theSCAN_TABREQ->getDataPtrSend()); + holdLock = ScanTabReq::getHoldLockFlag(req->requestInfo); + } + + /** + * When using locks, force close of scan directly + */ + if (holdLock && theError.code == 0 && + (m_sent_receivers_count + m_conf_receivers_count + m_api_receivers_count)) + { + TransporterFacade * tp = TransporterFacade::instance(); + NdbApiSignal tSignal(theNdb->theMyRef); + tSignal.setSignal(GSN_SCAN_NEXTREQ); + + Uint32* theData = tSignal.getDataPtrSend(); + Uint64 transId = theNdbCon->theTransactionId; + theData[0] = theNdbCon->theTCConPtr; + theData[1] = 1; + theData[2] = transId; + theData[3] = (Uint32) (transId >> 32); + + tSignal.setLength(4); + int ret = tp->sendSignal(&tSignal, nodeId); + if (ret) + { + setErrorCode(4008); + return -1; + } + checkForceSend(forceSend); + + /** + * If no receiver is outstanding... + * set it to 1 as execCLOSE_SCAN_REP resets it + */ + m_sent_receivers_count = m_sent_receivers_count ? m_sent_receivers_count : 1; + + while(theError.code == 0 && (m_sent_receivers_count + m_conf_receivers_count)) + { + theNdb->theImpl->theWaiter.m_node = nodeId; + theNdb->theImpl->theWaiter.m_state = WAIT_SCAN; + int return_code = theNdb->receiveResponse(WAITFOR_SCAN_TIMEOUT); + switch(return_code){ + case 0: + break; + case -1: + setErrorCode(4008); + case -2: + m_api_receivers_count = 0; + m_conf_receivers_count = 0; + m_sent_receivers_count = 0; + theNdbCon->theReleaseOnClose = true; + return -1; + } + } + return 0; + } + /** * Wait for outstanding */ diff --git a/ndb/test/ndbapi/testNodeRestart.cpp b/ndb/test/ndbapi/testNodeRestart.cpp index 754b3edc269..c1502940655 100644 --- a/ndb/test/ndbapi/testNodeRestart.cpp +++ b/ndb/test/ndbapi/testNodeRestart.cpp @@ -294,6 +294,7 @@ int runRestarts(NDBT_Context* ctx, NDBT_Step* step){ } i++; } + ctx->stopTest(); return result; }