mirror of
https://github.com/MariaDB/server.git
synced 2025-01-31 11:01:52 +01:00
ndb - wl-2455 TUP scan for testing in 5.0
ndb/include/ndbapi/NdbIndexScanOperation.hpp: fix readTuples signature ndb/include/ndbapi/NdbScanOperation.hpp: fix readTuples signature ndb/src/ndbapi/NdbScanOperation.cpp: fix readTuples signature ndb/test/include/HugoTransactions.hpp: fix readTuples signature ndb/test/ndbapi/testScanPerf.cpp: fix readTuples signature ndb/test/src/HugoOperations.cpp: fix readTuples signature ndb/test/src/HugoTransactions.cpp: fix readTuples signature ndb/test/ndbapi/testOIBasic.cpp: fix readTuples signature ndb/include/kernel/signaldata/AccScan.hpp: TUP scan for testing in 5.0 ndb/include/kernel/signaldata/NextScan.hpp: TUP scan for testing in 5.0 ndb/include/kernel/signaldata/ScanFrag.hpp: TUP scan for testing in 5.0 ndb/include/kernel/signaldata/ScanTab.hpp: TUP scan for testing in 5.0 ndb/src/common/debugger/signaldata/ScanTab.cpp: TUP scan for testing in 5.0 ndb/src/kernel/blocks/dblqh/Dblqh.hpp: TUP scan for testing in 5.0 ndb/src/kernel/blocks/dblqh/DblqhMain.cpp: TUP scan for testing in 5.0 ndb/src/kernel/blocks/dbtc/DbtcMain.cpp: TUP scan for testing in 5.0 ndb/src/kernel/blocks/dbtup/Dbtup.hpp: TUP scan for testing in 5.0 ndb/src/kernel/blocks/dbtup/DbtupGen.cpp: TUP scan for testing in 5.0 ndb/src/kernel/blocks/dbtup/Makefile.am: TUP scan for testing in 5.0 ndb/test/ndbapi/testScan.cpp: TUP scan for testing in 5.0
This commit is contained in:
parent
5810ee1c91
commit
19cedc5e3d
20 changed files with 245 additions and 80 deletions
|
@ -34,6 +34,7 @@ class AccScanReq {
|
|||
*/
|
||||
friend class Dbacc;
|
||||
friend class Dbtux;
|
||||
friend class Dbtup;
|
||||
public:
|
||||
STATIC_CONST( SignalLength = 8 );
|
||||
|
||||
|
@ -120,6 +121,7 @@ class AccScanConf {
|
|||
*/
|
||||
friend class Dbacc;
|
||||
friend class Dbtux;
|
||||
friend class Dbtup;
|
||||
|
||||
/**
|
||||
* Reciver(s)
|
||||
|
@ -148,6 +150,7 @@ private:
|
|||
class AccCheckScan {
|
||||
friend class Dbacc;
|
||||
friend class Dbtux;
|
||||
friend class Dbtup;
|
||||
friend class Dblqh;
|
||||
enum {
|
||||
ZCHECK_LCP_STOP = 0,
|
||||
|
|
|
@ -23,6 +23,7 @@ class NextScanReq {
|
|||
friend class Dblqh;
|
||||
friend class Dbacc;
|
||||
friend class Dbtux;
|
||||
friend class Dbtup;
|
||||
public:
|
||||
// two sets of defs picked from lqh/acc
|
||||
enum ScanFlag {
|
||||
|
@ -50,6 +51,7 @@ private:
|
|||
class NextScanConf {
|
||||
friend class Dbacc;
|
||||
friend class Dbtux;
|
||||
friend class Dbtup;
|
||||
friend class Dblqh;
|
||||
public:
|
||||
// length is less if no keyinfo or no next result
|
||||
|
|
|
@ -57,6 +57,7 @@ public:
|
|||
static Uint32 getReadCommittedFlag(const Uint32 & requestInfo);
|
||||
static Uint32 getRangeScanFlag(const Uint32 & requestInfo);
|
||||
static Uint32 getDescendingFlag(const Uint32 & requestInfo);
|
||||
static Uint32 getTupScanFlag(const Uint32 & requestInfo);
|
||||
static Uint32 getAttrLen(const Uint32 & requestInfo);
|
||||
static Uint32 getScanPrio(const Uint32 & requestInfo);
|
||||
|
||||
|
@ -66,6 +67,7 @@ public:
|
|||
static void setReadCommittedFlag(Uint32 & requestInfo, Uint32 readCommitted);
|
||||
static void setRangeScanFlag(Uint32 & requestInfo, Uint32 rangeScan);
|
||||
static void setDescendingFlag(Uint32 & requestInfo, Uint32 descending);
|
||||
static void setTupScanFlag(Uint32 & requestInfo, Uint32 tupScan);
|
||||
static void setAttrLen(Uint32 & requestInfo, Uint32 attrLen);
|
||||
static void setScanPrio(Uint32& requestInfo, Uint32 prio);
|
||||
};
|
||||
|
@ -200,11 +202,12 @@ public:
|
|||
* r = read committed - 1 Bit 9
|
||||
* x = range scan - 1 Bit 6
|
||||
* z = descending - 1 Bit 10
|
||||
* t = tup scan -1 Bit 11 (implies x=z=0)
|
||||
* p = Scan prio - 4 Bits (12-15) -> max 15
|
||||
*
|
||||
* 1111111111222222222233
|
||||
* 01234567890123456789012345678901
|
||||
* lxhkrz ppppaaaaaaaaaaaaaaaa
|
||||
* lxhkrztppppaaaaaaaaaaaaaaaa
|
||||
*/
|
||||
#define SF_LOCK_MODE_SHIFT (5)
|
||||
#define SF_LOCK_MODE_MASK (1)
|
||||
|
@ -214,6 +217,7 @@ public:
|
|||
#define SF_READ_COMMITTED_SHIFT (9)
|
||||
#define SF_RANGE_SCAN_SHIFT (6)
|
||||
#define SF_DESCENDING_SHIFT (10)
|
||||
#define SF_TUP_SCAN_SHIFT (11)
|
||||
|
||||
#define SF_ATTR_LEN_SHIFT (16)
|
||||
#define SF_ATTR_LEN_MASK (65535)
|
||||
|
@ -251,6 +255,12 @@ ScanFragReq::getDescendingFlag(const Uint32 & requestInfo){
|
|||
return (requestInfo >> SF_DESCENDING_SHIFT) & 1;
|
||||
}
|
||||
|
||||
inline
|
||||
Uint32
|
||||
ScanFragReq::getTupScanFlag(const Uint32 & requestInfo){
|
||||
return (requestInfo >> SF_TUP_SCAN_SHIFT) & 1;
|
||||
}
|
||||
|
||||
inline
|
||||
Uint32
|
||||
ScanFragReq::getReadCommittedFlag(const Uint32 & requestInfo){
|
||||
|
@ -318,6 +328,13 @@ ScanFragReq::setDescendingFlag(UintR & requestInfo, UintR val){
|
|||
requestInfo |= (val << SF_DESCENDING_SHIFT);
|
||||
}
|
||||
|
||||
inline
|
||||
void
|
||||
ScanFragReq::setTupScanFlag(UintR & requestInfo, UintR val){
|
||||
ASSERT_BOOL(val, "ScanFragReq::setTupScanFlag");
|
||||
requestInfo |= (val << SF_TUP_SCAN_SHIFT);
|
||||
}
|
||||
|
||||
inline
|
||||
void
|
||||
ScanFragReq::setAttrLen(UintR & requestInfo, UintR val){
|
||||
|
|
|
@ -81,6 +81,7 @@ private:
|
|||
static Uint8 getReadCommittedFlag(const UintR & requestInfo);
|
||||
static Uint8 getRangeScanFlag(const UintR & requestInfo);
|
||||
static Uint8 getDescendingFlag(const UintR & requestInfo);
|
||||
static Uint8 getTupScanFlag(const UintR & requestInfo);
|
||||
static Uint8 getKeyinfoFlag(const UintR & requestInfo);
|
||||
static Uint16 getScanBatch(const UintR & requestInfo);
|
||||
static Uint8 getDistributionKeyFlag(const UintR & requestInfo);
|
||||
|
@ -95,6 +96,7 @@ private:
|
|||
static void setReadCommittedFlag(UintR & requestInfo, Uint32 flag);
|
||||
static void setRangeScanFlag(UintR & requestInfo, Uint32 flag);
|
||||
static void setDescendingFlag(UintR & requestInfo, Uint32 flag);
|
||||
static void setTupScanFlag(UintR & requestInfo, Uint32 flag);
|
||||
static void setKeyinfoFlag(UintR & requestInfo, Uint32 flag);
|
||||
static void setScanBatch(Uint32& requestInfo, Uint32 sz);
|
||||
static void setDistributionKeyFlag(Uint32& requestInfo, Uint32 flag);
|
||||
|
@ -108,6 +110,7 @@ private:
|
|||
h = Hold lock mode - 1 Bit 10
|
||||
c = Read Committed - 1 Bit 11
|
||||
k = Keyinfo - 1 Bit 12
|
||||
t = Tup scan - 1 Bit 13
|
||||
z = Descending (TUX) - 1 Bit 14
|
||||
x = Range Scan (TUX) - 1 Bit 15
|
||||
b = Scan batch - 10 Bit 16-25 (max 1023)
|
||||
|
@ -115,7 +118,7 @@ private:
|
|||
|
||||
1111111111222222222233
|
||||
01234567890123456789012345678901
|
||||
ppppppppl hck zxbbbbbbbbbb
|
||||
ppppppppl hcktzxbbbbbbbbbb
|
||||
*/
|
||||
|
||||
#define PARALLELL_SHIFT (0)
|
||||
|
@ -139,6 +142,9 @@ private:
|
|||
#define DESCENDING_SHIFT (14)
|
||||
#define DESCENDING_MASK (1)
|
||||
|
||||
#define TUP_SCAN_SHIFT (13)
|
||||
#define TUP_SCAN_MASK (1)
|
||||
|
||||
#define SCAN_BATCH_SHIFT (16)
|
||||
#define SCAN_BATCH_MASK (1023)
|
||||
|
||||
|
@ -180,6 +186,12 @@ ScanTabReq::getDescendingFlag(const UintR & requestInfo){
|
|||
return (Uint8)((requestInfo >> DESCENDING_SHIFT) & DESCENDING_MASK);
|
||||
}
|
||||
|
||||
inline
|
||||
Uint8
|
||||
ScanTabReq::getTupScanFlag(const UintR & requestInfo){
|
||||
return (Uint8)((requestInfo >> TUP_SCAN_SHIFT) & TUP_SCAN_MASK);
|
||||
}
|
||||
|
||||
inline
|
||||
Uint16
|
||||
ScanTabReq::getScanBatch(const Uint32 & requestInfo){
|
||||
|
@ -234,6 +246,13 @@ ScanTabReq::setDescendingFlag(UintR & requestInfo, Uint32 flag){
|
|||
requestInfo |= (flag << DESCENDING_SHIFT);
|
||||
}
|
||||
|
||||
inline
|
||||
void
|
||||
ScanTabReq::setTupScanFlag(UintR & requestInfo, Uint32 flag){
|
||||
ASSERT_BOOL(flag, "ScanTabReq::setTupScanFlag");
|
||||
requestInfo |= (flag << TUP_SCAN_SHIFT);
|
||||
}
|
||||
|
||||
inline
|
||||
void
|
||||
ScanTabReq::setScanBatch(Uint32 & requestInfo, Uint32 flag){
|
||||
|
|
|
@ -33,6 +33,17 @@ class NdbIndexScanOperation : public NdbScanOperation {
|
|||
#endif
|
||||
|
||||
public:
|
||||
/**
|
||||
* readTuples using ordered index
|
||||
*
|
||||
* @param lock_mode Lock mode
|
||||
* @param scan_flags see @ref ScanFlag
|
||||
* @param parallel No of fragments to scan in parallel (0=max)
|
||||
*/
|
||||
virtual int readTuples(LockMode lock_mode = LM_Read,
|
||||
Uint32 scan_flags = 0, Uint32 parallel = 0);
|
||||
|
||||
#ifndef DOXYGEN_SHOULD_SKIP_INTERNAL
|
||||
/**
|
||||
* readTuples using ordered index
|
||||
*
|
||||
|
@ -45,12 +56,19 @@ public:
|
|||
* @returns 0 for success and -1 for failure
|
||||
* @see NdbScanOperation::readTuples
|
||||
*/
|
||||
int readTuples(LockMode lock_mode = LM_Read,
|
||||
Uint32 batch = 0,
|
||||
Uint32 parallel = 0,
|
||||
bool order_by = false,
|
||||
inline int readTuples(LockMode lock_mode,
|
||||
Uint32 batch,
|
||||
Uint32 parallel,
|
||||
bool order_by,
|
||||
bool order_desc = false,
|
||||
bool read_range_no = false);
|
||||
bool read_range_no = false) {
|
||||
Uint32 scan_flags =
|
||||
(SF_OrderBy & -(Int32)order_by) |
|
||||
(SF_Descending & -(Int32)order_desc) |
|
||||
(SF_ReadRangeNo & -(Int32)read_range_no);
|
||||
return readTuples(lock_mode, scan_flags, parallel);
|
||||
}
|
||||
#endif
|
||||
|
||||
/**
|
||||
* Type of ordered index key bound. The values (0-4) will not change
|
||||
|
|
|
@ -36,6 +36,29 @@ class NdbScanOperation : public NdbOperation {
|
|||
#endif
|
||||
|
||||
public:
|
||||
/**
|
||||
* Scan flags. OR-ed together and passed as second argument to
|
||||
* readTuples.
|
||||
*/
|
||||
enum ScanFlag {
|
||||
SF_TupScan = (1 << 16), // scan TUP - only LM_CommittedRead
|
||||
SF_OrderBy = (1 << 24), // index scan in order
|
||||
SF_Descending = (2 << 24), // index scan in descending order
|
||||
SF_ReadRangeNo = (4 << 24) // enable @ref get_range_no
|
||||
};
|
||||
|
||||
/**
|
||||
* readTuples
|
||||
*
|
||||
* @param lock_mode Lock mode
|
||||
* @param scan_flags see @ref ScanFlag
|
||||
* @param parallel No of fragments to scan in parallel (0=max)
|
||||
*/
|
||||
virtual
|
||||
int readTuples(LockMode lock_mode = LM_Read,
|
||||
Uint32 scan_flags = 0, Uint32 parallel = 0);
|
||||
|
||||
#ifndef DOXYGEN_SHOULD_SKIP_DEPRECATED
|
||||
/**
|
||||
* readTuples
|
||||
*
|
||||
|
@ -44,10 +67,11 @@ public:
|
|||
* @param parallel No of fragments to scan in parallell
|
||||
* @note specifying 0 for batch and parallall means max performance
|
||||
*/
|
||||
#ifdef ndb_readtuples_impossible_overload
|
||||
int readTuples(LockMode lock_mode = LM_Read,
|
||||
Uint32 batch = 0, Uint32 parallel = 0);
|
||||
#endif
|
||||
|
||||
#ifndef DOXYGEN_SHOULD_SKIP_DEPRECATED
|
||||
inline int readTuples(int parallell){
|
||||
return readTuples(LM_Read, 0, parallell);
|
||||
}
|
||||
|
|
|
@ -30,7 +30,7 @@ printSCANTABREQ(FILE * output, const Uint32 * theData, Uint32 len, Uint16 receiv
|
|||
fprintf(output, " apiConnectPtr: H\'%.8x",
|
||||
sig->apiConnectPtr);
|
||||
fprintf(output, " requestInfo: H\'%.8x:\n", requestInfo);
|
||||
fprintf(output, " Parallellism: %u, Batch: %u LockMode: %u Keyinfo: %u Holdlock: %u RangeScan: %u Descending: %u ReadCommitted: %u\n DistributionKeyFlag: %u",
|
||||
fprintf(output, " Parallellism: %u Batch: %u LockMode: %u Keyinfo: %u Holdlock: %u RangeScan: %u Descending: %u TupScan: %u\n ReadCommitted: %u DistributionKeyFlag: %u",
|
||||
sig->getParallelism(requestInfo),
|
||||
sig->getScanBatch(requestInfo),
|
||||
sig->getLockMode(requestInfo),
|
||||
|
@ -38,6 +38,7 @@ printSCANTABREQ(FILE * output, const Uint32 * theData, Uint32 len, Uint16 receiv
|
|||
sig->getHoldLockFlag(requestInfo),
|
||||
sig->getRangeScanFlag(requestInfo),
|
||||
sig->getDescendingFlag(requestInfo),
|
||||
sig->getTupScanFlag(requestInfo),
|
||||
sig->getReadCommittedFlag(requestInfo),
|
||||
sig->getDistributionKeyFlag(requestInfo));
|
||||
|
||||
|
|
|
@ -566,6 +566,9 @@ public:
|
|||
Uint16 scanReleaseCounter;
|
||||
Uint16 scanNumber;
|
||||
|
||||
// scan source block ACC TUX TUP
|
||||
BlockReference scanBlockref;
|
||||
|
||||
Uint8 scanCompletedStatus;
|
||||
Uint8 scanFlag;
|
||||
Uint8 scanLockHold;
|
||||
|
@ -573,6 +576,7 @@ public:
|
|||
Uint8 readCommitted;
|
||||
Uint8 rangeScan;
|
||||
Uint8 descending;
|
||||
Uint8 tupScan;
|
||||
Uint8 scanTcWaiting;
|
||||
Uint8 scanKeyinfoFlag;
|
||||
Uint8 m_last_row;
|
||||
|
|
|
@ -7176,10 +7176,7 @@ void Dblqh::continueScanReleaseAfterBlockedLab(Signal* signal)
|
|||
scanptr.p->scanReleaseCounter -1,
|
||||
false);
|
||||
signal->theData[2] = NextScanReq::ZSCAN_COMMIT;
|
||||
if (! scanptr.p->rangeScan)
|
||||
sendSignal(tcConnectptr.p->tcAccBlockref, GSN_NEXT_SCANREQ, signal, 3, JBB);
|
||||
else
|
||||
sendSignal(tcConnectptr.p->tcTuxBlockref, GSN_NEXT_SCANREQ, signal, 3, JBB);
|
||||
sendSignal(scanptr.p->scanBlockref, GSN_NEXT_SCANREQ, signal, 3, JBB);
|
||||
}//Dblqh::continueScanReleaseAfterBlockedLab()
|
||||
|
||||
/* -------------------------------------------------------------------------
|
||||
|
@ -7492,6 +7489,7 @@ void Dblqh::execSCAN_FRAGREQ(Signal* signal)
|
|||
const Uint32 scanLockMode = ScanFragReq::getLockMode(reqinfo);
|
||||
const Uint8 keyinfo = ScanFragReq::getKeyinfoFlag(reqinfo);
|
||||
const Uint8 rangeScan = ScanFragReq::getRangeScanFlag(reqinfo);
|
||||
const Uint8 tupScan = ScanFragReq::getTupScanFlag(reqinfo);
|
||||
|
||||
ptrCheckGuard(tabptr, ctabrecFileSize, tablerec);
|
||||
if(tabptr.p->tableStatus != Tablerec::TABLE_DEFINED){
|
||||
|
@ -7641,12 +7639,7 @@ void Dblqh::continueAfterReceivingAllAiLab(Signal* signal)
|
|||
req->transId1 = tcConnectptr.p->transid[0];
|
||||
req->transId2 = tcConnectptr.p->transid[1];
|
||||
req->savePointId = tcConnectptr.p->savePointId;
|
||||
// always use if-stmt to switch (instead of setting a "scan block ref")
|
||||
if (! scanptr.p->rangeScan)
|
||||
sendSignal(tcConnectptr.p->tcAccBlockref, GSN_ACC_SCANREQ, signal,
|
||||
AccScanReq::SignalLength, JBB);
|
||||
else
|
||||
sendSignal(tcConnectptr.p->tcTuxBlockref, GSN_ACC_SCANREQ, signal,
|
||||
sendSignal(scanptr.p->scanBlockref, GSN_ACC_SCANREQ, signal,
|
||||
AccScanReq::SignalLength, JBB);
|
||||
}//Dblqh::continueAfterReceivingAllAiLab()
|
||||
|
||||
|
@ -8002,10 +7995,7 @@ void Dblqh::continueFirstScanAfterBlockedLab(Signal* signal)
|
|||
signal->theData[0] = scanptr.p->scanAccPtr;
|
||||
signal->theData[1] = RNIL;
|
||||
signal->theData[2] = NextScanReq::ZSCAN_NEXT;
|
||||
if (! scanptr.p->rangeScan)
|
||||
sendSignal(tcConnectptr.p->tcAccBlockref, GSN_NEXT_SCANREQ, signal, 3, JBB);
|
||||
else
|
||||
sendSignal(tcConnectptr.p->tcTuxBlockref, GSN_NEXT_SCANREQ, signal, 3, JBB);
|
||||
sendSignal(scanptr.p->scanBlockref, GSN_NEXT_SCANREQ, signal, 3, JBB);
|
||||
return;
|
||||
}//Dblqh::continueFirstScanAfterBlockedLab()
|
||||
|
||||
|
@ -8075,10 +8065,8 @@ void Dblqh::continueAfterCheckLcpStopBlocked(Signal* signal)
|
|||
c_scanRecordPool.getPtr(scanptr);
|
||||
signal->theData[0] = scanptr.p->scanAccPtr;
|
||||
signal->theData[1] = AccCheckScan::ZNOT_CHECK_LCP_STOP;
|
||||
if (! scanptr.p->rangeScan)
|
||||
EXECUTE_DIRECT(DBACC, GSN_ACC_CHECK_SCAN, signal, 2);
|
||||
else
|
||||
EXECUTE_DIRECT(DBTUX, GSN_ACC_CHECK_SCAN, signal, 2);
|
||||
EXECUTE_DIRECT(refToBlock(scanptr.p->scanBlockref), GSN_ACC_CHECK_SCAN,
|
||||
signal, 2);
|
||||
}//Dblqh::continueAfterCheckLcpStopBlocked()
|
||||
|
||||
/* -------------------------------------------------------------------------
|
||||
|
@ -8168,11 +8156,7 @@ void Dblqh::nextScanConfScanLab(Signal* signal)
|
|||
|
||||
signal->theData[0] = scanptr.p->scanAccPtr;
|
||||
signal->theData[1] = AccCheckScan::ZCHECK_LCP_STOP;
|
||||
if (! scanptr.p->rangeScan)
|
||||
sendSignal(tcConnectptr.p->tcAccBlockref,
|
||||
GSN_ACC_CHECK_SCAN, signal, 2, JBB);
|
||||
else
|
||||
sendSignal(tcConnectptr.p->tcTuxBlockref,
|
||||
sendSignal(scanptr.p->scanBlockref,
|
||||
GSN_ACC_CHECK_SCAN, signal, 2, JBB);
|
||||
return;
|
||||
}//if
|
||||
|
@ -8416,10 +8400,7 @@ void Dblqh::continueScanAfterBlockedLab(Signal* signal)
|
|||
signal->theData[0] = scanptr.p->scanAccPtr;
|
||||
signal->theData[1] = accOpPtr;
|
||||
signal->theData[2] = scanptr.p->scanFlag;
|
||||
if (! scanptr.p->rangeScan)
|
||||
sendSignal(tcConnectptr.p->tcAccBlockref, GSN_NEXT_SCANREQ, signal, 3,JBB);
|
||||
else
|
||||
sendSignal(tcConnectptr.p->tcTuxBlockref, GSN_NEXT_SCANREQ, signal, 3,JBB);
|
||||
sendSignal(scanptr.p->scanBlockref, GSN_NEXT_SCANREQ, signal, 3, JBB);
|
||||
}//Dblqh::continueScanAfterBlockedLab()
|
||||
|
||||
/* -------------------------------------------------------------------------
|
||||
|
@ -8541,10 +8522,7 @@ void Dblqh::continueCloseScanAfterBlockedLab(Signal* signal)
|
|||
signal->theData[0] = scanptr.p->scanAccPtr;
|
||||
signal->theData[1] = RNIL;
|
||||
signal->theData[2] = NextScanReq::ZSCAN_CLOSE;
|
||||
if (! scanptr.p->rangeScan)
|
||||
sendSignal(tcConnectptr.p->tcAccBlockref, GSN_NEXT_SCANREQ, signal, 3, JBB);
|
||||
else
|
||||
sendSignal(tcConnectptr.p->tcTuxBlockref, GSN_NEXT_SCANREQ, signal, 3, JBB);
|
||||
sendSignal(scanptr.p->scanBlockref, GSN_NEXT_SCANREQ, signal, 3, JBB);
|
||||
}//Dblqh::continueCloseScanAfterBlockedLab()
|
||||
|
||||
/* -------------------------------------------------------------------------
|
||||
|
@ -8628,8 +8606,9 @@ Uint32 Dblqh::initScanrec(const ScanFragReq* scanFragReq)
|
|||
const Uint32 scanLockHold = ScanFragReq::getHoldLockFlag(reqinfo);
|
||||
const Uint32 keyinfo = ScanFragReq::getKeyinfoFlag(reqinfo);
|
||||
const Uint32 readCommitted = ScanFragReq::getReadCommittedFlag(reqinfo);
|
||||
const Uint32 idx = ScanFragReq::getRangeScanFlag(reqinfo);
|
||||
const Uint32 rangeScan = ScanFragReq::getRangeScanFlag(reqinfo);
|
||||
const Uint32 descending = ScanFragReq::getDescendingFlag(reqinfo);
|
||||
const Uint32 tupScan = ScanFragReq::getTupScanFlag(reqinfo);
|
||||
const Uint32 attrLen = ScanFragReq::getAttrLen(reqinfo);
|
||||
const Uint32 scanPrio = ScanFragReq::getScanPrio(reqinfo);
|
||||
|
||||
|
@ -8647,11 +8626,19 @@ Uint32 Dblqh::initScanrec(const ScanFragReq* scanFragReq)
|
|||
scanptr.p->m_max_batch_size_rows = max_rows;
|
||||
scanptr.p->m_max_batch_size_bytes = max_bytes;
|
||||
|
||||
if (! rangeScan && ! tupScan)
|
||||
scanptr.p->scanBlockref = tcConnectptr.p->tcAccBlockref;
|
||||
else if (! tupScan)
|
||||
scanptr.p->scanBlockref = tcConnectptr.p->tcTuxBlockref;
|
||||
else
|
||||
scanptr.p->scanBlockref = tcConnectptr.p->tcTupBlockref;
|
||||
|
||||
scanptr.p->scanErrorCounter = 0;
|
||||
scanptr.p->scanLockMode = scanLockMode;
|
||||
scanptr.p->readCommitted = readCommitted;
|
||||
scanptr.p->rangeScan = idx;
|
||||
scanptr.p->rangeScan = rangeScan;
|
||||
scanptr.p->descending = descending;
|
||||
scanptr.p->tupScan = tupScan;
|
||||
scanptr.p->scanState = ScanRecord::SCAN_FREE;
|
||||
scanptr.p->scanFlag = ZFALSE;
|
||||
scanptr.p->scanLocalref[0] = 0;
|
||||
|
@ -8683,8 +8670,8 @@ Uint32 Dblqh::initScanrec(const ScanFragReq* scanFragReq)
|
|||
* !idx uses 1 - (MAX_PARALLEL_SCANS_PER_FRAG - 1) = 1-11
|
||||
* idx uses from MAX_PARALLEL_SCANS_PER_FRAG - MAX = 12-42)
|
||||
*/
|
||||
Uint32 start = (idx ? MAX_PARALLEL_SCANS_PER_FRAG : 1 );
|
||||
Uint32 stop = (idx ? MAX_PARALLEL_INDEX_SCANS_PER_FRAG : MAX_PARALLEL_SCANS_PER_FRAG - 1);
|
||||
Uint32 start = (rangeScan ? MAX_PARALLEL_SCANS_PER_FRAG : 1 );
|
||||
Uint32 stop = (rangeScan ? MAX_PARALLEL_INDEX_SCANS_PER_FRAG : MAX_PARALLEL_SCANS_PER_FRAG - 1);
|
||||
stop += start;
|
||||
Uint32 free = tFragPtr.p->m_scanNumberMask.find(start);
|
||||
|
||||
|
@ -9111,6 +9098,7 @@ void Dblqh::execCOPY_FRAGREQ(Signal* signal)
|
|||
/* ------------------------------------------------------------------------- */
|
||||
scanptr.p->m_max_batch_size_rows = 0;
|
||||
scanptr.p->rangeScan = 0;
|
||||
scanptr.p->tupScan = 0;
|
||||
seizeTcrec();
|
||||
|
||||
/**
|
||||
|
|
|
@ -8840,6 +8840,7 @@ void Dbtc::initScanrec(ScanRecordPtr scanptr,
|
|||
ScanFragReq::setReadCommittedFlag(tmp,ScanTabReq::getReadCommittedFlag(ri));
|
||||
ScanFragReq::setRangeScanFlag(tmp, ScanTabReq::getRangeScanFlag(ri));
|
||||
ScanFragReq::setDescendingFlag(tmp, ScanTabReq::getDescendingFlag(ri));
|
||||
ScanFragReq::setTupScanFlag(tmp, ScanTabReq::getTupScanFlag(ri));
|
||||
ScanFragReq::setAttrLen(tmp, scanTabReq->attrLenKeyLen & 0xFFFF);
|
||||
|
||||
scanptr.p->scanRequestInfo = tmp;
|
||||
|
|
|
@ -64,6 +64,7 @@
|
|||
// DbtupSystemRestart.cpp 26000
|
||||
// DbtupIndex.cpp 28000
|
||||
// DbtupDebug.cpp 30000
|
||||
// DbtupScan.cpp 32000
|
||||
//------------------------------------------------------------------
|
||||
|
||||
/*
|
||||
|
@ -511,6 +512,49 @@ struct Fragoperrec {
|
|||
};
|
||||
typedef Ptr<Fragoperrec> FragoperrecPtr;
|
||||
|
||||
// Position for use by scan
|
||||
struct PagePos {
|
||||
Uint32 m_fragId; // "base" fragment id
|
||||
Uint32 m_fragBit; // two fragments in 5.0
|
||||
Uint32 m_pageId;
|
||||
Uint32 m_tupleNo;
|
||||
bool m_match;
|
||||
};
|
||||
|
||||
// Tup scan op (compare Dbtux::ScanOp)
|
||||
struct ScanOp {
|
||||
enum {
|
||||
Undef = 0,
|
||||
First = 1, // before first entry
|
||||
Locked = 4, // at current entry (no lock needed)
|
||||
Next = 5, // looking for next extry
|
||||
Last = 6, // after last entry
|
||||
Invalid = 9 // cannot return REF to LQH currently
|
||||
};
|
||||
Uint16 m_state;
|
||||
Uint16 m_lockwait; // unused
|
||||
Uint32 m_userPtr; // scanptr.i in LQH
|
||||
Uint32 m_userRef;
|
||||
Uint32 m_tableId;
|
||||
Uint32 m_fragId; // "base" fragment id
|
||||
Uint32 m_fragPtrI[2];
|
||||
Uint32 m_transId1;
|
||||
Uint32 m_transId2;
|
||||
PagePos m_scanPos;
|
||||
union {
|
||||
Uint32 nextPool;
|
||||
Uint32 nextList;
|
||||
};
|
||||
Uint32 prevList;
|
||||
};
|
||||
typedef Ptr<ScanOp> ScanOpPtr;
|
||||
ArrayPool<ScanOp> c_scanOpPool;
|
||||
|
||||
void scanFirst(Signal* signal, ScanOpPtr scanPtr);
|
||||
void scanNext(Signal* signal, ScanOpPtr scanPtr);
|
||||
void scanClose(Signal* signal, ScanOpPtr scanPtr);
|
||||
void releaseScanOp(ScanOpPtr& scanPtr);
|
||||
|
||||
struct Fragrecord {
|
||||
Uint32 nextStartRange;
|
||||
Uint32 currentPageRange;
|
||||
|
@ -532,6 +576,9 @@ struct Fragrecord {
|
|||
Uint32 fragTableId;
|
||||
Uint32 fragmentId;
|
||||
Uint32 nextfreefrag;
|
||||
|
||||
DLList<ScanOp> m_scanList;
|
||||
Fragrecord(ArrayPool<ScanOp> & scanOpPool) : m_scanList(scanOpPool) {}
|
||||
};
|
||||
typedef Ptr<Fragrecord> FragrecordPtr;
|
||||
|
||||
|
@ -1084,6 +1131,11 @@ private:
|
|||
void buildIndex(Signal* signal, Uint32 buildPtrI);
|
||||
void buildIndexReply(Signal* signal, const BuildIndexRec* buildRec);
|
||||
|
||||
// Tup scan
|
||||
void execACC_SCANREQ(Signal* signal);
|
||||
void execNEXT_SCANREQ(Signal* signal);
|
||||
void execACC_CHECK_SCAN(Signal* signal);
|
||||
|
||||
//------------------------------------------------------------------
|
||||
//------------------------------------------------------------------
|
||||
// Methods to handle execution of TUPKEYREQ + ATTRINFO.
|
||||
|
|
|
@ -143,6 +143,11 @@ Dbtup::Dbtup(const class Configuration & conf)
|
|||
// Ordered index related
|
||||
addRecSignal(GSN_BUILDINDXREQ, &Dbtup::execBUILDINDXREQ);
|
||||
|
||||
// Tup scan
|
||||
addRecSignal(GSN_ACC_SCANREQ, &Dbtup::execACC_SCANREQ);
|
||||
addRecSignal(GSN_NEXT_SCANREQ, &Dbtup::execNEXT_SCANREQ);
|
||||
addRecSignal(GSN_ACC_CHECK_SCAN, &Dbtup::execACC_CHECK_SCAN);
|
||||
|
||||
initData();
|
||||
}//Dbtup::Dbtup()
|
||||
|
||||
|
@ -652,6 +657,10 @@ void Dbtup::execREAD_CONFIG_REQ(Signal* signal)
|
|||
c_buildIndexPool.setSize(c_noOfBuildIndexRec);
|
||||
c_triggerPool.setSize(noOfTriggers);
|
||||
|
||||
Uint32 nScanOp; // use TUX config for now
|
||||
ndbrequire(!ndb_mgm_get_int_parameter(p, CFG_TUX_SCAN_OP, &nScanOp));
|
||||
c_scanOpPool.setSize(nScanOp);
|
||||
|
||||
initRecords();
|
||||
czero = 0;
|
||||
cminusOne = czero - 1;
|
||||
|
@ -672,6 +681,8 @@ void Dbtup::execREAD_CONFIG_REQ(Signal* signal)
|
|||
|
||||
void Dbtup::initRecords()
|
||||
{
|
||||
unsigned i;
|
||||
|
||||
// Records with dynamic sizes
|
||||
attrbufrec = (Attrbufrec*)allocRecord("Attrbufrec",
|
||||
sizeof(Attrbufrec),
|
||||
|
@ -694,6 +705,11 @@ void Dbtup::initRecords()
|
|||
sizeof(Fragrecord),
|
||||
cnoOfFragrec);
|
||||
|
||||
for (i = 0; i<cnoOfFragrec; i++) {
|
||||
void * p = &fragrecord[i];
|
||||
new (p) Fragrecord(c_scanOpPool);
|
||||
}
|
||||
|
||||
hostBuffer = (HostBuffer*)allocRecord("HostBuffer",
|
||||
sizeof(HostBuffer),
|
||||
MAX_NODES);
|
||||
|
@ -730,7 +746,7 @@ void Dbtup::initRecords()
|
|||
sizeof(Tablerec),
|
||||
cnoOfTablerec);
|
||||
|
||||
for(unsigned i = 0; i<cnoOfTablerec; i++) {
|
||||
for (i = 0; i<cnoOfTablerec; i++) {
|
||||
void * p = &tablerec[i];
|
||||
new (p) Tablerec(c_triggerPool);
|
||||
}
|
||||
|
|
|
@ -18,6 +18,7 @@ libdbtup_a_SOURCES = \
|
|||
DbtupGen.cpp \
|
||||
DbtupSystemRestart.cpp \
|
||||
DbtupIndex.cpp \
|
||||
DbtupScan.cpp \
|
||||
DbtupDebug.cpp
|
||||
|
||||
include $(top_srcdir)/ndb/config/common.mk.am
|
||||
|
|
|
@ -113,7 +113,7 @@ NdbScanOperation::init(const NdbTableImpl* tab, NdbTransaction* myConnection)
|
|||
|
||||
int
|
||||
NdbScanOperation::readTuples(NdbScanOperation::LockMode lm,
|
||||
Uint32 batch,
|
||||
Uint32 scan_flags,
|
||||
Uint32 parallel)
|
||||
{
|
||||
m_ordered = m_descending = false;
|
||||
|
@ -159,7 +159,7 @@ NdbScanOperation::readTuples(NdbScanOperation::LockMode lm,
|
|||
|
||||
m_keyInfo = lockExcl ? 1 : 0;
|
||||
|
||||
bool range = false;
|
||||
bool rangeScan = false;
|
||||
if (m_accessTable->m_indexType == NdbDictionary::Index::OrderedIndex)
|
||||
{
|
||||
if (m_currentTable == m_accessTable){
|
||||
|
@ -172,9 +172,13 @@ NdbScanOperation::readTuples(NdbScanOperation::LockMode lm,
|
|||
// Modify operation state
|
||||
theStatus = GetValue;
|
||||
theOperationType = OpenRangeScanRequest;
|
||||
range = true;
|
||||
rangeScan = true;
|
||||
}
|
||||
|
||||
bool tupScan = (scan_flags & SF_TupScan);
|
||||
if (tupScan && rangeScan)
|
||||
tupScan = false;
|
||||
|
||||
theParallelism = parallel;
|
||||
|
||||
if(fix_receivers(parallel) == -1){
|
||||
|
@ -202,7 +206,8 @@ NdbScanOperation::readTuples(NdbScanOperation::LockMode lm,
|
|||
ScanTabReq::setLockMode(reqInfo, lockExcl);
|
||||
ScanTabReq::setHoldLockFlag(reqInfo, lockHoldMode);
|
||||
ScanTabReq::setReadCommittedFlag(reqInfo, readCommitted);
|
||||
ScanTabReq::setRangeScanFlag(reqInfo, range);
|
||||
ScanTabReq::setRangeScanFlag(reqInfo, rangeScan);
|
||||
ScanTabReq::setTupScanFlag(reqInfo, tupScan);
|
||||
req->requestInfo = reqInfo;
|
||||
|
||||
Uint64 transId = theNdbCon->getTransactionId();
|
||||
|
@ -1177,12 +1182,14 @@ error:
|
|||
|
||||
int
|
||||
NdbIndexScanOperation::readTuples(LockMode lm,
|
||||
Uint32 batch,
|
||||
Uint32 parallel,
|
||||
bool order_by,
|
||||
bool order_desc,
|
||||
bool read_range_no){
|
||||
int res = NdbScanOperation::readTuples(lm, batch, 0);
|
||||
Uint32 scan_flags,
|
||||
Uint32 parallel)
|
||||
{
|
||||
const bool order_by = scan_flags & SF_OrderBy;
|
||||
const bool order_desc = scan_flags & SF_Descending;
|
||||
const bool read_range_no = scan_flags & SF_ReadRangeNo;
|
||||
|
||||
int res = NdbScanOperation::readTuples(lm, scan_flags, 0);
|
||||
if(!res && read_range_no)
|
||||
{
|
||||
m_read_range_no = 1;
|
||||
|
|
|
@ -42,7 +42,8 @@ public:
|
|||
int records,
|
||||
int abort = 0,
|
||||
int parallelism = 0,
|
||||
NdbOperation::LockMode = NdbOperation::LM_Read);
|
||||
NdbOperation::LockMode = NdbOperation::LM_Read,
|
||||
int scan_flags = 0);
|
||||
|
||||
int scanReadRecords(Ndb*,
|
||||
const NdbDictionary::Index*,
|
||||
|
@ -50,7 +51,7 @@ public:
|
|||
int abort = 0,
|
||||
int parallelism = 0,
|
||||
NdbOperation::LockMode = NdbOperation::LM_Read,
|
||||
bool sorted = false);
|
||||
int scan_flags = 0);
|
||||
|
||||
int pkReadRecords(Ndb*,
|
||||
int records,
|
||||
|
|
|
@ -55,7 +55,6 @@ struct Opt {
|
|||
unsigned m_pctnull;
|
||||
unsigned m_rows;
|
||||
unsigned m_samples;
|
||||
unsigned m_scanbat;
|
||||
unsigned m_scanpar;
|
||||
unsigned m_scanstop;
|
||||
int m_seed;
|
||||
|
@ -83,7 +82,6 @@ struct Opt {
|
|||
m_pctnull(10),
|
||||
m_rows(1000),
|
||||
m_samples(0),
|
||||
m_scanbat(0),
|
||||
m_scanpar(0),
|
||||
m_scanstop(0),
|
||||
m_seed(-1),
|
||||
|
@ -121,7 +119,6 @@ printhelp()
|
|||
<< " -pctnull N pct NULL values in nullable column [" << d.m_pctnull << "]" << endl
|
||||
<< " -rows N rows per thread [" << d.m_rows << "]" << endl
|
||||
<< " -samples N samples for some timings (0=all) [" << d.m_samples << "]" << endl
|
||||
<< " -scanbat N scan batch per fragment (ignored by ndb api) [" << d.m_scanbat << "]" << endl
|
||||
<< " -scanpar N scan parallelism [" << d.m_scanpar << "]" << endl
|
||||
<< " -seed N srandom seed 0=loop number -1=random [" << d.m_seed << "]" << endl
|
||||
<< " -subloop N subtest loop count [" << d.m_subloop << "]" << endl
|
||||
|
@ -1369,7 +1366,7 @@ int
|
|||
Con::readTuples(Par par)
|
||||
{
|
||||
assert(m_tx != 0 && m_scanop != 0);
|
||||
CHKCON(m_scanop->readTuples(par.m_lockmode, par.m_scanbat, par.m_scanpar) == 0, *this);
|
||||
CHKCON(m_scanop->readTuples(par.m_lockmode, 0, par.m_scanpar) == 0, *this);
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -1377,7 +1374,7 @@ int
|
|||
Con::readIndexTuples(Par par)
|
||||
{
|
||||
assert(m_tx != 0 && m_indexscanop != 0);
|
||||
CHKCON(m_indexscanop->readTuples(par.m_lockmode, par.m_scanbat, par.m_scanpar, par.m_ordered, par.m_descending) == 0, *this);
|
||||
CHKCON(m_indexscanop->readTuples(par.m_lockmode, 0, par.m_scanpar, par.m_ordered, par.m_descending) == 0, *this);
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -4952,12 +4949,6 @@ NDB_COMMAND(testOIBasic, "testOIBasic", "testOIBasic", "testOIBasic", 65535)
|
|||
continue;
|
||||
}
|
||||
}
|
||||
if (strcmp(arg, "-scanbat") == 0) {
|
||||
if (++argv, --argc > 0) {
|
||||
g_opt.m_scanbat = atoi(argv[0]);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
if (strcmp(arg, "-scanpar") == 0) {
|
||||
if (++argv, --argc > 0) {
|
||||
g_opt.m_scanpar = atoi(argv[0]);
|
||||
|
|
|
@ -316,11 +316,15 @@ int runScanReadIndex(NDBT_Context* ctx, NDBT_Step* step){
|
|||
while (pIdx && i<loops && !ctx->isTestStopped()) {
|
||||
g_info << i << ": ";
|
||||
bool sort = (rand() % 100) > 50 ? true : false;
|
||||
bool desc = (rand() % 100) > 50 ? true : false;
|
||||
int scan_flags =
|
||||
(NdbScanOperation::SF_OrderBy & -(int)sort) |
|
||||
(NdbScanOperation::SF_Descending & -(int)desc);
|
||||
NdbOperation::LockMode lm = (NdbOperation::LockMode)(rand() % 3);
|
||||
if (hugoTrans.scanReadRecords(GETNDB(step), pIdx,
|
||||
records, abort, parallelism,
|
||||
lm,
|
||||
sort) != 0){
|
||||
scan_flags) != 0){
|
||||
return NDBT_FAILED;
|
||||
}
|
||||
i++;
|
||||
|
@ -333,6 +337,8 @@ int runScanReadCommitted(NDBT_Context* ctx, NDBT_Step* step){
|
|||
int records = ctx->getNumRecords();
|
||||
int parallelism = ctx->getProperty("Parallelism", 240);
|
||||
int abort = ctx->getProperty("AbortProb", 5);
|
||||
bool tupScan = ctx->getProperty("TupScan");
|
||||
int scan_flags = (NdbScanOperation::SF_TupScan & -(int)tupScan);
|
||||
|
||||
int i = 0;
|
||||
HugoTransactions hugoTrans(*ctx->getTab());
|
||||
|
@ -340,7 +346,8 @@ int runScanReadCommitted(NDBT_Context* ctx, NDBT_Step* step){
|
|||
g_info << i << ": ";
|
||||
if (hugoTrans.scanReadRecords(GETNDB(step), records,
|
||||
abort, parallelism,
|
||||
NdbOperation::LM_CommittedRead) != 0){
|
||||
NdbOperation::LM_CommittedRead,
|
||||
scan_flags) != 0){
|
||||
return NDBT_FAILED;
|
||||
}
|
||||
i++;
|
||||
|
@ -1150,6 +1157,18 @@ TESTCASE("ScanReadCommitted240",
|
|||
"downgraded to the maximum parallelism value for the current config)"){
|
||||
INITIALIZER(runLoadTable);
|
||||
TC_PROPERTY("Parallelism", 240);
|
||||
TC_PROPERTY("TupScan", (Uint32)0);
|
||||
STEP(runScanReadCommitted);
|
||||
FINALIZER(runClearTable);
|
||||
}
|
||||
TESTCASE("ScanTupReadCommitted240",
|
||||
"Verify scan requirement: It should be possible to scan read committed with "\
|
||||
"parallelism, test with parallelism 240(240 would automatically be "\
|
||||
"downgraded to the maximum parallelism value for the current config). "\
|
||||
"Scans TUP pages directly without using ACC."){
|
||||
INITIALIZER(runLoadTable);
|
||||
TC_PROPERTY("Parallelism", 240);
|
||||
TC_PROPERTY("TupScan", 1);
|
||||
STEP(runScanReadCommitted);
|
||||
FINALIZER(runClearTable);
|
||||
}
|
||||
|
|
|
@ -216,7 +216,7 @@ run_scan(){
|
|||
}
|
||||
|
||||
int par = g_paramters[P_PARRA].value;
|
||||
int bat = g_paramters[P_BATCH].value;
|
||||
int bat = 0; // g_paramters[P_BATCH].value;
|
||||
NdbScanOperation::LockMode lm;
|
||||
switch(g_paramters[P_LOCK].value){
|
||||
case 0:
|
||||
|
|
|
@ -613,7 +613,7 @@ HugoOperations::scanReadRecords(Ndb* pNdb, NdbScanOperation::LockMode lm,
|
|||
if(!pOp)
|
||||
return -1;
|
||||
|
||||
if(pOp->readTuples(lm, 1, 1)){
|
||||
if(pOp->readTuples(lm, 0, 1)){
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
|
|
@ -35,7 +35,8 @@ HugoTransactions::scanReadRecords(Ndb* pNdb,
|
|||
int records,
|
||||
int abortPercent,
|
||||
int parallelism,
|
||||
NdbOperation::LockMode lm)
|
||||
NdbOperation::LockMode lm,
|
||||
int scan_flags)
|
||||
{
|
||||
|
||||
int retryAttempt = 0;
|
||||
|
@ -72,7 +73,7 @@ HugoTransactions::scanReadRecords(Ndb* pNdb,
|
|||
return NDBT_FAILED;
|
||||
}
|
||||
|
||||
if( pOp ->readTuples(lm, 0, parallelism) ) {
|
||||
if( pOp ->readTuples(lm, scan_flags, parallelism) ) {
|
||||
ERR(pTrans->getNdbError());
|
||||
closeTransaction(pNdb);
|
||||
return NDBT_FAILED;
|
||||
|
@ -187,7 +188,7 @@ HugoTransactions::scanReadRecords(Ndb* pNdb,
|
|||
int abortPercent,
|
||||
int parallelism,
|
||||
NdbOperation::LockMode lm,
|
||||
bool sorted)
|
||||
int scan_flags)
|
||||
{
|
||||
|
||||
int retryAttempt = 0;
|
||||
|
@ -224,7 +225,7 @@ HugoTransactions::scanReadRecords(Ndb* pNdb,
|
|||
return NDBT_FAILED;
|
||||
}
|
||||
|
||||
if( pOp ->readTuples(lm, 0, parallelism, sorted) ) {
|
||||
if( pOp ->readTuples(lm, scan_flags, parallelism) ) {
|
||||
ERR(pTrans->getNdbError());
|
||||
closeTransaction(pNdb);
|
||||
return NDBT_FAILED;
|
||||
|
|
Loading…
Add table
Reference in a new issue