Merge joreland@bk-internal.mysql.com:/tmp/my4.1-ndb

into mysql.com:/home/jonas/src/wl2077
This commit is contained in:
joreland@mysql.com 2004-11-19 13:35:14 +01:00
commit 4dead3b68d
21 changed files with 560 additions and 211 deletions

35
ndb/docs/wl2077.txt Normal file
View file

@ -0,0 +1,35 @@
100' * (select 1 from T1 (1M rows) where key = rand());
1 host, 1 ndbd, api co-hosted
results in 1000 rows / sec
wo/reset bounds w/ rb
4.1-read committed a) 4.9 b) 7.4
4.1-read hold lock c) 4.7 d) 6.7
wl2077-read committed 6.4 (+30%) 10.8 (+45%)
wl2077-read hold lock 4.6 (-1%) 6.7 (+ 0%)
-- Comparision e)
serial pk: 10.9'
batched (1000): 59'
serial uniq index: 8.4'
batched (1000): 33'
index range (1000): 186'
----
load) testScanPerf -c 1 -d 1 T1
a) testScanPerf -s 100000 -c 0 -d 0 -a 1 -l 0 -r 2 -q 0 T1
b) testScanPerf -s 100000 -c 0 -d 0 -a 1 -l 0 -r 2 -q 1 T1
c) testScanPerf -s 100000 -c 0 -d 0 -a 1 -l 1 -r 2 -q 0 T1
d) testScanPerf -s 100000 -c 0 -d 0 -a 1 -l 1 -r 2 -q 1 T1
e) testReadPerf -i 25 -c 0 -d 0 T1
--- music join 1db-co 2db-co
4.1 13s 14s
4.1 wo/ blobs 1.7s 3.2s
wl2077 12s 14s
wl2077 wo/ blobs 1.2s (-30%) 2.5s (-22%)

View file

@ -147,7 +147,6 @@ public:
Uint32 nfConnect; Uint32 nfConnect;
Uint32 table; Uint32 table;
Uint32 userpointer; Uint32 userpointer;
Uint32 nodeCount;
BlockReference userblockref; BlockReference userblockref;
}; };
typedef Ptr<ConnectRecord> ConnectRecordPtr; typedef Ptr<ConnectRecord> ConnectRecordPtr;

View file

@ -7080,24 +7080,22 @@ void Dbdih::execDIGETPRIMREQ(Signal* signal)
ndbrequire(tabPtr.p->tabStatus == TabRecord::TS_ACTIVE); ndbrequire(tabPtr.p->tabStatus == TabRecord::TS_ACTIVE);
connectPtr.i = signal->theData[0]; connectPtr.i = signal->theData[0];
if(connectPtr.i != RNIL){ if(connectPtr.i != RNIL)
{
jam(); jam();
ptrCheckGuard(connectPtr, cconnectFileSize, connectRecord); ptrCheckGuard(connectPtr, cconnectFileSize, connectRecord);
ndbrequire(connectPtr.p->connectState == ConnectRecord::INUSE);
getFragstore(tabPtr.p, fragId, fragPtr);
connectPtr.p->nodeCount = extractNodeInfo(fragPtr.p, connectPtr.p->nodes);
signal->theData[0] = connectPtr.p->userpointer; signal->theData[0] = connectPtr.p->userpointer;
signal->theData[1] = passThrough; }
signal->theData[2] = connectPtr.p->nodes[0]; else
sendSignal(connectPtr.p->userblockref, GSN_DIGETPRIMCONF, signal, 3, JBB); {
return; jam();
}//if signal->theData[0] = RNIL;
//connectPtr.i == RNIL -> question without connect record }
Uint32 nodes[MAX_REPLICAS]; Uint32 nodes[MAX_REPLICAS];
getFragstore(tabPtr.p, fragId, fragPtr); getFragstore(tabPtr.p, fragId, fragPtr);
Uint32 count = extractNodeInfo(fragPtr.p, nodes); Uint32 count = extractNodeInfo(fragPtr.p, nodes);
signal->theData[0] = RNIL;
signal->theData[1] = passThrough; signal->theData[1] = passThrough;
signal->theData[2] = nodes[0]; signal->theData[2] = nodes[0];
signal->theData[3] = nodes[1]; signal->theData[3] = nodes[1];

View file

@ -2925,4 +2925,23 @@ Dblqh::ScanRecord::check_scan_batch_completed() const
(max_bytes > 0 && (m_curr_batch_size_bytes >= max_bytes)); (max_bytes > 0 && (m_curr_batch_size_bytes >= max_bytes));
} }
inline
void
Dblqh::i_get_acc_ptr(ScanRecord* scanP, Uint32* &acc_ptr, Uint32 index)
{
if (index == 0) {
acc_ptr= (Uint32*)&scanP->scan_acc_op_ptr[0];
} else {
Uint32 attr_buf_index, attr_buf_rec;
AttrbufPtr regAttrPtr;
jam();
attr_buf_rec= (index + 31) / 32;
attr_buf_index= (index - 1) & 31;
regAttrPtr.i= scanP->scan_acc_op_ptr[attr_buf_rec];
ptrCheckGuard(regAttrPtr, cattrinbufFileSize, attrbuf);
acc_ptr= (Uint32*)&regAttrPtr.p->attrbuf[attr_buf_index];
}
}
#endif #endif

View file

