mirror of
https://github.com/MariaDB/server.git
synced 2025-01-19 13:32:33 +01:00
wl2025 - fix suma & backup
pluse remove "first_batch_size" and instead put batch_size on next_scan_req
This commit is contained in:
parent
1965bb01e5
commit
fe118d628f
7 changed files with 119 additions and 102 deletions
|
@ -33,7 +33,7 @@ class ScanFragReq {
|
|||
*/
|
||||
friend class Dblqh;
|
||||
public:
|
||||
STATIC_CONST( SignalLength = 13 );
|
||||
STATIC_CONST( SignalLength = 12 );
|
||||
|
||||
public:
|
||||
Uint32 senderData;
|
||||
|
@ -46,10 +46,9 @@ public:
|
|||
Uint32 transId1;
|
||||
Uint32 transId2;
|
||||
Uint32 clientOpPtr;
|
||||
Uint32 concurrency;
|
||||
Uint32 batch_byte_size;
|
||||
Uint32 first_batch_size;
|
||||
|
||||
Uint32 batch_size_rows;
|
||||
Uint32 batch_size_bytes;
|
||||
|
||||
static Uint32 getLockMode(const Uint32 & requestInfo);
|
||||
static Uint32 getHoldLockFlag(const Uint32 & requestInfo);
|
||||
static Uint32 getKeyinfoFlag(const Uint32 & requestInfo);
|
||||
|
@ -176,13 +175,15 @@ class ScanFragNextReq {
|
|||
friend bool printSCANFRAGNEXTREQ(FILE * output, const Uint32 * theData,
|
||||
Uint32 len, Uint16 receiverBlockNo);
|
||||
public:
|
||||
STATIC_CONST( SignalLength = 4 );
|
||||
STATIC_CONST( SignalLength = 6 );
|
||||
|
||||
public:
|
||||
Uint32 senderData;
|
||||
Uint32 closeFlag;
|
||||
Uint32 transId1;
|
||||
Uint32 transId2;
|
||||
Uint32 batch_size_rows;
|
||||
Uint32 batch_size_bytes;
|
||||
};
|
||||
|
||||
/**
|
||||
|
|
|
@ -3324,7 +3324,6 @@ Backup::execBACKUP_FRAGMENT_REQ(Signal* signal)
|
|||
req->requestInfo = 0;
|
||||
req->savePointId = 0;
|
||||
req->tableId = table.tableId;
|
||||
//ScanFragReq::setConcurrency(req->requestInfo, parallelism);
|
||||
ScanFragReq::setLockMode(req->requestInfo, 0);
|
||||
ScanFragReq::setHoldLockFlag(req->requestInfo, 0);
|
||||
ScanFragReq::setKeyinfoFlag(req->requestInfo, 1);
|
||||
|
@ -3332,6 +3331,8 @@ Backup::execBACKUP_FRAGMENT_REQ(Signal* signal)
|
|||
req->transId1 = 0;
|
||||
req->transId2 = (BACKUP << 20) + (getOwnNodeId() << 8);
|
||||
req->clientOpPtr= filePtr.i;
|
||||
req->batch_size_rows= 16;
|
||||
req->batch_size_bytes= 0;
|
||||
sendSignal(DBLQH_REF, GSN_SCAN_FRAGREQ, signal,
|
||||
ScanFragReq::SignalLength, JBB);
|
||||
|
||||
|
@ -3549,8 +3550,7 @@ Backup::OperationRecord::newFragment(Uint32 tableId, Uint32 fragNo)
|
|||
head->FragmentNo = htonl(fragNo);
|
||||
head->ChecksumType = htonl(0);
|
||||
|
||||
opNoDone = opNoConf = 0;
|
||||
memset(attrLen, 0, sizeof(attrLen));
|
||||
opNoDone = opNoConf = opLen = 0;
|
||||
newRecord(tmp + headSz);
|
||||
scanStart = tmp;
|
||||
scanStop = (tmp + headSz);
|
||||
|
@ -3593,8 +3593,7 @@ Backup::OperationRecord::newScan()
|
|||
ndbrequire(16 * maxRecordSize < dataBuffer.getMaxWrite());
|
||||
if(dataBuffer.getWritePtr(&tmp, 16 * maxRecordSize)) {
|
||||
jam();
|
||||
opNoDone = opNoConf = 0;
|
||||
memset(attrLen, 0, sizeof(attrLen));
|
||||
opNoDone = opNoConf = opLen = 0;
|
||||
newRecord(tmp);
|
||||
scanStart = tmp;
|
||||
scanStop = tmp;
|
||||
|
@ -3604,14 +3603,14 @@ Backup::OperationRecord::newScan()
|
|||
}
|
||||
|
||||
bool
|
||||
Backup::OperationRecord::scanConf(Uint32 noOfOps, Uint32 opLen[])
|
||||
Backup::OperationRecord::scanConf(Uint32 noOfOps, Uint32 total_len)
|
||||
{
|
||||
const Uint32 done = opNoDone-opNoConf;
|
||||
|
||||
ndbrequire(noOfOps == done);
|
||||
ndbrequire(memcmp(&attrLen[opNoConf], opLen, done << 2) == 0);
|
||||
ndbrequire(opLen == total_len);
|
||||
opNoConf = opNoDone;
|
||||
|
||||
|
||||
const Uint32 len = (scanStop - scanStart);
|
||||
ndbrequire(len < dataBuffer.getMaxWrite());
|
||||
dataBuffer.updateWritePtr(len);
|
||||
|
@ -3652,8 +3651,8 @@ Backup::execSCAN_FRAGCONF(Signal* signal)
|
|||
c_backupFilePool.getPtr(filePtr, filePtrI);
|
||||
|
||||
OperationRecord & op = filePtr.p->operation;
|
||||
//op.scanConf(conf->completedOps, conf->opReturnDataLen);
|
||||
|
||||
|
||||
op.scanConf(conf->completedOps, conf->total_len);
|
||||
const Uint32 completed = conf->fragmentCompleted;
|
||||
if(completed != 2) {
|
||||
jam();
|
||||
|
@ -3722,6 +3721,8 @@ Backup::checkScan(Signal* signal, BackupFilePtr filePtr)
|
|||
req->closeFlag = 0;
|
||||
req->transId1 = 0;
|
||||
req->transId2 = (BACKUP << 20) + (getOwnNodeId() << 8);
|
||||
req->batch_size_rows= 16;
|
||||
req->batch_size_bytes= 0;
|
||||
sendSignal(DBLQH_REF, GSN_SCAN_NEXTREQ, signal,
|
||||
ScanFragNextReq::SignalLength, JBB);
|
||||
return;
|
||||
|
|
|
@ -236,7 +236,7 @@ public:
|
|||
* Once per scan frag (next) req/conf
|
||||
*/
|
||||
bool newScan();
|
||||
bool scanConf(Uint32 noOfOps, Uint32 opLen[]);
|
||||
bool scanConf(Uint32 noOfOps, Uint32 opLen);
|
||||
|
||||
/**
|
||||
* Per record
|
||||
|
@ -268,7 +268,7 @@ public:
|
|||
|
||||
Uint32 opNoDone;
|
||||
Uint32 opNoConf;
|
||||
Uint32 attrLen[16];
|
||||
Uint32 opLen;
|
||||
|
||||
public:
|
||||
Uint32* dst;
|
||||
|
@ -713,7 +713,7 @@ Backup::OperationRecord::finished(){
|
|||
return false;
|
||||
}
|
||||
|
||||
attrLen[opNoDone] = attrSzTotal + sz_FixedKeys;
|
||||
opLen += attrSzTotal + sz_FixedKeys;
|
||||
opNoDone++;
|
||||
|
||||
scanStop = dst = (Uint32 *)dst_VariableData;
|
||||
|
|
|
@ -532,16 +532,21 @@ public:
|
|||
SCAN = 1,
|
||||
COPY = 2
|
||||
};
|
||||
|
||||
UintR scan_acc_op_ptr[32];
|
||||
Uint32 scan_acc_index;
|
||||
Uint32 scan_acc_attr_recs;
|
||||
UintR scanApiOpPtr;
|
||||
UintR scanLocalref[2];
|
||||
Uint32 scan_batch_len;
|
||||
Uint32 batch_size;
|
||||
Uint32 first_batch_size;
|
||||
Uint32 batch_byte_size;
|
||||
|
||||
Uint32 m_max_batch_size_rows;
|
||||
Uint32 m_max_batch_size_bytes;
|
||||
|
||||
Uint32 m_curr_batch_size_rows;
|
||||
Uint32 m_curr_batch_size_bytes;
|
||||
|
||||
bool check_scan_batch_completed() const;
|
||||
|
||||
UintR copyPtr;
|
||||
union {
|
||||
Uint32 nextPool;
|
||||
|
@ -559,8 +564,6 @@ public:
|
|||
|
||||
UintR scanAccPtr;
|
||||
UintR scanAiLength;
|
||||
UintR scanCompletedOperations;
|
||||
UintR scanConcurrentOperations;
|
||||
UintR scanErrorCounter;
|
||||
UintR scanLocalFragid;
|
||||
UintR scanSchemaVersion;
|
||||
|
@ -2231,7 +2234,6 @@ private:
|
|||
Uint32 get_acc_ptr_from_scan_record(ScanRecord*, Uint32, bool);
|
||||
void set_acc_ptr_in_scan_record(ScanRecord*, Uint32, Uint32);
|
||||
void i_get_acc_ptr(ScanRecord*, Uint32*&, Uint32);
|
||||
bool check_scan_batch_completed(ScanRecord*);
|
||||
|
||||
void removeTable(Uint32 tableId);
|
||||
void sendLCP_COMPLETE_REP(Signal* signal, Uint32 lcpId);
|
||||
|
@ -2933,9 +2935,13 @@ public:
|
|||
|
||||
inline
|
||||
bool
|
||||
Dblqh::check_scan_batch_completed(ScanRecord* scanP)
|
||||
Dblqh::ScanRecord::check_scan_batch_completed() const
|
||||
{
|
||||
return (scanP->scanCompletedOperations == scanP->scanConcurrentOperations) ||
|
||||
(scanP->scan_batch_len >= scanP->batch_byte_size);
|
||||
Uint32 max_rows = m_max_batch_size_rows;
|
||||
Uint32 max_bytes = m_max_batch_size_bytes;
|
||||
|
||||
return (max_rows > 0 && (m_curr_batch_size_rows >= max_rows)) ||
|
||||
(max_bytes > 0 && (m_curr_batch_size_bytes >= max_bytes));
|
||||
}
|
||||
|
||||
#endif
|
||||
|
|
|
@ -2105,10 +2105,10 @@ void Dblqh::execTIME_SIGNAL(Signal* signal)
|
|||
<< " scanAccPtr="<<TscanPtr.p->scanAccPtr
|
||||
<< " scanAiLength="<<TscanPtr.p->scanAiLength
|
||||
<< endl;
|
||||
ndbout << " scanCompletedOperations="<<
|
||||
TscanPtr.p->scanCompletedOperations
|
||||
<< " scanConcurrentOperations="<<
|
||||
TscanPtr.p->scanConcurrentOperations
|
||||
ndbout << " m_curr_batch_size_rows="<<
|
||||
TscanPtr.p->m_curr_batch_size_rows
|
||||
<< " m_max_batch_size_rows="<<
|
||||
TscanPtr.p->m_max_batch_size_rows
|
||||
<< " scanErrorCounter="<<TscanPtr.p->scanErrorCounter
|
||||
<< " scanLocalFragid="<<TscanPtr.p->scanLocalFragid
|
||||
<< endl;
|
||||
|
@ -6975,6 +6975,15 @@ void Dblqh::execSCAN_NEXTREQ(Signal* signal)
|
|||
fragptr.i = tcConnectptr.p->fragmentptr;
|
||||
ptrCheckGuard(fragptr, cfragrecFileSize, fragrecord);
|
||||
|
||||
/**
|
||||
* Change parameters while running
|
||||
* (is currently not supported)
|
||||
*/
|
||||
const Uint32 max_rows = nextReq->batch_size_rows;
|
||||
const Uint32 max_bytes = nextReq->batch_size_bytes;
|
||||
ndbrequire(scanptr.p->m_max_batch_size_rows == max_rows);
|
||||
ndbrequire(scanptr.p->m_max_batch_size_bytes == max_bytes);
|
||||
|
||||
/* --------------------------------------------------------------------
|
||||
* If scanLockHold = TRUE we need to unlock previous round of
|
||||
* scanned records.
|
||||
|
@ -6984,7 +6993,7 @@ void Dblqh::execSCAN_NEXTREQ(Signal* signal)
|
|||
* acquiring new locks.
|
||||
* -------------------------------------------------------------------- */
|
||||
if ((scanptr.p->scanLockHold == ZTRUE) &&
|
||||
(scanptr.p->scanCompletedOperations > 0)) {
|
||||
(scanptr.p->m_curr_batch_size_rows > 0)) {
|
||||
jam();
|
||||
scanptr.p->scanReleaseCounter = 1;
|
||||
scanReleaseLocksLab(signal);
|
||||
|
@ -7011,8 +7020,8 @@ void Dblqh::continueScanNextReqLab(Signal* signal)
|
|||
tcConnectptr.p->tcTimer = cLqhTimeOutCount;
|
||||
|
||||
init_acc_ptr_list(scanptr.p);
|
||||
scanptr.p->scanCompletedOperations = 0;
|
||||
scanptr.p->scan_batch_len= 0;
|
||||
scanptr.p->m_curr_batch_size_rows = 0;
|
||||
scanptr.p->m_curr_batch_size_bytes= 0;
|
||||
scanptr.p->scanFlag = NextScanReq::ZSCAN_NEXT;
|
||||
scanNextLoopLab(signal);
|
||||
}//Dblqh::continueScanNextReqLab()
|
||||
|
@ -7136,7 +7145,7 @@ void Dblqh::closeScanRequestLab(Signal* signal)
|
|||
ptrCheckGuard(fragptr, cfragrecFileSize, fragrecord);
|
||||
|
||||
if (scanptr.p->scanLockHold == ZTRUE) {
|
||||
if (scanptr.p->scanCompletedOperations > 0) {
|
||||
if (scanptr.p->m_curr_batch_size_rows > 0) {
|
||||
jam();
|
||||
scanptr.p->scanReleaseCounter = 1;
|
||||
scanReleaseLocksLab(signal);
|
||||
|
@ -7167,8 +7176,8 @@ void Dblqh::closeScanRequestLab(Signal* signal)
|
|||
return;
|
||||
}//if
|
||||
tcConnectptr.p->abortState = TcConnectionrec::ABORT_ACTIVE;
|
||||
scanptr.p->scanCompletedOperations = 0;
|
||||
scanptr.p->scan_batch_len= 0;
|
||||
scanptr.p->m_curr_batch_size_rows = 0;
|
||||
scanptr.p->m_curr_batch_size_bytes= 0;
|
||||
sendScanFragConf(signal, ZTRUE);
|
||||
break;
|
||||
case TcConnectionrec::SCAN_TUPKEY:
|
||||
|
@ -7211,12 +7220,12 @@ void Dblqh::scanLockReleasedLab(Signal* signal)
|
|||
tcConnectptr.i = scanptr.p->scanTcrec;
|
||||
ptrCheckGuard(tcConnectptr, ctcConnectrecFileSize, tcConnectionrec);
|
||||
releaseActiveFrag(signal);
|
||||
if (scanptr.p->scanReleaseCounter == scanptr.p->scanCompletedOperations) {
|
||||
if (scanptr.p->scanReleaseCounter == scanptr.p->m_curr_batch_size_rows) {
|
||||
if ((scanptr.p->scanErrorCounter > 0) ||
|
||||
(scanptr.p->scanCompletedStatus == ZTRUE)) {
|
||||
jam();
|
||||
closeScanLab(signal);
|
||||
} else if (check_scan_batch_completed(scanptr.p) &&
|
||||
} else if (scanptr.p->check_scan_batch_completed() &&
|
||||
scanptr.p->scanLockHold != ZTRUE) {
|
||||
jam();
|
||||
scanptr.p->scanState = ScanRecord::WAIT_SCAN_NEXTREQ;
|
||||
|
@ -7229,7 +7238,7 @@ void Dblqh::scanLockReleasedLab(Signal* signal)
|
|||
*/
|
||||
continueScanNextReqLab(signal);
|
||||
}//if
|
||||
} else if (scanptr.p->scanReleaseCounter < scanptr.p->scanCompletedOperations) {
|
||||
} else if (scanptr.p->scanReleaseCounter < scanptr.p->m_curr_batch_size_rows) {
|
||||
jam();
|
||||
scanptr.p->scanReleaseCounter++;
|
||||
scanReleaseLocksLab(signal);
|
||||
|
@ -7237,7 +7246,7 @@ void Dblqh::scanLockReleasedLab(Signal* signal)
|
|||
jam();
|
||||
/*
|
||||
We come here when we have been scanning for a long time and not been able
|
||||
to find scanConcurrentOperations records to return. We needed to release
|
||||
to find m_max_batch_size_rows records to return. We needed to release
|
||||
the record we didn't want, but now we are returning all found records to
|
||||
the API.
|
||||
*/
|
||||
|
@ -7249,9 +7258,9 @@ void Dblqh::scanLockReleasedLab(Signal* signal)
|
|||
bool
|
||||
Dblqh::seize_acc_ptr_list(ScanRecord* scanP, Uint32 batch_size)
|
||||
{
|
||||
Uint32 i, attr_buf_recs;
|
||||
Uint32 i;
|
||||
Uint32 attr_buf_recs= (batch_size + 30) / 32;
|
||||
if (batch_size > 1) {
|
||||
attr_buf_recs= (batch_size + 30) / 32;
|
||||
if (c_no_attrinbuf_recs < attr_buf_recs) {
|
||||
jam();
|
||||
return false;
|
||||
|
@ -7376,7 +7385,7 @@ void Dblqh::execSCAN_FRAGREQ(Signal* signal)
|
|||
const Uint32 reqinfo = scanFragReq->requestInfo;
|
||||
const Uint32 fragId = scanFragReq->fragmentNo;
|
||||
tabptr.i = scanFragReq->tableId;
|
||||
const Uint32 scanConcurrentOperations = scanFragReq->concurrency;
|
||||
const Uint32 max_rows = scanFragReq->batch_size_rows;
|
||||
const Uint32 scanLockMode = ScanFragReq::getLockMode(reqinfo);
|
||||
const Uint8 keyinfo = ScanFragReq::getKeyinfoFlag(reqinfo);
|
||||
const Uint8 rangeScan = ScanFragReq::getRangeScanFlag(reqinfo);
|
||||
|
@ -7406,8 +7415,7 @@ void Dblqh::execSCAN_FRAGREQ(Signal* signal)
|
|||
*/
|
||||
ndbrequire(scanLockMode == 0 || keyinfo);
|
||||
|
||||
ndbrequire(scanConcurrentOperations <= MAX_PARALLEL_OP_PER_SCAN);
|
||||
ndbrequire(scanConcurrentOperations != 0);
|
||||
ndbrequire(max_rows > 0 && max_rows <= MAX_PARALLEL_OP_PER_SCAN);
|
||||
if (!getFragmentrec(signal, fragId)) {
|
||||
errorCode = __LINE__;
|
||||
goto error_handler;
|
||||
|
@ -7427,7 +7435,7 @@ void Dblqh::execSCAN_FRAGREQ(Signal* signal)
|
|||
}
|
||||
|
||||
// XXX adjust cmaxAccOps for range scans and remove this comment
|
||||
if ((cbookedAccOps + scanConcurrentOperations) > cmaxAccOps) {
|
||||
if ((cbookedAccOps + max_rows) > cmaxAccOps) {
|
||||
jam();
|
||||
errorCode = ScanFragRef::ZSCAN_BOOK_ACC_OP_ERROR;
|
||||
goto error_handler;
|
||||
|
@ -7445,7 +7453,7 @@ void Dblqh::execSCAN_FRAGREQ(Signal* signal)
|
|||
goto error_handler2;
|
||||
}//if
|
||||
cscanNoFreeRec--;
|
||||
cbookedAccOps += scanConcurrentOperations;
|
||||
cbookedAccOps += max_rows;
|
||||
|
||||
hashIndex = (tcConnectptr.p->transid[0] ^ tcConnectptr.p->tcOprec) & 1023;
|
||||
nextHashptr.i = ctransidHash[hashIndex];
|
||||
|
@ -7861,7 +7869,7 @@ void Dblqh::nextScanConfScanLab(Signal* signal)
|
|||
************************************************************ */
|
||||
if (scanptr.p->scanCompletedStatus == ZTRUE) {
|
||||
if ((scanptr.p->scanLockHold == ZTRUE) &&
|
||||
(scanptr.p->scanCompletedOperations > 0)) {
|
||||
(scanptr.p->m_curr_batch_size_rows > 0)) {
|
||||
jam();
|
||||
scanptr.p->scanReleaseCounter = 1;
|
||||
scanReleaseLocksLab(signal);
|
||||
|
@ -7872,7 +7880,7 @@ void Dblqh::nextScanConfScanLab(Signal* signal)
|
|||
return;
|
||||
}//if
|
||||
|
||||
if (scanptr.p->scanCompletedOperations > 0) {
|
||||
if (scanptr.p->m_curr_batch_size_rows > 0) {
|
||||
jam();
|
||||
scanptr.p->scanCompletedStatus = ZTRUE;
|
||||
scanptr.p->scanState = ScanRecord::WAIT_SCAN_NEXTREQ;
|
||||
|
@ -7892,7 +7900,7 @@ void Dblqh::nextScanConfScanLab(Signal* signal)
|
|||
if (scanptr.p->scanCompletedStatus == ZTRUE) {
|
||||
releaseActiveFrag(signal);
|
||||
if ((scanptr.p->scanLockHold == ZTRUE) &&
|
||||
(scanptr.p->scanCompletedOperations > 0)) {
|
||||
(scanptr.p->m_curr_batch_size_rows > 0)) {
|
||||
jam();
|
||||
scanptr.p->scanReleaseCounter = 1;
|
||||
scanReleaseLocksLab(signal);
|
||||
|
@ -7903,7 +7911,7 @@ void Dblqh::nextScanConfScanLab(Signal* signal)
|
|||
return;
|
||||
}//if
|
||||
|
||||
if (scanptr.p->scanCompletedOperations > 0) {
|
||||
if (scanptr.p->m_curr_batch_size_rows > 0) {
|
||||
jam();
|
||||
releaseActiveFrag(signal);
|
||||
scanptr.p->scanState = ScanRecord::WAIT_SCAN_NEXTREQ;
|
||||
|
@ -7923,7 +7931,7 @@ void Dblqh::nextScanConfScanLab(Signal* signal)
|
|||
}//if
|
||||
jam();
|
||||
set_acc_ptr_in_scan_record(scanptr.p,
|
||||
scanptr.p->scanCompletedOperations,
|
||||
scanptr.p->m_curr_batch_size_rows,
|
||||
nextScanConf->accOperationPtr);
|
||||
jam();
|
||||
scanptr.p->scanLocalref[0] = nextScanConf->localKey[0];
|
||||
|
@ -7958,7 +7966,7 @@ void Dblqh::nextScanConfLoopLab(Signal* signal)
|
|||
releaseActiveFrag(signal);
|
||||
releaseOprec(signal);
|
||||
if ((scanptr.p->scanLockHold == ZTRUE) &&
|
||||
(scanptr.p->scanCompletedOperations > 0)) {
|
||||
(scanptr.p->m_curr_batch_size_rows > 0)) {
|
||||
jam();
|
||||
scanptr.p->scanReleaseCounter = 1;
|
||||
scanReleaseLocksLab(signal);
|
||||
|
@ -8084,7 +8092,7 @@ void Dblqh::scanTupkeyConfLab(Signal* signal)
|
|||
* --------------------------------------------------------------------- */
|
||||
releaseOprec(signal);
|
||||
if ((scanptr.p->scanLockHold == ZTRUE) &&
|
||||
(scanptr.p->scanCompletedOperations > 0)) {
|
||||
(scanptr.p->m_curr_batch_size_rows > 0)) {
|
||||
jam();
|
||||
scanptr.p->scanReleaseCounter = 1;
|
||||
scanReleaseLocksLab(signal);
|
||||
|
@ -8101,10 +8109,10 @@ void Dblqh::scanTupkeyConfLab(Signal* signal)
|
|||
|
||||
tdata4 += tcConnectptr.p->primKeyLen;// Inform API about keyinfo len aswell
|
||||
}//if
|
||||
ndbrequire(scanptr.p->scanCompletedOperations < MAX_PARALLEL_OP_PER_SCAN);
|
||||
scanptr.p->scan_batch_len+= tdata4;
|
||||
scanptr.p->scanCompletedOperations++;
|
||||
if (check_scan_batch_completed(scanptr.p)) {
|
||||
ndbrequire(scanptr.p->m_curr_batch_size_rows < MAX_PARALLEL_OP_PER_SCAN);
|
||||
scanptr.p->m_curr_batch_size_bytes+= tdata4;
|
||||
scanptr.p->m_curr_batch_size_rows++;
|
||||
if (scanptr.p->check_scan_batch_completed()){
|
||||
if (scanptr.p->scanLockHold == ZTRUE) {
|
||||
jam();
|
||||
scanptr.p->scanState = ScanRecord::WAIT_SCAN_NEXTREQ;
|
||||
|
@ -8112,7 +8120,7 @@ void Dblqh::scanTupkeyConfLab(Signal* signal)
|
|||
return;
|
||||
} else {
|
||||
jam();
|
||||
scanptr.p->scanReleaseCounter = scanptr.p->scanCompletedOperations;
|
||||
scanptr.p->scanReleaseCounter = scanptr.p->m_curr_batch_size_rows;
|
||||
scanReleaseLocksLab(signal);
|
||||
return;
|
||||
}
|
||||
|
@ -8166,12 +8174,12 @@ void Dblqh::continueScanAfterBlockedLab(Signal* signal)
|
|||
jam();
|
||||
scanptr.p->scanFlag = NextScanReq::ZSCAN_NEXT_COMMIT;
|
||||
accOpPtr= get_acc_ptr_from_scan_record(scanptr.p,
|
||||
scanptr.p->scanCompletedOperations,
|
||||
scanptr.p->m_curr_batch_size_rows,
|
||||
false);
|
||||
} else if (scanptr.p->scanFlag == NextScanReq::ZSCAN_NEXT_COMMIT) {
|
||||
jam();
|
||||
accOpPtr= get_acc_ptr_from_scan_record(scanptr.p,
|
||||
scanptr.p->scanCompletedOperations,
|
||||
scanptr.p->m_curr_batch_size_rows - 1,
|
||||
false);
|
||||
} else {
|
||||
jam();
|
||||
|
@ -8206,7 +8214,7 @@ void Dblqh::scanTupkeyRefLab(Signal* signal)
|
|||
* STOP THE SCAN PROCESS IF THIS HAS BEEN REQUESTED.
|
||||
* --------------------------------------------------------------------- */
|
||||
if ((scanptr.p->scanLockHold == ZTRUE) &&
|
||||
(scanptr.p->scanCompletedOperations > 0)) {
|
||||
(scanptr.p->m_curr_batch_size_rows > 0)) {
|
||||
jam();
|
||||
scanptr.p->scanReleaseCounter = 1;
|
||||
scanReleaseLocksLab(signal);
|
||||
|
@ -8227,8 +8235,8 @@ void Dblqh::scanTupkeyRefLab(Signal* signal)
|
|||
scanptr.p->scanReleaseCounter = 1;
|
||||
} else {
|
||||
jam();
|
||||
scanptr.p->scanCompletedOperations++;
|
||||
scanptr.p->scanReleaseCounter = scanptr.p->scanCompletedOperations;
|
||||
scanptr.p->m_curr_batch_size_rows++;
|
||||
scanptr.p->scanReleaseCounter = scanptr.p->m_curr_batch_size_rows;
|
||||
}//if
|
||||
/* --------------------------------------------------------------------
|
||||
* WE NEED TO RELEASE ALL LOCKS CURRENTLY
|
||||
|
@ -8238,7 +8246,7 @@ void Dblqh::scanTupkeyRefLab(Signal* signal)
|
|||
return;
|
||||
}//if
|
||||
Uint32 time_passed= tcConnectptr.p->tcTimer - cLqhTimeOutCount;
|
||||
if (scanptr.p->scanCompletedOperations > 0) {
|
||||
if (scanptr.p->m_curr_batch_size_rows > 0) {
|
||||
if (time_passed > 1) {
|
||||
/* -----------------------------------------------------------------------
|
||||
* WE NEED TO ENSURE THAT WE DO NOT SEARCH FOR THE NEXT TUPLE FOR A
|
||||
|
@ -8246,7 +8254,7 @@ void Dblqh::scanTupkeyRefLab(Signal* signal)
|
|||
* THE FOUND TUPLE IF FOUND TUPLES ARE RARE. If more than 10 ms passed we
|
||||
* send the found tuples to the API.
|
||||
* ----------------------------------------------------------------------- */
|
||||
scanptr.p->scanReleaseCounter = scanptr.p->scanCompletedOperations + 1;
|
||||
scanptr.p->scanReleaseCounter = scanptr.p->m_curr_batch_size_rows + 1;
|
||||
scanReleaseLocksLab(signal);
|
||||
return;
|
||||
}
|
||||
|
@ -8360,8 +8368,8 @@ void Dblqh::tupScanCloseConfLab(Signal* signal)
|
|||
ScanFragRef::SignalLength, JBB);
|
||||
} else {
|
||||
jam();
|
||||
scanptr.p->scanCompletedOperations = 0;
|
||||
scanptr.p->scan_batch_len= 0;
|
||||
scanptr.p->m_curr_batch_size_rows = 0;
|
||||
scanptr.p->m_curr_batch_size_bytes= 0;
|
||||
sendScanFragConf(signal, ZSCAN_FRAG_CLOSED);
|
||||
}//if
|
||||
release_acc_ptr_list(scanptr.p);
|
||||
|
@ -8381,7 +8389,8 @@ void Dblqh::tupScanCloseConfLab(Signal* signal)
|
|||
Uint32 Dblqh::initScanrec(const ScanFragReq* scanFragReq)
|
||||
{
|
||||
const Uint32 reqinfo = scanFragReq->requestInfo;
|
||||
const Uint32 scanConcurrentOperations = scanFragReq->concurrency;
|
||||
const Uint32 max_rows = scanFragReq->batch_size_rows;
|
||||
const Uint32 max_bytes = scanFragReq->batch_size_bytes;
|
||||
const Uint32 scanLockMode = ScanFragReq::getLockMode(reqinfo);
|
||||
const Uint32 scanLockHold = ScanFragReq::getHoldLockFlag(reqinfo);
|
||||
const Uint32 keyinfo = ScanFragReq::getKeyinfoFlag(reqinfo);
|
||||
|
@ -8398,12 +8407,12 @@ Uint32 Dblqh::initScanrec(const ScanFragReq* scanFragReq)
|
|||
scanptr.p->scanAiLength = attrLen;
|
||||
scanptr.p->scanTcrec = tcConnectptr.i;
|
||||
scanptr.p->scanSchemaVersion = scanFragReq->schemaVersion;
|
||||
scanptr.p->scanCompletedOperations = 0;
|
||||
scanptr.p->scan_batch_len= 0;
|
||||
scanptr.p->scanConcurrentOperations = scanFragReq->first_batch_size;
|
||||
scanptr.p->batch_size= scanConcurrentOperations;
|
||||
scanptr.p->batch_byte_size= scanFragReq->batch_byte_size;
|
||||
scanptr.p->first_batch_size= scanFragReq->first_batch_size;
|
||||
|
||||
scanptr.p->m_curr_batch_size_rows = 0;
|
||||
scanptr.p->m_curr_batch_size_bytes= 0;
|
||||
scanptr.p->m_max_batch_size_rows = max_rows;
|
||||
scanptr.p->m_max_batch_size_bytes = max_bytes;
|
||||
|
||||
scanptr.p->scanErrorCounter = 0;
|
||||
scanptr.p->scanLockMode = scanLockMode;
|
||||
scanptr.p->readCommitted = readCommitted;
|
||||
|
@ -8417,12 +8426,11 @@ Uint32 Dblqh::initScanrec(const ScanFragReq* scanFragReq)
|
|||
scanptr.p->scanNumber = ~0;
|
||||
scanptr.p->scanApiOpPtr = scanFragReq->clientOpPtr;
|
||||
|
||||
if ((scanptr.p->scanConcurrentOperations == 0) ||
|
||||
(scanptr.p->scanConcurrentOperations > scanptr.p->batch_size)) {
|
||||
if (max_rows == 0 || (max_bytes > 0 && max_rows > max_bytes)){
|
||||
jam();
|
||||
return ScanFragRef::ZWRONG_BATCH_SIZE;
|
||||
}
|
||||
if (!seize_acc_ptr_list(scanptr.p, scanConcurrentOperations)) {
|
||||
if (!seize_acc_ptr_list(scanptr.p, max_rows)){
|
||||
jam();
|
||||
return ScanFragRef::ZTOO_MANY_ACTIVE_SCAN_ERROR;
|
||||
}
|
||||
|
@ -8607,7 +8615,7 @@ void Dblqh::releaseScanrec(Signal* signal)
|
|||
scanptr.p->scanState = ScanRecord::SCAN_FREE;
|
||||
scanptr.p->scanType = ScanRecord::ST_IDLE;
|
||||
scanptr.p->scanTcWaiting = ZFALSE;
|
||||
cbookedAccOps -= scanptr.p->scanConcurrentOperations;
|
||||
cbookedAccOps -= scanptr.p->m_max_batch_size_rows;
|
||||
cscanNoFreeRec++;
|
||||
}//Dblqh::releaseScanrec()
|
||||
|
||||
|
@ -8619,7 +8627,7 @@ void Dblqh::sendKeyinfo20(Signal* signal,
|
|||
ScanRecord * scanP,
|
||||
TcConnectionrec * tcConP)
|
||||
{
|
||||
ndbrequire(scanP->scanCompletedOperations < MAX_PARALLEL_OP_PER_SCAN);
|
||||
ndbrequire(scanP->m_curr_batch_size_rows < MAX_PARALLEL_OP_PER_SCAN);
|
||||
KeyInfo20 * keyInfo = (KeyInfo20 *)&signal->theData[0];
|
||||
|
||||
DatabufPtr TdataBuf;
|
||||
|
@ -8633,7 +8641,7 @@ void Dblqh::sendKeyinfo20(Signal* signal,
|
|||
*/
|
||||
ndbrequire(keyLen * 4 <= sizeof(signal->theData));
|
||||
const BlockReference ref = scanP->scanApiBlockref;
|
||||
const Uint32 scanOp = scanP->scanCompletedOperations;
|
||||
const Uint32 scanOp = scanP->m_curr_batch_size_rows;
|
||||
const Uint32 nodeId = refToNode(ref);
|
||||
const bool connectedToNode = getNodeInfo(nodeId).m_connected;
|
||||
const Uint32 type = getNodeInfo(nodeId).m_type;
|
||||
|
@ -8742,9 +8750,8 @@ void Dblqh::sendKeyinfo20(Signal* signal,
|
|||
* ------------------------------------------------------------------------ */
|
||||
void Dblqh::sendScanFragConf(Signal* signal, Uint32 scanCompleted)
|
||||
{
|
||||
Uint32 completed_ops= scanptr.p->scanCompletedOperations;
|
||||
Uint32 total_len= scanptr.p->scan_batch_len;
|
||||
scanptr.p->scanConcurrentOperations= scanptr.p->batch_size;
|
||||
Uint32 completed_ops= scanptr.p->m_curr_batch_size_rows;
|
||||
Uint32 total_len= scanptr.p->m_curr_batch_size_bytes;
|
||||
scanptr.p->scanTcWaiting = ZFALSE;
|
||||
|
||||
if(ERROR_INSERTED(5037)){
|
||||
|
@ -8859,11 +8866,11 @@ void Dblqh::execCOPY_FRAGREQ(Signal* signal)
|
|||
/* ------------------------------------------------------------------------- */
|
||||
// We keep track of how many operation records in ACC that has been booked.
|
||||
// Copy fragment has records always booked and thus need not book any. The
|
||||
// most operations in parallel use is the scanConcurrentOperations.
|
||||
// most operations in parallel use is the m_max_batch_size_rows.
|
||||
// This variable has to be set-up here since it is used by releaseScanrec
|
||||
// to unbook operation records in ACC.
|
||||
/* ------------------------------------------------------------------------- */
|
||||
scanptr.p->scanConcurrentOperations = 0;
|
||||
scanptr.p->m_max_batch_size_rows = 0;
|
||||
scanptr.p->rangeScan = 0;
|
||||
seizeTcrec();
|
||||
|
||||
|
@ -18260,8 +18267,8 @@ Dblqh::execDUMP_STATE_ORD(Signal* signal)
|
|||
infoEvent(" copyptr=%d, ailen=%d, complOps=%d, concurrOps=%d",
|
||||
sp.p->copyPtr,
|
||||
sp.p->scanAiLength,
|
||||
sp.p->scanCompletedOperations,
|
||||
sp.p->scanConcurrentOperations);
|
||||
sp.p->m_curr_batch_size_rows,
|
||||
sp.p->m_max_batch_size_rows);
|
||||
infoEvent(" errCnt=%d, localFid=%d, schV=%d",
|
||||
sp.p->scanErrorCounter,
|
||||
sp.p->scanLocalFragid,
|
||||
|
|
|
@ -9264,7 +9264,8 @@ void Dbtc::execSCAN_NEXTREQ(Signal* signal)
|
|||
nextReq->closeFlag = ZFALSE;
|
||||
nextReq->transId1 = apiConnectptr.p->transid[0];
|
||||
nextReq->transId2 = apiConnectptr.p->transid[1];
|
||||
|
||||
nextReq->batch_size_bytes= scanP->batch_byte_size;
|
||||
|
||||
ScanFragList running(c_scan_frag_pool, scanP->m_running_scan_frags);
|
||||
ScanFragList delivered(c_scan_frag_pool, scanP->m_delivered_scan_frags);
|
||||
for(Uint32 i = 0 ; i<len; i++){
|
||||
|
@ -9278,6 +9279,8 @@ void Dbtc::execSCAN_NEXTREQ(Signal* signal)
|
|||
|
||||
scanFragptr.p->m_ops = 0;
|
||||
nextReq->senderData = scanFragptr.i;
|
||||
nextReq->batch_size_rows= scanFragptr.p->scanFragConcurrency;
|
||||
|
||||
sendSignal(scanFragptr.p->lqhBlockref, GSN_SCAN_NEXTREQ, signal,
|
||||
ScanFragNextReq::SignalLength, JBB);
|
||||
delivered.remove(scanFragptr);
|
||||
|
@ -9487,10 +9490,9 @@ void Dbtc::sendScanFragReq(Signal* signal,
|
|||
req->savePointId = apiConnectptr.p->currSavePointId;
|
||||
req->transId1 = apiConnectptr.p->transid[0];
|
||||
req->transId2 = apiConnectptr.p->transid[1];
|
||||
req->concurrency= scanFragP->scanFragConcurrency;
|
||||
req->clientOpPtr = scanFragP->m_apiPtr;
|
||||
req->batch_byte_size= scanP->batch_byte_size;
|
||||
req->first_batch_size= scanP->first_batch_size;
|
||||
req->batch_size_rows= scanFragP->scanFragConcurrency;
|
||||
req->batch_size_bytes= scanP->batch_byte_size;
|
||||
sendSignal(scanFragP->lqhBlockref, GSN_SCAN_FRAGREQ, signal,
|
||||
ScanFragReq::SignalLength, JBB);
|
||||
updateBuddyTimer(apiConnectptr);
|
||||
|
|
|
@ -1880,7 +1880,6 @@ SumaParticipant::SyncRecord::nextScan(Signal* signal){
|
|||
req->tableId = tabPtr.p->m_tableId;
|
||||
req->requestInfo = 0;
|
||||
req->savePointId = 0;
|
||||
//ScanFragReq::setConcurrency(req->requestInfo, parallelism);
|
||||
ScanFragReq::setLockMode(req->requestInfo, 0);
|
||||
ScanFragReq::setHoldLockFlag(req->requestInfo, 0);
|
||||
ScanFragReq::setKeyinfoFlag(req->requestInfo, 0);
|
||||
|
@ -1889,12 +1888,11 @@ SumaParticipant::SyncRecord::nextScan(Signal* signal){
|
|||
req->schemaVersion = tabPtr.p->m_schemaVersion;
|
||||
req->transId1 = 0;
|
||||
req->transId2 = (SUMA << 20) + (suma.getOwnNodeId() << 8);
|
||||
|
||||
for(unsigned int i = 0; i<parallelism; i++){
|
||||
//req->clientOpPtr[i] = (ptrI << 16) + (i + 1);
|
||||
req->clientOpPtr = (ptrI << 16) + (i + 1);
|
||||
}
|
||||
suma.sendSignal(DBLQH_REF, GSN_SCAN_FRAGREQ, signal, 25, JBB);
|
||||
req->clientOpPtr = (ptrI << 16);
|
||||
req->batch_size_rows= 16;
|
||||
req->batch_size_bytes= 0;
|
||||
suma.sendSignal(DBLQH_REF, GSN_SCAN_FRAGREQ, signal,
|
||||
ScanFragReq::SignalLength, JBB);
|
||||
|
||||
signal->theData[0] = ptrI;
|
||||
signal->theData[1] = 0;
|
||||
|
@ -1996,6 +1994,8 @@ SumaParticipant::execSUB_SYNC_CONTINUE_CONF(Signal* signal){
|
|||
req->closeFlag = 0;
|
||||
req->transId1 = 0;
|
||||
req->transId2 = (SUMA << 20) + (getOwnNodeId() << 8);
|
||||
req->batch_size_rows = 16;
|
||||
req->batch_size_bytes = 0;
|
||||
sendSignal(DBLQH_REF, GSN_SCAN_NEXTREQ, signal,
|
||||
ScanFragNextReq::SignalLength, JBB);
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue