Merge tulin@bk-internal.mysql.com:/tmp/my4.1-ndb

into poseidon.ndb.mysql.com:/home/tomas/mysql-4.1-mysqladmin
This commit is contained in:
tomas@poseidon.ndb.mysql.com 2004-11-19 14:25:59 +00:00
commit 0de3a28048
21 changed files with 565 additions and 211 deletions

35
ndb/docs/wl2077.txt Normal file
View file

@ -0,0 +1,35 @@
100' * (select 1 from T1 (1M rows) where key = rand());
1 host, 1 ndbd, api co-hosted
results in 1000 rows / sec
wo/reset bounds w/ rb
4.1-read committed a) 4.9 b) 7.4
4.1-read hold lock c) 4.7 d) 6.7
wl2077-read committed 6.4 (+30%) 10.8 (+45%)
wl2077-read hold lock 4.6 (-1%) 6.7 (+ 0%)
-- Comparision e)
serial pk: 10.9'
batched (1000): 59'
serial uniq index: 8.4'
batched (1000): 33'
index range (1000): 186'
----
load) testScanPerf -c 1 -d 1 T1
a) testScanPerf -s 100000 -c 0 -d 0 -a 1 -l 0 -r 2 -q 0 T1
b) testScanPerf -s 100000 -c 0 -d 0 -a 1 -l 0 -r 2 -q 1 T1
c) testScanPerf -s 100000 -c 0 -d 0 -a 1 -l 1 -r 2 -q 0 T1
d) testScanPerf -s 100000 -c 0 -d 0 -a 1 -l 1 -r 2 -q 1 T1
e) testReadPerf -i 25 -c 0 -d 0 T1
--- music join 1db-co 2db-co
4.1 13s 14s
4.1 wo/ blobs 1.7s 3.2s
wl2077 12s 14s
wl2077 wo/ blobs 1.2s (-30%) 2.5s (-22%)

View file

@ -147,7 +147,6 @@ public:
Uint32 nfConnect;
Uint32 table;
Uint32 userpointer;
Uint32 nodeCount;
BlockReference userblockref;
};
typedef Ptr<ConnectRecord> ConnectRecordPtr;

View file

@ -7080,24 +7080,22 @@ void Dbdih::execDIGETPRIMREQ(Signal* signal)
ndbrequire(tabPtr.p->tabStatus == TabRecord::TS_ACTIVE);
connectPtr.i = signal->theData[0];
if(connectPtr.i != RNIL){
if(connectPtr.i != RNIL)
{
jam();
ptrCheckGuard(connectPtr, cconnectFileSize, connectRecord);
ndbrequire(connectPtr.p->connectState == ConnectRecord::INUSE);
getFragstore(tabPtr.p, fragId, fragPtr);
connectPtr.p->nodeCount = extractNodeInfo(fragPtr.p, connectPtr.p->nodes);
signal->theData[0] = connectPtr.p->userpointer;
signal->theData[1] = passThrough;
signal->theData[2] = connectPtr.p->nodes[0];
sendSignal(connectPtr.p->userblockref, GSN_DIGETPRIMCONF, signal, 3, JBB);
return;
}//if
//connectPtr.i == RNIL -> question without connect record
}
else
{
jam();
signal->theData[0] = RNIL;
}
Uint32 nodes[MAX_REPLICAS];
getFragstore(tabPtr.p, fragId, fragPtr);
Uint32 count = extractNodeInfo(fragPtr.p, nodes);
signal->theData[0] = RNIL;
signal->theData[1] = passThrough;
signal->theData[2] = nodes[0];
signal->theData[3] = nodes[1];

View file

@ -2925,4 +2925,23 @@ Dblqh::ScanRecord::check_scan_batch_completed() const
(max_bytes > 0 && (m_curr_batch_size_bytes >= max_bytes));
}
inline
void
Dblqh::i_get_acc_ptr(ScanRecord* scanP, Uint32* &acc_ptr, Uint32 index)
{
if (index == 0) {
acc_ptr= (Uint32*)&scanP->scan_acc_op_ptr[0];
} else {
Uint32 attr_buf_index, attr_buf_rec;
AttrbufPtr regAttrPtr;
jam();
attr_buf_rec= (index + 31) / 32;
attr_buf_index= (index - 1) & 31;
regAttrPtr.i= scanP->scan_acc_op_ptr[attr_buf_rec];
ptrCheckGuard(regAttrPtr, cattrinbufFileSize, attrbuf);
acc_ptr= (Uint32*)&regAttrPtr.p->attrbuf[attr_buf_index];
}
}
#endif

View file

