mirror of
https://github.com/MariaDB/server.git
synced 2026-05-07 07:35:32 +02:00
Piggy back close scan req
This commit is contained in:
parent
e7a950a61d
commit
b7ffde86a0
4 changed files with 60 additions and 85 deletions
|
|
@ -1155,7 +1155,6 @@ public:
|
|||
union { Uint32 m_queued_count; Uint32 scanReceivedOperations; };
|
||||
DLList<ScanFragRec>::Head m_queued_scan_frags; // In TC !sent to API
|
||||
DLList<ScanFragRec>::Head m_delivered_scan_frags;// Delivered to API
|
||||
DLList<ScanFragRec>::Head m_completed_scan_frags;// Completed
|
||||
|
||||
// Id of the next fragment to be scanned. Used by scan fragment
|
||||
// processes when they are ready for the next fragment
|
||||
|
|
@ -1433,7 +1432,7 @@ private:
|
|||
void releaseScanResources(ScanRecordPtr);
|
||||
ScanRecordPtr seizeScanrec(Signal* signal);
|
||||
void sendScanFragReq(Signal*, ScanRecord*, ScanFragRec*);
|
||||
void sendScanTabConf(Signal* signal, ScanRecord*);
|
||||
void sendScanTabConf(Signal* signal, ScanRecordPtr);
|
||||
void close_scan_req(Signal*, ScanRecordPtr, bool received_req);
|
||||
void close_scan_req_send_conf(Signal*, ScanRecordPtr);
|
||||
|
||||
|
|
|
|||
|
|
@ -6586,10 +6586,8 @@ void Dbtc::timeOutFoundFragLab(Signal* signal, UintR TscanConPtr)
|
|||
*/
|
||||
ptr.p->scanFragState = ScanFragRec::COMPLETED;
|
||||
ScanFragList run(c_scan_frag_pool, scanptr.p->m_running_scan_frags);
|
||||
ScanFragList comp(c_scan_frag_pool, scanptr.p->m_completed_scan_frags);
|
||||
|
||||
run.remove(ptr);
|
||||
comp.add(ptr);
|
||||
run.release(ptr);
|
||||
ptr.p->stopFragTimer();
|
||||
}
|
||||
|
||||
|
|
@ -6865,7 +6863,6 @@ void Dbtc::checkScanActiveInFailedLqh(Signal* signal,
|
|||
jam();
|
||||
ScanFragRecPtr ptr;
|
||||
ScanFragList run(c_scan_frag_pool, scanptr.p->m_running_scan_frags);
|
||||
ScanFragList comp(c_scan_frag_pool, scanptr.p->m_completed_scan_frags);
|
||||
|
||||
for(run.first(ptr); !ptr.isNull(); ){
|
||||
jam();
|
||||
|
|
@ -6875,8 +6872,7 @@ void Dbtc::checkScanActiveInFailedLqh(Signal* signal,
|
|||
refToNode(curr.p->lqhBlockref) == failedNodeId){
|
||||
jam();
|
||||
|
||||
run.remove(curr);
|
||||
comp.add(curr);
|
||||
run.release(curr);
|
||||
curr.p->scanFragState = ScanFragRec::COMPLETED;
|
||||
curr.p->stopFragTimer();
|
||||
found = true;
|
||||
|
|
@ -8809,13 +8805,9 @@ void Dbtc::releaseScanResources(ScanRecordPtr scanPtr)
|
|||
ptrCheckGuard(tcConnectptr, ctcConnectFilesize, tcConnectRecord);
|
||||
releaseTcCon();
|
||||
|
||||
ScanFragList x(c_scan_frag_pool,
|
||||
scanPtr.p->m_completed_scan_frags);
|
||||
x.release();
|
||||
ndbrequire(scanPtr.p->m_running_scan_frags.isEmpty());
|
||||
ndbrequire(scanPtr.p->m_queued_scan_frags.isEmpty());
|
||||
ndbrequire(scanPtr.p->m_delivered_scan_frags.isEmpty());
|
||||
|
||||
|
||||
ndbassert(scanPtr.p->scanApiRec == apiConnectptr.i);
|
||||
ndbassert(apiConnectptr.p->apiScanRec == scanPtr.i);
|
||||
|
|
@ -8868,10 +8860,8 @@ void Dbtc::execDIGETPRIMCONF(Signal* signal)
|
|||
if(tabPtr.p->checkTable(schemaVersion) == false){
|
||||
jam();
|
||||
ScanFragList run(c_scan_frag_pool, scanptr.p->m_running_scan_frags);
|
||||
ScanFragList comp(c_scan_frag_pool, scanptr.p->m_completed_scan_frags);
|
||||
|
||||
run.remove(scanFragptr);
|
||||
comp.add(scanFragptr);
|
||||
run.release(scanFragptr);
|
||||
scanError(signal, scanptr, tabPtr.p->getErrorCode(schemaVersion));
|
||||
return;
|
||||
}
|
||||
|
|
@ -8889,10 +8879,8 @@ void Dbtc::execDIGETPRIMCONF(Signal* signal)
|
|||
updateBuddyTimer(apiConnectptr);
|
||||
{
|
||||
ScanFragList run(c_scan_frag_pool, scanptr.p->m_running_scan_frags);
|
||||
ScanFragList comp(c_scan_frag_pool, scanptr.p->m_completed_scan_frags);
|
||||
|
||||
run.remove(scanFragptr);
|
||||
comp.add(scanFragptr);
|
||||
run.release(scanFragptr);
|
||||
}
|
||||
close_scan_req_send_conf(signal, scanptr);
|
||||
return;
|
||||
|
|
@ -8945,10 +8933,8 @@ void Dbtc::execDIGETPRIMREF(Signal* signal)
|
|||
ptrCheckGuard(scanptr, cscanrecFileSize, scanRecord);
|
||||
|
||||
ScanFragList run(c_scan_frag_pool, scanptr.p->m_running_scan_frags);
|
||||
ScanFragList comp(c_scan_frag_pool, scanptr.p->m_completed_scan_frags);
|
||||
|
||||
run.remove(scanFragptr);
|
||||
comp.add(scanFragptr);
|
||||
run.release(scanFragptr);
|
||||
|
||||
scanError(signal, scanptr, errCode);
|
||||
}//Dbtc::execDIGETPRIMREF()
|
||||
|
|
@ -8993,10 +8979,8 @@ void Dbtc::execSCAN_FRAGREF(Signal* signal)
|
|||
{
|
||||
scanFragptr.p->scanFragState = ScanFragRec::COMPLETED;
|
||||
ScanFragList run(c_scan_frag_pool, scanptr.p->m_running_scan_frags);
|
||||
ScanFragList comp(c_scan_frag_pool, scanptr.p->m_completed_scan_frags);
|
||||
|
||||
run.remove(scanFragptr);
|
||||
comp.add(scanFragptr);
|
||||
run.release(scanFragptr);
|
||||
scanFragptr.p->stopFragTimer();
|
||||
}
|
||||
scanError(signal, scanptr, errCode);
|
||||
|
|
@ -9095,10 +9079,8 @@ void Dbtc::execSCAN_FRAGCONF(Signal* signal)
|
|||
} else {
|
||||
jam();
|
||||
ScanFragList run(c_scan_frag_pool, scanptr.p->m_running_scan_frags);
|
||||
ScanFragList comp(c_scan_frag_pool, scanptr.p->m_completed_scan_frags);
|
||||
|
||||
run.remove(scanFragptr);
|
||||
comp.add(scanFragptr);
|
||||
run.release(scanFragptr);
|
||||
scanFragptr.p->stopFragTimer();
|
||||
scanFragptr.p->scanFragState = ScanFragRec::COMPLETED;
|
||||
}
|
||||
|
|
@ -9147,7 +9129,7 @@ void Dbtc::execSCAN_FRAGCONF(Signal* signal)
|
|||
|
||||
if(scanptr.p->m_queued_count > /** Min */ 0){
|
||||
jam();
|
||||
sendScanTabConf(signal, scanptr.p);
|
||||
sendScanTabConf(signal, scanptr);
|
||||
}
|
||||
}//Dbtc::execSCAN_FRAGCONF()
|
||||
|
||||
|
|
@ -9308,7 +9290,6 @@ Dbtc::close_scan_req(Signal* signal, ScanRecordPtr scanPtr, bool req_received){
|
|||
{
|
||||
ScanFragRecPtr ptr;
|
||||
ScanFragList running(c_scan_frag_pool, scanP->m_running_scan_frags);
|
||||
ScanFragList completed(c_scan_frag_pool, scanP->m_completed_scan_frags);
|
||||
ScanFragList delivered(c_scan_frag_pool, scanP->m_delivered_scan_frags);
|
||||
ScanFragList queued(c_scan_frag_pool, scanP->m_queued_scan_frags);
|
||||
|
||||
|
|
@ -9350,7 +9331,7 @@ Dbtc::close_scan_req(Signal* signal, ScanRecordPtr scanPtr, bool req_received){
|
|||
|
||||
} else {
|
||||
jam();
|
||||
completed.add(curr);
|
||||
c_scan_frag_pool.release(curr);
|
||||
curr.p->scanFragState = ScanFragRec::COMPLETED;
|
||||
curr.p->stopFragTimer();
|
||||
}
|
||||
|
|
@ -9378,7 +9359,7 @@ Dbtc::close_scan_req(Signal* signal, ScanRecordPtr scanPtr, bool req_received){
|
|||
ScanFragNextReq::SignalLength, JBB);
|
||||
} else {
|
||||
jam();
|
||||
completed.add(curr);
|
||||
c_scan_frag_pool.release(curr);
|
||||
curr.p->scanFragState = ScanFragRec::COMPLETED;
|
||||
curr.p->stopFragTimer();
|
||||
}
|
||||
|
|
@ -9492,10 +9473,10 @@ void Dbtc::sendScanFragReq(Signal* signal,
|
|||
}//Dbtc::sendScanFragReq()
|
||||
|
||||
|
||||
void Dbtc::sendScanTabConf(Signal* signal, ScanRecord * scanP) {
|
||||
void Dbtc::sendScanTabConf(Signal* signal, ScanRecordPtr scanPtr) {
|
||||
jam();
|
||||
Uint32* ops = signal->getDataPtrSend()+4;
|
||||
Uint32 op_count = scanP->m_queued_count;
|
||||
Uint32 op_count = scanPtr.p->m_queued_count;
|
||||
if(4 + 3 * op_count > 25){
|
||||
jam();
|
||||
ops += 21;
|
||||
|
|
@ -9507,30 +9488,37 @@ void Dbtc::sendScanTabConf(Signal* signal, ScanRecord * scanP) {
|
|||
conf->transId1 = apiConnectptr.p->transid[0];
|
||||
conf->transId2 = apiConnectptr.p->transid[1];
|
||||
ScanFragRecPtr ptr;
|
||||
ScanFragList queued(c_scan_frag_pool, scanP->m_queued_scan_frags);
|
||||
ScanFragList completed(c_scan_frag_pool, scanP->m_completed_scan_frags);
|
||||
ScanFragList delivered(c_scan_frag_pool, scanP->m_delivered_scan_frags);
|
||||
for(queued.first(ptr); !ptr.isNull(); ){
|
||||
ndbrequire(ptr.p->scanFragState == ScanFragRec::QUEUED_FOR_DELIVERY);
|
||||
ScanFragRecPtr curr = ptr; // Remove while iterating...
|
||||
queued.next(ptr);
|
||||
|
||||
* ops++ = curr.p->m_apiPtr;
|
||||
* ops++ = curr.i;
|
||||
* ops++ = (curr.p->m_totalLen << 10) + curr.p->m_ops;
|
||||
|
||||
queued.remove(curr);
|
||||
if(curr.p->m_ops > 0){
|
||||
delivered.add(curr);
|
||||
curr.p->scanFragState = ScanFragRec::DELIVERED;
|
||||
curr.p->stopFragTimer();
|
||||
} else {
|
||||
(* --ops) = ScanTabConf::EndOfData; ops++;
|
||||
completed.add(curr);
|
||||
curr.p->scanFragState = ScanFragRec::COMPLETED;
|
||||
curr.p->stopFragTimer();
|
||||
{
|
||||
ScanFragList queued(c_scan_frag_pool, scanPtr.p->m_queued_scan_frags);
|
||||
ScanFragList delivered(c_scan_frag_pool,scanPtr.p->m_delivered_scan_frags);
|
||||
for(queued.first(ptr); !ptr.isNull(); ){
|
||||
ndbrequire(ptr.p->scanFragState == ScanFragRec::QUEUED_FOR_DELIVERY);
|
||||
ScanFragRecPtr curr = ptr; // Remove while iterating...
|
||||
queued.next(ptr);
|
||||
|
||||
* ops++ = curr.p->m_apiPtr;
|
||||
* ops++ = curr.i;
|
||||
* ops++ = (curr.p->m_totalLen << 10) + curr.p->m_ops;
|
||||
|
||||
queued.remove(curr);
|
||||
if(curr.p->m_ops > 0){
|
||||
delivered.add(curr);
|
||||
curr.p->scanFragState = ScanFragRec::DELIVERED;
|
||||
curr.p->stopFragTimer();
|
||||
} else {
|
||||
(* --ops) = ScanTabConf::EndOfData; ops++;
|
||||
c_scan_frag_pool.release(curr);
|
||||
curr.p->scanFragState = ScanFragRec::COMPLETED;
|
||||
curr.p->stopFragTimer();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if(scanPtr.p->m_delivered_scan_frags.isEmpty() &&
|
||||
scanPtr.p->m_running_scan_frags.isEmpty()){
|
||||
conf->requestInfo = op_count | ScanTabConf::EndOfData;
|
||||
releaseScanResources(scanPtr);
|
||||
}
|
||||
|
||||
if(4 + 3 * op_count > 25){
|
||||
jam();
|
||||
|
|
@ -9544,7 +9532,7 @@ void Dbtc::sendScanTabConf(Signal* signal, ScanRecord * scanP) {
|
|||
sendSignal(apiConnectptr.p->ndbapiBlockref, GSN_SCAN_TABCONF, signal,
|
||||
ScanTabConf::SignalLength + 3 * op_count, JBB);
|
||||
}
|
||||
scanP->m_queued_count = 0;
|
||||
scanPtr.p->m_queued_count = 0;
|
||||
}//Dbtc::sendScanTabConf()
|
||||
|
||||
|
||||
|
|
@ -10446,7 +10434,6 @@ Dbtc::execDUMP_STATE_ORD(Signal* signal)
|
|||
DUMP_SFR(sp.p->m_running_scan_frags);
|
||||
DUMP_SFR(sp.p->m_queued_scan_frags);
|
||||
DUMP_SFR(sp.p->m_delivered_scan_frags);
|
||||
DUMP_SFR(sp.p->m_completed_scan_frags);
|
||||
|
||||
// Request dump of ApiConnectRecord
|
||||
dumpState->args[0] = DumpStateOrd::TcDumpOneApiConnectRec;
|
||||
|
|
|
|||
|
|
@ -120,12 +120,21 @@ NdbConnection::receiveSCAN_TABCONF(NdbApiSignal* aSignal,
|
|||
}
|
||||
}
|
||||
}
|
||||
if (conf->requestInfo & ScanTabConf::EndOfData) {
|
||||
if(theScanningOp->m_ordered)
|
||||
theScanningOp->m_api_receivers_count = 0;
|
||||
if(theScanningOp->m_api_receivers_count +
|
||||
theScanningOp->m_conf_receivers_count +
|
||||
theScanningOp->m_sent_receivers_count){
|
||||
abort();
|
||||
}
|
||||
}
|
||||
return 0;
|
||||
} else {
|
||||
#ifdef NDB_NO_DROPPED_SIGNAL
|
||||
abort();
|
||||
#endif
|
||||
}
|
||||
|
||||
|
||||
return -1;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -520,20 +520,9 @@ int NdbScanOperation::nextResult(bool fetchAllowed)
|
|||
/**
|
||||
* No completed & no sent -> EndOfData
|
||||
*/
|
||||
if(send_next_scan(0, true) == 0){ // Close scan
|
||||
theNdb->theWaiter.m_node = nodeId;
|
||||
theNdb->theWaiter.m_state = WAIT_SCAN;
|
||||
int return_code = theNdb->receiveResponse(WAITFOR_SCAN_TIMEOUT);
|
||||
if (return_code == 0 && seq == tp->getNodeSequence(nodeId)) {
|
||||
theError.code = -1; // make sure user gets error if he tries again
|
||||
if(DEBUG_NEXT_RESULT) ndbout_c("return 1");
|
||||
return 1;
|
||||
}
|
||||
retVal = -1; //return_code;
|
||||
} else {
|
||||
retVal = -3;
|
||||
}
|
||||
idx = last;
|
||||
theError.code = -1; // make sure user gets error if he tries again
|
||||
if(DEBUG_NEXT_RESULT) ndbout_c("return 1");
|
||||
return 1;
|
||||
}
|
||||
|
||||
if(retVal == 0)
|
||||
|
|
@ -685,6 +674,7 @@ void NdbScanOperation::closeScan()
|
|||
|
||||
if(m_api_receivers_count+m_conf_receivers_count){
|
||||
// Send close scan
|
||||
ndbout_c("sending close %d %d", m_api_receivers_count, m_conf_receivers_count);
|
||||
send_next_scan(0, true); // Close scan
|
||||
}
|
||||
|
||||
|
|
@ -1344,19 +1334,9 @@ NdbIndexScanOperation::next_result_ordered(bool fetchAllowed){
|
|||
return 0;
|
||||
}
|
||||
|
||||
TransporterFacade* tp = TransporterFacade::instance();
|
||||
Guard guard(tp->theMutexPtr);
|
||||
Uint32 seq = theNdbCon->theNodeSequence;
|
||||
Uint32 nodeId = theNdbCon->theDBnode;
|
||||
if(seq == tp->getNodeSequence(nodeId) &&
|
||||
send_next_scan(0, true) == 0 &&
|
||||
theError.code == 0){
|
||||
if(DEBUG_NEXT_RESULT) ndbout_c("return 1");
|
||||
return 1;
|
||||
}
|
||||
setErrorCode(theError.code);
|
||||
if(DEBUG_NEXT_RESULT) ndbout_c("return -1");
|
||||
return -1;
|
||||
theError.code = -1;
|
||||
if(DEBUG_NEXT_RESULT) ndbout_c("return 1");
|
||||
return 1;
|
||||
}
|
||||
|
||||
int
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue