mirror of
https://github.com/MariaDB/server.git
synced 2025-01-18 04:53:01 +01:00
Merge build.mysql.com:/home/bk/mysql-4.1-ndb
into build.mysql.com:/users/mronstrom/wl2025 ndb/src/kernel/blocks/dbacc/DbaccMain.cpp: Auto merged ndb/src/kernel/blocks/dblqh/Dblqh.hpp: Auto merged ndb/src/kernel/blocks/dblqh/DblqhMain.cpp: Auto merged ndb/src/kernel/blocks/dbtc/DbtcMain.cpp: Auto merged
This commit is contained in:
commit
f72b82bd2d
22 changed files with 257 additions and 238 deletions
|
@ -117,6 +117,7 @@ monty@tik.
|
|||
monty@tik.mysql.fi
|
||||
monty@tramp.mysql.fi
|
||||
monty@work.mysql.com
|
||||
mronstrom@build.mysql.com
|
||||
mronstrom@mysql.com
|
||||
mskold@mysql.com
|
||||
msvensson@build.mysql.com
|
||||
|
|
|
@ -68,7 +68,22 @@
|
|||
* API can order a multiple of this number of records at a time since
|
||||
* fragments can be scanned in parallel.
|
||||
*/
|
||||
#define MAX_PARALLEL_OP_PER_SCAN 16
|
||||
#define MAX_PARALLEL_OP_PER_SCAN 64
|
||||
/*
|
||||
* When calculating the number of records sent from LQH in each batch
|
||||
* one uses SCAN_BATCH_SIZE divided by the expected size of signals
|
||||
* per row. This gives the batch size used for the scan. The NDB API
|
||||
* will receive one batch from each node at a time so there has to be
|
||||
* some care taken also so that the NDB API is not overloaded with
|
||||
* signals.
|
||||
*/
|
||||
#define SCAN_BATCH_SIZE 32768
|
||||
/*
|
||||
* To protect the NDB API from overload we also define a maximum total
|
||||
* batch size from all nodes. This parameter should most likely be
|
||||
* configurable, or dependent on sendBufferSize.
|
||||
*/
|
||||
#define MAX_SCAN_BATCH_SIZE 196608
|
||||
/*
|
||||
* Maximum number of Parallel Scan queries on one hash index fragment
|
||||
*/
|
||||
|
|
|
@ -33,7 +33,7 @@ class ScanFragReq {
|
|||
*/
|
||||
friend class Dblqh;
|
||||
public:
|
||||
STATIC_CONST( SignalLength = 25 );
|
||||
STATIC_CONST( SignalLength = 13 );
|
||||
|
||||
public:
|
||||
Uint32 senderData;
|
||||
|
@ -45,9 +45,11 @@ public:
|
|||
Uint32 schemaVersion;
|
||||
Uint32 transId1;
|
||||
Uint32 transId2;
|
||||
Uint32 clientOpPtr[MAX_PARALLEL_OP_PER_SCAN];
|
||||
Uint32 clientOpPtr;
|
||||
Uint32 concurrency;
|
||||
Uint32 batch_byte_size;
|
||||
Uint32 first_batch_size;
|
||||
|
||||
static Uint32 getConcurrency(const Uint32 & requestInfo);
|
||||
static Uint32 getLockMode(const Uint32 & requestInfo);
|
||||
static Uint32 getHoldLockFlag(const Uint32 & requestInfo);
|
||||
static Uint32 getKeyinfoFlag(const Uint32 & requestInfo);
|
||||
|
@ -56,7 +58,6 @@ public:
|
|||
static Uint32 getAttrLen(const Uint32 & requestInfo);
|
||||
static Uint32 getScanPrio(const Uint32 & requestInfo);
|
||||
|
||||
static void setConcurrency(Uint32 & requestInfo, Uint32 concurrency);
|
||||
static void setLockMode(Uint32 & requestInfo, Uint32 lockMode);
|
||||
static void setHoldLockFlag(Uint32 & requestInfo, Uint32 holdLock);
|
||||
static void setKeyinfoFlag(Uint32 & requestInfo, Uint32 keyinfo);
|
||||
|
@ -79,7 +80,6 @@ class KeyInfo20 {
|
|||
friend class NdbOperation;
|
||||
friend class NdbScanReceiver;
|
||||
public:
|
||||
//STATIC_CONST( SignalLength = 21 );
|
||||
STATIC_CONST( HeaderLength = 5);
|
||||
STATIC_CONST( DataLength = 20 );
|
||||
|
||||
|
@ -110,15 +110,15 @@ class ScanFragConf {
|
|||
friend class Backup;
|
||||
friend class Suma;
|
||||
public:
|
||||
STATIC_CONST( SignalLength = 21 );
|
||||
STATIC_CONST( SignalLength = 6 );
|
||||
|
||||
public:
|
||||
Uint32 senderData;
|
||||
Uint32 completedOps;
|
||||
Uint32 fragmentCompleted;
|
||||
Uint32 opReturnDataLen[16];
|
||||
Uint32 transId1;
|
||||
Uint32 transId2;
|
||||
Uint32 total_len;
|
||||
};
|
||||
|
||||
class ScanFragRef {
|
||||
|
@ -188,7 +188,6 @@ public:
|
|||
* Request Info
|
||||
*
|
||||
* a = Length of attrinfo - 16 Bits (16-31)
|
||||
* c = Concurrency - 5 Bits (0-4) -> Max 31
|
||||
* l = Lock Mode - 1 Bit 5
|
||||
* h = Hold lock - 1 Bit 7
|
||||
* k = Keyinfo - 1 Bit 8
|
||||
|
@ -198,11 +197,8 @@ public:
|
|||
*
|
||||
* 1111111111222222222233
|
||||
* 01234567890123456789012345678901
|
||||
* ccccclxhkr ppppaaaaaaaaaaaaaaaa
|
||||
* lxhkr ppppaaaaaaaaaaaaaaaa
|
||||
*/
|
||||
#define SF_CONCURRENCY_SHIFT (0)
|
||||
#define SF_CONCURRENCY_MASK (31)
|
||||
|
||||
#define SF_LOCK_MODE_SHIFT (5)
|
||||
#define SF_LOCK_MODE_MASK (1)
|
||||
|
||||
|
@ -217,12 +213,6 @@ public:
|
|||
#define SF_PRIO_SHIFT 12
|
||||
#define SF_PRIO_MASK 15
|
||||
|
||||
inline
|
||||
Uint32
|
||||
ScanFragReq::getConcurrency(const Uint32 & requestInfo){
|
||||
return (requestInfo >> SF_CONCURRENCY_SHIFT) & SF_CONCURRENCY_MASK;
|
||||
}
|
||||
|
||||
inline
|
||||
Uint32
|
||||
ScanFragReq::getLockMode(const Uint32 & requestInfo){
|
||||
|
@ -272,13 +262,6 @@ ScanFragReq::setScanPrio(UintR & requestInfo, UintR val){
|
|||
requestInfo |= (val << SF_PRIO_SHIFT);
|
||||
}
|
||||
|
||||
inline
|
||||
void
|
||||
ScanFragReq::setConcurrency(UintR & requestInfo, UintR val){
|
||||
ASSERT_MAX(val, SF_CONCURRENCY_MASK, "ScanFragReq::setConcurrency");
|
||||
requestInfo |= (val << SF_CONCURRENCY_SHIFT);
|
||||
}
|
||||
|
||||
inline
|
||||
void
|
||||
ScanFragReq::setLockMode(UintR & requestInfo, UintR val){
|
||||
|
@ -324,7 +307,7 @@ ScanFragReq::setAttrLen(UintR & requestInfo, UintR val){
|
|||
inline
|
||||
Uint32
|
||||
KeyInfo20::setScanInfo(Uint32 opNo, Uint32 scanNo){
|
||||
ASSERT_MAX(opNo, 15, "KeyInfo20::setScanInfo");
|
||||
ASSERT_MAX(opNo, 1023, "KeyInfo20::setScanInfo");
|
||||
ASSERT_MAX(scanNo, 255, "KeyInfo20::setScanInfo");
|
||||
return (opNo << 8) + scanNo;
|
||||
}
|
||||
|
@ -338,7 +321,7 @@ KeyInfo20::getScanNo(Uint32 scanInfo){
|
|||
inline
|
||||
Uint32
|
||||
KeyInfo20::getScanOp(Uint32 scanInfo){
|
||||
return (scanInfo >> 8) & 0xF;
|
||||
return (scanInfo >> 8) & 0x1023;
|
||||
}
|
||||
|
||||
#endif
|
||||
|
|
|
@ -45,7 +45,7 @@ public:
|
|||
/**
|
||||
* Length of signal
|
||||
*/
|
||||
STATIC_CONST( SignalLength = 25 );
|
||||
STATIC_CONST( StaticLength = 11 );
|
||||
|
||||
private:
|
||||
|
||||
|
@ -63,7 +63,8 @@ private:
|
|||
UintR transId1; // DATA 6
|
||||
UintR transId2; // DATA 7
|
||||
UintR buddyConPtr; // DATA 8
|
||||
UintR apiOperationPtr[16]; // DATA 9-25
|
||||
UintR batch_byte_size; // DATA 9
|
||||
UintR first_batch_size; // DATA 10
|
||||
|
||||
/**
|
||||
* Get:ers for requestInfo
|
||||
|
@ -95,11 +96,11 @@ private:
|
|||
h = Hold lock mode - 1 Bit 10
|
||||
c = Read Committed - 1 Bit 11
|
||||
x = Range Scan (TUX) - 1 Bit 15
|
||||
b = Scan batch - 5 Bit 16-19 (max 15)
|
||||
b = Scan batch - 10 Bit 16-25 (max 1023)
|
||||
|
||||
1111111111222222222233
|
||||
01234567890123456789012345678901
|
||||
ppppppppl hc xbbbbb
|
||||
ppppppppl hc xbbbbbbbbbb
|
||||
*/
|
||||
|
||||
#define PARALLELL_SHIFT (0)
|
||||
|
@ -118,7 +119,7 @@ private:
|
|||
#define RANGE_SCAN_MASK (1)
|
||||
|
||||
#define SCAN_BATCH_SHIFT (16)
|
||||
#define SCAN_BATCH_MASK (31)
|
||||
#define SCAN_BATCH_MASK (1023)
|
||||
|
||||
inline
|
||||
Uint8
|
||||
|
@ -201,6 +202,7 @@ inline
|
|||
void
|
||||
ScanTabReq::setScanBatch(Uint32 & requestInfo, Uint32 flag){
|
||||
ASSERT_MAX(flag, SCAN_BATCH_MASK, "ScanTabReq::setScanBatch");
|
||||
requestInfo &= ~(SCAN_BATCH_MASK << SCAN_BATCH_SHIFT);
|
||||
requestInfo |= (flag << SCAN_BATCH_SHIFT);
|
||||
}
|
||||
|
||||
|
@ -250,8 +252,8 @@ private:
|
|||
Uint32 info;
|
||||
};
|
||||
|
||||
static Uint32 getLength(Uint32 opDataInfo) { return opDataInfo >> 5; };
|
||||
static Uint32 getRows(Uint32 opDataInfo) { return opDataInfo & 31;}
|
||||
static Uint32 getLength(Uint32 opDataInfo) { return opDataInfo >> 10; };
|
||||
static Uint32 getRows(Uint32 opDataInfo) { return opDataInfo & 1023;}
|
||||
};
|
||||
|
||||
/**
|
||||
|
|
|
@ -228,21 +228,21 @@ private:
|
|||
* Scan Info
|
||||
*
|
||||
t = Scan take over indicator - 1 Bit
|
||||
n = Take over node - 16 Bits -> max 65535
|
||||
p = Scan Info - 12 Bits -> max 4095
|
||||
n = Take over node - 12 Bits -> max 65535
|
||||
p = Scan Info - 18 Bits -> max 4095
|
||||
|
||||
1111111111222222222233
|
||||
01234567890123456789012345678901
|
||||
tpppppppppppp nnnnnnnnnnnnnnnn
|
||||
tpppppppppppppppppp nnnnnnnnnnnn
|
||||
*/
|
||||
|
||||
#define TAKE_OVER_SHIFT (0)
|
||||
|
||||
#define TAKE_OVER_NODE_SHIFT (16)
|
||||
#define TAKE_OVER_NODE_MASK (65535)
|
||||
#define TAKE_OVER_NODE_SHIFT (20)
|
||||
#define TAKE_OVER_NODE_MASK (4095)
|
||||
|
||||
#define SCAN_INFO_SHIFT (1)
|
||||
#define SCAN_INFO_MASK (4095)
|
||||
#define SCAN_INFO_MASK (262143)
|
||||
|
||||
/**
|
||||
* Attr Len
|
||||
|
|
|
@ -75,6 +75,7 @@ private:
|
|||
class NdbRecAttr * getValue(const class NdbColumnImpl*, char * user_dst_ptr);
|
||||
void do_get_value(NdbReceiver*, Uint32 rows, Uint32 key_size);
|
||||
void prepareSend();
|
||||
void calculate_batch_size(Uint32, Uint32, Uint32&, Uint32&, Uint32&);
|
||||
|
||||
int execKEYINFO20(Uint32 info, const Uint32* ptr, Uint32 len);
|
||||
int execTRANSID_AI(const Uint32* ptr, Uint32 len);
|
||||
|
|
|
@ -122,7 +122,6 @@ protected:
|
|||
NdbConnection *m_transConnection;
|
||||
|
||||
// Scan related variables
|
||||
Uint32 theBatchSize;
|
||||
Uint32 theParallelism;
|
||||
Uint32 m_keyInfo;
|
||||
NdbApiSignal* theSCAN_TABREQ;
|
||||
|
|
|
@ -132,7 +132,7 @@ struct SignalHeader {
|
|||
Uint16 theTrace;
|
||||
Uint8 m_noOfSections;
|
||||
Uint8 m_fragmentInfo;
|
||||
}; /** 7x4 = 32 Bytes */
|
||||
}; /** 7x4 = 28 Bytes */
|
||||
|
||||
struct LinearSectionPtr {
|
||||
Uint32 sz;
|
||||
|
|
|
@ -27,7 +27,7 @@ printSCANTABREQ(FILE * output, const Uint32 * theData, Uint32 len, Uint16 receiv
|
|||
|
||||
const UintR requestInfo = sig->requestInfo;
|
||||
|
||||
fprintf(output, " apiConnectPtr: H\'%.8x\n",
|
||||
fprintf(output, " apiConnectPtr: H\'%.8x",
|
||||
sig->apiConnectPtr);
|
||||
fprintf(output, " requestInfo: H\'%.8x:\n", requestInfo);
|
||||
fprintf(output, " Parallellism: %u, Batch: %u LockMode: %u, Holdlock: %u, RangeScan: %u\n",
|
||||
|
@ -42,23 +42,8 @@ printSCANTABREQ(FILE * output, const Uint32 * theData, Uint32 len, Uint16 receiv
|
|||
|
||||
fprintf(output, " transId(1, 2): (H\'%.8x, H\'%.8x) storedProcId: H\'%.8x\n",
|
||||
sig->transId1, sig->transId2, sig->storedProcId);
|
||||
|
||||
fprintf(output, " OperationPtr(s):\n ");
|
||||
Uint32 restLen = (len - 9);
|
||||
const Uint32 * rest = &sig->apiOperationPtr[0];
|
||||
while(restLen >= 7){
|
||||
fprintf(output,
|
||||
" H\'%.8x H\'%.8x H\'%.8x H\'%.8x H\'%.8x H\'%.8x H\'%.8x\n",
|
||||
rest[0], rest[1], rest[2], rest[3],
|
||||
rest[4], rest[5], rest[6]);
|
||||
restLen -= 7;
|
||||
rest += 7;
|
||||
}
|
||||
if(restLen > 0){
|
||||
for(Uint32 i = 0; i<restLen; i++)
|
||||
fprintf(output, " H\'%.8x", rest[i]);
|
||||
fprintf(output, "\n");
|
||||
}
|
||||
fprintf(output, " batch_byte_size: %d, first_batch_size: %d\n",
|
||||
sig->batch_byte_size, sig->first_batch_size);
|
||||
return false;
|
||||
}
|
||||
|
||||
|
|
|
@ -3324,20 +3324,16 @@ Backup::execBACKUP_FRAGMENT_REQ(Signal* signal)
|
|||
req->requestInfo = 0;
|
||||
req->savePointId = 0;
|
||||
req->tableId = table.tableId;
|
||||
ScanFragReq::setConcurrency(req->requestInfo, parallelism);
|
||||
//ScanFragReq::setConcurrency(req->requestInfo, parallelism);
|
||||
ScanFragReq::setLockMode(req->requestInfo, 0);
|
||||
ScanFragReq::setHoldLockFlag(req->requestInfo, 0);
|
||||
ScanFragReq::setKeyinfoFlag(req->requestInfo, 1);
|
||||
ScanFragReq::setAttrLen(req->requestInfo,attrLen);
|
||||
req->transId1 = 0;
|
||||
req->transId2 = (BACKUP << 20) + (getOwnNodeId() << 8);
|
||||
|
||||
Uint32 i;
|
||||
for(i = 0; i<parallelism; i++) {
|
||||
jam();
|
||||
req->clientOpPtr[i] = filePtr.i;
|
||||
}//for
|
||||
sendSignal(DBLQH_REF, GSN_SCAN_FRAGREQ, signal, 25, JBB);
|
||||
req->clientOpPtr= filePtr.i;
|
||||
sendSignal(DBLQH_REF, GSN_SCAN_FRAGREQ, signal,
|
||||
ScanFragReq::SignalLength, JBB);
|
||||
|
||||
signal->theData[0] = filePtr.i;
|
||||
signal->theData[1] = 0;
|
||||
|
@ -3351,6 +3347,7 @@ Backup::execBACKUP_FRAGMENT_REQ(Signal* signal)
|
|||
signal->theData[7] = 0;
|
||||
|
||||
Uint32 dataPos = 8;
|
||||
Uint32 i;
|
||||
for(i = 0; i<table.noOfAttributes; i++) {
|
||||
jam();
|
||||
AttributePtr attr;
|
||||
|
@ -3655,7 +3652,7 @@ Backup::execSCAN_FRAGCONF(Signal* signal)
|
|||
c_backupFilePool.getPtr(filePtr, filePtrI);
|
||||
|
||||
OperationRecord & op = filePtr.p->operation;
|
||||
op.scanConf(conf->completedOps, conf->opReturnDataLen);
|
||||
//op.scanConf(conf->completedOps, conf->opReturnDataLen);
|
||||
|
||||
const Uint32 completed = conf->fragmentCompleted;
|
||||
if(completed != 2) {
|
||||
|
|
|
@ -12562,7 +12562,7 @@ void Dbacc::releaseLcpConnectRec(Signal* signal)
|
|||
/* --------------------------------------------------------------------------------- */
|
||||
void Dbacc::releaseOpRec(Signal* signal)
|
||||
{
|
||||
#ifdef VM_TRACE
|
||||
#ifdef NDB_DEBUG_FULL
|
||||
// DEBUG CODE
|
||||
// Check that the operation to be released isn't
|
||||
// already in the list of free operations
|
||||
|
|
|
@ -533,9 +533,11 @@ public:
|
|||
COPY = 2
|
||||
};
|
||||
UintR scanAccOpPtr[MAX_PARALLEL_OP_PER_SCAN];
|
||||
UintR scanApiOpPtr[MAX_PARALLEL_OP_PER_SCAN];
|
||||
UintR scanOpLength[MAX_PARALLEL_OP_PER_SCAN];
|
||||
UintR scanApiOpPtr;
|
||||
UintR scanLocalref[2];
|
||||
Uint32 scan_batch_len;
|
||||
Uint32 first_batch_size;
|
||||
Uint32 batch_byte_size;
|
||||
UintR copyPtr;
|
||||
union {
|
||||
Uint32 nextPool;
|
||||
|
|
|
@ -890,7 +890,7 @@ void Dblqh::execREAD_CONFIG_REQ(Signal* signal)
|
|||
&ctcConnectrecFileSize));
|
||||
clogFileFileSize = 4 * cnoLogFiles;
|
||||
ndbrequire(!ndb_mgm_get_int_parameter(p, CFG_LQH_SCAN, &cscanrecFileSize));
|
||||
cmaxAccOps = cscanrecFileSize * MAX_PARALLEL_SCANS_PER_FRAG;
|
||||
cmaxAccOps = cscanrecFileSize * MAX_PARALLEL_OP_PER_SCAN;
|
||||
|
||||
initRecords();
|
||||
initialiseRecordsLab(signal, 0, ref, senderData);
|
||||
|
@ -2099,8 +2099,6 @@ void Dblqh::execTIME_SIGNAL(Signal* signal)
|
|||
c_scanRecordPool.getPtr(TscanPtr, tTcConptr.p->tcScanRec);
|
||||
ndbout << " scanState = " << TscanPtr.p->scanState << endl;
|
||||
//TscanPtr.p->scanAccOpPtr[16];
|
||||
//TscanPtr.p->scanApiOpPtr[16];
|
||||
//TscanPtr.p->scanOpLength[16];
|
||||
//TscanPtr.p->scanLocalref[2];
|
||||
ndbout << " copyPtr="<<TscanPtr.p->copyPtr
|
||||
<< " scanAccPtr="<<TscanPtr.p->scanAccPtr
|
||||
|
@ -6988,6 +6986,7 @@ void Dblqh::continueScanNextReqLab(Signal* signal)
|
|||
|
||||
initScanAccOp(signal);
|
||||
scanptr.p->scanCompletedOperations = 0;
|
||||
scanptr.p->scan_batch_len= 0;
|
||||
scanptr.p->scanFlag = NextScanReq::ZSCAN_NEXT;
|
||||
scanNextLoopLab(signal);
|
||||
}//Dblqh::continueScanNextReqLab()
|
||||
|
@ -7142,6 +7141,7 @@ void Dblqh::closeScanRequestLab(Signal* signal)
|
|||
}//if
|
||||
tcConnectptr.p->abortState = TcConnectionrec::ABORT_ACTIVE;
|
||||
scanptr.p->scanCompletedOperations = 0;
|
||||
scanptr.p->scan_batch_len= 0;
|
||||
sendScanFragConf(signal, ZTRUE);
|
||||
break;
|
||||
case TcConnectionrec::SCAN_TUPKEY:
|
||||
|
@ -7225,7 +7225,7 @@ void Dblqh::scanLockReleasedLab(Signal* signal)
|
|||
* ------------------------------------------------------------------------- */
|
||||
void Dblqh::execSCAN_FRAGREQ(Signal* signal)
|
||||
{
|
||||
const ScanFragReq * const scanFragReq = (ScanFragReq *)&signal->theData[0];
|
||||
ScanFragReq * const scanFragReq = (ScanFragReq *)&signal->theData[0];
|
||||
ScanFragRef * ref;
|
||||
const Uint32 transid1 = scanFragReq->transId1;
|
||||
const Uint32 transid2 = scanFragReq->transId2;
|
||||
|
@ -7238,7 +7238,7 @@ void Dblqh::execSCAN_FRAGREQ(Signal* signal)
|
|||
const Uint32 reqinfo = scanFragReq->requestInfo;
|
||||
const Uint32 fragId = scanFragReq->fragmentNo;
|
||||
tabptr.i = scanFragReq->tableId;
|
||||
const Uint32 scanConcurrentOperations = ScanFragReq::getConcurrency(reqinfo);
|
||||
const Uint32 scanConcurrentOperations = scanFragReq->concurrency;
|
||||
const Uint32 scanLockMode = ScanFragReq::getLockMode(reqinfo);
|
||||
const Uint8 keyinfo = ScanFragReq::getKeyinfoFlag(reqinfo);
|
||||
const Uint8 rangeScan = ScanFragReq::getRangeScanFlag(reqinfo);
|
||||
|
@ -7256,9 +7256,9 @@ void Dblqh::execSCAN_FRAGREQ(Signal* signal)
|
|||
tcConnectptr.p->savePointId = scanFragReq->savePointId;
|
||||
} else {
|
||||
jam();
|
||||
/* ---------------------------------------------------------------------
|
||||
* NO FREE TC RECORD AVAILABLE, THUS WE CANNOT HANDLE THE REQUEST.
|
||||
* --------------------------------------------------------------------- */
|
||||
/* --------------------------------------------------------------------
|
||||
* NO FREE TC RECORD AVAILABLE, THUS WE CANNOT HANDLE THE REQUEST.
|
||||
* -------------------------------------------------------------------- */
|
||||
errorCode = ZNO_TC_CONNECT_ERROR;
|
||||
senderData = scanFragReq->senderData;
|
||||
goto error_handler_early;
|
||||
|
@ -7871,8 +7871,7 @@ void Dblqh::nextScanConfLoopLab(Signal* signal)
|
|||
tupKeyReq->keyRef2 = scanptr.p->scanLocalref[1];
|
||||
tupKeyReq->attrBufLen = 0;
|
||||
ndbrequire(scanptr.p->scanCompletedOperations < MAX_PARALLEL_OP_PER_SCAN);
|
||||
tupKeyReq->opRef =
|
||||
scanptr.p->scanApiOpPtr[scanptr.p->scanCompletedOperations];
|
||||
tupKeyReq->opRef = scanptr.p->scanApiOpPtr;
|
||||
tupKeyReq->applRef = scanptr.p->scanApiBlockref;
|
||||
tupKeyReq->schemaVersion = scanptr.p->scanSchemaVersion;
|
||||
tupKeyReq->storedProcedure = scanptr.p->scanStoredProcId;
|
||||
|
@ -7963,7 +7962,7 @@ void Dblqh::scanTupkeyConfLab(Signal* signal)
|
|||
tdata4 += tcConnectptr.p->primKeyLen;// Inform API about keyinfo len aswell
|
||||
}//if
|
||||
ndbrequire(scanptr.p->scanCompletedOperations < MAX_PARALLEL_OP_PER_SCAN);
|
||||
scanptr.p->scanOpLength[scanptr.p->scanCompletedOperations] = tdata4;
|
||||
scanptr.p->scan_batch_len+= tdata4;
|
||||
scanptr.p->scanCompletedOperations++;
|
||||
if ((scanptr.p->scanCompletedOperations ==
|
||||
scanptr.p->scanConcurrentOperations) &&
|
||||
|
@ -8217,6 +8216,7 @@ void Dblqh::tupScanCloseConfLab(Signal* signal)
|
|||
} else {
|
||||
jam();
|
||||
scanptr.p->scanCompletedOperations = 0;
|
||||
scanptr.p->scan_batch_len= 0;
|
||||
sendScanFragConf(signal, ZSCAN_FRAG_CLOSED);
|
||||
}//if
|
||||
finishScanrec(signal);
|
||||
|
@ -8249,7 +8249,7 @@ void Dblqh::initScanAccOp(Signal* signal)
|
|||
Uint32 Dblqh::initScanrec(const ScanFragReq* scanFragReq)
|
||||
{
|
||||
const Uint32 reqinfo = scanFragReq->requestInfo;
|
||||
const Uint32 scanConcurrentOperations = ScanFragReq::getConcurrency(reqinfo);
|
||||
const Uint32 scanConcurrentOperations = scanFragReq->concurrency;
|
||||
const Uint32 scanLockMode = ScanFragReq::getLockMode(reqinfo);
|
||||
const Uint32 scanLockHold = ScanFragReq::getHoldLockFlag(reqinfo);
|
||||
const Uint32 keyinfo = ScanFragReq::getKeyinfoFlag(reqinfo);
|
||||
|
@ -8267,7 +8267,10 @@ Uint32 Dblqh::initScanrec(const ScanFragReq* scanFragReq)
|
|||
scanptr.p->scanTcrec = tcConnectptr.i;
|
||||
scanptr.p->scanSchemaVersion = scanFragReq->schemaVersion;
|
||||
scanptr.p->scanCompletedOperations = 0;
|
||||
scanptr.p->scan_batch_len= 0;
|
||||
scanptr.p->scanConcurrentOperations = scanConcurrentOperations;
|
||||
scanptr.p->batch_byte_size= scanFragReq->batch_byte_size;
|
||||
scanptr.p->first_batch_size= scanFragReq->first_batch_size;
|
||||
scanptr.p->scanErrorCounter = 0;
|
||||
scanptr.p->scanLockMode = scanLockMode;
|
||||
scanptr.p->readCommitted = readCommitted;
|
||||
|
@ -8279,11 +8282,8 @@ Uint32 Dblqh::initScanrec(const ScanFragReq* scanFragReq)
|
|||
scanptr.p->scanLocalFragid = 0;
|
||||
scanptr.p->scanTcWaiting = ZTRUE;
|
||||
scanptr.p->scanNumber = ~0;
|
||||
|
||||
scanptr.p->scanApiOpPtr = scanFragReq->clientOpPtr;
|
||||
for (Uint32 i = 0; i < scanConcurrentOperations; i++) {
|
||||
jam();
|
||||
scanptr.p->scanApiOpPtr[i] = scanFragReq->clientOpPtr[i];
|
||||
scanptr.p->scanOpLength[i] = 0;
|
||||
scanptr.p->scanAccOpPtr[i] = 0;
|
||||
}//for
|
||||
|
||||
|
@ -8547,11 +8547,11 @@ void Dblqh::sendKeyinfo20(Signal* signal,
|
|||
TdataBuf.i = TdataBuf.p->nextDatabuf;
|
||||
}
|
||||
|
||||
keyInfo->clientOpPtr = scanP->scanApiOpPtr[scanOp];
|
||||
keyInfo->clientOpPtr = scanP->scanApiOpPtr;
|
||||
keyInfo->keyLen = keyLen;
|
||||
keyInfo->scanInfo_Node = KeyInfo20::setScanInfo(scanOp,
|
||||
scanP->scanNumber)+
|
||||
(getOwnNodeId() << 16);
|
||||
(getOwnNodeId() << 20);
|
||||
keyInfo->transId1 = tcConP->transid[0];
|
||||
keyInfo->transId2 = tcConP->transid[1];
|
||||
|
||||
|
@ -8632,23 +8632,27 @@ void Dblqh::sendKeyinfo20(Signal* signal,
|
|||
* ------------------------------------------------------------------------ */
|
||||
void Dblqh::sendScanFragConf(Signal* signal, Uint32 scanCompleted)
|
||||
{
|
||||
Uint32 completed_ops= scanptr.p->scanCompletedOperations;
|
||||
Uint32 total_len= scanptr.p->scan_batch_len;
|
||||
scanptr.p->scanTcWaiting = ZFALSE;
|
||||
|
||||
if(ERROR_INSERTED(5037)){
|
||||
CLEAR_ERROR_INSERT_VALUE;
|
||||
return;
|
||||
}
|
||||
|
||||
scanptr.p->scanTcWaiting = ZFALSE;
|
||||
ScanFragConf * conf = (ScanFragConf*)&signal->theData[0];
|
||||
NodeId tc_node_id= refToNode(tcConnectptr.p->clientBlockref);
|
||||
Uint32 trans_id1= tcConnectptr.p->transid[0];
|
||||
Uint32 trans_id2= tcConnectptr.p->transid[1];
|
||||
|
||||
conf->senderData = tcConnectptr.p->clientConnectrec;
|
||||
conf->completedOps = scanptr.p->scanCompletedOperations;
|
||||
conf->completedOps = completed_ops;
|
||||
conf->fragmentCompleted = scanCompleted;
|
||||
for(Uint32 i = 0; i<MAX_PARALLEL_OP_PER_SCAN; i++)
|
||||
conf->opReturnDataLen[i] = scanptr.p->scanOpLength[i];
|
||||
conf->transId1 = tcConnectptr.p->transid[0];
|
||||
conf->transId2 = tcConnectptr.p->transid[1];
|
||||
conf->transId1 = trans_id1;
|
||||
conf->transId2 = trans_id2;
|
||||
conf->total_len= total_len;
|
||||
sendSignal(tcConnectptr.p->clientBlockref, GSN_SCAN_FRAGCONF,
|
||||
signal, ScanFragConf::SignalLength, JBB);
|
||||
signal, ScanFragConf::SignalLength, JBB);
|
||||
}//Dblqh::sendScanFragConf()
|
||||
|
||||
/* ######################################################################### */
|
||||
|
|
|
@ -1184,7 +1184,11 @@ public:
|
|||
Uint32 scanTableref;
|
||||
|
||||
// Number of operation records per scanned fragment
|
||||
// Number of operations in first batch
|
||||
// Max number of bytes per batch
|
||||
Uint16 noOprecPerFrag;
|
||||
Uint16 first_batch_size;
|
||||
Uint32 batch_byte_size;
|
||||
|
||||
// Shall the locks be held until the application have read the
|
||||
// records
|
||||
|
@ -1417,17 +1421,13 @@ private:
|
|||
UintR anApiConnectPtr);
|
||||
void handleScanStop(Signal* signal, UintR aFailedNode);
|
||||
void initScanTcrec(Signal* signal);
|
||||
void initScanApirec(Signal* signal,
|
||||
Uint32 buddyPtr,
|
||||
UintR transid1,
|
||||
UintR transid2);
|
||||
void initScanrec(ScanRecordPtr, const class ScanTabReq*,
|
||||
void initScanrec(ScanRecordPtr, const class ScanTabReq*,
|
||||
const UintR scanParallel,
|
||||
const UintR noOprecPerFrag);
|
||||
void initScanfragrec(Signal* signal);
|
||||
void releaseScanResources(ScanRecordPtr);
|
||||
ScanRecordPtr seizeScanrec(Signal* signal);
|
||||
void sendScanFragReq(Signal* signal, ScanRecord*, ScanFragRec*);
|
||||
void sendScanFragReq(Signal*, ScanRecord*, ScanFragRec*);
|
||||
void sendScanTabConf(Signal* signal, ScanRecord*);
|
||||
void close_scan_req(Signal*, ScanRecordPtr, bool received_req);
|
||||
void close_scan_req_send_conf(Signal*, ScanRecordPtr);
|
||||
|
|
|
@ -2630,8 +2630,9 @@ void Dbtc::execTCKEYREQ(Signal* signal)
|
|||
{
|
||||
Uint32 TDistrGHIndex = tcKeyReq->getScanIndFlag(Treqinfo);
|
||||
Uint32 TDistrKeyIndex = TDistrGHIndex + TDistrGroupFlag;
|
||||
Uint32 TscanNode = tcKeyReq->getTakeOverScanNode(TOptionalDataPtr[0]);
|
||||
Uint32 TscanInfo = tcKeyReq->getTakeOverScanInfo(TOptionalDataPtr[0]);
|
||||
|
||||
Uint32 TscanNode = tcKeyReq->getTakeOverScanNode(TOptionalDataPtr[0]);
|
||||
Uint32 TscanInfo = tcKeyReq->getTakeOverScanInfo(TOptionalDataPtr[0]);
|
||||
|
||||
regCachePtr->scanTakeOverInd = TDistrGHIndex;
|
||||
regCachePtr->scanNode = TscanNode;
|
||||
|
@ -8406,11 +8407,11 @@ void Dbtc::systemErrorLab(Signal* signal)
|
|||
void Dbtc::execSCAN_TABREQ(Signal* signal)
|
||||
{
|
||||
const ScanTabReq * const scanTabReq = (ScanTabReq *)&signal->theData[0];
|
||||
const UintR reqinfo = scanTabReq->requestInfo;
|
||||
const Uint32 reqinfo = scanTabReq->requestInfo;
|
||||
const Uint32 aiLength = scanTabReq->attrLen;
|
||||
const Uint32 schemaVersion = scanTabReq->tableSchemaVersion;
|
||||
const UintR transid1 = scanTabReq->transId1;
|
||||
const UintR transid2 = scanTabReq->transId2;
|
||||
const Uint32 transid1 = scanTabReq->transId1;
|
||||
const Uint32 transid2 = scanTabReq->transId2;
|
||||
const Uint32 tmpXX = scanTabReq->buddyConPtr;
|
||||
const Uint32 buddyPtr = (tmpXX == 0xFFFFFFFF ? RNIL : tmpXX);
|
||||
Uint32 currSavePointId = 0;
|
||||
|
@ -8421,17 +8422,15 @@ void Dbtc::execSCAN_TABREQ(Signal* signal)
|
|||
Uint32 errCode;
|
||||
ScanRecordPtr scanptr;
|
||||
|
||||
if(noOprecPerFrag == 0){
|
||||
jam();
|
||||
scanParallel = (scanConcurrency + 15) / 16;
|
||||
noOprecPerFrag = (scanConcurrency >= 16 ? 16 : scanConcurrency & 15);
|
||||
}
|
||||
jamEntry();
|
||||
|
||||
SegmentedSectionPtr api_op_ptr;
|
||||
signal->getSection(api_op_ptr, 0);
|
||||
copy(&cdata[0], api_op_ptr);
|
||||
releaseSections(signal);
|
||||
|
||||
jamEntry();
|
||||
apiConnectptr.i = scanTabReq->apiConnectPtr;
|
||||
tabptr.i = scanTabReq->tableId;
|
||||
for(int i=0; i<16; i++)
|
||||
cdata[i] = scanTabReq->apiOperationPtr[i];
|
||||
|
||||
if (apiConnectptr.i >= capiConnectFilesize ||
|
||||
tabptr.i >= ctabrecFilesize) {
|
||||
|
@ -8441,7 +8440,6 @@ void Dbtc::execSCAN_TABREQ(Signal* signal)
|
|||
}//if
|
||||
ptrAss(apiConnectptr, apiConnectRecord);
|
||||
ApiConnectRecord * transP = apiConnectptr.p;
|
||||
|
||||
if (transP->apiConnectstate != CS_CONNECTED) {
|
||||
jam();
|
||||
// could be left over from TCKEYREQ rollback
|
||||
|
@ -8455,50 +8453,15 @@ void Dbtc::execSCAN_TABREQ(Signal* signal)
|
|||
}
|
||||
}
|
||||
ptrAss(tabptr, tableRecord);
|
||||
|
||||
if (aiLength == 0) {
|
||||
jam()
|
||||
errCode = ZSCAN_AI_LEN_ERROR;
|
||||
goto SCAN_TAB_error;
|
||||
}//if
|
||||
if (!tabptr.p->checkTable(schemaVersion)){
|
||||
jam();
|
||||
goto SCAN_TAB_schema_error;
|
||||
}//if
|
||||
/*****************************************************************
|
||||
* THE CONCURRENCY LEVEL SPECIFIED BY THE APPLICATION. IT MUST BE
|
||||
* BETWEEN 1 AND 240. IF IT IS 16 OR GREATER IT MUST BE A MULTIPLE
|
||||
* OF 16. CONCURRENCY LEVELS UPTO 16 ONLY SCAN ONE FRAGMENT AT A
|
||||
* TIME. IF WE SPECIFY 32 IT WILL SCAN TWO FRAGMENTS AT A TIME AND
|
||||
* SO FORTH. MAXIMUM 15 PARALLEL SCANS ARE ALLOWED
|
||||
******************************************************************/
|
||||
if (scanConcurrency == 0) {
|
||||
jam();
|
||||
errCode = ZNO_CONCURRENCY_ERROR;
|
||||
goto SCAN_TAB_error;
|
||||
}//if
|
||||
|
||||
/**********************************************************
|
||||
* CALCULATE THE NUMBER OF SCAN_TABINFO SIGNALS THAT WILL
|
||||
* ARRIVE TO DEFINE THIS SCAN. THIS ALSO DEFINES THE NUMBER
|
||||
* OF PARALLEL SCANS AND IT ALSO DEFINES THE NUMBER OF SCAN
|
||||
* OPERATION POINTER RECORDS TO ALLOCATE.
|
||||
**********************************************************/
|
||||
if (cfirstfreeTcConnect == RNIL) {
|
||||
jam();
|
||||
errCode = ZNO_FREE_TC_CONNECTION;
|
||||
goto SCAN_TAB_error;
|
||||
}//if
|
||||
|
||||
if (cfirstfreeScanrec == RNIL) {
|
||||
jam();
|
||||
errCode = ZNO_SCANREC_ERROR;
|
||||
goto SCAN_TAB_error;
|
||||
}//if
|
||||
|
||||
if ((aiLength == 0) ||
|
||||
(!tabptr.p->checkTable(schemaVersion)) ||
|
||||
(scanConcurrency == 0) ||
|
||||
(cfirstfreeTcConnect == RNIL) ||
|
||||
(cfirstfreeScanrec == RNIL)) {
|
||||
goto SCAN_error_check;
|
||||
}
|
||||
if (buddyPtr != RNIL) {
|
||||
jam();
|
||||
|
||||
ApiConnectRecordPtr buddyApiPtr;
|
||||
buddyApiPtr.i = buddyPtr;
|
||||
ptrCheckGuard(buddyApiPtr, capiConnectFilesize, apiConnectRecord);
|
||||
|
@ -8528,7 +8491,6 @@ void Dbtc::execSCAN_TABREQ(Signal* signal)
|
|||
|
||||
initScanrec(scanptr, scanTabReq, scanParallel, noOprecPerFrag);
|
||||
|
||||
//initScanApirec(signal, buddyPtr, transid1, transid2);
|
||||
transP->apiScanRec = scanptr.i;
|
||||
transP->returncode = 0;
|
||||
transP->transid[0] = transid1;
|
||||
|
@ -8554,10 +8516,32 @@ void Dbtc::execSCAN_TABREQ(Signal* signal)
|
|||
scanptr.p->scanState = ScanRecord::WAIT_AI;
|
||||
return;
|
||||
|
||||
SCAN_TAB_schema_error:
|
||||
SCAN_error_check:
|
||||
if (aiLength == 0) {
|
||||
jam()
|
||||
errCode = ZSCAN_AI_LEN_ERROR;
|
||||
goto SCAN_TAB_error;
|
||||
}//if
|
||||
if (!tabptr.p->checkTable(schemaVersion)){
|
||||
jam();
|
||||
errCode = tabptr.p->getErrorCode(schemaVersion);
|
||||
goto SCAN_TAB_error;
|
||||
}//if
|
||||
if (scanConcurrency == 0) {
|
||||
jam();
|
||||
errCode = ZNO_CONCURRENCY_ERROR;
|
||||
goto SCAN_TAB_error;
|
||||
}//if
|
||||
if (cfirstfreeTcConnect == RNIL) {
|
||||
jam();
|
||||
errCode = ZNO_FREE_TC_CONNECTION;
|
||||
goto SCAN_TAB_error;
|
||||
}//if
|
||||
ndbrequire(cfirstfreeScanrec == RNIL);
|
||||
jam();
|
||||
errCode = tabptr.p->getErrorCode(schemaVersion);
|
||||
|
||||
errCode = ZNO_SCANREC_ERROR;
|
||||
goto SCAN_TAB_error;
|
||||
|
||||
SCAN_TAB_error:
|
||||
jam();
|
||||
ScanTabRef * ref = (ScanTabRef*)&signal->theData[0];
|
||||
|
@ -8568,23 +8552,15 @@ void Dbtc::execSCAN_TABREQ(Signal* signal)
|
|||
ref->closeNeeded = 0;
|
||||
sendSignal(transP->ndbapiBlockref, GSN_SCAN_TABREF,
|
||||
signal, ScanTabRef::SignalLength, JBB);
|
||||
|
||||
return;
|
||||
}//Dbtc::execSCAN_TABREQ()
|
||||
|
||||
|
||||
void Dbtc::initScanApirec(Signal* signal,
|
||||
Uint32 buddyPtr, UintR transid1, UintR transid2)
|
||||
{
|
||||
}//Dbtc::initScanApirec()
|
||||
|
||||
void Dbtc::initScanrec(ScanRecordPtr scanptr,
|
||||
const ScanTabReq * scanTabReq,
|
||||
UintR scanParallel,
|
||||
UintR noOprecPerFrag)
|
||||
{
|
||||
const UintR reqinfo = scanTabReq->requestInfo;
|
||||
ndbrequire(scanParallel < 16);
|
||||
|
||||
scanptr.p->scanTcrec = tcConnectptr.i;
|
||||
scanptr.p->scanApiRec = apiConnectptr.i;
|
||||
|
@ -8593,6 +8569,8 @@ void Dbtc::initScanrec(ScanRecordPtr scanptr,
|
|||
scanptr.p->scanSchemaVersion = scanTabReq->tableSchemaVersion;
|
||||
scanptr.p->scanParallel = scanParallel;
|
||||
scanptr.p->noOprecPerFrag = noOprecPerFrag;
|
||||
scanptr.p->first_batch_size= scanTabReq->first_batch_size;
|
||||
scanptr.p->batch_byte_size= scanTabReq->batch_byte_size;
|
||||
scanptr.p->scanLockMode = ScanTabReq::getLockMode(reqinfo);
|
||||
scanptr.p->scanLockHold = ScanTabReq::getHoldLockFlag(reqinfo);
|
||||
scanptr.p->readCommitted = ScanTabReq::getReadCommittedFlag(reqinfo);
|
||||
|
@ -8600,7 +8578,7 @@ void Dbtc::initScanrec(ScanRecordPtr scanptr,
|
|||
scanptr.p->scanStoredProcId = scanTabReq->storedProcId;
|
||||
scanptr.p->scanState = ScanRecord::RUNNING;
|
||||
scanptr.p->m_queued_count = 0;
|
||||
|
||||
|
||||
ScanFragList list(c_scan_frag_pool,
|
||||
scanptr.p->m_running_scan_frags);
|
||||
for (Uint32 i = 0; i < scanParallel; i++) {
|
||||
|
@ -9080,6 +9058,7 @@ void Dbtc::scanError(Signal* signal, ScanRecordPtr scanptr, Uint32 errorCode)
|
|||
************************************************************/
|
||||
void Dbtc::execSCAN_FRAGCONF(Signal* signal)
|
||||
{
|
||||
Uint32 transid1, transid2, total_len;
|
||||
jamEntry();
|
||||
|
||||
const ScanFragConf * const conf = (ScanFragConf*)&signal->theData[0];
|
||||
|
@ -9095,8 +9074,9 @@ void Dbtc::execSCAN_FRAGCONF(Signal* signal)
|
|||
apiConnectptr.i = scanptr.p->scanApiRec;
|
||||
ptrCheckGuard(apiConnectptr, capiConnectFilesize, apiConnectRecord);
|
||||
|
||||
Uint32 transid1 = apiConnectptr.p->transid[0] ^ conf->transId1;
|
||||
Uint32 transid2 = apiConnectptr.p->transid[1] ^ conf->transId2;
|
||||
transid1 = apiConnectptr.p->transid[0] ^ conf->transId1;
|
||||
transid2 = apiConnectptr.p->transid[1] ^ conf->transId2;
|
||||
total_len= conf->total_len;
|
||||
transid1 = transid1 | transid2;
|
||||
if (transid1 != 0) {
|
||||
jam();
|
||||
|
@ -9146,15 +9126,13 @@ void Dbtc::execSCAN_FRAGCONF(Signal* signal)
|
|||
sendSignal(cdihblockref, GSN_DIGETPRIMREQ, signal, 4, JBB);
|
||||
return;
|
||||
}
|
||||
|
||||
Uint32 chksum = 0;
|
||||
/*
|
||||
Uint32 totalLen = 0;
|
||||
for(Uint32 i = 0; i<noCompletedOps; i++){
|
||||
Uint32 tmp = conf->opReturnDataLen[i];
|
||||
chksum += (tmp << i);
|
||||
totalLen += tmp;
|
||||
}
|
||||
|
||||
*/
|
||||
{
|
||||
ScanFragList run(c_scan_frag_pool, scanptr.p->m_running_scan_frags);
|
||||
ScanFragList queued(c_scan_frag_pool, scanptr.p->m_queued_scan_frags);
|
||||
|
@ -9165,8 +9143,7 @@ void Dbtc::execSCAN_FRAGCONF(Signal* signal)
|
|||
}
|
||||
|
||||
scanFragptr.p->m_ops = noCompletedOps;
|
||||
scanFragptr.p->m_chksum = chksum;
|
||||
scanFragptr.p->m_totalLen = totalLen;
|
||||
scanFragptr.p->m_totalLen = total_len;
|
||||
scanFragptr.p->scanFragState = ScanFragRec::QUEUED_FOR_DELIVERY;
|
||||
scanFragptr.p->stopFragTimer();
|
||||
|
||||
|
@ -9478,9 +9455,10 @@ Dbtc::seizeScanrec(Signal* signal) {
|
|||
|
||||
void Dbtc::sendScanFragReq(Signal* signal,
|
||||
ScanRecord* scanP,
|
||||
ScanFragRec* scanFragP){
|
||||
ScanFragRec* scanFragP)
|
||||
{
|
||||
Uint32 requestInfo = 0;
|
||||
ScanFragReq::setConcurrency(requestInfo, scanFragP->scanFragConcurrency);
|
||||
ScanFragReq * const req = (ScanFragReq *)&signal->theData[0];
|
||||
ScanFragReq::setLockMode(requestInfo, scanP->scanLockMode);
|
||||
ScanFragReq::setHoldLockFlag(requestInfo, scanP->scanLockHold);
|
||||
if(scanP->scanLockMode == 1){ // Not read -> keyinfo
|
||||
|
@ -9492,24 +9470,24 @@ void Dbtc::sendScanFragReq(Signal* signal,
|
|||
ScanFragReq::setAttrLen(requestInfo, scanP->scanAiLength);
|
||||
ScanFragReq::setScanPrio(requestInfo, 1);
|
||||
apiConnectptr.i = scanP->scanApiRec;
|
||||
ptrCheckGuard(apiConnectptr, capiConnectFilesize, apiConnectRecord);
|
||||
ScanFragReq * const req = (ScanFragReq *)&signal->theData[0];
|
||||
req->senderData = scanFragptr.i;
|
||||
req->resultRef = apiConnectptr.p->ndbapiBlockref;
|
||||
req->requestInfo = requestInfo;
|
||||
req->savePointId = apiConnectptr.p->currSavePointId;
|
||||
req->tableId = scanP->scanTableref;
|
||||
req->fragmentNo = scanFragP->scanFragId;
|
||||
req->schemaVersion = scanP->scanSchemaVersion;
|
||||
ptrCheckGuard(apiConnectptr, capiConnectFilesize, apiConnectRecord);
|
||||
req->senderData = scanFragptr.i;
|
||||
req->requestInfo = requestInfo;
|
||||
req->fragmentNo = scanFragP->scanFragId;
|
||||
req->resultRef = apiConnectptr.p->ndbapiBlockref;
|
||||
req->savePointId = apiConnectptr.p->currSavePointId;
|
||||
req->transId1 = apiConnectptr.p->transid[0];
|
||||
req->transId2 = apiConnectptr.p->transid[1];
|
||||
for(int i = 0; i<16; i++){
|
||||
req->clientOpPtr[i] = scanFragP->m_apiPtr;
|
||||
}
|
||||
sendSignal(scanFragP->lqhBlockref, GSN_SCAN_FRAGREQ, signal, 25, JBB);
|
||||
req->concurrency= scanFragP->scanFragConcurrency;
|
||||
req->clientOpPtr = scanFragP->m_apiPtr;
|
||||
req->batch_byte_size= scanP->batch_byte_size;
|
||||
req->first_batch_size= scanP->first_batch_size;
|
||||
sendSignal(scanFragP->lqhBlockref, GSN_SCAN_FRAGREQ, signal,
|
||||
ScanFragReq::SignalLength, JBB);
|
||||
updateBuddyTimer(apiConnectptr);
|
||||
scanFragP->startFragTimer(ctcTimer);
|
||||
|
||||
}//Dbtc::sendScanFragReq()
|
||||
|
||||
|
||||
|
@ -9538,7 +9516,7 @@ void Dbtc::sendScanTabConf(Signal* signal, ScanRecord * scanP) {
|
|||
|
||||
* ops++ = curr.p->m_apiPtr;
|
||||
* ops++ = curr.i;
|
||||
* ops++ = (curr.p->m_totalLen << 5) + curr.p->m_ops;
|
||||
* ops++ = (curr.p->m_totalLen << 10) + curr.p->m_ops;
|
||||
|
||||
queued.remove(curr);
|
||||
if(curr.p->m_ops > 0){
|
||||
|
|
|
@ -1844,7 +1844,7 @@ SumaParticipant::SyncRecord::nextScan(Signal* signal){
|
|||
req->tableId = tabPtr.p->m_tableId;
|
||||
req->requestInfo = 0;
|
||||
req->savePointId = 0;
|
||||
ScanFragReq::setConcurrency(req->requestInfo, parallelism);
|
||||
//ScanFragReq::setConcurrency(req->requestInfo, parallelism);
|
||||
ScanFragReq::setLockMode(req->requestInfo, 0);
|
||||
ScanFragReq::setHoldLockFlag(req->requestInfo, 0);
|
||||
ScanFragReq::setKeyinfoFlag(req->requestInfo, 0);
|
||||
|
@ -1853,9 +1853,10 @@ SumaParticipant::SyncRecord::nextScan(Signal* signal){
|
|||
req->schemaVersion = tabPtr.p->m_schemaVersion;
|
||||
req->transId1 = 0;
|
||||
req->transId2 = (SUMA << 20) + (suma.getOwnNodeId() << 8);
|
||||
|
||||
|
||||
for(unsigned int i = 0; i<parallelism; i++){
|
||||
req->clientOpPtr[i] = (ptrI << 16) + (i + 1);
|
||||
//req->clientOpPtr[i] = (ptrI << 16) + (i + 1);
|
||||
req->clientOpPtr = (ptrI << 16) + (i + 1);
|
||||
}
|
||||
suma.sendSignal(DBLQH_REF, GSN_SCAN_FRAGREQ, signal, 25, JBB);
|
||||
|
||||
|
|
|
@ -549,7 +549,7 @@ Configuration::calcSizeAlt(ConfigValues * ownConfig){
|
|||
/*-----------------------------------------------------------------------*/
|
||||
cfg.put(CFG_ACC_OP_RECS,
|
||||
((11 * noOfOperations) / 10 + 50) +
|
||||
(noOfLocalScanRecords * MAX_PARALLEL_SCANS_PER_FRAG) +
|
||||
(noOfLocalScanRecords * MAX_PARALLEL_OP_PER_SCAN) +
|
||||
NODE_RECOVERY_SCAN_OP_RECORDS);
|
||||
|
||||
cfg.put(CFG_ACC_OVERFLOW_RECS,
|
||||
|
|
|
@ -177,7 +177,7 @@ NdbApiSignal::setSignal(int aNdbSignalType)
|
|||
theTrace = TestOrd::TraceAPI;
|
||||
theReceiversBlockNumber = DBTC;
|
||||
theVerId_signalNumber = GSN_SCAN_TABREQ;
|
||||
theLength = 9; // ScanTabReq::SignalLength;
|
||||
theLength = ScanTabReq::StaticLength;
|
||||
}
|
||||
break;
|
||||
|
||||
|
@ -186,7 +186,7 @@ NdbApiSignal::setSignal(int aNdbSignalType)
|
|||
theTrace = TestOrd::TraceAPI;
|
||||
theReceiversBlockNumber = DBTC;
|
||||
theVerId_signalNumber = GSN_SCAN_NEXTREQ;
|
||||
theLength = 4;
|
||||
theLength = ScanNextReq::SignalLength;
|
||||
}
|
||||
break;
|
||||
|
||||
|
|
|
@ -71,6 +71,8 @@ public:
|
|||
const Uint32 * getDataPtr() const;
|
||||
Uint32 * getDataPtrSend();
|
||||
|
||||
NodeId get_sender_node();
|
||||
|
||||
/**
|
||||
* Fragmentation
|
||||
*/
|
||||
|
@ -103,6 +105,17 @@ private:
|
|||
NdbApiSignal *theNextSignal;
|
||||
Uint32 *theRealData;
|
||||
};
|
||||
/**********************************************************************
|
||||
NodeId get_sender_node
|
||||
Remark: Get the node id of the sender
|
||||
***********************************************************************/
|
||||
inline
|
||||
NodeId
|
||||
NdbApiSignal::get_sender_node()
|
||||
{
|
||||
return refToNode(theSendersBlockRef);
|
||||
}
|
||||
|
||||
/**********************************************************************
|
||||
void getLength
|
||||
Remark: Get the length of the signal.
|
||||
|
|
|
@ -99,11 +99,12 @@ NdbConnection::receiveSCAN_TABCONF(NdbApiSignal* aSignal,
|
|||
}
|
||||
|
||||
for(Uint32 i = 0; i<len; i += 3){
|
||||
Uint32 opCount, totalLen;
|
||||
Uint32 ptrI = * ops++;
|
||||
Uint32 tcPtrI = * ops++;
|
||||
Uint32 info = * ops++;
|
||||
Uint32 opCount = ScanTabConf::getRows(info);
|
||||
Uint32 totalLen = ScanTabConf::getLength(info);
|
||||
opCount = ScanTabConf::getRows(info);
|
||||
totalLen = ScanTabConf::getLength(info);
|
||||
|
||||
void * tPtr = theNdb->int2void(ptrI);
|
||||
assert(tPtr); // For now
|
||||
|
|
|
@ -89,6 +89,47 @@ NdbReceiver::getValue(const NdbColumnImpl* tAttrInfo, char * user_dst_ptr){
|
|||
|
||||
#define KEY_ATTR_ID (~0)
|
||||
|
||||
void
|
||||
NdbReceiver::calculate_batch_size(Uint32 key_size,
|
||||
Uint32 parallelism,
|
||||
Uint32& batch_size,
|
||||
Uint32& batch_byte_size,
|
||||
Uint32& first_batch_size)
|
||||
{
|
||||
Uint32 tot_size= (key_size ? (key_size + 32) : 0); //key + signal overhead
|
||||
NdbRecAttr *rec_attr= theFirstRecAttr;
|
||||
while (rec_attr != NULL) {
|
||||
Uint32 attr_size= rec_attr->attrSize() * rec_attr->arraySize();
|
||||
attr_size= ((attr_size + 7) >> 2) << 2; //Even to word + overhead
|
||||
tot_size+= attr_size;
|
||||
rec_attr= rec_attr->next();
|
||||
}
|
||||
tot_size+= 32; //include signal overhead
|
||||
|
||||
/**
|
||||
* Now we calculate the batch size by trying to get upto SCAN_BATCH_SIZE
|
||||
* bytes sent for each batch from each node. We do however ensure that
|
||||
* no more than MAX_SCAN_BATCH_SIZE is sent from all nodes in total per
|
||||
* batch.
|
||||
*/
|
||||
batch_byte_size= SCAN_BATCH_SIZE;
|
||||
if (SCAN_BATCH_SIZE * parallelism > MAX_SCAN_BATCH_SIZE) {
|
||||
batch_byte_size= MAX_SCAN_BATCH_SIZE / parallelism;
|
||||
}
|
||||
batch_size= batch_byte_size / tot_size;
|
||||
#ifdef VM_TRACE
|
||||
ndbout << "batch_byte_size = " << batch_byte_size << " batch_size = ";
|
||||
ndbout << batch_size << "tot_size = " << tot_size << endl;
|
||||
#endif
|
||||
if (batch_size == 0) {
|
||||
batch_size= 1;
|
||||
} else if (batch_size > MAX_PARALLEL_OP_PER_SCAN) {
|
||||
batch_size= MAX_PARALLEL_OP_PER_SCAN;
|
||||
}
|
||||
first_batch_size= batch_size;
|
||||
return;
|
||||
}
|
||||
|
||||
void
|
||||
NdbReceiver::do_get_value(NdbReceiver * org, Uint32 rows, Uint32 key_size){
|
||||
if(rows > m_defined_rows){
|
||||
|
@ -139,7 +180,7 @@ NdbReceiver::do_get_value(NdbReceiver * org, Uint32 rows, Uint32 key_size){
|
|||
}
|
||||
|
||||
prepareSend();
|
||||
return ; //0;
|
||||
return;
|
||||
}
|
||||
|
||||
void
|
||||
|
|
|
@ -140,17 +140,9 @@ NdbResultSet* NdbScanOperation::readTuples(NdbScanOperation::LockMode lm,
|
|||
|
||||
Uint32 fragCount = m_currentTable->m_fragmentCount;
|
||||
|
||||
if (batch + parallel == 0) {
|
||||
batch = 16;
|
||||
parallel= fragCount;
|
||||
} else {
|
||||
if (batch == 0 && parallel > 0) { // Backward
|
||||
batch = (parallel >= 16 ? 16 : parallel);
|
||||
parallel = (parallel + 15) / 16;
|
||||
}
|
||||
if (parallel > fragCount || parallel == 0)
|
||||
if (parallel > fragCount || parallel == 0) {
|
||||
parallel = fragCount;
|
||||
}
|
||||
}
|
||||
|
||||
// It is only possible to call openScan if
|
||||
// 1. this transcation don't already contain another scan operation
|
||||
|
@ -201,7 +193,6 @@ NdbResultSet* NdbScanOperation::readTuples(NdbScanOperation::LockMode lm,
|
|||
}
|
||||
|
||||
theParallelism = parallel;
|
||||
theBatchSize = batch;
|
||||
|
||||
if(fix_receivers(parallel) == -1){
|
||||
setErrorCodeAbort(4000);
|
||||
|
@ -223,7 +214,7 @@ NdbResultSet* NdbScanOperation::readTuples(NdbScanOperation::LockMode lm,
|
|||
|
||||
Uint32 reqInfo = 0;
|
||||
ScanTabReq::setParallelism(reqInfo, parallel);
|
||||
ScanTabReq::setScanBatch(reqInfo, batch);
|
||||
ScanTabReq::setScanBatch(reqInfo, 0);
|
||||
ScanTabReq::setLockMode(reqInfo, lockExcl);
|
||||
ScanTabReq::setHoldLockFlag(reqInfo, lockHoldMode);
|
||||
ScanTabReq::setReadCommittedFlag(reqInfo, readCommitted);
|
||||
|
@ -815,8 +806,23 @@ int NdbScanOperation::prepareSendScan(Uint32 aTC_ConnectPtr,
|
|||
theReceiver.prepareSend();
|
||||
bool keyInfo = m_keyInfo;
|
||||
Uint32 key_size = keyInfo ? m_currentTable->m_keyLenInWords : 0;
|
||||
/**
|
||||
* The number of records sent by each LQH is calculated and the kernel
|
||||
* is informed of this number by updating the SCAN_TABREQ signal
|
||||
*/
|
||||
Uint32 batch_size, batch_byte_size, first_batch_size;
|
||||
theReceiver.calculate_batch_size(key_size,
|
||||
theParallelism,
|
||||
batch_size,
|
||||
batch_byte_size,
|
||||
first_batch_size);
|
||||
ScanTabReq * req = CAST_PTR(ScanTabReq, theSCAN_TABREQ->getDataPtrSend());
|
||||
ScanTabReq::setScanBatch(req->requestInfo, batch_size);
|
||||
req->batch_byte_size= batch_byte_size;
|
||||
req->first_batch_size= first_batch_size;
|
||||
|
||||
for(Uint32 i = 0; i<theParallelism; i++){
|
||||
m_receivers[i]->do_get_value(&theReceiver, theBatchSize, key_size);
|
||||
m_receivers[i]->do_get_value(&theReceiver, batch_size, key_size);
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
@ -856,23 +862,13 @@ NdbScanOperation::doSendScan(int aProcessorId)
|
|||
if (theOperationType == OpenRangeScanRequest)
|
||||
req->attrLen += theTotalBoundAI_Len;
|
||||
TransporterFacade *tp = TransporterFacade::instance();
|
||||
if(theParallelism > 16){
|
||||
LinearSectionPtr ptr[3];
|
||||
ptr[0].p = m_prepared_receivers;
|
||||
ptr[0].sz = theParallelism;
|
||||
if (tp->sendFragmentedSignal(tSignal, aProcessorId, ptr, 1) == -1) {
|
||||
setErrorCode(4002);
|
||||
return -1;
|
||||
}
|
||||
} else {
|
||||
tSignal->setLength(9+theParallelism);
|
||||
memcpy(tSignal->getDataPtrSend()+9, m_prepared_receivers, 4*theParallelism);
|
||||
if (tp->sendSignal(tSignal, aProcessorId) == -1) {
|
||||
setErrorCode(4002);
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
LinearSectionPtr ptr[3];
|
||||
ptr[0].p = m_prepared_receivers;
|
||||
ptr[0].sz = theParallelism;
|
||||
if (tp->sendFragmentedSignal(tSignal, aProcessorId, ptr, 1) == -1) {
|
||||
setErrorCode(4002);
|
||||
return -1;
|
||||
}
|
||||
if (theOperationType == OpenRangeScanRequest) {
|
||||
// must have at least one signal since it contains attrLen for bounds
|
||||
assert(theBoundATTRINFO != NULL);
|
||||
|
@ -969,8 +965,8 @@ NdbScanOperation::takeOverScanOp(OperationType opType, NdbConnection* pTrans){
|
|||
}
|
||||
|
||||
const Uint32 * src = (Uint32*)tRecAttr->aRef();
|
||||
const Uint32 tScanInfo = src[len] & 0xFFFF;
|
||||
const Uint32 tTakeOverNode = src[len] >> 16;
|
||||
const Uint32 tScanInfo = src[len] & 0x3FFFF;
|
||||
const Uint32 tTakeOverNode = src[len] >> 20;
|
||||
{
|
||||
UintR scanInfo = 0;
|
||||
TcKeyReq::setTakeOverScanFlag(scanInfo, 1);
|
||||
|
|
Loading…
Reference in a new issue