mirror of
https://github.com/MariaDB/server.git
synced 2025-01-18 04:53:01 +01:00
Fix bugs + include check of batch_byte_size and
use of first_batch_size ndb/include/kernel/ndb_limits.h: New maximum size ndb/include/kernel/signaldata/ScanFrag.hpp: New error code ndb/include/kernel/signaldata/ScanTab.hpp: Need to go to Uint16 when batch size > 255 ndb/src/kernel/blocks/dblqh/Dblqh.hpp: Uint8 => Uint16 when batch_size > 255 New and changed methods for acc ptr's and checking end of scan batch (incl. check of batch_byte_size ndb/src/kernel/blocks/dblqh/DblqhMain.cpp: Uint8 => Uint16 when batch_size > 255 New and changed methods for acc ptr's and checking end of scan batch (incl. check of batch_byte_size
This commit is contained in:
parent
6a908c4322
commit
f03022b22a
5 changed files with 84 additions and 48 deletions
|
@ -68,7 +68,7 @@
|
|||
* API can order a multiple of this number of records at a time since
|
||||
* fragments can be scanned in parallel.
|
||||
*/
|
||||
#define MAX_PARALLEL_OP_PER_SCAN 512
|
||||
#define MAX_PARALLEL_OP_PER_SCAN 992
|
||||
/*
|
||||
* When calculating the number of records sent from LQH in each batch
|
||||
* one uses SCAN_BATCH_SIZE divided by the expected size of signals
|
||||
|
|
|
@ -143,6 +143,7 @@ public:
|
|||
ZSCAN_NO_FRAGMENT_ERROR = 487,
|
||||
ZTOO_MANY_ACTIVE_SCAN_ERROR = 488,
|
||||
ZNO_FREE_SCANREC_ERROR = 489,
|
||||
ZWRONG_BATCH_SIZE = 1230,
|
||||
ZSTANDBY_SCAN_ERROR = 1209,
|
||||
ZSCAN_BOOK_ACC_OP_ERROR = 1219,
|
||||
ZUNKNOWN_TRANS_ERROR = 1227
|
||||
|
|
|
@ -74,7 +74,7 @@ private:
|
|||
static Uint8 getHoldLockFlag(const UintR & requestInfo);
|
||||
static Uint8 getReadCommittedFlag(const UintR & requestInfo);
|
||||
static Uint8 getRangeScanFlag(const UintR & requestInfo);
|
||||
static Uint8 getScanBatch(const UintR & requestInfo);
|
||||
static Uint16 getScanBatch(const UintR & requestInfo);
|
||||
|
||||
/**
|
||||
* Set:ers for requestInfo
|
||||
|
@ -152,9 +152,9 @@ ScanTabReq::getRangeScanFlag(const UintR & requestInfo){
|
|||
}
|
||||
|
||||
inline
|
||||
Uint8
|
||||
Uint16
|
||||
ScanTabReq::getScanBatch(const Uint32 & requestInfo){
|
||||
return (Uint8)((requestInfo >> SCAN_BATCH_SHIFT) & SCAN_BATCH_MASK);
|
||||
return (Uint16)((requestInfo >> SCAN_BATCH_SHIFT) & SCAN_BATCH_MASK);
|
||||
}
|
||||
|
||||
inline
|
||||
|
|
|
@ -538,8 +538,10 @@ public:
|
|||
UintR scanApiOpPtr;
|
||||
UintR scanLocalref[2];
|
||||
Uint32 scan_batch_len;
|
||||
Uint32 batch_size;
|
||||
Uint32 first_batch_size;
|
||||
Uint32 batch_byte_size;
|
||||
|
||||
UintR copyPtr;
|
||||
union {
|
||||
Uint32 nextPool;
|
||||
|
@ -569,14 +571,15 @@ public:
|
|||
ScanType scanType;
|
||||
BlockReference scanApiBlockref;
|
||||
NodeId scanNodeId;
|
||||
Uint16 scanReleaseCounter;
|
||||
Uint16 scanNumber;
|
||||
|
||||
Uint8 scanCompletedStatus;
|
||||
Uint8 scanFlag;
|
||||
Uint8 scanLockHold;
|
||||
Uint8 scanLockMode;
|
||||
Uint8 readCommitted;
|
||||
Uint8 rangeScan;
|
||||
Uint8 scanNumber;
|
||||
Uint8 scanReleaseCounter;
|
||||
Uint8 scanTcWaiting;
|
||||
Uint8 scanKeyinfoFlag;
|
||||
}; // Size 272 bytes
|
||||
|
@ -2225,9 +2228,10 @@ private:
|
|||
void init_acc_ptr_list(ScanRecord*);
|
||||
bool seize_acc_ptr_list(ScanRecord*, Uint32);
|
||||
void release_acc_ptr_list(ScanRecord*);
|
||||
Uint32 get_acc_ptr_from_scan_record(ScanRecord*, Uint32);
|
||||
Uint32 get_acc_ptr_from_scan_record(ScanRecord*, Uint32, bool);
|
||||
void set_acc_ptr_in_scan_record(ScanRecord*, Uint32, Uint32);
|
||||
void get_acc_ptr(ScanRecord*, Uint32*, Uint32);
|
||||
void i_get_acc_ptr(ScanRecord*, Uint32*&, Uint32);
|
||||
bool check_scan_batch_completed(ScanRecord*);
|
||||
|
||||
void removeTable(Uint32 tableId);
|
||||
void sendLCP_COMPLETE_REP(Signal* signal, Uint32 lcpId);
|
||||
|
@ -2926,4 +2930,11 @@ public:
|
|||
DLHashTable<ScanRecord> c_scanTakeOverHash;
|
||||
};
|
||||
|
||||
inline
|
||||
bool
|
||||
Dblqh::check_scan_batch_completed(ScanRecord* scanP)
|
||||
{
|
||||
return (scanP->scanCompletedOperations == scanP->scanConcurrentOperations) ||
|
||||
(scanP->scan_batch_len >= scanP->batch_byte_size);
|
||||
}
|
||||
#endif
|
||||
|
|
|
@ -3581,7 +3581,9 @@ void Dblqh::prepareContinueAfterBlockedLab(Signal* signal)
|
|||
takeOverErrorLab(signal);
|
||||
return;
|
||||
}//if
|
||||
Uint32 accOpPtr= get_acc_ptr_from_scan_record(scanptr.p, ttcScanOp);
|
||||
Uint32 accOpPtr= get_acc_ptr_from_scan_record(scanptr.p,
|
||||
ttcScanOp,
|
||||
true);
|
||||
if (accOpPtr == RNIL) {
|
||||
jam();
|
||||
releaseActiveFrag(signal);
|
||||
|
@ -7023,7 +7025,9 @@ void Dblqh::continueScanReleaseAfterBlockedLab(Signal* signal)
|
|||
scanptr.p->scanState = ScanRecord::WAIT_RELEASE_LOCK;
|
||||
signal->theData[0] = scanptr.p->scanAccPtr;
|
||||
signal->theData[1]=
|
||||
get_acc_ptr_from_scan_record(scanptr.p, scanptr.p->scanReleaseCounter -1);
|
||||
get_acc_ptr_from_scan_record(scanptr.p,
|
||||
scanptr.p->scanReleaseCounter -1,
|
||||
false);
|
||||
signal->theData[2] = NextScanReq::ZSCAN_COMMIT;
|
||||
if (! scanptr.p->rangeScan)
|
||||
sendSignal(tcConnectptr.p->tcAccBlockref, GSN_NEXT_SCANREQ, signal, 3, JBB);
|
||||
|
@ -7181,8 +7185,7 @@ void Dblqh::scanLockReleasedLab(Signal* signal)
|
|||
(scanptr.p->scanCompletedStatus == ZTRUE)) {
|
||||
jam();
|
||||
closeScanLab(signal);
|
||||
} else if ((scanptr.p->scanConcurrentOperations ==
|
||||
scanptr.p->scanCompletedOperations) &&
|
||||
} else if (check_scan_batch_completed(scanptr.p) &&
|
||||
scanptr.p->scanLockHold != ZTRUE) {
|
||||
jam();
|
||||
scanptr.p->scanState = ScanRecord::WAIT_SCAN_NEXTREQ;
|
||||
|
@ -7279,31 +7282,36 @@ Dblqh::init_acc_ptr_list(ScanRecord* scanP)
|
|||
|
||||
inline
|
||||
void
|
||||
Dblqh::get_acc_ptr(ScanRecord* scanP, Uint32 *acc_ptr, Uint32 index)
|
||||
Dblqh::i_get_acc_ptr(ScanRecord* scanP, Uint32* &acc_ptr, Uint32 index)
|
||||
{
|
||||
if (index == 0) {
|
||||
jam();
|
||||
acc_ptr= &scanP->scan_acc_op_ptr[0];
|
||||
acc_ptr= (Uint32*)&scanP->scan_acc_op_ptr[0];
|
||||
} else {
|
||||
Uint32 attr_buf_index, attr_buf_rec;
|
||||
|
||||
AttrbufPtr regAttrPtr;
|
||||
jam();
|
||||
attr_buf_rec= (index + 30) / 32;
|
||||
attr_buf_rec= (index + 31) / 32;
|
||||
attr_buf_index= (index - 1) & 31;
|
||||
regAttrPtr.i= scanP->scan_acc_op_ptr[attr_buf_rec];
|
||||
ptrCheckGuard(regAttrPtr, cattrinbufFileSize, attrbuf);
|
||||
acc_ptr= ®AttrPtr.p->attrbuf[attr_buf_index];
|
||||
acc_ptr= (Uint32*)®AttrPtr.p->attrbuf[attr_buf_index];
|
||||
}
|
||||
}
|
||||
|
||||
Uint32
|
||||
Dblqh::get_acc_ptr_from_scan_record(ScanRecord* scanP, Uint32 index)
|
||||
Dblqh::get_acc_ptr_from_scan_record(ScanRecord* scanP,
|
||||
Uint32 index,
|
||||
bool crash_flag)
|
||||
{
|
||||
Uint32 *acc_ptr;
|
||||
Uint32* acc_ptr;
|
||||
Uint32 attr_buf_rec, attr_buf_index;
|
||||
ndbrequire((index < MAX_PARALLEL_OP_PER_SCAN) &&
|
||||
index < scanP->scan_acc_index);
|
||||
get_acc_ptr(scanP, acc_ptr, index);
|
||||
if (!((index < MAX_PARALLEL_OP_PER_SCAN) &&
|
||||
index < scanP->scan_acc_index)) {
|
||||
ndbrequire(crash_flag);
|
||||
return RNIL;
|
||||
}
|
||||
i_get_acc_ptr(scanP, acc_ptr, index);
|
||||
return *acc_ptr;
|
||||
}
|
||||
|
||||
|
@ -7315,7 +7323,7 @@ Dblqh::set_acc_ptr_in_scan_record(ScanRecord* scanP,
|
|||
ndbrequire((index == 0 || scanP->scan_acc_index == index) &&
|
||||
(index < MAX_PARALLEL_OP_PER_SCAN));
|
||||
scanP->scan_acc_index= index + 1;
|
||||
get_acc_ptr(scanP, acc_ptr, index);
|
||||
i_get_acc_ptr(scanP, acc_ptr, index);
|
||||
*acc_ptr= acc;
|
||||
}
|
||||
|
||||
|
@ -7882,10 +7890,11 @@ void Dblqh::nextScanConfScanLab(Signal* signal)
|
|||
GSN_ACC_CHECK_SCAN, signal, 2, JBB);
|
||||
return;
|
||||
}//if
|
||||
|
||||
jam();
|
||||
set_acc_ptr_in_scan_record(scanptr.p,
|
||||
scanptr.p->scanCompletedOperations,
|
||||
nextScanConf->accOperationPtr);
|
||||
jam();
|
||||
scanptr.p->scanLocalref[0] = nextScanConf->localKey[0];
|
||||
scanptr.p->scanLocalref[1] = nextScanConf->localKey[1];
|
||||
scanptr.p->scanLocalFragid = nextScanConf->fragId;
|
||||
|
@ -7904,6 +7913,7 @@ void Dblqh::nextScanConfScanLab(Signal* signal)
|
|||
return;
|
||||
}//if
|
||||
}//if
|
||||
jam();
|
||||
nextScanConfLoopLab(signal);
|
||||
}//Dblqh::nextScanConfScanLab()
|
||||
|
||||
|
@ -7926,7 +7936,7 @@ void Dblqh::nextScanConfLoopLab(Signal* signal)
|
|||
closeScanLab(signal);
|
||||
return;
|
||||
}//if
|
||||
|
||||
jam();
|
||||
Uint32 tableRef;
|
||||
Uint32 tupFragPtr;
|
||||
Uint32 reqinfo = (scanptr.p->scanLockHold == ZFALSE);
|
||||
|
@ -7960,6 +7970,7 @@ void Dblqh::nextScanConfLoopLab(Signal* signal)
|
|||
}//if
|
||||
}
|
||||
{
|
||||
jam();
|
||||
TupKeyReq * const tupKeyReq = (TupKeyReq *)signal->getDataPtrSend();
|
||||
|
||||
tupKeyReq->connectPtr = tcConnectptr.p->tupConnectrec;
|
||||
|
@ -8062,26 +8073,27 @@ void Dblqh::scanTupkeyConfLab(Signal* signal)
|
|||
ndbrequire(scanptr.p->scanCompletedOperations < MAX_PARALLEL_OP_PER_SCAN);
|
||||
scanptr.p->scan_batch_len+= tdata4;
|
||||
scanptr.p->scanCompletedOperations++;
|
||||
if ((scanptr.p->scanCompletedOperations ==
|
||||
scanptr.p->scanConcurrentOperations) &&
|
||||
(scanptr.p->scanLockHold == ZTRUE)) {
|
||||
jam();
|
||||
scanptr.p->scanState = ScanRecord::WAIT_SCAN_NEXTREQ;
|
||||
sendScanFragConf(signal, ZFALSE);
|
||||
return;
|
||||
} else if (scanptr.p->scanCompletedOperations ==
|
||||
scanptr.p->scanConcurrentOperations) {
|
||||
jam();
|
||||
scanptr.p->scanReleaseCounter = scanptr.p->scanCompletedOperations;
|
||||
scanReleaseLocksLab(signal);
|
||||
return;
|
||||
} else if (scanptr.p->scanLockHold == ZTRUE) {
|
||||
jam();
|
||||
scanptr.p->scanFlag = NextScanReq::ZSCAN_NEXT;
|
||||
if (check_scan_batch_completed(scanptr.p)) {
|
||||
if (scanptr.p->scanLockHold == ZTRUE) {
|
||||
jam();
|
||||
scanptr.p->scanState = ScanRecord::WAIT_SCAN_NEXTREQ;
|
||||
sendScanFragConf(signal, ZFALSE);
|
||||
return;
|
||||
} else {
|
||||
jam();
|
||||
scanptr.p->scanReleaseCounter = scanptr.p->scanCompletedOperations;
|
||||
scanReleaseLocksLab(signal);
|
||||
return;
|
||||
}
|
||||
} else {
|
||||
jam();
|
||||
scanptr.p->scanFlag = NextScanReq::ZSCAN_NEXT_COMMIT;
|
||||
}//if
|
||||
if (scanptr.p->scanLockHold == ZTRUE) {
|
||||
jam();
|
||||
scanptr.p->scanFlag = NextScanReq::ZSCAN_NEXT;
|
||||
} else {
|
||||
jam();
|
||||
scanptr.p->scanFlag = NextScanReq::ZSCAN_NEXT_COMMIT;
|
||||
}
|
||||
}
|
||||
scanNextLoopLab(signal);
|
||||
}//Dblqh::scanTupkeyConfLab()
|
||||
|
||||
|
@ -8123,11 +8135,13 @@ void Dblqh::continueScanAfterBlockedLab(Signal* signal)
|
|||
jam();
|
||||
scanptr.p->scanFlag = NextScanReq::ZSCAN_NEXT_COMMIT;
|
||||
accOpPtr= get_acc_ptr_from_scan_record(scanptr.p,
|
||||
scanptr.p->scanCompletedOperations);
|
||||
scanptr.p->scanCompletedOperations,
|
||||
false);
|
||||
} else if (scanptr.p->scanFlag == NextScanReq::ZSCAN_NEXT_COMMIT) {
|
||||
jam();
|
||||
accOpPtr= get_acc_ptr_from_scan_record(scanptr.p,
|
||||
scanptr.p->scanCompletedOperations);
|
||||
scanptr.p->scanCompletedOperations,
|
||||
false);
|
||||
} else {
|
||||
jam();
|
||||
accOpPtr = RNIL; // The value is not used in ACC
|
||||
|
@ -8355,7 +8369,8 @@ Uint32 Dblqh::initScanrec(const ScanFragReq* scanFragReq)
|
|||
scanptr.p->scanSchemaVersion = scanFragReq->schemaVersion;
|
||||
scanptr.p->scanCompletedOperations = 0;
|
||||
scanptr.p->scan_batch_len= 0;
|
||||
scanptr.p->scanConcurrentOperations = scanConcurrentOperations;
|
||||
scanptr.p->scanConcurrentOperations = scanFragReq->first_batch_size;
|
||||
scanptr.p->batch_size= scanConcurrentOperations;
|
||||
scanptr.p->batch_byte_size= scanFragReq->batch_byte_size;
|
||||
scanptr.p->first_batch_size= scanFragReq->first_batch_size;
|
||||
scanptr.p->scanErrorCounter = 0;
|
||||
|
@ -8371,6 +8386,14 @@ Uint32 Dblqh::initScanrec(const ScanFragReq* scanFragReq)
|
|||
scanptr.p->scanNumber = ~0;
|
||||
scanptr.p->scanApiOpPtr = scanFragReq->clientOpPtr;
|
||||
|
||||
ndbout << "batch_size = " << scanptr.p->batch_size;
|
||||
ndbout << " first_batch_size = " << scanptr.p->scanConcurrentOperations;
|
||||
ndbout << endl;
|
||||
if ((scanptr.p->scanConcurrentOperations == 0) ||
|
||||
(scanptr.p->scanConcurrentOperations > scanptr.p->batch_size)) {
|
||||
jam();
|
||||
return ScanFragRef::ZWRONG_BATCH_SIZE;
|
||||
}
|
||||
if (!seize_acc_ptr_list(scanptr.p, scanConcurrentOperations)) {
|
||||
jam();
|
||||
return ScanFragRef::ZTOO_MANY_ACTIVE_SCAN_ERROR;
|
||||
|
@ -8693,6 +8716,7 @@ void Dblqh::sendScanFragConf(Signal* signal, Uint32 scanCompleted)
|
|||
{
|
||||
Uint32 completed_ops= scanptr.p->scanCompletedOperations;
|
||||
Uint32 total_len= scanptr.p->scan_batch_len;
|
||||
scanptr.p->scanConcurrentOperations= scanptr.p->batch_size;
|
||||
scanptr.p->scanTcWaiting = ZFALSE;
|
||||
|
||||
if(ERROR_INSERTED(5037)){
|
||||
|
@ -9276,7 +9300,7 @@ void Dblqh::continueCopyAfterBlockedLab(Signal* signal)
|
|||
scanptr.i = tcConnectptr.p->tcScanRec;
|
||||
c_scanRecordPool.getPtr(scanptr);
|
||||
tcConnectptr.p->errorCode = 0;
|
||||
Uint32 acc_op_ptr= get_acc_ptr_from_scan_record(scanptr.p, 0);
|
||||
Uint32 acc_op_ptr= get_acc_ptr_from_scan_record(scanptr.p, 0, false);
|
||||
signal->theData[0] = scanptr.p->scanAccPtr;
|
||||
signal->theData[1] = acc_op_ptr;
|
||||
signal->theData[2] = NextScanReq::ZSCAN_NEXT_COMMIT;
|
||||
|
|
Loading…
Reference in a new issue