@ -7161,10 +7161,7 @@ void Dblqh::continueScanNextReqLab(Signal* signal)
// Update timer on tcConnectRecord
tcConnectptr.p->tcTimer = cLqhTimeOutCount;
init_acc_ptr_list(scanptr.p);
scanptr.p->m_curr_batch_size_rows = 0;
scanptr.p->m_curr_batch_size_bytes= 0;
scanptr.p->scanFlag = NextScanReq::ZSCAN_NEXT;
scanNextLoopLab(signal);
}//Dblqh::continueScanNextReqLab()
@ -7363,22 +7360,32 @@ void Dblqh::scanLockReleasedLab(Signal* signal)
tcConnectptr.i = scanptr.p->scanTcrec;
ptrCheckGuard(tcConnectptr, ctcConnectrecFileSize, tcConnectionrec);
releaseActiveFrag(signal);
if (scanptr.p->scanReleaseCounter == scanptr.p->m_curr_batch_size_rows) {
if ((scanptr.p->scanErrorCounter > 0) ||
(scanptr.p->scanCompletedStatus == ZTRUE)) {
jam();
scanptr.p->m_curr_batch_size_rows = 0;
scanptr.p->m_curr_batch_size_bytes = 0;
closeScanLab(signal);
} else if (scanptr.p->check_scan_batch_completed() &&
scanptr.p->scanLockHold != ZTRUE) {
jam();
scanptr.p->scanState = ScanRecord::WAIT_SCAN_NEXTREQ;
sendScanFragConf(signal, ZFALSE);
} else if (scanptr.p->m_last_row && !scanptr.p->scanLockHold) {
jam();
closeScanLab(signal);
return;
} else {
jam();
/*
We came here after releasing locks after receiving SCAN_NEXTREQ from TC. We only
come here when scanHoldLock == ZTRUE
*/
* We came here after releasing locks after
* receiving SCAN_NEXTREQ from TC. We only come here
* when scanHoldLock == ZTRUE
*/
scanptr.p->m_curr_batch_size_rows = 0;
scanptr.p->m_curr_batch_size_bytes = 0;
continueScanNextReqLab(signal);
}//if
} else if (scanptr.p->scanReleaseCounter < scanptr.p->m_curr_batch_size_rows) {
@ -7465,25 +7472,6 @@ Dblqh::init_acc_ptr_list(ScanRecord* scanP)
scanP->scan_acc_index = 0;
}
inline
void
Dblqh::i_get_acc_ptr(ScanRecord* scanP, Uint32* &acc_ptr, Uint32 index)
{
if (index == 0) {
acc_ptr= (Uint32*)&scanP->scan_acc_op_ptr[0];
} else {
Uint32 attr_buf_index, attr_buf_rec;
AttrbufPtr regAttrPtr;
jam();
attr_buf_rec= (index + 31) / 32;
attr_buf_index= (index - 1) & 31;
regAttrPtr.i= scanP->scan_acc_op_ptr[attr_buf_rec];
ptrCheckGuard(regAttrPtr, cattrinbufFileSize, attrbuf);
acc_ptr= (Uint32*)&regAttrPtr.p->attrbuf[attr_buf_index];
}
}
Uint32
Dblqh::get_acc_ptr_from_scan_record(ScanRecord* scanP,
Uint32 index,
@ -8007,6 +7995,13 @@ void Dblqh::nextScanConfScanLab(Signal* signal)
/*************************************************************
* STOP THE SCAN PROCESS IF THIS HAS BEEN REQUESTED.
************************************************************ */
if (!scanptr.p->scanLockHold)
{
jam();
closeScanLab(signal);
return;
}
if (scanptr.p->scanCompletedStatus == ZTRUE) {
if ((scanptr.p->scanLockHold == ZTRUE) &&
(scanptr.p->m_curr_batch_size_rows > 0)) {
@ -8507,8 +8502,6 @@ void Dblqh::tupScanCloseConfLab(Signal* signal)
ScanFragRef::SignalLength, JBB);
} else {
jam();
scanptr.p->m_curr_batch_size_rows = 0;
scanptr.p->m_curr_batch_size_bytes= 0;
sendScanFragConf(signal, ZSCAN_FRAG_CLOSED);
}//if
finishScanrec(signal);
@ -8912,6 +8905,13 @@ void Dblqh::sendScanFragConf(Signal* signal, Uint32 scanCompleted)
conf->total_len= total_len;
sendSignal(tcConnectptr.p->clientBlockref, GSN_SCAN_FRAGCONF,
signal, ScanFragConf::SignalLength, JBB);
if(!scanptr.p->scanLockHold)
{
jam();
scanptr.p->m_curr_batch_size_rows = 0;
scanptr.p->m_curr_batch_size_bytes= 0;
}
}//Dblqh::sendScanFragConf()
/* ######################################################################### */

View file