@ -7161,10 +7161,7 @@ void Dblqh::continueScanNextReqLab(Signal* signal)
// Update timer on tcConnectRecord // Update timer on tcConnectRecord
tcConnectptr.p->tcTimer = cLqhTimeOutCount; tcConnectptr.p->tcTimer = cLqhTimeOutCount;
init_acc_ptr_list(scanptr.p); init_acc_ptr_list(scanptr.p);
scanptr.p->m_curr_batch_size_rows = 0;
scanptr.p->m_curr_batch_size_bytes= 0;
scanptr.p->scanFlag = NextScanReq::ZSCAN_NEXT; scanptr.p->scanFlag = NextScanReq::ZSCAN_NEXT;
scanNextLoopLab(signal); scanNextLoopLab(signal);
}//Dblqh::continueScanNextReqLab() }//Dblqh::continueScanNextReqLab()
@ -7363,22 +7360,32 @@ void Dblqh::scanLockReleasedLab(Signal* signal)
tcConnectptr.i = scanptr.p->scanTcrec; tcConnectptr.i = scanptr.p->scanTcrec;
ptrCheckGuard(tcConnectptr, ctcConnectrecFileSize, tcConnectionrec); ptrCheckGuard(tcConnectptr, ctcConnectrecFileSize, tcConnectionrec);
releaseActiveFrag(signal); releaseActiveFrag(signal);
if (scanptr.p->scanReleaseCounter == scanptr.p->m_curr_batch_size_rows) { if (scanptr.p->scanReleaseCounter == scanptr.p->m_curr_batch_size_rows) {
if ((scanptr.p->scanErrorCounter > 0) || if ((scanptr.p->scanErrorCounter > 0) ||
(scanptr.p->scanCompletedStatus == ZTRUE)) { (scanptr.p->scanCompletedStatus == ZTRUE)) {
jam(); jam();
scanptr.p->m_curr_batch_size_rows = 0;
scanptr.p->m_curr_batch_size_bytes = 0;
closeScanLab(signal); closeScanLab(signal);
} else if (scanptr.p->check_scan_batch_completed() && } else if (scanptr.p->check_scan_batch_completed() &&
scanptr.p->scanLockHold != ZTRUE) { scanptr.p->scanLockHold != ZTRUE) {
jam(); jam();
scanptr.p->scanState = ScanRecord::WAIT_SCAN_NEXTREQ; scanptr.p->scanState = ScanRecord::WAIT_SCAN_NEXTREQ;
sendScanFragConf(signal, ZFALSE); sendScanFragConf(signal, ZFALSE);
} else if (scanptr.p->m_last_row && !scanptr.p->scanLockHold) {
jam();
closeScanLab(signal);
return;
} else { } else {
jam(); jam();
/* /*
We came here after releasing locks after receiving SCAN_NEXTREQ from TC. We only * We came here after releasing locks after
come here when scanHoldLock == ZTRUE * receiving SCAN_NEXTREQ from TC. We only come here
*/ * when scanHoldLock == ZTRUE
*/
scanptr.p->m_curr_batch_size_rows = 0;
scanptr.p->m_curr_batch_size_bytes = 0;
continueScanNextReqLab(signal); continueScanNextReqLab(signal);
}//if }//if
} else if (scanptr.p->scanReleaseCounter < scanptr.p->m_curr_batch_size_rows) { } else if (scanptr.p->scanReleaseCounter < scanptr.p->m_curr_batch_size_rows) {
@ -7465,25 +7472,6 @@ Dblqh::init_acc_ptr_list(ScanRecord* scanP)
scanP->scan_acc_index = 0; scanP->scan_acc_index = 0;
} }
inline
void
Dblqh::i_get_acc_ptr(ScanRecord* scanP, Uint32* &acc_ptr, Uint32 index)
{
if (index == 0) {
acc_ptr= (Uint32*)&scanP->scan_acc_op_ptr[0];
} else {
Uint32 attr_buf_index, attr_buf_rec;
AttrbufPtr regAttrPtr;
jam();
attr_buf_rec= (index + 31) / 32;
attr_buf_index= (index - 1) & 31;
regAttrPtr.i= scanP->scan_acc_op_ptr[attr_buf_rec];
ptrCheckGuard(regAttrPtr, cattrinbufFileSize, attrbuf);
acc_ptr= (Uint32*)&regAttrPtr.p->attrbuf[attr_buf_index];
}
}
Uint32 Uint32
Dblqh::get_acc_ptr_from_scan_record(ScanRecord* scanP, Dblqh::get_acc_ptr_from_scan_record(ScanRecord* scanP,
Uint32 index, Uint32 index,
@ -8007,6 +7995,13 @@ void Dblqh::nextScanConfScanLab(Signal* signal)
/************************************************************* /*************************************************************
* STOP THE SCAN PROCESS IF THIS HAS BEEN REQUESTED. * STOP THE SCAN PROCESS IF THIS HAS BEEN REQUESTED.
************************************************************ */ ************************************************************ */
if (!scanptr.p->scanLockHold)
{
jam();
closeScanLab(signal);
return;
}
if (scanptr.p->scanCompletedStatus == ZTRUE) { if (scanptr.p->scanCompletedStatus == ZTRUE) {
if ((scanptr.p->scanLockHold == ZTRUE) && if ((scanptr.p->scanLockHold == ZTRUE) &&
(scanptr.p->m_curr_batch_size_rows > 0)) { (scanptr.p->m_curr_batch_size_rows > 0)) {
@ -8507,8 +8502,6 @@ void Dblqh::tupScanCloseConfLab(Signal* signal)
ScanFragRef::SignalLength, JBB); ScanFragRef::SignalLength, JBB);
} else { } else {
jam(); jam();
scanptr.p->m_curr_batch_size_rows = 0;
scanptr.p->m_curr_batch_size_bytes= 0;
sendScanFragConf(signal, ZSCAN_FRAG_CLOSED); sendScanFragConf(signal, ZSCAN_FRAG_CLOSED);
}//if }//if
finishScanrec(signal); finishScanrec(signal);
@ -8912,6 +8905,13 @@ void Dblqh::sendScanFragConf(Signal* signal, Uint32 scanCompleted)
conf->total_len= total_len; conf->total_len= total_len;
sendSignal(tcConnectptr.p->clientBlockref, GSN_SCAN_FRAGCONF, sendSignal(tcConnectptr.p->clientBlockref, GSN_SCAN_FRAGCONF,
signal, ScanFragConf::SignalLength, JBB); signal, ScanFragConf::SignalLength, JBB);
if(!scanptr.p->scanLockHold)
{
jam();
scanptr.p->m_curr_batch_size_rows = 0;
scanptr.p->m_curr_batch_size_bytes= 0;
}
}//Dblqh::sendScanFragConf() }//Dblqh::sendScanFragConf()
/* ######################################################################### */ /* ######################################################################### */

View file

@ -1054,9 +1054,8 @@ public:
// Id of the ScanRecord this fragment scan belongs to // Id of the ScanRecord this fragment scan belongs to
Uint32 scanRec; Uint32 scanRec;
// The maximum number of operations that can be scanned before // The value of fragmentCompleted in the last received SCAN_FRAGCONF
// returning to TC Uint8 m_scan_frag_conf_status;
Uint16 scanFragConcurrency;
inline void startFragTimer(Uint32 timeVal){ inline void startFragTimer(Uint32 timeVal){
scanFragTimer = timeVal; scanFragTimer = timeVal;
@ -1193,8 +1192,10 @@ public:
// Number of operation records per scanned fragment // Number of operation records per scanned fragment
// Number of operations in first batch // Number of operations in first batch
// Max number of bytes per batch // Max number of bytes per batch
Uint16 noOprecPerFrag; union {
Uint16 first_batch_size; Uint16 first_batch_size_rows;
Uint16 batch_size_rows;
};
Uint32 batch_byte_size; Uint32 batch_byte_size;
Uint32 scanRequestInfo; // ScanFrag format Uint32 scanRequestInfo; // ScanFrag format

View file

@ -8646,9 +8646,9 @@ void Dbtc::initScanrec(ScanRecordPtr scanptr,
scanptr.p->scanTableref = tabptr.i; scanptr.p->scanTableref = tabptr.i;
scanptr.p->scanSchemaVersion = scanTabReq->tableSchemaVersion; scanptr.p->scanSchemaVersion = scanTabReq->tableSchemaVersion;
scanptr.p->scanParallel = scanParallel; scanptr.p->scanParallel = scanParallel;
scanptr.p->noOprecPerFrag = noOprecPerFrag; scanptr.p->first_batch_size_rows = scanTabReq->first_batch_size;
scanptr.p->first_batch_size= scanTabReq->first_batch_size; scanptr.p->batch_byte_size = scanTabReq->batch_byte_size;
scanptr.p->batch_byte_size= scanTabReq->batch_byte_size; scanptr.p->batch_size_rows = noOprecPerFrag;
Uint32 tmp = 0; Uint32 tmp = 0;
const UintR ri = scanTabReq->requestInfo; const UintR ri = scanTabReq->requestInfo;
@ -8672,7 +8672,6 @@ void Dbtc::initScanrec(ScanRecordPtr scanptr,
ndbrequire(list.seize(ptr)); ndbrequire(list.seize(ptr));
ptr.p->scanRec = scanptr.i; ptr.p->scanRec = scanptr.i;
ptr.p->scanFragId = 0; ptr.p->scanFragId = 0;
ptr.p->scanFragConcurrency = noOprecPerFrag;
ptr.p->m_apiPtr = cdata[i]; ptr.p->m_apiPtr = cdata[i];
}//for }//for
@ -8945,6 +8944,20 @@ void Dbtc::execDIGETPRIMCONF(Signal* signal)
scanptr.i = scanFragptr.p->scanRec; scanptr.i = scanFragptr.p->scanRec;
ptrCheckGuard(scanptr, cscanrecFileSize, scanRecord); ptrCheckGuard(scanptr, cscanrecFileSize, scanRecord);
if(ScanFragReq::getReadCommittedFlag(scanptr.p->scanRequestInfo))
{
jam();
Uint32 max = 3+signal->theData[6];
Uint32 nodeid = getOwnNodeId();
for(Uint32 i = 3; i<max; i++)
if(signal->theData[i] == nodeid)
{
jam();
tnodeid = nodeid;
break;
}
}
{ {
/** /**
* Check table * Check table
@ -9141,6 +9154,7 @@ void Dbtc::execSCAN_FRAGCONF(Signal* signal)
const ScanFragConf * const conf = (ScanFragConf*)&signal->theData[0]; const ScanFragConf * const conf = (ScanFragConf*)&signal->theData[0];
const Uint32 noCompletedOps = conf->completedOps; const Uint32 noCompletedOps = conf->completedOps;
const Uint32 status = conf->fragmentCompleted;
scanFragptr.i = conf->senderData; scanFragptr.i = conf->senderData;
c_scan_frag_pool.getPtr(scanFragptr); c_scan_frag_pool.getPtr(scanFragptr);
@ -9163,11 +9177,9 @@ void Dbtc::execSCAN_FRAGCONF(Signal* signal)
ndbrequire(scanFragptr.p->scanFragState == ScanFragRec::LQH_ACTIVE); ndbrequire(scanFragptr.p->scanFragState == ScanFragRec::LQH_ACTIVE);
const Uint32 status = conf->fragmentCompleted;
if(scanptr.p->scanState == ScanRecord::CLOSING_SCAN){ if(scanptr.p->scanState == ScanRecord::CLOSING_SCAN){
jam(); jam();
if(status == ZFALSE){ if(status == 0){
/** /**
* We have started closing = we sent a close -> ignore this * We have started closing = we sent a close -> ignore this
*/ */
@ -9184,11 +9196,11 @@ void Dbtc::execSCAN_FRAGCONF(Signal* signal)
return; return;
} }
if(status == ZCLOSED && scanptr.p->scanNextFragId < scanptr.p->scanNoFrag){ if(noCompletedOps == 0 && status != 0 &&
scanptr.p->scanNextFragId < scanptr.p->scanNoFrag){
/** /**
* Start on next fragment * Start on next fragment
*/ */
ndbrequire(noCompletedOps == 0);
scanFragptr.p->scanFragState = ScanFragRec::WAIT_GET_PRIMCONF; scanFragptr.p->scanFragState = ScanFragRec::WAIT_GET_PRIMCONF;
scanFragptr.p->startFragTimer(ctcTimer); scanFragptr.p->startFragTimer(ctcTimer);
@ -9218,6 +9230,7 @@ void Dbtc::execSCAN_FRAGCONF(Signal* signal)
scanptr.p->m_queued_count++; scanptr.p->m_queued_count++;
} }
scanFragptr.p->m_scan_frag_conf_status = status;
scanFragptr.p->m_ops = noCompletedOps; scanFragptr.p->m_ops = noCompletedOps;
scanFragptr.p->m_totalLen = total_len; scanFragptr.p->m_totalLen = total_len;
scanFragptr.p->scanFragState = ScanFragRec::QUEUED_FOR_DELIVERY; scanFragptr.p->scanFragState = ScanFragRec::QUEUED_FOR_DELIVERY;
@ -9330,11 +9343,12 @@ void Dbtc::execSCAN_NEXTREQ(Signal* signal)
// Copy op ptrs so I dont overwrite them when sending... // Copy op ptrs so I dont overwrite them when sending...
memcpy(signal->getDataPtrSend()+25, signal->getDataPtr()+4, 4 * len); memcpy(signal->getDataPtrSend()+25, signal->getDataPtr()+4, 4 * len);
ScanFragNextReq * nextReq = (ScanFragNextReq*)&signal->theData[0]; ScanFragNextReq tmp;
nextReq->closeFlag = ZFALSE; tmp.closeFlag = ZFALSE;
nextReq->transId1 = apiConnectptr.p->transid[0]; tmp.transId1 = apiConnectptr.p->transid[0];
nextReq->transId2 = apiConnectptr.p->transid[1]; tmp.transId2 = apiConnectptr.p->transid[1];
nextReq->batch_size_bytes= scanP->batch_byte_size; tmp.batch_size_rows = scanP->batch_size_rows;
tmp.batch_size_bytes = scanP->batch_byte_size;
ScanFragList running(c_scan_frag_pool, scanP->m_running_scan_frags); ScanFragList running(c_scan_frag_pool, scanP->m_running_scan_frags);
ScanFragList delivered(c_scan_frag_pool, scanP->m_delivered_scan_frags); ScanFragList delivered(c_scan_frag_pool, scanP->m_delivered_scan_frags);
@ -9344,15 +9358,37 @@ void Dbtc::execSCAN_NEXTREQ(Signal* signal)
c_scan_frag_pool.getPtr(scanFragptr); c_scan_frag_pool.getPtr(scanFragptr);
ndbrequire(scanFragptr.p->scanFragState == ScanFragRec::DELIVERED); ndbrequire(scanFragptr.p->scanFragState == ScanFragRec::DELIVERED);
scanFragptr.p->scanFragState = ScanFragRec::LQH_ACTIVE;
scanFragptr.p->startFragTimer(ctcTimer); scanFragptr.p->startFragTimer(ctcTimer);
scanFragptr.p->m_ops = 0; scanFragptr.p->m_ops = 0;
nextReq->senderData = scanFragptr.i;
nextReq->batch_size_rows= scanFragptr.p->scanFragConcurrency;
sendSignal(scanFragptr.p->lqhBlockref, GSN_SCAN_NEXTREQ, signal, if(scanFragptr.p->m_scan_frag_conf_status)
ScanFragNextReq::SignalLength, JBB); {
/**
* last scan was complete
*/
jam();
ndbrequire(scanptr.p->scanNextFragId < scanptr.p->scanNoFrag);
scanFragptr.p->scanFragState = ScanFragRec::WAIT_GET_PRIMCONF;
tcConnectptr.i = scanptr.p->scanTcrec;
ptrCheckGuard(tcConnectptr, ctcConnectFilesize, tcConnectRecord);
scanFragptr.p->scanFragId = scanptr.p->scanNextFragId++;
signal->theData[0] = tcConnectptr.p->dihConnectptr;
signal->theData[1] = scanFragptr.i;
signal->theData[2] = scanptr.p->scanTableref;
signal->theData[3] = scanFragptr.p->scanFragId;
sendSignal(cdihblockref, GSN_DIGETPRIMREQ, signal, 4, JBB);
}
else
{
jam();
scanFragptr.p->scanFragState = ScanFragRec::LQH_ACTIVE;
ScanFragNextReq * req = (ScanFragNextReq*)signal->getDataPtrSend();
* req = tmp;
req->senderData = scanFragptr.i;
sendSignal(scanFragptr.p->lqhBlockref, GSN_SCAN_NEXTREQ, signal,
ScanFragNextReq::SignalLength, JBB);
}
delivered.remove(scanFragptr); delivered.remove(scanFragptr);
running.add(scanFragptr); running.add(scanFragptr);
}//for }//for
@ -9551,7 +9587,7 @@ void Dbtc::sendScanFragReq(Signal* signal,
req->transId1 = apiConnectptr.p->transid[0]; req->transId1 = apiConnectptr.p->transid[0];
req->transId2 = apiConnectptr.p->transid[1]; req->transId2 = apiConnectptr.p->transid[1];
req->clientOpPtr = scanFragP->m_apiPtr; req->clientOpPtr = scanFragP->m_apiPtr;
req->batch_size_rows= scanFragP->scanFragConcurrency; req->batch_size_rows= scanP->batch_size_rows;
req->batch_size_bytes= scanP->batch_byte_size; req->batch_size_bytes= scanP->batch_byte_size;
sendSignal(scanFragP->lqhBlockref, GSN_SCAN_FRAGREQ, signal, sendSignal(scanFragP->lqhBlockref, GSN_SCAN_FRAGREQ, signal,
ScanFragReq::SignalLength, JBB); ScanFragReq::SignalLength, JBB);
@ -9573,6 +9609,8 @@ void Dbtc::sendScanTabConf(Signal* signal, ScanRecordPtr scanPtr) {
jam(); jam();
ops += 21; ops += 21;
} }
Uint32 left = scanPtr.p->scanNoFrag - scanPtr.p->scanNextFragId;
ScanTabConf * conf = (ScanTabConf*)&signal->theData[0]; ScanTabConf * conf = (ScanTabConf*)&signal->theData[0];
conf->apiConnectPtr = apiConnectptr.p->ndbapiConnect; conf->apiConnectPtr = apiConnectptr.p->ndbapiConnect;
@ -9588,24 +9626,25 @@ void Dbtc::sendScanTabConf(Signal* signal, ScanRecordPtr scanPtr) {
ScanFragRecPtr curr = ptr; // Remove while iterating... ScanFragRecPtr curr = ptr; // Remove while iterating...
queued.next(ptr); queued.next(ptr);
bool done = curr.p->m_scan_frag_conf_status && --left;
* ops++ = curr.p->m_apiPtr; * ops++ = curr.p->m_apiPtr;
* ops++ = curr.i; * ops++ = done ? RNIL : curr.i;
* ops++ = (curr.p->m_totalLen << 10) + curr.p->m_ops; * ops++ = (curr.p->m_totalLen << 10) + curr.p->m_ops;
queued.remove(curr); queued.remove(curr);
if(curr.p->m_ops > 0){ if(!done){
delivered.add(curr); delivered.add(curr);
curr.p->scanFragState = ScanFragRec::DELIVERED; curr.p->scanFragState = ScanFragRec::DELIVERED;
curr.p->stopFragTimer(); curr.p->stopFragTimer();
} else { } else {
(* --ops) = ScanTabConf::EndOfData; ops++;
c_scan_frag_pool.release(curr); c_scan_frag_pool.release(curr);
curr.p->scanFragState = ScanFragRec::COMPLETED; curr.p->scanFragState = ScanFragRec::COMPLETED;
curr.p->stopFragTimer(); curr.p->stopFragTimer();
} }
} }
} }
if(scanPtr.p->m_delivered_scan_frags.isEmpty() && if(scanPtr.p->m_delivered_scan_frags.isEmpty() &&
scanPtr.p->m_running_scan_frags.isEmpty()){ scanPtr.p->m_running_scan_frags.isEmpty()){
conf->requestInfo = op_count | ScanTabConf::EndOfData; conf->requestInfo = op_count | ScanTabConf::EndOfData;
@ -10424,9 +10463,8 @@ Dbtc::execDUMP_STATE_ORD(Signal* signal)
sfp.i, sfp.i,
sfp.p->scanFragState, sfp.p->scanFragState,
sfp.p->scanFragId); sfp.p->scanFragId);
infoEvent(" nodeid=%d, concurr=%d, timer=%d", infoEvent(" nodeid=%d, timer=%d",
refToNode(sfp.p->lqhBlockref), refToNode(sfp.p->lqhBlockref),
sfp.p->scanFragConcurrency,
sfp.p->scanFragTimer); sfp.p->scanFragTimer);
} }
@ -10504,7 +10542,7 @@ Dbtc::execDUMP_STATE_ORD(Signal* signal)
sp.p->scanAiLength, sp.p->scanAiLength,
sp.p->scanParallel, sp.p->scanParallel,
sp.p->scanReceivedOperations, sp.p->scanReceivedOperations,
sp.p->noOprecPerFrag); sp.p->batch_size_rows);
infoEvent(" schv=%d, tab=%d, sproc=%d", infoEvent(" schv=%d, tab=%d, sproc=%d",
sp.p->scanSchemaVersion, sp.p->scanSchemaVersion,
sp.p->scanTableref, sp.p->scanTableref,

View file

@ -1888,7 +1888,7 @@ SumaParticipant::SyncRecord::nextScan(Signal* signal){
req->requestInfo = 0; req->requestInfo = 0;
req->savePointId = 0; req->savePointId = 0;
ScanFragReq::setLockMode(req->requestInfo, 0); ScanFragReq::setLockMode(req->requestInfo, 0);
ScanFragReq::setHoldLockFlag(req->requestInfo, 0); ScanFragReq::setHoldLockFlag(req->requestInfo, 1);
ScanFragReq::setKeyinfoFlag(req->requestInfo, 0); ScanFragReq::setKeyinfoFlag(req->requestInfo, 0);
ScanFragReq::setAttrLen(req->requestInfo, attrLen); ScanFragReq::setAttrLen(req->requestInfo, attrLen);
req->fragmentNoKeyLen = fd.m_fragDesc.m_fragmentNo; req->fragmentNoKeyLen = fd.m_fragDesc.m_fragmentNo;

View file

@ -1582,9 +1582,6 @@ from other transactions.
/** /**
* There's always a TCKEYCONF when using IgnoreError * There's always a TCKEYCONF when using IgnoreError
*/ */
#ifdef VM_TRACE
ndbout_c("Not completing transaction 2");
#endif
return -1; return -1;
} }
/**********************************************************************/ /**********************************************************************/
@ -1836,9 +1833,6 @@ NdbConnection::OpCompleteFailure(Uint8 abortOption, bool setFailure)
/** /**
* There's always a TCKEYCONF when using IgnoreError * There's always a TCKEYCONF when using IgnoreError
*/ */
#ifdef VM_TRACE
ndbout_c("Not completing transaction");
#endif
return -1; return -1;
} }

