mirror of
https://github.com/MariaDB/server.git
synced 2025-01-23 23:34:34 +01:00
Fix scan error bug
(last known :-)) ndb/src/kernel/blocks/dbtc/DbtcMain.cpp: Removed printouts ndb/src/ndbapi/NdbConnectionScan.cpp: Fix scan error bug ndb/src/ndbapi/NdbScanOperation.cpp: Fix scan error bug Remove printout
This commit is contained in:
parent
cf6573ae37
commit
f81c74c8fe
3 changed files with 36 additions and 124 deletions
|
@ -6433,7 +6433,6 @@ void Dbtc::execSCAN_HBREP(Signal* signal)
|
|||
c_scan_frag_pool.getPtr(scanFragptr);
|
||||
switch (scanFragptr.p->scanFragState){
|
||||
case ScanFragRec::LQH_ACTIVE:
|
||||
//case ScanFragRec::LQH_ACTIVE_CLOSE:
|
||||
break;
|
||||
default:
|
||||
DEBUG("execSCAN_HBREP: scanFragState="<<scanFragptr.p->scanFragState);
|
||||
|
@ -6514,7 +6513,6 @@ void Dbtc::timeOutFoundFragLab(Signal* signal, UintR TscanConPtr)
|
|||
/**
|
||||
* The node has died
|
||||
*/
|
||||
ndbout_c("Node %d has died", nodeId);
|
||||
ptr.p->scanFragState = ScanFragRec::COMPLETED;
|
||||
ScanFragList run(c_scan_frag_pool, scanptr.p->m_running_scan_frags);
|
||||
ScanFragList comp(c_scan_frag_pool, scanptr.p->m_completed_scan_frags);
|
||||
|
@ -8358,10 +8356,6 @@ void Dbtc::execSCAN_TABREQ(Signal* signal)
|
|||
scanParallel = (scanConcurrency + 15) / 16;
|
||||
noOprecPerFrag = (scanConcurrency >= 16 ? 16 : scanConcurrency & 15);
|
||||
}
|
||||
#ifdef VM_TRACE
|
||||
ndbout_c("noOprecPerFrag=%d", noOprecPerFrag);
|
||||
ndbout_c("scanParallel=%d", scanParallel);
|
||||
#endif
|
||||
|
||||
jamEntry();
|
||||
apiConnectptr.i = scanTabReq->apiConnectPtr;
|
||||
|
@ -8708,11 +8702,6 @@ void Dbtc::execDI_FCOUNTCONF(Signal* signal)
|
|||
for (list.first(ptr); !ptr.isNull(); list.next(ptr)){
|
||||
jam();
|
||||
|
||||
#ifdef VM_TRACE
|
||||
ndbout_c("DIGETPRIMREQ(%d, %d)",
|
||||
scanptr.p->scanTableref, scanptr.p->scanNextFragId);
|
||||
#endif
|
||||
|
||||
ptr.p->lqhBlockref = 0;
|
||||
ptr.p->startFragTimer(ctcTimer);
|
||||
ptr.p->scanFragId = scanptr.p->scanNextFragId++;
|
||||
|
@ -8759,9 +8748,6 @@ void Dbtc::abortScanLab(Signal* signal, ScanRecordPtr scanptr, Uint32 errCode)
|
|||
|
||||
void Dbtc::releaseScanResources(ScanRecordPtr scanPtr)
|
||||
{
|
||||
#ifdef VM_TRACE
|
||||
ndbout_c("releaseScanResources: %d", scanPtr.i);
|
||||
#endif
|
||||
if (apiConnectptr.p->cachePtr != RNIL) {
|
||||
cachePtr.i = apiConnectptr.p->cachePtr;
|
||||
ptrCheckGuard(cachePtr, ccacheFilesize, cacheRecord);
|
||||
|
@ -9041,26 +9027,15 @@ void Dbtc::execSCAN_FRAGCONF(Signal* signal)
|
|||
|
||||
const Uint32 status = conf->fragmentCompleted;
|
||||
|
||||
DEBUG(apiConnectptr.i << " " << scanFragptr.i <<
|
||||
" execSCAN_FRAGCONF() status: " << status
|
||||
<< " ops: " << noCompletedOps << " from: " << refToNode(signal->getSendersBlockRef()));
|
||||
|
||||
if(scanptr.p->scanState == ScanRecord::CLOSING_SCAN){
|
||||
jam();
|
||||
if(status == ZFALSE){
|
||||
/**
|
||||
* We have started closing = we sent a close -> ignore this
|
||||
*/
|
||||
DEBUG(apiConnectptr.i << " " << scanFragptr.i <<
|
||||
" Received SCANFRAG_CONF wo/ close when in "
|
||||
" CLOSING_SCAN:" << status << " " << noCompletedOps);
|
||||
return;
|
||||
} else {
|
||||
jam();
|
||||
DEBUG(apiConnectptr.i << " " << scanFragptr.i
|
||||
<< " Received SCANFRAG_CONF w/ close when in "
|
||||
" CLOSING_SCAN:" << status << " " << noCompletedOps);
|
||||
|
||||
ScanFragList run(c_scan_frag_pool, scanptr.p->m_running_scan_frags);
|
||||
ScanFragList comp(c_scan_frag_pool, scanptr.p->m_completed_scan_frags);
|
||||
|
||||
|
@ -9249,9 +9224,6 @@ void Dbtc::execSCAN_NEXTREQ(Signal* signal)
|
|||
|
||||
void
|
||||
Dbtc::close_scan_req(Signal* signal, ScanRecordPtr scanPtr, bool req_received){
|
||||
#ifdef VM_TRACE
|
||||
ndbout_c("%d close_scan_req", apiConnectptr.i);
|
||||
#endif
|
||||
ScanRecord* scanP = scanPtr.p;
|
||||
scanPtr.p->scanState = ScanRecord::CLOSING_SCAN;
|
||||
scanPtr.p->m_close_scan_req = req_received;
|
||||
|
@ -9295,7 +9267,6 @@ Dbtc::close_scan_req(Signal* signal, ScanRecordPtr scanPtr, bool req_received){
|
|||
nextReq->senderData = curr.i;
|
||||
sendSignal(curr.p->lqhBlockref, GSN_SCAN_NEXTREQ, signal,
|
||||
ScanFragNextReq::SignalLength, JBB);
|
||||
ndbout_c("%d running -> closing", curr.i);
|
||||
}
|
||||
|
||||
// Close delivered
|
||||
|
@ -9316,13 +9287,11 @@ Dbtc::close_scan_req(Signal* signal, ScanRecordPtr scanPtr, bool req_received){
|
|||
sendSignal(curr.p->lqhBlockref, GSN_SCAN_NEXTREQ, signal,
|
||||
ScanFragNextReq::SignalLength, JBB);
|
||||
|
||||
ndbout_c("%d delivered -> closing (%d)", curr.i, curr.p->m_ops);
|
||||
} else {
|
||||
jam();
|
||||
completed.add(curr);
|
||||
curr.p->scanFragState = ScanFragRec::COMPLETED;
|
||||
curr.p->stopFragTimer();
|
||||
ndbout_c("%d delivered -> completed", curr.i);
|
||||
}
|
||||
}//for
|
||||
|
||||
|
@ -9346,14 +9315,11 @@ Dbtc::close_scan_req(Signal* signal, ScanRecordPtr scanPtr, bool req_received){
|
|||
nextReq->senderData = curr.i;
|
||||
sendSignal(curr.p->lqhBlockref, GSN_SCAN_NEXTREQ, signal,
|
||||
ScanFragNextReq::SignalLength, JBB);
|
||||
|
||||
ndbout_c("%d queued -> closing", curr.i);
|
||||
} else {
|
||||
jam();
|
||||
completed.add(curr);
|
||||
curr.p->scanFragState = ScanFragRec::COMPLETED;
|
||||
curr.p->stopFragTimer();
|
||||
ndbout_c("%d queued -> completed", curr.i);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -9369,7 +9335,7 @@ Dbtc::close_scan_req_send_conf(Signal* signal, ScanRecordPtr scanPtr){
|
|||
ndbrequire(scanPtr.p->m_delivered_scan_frags.isEmpty());
|
||||
//ndbrequire(scanPtr.p->m_running_scan_frags.isEmpty());
|
||||
|
||||
#if 1
|
||||
#if 0
|
||||
{
|
||||
ScanFragList comp(c_scan_frag_pool, scanPtr.p->m_completed_scan_frags);
|
||||
ScanFragRecPtr ptr;
|
||||
|
@ -9382,8 +9348,6 @@ Dbtc::close_scan_req_send_conf(Signal* signal, ScanRecordPtr scanPtr){
|
|||
|
||||
if(!scanPtr.p->m_running_scan_frags.isEmpty()){
|
||||
jam();
|
||||
|
||||
ndbout_c("%d close_scan_req_send_conf: not ready", apiConnectptr.i);
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -9394,12 +9358,9 @@ Dbtc::close_scan_req_send_conf(Signal* signal, ScanRecordPtr scanPtr){
|
|||
/**
|
||||
* The API hasn't order closing yet
|
||||
*/
|
||||
ndbout_c("%d close_scan_req_send_conf: api not ready", apiConnectptr.i);
|
||||
return;
|
||||
}
|
||||
|
||||
ndbout_c("%d close_scan_req_send_conf: ready", apiConnectptr.i);
|
||||
|
||||
if(!apiFail){
|
||||
jam();
|
||||
Uint32 ref = apiConnectptr.p->ndbapiBlockref;
|
||||
|
|
|
@ -67,7 +67,6 @@ NdbConnection::receiveSCAN_TABREF(NdbApiSignal* aSignal){
|
|||
return 0;
|
||||
}
|
||||
assert(theScanningOp->m_sent_receivers_count);
|
||||
theScanningOp->m_sent_receivers_count--;
|
||||
theScanningOp->m_conf_receivers_count++;
|
||||
return 0;
|
||||
}
|
||||
|
|
|
@ -135,8 +135,6 @@ NdbResultSet* NdbScanOperation::readTuples(NdbScanOperation::LockMode lm,
|
|||
m_ordered = 0;
|
||||
|
||||
Uint32 fragCount = m_currentTable->m_fragmentCount;
|
||||
ndbout_c("batch: %d parallell: %d fragCount: %d",
|
||||
batch, parallell, fragCount);
|
||||
|
||||
if(batch + parallell == 0){ // Max speed
|
||||
batch = 16;
|
||||
|
@ -155,9 +153,6 @@ NdbResultSet* NdbScanOperation::readTuples(NdbScanOperation::LockMode lm,
|
|||
parallell = fragCount;
|
||||
else if(parallell == 0)
|
||||
parallell = fragCount;
|
||||
|
||||
ndbout_c("batch: %d parallell: %d fragCount: %d",
|
||||
batch, parallell, fragCount);
|
||||
|
||||
assert(parallell > 0);
|
||||
|
||||
|
@ -300,20 +295,22 @@ NdbScanOperation::fix_receivers(Uint32 parallell, bool keyInfo){
|
|||
*/
|
||||
void
|
||||
NdbScanOperation::receiver_delivered(NdbReceiver* tRec){
|
||||
Uint32 idx = tRec->m_list_index;
|
||||
Uint32 last = m_sent_receivers_count - 1;
|
||||
if(idx != last){
|
||||
NdbReceiver * move = m_sent_receivers[last];
|
||||
m_sent_receivers[idx] = move;
|
||||
move->m_list_index = idx;
|
||||
if(theError.code == 0){
|
||||
Uint32 idx = tRec->m_list_index;
|
||||
Uint32 last = m_sent_receivers_count - 1;
|
||||
if(idx != last){
|
||||
NdbReceiver * move = m_sent_receivers[last];
|
||||
m_sent_receivers[idx] = move;
|
||||
move->m_list_index = idx;
|
||||
}
|
||||
m_sent_receivers_count = last;
|
||||
|
||||
last = m_conf_receivers_count;
|
||||
m_conf_receivers[last] = tRec;
|
||||
m_conf_receivers_count = last + 1;
|
||||
tRec->m_list_index = last;
|
||||
tRec->m_current_row = 0;
|
||||
}
|
||||
m_sent_receivers_count = last;
|
||||
|
||||
last = m_conf_receivers_count;
|
||||
m_conf_receivers[last] = tRec;
|
||||
m_conf_receivers_count = last + 1;
|
||||
tRec->m_list_index = last;
|
||||
tRec->m_current_row = 0;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -321,14 +318,16 @@ NdbScanOperation::receiver_delivered(NdbReceiver* tRec){
|
|||
*/
|
||||
void
|
||||
NdbScanOperation::receiver_completed(NdbReceiver* tRec){
|
||||
Uint32 idx = tRec->m_list_index;
|
||||
Uint32 last = m_sent_receivers_count - 1;
|
||||
if(idx != last){
|
||||
NdbReceiver * move = m_sent_receivers[last];
|
||||
m_sent_receivers[idx] = move;
|
||||
move->m_list_index = idx;
|
||||
if(theError.code == 0){
|
||||
Uint32 idx = tRec->m_list_index;
|
||||
Uint32 last = m_sent_receivers_count - 1;
|
||||
if(idx != last){
|
||||
NdbReceiver * move = m_sent_receivers[last];
|
||||
m_sent_receivers[idx] = move;
|
||||
move->m_list_index = idx;
|
||||
}
|
||||
m_sent_receivers_count = last;
|
||||
}
|
||||
m_sent_receivers_count = last;
|
||||
}
|
||||
|
||||
/*****************************************************************************
|
||||
|
@ -512,11 +511,6 @@ int NdbScanOperation::nextResult(bool fetchAllowed)
|
|||
*/
|
||||
theNdb->theWaiter.m_node = nodeId;
|
||||
theNdb->theWaiter.m_state = WAIT_SCAN;
|
||||
ndbout_c("%d : api: %d conf: %d sent: %d",
|
||||
__LINE__,
|
||||
m_api_receivers_count,
|
||||
m_conf_receivers_count,
|
||||
m_sent_receivers_count);
|
||||
int return_code = theNdb->receiveResponse(WAITFOR_SCAN_TIMEOUT);
|
||||
if (return_code == 0 && seq == tp->getNodeSequence(nodeId)) {
|
||||
continue;
|
||||
|
@ -531,11 +525,6 @@ int NdbScanOperation::nextResult(bool fetchAllowed)
|
|||
if(send_next_scan(0, true) == 0){ // Close scan
|
||||
theNdb->theWaiter.m_node = nodeId;
|
||||
theNdb->theWaiter.m_state = WAIT_SCAN;
|
||||
ndbout_c("%d : api: %d conf: %d sent: %d",
|
||||
__LINE__,
|
||||
m_api_receivers_count,
|
||||
m_conf_receivers_count,
|
||||
m_sent_receivers_count);
|
||||
int return_code = theNdb->receiveResponse(WAITFOR_SCAN_TIMEOUT);
|
||||
if (return_code == 0 && seq == tp->getNodeSequence(nodeId)) {
|
||||
return 1;
|
||||
|
@ -653,12 +642,8 @@ NdbScanOperation::doSend(int ProcessorId)
|
|||
|
||||
void NdbScanOperation::closeScan()
|
||||
{
|
||||
ndbout_c("closeScan %d : api: %d conf: %d sent: %d",
|
||||
__LINE__,
|
||||
m_api_receivers_count,
|
||||
m_conf_receivers_count,
|
||||
m_sent_receivers_count);
|
||||
|
||||
int self = pthread_self() ;
|
||||
|
||||
do {
|
||||
TransporterFacade* tp = TransporterFacade::instance();
|
||||
Guard guard(tp->theMutexPtr);
|
||||
|
@ -671,17 +656,17 @@ void NdbScanOperation::closeScan()
|
|||
break;
|
||||
}
|
||||
|
||||
if(m_api_receivers_count+m_conf_receivers_count){
|
||||
// Send close scan
|
||||
send_next_scan(0, true); // Close scan
|
||||
}
|
||||
|
||||
/**
|
||||
* Wait for all running scans...
|
||||
* wait for close scan conf
|
||||
*/
|
||||
while(m_sent_receivers_count){
|
||||
while(m_sent_receivers_count+m_api_receivers_count+m_conf_receivers_count){
|
||||
theNdb->theWaiter.m_node = nodeId;
|
||||
theNdb->theWaiter.m_state = WAIT_SCAN;
|
||||
ndbout_c("%d : api: %d conf: %d sent: %d",
|
||||
__LINE__,
|
||||
m_api_receivers_count,
|
||||
m_conf_receivers_count,
|
||||
m_sent_receivers_count);
|
||||
int return_code = theNdb->receiveResponse(WAITFOR_SCAN_TIMEOUT);
|
||||
switch(return_code){
|
||||
case 0:
|
||||
|
@ -689,46 +674,13 @@ void NdbScanOperation::closeScan()
|
|||
case -1:
|
||||
setErrorCode(4008);
|
||||
case -2:
|
||||
m_sent_receivers_count = 0;
|
||||
m_api_receivers_count = 0;
|
||||
m_conf_receivers_count = 0;
|
||||
m_sent_receivers_count = 0;
|
||||
}
|
||||
}
|
||||
|
||||
if(seq != tp->getNodeSequence(nodeId)){
|
||||
theNdbCon->theReleaseOnClose = true;
|
||||
break;
|
||||
}
|
||||
|
||||
if(m_api_receivers_count+m_conf_receivers_count){
|
||||
// Send close scan
|
||||
send_next_scan(0, true); // Close scan
|
||||
|
||||
/**
|
||||
* wait for close scan conf
|
||||
*/
|
||||
do {
|
||||
theNdb->theWaiter.m_node = nodeId;
|
||||
theNdb->theWaiter.m_state = WAIT_SCAN;
|
||||
ndbout_c("%d : api: %d conf: %d sent: %d",
|
||||
__LINE__,
|
||||
m_api_receivers_count,
|
||||
m_conf_receivers_count,
|
||||
m_sent_receivers_count);
|
||||
int return_code = theNdb->receiveResponse(WAITFOR_SCAN_TIMEOUT);
|
||||
switch(return_code){
|
||||
case 0:
|
||||
break;
|
||||
case -1:
|
||||
setErrorCode(4008);
|
||||
case -2:
|
||||
m_api_receivers_count = 0;
|
||||
m_conf_receivers_count = 0;
|
||||
}
|
||||
} while(m_api_receivers_count+m_conf_receivers_count);
|
||||
}
|
||||
} while(0);
|
||||
|
||||
|
||||
theNdbCon->theScanningOp = 0;
|
||||
theNdb->closeTransaction(theNdbCon);
|
||||
|
||||
|
|
Loading…
Add table
Reference in a new issue