@ -1054,9 +1054,8 @@ public:
// Id of the ScanRecord this fragment scan belongs to
Uint32 scanRec;
// The maximum number of operations that can be scanned before
// returning to TC
Uint16 scanFragConcurrency;
// The value of fragmentCompleted in the last received SCAN_FRAGCONF
Uint8 m_scan_frag_conf_status;
inline void startFragTimer(Uint32 timeVal){
scanFragTimer = timeVal;
@ -1193,8 +1192,10 @@ public:
// Number of operation records per scanned fragment
// Number of operations in first batch
// Max number of bytes per batch
Uint16 noOprecPerFrag;
Uint16 first_batch_size;
union {
Uint16 first_batch_size_rows;
Uint16 batch_size_rows;
};
Uint32 batch_byte_size;
Uint32 scanRequestInfo; // ScanFrag format

View file

@ -8646,9 +8646,9 @@ void Dbtc::initScanrec(ScanRecordPtr scanptr,
scanptr.p->scanTableref = tabptr.i;
scanptr.p->scanSchemaVersion = scanTabReq->tableSchemaVersion;
scanptr.p->scanParallel = scanParallel;
scanptr.p->noOprecPerFrag = noOprecPerFrag;
scanptr.p->first_batch_size= scanTabReq->first_batch_size;
scanptr.p->batch_byte_size= scanTabReq->batch_byte_size;
scanptr.p->first_batch_size_rows = scanTabReq->first_batch_size;
scanptr.p->batch_byte_size = scanTabReq->batch_byte_size;
scanptr.p->batch_size_rows = noOprecPerFrag;
Uint32 tmp = 0;
const UintR ri = scanTabReq->requestInfo;
@ -8672,7 +8672,6 @@ void Dbtc::initScanrec(ScanRecordPtr scanptr,
ndbrequire(list.seize(ptr));
ptr.p->scanRec = scanptr.i;
ptr.p->scanFragId = 0;
ptr.p->scanFragConcurrency = noOprecPerFrag;
ptr.p->m_apiPtr = cdata[i];
}//for
@ -8945,6 +8944,25 @@ void Dbtc::execDIGETPRIMCONF(Signal* signal)
scanptr.i = scanFragptr.p->scanRec;
ptrCheckGuard(scanptr, cscanrecFileSize, scanRecord);
/**
* This must be false as select count(*) otherwise
* can "pass" committing on backup fragments and
* get incorrect row count
*/
if(false && ScanFragReq::getReadCommittedFlag(scanptr.p->scanRequestInfo))
{
jam();
Uint32 max = 3+signal->theData[6];
Uint32 nodeid = getOwnNodeId();
for(Uint32 i = 3; i<max; i++)
if(signal->theData[i] == nodeid)
{
jam();
tnodeid = nodeid;
break;
}
}
{
/**
* Check table
@ -9141,6 +9159,7 @@ void Dbtc::execSCAN_FRAGCONF(Signal* signal)
const ScanFragConf * const conf = (ScanFragConf*)&signal->theData[0];
const Uint32 noCompletedOps = conf->completedOps;
const Uint32 status = conf->fragmentCompleted;
scanFragptr.i = conf->senderData;
c_scan_frag_pool.getPtr(scanFragptr);
@ -9163,11 +9182,9 @@ void Dbtc::execSCAN_FRAGCONF(Signal* signal)
ndbrequire(scanFragptr.p->scanFragState == ScanFragRec::LQH_ACTIVE);
const Uint32 status = conf->fragmentCompleted;
if(scanptr.p->scanState == ScanRecord::CLOSING_SCAN){
jam();
if(status == ZFALSE){
if(status == 0){
/**
* We have started closing = we sent a close -> ignore this
*/
@ -9184,11 +9201,11 @@ void Dbtc::execSCAN_FRAGCONF(Signal* signal)
return;
}
if(status == ZCLOSED && scanptr.p->scanNextFragId < scanptr.p->scanNoFrag){
if(noCompletedOps == 0 && status != 0 &&
scanptr.p->scanNextFragId < scanptr.p->scanNoFrag){
/**
* Start on next fragment
*/
ndbrequire(noCompletedOps == 0);
scanFragptr.p->scanFragState = ScanFragRec::WAIT_GET_PRIMCONF;
scanFragptr.p->startFragTimer(ctcTimer);
@ -9218,6 +9235,7 @@ void Dbtc::execSCAN_FRAGCONF(Signal* signal)
scanptr.p->m_queued_count++;
}
scanFragptr.p->m_scan_frag_conf_status = status;
scanFragptr.p->m_ops = noCompletedOps;
scanFragptr.p->m_totalLen = total_len;
scanFragptr.p->scanFragState = ScanFragRec::QUEUED_FOR_DELIVERY;
@ -9330,11 +9348,12 @@ void Dbtc::execSCAN_NEXTREQ(Signal* signal)
// Copy op ptrs so I dont overwrite them when sending...
memcpy(signal->getDataPtrSend()+25, signal->getDataPtr()+4, 4 * len);
ScanFragNextReq * nextReq = (ScanFragNextReq*)&signal->theData[0];
nextReq->closeFlag = ZFALSE;
nextReq->transId1 = apiConnectptr.p->transid[0];
nextReq->transId2 = apiConnectptr.p->transid[1];
nextReq->batch_size_bytes= scanP->batch_byte_size;
ScanFragNextReq tmp;
tmp.closeFlag = ZFALSE;
tmp.transId1 = apiConnectptr.p->transid[0];
tmp.transId2 = apiConnectptr.p->transid[1];
tmp.batch_size_rows = scanP->batch_size_rows;
tmp.batch_size_bytes = scanP->batch_byte_size;
ScanFragList running(c_scan_frag_pool, scanP->m_running_scan_frags);
ScanFragList delivered(c_scan_frag_pool, scanP->m_delivered_scan_frags);
@ -9344,15 +9363,37 @@ void Dbtc::execSCAN_NEXTREQ(Signal* signal)
c_scan_frag_pool.getPtr(scanFragptr);
ndbrequire(scanFragptr.p->scanFragState == ScanFragRec::DELIVERED);
scanFragptr.p->scanFragState = ScanFragRec::LQH_ACTIVE;
scanFragptr.p->startFragTimer(ctcTimer);
scanFragptr.p->m_ops = 0;
nextReq->senderData = scanFragptr.i;
nextReq->batch_size_rows= scanFragptr.p->scanFragConcurrency;
sendSignal(scanFragptr.p->lqhBlockref, GSN_SCAN_NEXTREQ, signal,
ScanFragNextReq::SignalLength, JBB);
if(scanFragptr.p->m_scan_frag_conf_status)
{
/**
* last scan was complete
*/
jam();
ndbrequire(scanptr.p->scanNextFragId < scanptr.p->scanNoFrag);
scanFragptr.p->scanFragState = ScanFragRec::WAIT_GET_PRIMCONF;
tcConnectptr.i = scanptr.p->scanTcrec;
ptrCheckGuard(tcConnectptr, ctcConnectFilesize, tcConnectRecord);
scanFragptr.p->scanFragId = scanptr.p->scanNextFragId++;
signal->theData[0] = tcConnectptr.p->dihConnectptr;
signal->theData[1] = scanFragptr.i;
signal->theData[2] = scanptr.p->scanTableref;
signal->theData[3] = scanFragptr.p->scanFragId;
sendSignal(cdihblockref, GSN_DIGETPRIMREQ, signal, 4, JBB);
}
else
{
jam();
scanFragptr.p->scanFragState = ScanFragRec::LQH_ACTIVE;
ScanFragNextReq * req = (ScanFragNextReq*)signal->getDataPtrSend();
* req = tmp;
req->senderData = scanFragptr.i;
sendSignal(scanFragptr.p->lqhBlockref, GSN_SCAN_NEXTREQ, signal,
ScanFragNextReq::SignalLength, JBB);
}
delivered.remove(scanFragptr);
running.add(scanFragptr);
}//for
@ -9551,7 +9592,7 @@ void Dbtc::sendScanFragReq(Signal* signal,
req->transId1 = apiConnectptr.p->transid[0];
req->transId2 = apiConnectptr.p->transid[1];
req->clientOpPtr = scanFragP->m_apiPtr;
req->batch_size_rows= scanFragP->scanFragConcurrency;
req->batch_size_rows= scanP->batch_size_rows;
req->batch_size_bytes= scanP->batch_byte_size;
sendSignal(scanFragP->lqhBlockref, GSN_SCAN_FRAGREQ, signal,
ScanFragReq::SignalLength, JBB);
@ -9573,6 +9614,8 @@ void Dbtc::sendScanTabConf(Signal* signal, ScanRecordPtr scanPtr) {
jam();
ops += 21;
}
Uint32 left = scanPtr.p->scanNoFrag - scanPtr.p->scanNextFragId;
ScanTabConf * conf = (ScanTabConf*)&signal->theData[0];
conf->apiConnectPtr = apiConnectptr.p->ndbapiConnect;
@ -9588,24 +9631,25 @@ void Dbtc::sendScanTabConf(Signal* signal, ScanRecordPtr scanPtr) {
ScanFragRecPtr curr = ptr; // Remove while iterating...
queued.next(ptr);
bool done = curr.p->m_scan_frag_conf_status && --left;
* ops++ = curr.p->m_apiPtr;
* ops++ = curr.i;
* ops++ = done ? RNIL : curr.i;
* ops++ = (curr.p->m_totalLen << 10) + curr.p->m_ops;
queued.remove(curr);
if(curr.p->m_ops > 0){
if(!done){
delivered.add(curr);
curr.p->scanFragState = ScanFragRec::DELIVERED;
curr.p->stopFragTimer();
} else {
(* --ops) = ScanTabConf::EndOfData; ops++;
c_scan_frag_pool.release(curr);
curr.p->scanFragState = ScanFragRec::COMPLETED;
curr.p->stopFragTimer();
}
}
}
if(scanPtr.p->m_delivered_scan_frags.isEmpty() &&
scanPtr.p->m_running_scan_frags.isEmpty()){
conf->requestInfo = op_count | ScanTabConf::EndOfData;
@ -10424,9 +10468,8 @@ Dbtc::execDUMP_STATE_ORD(Signal* signal)
sfp.i,
sfp.p->scanFragState,
sfp.p->scanFragId);
infoEvent(" nodeid=%d, concurr=%d, timer=%d",
infoEvent(" nodeid=%d, timer=%d",
refToNode(sfp.p->lqhBlockref),
sfp.p->scanFragConcurrency,
sfp.p->scanFragTimer);
}
@ -10504,7 +10547,7 @@ Dbtc::execDUMP_STATE_ORD(Signal* signal)
sp.p->scanAiLength,
sp.p->scanParallel,
sp.p->scanReceivedOperations,
sp.p->noOprecPerFrag);
sp.p->batch_size_rows);
infoEvent(" schv=%d, tab=%d, sproc=%d",
sp.p->scanSchemaVersion,
sp.p->scanTableref,

View file

@ -1888,7 +1888,7 @@ SumaParticipant::SyncRecord::nextScan(Signal* signal){
req->requestInfo = 0;
req->savePointId = 0;
ScanFragReq::setLockMode(req->requestInfo, 0);
ScanFragReq::setHoldLockFlag(req->requestInfo, 0);
ScanFragReq::setHoldLockFlag(req->requestInfo, 1);
ScanFragReq::setKeyinfoFlag(req->requestInfo, 0);
ScanFragReq::setAttrLen(req->requestInfo, attrLen);
req->fragmentNoKeyLen = fd.m_fragDesc.m_fragmentNo;

View file

@ -1582,9 +1582,6 @@ from other transactions.
/**
* There's always a TCKEYCONF when using IgnoreError
*/
#ifdef VM_TRACE
ndbout_c("Not completing transaction 2");
#endif
return -1;
}
/**********************************************************************/
@ -1836,9 +1833,6 @@ NdbConnection::OpCompleteFailure(Uint8 abortOption, bool setFailure)
/**
* There's always a TCKEYCONF when using IgnoreError
*/
#ifdef VM_TRACE
ndbout_c("Not completing transaction");
#endif
return -1;
}

View file

@ -97,7 +97,7 @@ NdbConnection::receiveSCAN_TABCONF(NdbApiSignal* aSignal,
theScanningOp->execCLOSE_SCAN_REP();
return 0;
}
for(Uint32 i = 0; i<len; i += 3){
Uint32 opCount, totalLen;
Uint32 ptrI = * ops++;
@ -109,24 +109,12 @@ NdbConnection::receiveSCAN_TABCONF(NdbApiSignal* aSignal,
void * tPtr = theNdb->int2void(ptrI);
assert(tPtr); // For now
NdbReceiver* tOp = theNdb->void2rec(tPtr);
if (tOp && tOp->checkMagicNumber()){
if(tOp->execSCANOPCONF(tcPtrI, totalLen, opCount)){
/**
*
*/
theScanningOp->receiver_delivered(tOp);
} else if(info == ScanTabConf::EndOfData){
if (tOp && tOp->checkMagicNumber())
{
if (tcPtrI == RNIL && opCount == 0)
theScanningOp->receiver_completed(tOp);
}
}
}
if (conf->requestInfo & ScanTabConf::EndOfData) {
if(theScanningOp->m_ordered)
theScanningOp->m_api_receivers_count = 0;
if(theScanningOp->m_api_receivers_count +
theScanningOp->m_conf_receivers_count +
theScanningOp->m_sent_receivers_count){
abort();
else if (tOp->execSCANOPCONF(tcPtrI, totalLen, opCount))
theScanningOp->receiver_delivered(tOp);
}
}
return 0;

View file

@ -35,6 +35,8 @@
#include <signaldata/AttrInfo.hpp>
#include <signaldata/TcKeyReq.hpp>
#define DEBUG_NEXT_RESULT 0
NdbScanOperation::NdbScanOperation(Ndb* aNdb) :
NdbOperation(aNdb),
m_resultSet(0),
@ -275,6 +277,9 @@ NdbScanOperation::fix_receivers(Uint32 parallel){
void
NdbScanOperation::receiver_delivered(NdbReceiver* tRec){
if(theError.code == 0){
if(DEBUG_NEXT_RESULT)
ndbout_c("receiver_delivered");
Uint32 idx = tRec->m_list_index;
Uint32 last = m_sent_receivers_count - 1;
if(idx != last){
@ -298,6 +303,9 @@ NdbScanOperation::receiver_delivered(NdbReceiver* tRec){
void
NdbScanOperation::receiver_completed(NdbReceiver* tRec){
if(theError.code == 0){
if(DEBUG_NEXT_RESULT)
ndbout_c("receiver_completed");
Uint32 idx = tRec->m_list_index;
Uint32 last = m_sent_receivers_count - 1;
if(idx != last){
@ -445,8 +453,6 @@ NdbScanOperation::executeCursor(int nodeId){
return -1;
}
#define DEBUG_NEXT_RESULT 0
int NdbScanOperation::nextResult(bool fetchAllowed)
{
if(m_ordered)
@ -579,7 +585,8 @@ int NdbScanOperation::nextResult(bool fetchAllowed)
int
NdbScanOperation::send_next_scan(Uint32 cnt, bool stopScanFlag){
if(cnt > 0 || stopScanFlag){
if(cnt > 0)
{
NdbApiSignal tSignal(theNdb->theMyRef);
tSignal.setSignal(GSN_SCAN_NEXTREQ);
@ -595,33 +602,40 @@ NdbScanOperation::send_next_scan(Uint32 cnt, bool stopScanFlag){
*/
Uint32 last = m_sent_receivers_count;
Uint32 * prep_array = (cnt > 21 ? m_prepared_receivers : theData + 4);
Uint32 sent = 0;
for(Uint32 i = 0; i<cnt; i++){
NdbReceiver * tRec = m_api_receivers[i];
m_sent_receivers[last+i] = tRec;
tRec->m_list_index = last+i;
prep_array[i] = tRec->m_tcPtrI;
tRec->prepareSend();
if((prep_array[sent] = tRec->m_tcPtrI) != RNIL)
{
m_sent_receivers[last+sent] = tRec;
tRec->m_list_index = last+sent;
tRec->prepareSend();
sent++;
}
}
memcpy(&m_api_receivers[0], &m_api_receivers[cnt], cnt * sizeof(char*));
Uint32 nodeId = theNdbCon->theDBnode;
TransporterFacade * tp = TransporterFacade::instance();
int ret;
if(cnt > 21){
tSignal.setLength(4);
LinearSectionPtr ptr[3];
ptr[0].p = prep_array;
ptr[0].sz = cnt;
ret = tp->sendSignal(&tSignal, nodeId, ptr, 1);
} else {
tSignal.setLength(4+cnt);
ret = tp->sendSignal(&tSignal, nodeId);
int ret = 0;
if(sent)
{
Uint32 nodeId = theNdbCon->theDBnode;
TransporterFacade * tp = TransporterFacade::instance();
if(cnt > 21 && !stopScanFlag){
tSignal.setLength(4);
LinearSectionPtr ptr[3];
ptr[0].p = prep_array;
ptr[0].sz = sent;
ret = tp->sendSignal(&tSignal, nodeId, ptr, 1);
} else {
tSignal.setLength(4+(stopScanFlag ? 0 : sent));
ret = tp->sendSignal(&tSignal, nodeId);
}
}
m_sent_receivers_count = last + cnt + stopScanFlag;
m_sent_receivers_count = last + sent;
m_api_receivers_count -= cnt;
m_current_api_receiver = 0;
return ret;
}
return 0;
@ -1412,10 +1426,22 @@ NdbIndexScanOperation::send_next_scan_ordered(Uint32 idx){
if(idx == theParallelism)
return 0;
NdbReceiver* tRec = m_api_receivers[idx];
NdbApiSignal tSignal(theNdb->theMyRef);
tSignal.setSignal(GSN_SCAN_NEXTREQ);
Uint32 last = m_sent_receivers_count;
Uint32* theData = tSignal.getDataPtrSend();
Uint32* prep_array = theData + 4;
m_current_api_receiver = idx + 1;
if((prep_array[0] = tRec->m_tcPtrI) == RNIL)
{
if(DEBUG_NEXT_RESULT)
ndbout_c("receiver completed, don't send");
return 0;
}
theData[0] = theNdbCon->theTCConPtr;
theData[1] = 0;
Uint64 transId = theNdbCon->theTransactionId;
@ -1425,17 +1451,10 @@ NdbIndexScanOperation::send_next_scan_ordered(Uint32 idx){
/**
* Prepare ops
*/
Uint32 last = m_sent_receivers_count;
Uint32 * prep_array = theData + 4;
NdbReceiver * tRec = m_api_receivers[idx];
m_sent_receivers[last] = tRec;
tRec->m_list_index = last;
prep_array[0] = tRec->m_tcPtrI;
tRec->prepareSend();
m_sent_receivers_count = last + 1;
m_current_api_receiver = idx + 1;
Uint32 nodeId = theNdbCon->theDBnode;
TransporterFacade * tp = TransporterFacade::instance();
@ -1448,12 +1467,17 @@ NdbScanOperation::close_impl(TransporterFacade* tp){
Uint32 seq = theNdbCon->theNodeSequence;
Uint32 nodeId = theNdbCon->theDBnode;
if(seq != tp->getNodeSequence(nodeId)){
if(seq != tp->getNodeSequence(nodeId))
{
theNdbCon->theReleaseOnClose = true;
return -1;
}
while(theError.code == 0 && m_sent_receivers_count){
/**
* Wait for outstanding
*/
while(theError.code == 0 && m_sent_receivers_count)
{
theNdb->theWaiter.m_node = nodeId;
theNdb->theWaiter.m_state = WAIT_SCAN;
int return_code = theNdb->receiveResponse(WAITFOR_SCAN_TIMEOUT);
@ -1471,18 +1495,52 @@ NdbScanOperation::close_impl(TransporterFacade* tp){
}
}
if(m_api_receivers_count+m_conf_receivers_count){
// Send close scan
if(send_next_scan(0, true) == -1){ // Close scan
theNdbCon->theReleaseOnClose = true;
return -1;
}
/**
* move all conf'ed into api
* so that send_next_scan can check if they needs to be closed
*/
Uint32 api = m_api_receivers_count;
Uint32 conf = m_conf_receivers_count;
if(m_ordered)
{
/**
* Ordered scan, keep the m_api_receivers "to the right"
*/
memmove(m_api_receivers, m_api_receivers+m_current_api_receiver,
(theParallelism - m_current_api_receiver) * sizeof(char*));
api = (theParallelism - m_current_api_receiver);
m_api_receivers_count = api;
}
if(DEBUG_NEXT_RESULT)
ndbout_c("close_impl: [order api conf sent curr parr] %d %d %d %d %d %d",
m_ordered, api, conf,
m_sent_receivers_count, m_current_api_receiver, theParallelism);
if(api+conf)
{
/**
* There's something to close
* setup m_api_receivers (for send_next_scan)
*/
memcpy(m_api_receivers+api, m_conf_receivers, conf * sizeof(char*));
m_api_receivers_count = api + conf;
m_conf_receivers_count = 0;
}
// Send close scan
if(send_next_scan(api+conf, true) == -1)
{
theNdbCon->theReleaseOnClose = true;
return -1;
}
/**
* wait for close scan conf
*/
while(m_sent_receivers_count+m_api_receivers_count+m_conf_receivers_count){
while(m_sent_receivers_count+m_api_receivers_count+m_conf_receivers_count)
{
theNdb->theWaiter.m_node = nodeId;
theNdb->theWaiter.m_state = WAIT_SCAN;
int return_code = theNdb->receiveResponse(WAITFOR_SCAN_TIMEOUT);
@ -1499,6 +1557,7 @@ NdbScanOperation::close_impl(TransporterFacade* tp){
return -1;
}
}
return 0;
}

View file

@ -36,15 +36,21 @@ public:
bool allowConstraintViolation = true,
int doSleep = 0,
bool oneTrans = false);
int scanReadRecords(Ndb*,
int records,
int abort = 0,
int parallelism = 0,
bool committed = false);
int scanReadCommittedRecords(Ndb*,
int records,
int abort = 0,
int parallelism = 0);
NdbOperation::LockMode = NdbOperation::LM_Read);
int scanReadRecords(Ndb*,
const NdbDictionary::Index*,
int records,
int abort = 0,
int parallelism = 0,
NdbOperation::LockMode = NdbOperation::LM_Read,
bool sorted = false);
int pkReadRecords(Ndb*,
int records,
int batchsize = 1,

View file

@ -53,11 +53,11 @@ public:
int selectCount(Ndb*,
int parallelism = 0,
int* count_rows = NULL,
ScanLock lock = SL_Read,
NdbOperation::LockMode lm = NdbOperation::LM_CommittedRead,
NdbConnection* pTrans = NULL);
int scanReadRecords(Ndb*,
int parallelism,
bool exclusive,
NdbOperation::LockMode lm,
int records,
int noAttribs,
int* attrib_list,

View file

@ -391,8 +391,15 @@ run_read(){
void
print_result(){
int tmp = 1;
tmp *= g_paramters[P_RANGE].value;
tmp *= g_paramters[P_LOOPS].value;
int t, t2;
for(int i = 0; i<P_OP_TYPES; i++){
g_err.println("%s avg: %u us/row", g_ops[i],
(1000*g_times[i])/(g_paramters[P_RANGE].value*g_paramters[P_LOOPS].value));
g_err << g_ops[i] << " avg: "
<< (int)((1000*g_times[i])/tmp)
<< " us/row ("
<< (1000 * tmp)/g_times[i] << " rows / sec)" << endl;
}
}

View file

@ -242,8 +242,9 @@ int runScanReadCommitted(NDBT_Context* ctx, NDBT_Step* step){
HugoTransactions hugoTrans(*ctx->getTab());
while (i<loops && !ctx->isTestStopped()) {
g_info << i << ": ";
if (hugoTrans.scanReadCommittedRecords(GETNDB(step), records,
abort, parallelism) != 0){
if (hugoTrans.scanReadRecords(GETNDB(step), records,
abort, parallelism,
NdbOperation::LM_CommittedRead) != 0){
return NDBT_FAILED;
}
i++;
@ -639,7 +640,7 @@ int runCheckGetValue(NDBT_Context* ctx, NDBT_Step* step){
g_info << (unsigned)i << endl;
if(utilTrans.scanReadRecords(GETNDB(step),
parallelism,
false,
NdbOperation::LM_Read,
records,
alist.attriblist[i]->numAttribs,
alist.attriblist[i]->attribs) != 0){
@ -647,7 +648,7 @@ int runCheckGetValue(NDBT_Context* ctx, NDBT_Step* step){
}
if(utilTrans.scanReadRecords(GETNDB(step),
parallelism,
true,
NdbOperation::LM_Read,
records,
alist.attriblist[i]->numAttribs,
alist.attriblist[i]->attribs) != 0){

View file

@ -39,8 +39,9 @@ struct Parameter {
#define P_LOOPS 8
#define P_CREATE 9
#define P_LOAD 10
#define P_RESET 11
#define P_MAX 11
#define P_MAX 12
static
Parameter
@ -55,7 +56,8 @@ g_paramters[] = {
{ "size", 1000000, 1, ~0 },
{ "iterations", 3, 1, ~0 },
{ "create_drop", 1, 0, 1 },
{ "data", 1, 0, 1 }
{ "data", 1, 0, 1 },
{ "q-reset bounds", 0, 1, 0 }
};
static Ndb* g_ndb = 0;
@ -219,21 +221,29 @@ run_scan(){
NDB_TICKS start1, stop;
int sum_time= 0;
int sample_rows = 0;
NDB_TICKS sample_start = NdbTick_CurrentMillisecond();
Uint32 tot = g_paramters[P_ROWS].value;
if(g_paramters[P_BOUND].value == 2 || g_paramters[P_FILT].value == 2)
iter *= g_paramters[P_ROWS].value;
NdbScanOperation * pOp = 0;
NdbIndexScanOperation * pIOp = 0;
NdbConnection * pTrans = 0;
NdbResultSet * rs = 0;
int check = 0;
for(int i = 0; i<iter; i++){
start1 = NdbTick_CurrentMillisecond();
NdbConnection * pTrans = g_ndb->startTransaction();
pTrans = pTrans ? pTrans : g_ndb->startTransaction();
if(!pTrans){
g_err << "Failed to start transaction" << endl;
err(g_ndb->getNdbError());
return -1;
}
NdbScanOperation * pOp;
NdbIndexScanOperation * pIOp;
NdbResultSet * rs;
int par = g_paramters[P_PARRA].value;
int bat = g_paramters[P_BATCH].value;
NdbScanOperation::LockMode lm;
@ -256,9 +266,17 @@ run_scan(){
assert(pOp);
rs = pOp->readTuples(lm, bat, par);
} else {
pOp = pIOp = pTrans->getNdbIndexScanOperation(g_indexname, g_tablename);
bool ord = g_paramters[P_ACCESS].value == 2;
rs = pIOp->readTuples(lm, bat, par, ord);
if(g_paramters[P_RESET].value == 0 || pIOp == 0)
{
pOp= pIOp= pTrans->getNdbIndexScanOperation(g_indexname, g_tablename);
bool ord = g_paramters[P_ACCESS].value == 2;
rs = pIOp->readTuples(lm, bat, par, ord);
}
else
{
pIOp->reset_bounds();
}
switch(g_paramters[P_BOUND].value){
case 0: // All
break;
@ -268,20 +286,22 @@ run_scan(){
case 2: { // 1 row
default:
assert(g_table->getNoOfPrimaryKeys() == 1); // only impl. so far
abort();
#if 0
int tot = g_paramters[P_ROWS].value;
int row = rand() % tot;
#if 0
fix_eq_bound(pIOp, row);
#else
pIOp->setBound((Uint32)0, NdbIndexScanOperation::BoundEQ, &row);
#endif
break;
}
}
if(g_paramters[P_RESET].value == 1)
goto execute;
}
assert(pOp);
assert(rs);
int check = 0;
switch(g_paramters[P_FILT].value){
case 0: // All
check = pOp->interpret_exit_ok();
@ -313,7 +333,7 @@ run_scan(){
for(int i = 0; i<g_table->getNoOfColumns(); i++){
pOp->getValue(i);
}
execute:
int rows = 0;
check = pTrans->execute(NoCommit);
assert(check == 0);
@ -334,19 +354,29 @@ run_scan(){
return -1;
}
assert(check == 1);
g_info << "Found " << rows << " rows" << endl;
pTrans->close();
if(g_paramters[P_RESET].value == 0)
{
pTrans->close();
pTrans = 0;
}
stop = NdbTick_CurrentMillisecond();
int time_passed= (int)(stop - start1);
g_err.println("Time: %d ms = %u rows/sec", time_passed,
(1000*tot)/time_passed);
sample_rows += rows;
sum_time+= time_passed;
if(sample_rows >= tot)
{
int sample_time = (int)(stop - sample_start);
g_info << "Found " << sample_rows << " rows" << endl;
g_err.println("Time: %d ms = %u rows/sec", sample_time,
(1000*sample_rows)/sample_time);
sample_rows = 0;
sample_start = stop;
}
}
sum_time= sum_time / iter;
g_err.println("Avg time: %d ms = %u rows/sec", sum_time,
(1000*tot)/sum_time);
g_err.println("Avg time: %d ms = %u rows/sec", sum_time/iter,
(1000*tot*iter)/sum_time);
return 0;
}

View file

@ -29,20 +29,13 @@ HugoTransactions::~HugoTransactions(){
deallocRows();
}
int HugoTransactions::scanReadCommittedRecords(Ndb* pNdb,
int records,
int abortPercent,
int parallelism){
return scanReadRecords(pNdb, records, abortPercent, parallelism, true);
}
int
HugoTransactions::scanReadRecords(Ndb* pNdb,
int records,
int abortPercent,
int parallelism,
bool committed){
NdbOperation::LockMode lm)
{
int retryAttempt = 0;
const int retryMax = 100;
@ -80,8 +73,163 @@ HugoTransactions::scanReadRecords(Ndb* pNdb,
}
NdbResultSet * rs;
rs = pOp ->readTuples(committed ? NdbScanOperation::LM_CommittedRead :
NdbScanOperation::LM_Read);
rs = pOp ->readTuples(lm);
if( rs == 0 ) {
ERR(pTrans->getNdbError());
pNdb->closeTransaction(pTrans);
return NDBT_FAILED;
}
check = pOp->interpret_exit_ok();
if( check == -1 ) {
ERR(pTrans->getNdbError());
pNdb->closeTransaction(pTrans);
return NDBT_FAILED;
}
for(a = 0; a<tab.getNoOfColumns(); a++){
if((row.attributeStore(a) =
pOp->getValue(tab.getColumn(a)->getName())) == 0) {
ERR(pTrans->getNdbError());
pNdb->closeTransaction(pTrans);
return NDBT_FAILED;
}
}
check = pTrans->execute(NoCommit);
if( check == -1 ) {
const NdbError err = pTrans->getNdbError();
if (err.status == NdbError::TemporaryError){
ERR(err);
pNdb->closeTransaction(pTrans);
NdbSleep_MilliSleep(50);
retryAttempt++;
continue;
}
ERR(err);
pNdb->closeTransaction(pTrans);
return NDBT_FAILED;
}
// Abort after 1-100 or 1-records rows
int ranVal = rand();
int abortCount = ranVal % (records == 0 ? 100 : records);
bool abortTrans = false;
if (abort > 0){
// Abort if abortCount is less then abortPercent
if (abortCount < abortPercent)
abortTrans = true;
}
int eof;
int rows = 0;
while((eof = rs->nextResult(true)) == 0){
rows++;
if (calc.verifyRowValues(&row) != 0){
pNdb->closeTransaction(pTrans);
return NDBT_FAILED;
}
if (abortCount == rows && abortTrans == true){
ndbout << "Scan is aborted" << endl;
g_info << "Scan is aborted" << endl;
rs->close();
if( check == -1 ) {
ERR(pTrans->getNdbError());
pNdb->closeTransaction(pTrans);
return NDBT_FAILED;
}
pNdb->closeTransaction(pTrans);
return NDBT_OK;
}
}
if (eof == -1) {
const NdbError err = pTrans->getNdbError();
if (err.status == NdbError::TemporaryError){
ERR_INFO(err);
pNdb->closeTransaction(pTrans);
NdbSleep_MilliSleep(50);
switch (err.code){
case 488:
case 245:
case 490:
// Too many active scans, no limit on number of retry attempts
break;
default:
retryAttempt++;
}
continue;
}
ERR(err);
pNdb->closeTransaction(pTrans);
return NDBT_FAILED;
}
pNdb->closeTransaction(pTrans);
g_info << rows << " rows have been read" << endl;
if (records != 0 && rows != records){
g_err << "Check expected number of records failed" << endl
<< " expected=" << records <<", " << endl
<< " read=" << rows << endl;
return NDBT_FAILED;
}
return NDBT_OK;
}
return NDBT_FAILED;
}
int
HugoTransactions::scanReadRecords(Ndb* pNdb,
const NdbDictionary::Index * pIdx,
int records,
int abortPercent,
int parallelism,
NdbOperation::LockMode lm,
bool sorted)
{
int retryAttempt = 0;
const int retryMax = 100;
int check, a;
NdbConnection *pTrans;
NdbIndexScanOperation *pOp;
while (true){
if (retryAttempt >= retryMax){
g_err << "ERROR: has retried this operation " << retryAttempt
<< " times, failing!" << endl;
return NDBT_FAILED;
}
pTrans = pNdb->startTransaction();
if (pTrans == NULL) {
const NdbError err = pNdb->getNdbError();
if (err.status == NdbError::TemporaryError){
ERR(err);
NdbSleep_MilliSleep(50);
retryAttempt++;
continue;
}
ERR(err);
return NDBT_FAILED;
}
pOp = pTrans->getNdbIndexScanOperation(pIdx->getName(), tab.getName());
if (pOp == NULL) {
ERR(pTrans->getNdbError());
pNdb->closeTransaction(pTrans);
return NDBT_FAILED;
}
NdbResultSet * rs;
rs = pOp ->readTuples(lm, 0, parallelism, sorted);
if( rs == 0 ) {
ERR(pTrans->getNdbError());

View file

@ -619,7 +619,7 @@ UtilTransactions::addRowToInsert(Ndb* pNdb,
int
UtilTransactions::scanReadRecords(Ndb* pNdb,
int parallelism,
bool exclusive,
NdbOperation::LockMode lm,
int records,
int noAttribs,
int *attrib_list,
@ -669,10 +669,7 @@ UtilTransactions::scanReadRecords(Ndb* pNdb,
return NDBT_FAILED;
}
NdbResultSet * rs = pOp->readTuples(exclusive ?
NdbScanOperation::LM_Exclusive :
NdbScanOperation::LM_Read,
0, parallelism);
NdbResultSet * rs = pOp->readTuples(lm, 0, parallelism);
if( rs == 0 ) {
ERR(pTrans->getNdbError());
pNdb->closeTransaction(pTrans);
@ -761,7 +758,7 @@ int
UtilTransactions::selectCount(Ndb* pNdb,
int parallelism,
int* count_rows,
ScanLock lock,
NdbOperation::LockMode lm,
NdbConnection* pTrans){
int retryAttempt = 0;
@ -785,19 +782,7 @@ UtilTransactions::selectCount(Ndb* pNdb,
return NDBT_FAILED;
}
NdbResultSet * rs;
switch(lock){
case SL_ReadHold:
rs = pOp->readTuples(NdbScanOperation::LM_Read);
break;
case SL_Exclusive:
rs = pOp->readTuples(NdbScanOperation::LM_Exclusive);
break;
case SL_Read:
default:
rs = pOp->readTuples(NdbScanOperation::LM_CommittedRead);
}
NdbResultSet * rs = pOp->readTuples(lm);
if( rs == 0) {
ERR(pTrans->getNdbError());
pNdb->closeTransaction(pTrans);

View file

@ -30,7 +30,7 @@ main(int argc, const char** argv){
const char* _dbname = "TEST_DB";
int _help = 0;
int _ordered, _pk;
int _ordered = 0, _pk = 1;
struct getargs args[] = {
{ "database", 'd', arg_string, &_dbname, "dbname",

View file

@ -35,13 +35,17 @@ int main(int argc, const char** argv){
int _parallelism = 1;
const char* _tabname = NULL;
int _help = 0;
int lock = NdbOperation::LM_Read;
int sorted = 0;
struct getargs args[] = {
{ "aborts", 'a', arg_integer, &_abort, "percent of transactions that are aborted", "abort%" },
{ "loops", 'l', arg_integer, &_loops, "number of times to run this program(0=infinite loop)", "loops" },
{ "parallelism", 'p', arg_integer, &_parallelism, "parallelism(1-240)", "para" },
{ "records", 'r', arg_integer, &_records, "Number of records", "recs" },
{ "usage", '?', arg_flag, &_help, "Print help", "" }
{ "usage", '?', arg_flag, &_help, "Print help", "" },
{ "lock", 'm', arg_integer, &lock, "lock mode", "" },
{ "sorted", 's', arg_flag, &sorted, "sorted", "" }
};
int num_args = sizeof(args) / sizeof(args[0]);
int optind = 0;
@ -73,16 +77,48 @@ int main(int argc, const char** argv){
ndbout << " Table " << _tabname << " does not exist!" << endl;
return NDBT_ProgramExit(NDBT_WRONGARGS);
}
const NdbDictionary::Index * pIdx = 0;
if(optind+1 < argc)
{
pIdx = MyNdb.getDictionary()->getIndex(argv[optind+1], _tabname);
if(!pIdx)
ndbout << " Index " << argv[optind+1] << " not found" << endl;
else
if(pIdx->getType() != NdbDictionary::Index::UniqueOrderedIndex &&
pIdx->getType() != NdbDictionary::Index::OrderedIndex)
{
ndbout << " Index " << argv[optind+1] << " is not scannable" << endl;
pIdx = 0;
}
}
HugoTransactions hugoTrans(*pTab);
int i = 0;
while (i<_loops || _loops==0) {
ndbout << i << ": ";
if(hugoTrans.scanReadRecords(&MyNdb,
0,
_abort,
_parallelism) != 0){
return NDBT_ProgramExit(NDBT_FAILED);
if(!pIdx)
{
if(hugoTrans.scanReadRecords(&MyNdb,
0,
_abort,
_parallelism,
(NdbOperation::LockMode)lock) != 0)
{
return NDBT_ProgramExit(NDBT_FAILED);
}
}
else
{
if(hugoTrans.scanReadRecords(&MyNdb, pIdx,
0,
_abort,
_parallelism,
(NdbOperation::LockMode)lock,
sorted) != 0)
{
return NDBT_ProgramExit(NDBT_FAILED);
}
}
i++;
}

View file

@ -133,13 +133,18 @@ int main(int argc, char** argv){
const NdbDictionary::Table* pTab = NDBT_Table::discoverTableFromDb(&MyNdb, _tabname);
const NdbDictionary::Index * pIdx = 0;
if(argc > 1){
pIdx = MyNdb.getDictionary()->getIndex(argv[0], _tabname);
pIdx = MyNdb.getDictionary()->getIndex(argv[1], _tabname);
}
if(pTab == NULL){
ndbout << " Table " << _tabname << " does not exist!" << endl;
return NDBT_ProgramExit(NDBT_WRONGARGS);
}
if(argc > 1 && pIdx == 0)
{
ndbout << " Index " << argv[1] << " does not exists" << endl;
}
if(_order && pIdx == NULL){
ndbout << " Order flag given without an index" << endl;