View file

@ -97,7 +97,7 @@ NdbConnection::receiveSCAN_TABCONF(NdbApiSignal* aSignal,
theScanningOp->execCLOSE_SCAN_REP(); theScanningOp->execCLOSE_SCAN_REP();
return 0; return 0;
} }
for(Uint32 i = 0; i<len; i += 3){ for(Uint32 i = 0; i<len; i += 3){
Uint32 opCount, totalLen; Uint32 opCount, totalLen;
Uint32 ptrI = * ops++; Uint32 ptrI = * ops++;
@ -109,24 +109,12 @@ NdbConnection::receiveSCAN_TABCONF(NdbApiSignal* aSignal,
void * tPtr = theNdb->int2void(ptrI); void * tPtr = theNdb->int2void(ptrI);
assert(tPtr); // For now assert(tPtr); // For now
NdbReceiver* tOp = theNdb->void2rec(tPtr); NdbReceiver* tOp = theNdb->void2rec(tPtr);
if (tOp && tOp->checkMagicNumber()){ if (tOp && tOp->checkMagicNumber())
if(tOp->execSCANOPCONF(tcPtrI, totalLen, opCount)){ {
/** if (tcPtrI == RNIL && opCount == 0)
*
*/
theScanningOp->receiver_delivered(tOp);
} else if(info == ScanTabConf::EndOfData){
theScanningOp->receiver_completed(tOp); theScanningOp->receiver_completed(tOp);
} else if (tOp->execSCANOPCONF(tcPtrI, totalLen, opCount))
} theScanningOp->receiver_delivered(tOp);
}
if (conf->requestInfo & ScanTabConf::EndOfData) {
if(theScanningOp->m_ordered)
theScanningOp->m_api_receivers_count = 0;
if(theScanningOp->m_api_receivers_count +
theScanningOp->m_conf_receivers_count +
theScanningOp->m_sent_receivers_count){
abort();
} }
} }
return 0; return 0;

View file

@ -35,6 +35,8 @@
#include <signaldata/AttrInfo.hpp> #include <signaldata/AttrInfo.hpp>
#include <signaldata/TcKeyReq.hpp> #include <signaldata/TcKeyReq.hpp>
#define DEBUG_NEXT_RESULT 0
NdbScanOperation::NdbScanOperation(Ndb* aNdb) : NdbScanOperation::NdbScanOperation(Ndb* aNdb) :
NdbOperation(aNdb), NdbOperation(aNdb),
m_resultSet(0), m_resultSet(0),
@ -275,6 +277,9 @@ NdbScanOperation::fix_receivers(Uint32 parallel){
void void
NdbScanOperation::receiver_delivered(NdbReceiver* tRec){ NdbScanOperation::receiver_delivered(NdbReceiver* tRec){
if(theError.code == 0){ if(theError.code == 0){
if(DEBUG_NEXT_RESULT)
ndbout_c("receiver_delivered");
Uint32 idx = tRec->m_list_index; Uint32 idx = tRec->m_list_index;
Uint32 last = m_sent_receivers_count - 1; Uint32 last = m_sent_receivers_count - 1;
if(idx != last){ if(idx != last){
@ -298,6 +303,9 @@ NdbScanOperation::receiver_delivered(NdbReceiver* tRec){
void void
NdbScanOperation::receiver_completed(NdbReceiver* tRec){ NdbScanOperation::receiver_completed(NdbReceiver* tRec){
if(theError.code == 0){ if(theError.code == 0){
if(DEBUG_NEXT_RESULT)
ndbout_c("receiver_completed");
Uint32 idx = tRec->m_list_index; Uint32 idx = tRec->m_list_index;
Uint32 last = m_sent_receivers_count - 1; Uint32 last = m_sent_receivers_count - 1;
if(idx != last){ if(idx != last){
@ -445,8 +453,6 @@ NdbScanOperation::executeCursor(int nodeId){
return -1; return -1;
} }
#define DEBUG_NEXT_RESULT 0
int NdbScanOperation::nextResult(bool fetchAllowed) int NdbScanOperation::nextResult(bool fetchAllowed)
{ {
if(m_ordered) if(m_ordered)
@ -579,7 +585,8 @@ int NdbScanOperation::nextResult(bool fetchAllowed)
int int
NdbScanOperation::send_next_scan(Uint32 cnt, bool stopScanFlag){ NdbScanOperation::send_next_scan(Uint32 cnt, bool stopScanFlag){
if(cnt > 0 || stopScanFlag){ if(cnt > 0)
{
NdbApiSignal tSignal(theNdb->theMyRef); NdbApiSignal tSignal(theNdb->theMyRef);
tSignal.setSignal(GSN_SCAN_NEXTREQ); tSignal.setSignal(GSN_SCAN_NEXTREQ);
@ -595,33 +602,40 @@ NdbScanOperation::send_next_scan(Uint32 cnt, bool stopScanFlag){
*/ */
Uint32 last = m_sent_receivers_count; Uint32 last = m_sent_receivers_count;
Uint32 * prep_array = (cnt > 21 ? m_prepared_receivers : theData + 4); Uint32 * prep_array = (cnt > 21 ? m_prepared_receivers : theData + 4);
Uint32 sent = 0;
for(Uint32 i = 0; i<cnt; i++){ for(Uint32 i = 0; i<cnt; i++){
NdbReceiver * tRec = m_api_receivers[i]; NdbReceiver * tRec = m_api_receivers[i];
m_sent_receivers[last+i] = tRec; if((prep_array[sent] = tRec->m_tcPtrI) != RNIL)
tRec->m_list_index = last+i; {
prep_array[i] = tRec->m_tcPtrI; m_sent_receivers[last+sent] = tRec;
tRec->prepareSend(); tRec->m_list_index = last+sent;
tRec->prepareSend();
sent++;
}
} }
memcpy(&m_api_receivers[0], &m_api_receivers[cnt], cnt * sizeof(char*)); memcpy(&m_api_receivers[0], &m_api_receivers[cnt], cnt * sizeof(char*));
Uint32 nodeId = theNdbCon->theDBnode; int ret = 0;
TransporterFacade * tp = TransporterFacade::instance(); if(sent)
int ret; {
if(cnt > 21){ Uint32 nodeId = theNdbCon->theDBnode;
tSignal.setLength(4); TransporterFacade * tp = TransporterFacade::instance();
LinearSectionPtr ptr[3]; if(cnt > 21 && !stopScanFlag){
ptr[0].p = prep_array; tSignal.setLength(4);
ptr[0].sz = cnt; LinearSectionPtr ptr[3];
ret = tp->sendSignal(&tSignal, nodeId, ptr, 1); ptr[0].p = prep_array;
} else { ptr[0].sz = sent;
tSignal.setLength(4+cnt); ret = tp->sendSignal(&tSignal, nodeId, ptr, 1);
ret = tp->sendSignal(&tSignal, nodeId); } else {
tSignal.setLength(4+(stopScanFlag ? 0 : sent));
ret = tp->sendSignal(&tSignal, nodeId);
}
} }
m_sent_receivers_count = last + cnt + stopScanFlag; m_sent_receivers_count = last + sent;
m_api_receivers_count -= cnt; m_api_receivers_count -= cnt;
m_current_api_receiver = 0; m_current_api_receiver = 0;
return ret; return ret;
} }
return 0; return 0;
@ -1412,10 +1426,22 @@ NdbIndexScanOperation::send_next_scan_ordered(Uint32 idx){
if(idx == theParallelism) if(idx == theParallelism)
return 0; return 0;
NdbReceiver* tRec = m_api_receivers[idx];
NdbApiSignal tSignal(theNdb->theMyRef); NdbApiSignal tSignal(theNdb->theMyRef);
tSignal.setSignal(GSN_SCAN_NEXTREQ); tSignal.setSignal(GSN_SCAN_NEXTREQ);
Uint32 last = m_sent_receivers_count;
Uint32* theData = tSignal.getDataPtrSend(); Uint32* theData = tSignal.getDataPtrSend();
Uint32* prep_array = theData + 4;
m_current_api_receiver = idx + 1;
if((prep_array[0] = tRec->m_tcPtrI) == RNIL)
{
if(DEBUG_NEXT_RESULT)
ndbout_c("receiver completed, don't send");
return 0;
}
theData[0] = theNdbCon->theTCConPtr; theData[0] = theNdbCon->theTCConPtr;
theData[1] = 0; theData[1] = 0;
Uint64 transId = theNdbCon->theTransactionId; Uint64 transId = theNdbCon->theTransactionId;
@ -1425,17 +1451,10 @@ NdbIndexScanOperation::send_next_scan_ordered(Uint32 idx){
/** /**
* Prepare ops * Prepare ops
*/ */
Uint32 last = m_sent_receivers_count;
Uint32 * prep_array = theData + 4;
NdbReceiver * tRec = m_api_receivers[idx];
m_sent_receivers[last] = tRec; m_sent_receivers[last] = tRec;
tRec->m_list_index = last; tRec->m_list_index = last;
prep_array[0] = tRec->m_tcPtrI;
tRec->prepareSend(); tRec->prepareSend();
m_sent_receivers_count = last + 1; m_sent_receivers_count = last + 1;
m_current_api_receiver = idx + 1;
Uint32 nodeId = theNdbCon->theDBnode; Uint32 nodeId = theNdbCon->theDBnode;
TransporterFacade * tp = TransporterFacade::instance(); TransporterFacade * tp = TransporterFacade::instance();
@ -1448,12 +1467,17 @@ NdbScanOperation::close_impl(TransporterFacade* tp){
Uint32 seq = theNdbCon->theNodeSequence; Uint32 seq = theNdbCon->theNodeSequence;
Uint32 nodeId = theNdbCon->theDBnode; Uint32 nodeId = theNdbCon->theDBnode;
if(seq != tp->getNodeSequence(nodeId)){ if(seq != tp->getNodeSequence(nodeId))
{
theNdbCon->theReleaseOnClose = true; theNdbCon->theReleaseOnClose = true;
return -1; return -1;
} }
while(theError.code == 0 && m_sent_receivers_count){ /**
* Wait for outstanding
*/
while(theError.code == 0 && m_sent_receivers_count)
{
theNdb->theWaiter.m_node = nodeId; theNdb->theWaiter.m_node = nodeId;
theNdb->theWaiter.m_state = WAIT_SCAN; theNdb->theWaiter.m_state = WAIT_SCAN;
int return_code = theNdb->receiveResponse(WAITFOR_SCAN_TIMEOUT); int return_code = theNdb->receiveResponse(WAITFOR_SCAN_TIMEOUT);
@ -1471,18 +1495,52 @@ NdbScanOperation::close_impl(TransporterFacade* tp){
} }
} }
if(m_api_receivers_count+m_conf_receivers_count){ /**
// Send close scan * move all conf'ed into api
if(send_next_scan(0, true) == -1){ // Close scan * so that send_next_scan can check if they needs to be closed
theNdbCon->theReleaseOnClose = true; */
return -1; Uint32 api = m_api_receivers_count;
} Uint32 conf = m_conf_receivers_count;
if(m_ordered)
{
/**
* Ordered scan, keep the m_api_receivers "to the right"
*/
memmove(m_api_receivers, m_api_receivers+m_current_api_receiver,
(theParallelism - m_current_api_receiver) * sizeof(char*));
api = (theParallelism - m_current_api_receiver);
m_api_receivers_count = api;
}
if(DEBUG_NEXT_RESULT)
ndbout_c("close_impl: [order api conf sent curr parr] %d %d %d %d %d %d",
m_ordered, api, conf,
m_sent_receivers_count, m_current_api_receiver, theParallelism);
if(api+conf)
{
/**
* There's something to close
* setup m_api_receivers (for send_next_scan)
*/
memcpy(m_api_receivers+api, m_conf_receivers, conf * sizeof(char*));
m_api_receivers_count = api + conf;
m_conf_receivers_count = 0;
}
// Send close scan
if(send_next_scan(api+conf, true) == -1)
{
theNdbCon->theReleaseOnClose = true;
return -1;
} }
/** /**
* wait for close scan conf * wait for close scan conf
*/ */
while(m_sent_receivers_count+m_api_receivers_count+m_conf_receivers_count){ while(m_sent_receivers_count+m_api_receivers_count+m_conf_receivers_count)
{
theNdb->theWaiter.m_node = nodeId; theNdb->theWaiter.m_node = nodeId;
theNdb->theWaiter.m_state = WAIT_SCAN; theNdb->theWaiter.m_state = WAIT_SCAN;
int return_code = theNdb->receiveResponse(WAITFOR_SCAN_TIMEOUT); int return_code = theNdb->receiveResponse(WAITFOR_SCAN_TIMEOUT);
@ -1499,6 +1557,7 @@ NdbScanOperation::close_impl(TransporterFacade* tp){
return -1; return -1;
} }
} }
return 0; return 0;
} }

View file

@ -36,15 +36,21 @@ public:
bool allowConstraintViolation = true, bool allowConstraintViolation = true,
int doSleep = 0, int doSleep = 0,
bool oneTrans = false); bool oneTrans = false);
int scanReadRecords(Ndb*, int scanReadRecords(Ndb*,
int records, int records,
int abort = 0, int abort = 0,
int parallelism = 0, int parallelism = 0,
bool committed = false); NdbOperation::LockMode = NdbOperation::LM_Read);
int scanReadCommittedRecords(Ndb*,
int records, int scanReadRecords(Ndb*,
int abort = 0, const NdbDictionary::Index*,
int parallelism = 0); int records,
int abort = 0,
int parallelism = 0,
NdbOperation::LockMode = NdbOperation::LM_Read,
bool sorted = false);
int pkReadRecords(Ndb*, int pkReadRecords(Ndb*,
int records, int records,
int batchsize = 1, int batchsize = 1,

View file

@ -53,11 +53,11 @@ public:
int selectCount(Ndb*, int selectCount(Ndb*,
int parallelism = 0, int parallelism = 0,
int* count_rows = NULL, int* count_rows = NULL,
ScanLock lock = SL_Read, NdbOperation::LockMode lm = NdbOperation::LM_CommittedRead,
NdbConnection* pTrans = NULL); NdbConnection* pTrans = NULL);
int scanReadRecords(Ndb*, int scanReadRecords(Ndb*,
int parallelism, int parallelism,
bool exclusive, NdbOperation::LockMode lm,
int records, int records,
int noAttribs, int noAttribs,
int* attrib_list, int* attrib_list,

View file

@ -391,8 +391,15 @@ run_read(){
void void
print_result(){ print_result(){
int tmp = 1;
tmp *= g_paramters[P_RANGE].value;
tmp *= g_paramters[P_LOOPS].value;
int t, t2;
for(int i = 0; i<P_OP_TYPES; i++){ for(int i = 0; i<P_OP_TYPES; i++){
g_err.println("%s avg: %u us/row", g_ops[i], g_err << g_ops[i] << " avg: "
(1000*g_times[i])/(g_paramters[P_RANGE].value*g_paramters[P_LOOPS].value)); << (int)((1000*g_times[i])/tmp)
<< " us/row ("
<< (1000 * tmp)/g_times[i] << " rows / sec)" << endl;
} }
} }

View file

@ -242,8 +242,9 @@ int runScanReadCommitted(NDBT_Context* ctx, NDBT_Step* step){
HugoTransactions hugoTrans(*ctx->getTab()); HugoTransactions hugoTrans(*ctx->getTab());
while (i<loops && !ctx->isTestStopped()) { while (i<loops && !ctx->isTestStopped()) {
g_info << i << ": "; g_info << i << ": ";
if (hugoTrans.scanReadCommittedRecords(GETNDB(step), records, if (hugoTrans.scanReadRecords(GETNDB(step), records,
abort, parallelism) != 0){ abort, parallelism,
NdbOperation::LM_CommittedRead) != 0){
return NDBT_FAILED; return NDBT_FAILED;
} }
i++; i++;
@ -639,7 +640,7 @@ int runCheckGetValue(NDBT_Context* ctx, NDBT_Step* step){
g_info << (unsigned)i << endl; g_info << (unsigned)i << endl;
if(utilTrans.scanReadRecords(GETNDB(step), if(utilTrans.scanReadRecords(GETNDB(step),
parallelism, parallelism,
false, NdbOperation::LM_Read,
records, records,
alist.attriblist[i]->numAttribs, alist.attriblist[i]->numAttribs,
alist.attriblist[i]->attribs) != 0){ alist.attriblist[i]->attribs) != 0){
@ -647,7 +648,7 @@ int runCheckGetValue(NDBT_Context* ctx, NDBT_Step* step){
} }
if(utilTrans.scanReadRecords(GETNDB(step), if(utilTrans.scanReadRecords(GETNDB(step),
parallelism, parallelism,
true, NdbOperation::LM_Read,
records, records,
alist.attriblist[i]->numAttribs, alist.attriblist[i]->numAttribs,
alist.attriblist[i]->attribs) != 0){ alist.attriblist[i]->attribs) != 0){

View file

@ -39,8 +39,9 @@ struct Parameter {
#define P_LOOPS 8 #define P_LOOPS 8
#define P_CREATE 9 #define P_CREATE 9
#define P_LOAD 10 #define P_LOAD 10
#define P_RESET 11
#define P_MAX 11 #define P_MAX 12
static static
Parameter Parameter
@ -55,7 +56,8 @@ g_paramters[] = {
{ "size", 1000000, 1, ~0 }, { "size", 1000000, 1, ~0 },
{ "iterations", 3, 1, ~0 }, { "iterations", 3, 1, ~0 },
{ "create_drop", 1, 0, 1 }, { "create_drop", 1, 0, 1 },
{ "data", 1, 0, 1 } { "data", 1, 0, 1 },
{ "q-reset bounds", 0, 1, 0 }
}; };
static Ndb* g_ndb = 0; static Ndb* g_ndb = 0;
@ -219,21 +221,29 @@ run_scan(){
NDB_TICKS start1, stop; NDB_TICKS start1, stop;
int sum_time= 0; int sum_time= 0;
int sample_rows = 0;
NDB_TICKS sample_start = NdbTick_CurrentMillisecond();
Uint32 tot = g_paramters[P_ROWS].value; Uint32 tot = g_paramters[P_ROWS].value;
if(g_paramters[P_BOUND].value == 2 || g_paramters[P_FILT].value == 2)
iter *= g_paramters[P_ROWS].value;
NdbScanOperation * pOp = 0;
NdbIndexScanOperation * pIOp = 0;
NdbConnection * pTrans = 0;
NdbResultSet * rs = 0;
int check = 0;
for(int i = 0; i<iter; i++){ for(int i = 0; i<iter; i++){
start1 = NdbTick_CurrentMillisecond(); start1 = NdbTick_CurrentMillisecond();
NdbConnection * pTrans = g_ndb->startTransaction(); pTrans = pTrans ? pTrans : g_ndb->startTransaction();
if(!pTrans){ if(!pTrans){
g_err << "Failed to start transaction" << endl; g_err << "Failed to start transaction" << endl;
err(g_ndb->getNdbError()); err(g_ndb->getNdbError());
return -1; return -1;
} }
NdbScanOperation * pOp;
NdbIndexScanOperation * pIOp;
NdbResultSet * rs;
int par = g_paramters[P_PARRA].value; int par = g_paramters[P_PARRA].value;
int bat = g_paramters[P_BATCH].value; int bat = g_paramters[P_BATCH].value;
NdbScanOperation::LockMode lm; NdbScanOperation::LockMode lm;
@ -256,9 +266,17 @@ run_scan(){
assert(pOp); assert(pOp);
rs = pOp->readTuples(lm, bat, par); rs = pOp->readTuples(lm, bat, par);
} else { } else {
pOp = pIOp = pTrans->getNdbIndexScanOperation(g_indexname, g_tablename); if(g_paramters[P_RESET].value == 0 || pIOp == 0)
bool ord = g_paramters[P_ACCESS].value == 2; {
rs = pIOp->readTuples(lm, bat, par, ord); pOp= pIOp= pTrans->getNdbIndexScanOperation(g_indexname, g_tablename);
bool ord = g_paramters[P_ACCESS].value == 2;
rs = pIOp->readTuples(lm, bat, par, ord);
}
else
{
pIOp->reset_bounds();
}
switch(g_paramters[P_BOUND].value){ switch(g_paramters[P_BOUND].value){
case 0: // All case 0: // All
break; break;
@ -268,20 +286,22 @@ run_scan(){
case 2: { // 1 row case 2: { // 1 row
default: default:
assert(g_table->getNoOfPrimaryKeys() == 1); // only impl. so far assert(g_table->getNoOfPrimaryKeys() == 1); // only impl. so far
abort();
#if 0
int tot = g_paramters[P_ROWS].value; int tot = g_paramters[P_ROWS].value;
int row = rand() % tot; int row = rand() % tot;
#if 0
fix_eq_bound(pIOp, row); fix_eq_bound(pIOp, row);
#else
pIOp->setBound((Uint32)0, NdbIndexScanOperation::BoundEQ, &row);
#endif #endif
break; break;
} }
} }
if(g_paramters[P_RESET].value == 1)
goto execute;
} }
assert(pOp); assert(pOp);
assert(rs); assert(rs);
int check = 0;
switch(g_paramters[P_FILT].value){ switch(g_paramters[P_FILT].value){
case 0: // All case 0: // All
check = pOp->interpret_exit_ok(); check = pOp->interpret_exit_ok();
@ -313,7 +333,7 @@ run_scan(){
for(int i = 0; i<g_table->getNoOfColumns(); i++){ for(int i = 0; i<g_table->getNoOfColumns(); i++){
pOp->getValue(i); pOp->getValue(i);
} }
execute:
int rows = 0; int rows = 0;
check = pTrans->execute(NoCommit); check = pTrans->execute(NoCommit);
assert(check == 0); assert(check == 0);
@ -334,19 +354,29 @@ run_scan(){
return -1; return -1;
} }
assert(check == 1); assert(check == 1);
g_info << "Found " << rows << " rows" << endl; if(g_paramters[P_RESET].value == 0)
{
pTrans->close(); pTrans->close();
pTrans = 0;
}
stop = NdbTick_CurrentMillisecond(); stop = NdbTick_CurrentMillisecond();
int time_passed= (int)(stop - start1); int time_passed= (int)(stop - start1);
g_err.println("Time: %d ms = %u rows/sec", time_passed, sample_rows += rows;
(1000*tot)/time_passed);
sum_time+= time_passed; sum_time+= time_passed;
if(sample_rows >= tot)
{
int sample_time = (int)(stop - sample_start);
g_info << "Found " << sample_rows << " rows" << endl;
g_err.println("Time: %d ms = %u rows/sec", sample_time,
(1000*sample_rows)/sample_time);
sample_rows = 0;
sample_start = stop;
}
} }
sum_time= sum_time / iter;
g_err.println("Avg time: %d ms = %u rows/sec", sum_time/iter,
g_err.println("Avg time: %d ms = %u rows/sec", sum_time, (1000*tot*iter)/sum_time);
(1000*tot)/sum_time);
return 0; return 0;
} }

View file

@ -29,20 +29,13 @@ HugoTransactions::~HugoTransactions(){
deallocRows(); deallocRows();
} }
int HugoTransactions::scanReadCommittedRecords(Ndb* pNdb,
int records,
int abortPercent,
int parallelism){
return scanReadRecords(pNdb, records, abortPercent, parallelism, true);
}
int int
HugoTransactions::scanReadRecords(Ndb* pNdb, HugoTransactions::scanReadRecords(Ndb* pNdb,
int records, int records,
int abortPercent, int abortPercent,
int parallelism, int parallelism,
bool committed){ NdbOperation::LockMode lm)
{
int retryAttempt = 0; int retryAttempt = 0;
const int retryMax = 100; const int retryMax = 100;
@ -80,8 +73,163 @@ HugoTransactions::scanReadRecords(Ndb* pNdb,
} }
NdbResultSet * rs; NdbResultSet * rs;
rs = pOp ->readTuples(committed ? NdbScanOperation::LM_CommittedRead : rs = pOp ->readTuples(lm);
NdbScanOperation::LM_Read);
if( rs == 0 ) {
ERR(pTrans->getNdbError());
pNdb->closeTransaction(pTrans);
return NDBT_FAILED;
}
check = pOp->interpret_exit_ok();
if( check == -1 ) {
ERR(pTrans->getNdbError());
pNdb->closeTransaction(pTrans);
return NDBT_FAILED;
}
for(a = 0; a<tab.getNoOfColumns(); a++){
if((row.attributeStore(a) =
pOp->getValue(tab.getColumn(a)->getName())) == 0) {
ERR(pTrans->getNdbError());
pNdb->closeTransaction(pTrans);
return NDBT_FAILED;
}
}
check = pTrans->execute(NoCommit);
if( check == -1 ) {
const NdbError err = pTrans->getNdbError();
if (err.status == NdbError::TemporaryError){
ERR(err);
pNdb->closeTransaction(pTrans);
NdbSleep_MilliSleep(50);
retryAttempt++;
continue;
}
ERR(err);
pNdb->closeTransaction(pTrans);
return NDBT_FAILED;
}
// Abort after 1-100 or 1-records rows
int ranVal = rand();
int abortCount = ranVal % (records == 0 ? 100 : records);
bool abortTrans = false;
if (abort > 0){
// Abort if abortCount is less then abortPercent
if (abortCount < abortPercent)
abortTrans = true;
}
int eof;
int rows = 0;
while((eof = rs->nextResult(true)) == 0){
rows++;
if (calc.verifyRowValues(&row) != 0){
pNdb->closeTransaction(pTrans);
return NDBT_FAILED;
}
if (abortCount == rows && abortTrans == true){
ndbout << "Scan is aborted" << endl;
g_info << "Scan is aborted" << endl;
rs->close();
if( check == -1 ) {
ERR(pTrans->getNdbError());
pNdb->closeTransaction(pTrans);
return NDBT_FAILED;
}
pNdb->closeTransaction(pTrans);
return NDBT_OK;
}
}
if (eof == -1) {
const NdbError err = pTrans->getNdbError();
if (err.status == NdbError::TemporaryError){
ERR_INFO(err);
pNdb->closeTransaction(pTrans);
NdbSleep_MilliSleep(50);
switch (err.code){
case 488:
case 245:
case 490:
// Too many active scans, no limit on number of retry attempts
break;
default:
retryAttempt++;
}
continue;
}
ERR(err);
pNdb->closeTransaction(pTrans);
return NDBT_FAILED;
}
pNdb->closeTransaction(pTrans);
g_info << rows << " rows have been read" << endl;
if (records != 0 && rows != records){
g_err << "Check expected number of records failed" << endl
<< " expected=" << records <<", " << endl
<< " read=" << rows << endl;
return NDBT_FAILED;
}
return NDBT_OK;
}
return NDBT_FAILED;
}
int
HugoTransactions::scanReadRecords(Ndb* pNdb,
const NdbDictionary::Index * pIdx,
int records,
int abortPercent,
int parallelism,
NdbOperation::LockMode lm,
bool sorted)
{
int retryAttempt = 0;
const int retryMax = 100;
int check, a;
NdbConnection *pTrans;
NdbIndexScanOperation *pOp;
while (true){
if (retryAttempt >= retryMax){
g_err << "ERROR: has retried this operation " << retryAttempt
<< " times, failing!" << endl;
return NDBT_FAILED;
}
pTrans = pNdb->startTransaction();
if (pTrans == NULL) {
const NdbError err = pNdb->getNdbError();
if (err.status == NdbError::TemporaryError){
ERR(err);
NdbSleep_MilliSleep(50);
retryAttempt++;
continue;
}
ERR(err);
return NDBT_FAILED;
}
pOp = pTrans->getNdbIndexScanOperation(pIdx->getName(), tab.getName());
if (pOp == NULL) {
ERR(pTrans->getNdbError());
pNdb->closeTransaction(pTrans);
return NDBT_FAILED;
}
NdbResultSet * rs;
rs = pOp ->readTuples(lm, 0, parallelism, sorted);
if( rs == 0 ) { if( rs == 0 ) {
ERR(pTrans->getNdbError()); ERR(pTrans->getNdbError());

View file

@ -619,7 +619,7 @@ UtilTransactions::addRowToInsert(Ndb* pNdb,
int int
UtilTransactions::scanReadRecords(Ndb* pNdb, UtilTransactions::scanReadRecords(Ndb* pNdb,
int parallelism, int parallelism,
bool exclusive, NdbOperation::LockMode lm,
int records, int records,
int noAttribs, int noAttribs,
int *attrib_list, int *attrib_list,
@ -669,10 +669,7 @@ UtilTransactions::scanReadRecords(Ndb* pNdb,
return NDBT_FAILED; return NDBT_FAILED;
} }
NdbResultSet * rs = pOp->readTuples(exclusive ? NdbResultSet * rs = pOp->readTuples(lm, 0, parallelism);
NdbScanOperation::LM_Exclusive :
NdbScanOperation::LM_Read,
0, parallelism);
if( rs == 0 ) { if( rs == 0 ) {
ERR(pTrans->getNdbError()); ERR(pTrans->getNdbError());
pNdb->closeTransaction(pTrans); pNdb->closeTransaction(pTrans);
@ -761,7 +758,7 @@ int
UtilTransactions::selectCount(Ndb* pNdb, UtilTransactions::selectCount(Ndb* pNdb,
int parallelism, int parallelism,
int* count_rows, int* count_rows,
ScanLock lock, NdbOperation::LockMode lm,
NdbConnection* pTrans){ NdbConnection* pTrans){
int retryAttempt = 0; int retryAttempt = 0;
@ -785,19 +782,7 @@ UtilTransactions::selectCount(Ndb* pNdb,
return NDBT_FAILED; return NDBT_FAILED;
} }
NdbResultSet * rs; NdbResultSet * rs = pOp->readTuples(lm);
switch(lock){
case SL_ReadHold:
rs = pOp->readTuples(NdbScanOperation::LM_Read);
break;
case SL_Exclusive:
rs = pOp->readTuples(NdbScanOperation::LM_Exclusive);
break;
case SL_Read:
default:
rs = pOp->readTuples(NdbScanOperation::LM_CommittedRead);
}
if( rs == 0) { if( rs == 0) {
ERR(pTrans->getNdbError()); ERR(pTrans->getNdbError());
pNdb->closeTransaction(pTrans); pNdb->closeTransaction(pTrans);

View file

@ -30,7 +30,7 @@ main(int argc, const char** argv){
const char* _dbname = "TEST_DB"; const char* _dbname = "TEST_DB";
int _help = 0; int _help = 0;
int _ordered, _pk; int _ordered = 0, _pk = 1;
struct getargs args[] = { struct getargs args[] = {
{ "database", 'd', arg_string, &_dbname, "dbname", { "database", 'd', arg_string, &_dbname, "dbname",

View file

@ -35,13 +35,17 @@ int main(int argc, const char** argv){
int _parallelism = 1; int _parallelism = 1;
const char* _tabname = NULL; const char* _tabname = NULL;
int _help = 0; int _help = 0;
int lock = NdbOperation::LM_Read;
int sorted = 0;
struct getargs args[] = { struct getargs args[] = {
{ "aborts", 'a', arg_integer, &_abort, "percent of transactions that are aborted", "abort%" }, { "aborts", 'a', arg_integer, &_abort, "percent of transactions that are aborted", "abort%" },
{ "loops", 'l', arg_integer, &_loops, "number of times to run this program(0=infinite loop)", "loops" }, { "loops", 'l', arg_integer, &_loops, "number of times to run this program(0=infinite loop)", "loops" },
{ "parallelism", 'p', arg_integer, &_parallelism, "parallelism(1-240)", "para" }, { "parallelism", 'p', arg_integer, &_parallelism, "parallelism(1-240)", "para" },
{ "records", 'r', arg_integer, &_records, "Number of records", "recs" }, { "records", 'r', arg_integer, &_records, "Number of records", "recs" },
{ "usage", '?', arg_flag, &_help, "Print help", "" } { "usage", '?', arg_flag, &_help, "Print help", "" },
{ "lock", 'm', arg_integer, &lock, "lock mode", "" },
{ "sorted", 's', arg_flag, &sorted, "sorted", "" }
}; };
int num_args = sizeof(args) / sizeof(args[0]); int num_args = sizeof(args) / sizeof(args[0]);
int optind = 0; int optind = 0;
@ -73,16 +77,48 @@ int main(int argc, const char** argv){
ndbout << " Table " << _tabname << " does not exist!" << endl; ndbout << " Table " << _tabname << " does not exist!" << endl;
return NDBT_ProgramExit(NDBT_WRONGARGS); return NDBT_ProgramExit(NDBT_WRONGARGS);
} }
const NdbDictionary::Index * pIdx = 0;
if(optind+1 < argc)
{
pIdx = MyNdb.getDictionary()->getIndex(argv[optind+1], _tabname);
if(!pIdx)
ndbout << " Index " << argv[optind+1] << " not found" << endl;
else
if(pIdx->getType() != NdbDictionary::Index::UniqueOrderedIndex &&
pIdx->getType() != NdbDictionary::Index::OrderedIndex)
{
ndbout << " Index " << argv[optind+1] << " is not scannable" << endl;
pIdx = 0;
}
}
HugoTransactions hugoTrans(*pTab); HugoTransactions hugoTrans(*pTab);
int i = 0; int i = 0;
while (i<_loops || _loops==0) { while (i<_loops || _loops==0) {
ndbout << i << ": "; ndbout << i << ": ";
if(hugoTrans.scanReadRecords(&MyNdb, if(!pIdx)
0, {
_abort, if(hugoTrans.scanReadRecords(&MyNdb,
_parallelism) != 0){ 0,
return NDBT_ProgramExit(NDBT_FAILED); _abort,
_parallelism,
(NdbOperation::LockMode)lock) != 0)
{
return NDBT_ProgramExit(NDBT_FAILED);
}
}
else
{
if(hugoTrans.scanReadRecords(&MyNdb, pIdx,
0,
_abort,
_parallelism,
(NdbOperation::LockMode)lock,
sorted) != 0)
{
return NDBT_ProgramExit(NDBT_FAILED);
}
} }
i++; i++;
} }

View file

@ -133,13 +133,18 @@ int main(int argc, char** argv){
const NdbDictionary::Table* pTab = NDBT_Table::discoverTableFromDb(&MyNdb, _tabname); const NdbDictionary::Table* pTab = NDBT_Table::discoverTableFromDb(&MyNdb, _tabname);
const NdbDictionary::Index * pIdx = 0; const NdbDictionary::Index * pIdx = 0;
if(argc > 1){ if(argc > 1){
pIdx = MyNdb.getDictionary()->getIndex(argv[0], _tabname); pIdx = MyNdb.getDictionary()->getIndex(argv[1], _tabname);
} }
if(pTab == NULL){ if(pTab == NULL){
ndbout << " Table " << _tabname << " does not exist!" << endl; ndbout << " Table " << _tabname << " does not exist!" << endl;
return NDBT_ProgramExit(NDBT_WRONGARGS); return NDBT_ProgramExit(NDBT_WRONGARGS);
} }
if(argc > 1 && pIdx == 0)
{
ndbout << " Index " << argv[1] << " does not exists" << endl;
}
if(_order && pIdx == NULL){ if(_order && pIdx == NULL){
ndbout << " Order flag given without an index" << endl; ndbout << " Order flag given without an index" << endl;