Added 3 psuedo columns

Cleaned up code
Changed ndb_select_all
This commit is contained in:
joreland@mysql.com 2004-08-27 22:31:16 +02:00
parent ee09335752
commit 872dff5b69
18 changed files with 178 additions and 137 deletions

View file

@ -33,8 +33,10 @@ public:
/**
* Psuedo columns
*/
STATIC_CONST( FRAGMENT = 0xFFFE );
STATIC_CONST( ROW_COUNT = 0xFFFD );
STATIC_CONST( PSUEDO = 0x8000 );
STATIC_CONST( FRAGMENT = 0xFFFE );
STATIC_CONST( ROW_COUNT = 0xFFFD );
STATIC_CONST( COMMIT_COUNT = 0xFFFC );
/** Initialize AttributeHeader at location aHeaderPtr */
static AttributeHeader& init(void* aHeaderPtr, Uint32 anAttributeId,

View file

@ -944,6 +944,6 @@ extern const GlobalSignalNumber NO_OF_SIGNAL_NAMES;
#define GSN_TUX_BOUND_INFO 710
#define GSN_ACC_LOCKREQ 711
#define GSN_READ_ROWCOUNT_REQ 712
#define GSN_READ_PSUEDO_REQ 712
#endif

View file

@ -379,6 +379,10 @@ public:
void setDefaultValue(const char*);
const char* getDefaultValue() const;
/** @} *******************************************************************/
static const Column * FRAGMENT;
static const Column * ROW_COUNT;
static const Column * COMMIT_COUNT;
#endif
private:

View file

@ -21,6 +21,7 @@
#include "ndbapi_limits.h"
#include "NdbError.hpp"
#include "NdbReceiver.hpp"
#include "NdbDictionary.hpp"
class Ndb;
class NdbApiSignal;
@ -289,8 +290,9 @@ public:
* the attribute, or a NULL pointer
* (indicating error).
*/
NdbRecAttr* getValue(const char* anAttrName, char* aValue = 0);
NdbRecAttr* getValue(Uint32 anAttrId, char* aValue = 0);
NdbRecAttr* getValue(const char* anAttrName, char* aValue = 0);
NdbRecAttr* getValue(Uint32 anAttrId, char* aValue = 0);
NdbRecAttr* getValue(const NdbDictionary::Column*, char* val = 0);
/**
* Define an attribute to set or update in query.

View file

@ -829,6 +829,7 @@ struct Rootfragmentrec {
Uint32 nextroot;
Uint32 roothashcheck;
Uint32 noOfElements;
Uint32 m_commit_count;
State rootState;
}; /* p2c: size = 72 bytes */
@ -925,7 +926,7 @@ private:
void execACC_OVER_REC(Signal* signal);
void execACC_SAVE_PAGES(Signal* signal);
void execNEXTOPERATION(Signal* signal);
void execREAD_ROWCOUNTREQ(Signal* signal);
void execREAD_PSUEDO_REQ(Signal* signal);
// Received signals
void execSTTOR(Signal* signal);

View file

@ -164,7 +164,7 @@ Dbacc::Dbacc(const class Configuration & conf):
addRecSignal(GSN_ACC_OVER_REC, &Dbacc::execACC_OVER_REC);
addRecSignal(GSN_ACC_SAVE_PAGES, &Dbacc::execACC_SAVE_PAGES);
addRecSignal(GSN_NEXTOPERATION, &Dbacc::execNEXTOPERATION);
addRecSignal(GSN_READ_ROWCOUNT_REQ, &Dbacc::execREAD_ROWCOUNTREQ);
addRecSignal(GSN_READ_PSUEDO_REQ, &Dbacc::execREAD_PSUEDO_REQ);
// Received signals
addRecSignal(GSN_STTOR, &Dbacc::execSTTOR);

View file

@ -17,6 +17,7 @@
#define DBACC_C
#include "Dbacc.hpp"
#include <AttributeHeader.hpp>
#include <signaldata/AccFrag.hpp>
#include <signaldata/AccScan.hpp>
#include <signaldata/AccLock.hpp>
@ -1051,6 +1052,7 @@ void Dbacc::initRootfragrec(Signal* signal)
rootfragrecptr.p->mytabptr = req->tableId;
rootfragrecptr.p->roothashcheck = req->kValue + req->lhFragBits;
rootfragrecptr.p->noOfElements = 0;
rootfragrecptr.p->m_commit_count = 0;
for (Uint32 i = 0; i < MAX_PARALLEL_SCANS_PER_FRAG; i++) {
rootfragrecptr.p->scan[i] = RNIL;
}//for
@ -2335,46 +2337,51 @@ void Dbacc::execACC_COMMITREQ(Signal* signal)
Toperation = operationRecPtr.p->operation;
operationRecPtr.p->transactionstate = IDLE;
operationRecPtr.p->operation = ZUNDEFINED_OP;
if (Toperation != ZINSERT) {
if (Toperation != ZDELETE) {
return;
if(Toperation != ZREAD){
rootfragrecptr.p->m_commit_count++;
if (Toperation != ZINSERT) {
if (Toperation != ZDELETE) {
return;
} else {
jam();
rootfragrecptr.i = fragrecptr.p->myroot;
ptrCheckGuard(rootfragrecptr, crootfragmentsize, rootfragmentrec);
rootfragrecptr.p->noOfElements--;
fragrecptr.p->slack += operationRecPtr.p->insertDeleteLen;
if (fragrecptr.p->slack > fragrecptr.p->slackCheck) {
/* TIME FOR JOIN BUCKETS PROCESS */
if (fragrecptr.p->expandCounter > 0) {
if (fragrecptr.p->expandFlag < 2) {
jam();
signal->theData[0] = fragrecptr.i;
signal->theData[1] = fragrecptr.p->p;
signal->theData[2] = fragrecptr.p->maxp;
signal->theData[3] = fragrecptr.p->expandFlag;
fragrecptr.p->expandFlag = 2;
sendSignal(cownBlockref, GSN_SHRINKCHECK2, signal, 4, JBB);
}//if
}//if
}//if
}//if
} else {
jam();
jam(); /* EXPAND PROCESS HANDLING */
rootfragrecptr.i = fragrecptr.p->myroot;
ptrCheckGuard(rootfragrecptr, crootfragmentsize, rootfragmentrec);
rootfragrecptr.p->noOfElements--;
fragrecptr.p->slack += operationRecPtr.p->insertDeleteLen;
if (fragrecptr.p->slack > fragrecptr.p->slackCheck) { /* TIME FOR JOIN BUCKETS PROCESS */
if (fragrecptr.p->expandCounter > 0) {
if (fragrecptr.p->expandFlag < 2) {
jam();
signal->theData[0] = fragrecptr.i;
signal->theData[1] = fragrecptr.p->p;
signal->theData[2] = fragrecptr.p->maxp;
signal->theData[3] = fragrecptr.p->expandFlag;
fragrecptr.p->expandFlag = 2;
sendSignal(cownBlockref, GSN_SHRINKCHECK2, signal, 4, JBB);
}//if
}//if
rootfragrecptr.p->noOfElements++;
fragrecptr.p->slack -= operationRecPtr.p->insertDeleteLen;
if (fragrecptr.p->slack >= (Uint32)(1 << 31)) {
/* IT MEANS THAT IF SLACK < ZERO */
if (fragrecptr.p->expandFlag == 0) {
jam();
fragrecptr.p->expandFlag = 2;
signal->theData[0] = fragrecptr.i;
signal->theData[1] = fragrecptr.p->p;
signal->theData[2] = fragrecptr.p->maxp;
sendSignal(cownBlockref, GSN_EXPANDCHECK2, signal, 3, JBB);
}//if
}//if
}//if
} else {
jam(); /* EXPAND PROCESS HANDLING */
rootfragrecptr.i = fragrecptr.p->myroot;
ptrCheckGuard(rootfragrecptr, crootfragmentsize, rootfragmentrec);
rootfragrecptr.p->noOfElements++;
fragrecptr.p->slack -= operationRecPtr.p->insertDeleteLen;
if (fragrecptr.p->slack >= (Uint32)(1 << 31)) { /* IT MEANS THAT IF SLACK < ZERO */
if (fragrecptr.p->expandFlag == 0) {
jam();
fragrecptr.p->expandFlag = 2;
signal->theData[0] = fragrecptr.i;
signal->theData[1] = fragrecptr.p->p;
signal->theData[2] = fragrecptr.p->maxp;
sendSignal(cownBlockref, GSN_EXPANDCHECK2, signal, 3, JBB);
}//if
}//if
}//if
}
return;
}//Dbacc::execACC_COMMITREQ()
@ -13386,13 +13393,24 @@ void Dbacc::execSET_VAR_REQ(Signal* signal)
}//execSET_VAR_REQ()
void
Dbacc::execREAD_ROWCOUNTREQ(Signal* signal){
Dbacc::execREAD_PSUEDO_REQ(Signal* signal){
jamEntry();
fragrecptr.i = signal->theData[0];
Uint32 attrId = signal->theData[1];
ptrCheckGuard(fragrecptr, cfragmentsize, fragmentrec);
rootfragrecptr.i = fragrecptr.p->myroot;
ptrCheckGuard(rootfragrecptr, crootfragmentsize, rootfragmentrec);
Uint64 tmp = rootfragrecptr.p->noOfElements;
Uint64 tmp;
switch(attrId){
case AttributeHeader::ROW_COUNT:
tmp = rootfragrecptr.p->noOfElements;
break;
case AttributeHeader::COMMIT_COUNT:
tmp = rootfragrecptr.p->m_commit_count;
break;
default:
tmp = 0;
}
Uint32 * src = (Uint32*)&tmp;
signal->theData[0] = src[0];
signal->theData[1] = src[1];

View file

@ -2091,7 +2091,7 @@ private:
void execSTART_EXEC_SR(Signal* signal);
void execEXEC_SRREQ(Signal* signal);
void execEXEC_SRCONF(Signal* signal);
void execREAD_ROWCOUNTREQ(Signal* signal);
void execREAD_PSUEDO_REQ(Signal* signal);
void execDUMP_STATE_ORD(Signal* signal);
void execACC_COM_BLOCK(Signal* signal);

View file

@ -340,7 +340,7 @@ Dblqh::Dblqh(const class Configuration & conf):
addRecSignal(GSN_TUX_ADD_ATTRCONF, &Dblqh::execTUX_ADD_ATTRCONF);
addRecSignal(GSN_TUX_ADD_ATTRREF, &Dblqh::execTUX_ADD_ATTRREF);
addRecSignal(GSN_READ_ROWCOUNT_REQ, &Dblqh::execREAD_ROWCOUNTREQ);
addRecSignal(GSN_READ_PSUEDO_REQ, &Dblqh::execREAD_PSUEDO_REQ);
initData();

View file

@ -2509,7 +2509,7 @@ Dblqh::updatePackedList(Signal* signal, HostRecord * ahostptr, Uint16 hostId)
}//Dblqh::updatePackedList()
void
Dblqh::execREAD_ROWCOUNTREQ(Signal* signal){
Dblqh::execREAD_PSUEDO_REQ(Signal* signal){
jamEntry();
TcConnectionrecPtr regTcPtr;
regTcPtr.i = signal->theData[0];
@ -2520,7 +2520,7 @@ Dblqh::execREAD_ROWCOUNTREQ(Signal* signal){
ptrCheckGuard(regFragptr, cfragrecFileSize, fragrecord);
signal->theData[0] = regFragptr.p->accFragptr[regTcPtr.p->localFragptr];
EXECUTE_DIRECT(DBACC, GSN_READ_ROWCOUNT_REQ, signal, 1);
EXECUTE_DIRECT(DBACC, GSN_READ_PSUEDO_REQ, signal, 2);
}
/* ************>> */

View file

@ -1617,7 +1617,7 @@ private:
//------------------------------------------------------------------
//------------------------------------------------------------------
bool nullFlagCheck(Uint32 attrDes2);
bool readRowcount(Uint32 userPtr, Uint32* outBuffer);
Uint32 read_psuedo(Uint32 attrId, Uint32* outBuffer);
//------------------------------------------------------------------
//------------------------------------------------------------------

View file

@ -1985,7 +1985,7 @@ int Dbtup::interpreterNextLab(Signal* signal,
case Interpreter::EXIT_OK_LAST:
jam();
#if 1
#ifdef TRACE_INTERPRETER
ndbout_c(" - exit_ok_last");
#endif
operPtr.p->lastRow = 1;

View file

@ -187,14 +187,11 @@ int Dbtup::readAttributes(Page* const pagePtr,
} else {
return (Uint32)-1;
}//if
} else if(attributeId == AttributeHeader::FRAGMENT){
AttributeHeader::init(&outBuffer[tmpAttrBufIndex], attributeId, 1);
outBuffer[tmpAttrBufIndex+1] = fragptr.p->fragmentId;
tOutBufIndex = tmpAttrBufIndex + 2;
} else if(attributeId == AttributeHeader::ROW_COUNT){
AttributeHeader::init(&outBuffer[tmpAttrBufIndex], attributeId, 2);
readRowcount(operPtr.p->userpointer, outBuffer+tmpAttrBufIndex+1);
tOutBufIndex = tmpAttrBufIndex + 3;
} else if(attributeId & AttributeHeader::PSUEDO){
Uint32 sz = read_psuedo(attributeId,
outBuffer+tmpAttrBufIndex+1);
AttributeHeader::init(&outBuffer[tmpAttrBufIndex], attributeId, sz);
tOutBufIndex = tmpAttrBufIndex + 1 + sz;
} else {
terrorCode = ZATTRIBUTE_ID_ERROR;
return (Uint32)-1;
@ -903,13 +900,24 @@ Dbtup::updateDynSmallVarSize(Uint32* inBuffer,
return false;
}//Dbtup::updateDynSmallVarSize()
bool
Dbtup::readRowcount(Uint32 userPtr, Uint32* outBuffer){
Uint32
Dbtup::read_psuedo(Uint32 attrId, Uint32* outBuffer){
Uint32 tmp[sizeof(SignalHeader)+25];
Signal * signal = (Signal*)&tmp;
signal->theData[0] = userPtr;
EXECUTE_DIRECT(DBLQH, GSN_READ_ROWCOUNT_REQ, signal, 1);
outBuffer[0] = signal->theData[0];
outBuffer[1] = signal->theData[1];
switch(attrId){
case AttributeHeader::FRAGMENT:
* outBuffer = operPtr.p->fragId;
return 1;
case AttributeHeader::ROW_COUNT:
case AttributeHeader::COMMIT_COUNT:
signal->theData[0] = operPtr.p->userpointer;
signal->theData[1] = attrId;
EXECUTE_DIRECT(DBLQH, GSN_READ_PSUEDO_REQ, signal, 2);
outBuffer[0] = signal->theData[0];
outBuffer[1] = signal->theData[1];
return 2;
default:
return 0;
}
}

View file

@ -30,7 +30,7 @@ NdbDictionary::Column::Column(const char * name)
NdbDictionary::Column::Column(const NdbDictionary::Column & org)
: m_impl(* new NdbColumnImpl(* this))
{
m_impl.assign(org.m_impl);
m_impl = org.m_impl;
}
NdbDictionary::Column::Column(NdbColumnImpl& impl)
@ -272,11 +272,8 @@ NdbDictionary::Table::getTableId() const {
void
NdbDictionary::Table::addColumn(const Column & c){
// JONAS check this out!!
// Memory leak, the new Column will not be freed
//NdbDictionary::Column * col = new Column(c);
NdbColumnImpl* col = new NdbColumnImpl;
col->assign(NdbColumnImpl::getImpl(c));
(* col) = NdbColumnImpl::getImpl(c);
m_impl.m_columns.push_back(col);
if(c.getPrimaryKey()){
m_impl.m_noOfKeys++;
@ -491,11 +488,8 @@ NdbDictionary::Index::getIndexColumn(int no) const {
void
NdbDictionary::Index::addColumn(const Column & c){
// JONAS check this out!!
// Memory leak, the new Column will not be freed
//NdbDictionary::Column * col = new Column(c);
NdbColumnImpl* col = new NdbColumnImpl;
col->assign(NdbColumnImpl::getImpl(c));
(* col) = NdbColumnImpl::getImpl(c);
m_impl.m_columns.push_back(col);
}
@ -605,11 +599,8 @@ NdbDictionary::Event::setDurability(const EventDurability d)
void
NdbDictionary::Event::addColumn(const Column & c){
// JONAS check this out!!
// Memory leak, the new Column will not be freed
//NdbDictionary::Column * col = new Column(c);
NdbColumnImpl* col = new NdbColumnImpl;
col->assign(NdbColumnImpl::getImpl(c));
(* col) = NdbColumnImpl::getImpl(c);
m_impl.m_columns.push_back(col);
}
@ -901,3 +892,8 @@ operator<<(NdbOut& out, const NdbDictionary::Column& col)
out << " NULL";
return out;
}
const NdbDictionary::Column * NdbDictionary::Column::FRAGMENT = 0;
const NdbDictionary::Column * NdbDictionary::Column::ROW_COUNT = 0;
const NdbDictionary::Column * NdbDictionary::Column::COMMIT_COUNT = 0;

View file

@ -35,6 +35,7 @@
#include <NdbEventOperation.hpp>
#include "NdbEventOperationImpl.hpp"
#include "NdbBlob.hpp"
#include <AttributeHeader.hpp>
#define DEBUG_PRINT 0
#define INCOMPATIBLE_VERSION -2
@ -197,32 +198,26 @@ NdbColumnImpl::equal(const NdbColumnImpl& col) const
if(strcmp(m_defaultValue.c_str(), col.m_defaultValue.c_str()) != 0){
return false;
}
return true;
}
void
NdbColumnImpl::assign(const NdbColumnImpl& org)
{
m_attrId = org.m_attrId;
m_name.assign(org.m_name);
m_type = org.m_type;
m_precision = org.m_precision;
m_scale = org.m_scale;
m_length = org.m_length;
m_pk = org.m_pk;
m_tupleKey = org.m_tupleKey;
m_distributionKey = org.m_distributionKey;
m_distributionGroup = org.m_distributionGroup;
m_distributionGroupBits = org.m_distributionGroupBits;
m_nullable = org.m_nullable;
m_indexOnly = org.m_indexOnly;
m_autoIncrement = org.m_autoIncrement;
m_autoIncrementInitialValue = org.m_autoIncrementInitialValue;
m_defaultValue.assign(org.m_defaultValue);
m_keyInfoPos = org.m_keyInfoPos;
m_attrSize = org.m_attrSize;
m_arraySize = org.m_arraySize;
NdbDictionary::Column *
NdbColumnImpl::create_psuedo(const char * name){
NdbDictionary::Column * col = new NdbDictionary::Column();
col->setName(name);
if(!strcmp(name, "NDB$FRAGMENT")){
col->setType(NdbDictionary::Column::Unsigned);
col->m_impl.m_attrId = AttributeHeader::FRAGMENT;
} else if(!strcmp(name, "NDB$ROW_COUNT")){
col->setType(NdbDictionary::Column::Bigunsigned);
col->m_impl.m_attrId = AttributeHeader::ROW_COUNT;
} else if(!strcmp(name, "NDB$COMMIT_COUNT")){
col->setType(NdbDictionary::Column::Bigunsigned);
col->m_impl.m_attrId = AttributeHeader::COMMIT_COUNT;
} else {
abort();
}
}
/**
@ -332,7 +327,7 @@ NdbTableImpl::assign(const NdbTableImpl& org)
for(unsigned i = 0; i<org.m_columns.size(); i++){
NdbColumnImpl * col = new NdbColumnImpl();
const NdbColumnImpl * iorg = org.m_columns[i];
col->assign(* iorg);
(* col) = (* iorg);
m_columns.push_back(col);
}
@ -593,6 +588,8 @@ NdbDictionaryImpl::NdbDictionaryImpl(Ndb &ndb,
m_globalHash = 0;
}
static int f_dictionary_count = 0;
NdbDictionaryImpl::~NdbDictionaryImpl()
{
NdbElement_t<NdbTableImpl> * curr = m_localHash.m_tableHash.getNext(0);
@ -603,17 +600,20 @@ NdbDictionaryImpl::~NdbDictionaryImpl()
curr = m_localHash.m_tableHash.getNext(curr);
}
m_globalHash->lock();
if(--f_dictionary_count == 0){
delete NdbDictionary::Column::FRAGMENT;
delete NdbDictionary::Column::ROW_COUNT;
delete NdbDictionary::Column::COMMIT_COUNT;
NdbDictionary::Column::FRAGMENT= 0;
NdbDictionary::Column::ROW_COUNT= 0;
NdbDictionary::Column::COMMIT_COUNT= 0;
}
m_globalHash->unlock();
}
void
initDict(NdbDictionary::Dictionary & d)
{
TransporterFacade * tf = TransporterFacade::instance();
NdbDictionaryImpl & impl = NdbDictionaryImpl::getImpl(d);
impl.m_receiver.setTransporter(tf);
}
#if 0
bool
NdbDictionaryImpl::setTransporter(class TransporterFacade * tf)
{
@ -624,13 +624,25 @@ NdbDictionaryImpl::setTransporter(class TransporterFacade * tf)
return false;
}
#endif
bool
NdbDictionaryImpl::setTransporter(class Ndb* ndb,
class TransporterFacade * tf)
{
m_globalHash = &tf->m_globalDictCache;
return m_receiver.setTransporter(ndb, tf);
if(m_receiver.setTransporter(ndb, tf)){
m_globalHash->lock();
if(f_dictionary_count++ == 0){
NdbDictionary::Column::FRAGMENT=
NdbColumnImpl::create_psuedo("NDB$FRAGMENT");
NdbDictionary::Column::ROW_COUNT=
NdbColumnImpl::create_psuedo("NDB$ROW_COUNT");
NdbDictionary::Column::COMMIT_COUNT=
NdbColumnImpl::create_psuedo("NDB$COMMIT_COUNT");
}
m_globalHash->unlock();
}
}
NdbTableImpl *
@ -1223,6 +1235,7 @@ NdbDictInterface::parseTableInfo(NdbTableImpl ** ret,
impl->m_columns[attrDesc.AttributeId] = col;
it.next();
}
impl->m_noOfKeys = keyCount;
impl->m_keyLenInWords = keyInfoPos;
impl->m_sizeOfKeysInWords = keyInfoPos;

View file

@ -88,11 +88,12 @@ public:
* Equality/assign
*/
bool equal(const NdbColumnImpl&) const;
void assign(const NdbColumnImpl&);
static NdbColumnImpl & getImpl(NdbDictionary::Column & t);
static const NdbColumnImpl & getImpl(const NdbDictionary::Column & t);
NdbDictionary::Column * m_facade;
static NdbDictionary::Column * create_psuedo(const char *);
};
class NdbTableImpl : public NdbDictionary::Table, public NdbDictObjectImpl {

View file

@ -295,6 +295,12 @@ NdbOperation::getValue(Uint32 anAttrId, char* aValue)
return getValue_impl(m_currentTable->getColumn(anAttrId), aValue);
}
NdbRecAttr*
NdbOperation::getValue(const NdbDictionary::Column* col, char* aValue)
{
return getValue_impl(&NdbColumnImpl::getImpl(*col), aValue);
}
int
NdbOperation::equal(const char* anAttrName,
const char* aValuePassed,

View file

@ -129,50 +129,40 @@ select_count(Ndb* pNdb, const NdbDictionary::Table* pTab,
return NDBT_FAILED;
}
NdbResultSet * rs;
switch(lock){
case UtilTransactions::SL_ReadHold:
rs = pOp->readTuples(NdbScanOperation::LM_Read, 0, parallelism);
break;
case UtilTransactions::SL_Exclusive:
rs = pOp->readTuples(NdbScanOperation::LM_Exclusive, 0, parallelism);
break;
case UtilTransactions::SL_Read:
default:
rs = pOp->readTuples(NdbScanOperation::LM_Dirty, 0, parallelism);
}
NdbResultSet * rs = pOp->readTuples(NdbScanOperation::LM_Dirty);
if( rs == 0 ) {
ERR(pTrans->getNdbError());
pNdb->closeTransaction(pTrans);
return NDBT_FAILED;
}
check = pOp->interpret_exit_ok();
check = pOp->interpret_exit_last_row();
if( check == -1 ) {
ERR(pTrans->getNdbError());
pNdb->closeTransaction(pTrans);
return NDBT_FAILED;
}
Uint32 tmp;
pOp->getValue(NdbDictionary::Column::ROW_COUNT, (char*)&tmp);
check = pTrans->execute(NoCommit);
if( check == -1 ) {
ERR(pTrans->getNdbError());
pNdb->closeTransaction(pTrans);
return NDBT_FAILED;
}
Uint64 row_count = 0;
int eof;
int rows = 0;
eof = rs->nextResult();
while(eof == 0){
rows++;
eof = rs->nextResult();
while((eof = rs->nextResult(true)) == 0){
row_count += tmp;
}
if (eof == -1) {
const NdbError err = pTrans->getNdbError();
if (err.status == NdbError::TemporaryError){
pNdb->closeTransaction(pTrans);
NdbSleep_MilliSleep(50);
@ -183,11 +173,11 @@ select_count(Ndb* pNdb, const NdbDictionary::Table* pTab,
pNdb->closeTransaction(pTrans);
return NDBT_FAILED;
}
pNdb->closeTransaction(pTrans);
if (count_rows != NULL){
*count_rows = rows;
*count_rows = row_count;
}
return NDBT_OK;