Merge jonas@perch:src/51-jonas

into  poseidon.ndb.mysql.com:/home/tomas/mysql-5.1-new-ndb


storage/ndb/src/kernel/blocks/ERROR_codes.txt:
  Auto merged
storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp:
  Auto merged
storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp:
  Auto merged
storage/ndb/src/mgmsrv/Services.cpp:
  Auto merged
storage/ndb/src/ndbapi/NdbDictionaryImpl.cpp:
  Auto merged
storage/ndb/test/ndbapi/testNodeRestart.cpp:
  Auto merged
storage/ndb/test/run-test/daily-basic-tests.txt:
  Auto merged
This commit is contained in:
unknown 2006-06-12 13:46:10 +02:00
commit 1265506376
26 changed files with 2796 additions and 1577 deletions

View file

@ -197,7 +197,7 @@ AC_DEFUN([MYSQL_SETUP_NDBCLUSTER], [
MAKE_BINARY_DISTRIBUTION_OPTIONS="$MAKE_BINARY_DISTRIBUTION_OPTIONS --with-ndbcluster"
# CXXFLAGS="$CXXFLAGS \$(NDB_CXXFLAGS)"
CXXFLAGS="$CXXFLAGS \$(NDB_CXXFLAGS)"
if test "$have_ndb_debug" = "default"
then
have_ndb_debug=$with_debug

View file

@ -67,6 +67,9 @@ private:
static Uint32 getNRScanFlag(const Uint32 & requestInfo);
static void setNRScanFlag(Uint32 & requestInfo, Uint32 nr);
static Uint32 getLcpScanFlag(const Uint32 & requestInfo);
static void setLcpScanFlag(Uint32 & requestInfo, Uint32 nr);
};
/**
@ -77,6 +80,7 @@ private:
* z = Descending (TUX) - 1 Bit 6
* d = No disk scan - 1 Bit 7
* n = Node recovery scan - 1 Bit 8
* c = LCP scan - 1 Bit 9
*
* 1111111111222222222233
* 01234567890123456789012345678901
@ -88,6 +92,7 @@ private:
#define AS_DESCENDING_SHIFT (6)
#define AS_NO_DISK_SCAN (7)
#define AS_NR_SCAN (8)
#define AS_LCP_SCAN (9)
inline
Uint32
@ -154,6 +159,19 @@ AccScanReq::setNRScanFlag(UintR & requestInfo, UintR val){
requestInfo |= (val << AS_NR_SCAN);
}
inline
Uint32
AccScanReq::getLcpScanFlag(const Uint32 & requestInfo){
return (requestInfo >> AS_LCP_SCAN) & 1;
}
inline
void
AccScanReq::setLcpScanFlag(UintR & requestInfo, UintR val){
ASSERT_BOOL(val, "AccScanReq::setNoDiskScanFlag");
requestInfo |= (val << AS_LCP_SCAN);
}
class AccScanConf {
/**
* Sender(s)

View file

@ -141,7 +141,7 @@ public:
TuxSetLogFlags = 12002,
TuxMetaDataJunk = 12009,
DumpTsman = 9000,
DumpTsman = 9002,
DumpLgman = 10000,
DumpPgman = 11000
};

View file

@ -61,7 +61,8 @@ public:
static Uint32 getAttrLen(const Uint32 & requestInfo);
static Uint32 getScanPrio(const Uint32 & requestInfo);
static Uint32 getNoDiskFlag(const Uint32 & requestInfo);
static Uint32 getLcpScanFlag(const Uint32 & requestInfo);
static void setLockMode(Uint32 & requestInfo, Uint32 lockMode);
static void setHoldLockFlag(Uint32 & requestInfo, Uint32 holdLock);
static void setKeyinfoFlag(Uint32 & requestInfo, Uint32 keyinfo);
@ -72,6 +73,7 @@ public:
static void setAttrLen(Uint32 & requestInfo, Uint32 attrLen);
static void setScanPrio(Uint32& requestInfo, Uint32 prio);
static void setNoDiskFlag(Uint32& requestInfo, Uint32 val);
static void setLcpScanFlag(Uint32 & requestInfo, Uint32 val);
};
class KeyInfo20 {
@ -198,6 +200,7 @@ public:
* Request Info
*
* a = Length of attrinfo - 16 Bits (16-31)
* c = LCP scan - 1 Bit 3
* d = No disk - 1 Bit 4
* l = Lock Mode - 1 Bit 5
* h = Hold lock - 1 Bit 7
@ -205,7 +208,7 @@ public:
* r = read committed - 1 Bit 9
* x = range scan - 1 Bit 6
* z = descending - 1 Bit 10
* t = tup scan -1 Bit 11 (implies x=z=0)
* t = tup scan - 1 Bit 11 (implies x=z=0)
* p = Scan prio - 4 Bits (12-15) -> max 15
*
* 1111111111222222222233
@ -222,6 +225,7 @@ public:
#define SF_RANGE_SCAN_SHIFT (6)
#define SF_DESCENDING_SHIFT (10)
#define SF_TUP_SCAN_SHIFT (11)
#define SF_LCP_SCAN_SHIFT (3)
#define SF_ATTR_LEN_SHIFT (16)
#define SF_ATTR_LEN_MASK (65535)
@ -359,6 +363,19 @@ ScanFragReq::setNoDiskFlag(UintR & requestInfo, UintR val){
requestInfo |= (val << SF_NO_DISK_SHIFT);
}
inline
Uint32
ScanFragReq::getLcpScanFlag(const Uint32 & requestInfo){
return (requestInfo >> SF_LCP_SCAN_SHIFT) & 1;
}
inline
void
ScanFragReq::setLcpScanFlag(UintR & requestInfo, UintR val){
ASSERT_BOOL(val, "ScanFragReq::setLcpScanFlag");
requestInfo |= (val << SF_LCP_SCAN_SHIFT);
}
inline
Uint32
KeyInfo20::setScanInfo(Uint32 opNo, Uint32 scanNo){

View file

@ -2985,22 +2985,25 @@ Backup::parseTableDescription(Signal* signal,
if (disk)
{
/**
* Remove all disk attributes, but add DISK_REF (8 bytes)
* Remove all disk attributes
*/
tabPtr.p->noOfAttributes -= (disk - 1);
tabPtr.p->noOfAttributes -= disk;
AttributePtr attrPtr;
ndbrequire(tabPtr.p->attributes.seize(attrPtr));
Uint32 sz32 = 2;
attrPtr.p->data.attrId = AttributeHeader::DISK_REF;
attrPtr.p->data.m_flags = Attribute::COL_FIXED;
attrPtr.p->data.sz32 = 2;
attrPtr.p->data.offset = tabPtr.p->sz_FixedAttributes;
tabPtr.p->sz_FixedAttributes += sz32;
{
AttributePtr attrPtr;
ndbrequire(tabPtr.p->attributes.seize(attrPtr));
Uint32 sz32 = 2;
attrPtr.p->data.attrId = AttributeHeader::DISK_REF;
attrPtr.p->data.m_flags = Attribute::COL_FIXED;
attrPtr.p->data.sz32 = 2;
attrPtr.p->data.offset = tabPtr.p->sz_FixedAttributes;
tabPtr.p->sz_FixedAttributes += sz32;
tabPtr.p->noOfAttributes ++;
}
}
{
AttributePtr attrPtr;
ndbrequire(tabPtr.p->attributes.seize(attrPtr));
@ -3309,6 +3312,7 @@ Backup::execBACKUP_FRAGMENT_REQ(Signal* signal)
ScanFragReq::setScanPrio(req->requestInfo, 1);
ScanFragReq::setTupScanFlag(req->requestInfo, 1);
ScanFragReq::setNoDiskFlag(req->requestInfo, 1);
ScanFragReq::setLcpScanFlag(req->requestInfo, 1);
}
req->transId1 = 0;
req->transId2 = (BACKUP << 20) + (getOwnNodeId() << 8);

View file

@ -17,14 +17,13 @@
#ifndef DBACC_H
#define DBACC_H
#ifdef VM_TRACE
#define ACC_SAFE_QUEUE
#endif
#include <pc.hpp>
#include <SimulatedBlock.hpp>
// primary key is stored in TUP
#include "../dbtup/Dbtup.hpp"
#ifdef DBACC_C
// Debug Macros
#define dbgWord32(ptr, ind, val)
@ -135,7 +134,10 @@ ndbout << "Ptr: " << ptr.p->word32 << " \tIndex: " << tmp_string << " \tValue: "
#define ZRIGHT 2
#define ZROOTFRAGMENTSIZE 32
#define ZSCAN_LOCK_ALL 3
#define ZSCAN_OP 5
/**
* Check kernel_types for other operation types
*/
#define ZSCAN_OP 6
#define ZSCAN_REC_SIZE 256
#define ZSTAND_BY 2
#define ZTABLESIZE 16
@ -358,7 +360,6 @@ struct Fragmentrec {
// List of lock owners and list of lock waiters to support LCP handling
//-----------------------------------------------------------------------------
Uint32 lockOwnersList;
Uint32 m_current_sequence_no;
//-----------------------------------------------------------------------------
// References to Directory Ranges (which in turn references directories, which
@ -478,7 +479,7 @@ struct Fragmentrec {
/* OPERATIONREC */
/* --------------------------------------------------------------------------------- */
struct Operationrec {
Uint32 keydata[8];
Uint32 m_op_bits;
Uint32 localdata[2];
Uint32 elementIsforward;
Uint32 elementPage;
@ -487,42 +488,61 @@ struct Operationrec {
Uint32 fragptr;
Uint32 hashvaluePart;
Uint32 hashValue;
Uint32 insertDeleteLen;
Uint32 keyinfoPage;
Uint32 nextLockOwnerOp;
Uint32 nextOp;
Uint32 nextParallelQue;
Uint32 nextQueOp;
Uint32 nextSerialQue;
union {
Uint32 nextSerialQue;
Uint32 m_lock_owner_ptr_i; // if nextParallelQue = RNIL, else undefined
};
Uint32 prevOp;
Uint32 prevLockOwnerOp;
Uint32 prevParallelQue;
Uint32 prevQueOp;
Uint32 prevSerialQue;
union {
Uint32 prevParallelQue;
Uint32 m_lo_last_parallel_op_ptr_i;
};
union {
Uint32 prevSerialQue;
Uint32 m_lo_last_serial_op_ptr_i;
};
Uint32 scanRecPtr;
Uint32 transId1;
Uint32 transId2;
Uint32 longPagePtr;
Uint32 longKeyPageIndex;
Uint32 m_sequence_no;
State opState;
Uint32 userptr;
State transactionstate;
Uint16 elementContainer;
Uint16 tupkeylen;
Uint32 xfrmtupkeylen;
Uint32 userblockref;
Uint32 scanBits;
Uint8 elementIsDisappeared;
Uint8 insertIsDone;
Uint8 lockMode;
Uint8 lockOwner;
Uint8 nodeType;
Uint8 operation;
Uint8 opSimple;
Uint8 dirtyRead;
Uint8 commitDeleteCheckFlag;
Uint8 isAccLockReq;
enum OpBits {
OP_MASK = 0x0000F // 4 bits for operation type
,OP_LOCK_MODE = 0x00010 // 0 - shared lock, 1 = exclusive lock
,OP_ACC_LOCK_MODE = 0x00020 // Or:de lock mode of all operation
// before me
,OP_LOCK_OWNER = 0x00040
,OP_RUN_QUEUE = 0x00080 // In parallell queue of lock owner
,OP_DIRTY_READ = 0x00100
,OP_LOCK_REQ = 0x00200 // isAccLockReq
,OP_COMMIT_DELETE_CHECK = 0x00400
,OP_INSERT_IS_DONE = 0x00800
,OP_ELEMENT_DISAPPEARED = 0x01000
,OP_STATE_MASK = 0xF0000
,OP_STATE_IDLE = 0xF0000
,OP_STATE_WAITING = 0x00000
,OP_STATE_RUNNING = 0x10000
,OP_STATE_EXECUTED = 0x30000
,OP_EXECUTED_DIRTY_READ = 0x3050F
,OP_INITIAL = ~(Uint32)0
};
bool is_same_trans(const Operationrec* op) const {
return
transId1 == op->transId1 && transId2 == op->transId2;
}
}; /* p2c: size = 168 bytes */
typedef Ptr<Operationrec> OperationrecPtr;
@ -585,7 +605,6 @@ struct ScanRec {
Uint32 scanUserblockref;
Uint32 scanMask;
Uint8 scanLockMode;
Uint8 scanKeyinfoFlag;
Uint8 scanTimer;
Uint8 scanContinuebCounter;
Uint8 scanReadCommittedFlag;
@ -610,7 +629,8 @@ public:
virtual ~Dbacc();
// pointer to TUP instance in this thread
Dbtup* c_tup;
class Dbtup* c_tup;
class Dblqh* c_lqh;
void execACCMINUPDATE(Signal* signal);
@ -648,7 +668,8 @@ private:
void ACCKEY_error(Uint32 fromWhere);
void commitDeleteCheck();
void report_dealloc(Signal* signal, const Operationrec* opPtrP);
typedef void * RootfragmentrecPtr;
void initRootFragPageZero(FragmentrecPtr, Page8Ptr);
void initFragAdd(Signal*, FragmentrecPtr);
@ -687,14 +708,30 @@ private:
bool addfragtotab(Signal* signal, Uint32 rootIndex, Uint32 fragId);
void initOpRec(Signal* signal);
void sendAcckeyconf(Signal* signal);
Uint32 placeReadInLockQueue(Signal* signal);
void placeSerialQueueRead(Signal* signal);
void checkOnlyReadEntry(Signal* signal);
Uint32 getNoParallelTransaction(const Operationrec*);
void moveLastParallelQueue(Signal* signal);
void moveLastParallelQueueWrite(Signal* signal);
Uint32 placeWriteInLockQueue(Signal* signal);
void placeSerialQueueWrite(Signal* signal);
#ifdef VM_TRACE
Uint32 getNoParallelTransactionFull(const Operationrec*);
#endif
#ifdef ACC_SAFE_QUEUE
bool validate_lock_queue(OperationrecPtr opPtr);
Uint32 get_parallel_head(OperationrecPtr opPtr);
void dump_lock_queue(OperationrecPtr loPtr);
#else
bool validate_lock_queue(OperationrecPtr) { return true;}
#endif
public:
void execACCKEY_ORD(Signal* signal, Uint32 opPtrI);
void startNext(Signal* signal, OperationrecPtr lastOp);
private:
Uint32 placeReadInLockQueue(OperationrecPtr lockOwnerPtr);
Uint32 placeWriteInLockQueue(OperationrecPtr lockOwnerPtr);
void placeSerialQueue(OperationrecPtr lockOwner, OperationrecPtr op);
void abortSerieQueueOperation(Signal* signal, OperationrecPtr op);
void abortParallelQueueOperation(Signal* signal, OperationrecPtr op);
void expandcontainer(Signal* signal);
void shrinkcontainer(Signal* signal);
void nextcontainerinfoExp(Signal* signal);
@ -724,8 +761,8 @@ private:
void increaselistcont(Signal* signal);
void seizeLeftlist(Signal* signal);
void seizeRightlist(Signal* signal);
Uint32 readTablePk(Uint32 localkey1);
void getElement(Signal* signal);
Uint32 readTablePk(Uint32 localkey1, Uint32 eh, const Operationrec*);
Uint32 getElement(Signal* signal, OperationrecPtr& lockOwner);
void getdirindex(Signal* signal);
void commitdelete(Signal* signal);
void deleteElement(Signal* signal);
@ -734,12 +771,17 @@ private:
void releaseRightlist(Signal* signal);
void checkoverfreelist(Signal* signal);
void abortOperation(Signal* signal);
void accAbortReqLab(Signal* signal);
void commitOperation(Signal* signal);
void copyOpInfo(Signal* signal);
void copyOpInfo(OperationrecPtr dst, OperationrecPtr src);
Uint32 executeNextOperation(Signal* signal);
void releaselock(Signal* signal);
void release_lockowner(Signal* signal, OperationrecPtr, bool commit);
void startNew(Signal* signal, OperationrecPtr newOwner);
void abortWaitingOperation(Signal*, OperationrecPtr);
void abortExecutedOperation(Signal*, OperationrecPtr);
void takeOutFragWaitQue(Signal* signal);
void check_lock_upgrade(Signal* signal, OperationrecPtr release_op, bool lo);
void check_lock_upgrade(Signal* signal, OperationrecPtr lock_owner,
OperationrecPtr release_op);
void allocOverflowPage(Signal* signal);
@ -788,8 +830,8 @@ private:
void senddatapagesLab(Signal* signal);
void sttorrysignalLab(Signal* signal);
void sendholdconfsignalLab(Signal* signal);
void accIsLockedLab(Signal* signal);
void insertExistElemLab(Signal* signal);
void accIsLockedLab(Signal* signal, OperationrecPtr lockOwnerPtr);
void insertExistElemLab(Signal* signal, OperationrecPtr lockOwnerPtr);
void refaccConnectLab(Signal* signal);
void releaseScanLab(Signal* signal);
void ndbrestart1Lab(Signal* signal);
@ -848,8 +890,6 @@ private:
Operationrec *operationrec;
OperationrecPtr operationRecPtr;
OperationrecPtr idrOperationRecPtr;
OperationrecPtr copyInOperPtr;
OperationrecPtr copyOperPtr;
OperationrecPtr mlpqOperPtr;
OperationrecPtr queOperPtr;
OperationrecPtr readWriteOpPtr;
@ -893,8 +933,6 @@ private:
Page8Ptr lcnPageptr;
Page8Ptr lcnCopyPageptr;
Page8Ptr lupPageptr;
Page8Ptr priPageptr;
Page8Ptr pwiPageptr;
Page8Ptr ciPageidptr;
Page8Ptr gsePageidptr;
Page8Ptr isoPageptr;
@ -934,8 +972,6 @@ private:
Tabrec *tabrec;
TabrecPtr tabptr;
Uint32 ctablesize;
Uint32 tpwiElementptr;
Uint32 tpriElementptr;
Uint32 tgseElementptr;
Uint32 tgseContainerptr;
Uint32 trlHead;
@ -969,8 +1005,6 @@ private:
Uint32 tdelForward;
Uint32 tiopPageId;
Uint32 tipPageId;
Uint32 tgeLocked;
Uint32 tgeResult;
Uint32 tgeContainerptr;
Uint32 tgeElementptr;
Uint32 tgeForward;

View file

@ -130,8 +130,6 @@ Dbacc::Dbacc(Block_context& ctx):
&fragrecptr,
&operationRecPtr,
&idrOperationRecPtr,
&copyInOperPtr,
&copyOperPtr,
&mlpqOperPtr,
&queOperPtr,
&readWriteOpPtr,
@ -161,8 +159,6 @@ Dbacc::Dbacc(Block_context& ctx):
&lcnPageptr,
&lcnCopyPageptr,
&lupPageptr,
&priPageptr,
&pwiPageptr,
&ciPageidptr,
&gsePageidptr,
&isoPageptr,

File diff suppressed because it is too large Load diff

View file

@ -33,7 +33,8 @@
// primary key is stored in TUP
#include "../dbtup/Dbtup.hpp"
#include "../dbacc/Dbacc.hpp"
class Dbacc;
class Dbtup;
#ifdef DBLQH_C
// Constants
@ -571,6 +572,7 @@ public:
Uint8 rangeScan;
Uint8 descending;
Uint8 tupScan;
Uint8 lcpScan;
Uint8 scanTcWaiting;
Uint8 scanKeyinfoFlag;
Uint8 m_last_row;
@ -2556,8 +2558,19 @@ private:
Dbtup* c_tup;
Dbacc* c_acc;
/**
* Read primary key from tup
*/
Uint32 readPrimaryKeys(ScanRecord*, TcConnectionrec*, Uint32 * dst);
/**
* Read primary key from operation
*/
public:
Uint32 readPrimaryKeys(Uint32 opPtrI, Uint32 * dst, bool xfrm);
private:
void acckeyconf_tupkeyreq(Signal*, TcConnectionrec*, Fragrecord*, Uint32, Uint32);
void acckeyconf_load_diskpage(Signal*,TcConnectionrecPtr,Fragrecord*,Uint32);
@ -2924,6 +2937,11 @@ public:
}
DLHashTable<ScanRecord> c_scanTakeOverHash;
inline bool TRACE_OP_CHECK(const TcConnectionrec* regTcPtr);
#ifdef ERROR_INSERT
void TRACE_OP_DUMP(const TcConnectionrec* regTcPtr, const char * pos);
#endif
};
inline
@ -2991,10 +3009,19 @@ Dblqh::accminupdate(Signal* signal, Uint32 opId, const Local_key* key)
signal->theData[1] = key->m_page_no << MAX_TUPLES_BITS | key->m_page_idx;
c_acc->execACCMINUPDATE(signal);
if (ERROR_INSERTED(5712))
if (ERROR_INSERTED(5712) || ERROR_INSERTED(5713))
ndbout << " LK: " << *key;
regTcPtr.p->m_row_id = *key;
}
inline
bool
Dblqh::TRACE_OP_CHECK(const TcConnectionrec* regTcPtr)
{
return (ERROR_INSERTED(5712) &&
(regTcPtr->operation == ZINSERT ||
regTcPtr->operation == ZDELETE)) ||
ERROR_INSERTED(5713);
}
#endif

View file

@ -67,48 +67,70 @@
// seen only when we debug the product
#ifdef VM_TRACE
#define DEBUG(x) ndbout << "DBLQH: "<< x << endl;
static
NdbOut &
operator<<(NdbOut& out, Dblqh::TcConnectionrec::TransactionState state){
out << (int)state;
return out;
}
static
NdbOut &
operator<<(NdbOut& out, Dblqh::TcConnectionrec::LogWriteState state){
out << (int)state;
return out;
}
static
NdbOut &
operator<<(NdbOut& out, Dblqh::TcConnectionrec::ListState state){
out << (int)state;
return out;
}
static
NdbOut &
operator<<(NdbOut& out, Dblqh::TcConnectionrec::AbortState state){
out << (int)state;
return out;
}
static
NdbOut &
operator<<(NdbOut& out, Dblqh::ScanRecord::ScanState state){
out << (int)state;
return out;
}
static
NdbOut &
operator<<(NdbOut& out, Dblqh::LogFileOperationRecord::LfoState state){
out << (int)state;
return out;
}
static
NdbOut &
operator<<(NdbOut& out, Dblqh::ScanRecord::ScanType state){
out << (int)state;
return out;
}
static
NdbOut &
operator<<(NdbOut& out, Operation_t op)
{
switch(op){
case ZREAD: out << "READ"; break;
case ZREAD_EX: out << "READ-EX"; break;
case ZINSERT: out << "INSERT"; break;
case ZUPDATE: out << "UPDATE"; break;
case ZDELETE: out << "DELETE"; break;
case ZWRITE: out << "WRITE"; break;
}
return out;
}
#else
#define DEBUG(x)
#endif
@ -120,7 +142,7 @@ const Uint32 NR_ScanNo = 0;
#if defined VM_TRACE || defined ERROR_INSERT || defined NDBD_TRACENR
#include <NdbConfig.h>
NdbOut * tracenrout = 0;
static NdbOut * tracenrout = 0;
static int TRACENR_FLAG = 0;
#define TRACENR(x) (* tracenrout) << x
#define SET_TRACENR_FLAG TRACENR_FLAG = 1
@ -132,6 +154,13 @@ static int TRACENR_FLAG = 0;
#define CLEAR_TRACENR_FLAG
#endif
#ifdef ERROR_INSERT
static NdbOut * traceopout = 0;
#define TRACE_OP(regTcPtr, place) do { if (TRACE_OP_CHECK(regTcPtr)) TRACE_OP_DUMP(regTcPtr, place); } while(0)
#else
#define TRACE_OP(x, y) {}
#endif
/* ------------------------------------------------------------------------- */
/* ------- SEND SYSTEM ERROR ------- */
/* */
@ -454,6 +483,10 @@ void Dblqh::execSTTOR(Signal* signal)
name = NdbConfig_SignalLogFileName(getOwnNodeId());
tracenrout = new NdbOut(* new FileOutputStream(fopen(name, "w+")));
#endif
#ifdef ERROR_INSERT
traceopout = &ndbout;
#endif
return;
break;
@ -2531,14 +2564,15 @@ void Dblqh::execTUPKEYCONF(Signal* signal)
case TcConnectionrec::WAIT_ACC_ABORT:
case TcConnectionrec::ABORT_QUEUED:
jam();
/* -------------------------------------------------------------------------- */
/* IGNORE SINCE ABORT OF THIS OPERATION IS ONGOING ALREADY. */
/* -------------------------------------------------------------------------- */
/* ------------------------------------------------------------------------- */
/* IGNORE SINCE ABORT OF THIS OPERATION IS ONGOING ALREADY. */
/* ------------------------------------------------------------------------- */
break;
default:
ndbrequire(false);
break;
}//switch
}//Dblqh::execTUPKEYCONF()
/* ************> */
@ -2560,6 +2594,8 @@ void Dblqh::execTUPKEYREF(Signal* signal)
c_fragment_pool.getPtr(regFragptr);
fragptr = regFragptr;
TRACE_OP(regTcPtr, "TUPKEYREF");
if (unlikely(activeCreat == Fragrecord::AC_NR_COPY))
{
jam();
@ -2568,7 +2604,7 @@ void Dblqh::execTUPKEYREF(Signal* signal)
ndbassert(regTcPtr->transactionState == TcConnectionrec::WAIT_TUP ||
regTcPtr->transactionState ==TcConnectionrec::WAIT_TUP_TO_ABORT);
}
switch (tcConnectptr.p->transactionState) {
case TcConnectionrec::WAIT_TUP:
jam();
@ -3606,57 +3642,7 @@ void Dblqh::endgettupkeyLab(Signal* signal)
regTcPtr->transactionState = TcConnectionrec::WAIT_ATTR;
return;
}//if
//#define TRACE_LQHKEYREQ
#ifdef TRACE_LQHKEYREQ
{
ndbout << (regTcPtr->operation == ZREAD ? "READ" :
regTcPtr->operation == ZUPDATE ? "UPDATE" :
regTcPtr->operation == ZINSERT ? "INSERT" :
regTcPtr->operation == ZDELETE ? "DELETE" : "<Other>")
<< "(" << (int)regTcPtr->operation << ")"
<< " from=(" << getBlockName(refToBlock(regTcPtr->clientBlockref))
<< ", " << refToNode(regTcPtr->clientBlockref) << ")"
<< " table=" << regTcPtr->tableref << " ";
ndbout << "hash: " << hex << regTcPtr->hashValue << endl;
ndbout << "key=[" << hex;
Uint32 i;
for(i = 0; i<regTcPtr->primKeyLen && i < 4; i++){
ndbout << hex << regTcPtr->tupkeyData[i] << " ";
}
DatabufPtr regDatabufptr;
regDatabufptr.i = regTcPtr->firstTupkeybuf;
while(i < regTcPtr->primKeyLen)
{
ptrCheckGuard(regDatabufptr, cdatabufFileSize, databuf);
for(Uint32 j = 0; j<4 && i<regTcPtr->primKeyLen; j++, i++)
ndbout << hex << regDatabufptr.p->data[j] << " ";
}
ndbout << "]" << endl;
ndbout << "attr=[" << hex;
for(i = 0; i<regTcPtr->reclenAiLqhkey && i < 5; i++)
ndbout << hex << regTcPtr->firstAttrinfo[i] << " ";
AttrbufPtr regAttrinbufptr;
regAttrinbufptr.i= regTcPtr->firstAttrinbuf;
while(i < regTcPtr->totReclenAi)
{
ptrCheckGuard(regAttrinbufptr, cattrinbufFileSize, attrbuf);
Uint32 dataLen = regAttrinbufptr.p->attrbuf[ZINBUF_DATA_LEN];
ndbrequire(dataLen != 0);
ndbrequire(i + dataLen <= regTcPtr->totReclenAi);
for(Uint32 j= 0; j<dataLen; j++, i++)
ndbout << hex << regAttrinbufptr.p->attrbuf[j] << " ";
regAttrinbufptr.i = regAttrinbufptr.p->attrbuf[ZINBUF_NEXT];
}
ndbout << "]" << endl;
}
#endif
/* ---------------------------------------------------------------------- */
/* NOW RECEPTION OF LQHKEYREQ IS COMPLETED THE NEXT STEP IS TO START*/
/* PROCESSING THE MESSAGE. IF THE MESSAGE IS TO A STAND-BY NODE */
@ -3763,6 +3749,7 @@ void Dblqh::prepareContinueAfterBlockedLab(Signal* signal)
/* ----------------------------------------------------------------- */
if (TRACENR_FLAG)
{
TRACE_OP(regTcPtr, "RECEIVED");
switch (regTcPtr->operation) {
case ZREAD: TRACENR("READ"); break;
case ZUPDATE: TRACENR("UPDATE"); break;
@ -3847,6 +3834,9 @@ Dblqh::exec_acckeyreq(Signal* signal, TcConnectionrecPtr regTcPtr)
signal->theData[8] = sig2;
signal->theData[9] = sig3;
signal->theData[10] = sig4;
TRACE_OP(regTcPtr.p, "ACC");
if (regTcPtr.p->primKeyLen > 4) {
sendKeyinfoAcc(signal, 11);
}//if
@ -4133,7 +4123,7 @@ Dblqh::nr_copy_delete_row(Signal* signal,
jam();
ndbrequire(rowid == 0);
signal->theData[0] = accPtr;
signal->theData[1] = false;
signal->theData[1] = 0;
EXECUTE_DIRECT(ref, GSN_ACC_ABORTREQ, signal, 2);
jamEntry();
return;
@ -4144,16 +4134,18 @@ Dblqh::nr_copy_delete_row(Signal* signal,
*/
ndbrequire(regTcPtr.p->m_dealloc == 0);
Local_key save = regTcPtr.p->m_row_id;
signal->theData[0] = regTcPtr.p->accConnectrec;
c_acc->execACCKEY_ORD(signal, accPtr);
signal->theData[0] = accPtr;
EXECUTE_DIRECT(ref, GSN_ACC_COMMITREQ, signal, 1);
jamEntry();
ndbrequire(regTcPtr.p->m_dealloc == 1);
int ret = c_tup->nr_delete(signal, regTcPtr.i,
fragPtr.p->tupFragptr, &regTcPtr.p->m_row_id,
regTcPtr.p->gci);
jamEntry();
if (ret)
{
ndbassert(ret == 1);
@ -4167,7 +4159,7 @@ Dblqh::nr_copy_delete_row(Signal* signal,
}
TRACENR("DELETED: " << regTcPtr.p->m_row_id << endl);
regTcPtr.p->m_dealloc = 0;
regTcPtr.p->m_row_id = save;
fragptr = fragPtr;
@ -4274,6 +4266,45 @@ Dblqh::nr_delete_complete(Signal* signal, Nr_op_info* op)
}
}
Uint32
Dblqh::readPrimaryKeys(Uint32 opPtrI, Uint32 * dst, bool xfrm)
{
TcConnectionrecPtr regTcPtr;
DatabufPtr regDatabufptr;
Uint64 Tmp[MAX_KEY_SIZE_IN_WORDS >> 1];
jamEntry();
regTcPtr.i = opPtrI;
ptrCheckGuard(regTcPtr, ctcConnectrecFileSize, tcConnectionrec);
Uint32 tableId = regTcPtr.p->tableref;
Uint32 keyLen = regTcPtr.p->primKeyLen;
regDatabufptr.i = regTcPtr.p->firstTupkeybuf;
Uint32 * tmp = xfrm ? (Uint32*)Tmp : dst;
memcpy(tmp, regTcPtr.p->tupkeyData, sizeof(regTcPtr.p->tupkeyData));
if (keyLen > 4)
{
tmp += 4;
Uint32 pos = 4;
do {
ptrCheckGuard(regDatabufptr, cdatabufFileSize, databuf);
memcpy(tmp, regDatabufptr.p->data, sizeof(regDatabufptr.p->data));
regDatabufptr.i = regDatabufptr.p->nextDatabuf;
tmp += sizeof(regDatabufptr.p->data) >> 2;
pos += sizeof(regDatabufptr.p->data) >> 2;
} while(pos < keyLen);
}
if (xfrm)
{
jam();
Uint32 keyPartLen[MAX_ATTRIBUTES_IN_INDEX];
return xfrm_key(tableId, (Uint32*)Tmp, dst, ~0, keyPartLen);
}
return keyLen;
}
/* =*======================================================================= */
/* ======= SEND KEYINFO TO ACC ======= */
@ -4447,10 +4478,6 @@ Dblqh::acckeyconf_tupkeyreq(Signal* signal, TcConnectionrec* regTcPtr,
* ----------------------------------------------------------------------- */
Uint32 page_idx = local_key & MAX_TUPLES_PER_PAGE;
Uint32 page_no = local_key >> MAX_TUPLES_BITS;
#ifdef TRACE_LQHKEYREQ
ndbout << "localkey: [ " << hex << page_no << " " << page_idx << "]"
<< endl;
#endif
Uint32 Ttupreq = regTcPtr->dirtyOp;
Ttupreq = Ttupreq + (regTcPtr->opSimple << 1);
Ttupreq = Ttupreq + (op << 6);
@ -4506,70 +4533,13 @@ Dblqh::acckeyconf_tupkeyreq(Signal* signal, TcConnectionrec* regTcPtr,
tupKeyReq->m_row_id_page_no = sig0;
tupKeyReq->m_row_id_page_idx = sig1;
if (ERROR_INSERTED(5712) && regTcPtr->operation == ZINSERT)
{
ndbout << "INSERT " << regFragptrP->tabRef
<< "(" << regFragptrP->fragId << ")";
{
ndbout << "key=[" << hex;
Uint32 i;
for(i = 0; i<regTcPtr->primKeyLen && i < 4; i++){
ndbout << hex << regTcPtr->tupkeyData[i] << " ";
}
DatabufPtr regDatabufptr;
regDatabufptr.i = regTcPtr->firstTupkeybuf;
while(i < regTcPtr->primKeyLen)
{
ptrCheckGuard(regDatabufptr, cdatabufFileSize, databuf);
for(Uint32 j = 0; j<4 && i<regTcPtr->primKeyLen; j++, i++)
ndbout << hex << regDatabufptr.p->data[j] << " ";
}
ndbout << "] ";
}
if(regTcPtr->m_use_rowid)
ndbout << " " << regTcPtr->m_row_id;
}
if (ERROR_INSERTED(5712) && regTcPtr->operation == ZDELETE)
{
Local_key lk; lk.assref(local_key);
ndbout << "DELETE " << regFragptrP->tabRef
<< "(" << regFragptrP->fragId << ") " << lk;
{
ndbout << "key=[" << hex;
Uint32 i;
for(i = 0; i<regTcPtr->primKeyLen && i < 4; i++){
ndbout << hex << regTcPtr->tupkeyData[i] << " ";
}
DatabufPtr regDatabufptr;
regDatabufptr.i = regTcPtr->firstTupkeybuf;
while(i < regTcPtr->primKeyLen)
{
ptrCheckGuard(regDatabufptr, cdatabufFileSize, databuf);
for(Uint32 j = 0; j<4 && i<regTcPtr->primKeyLen; j++, i++)
ndbout << hex << regDatabufptr.p->data[j] << " ";
}
ndbout << "]" << endl;
}
}
TRACE_OP(regTcPtr, "TUPKEYREQ");
regTcPtr->m_use_rowid |= (op == ZINSERT);
regTcPtr->m_row_id.m_page_no = page_no;
regTcPtr->m_row_id.m_page_idx = page_idx;
EXECUTE_DIRECT(tup, GSN_TUPKEYREQ, signal, TupKeyReq::SignalLength);
if (ERROR_INSERTED(5712) && regTcPtr->operation == ZINSERT)
{
ndbout << endl;
}
}//Dblqh::execACCKEYCONF()
void
@ -4654,27 +4624,37 @@ void Dblqh::tupkeyConfLab(Signal* signal)
const TupKeyConf * const tupKeyConf = (TupKeyConf *)&signal->theData[0];
TcConnectionrec * const regTcPtr = tcConnectptr.p;
Uint32 activeCreat = regTcPtr->activeCreat;
Uint32 readLen = tupKeyConf->readLength;
Uint32 writeLen = tupKeyConf->writeLength;
Uint32 accOp = regTcPtr->accConnectrec;
c_acc->execACCKEY_ORD(signal, accOp);
TRACE_OP(regTcPtr, "TUPKEYCONF");
if (regTcPtr->simpleRead) {
jam();
/* ----------------------------------------------------------------------
* THE OPERATION IS A SIMPLE READ. WE WILL IMMEDIATELY COMMIT THE OPERATION.
* SINCE WE HAVE NOT RELEASED THE FRAGMENT LOCK (FOR LOCAL CHECKPOINTS) YET
* THE OPERATION IS A SIMPLE READ.
* WE WILL IMMEDIATELY COMMIT THE OPERATION.
* SINCE WE HAVE NOT RELEASED THE FRAGMENT LOCK
* (FOR LOCAL CHECKPOINTS) YET
* WE CAN GO IMMEDIATELY TO COMMIT_CONTINUE_AFTER_BLOCKED.
* WE HAVE ALREADY SENT THE RESPONSE SO WE ARE NOT INTERESTED IN READ LENGTH
* ---------------------------------------------------------------------- */
* WE HAVE ALREADY SENT THE RESPONSE SO WE ARE NOT INTERESTED IN
* READ LENGTH
* --------------------------------------------------------------------- */
commitContinueAfterBlockedLab(signal);
return;
}//if
if (tupKeyConf->readLength != 0) {
if (readLen != 0)
{
jam();
/* SET BIT 15 IN REQINFO */
LqhKeyReq::setApplicationAddressFlag(regTcPtr->reqinfo, 1);
regTcPtr->readlenAi = tupKeyConf->readLength;
regTcPtr->readlenAi = readLen;
}//if
regTcPtr->totSendlenAi = tupKeyConf->writeLength;
regTcPtr->totSendlenAi = writeLen;
ndbrequire(regTcPtr->totSendlenAi == regTcPtr->currTupAiLen);
if (unlikely(activeCreat == Fragrecord::AC_NR_COPY))
@ -5597,6 +5577,8 @@ void Dblqh::releaseOprec(Signal* signal)
if (TRACENR_FLAG)
TRACENR("DELETED: " << regTcPtr->m_row_id << endl);
TRACE_OP(regTcPtr, "DEALLOC");
signal->theData[0] = regTcPtr->fragmentid;
signal->theData[1] = regTcPtr->tableref;
@ -5818,6 +5800,10 @@ void Dblqh::execCOMMIT(Signal* signal)
ptrAss(tcConnectptr, regTcConnectionrec);
if ((tcConnectptr.p->transid[0] == transid1) &&
(tcConnectptr.p->transid[1] == transid2)) {
TcConnectionrec * const regTcPtr = tcConnectptr.p;
TRACE_OP(regTcPtr, "COMMIT");
commitReqLab(signal, gci);
return;
}//if
@ -5937,6 +5923,10 @@ void Dblqh::execCOMPLETE(Signal* signal)
if ((tcConnectptr.p->transactionState == TcConnectionrec::COMMITTED) &&
(tcConnectptr.p->transid[0] == transid1) &&
(tcConnectptr.p->transid[1] == transid2)) {
TcConnectionrec * const regTcPtr = tcConnectptr.p;
TRACE_OP(regTcPtr, "COMPLETE");
if (tcConnectptr.p->seqNoReplica != 0 &&
tcConnectptr.p->activeCreat == Fragrecord::AC_NORMAL) {
jam();
@ -6313,12 +6303,16 @@ void Dblqh::commitContinueAfterBlockedLab(Signal* signal)
TRACENR(endl);
}
TRACE_OP(regTcPtr.p, "ACC_COMMITREQ");
Uint32 acc = refToBlock(regTcPtr.p->tcAccBlockref);
signal->theData[0] = regTcPtr.p->accConnectrec;
EXECUTE_DIRECT(acc, GSN_ACC_COMMITREQ, signal, 1);
} else {
if(!dirtyOp){
TRACE_OP(regTcPtr.p, "ACC_COMMITREQ");
Uint32 acc = refToBlock(regTcPtr.p->tcAccBlockref);
signal->theData[0] = regTcPtr.p->accConnectrec;
EXECUTE_DIRECT(acc, GSN_ACC_COMMITREQ, signal, 1);
@ -6362,6 +6356,8 @@ Dblqh::tupcommit_conf_callback(Signal* signal, Uint32 tcPtrI)
c_fragment_pool.getPtr(regFragptr);
fragptr = regFragptr;
TRACE_OP(tcPtr, "ACC_COMMITREQ");
Uint32 acc = refToBlock(tcPtr->tcAccBlockref);
signal->theData[0] = tcPtr->accConnectrec;
EXECUTE_DIRECT(acc, GSN_ACC_COMMITREQ, signal, 1);
@ -6670,6 +6666,8 @@ void Dblqh::execABORT(Signal* signal)
regTcPtr->commitAckMarker = RNIL;
}
TRACE_OP(regTcPtr, "ABORT");
abortStateHandlerLab(signal);
return;
@ -7087,23 +7085,30 @@ void Dblqh::abortContinueAfterBlockedLab(Signal* signal, bool canBlock)
* ALSO AS PART OF A NORMAL ABORT WITHOUT BLOCKING.
* WE MUST ABORT TUP BEFORE ACC TO ENSURE THAT NO ONE RACES IN
* AND SEES A STATE IN TUP.
* ------------------------------------------------------------------------ */
* ----------------------------------------------------------------------- */
TcConnectionrec * const regTcPtr = tcConnectptr.p;
fragptr.i = regTcPtr->fragmentptr;
c_fragment_pool.getPtr(fragptr);
signal->theData[0] = regTcPtr->tupConnectrec;
EXECUTE_DIRECT(DBTUP, GSN_TUP_ABORTREQ, signal, 1);
TRACE_OP(regTcPtr, "ACC ABORT");
regTcPtr->transactionState = TcConnectionrec::WAIT_ACC_ABORT;
signal->theData[0] = regTcPtr->accConnectrec;
signal->theData[1] = true;
signal->theData[1] = 2; // JOB BUFFER IF NEEDED
EXECUTE_DIRECT(DBACC, GSN_ACC_ABORTREQ, signal, 2);
/* ------------------------------------------------------------------------
* We need to insert a real-time break by sending ACC_ABORTCONF through the
* job buffer to ensure that we catch any ACCKEYCONF or TUPKEYCONF or
* TUPKEYREF that are in the job buffer but not yet processed. Doing
* everything without that would race and create a state error when they
* are executed.
* ----------------------------------------------------------------------- */
if (signal->theData[1] == RNIL)
{
jam();
/* ------------------------------------------------------------------------
* We need to insert a real-time break by sending ACC_ABORTCONF through the
* job buffer to ensure that we catch any ACCKEYCONF or TUPKEYCONF or
* TUPKEYREF that are in the job buffer but not yet processed. Doing
* everything without that would race and create a state error when they
* are executed.
* --------------------------------------------------------------------- */
return;
}
execACC_ABORTCONF(signal);
return;
}//Dblqh::abortContinueAfterBlockedLab()
@ -7117,6 +7122,11 @@ void Dblqh::execACC_ABORTCONF(Signal* signal)
ptrCheckGuard(tcConnectptr, ctcConnectrecFileSize, tcConnectionrec);
TcConnectionrec * const regTcPtr = tcConnectptr.p;
ndbrequire(regTcPtr->transactionState == TcConnectionrec::WAIT_ACC_ABORT);
TRACE_OP(regTcPtr, "ACC_ABORTCONF");
signal->theData[0] = regTcPtr->tupConnectrec;
EXECUTE_DIRECT(DBTUP, GSN_TUP_ABORTREQ, signal, 1);
continueAbortLab(signal);
return;
}//Dblqh::execACC_ABORTCONF()
@ -7623,6 +7633,7 @@ void Dblqh::execNEXT_SCANCONF(Signal* signal)
scanLockReleasedLab(signal);
break;
default:
ndbout_c("%d", scanptr.p->scanState);
ndbrequire(false);
}//switch
}//Dblqh::execNEXT_SCANCONF()
@ -8351,6 +8362,8 @@ void Dblqh::continueAfterReceivingAllAiLab(Signal* signal)
AccScanReq::setDescendingFlag(req->requestInfo, scanptr.p->descending);
AccScanReq::setNoDiskScanFlag(req->requestInfo,
!tcConnectptr.p->m_disk_table);
AccScanReq::setLcpScanFlag(req->requestInfo, scanptr.p->lcpScan);
req->transId1 = tcConnectptr.p->transid[0];
req->transId2 = tcConnectptr.p->transid[1];
req->savePointId = tcConnectptr.p->savePointId;
@ -8837,7 +8850,9 @@ void Dblqh::nextScanConfScanLab(Signal* signal)
}//if
// If accOperationPtr == RNIL no record was returned by ACC
if (nextScanConf->accOperationPtr == RNIL) {
Uint32 accOpPtr = nextScanConf->accOperationPtr;
if (accOpPtr == RNIL)
{
jam();
/*************************************************************
* STOP THE SCAN PROCESS IF THIS HAS BEEN REQUESTED.
@ -8871,7 +8886,8 @@ void Dblqh::nextScanConfScanLab(Signal* signal)
jam();
set_acc_ptr_in_scan_record(scanptr.p,
scanptr.p->m_curr_batch_size_rows,
nextScanConf->accOperationPtr);
accOpPtr);
jam();
nextScanConfLoopLab(signal);
}//Dblqh::nextScanConfScanLab()
@ -9049,6 +9065,7 @@ Dblqh::readPrimaryKeys(ScanRecord *scanP, TcConnectionrec *tcConP, Uint32 *dst)
}
int ret = c_tup->accReadPk(tableId, fragId, fragPageId, pageIndex, dst, false);
jamEntry();
if(0)
ndbout_c("readPrimaryKeys(table: %d fragment: %d [ %d %d ] -> %d",
tableId, fragId, fragPageId, pageIndex, ret);
@ -9071,12 +9088,25 @@ void Dblqh::scanTupkeyConfLab(Signal* signal)
tcConnectptr.p->transactionState = TcConnectionrec::SCAN_STATE_USED;
scanptr.i = tcConnectptr.p->tcScanRec;
c_scanRecordPool.getPtr(scanptr);
Uint32 rows = scanptr.p->m_curr_batch_size_rows;
Uint32 accOpPtr= get_acc_ptr_from_scan_record(scanptr.p, rows, false);
if (accOpPtr != (Uint32)-1)
{
c_acc->execACCKEY_ORD(signal, accOpPtr);
jamEntry();
}
else
{
ndbassert(refToBlock(scanptr.p->scanBlockref) != DBACC);
}
if (scanptr.p->scanCompletedStatus == ZTRUE) {
/* ---------------------------------------------------------------------
* STOP THE SCAN PROCESS IF THIS HAS BEEN REQUESTED.
* --------------------------------------------------------------------- */
if ((scanptr.p->scanLockHold == ZTRUE) &&
(scanptr.p->m_curr_batch_size_rows > 0)) {
if ((scanptr.p->scanLockHold == ZTRUE) && rows)
{
jam();
scanptr.p->scanReleaseCounter = 1;
scanReleaseLocksLab(signal);
@ -9093,7 +9123,7 @@ void Dblqh::scanTupkeyConfLab(Signal* signal)
}//if
ndbrequire(scanptr.p->m_curr_batch_size_rows < MAX_PARALLEL_OP_PER_SCAN);
scanptr.p->m_curr_batch_size_bytes+= tdata4;
scanptr.p->m_curr_batch_size_rows++;
scanptr.p->m_curr_batch_size_rows = rows + 1;
scanptr.p->m_last_row = tdata5;
if (scanptr.p->check_scan_batch_completed() | tdata5){
if (scanptr.p->scanLockHold == ZTRUE) {
@ -9103,7 +9133,7 @@ void Dblqh::scanTupkeyConfLab(Signal* signal)
return;
} else {
jam();
scanptr.p->scanReleaseCounter = scanptr.p->m_curr_batch_size_rows;
scanptr.p->scanReleaseCounter = rows + 1;
scanReleaseLocksLab(signal);
return;
}
@ -9187,12 +9217,24 @@ void Dblqh::scanTupkeyRefLab(Signal* signal)
tcConnectptr.p->transactionState = TcConnectionrec::SCAN_STATE_USED;
scanptr.i = tcConnectptr.p->tcScanRec;
c_scanRecordPool.getPtr(scanptr);
Uint32 rows = scanptr.p->m_curr_batch_size_rows;
Uint32 accOpPtr= get_acc_ptr_from_scan_record(scanptr.p, rows, false);
if (accOpPtr != (Uint32)-1)
{
c_acc->execACCKEY_ORD(signal, accOpPtr);
}
else
{
ndbassert(refToBlock(scanptr.p->scanBlockref) != DBACC);
}
if (scanptr.p->scanCompletedStatus == ZTRUE) {
/* ---------------------------------------------------------------------
* STOP THE SCAN PROCESS IF THIS HAS BEEN REQUESTED.
* --------------------------------------------------------------------- */
if ((scanptr.p->scanLockHold == ZTRUE) &&
(scanptr.p->m_curr_batch_size_rows > 0)) {
if ((scanptr.p->scanLockHold == ZTRUE) && rows)
{
jam();
scanptr.p->scanReleaseCounter = 1;
scanReleaseLocksLab(signal);
@ -9213,8 +9255,8 @@ void Dblqh::scanTupkeyRefLab(Signal* signal)
scanptr.p->scanReleaseCounter = 1;
} else {
jam();
scanptr.p->m_curr_batch_size_rows++;
scanptr.p->scanReleaseCounter = scanptr.p->m_curr_batch_size_rows;
scanptr.p->m_curr_batch_size_rows = rows + 1;
scanptr.p->scanReleaseCounter = rows + 1;
}//if
/* --------------------------------------------------------------------
* WE NEED TO RELEASE ALL LOCKS CURRENTLY
@ -9224,7 +9266,7 @@ void Dblqh::scanTupkeyRefLab(Signal* signal)
return;
}//if
Uint32 time_passed= tcConnectptr.p->tcTimer - cLqhTimeOutCount;
if (scanptr.p->m_curr_batch_size_rows > 0) {
if (rows) {
if (time_passed > 1) {
/* -----------------------------------------------------------------------
* WE NEED TO ENSURE THAT WE DO NOT SEARCH FOR THE NEXT TUPLE FOR A
@ -9232,7 +9274,7 @@ void Dblqh::scanTupkeyRefLab(Signal* signal)
* THE FOUND TUPLE IF FOUND TUPLES ARE RARE. If more than 10 ms passed we
* send the found tuples to the API.
* ----------------------------------------------------------------------- */
scanptr.p->scanReleaseCounter = scanptr.p->m_curr_batch_size_rows + 1;
scanptr.p->scanReleaseCounter = rows + 1;
scanReleaseLocksLab(signal);
return;
}
@ -9377,7 +9419,7 @@ Uint32 Dblqh::initScanrec(const ScanFragReq* scanFragReq)
const Uint32 readCommitted = ScanFragReq::getReadCommittedFlag(reqinfo);
const Uint32 rangeScan = ScanFragReq::getRangeScanFlag(reqinfo);
const Uint32 descending = ScanFragReq::getDescendingFlag(reqinfo);
const Uint32 tupScan = ScanFragReq::getTupScanFlag(reqinfo);
Uint32 tupScan = ScanFragReq::getTupScanFlag(reqinfo);
const Uint32 attrLen = ScanFragReq::getAttrLen(reqinfo);
const Uint32 scanPrio = ScanFragReq::getScanPrio(reqinfo);
@ -9395,6 +9437,11 @@ Uint32 Dblqh::initScanrec(const ScanFragReq* scanFragReq)
scanptr.p->m_max_batch_size_rows = max_rows;
scanptr.p->m_max_batch_size_bytes = max_bytes;
#if 0
if (! rangeScan)
tupScan = 1;
#endif
if (! rangeScan && ! tupScan)
scanptr.p->scanBlockref = tcConnectptr.p->tcAccBlockref;
else if (! tupScan)
@ -9408,6 +9455,7 @@ Uint32 Dblqh::initScanrec(const ScanFragReq* scanFragReq)
scanptr.p->rangeScan = rangeScan;
scanptr.p->descending = descending;
scanptr.p->tupScan = tupScan;
scanptr.p->lcpScan = ScanFragReq::getLcpScanFlag(reqinfo);
scanptr.p->scanState = ScanRecord::SCAN_FREE;
scanptr.p->scanFlag = ZFALSE;
scanptr.p->m_row_id.setNull();
@ -9416,7 +9464,7 @@ Uint32 Dblqh::initScanrec(const ScanFragReq* scanFragReq)
scanptr.p->scanApiOpPtr = scanFragReq->clientOpPtr;
scanptr.p->m_last_row = 0;
scanptr.p->scanStoredProcId = RNIL;
scanptr.p->copyPtr = RNIL;
if (max_rows == 0 || (max_bytes > 0 && max_rows > max_bytes)){
jam();
return ScanFragRef::ZWRONG_BATCH_SIZE;
@ -9437,8 +9485,10 @@ Uint32 Dblqh::initScanrec(const ScanFragReq* scanFragReq)
* !idx uses 1 - (MAX_PARALLEL_SCANS_PER_FRAG - 1) = 1-11
* idx uses from MAX_PARALLEL_SCANS_PER_FRAG - MAX = 12-42)
*/
Uint32 start = (rangeScan || tupScan ? MAX_PARALLEL_SCANS_PER_FRAG : 1 );
Uint32 stop = (rangeScan || tupScan ? MAX_PARALLEL_INDEX_SCANS_PER_FRAG : MAX_PARALLEL_SCANS_PER_FRAG - 1);
tupScan = 0; // Make sure that close tup scan does not start acc scan incorrectly
Uint32 start = (rangeScan || tupScan) ? MAX_PARALLEL_SCANS_PER_FRAG : 1 ;
Uint32 stop = (rangeScan || tupScan) ? MAX_PARALLEL_INDEX_SCANS_PER_FRAG :
MAX_PARALLEL_SCANS_PER_FRAG - 1;
stop += start;
Uint32 free = tFragPtr.p->m_scanNumberMask.find(start);
@ -9872,6 +9922,8 @@ void Dblqh::execCOPY_FRAGREQ(Signal* signal)
fragptr.p->m_scanNumberMask.clear(NR_ScanNo);
scanptr.p->scanBlockref = DBTUP_REF;
scanptr.p->scanLockHold = ZFALSE;
scanptr.p->m_curr_batch_size_rows = 0;
scanptr.p->m_curr_batch_size_bytes= 0;
initScanTc(0,
0,
@ -10074,7 +10126,7 @@ void Dblqh::nextScanConfCopyLab(Signal* signal)
initCopyTc(signal, ZDELETE);
set_acc_ptr_in_scan_record(scanptr.p, 0, RNIL);
tcConP->gci = nextScanConf->gci;
tcConP->primKeyLen = 0;
tcConP->totSendlenAi = 0;
tcConP->connectState = TcConnectionrec::COPY_CONNECTED;
@ -10197,6 +10249,12 @@ void Dblqh::copyTupkeyConfLab(Signal* signal)
scanptr.i = tcConnectptr.p->tcScanRec;
c_scanRecordPool.getPtr(scanptr);
ScanRecord* scanP = scanptr.p;
Uint32 rows = scanP->m_curr_batch_size_rows;
Uint32 accOpPtr= get_acc_ptr_from_scan_record(scanP, rows, false);
ndbassert(accOpPtr != (Uint32)-1);
c_acc->execACCKEY_ORD(signal, accOpPtr);
if (tcConnectptr.p->errorCode != 0) {
jam();
closeCopyLab(signal);
@ -18538,6 +18596,21 @@ Dblqh::execDUMP_STATE_ORD(Signal* signal)
}
ndbrequire(arg != 2308);
}
#ifdef ERROR_INSERT
if (arg == 5712 || arg == 5713)
{
if (arg == 5712)
{
traceopout = &ndbout;
}
else if (arg == 5713)
{
traceopout = tracenrout;
}
SET_ERROR_INSERT_VALUE(arg);
}
#endif
}//Dblqh::execDUMP_STATE_ORD()
@ -18702,3 +18775,39 @@ void Dblqh::writeDbgInfoPageHeader(LogPageRecordPtr logP, Uint32 place,
logP.p->logPageWord[ZPOS_IN_WRITING]= 1;
}
#if defined ERROR_INSERT
void
Dblqh::TRACE_OP_DUMP(const Dblqh::TcConnectionrec* regTcPtr, const char * pos)
{
(* traceopout)
<< "[ " << hex << regTcPtr->transid[0]
<< " " << hex << regTcPtr->transid[1] << " ] " << dec
<< pos
<< " " << (Operation_t)regTcPtr->operation
<< " " << regTcPtr->tableref
<< "(" << regTcPtr->fragmentid << ")"
<< "(" << (regTcPtr->seqNoReplica == 0 ? "P" : "B") << ")" ;
{
(* traceopout) << "key=[" << hex;
Uint32 i;
for(i = 0; i<regTcPtr->primKeyLen && i < 4; i++){
(* traceopout) << hex << regTcPtr->tupkeyData[i] << " ";
}
DatabufPtr regDatabufptr;
regDatabufptr.i = regTcPtr->firstTupkeybuf;
while(i < regTcPtr->primKeyLen)
{
ptrCheckGuard(regDatabufptr, cdatabufFileSize, databuf);
for(Uint32 j = 0; j<4 && i<regTcPtr->primKeyLen; j++, i++)
(* traceopout) << hex << regDatabufptr.p->data[j] << " ";
}
(* traceopout) << "] ";
}
if (regTcPtr->m_use_rowid)
(* traceopout) << " " << regTcPtr->m_row_id;
(* traceopout) << endl;
}
#endif

View file

@ -7052,6 +7052,18 @@ void Dbtc::checkScanActiveInFailedLqh(Signal* signal,
found = true;
}
}
ScanFragList deliv(c_scan_frag_pool, scanptr.p->m_delivered_scan_frags);
for(deliv.first(ptr); !ptr.isNull(); deliv.next(ptr))
{
jam();
if (refToNode(ptr.p->lqhBlockref) == failedNodeId)
{
jam();
found = true;
break;
}
}
}
if(found){
jam();
@ -7081,15 +7093,20 @@ Dbtc::nodeFailCheckTransactions(Signal* signal,
for (transPtr.i = transPtrI; transPtr.i < capiConnectFilesize; transPtr.i++)
{
ptrCheckGuard(transPtr, capiConnectFilesize, apiConnectRecord);
Uint32 state = transPtr.p->apiConnectstate;
if (transPtr.p->m_transaction_nodes.get(failedNodeId))
{
jam();
// Force timeout regardless of state
c_appl_timeout_value = 1;
setApiConTimer(transPtr.i, TtcTimer - 2, __LINE__);
timeOutFoundLab(signal, transPtr.i, ZNODEFAIL_BEFORE_COMMIT);
c_appl_timeout_value = TapplTimeout;
// avoid assertion in timeoutfoundlab
if (state != CS_PREPARE_TO_COMMIT)
{
// Force timeout regardless of state
c_appl_timeout_value = 1;
setApiConTimer(transPtr.i, TtcTimer - 2, __LINE__);
timeOutFoundLab(signal, transPtr.i, ZNODEFAIL_BEFORE_COMMIT);
c_appl_timeout_value = TapplTimeout;
}
}
// Send CONTINUEB to continue later

View file

@ -297,7 +297,6 @@ enum TransState {
};
enum TupleState {
TUPLE_INITIAL_INSERT = 0,
TUPLE_PREPARED = 1,
TUPLE_ALREADY_ABORTED = 2,
TUPLE_TO_BE_COMMITTED = 3
@ -305,7 +304,6 @@ enum TupleState {
enum State {
NOT_INITIALIZED = 0,
COMMON_AREA_PAGES = 1,
IDLE = 17,
ACTIVE = 18,
SYSTEM_RESTART = 19,
@ -1441,7 +1439,6 @@ private:
void execSET_VAR_REQ(Signal* signal);
void execDROP_TAB_REQ(Signal* signal);
void execALTER_TAB_REQ(Signal* signal);
void execTUP_ALLOCREQ(Signal* signal);
void execTUP_DEALLOCREQ(Signal* signal);
void execTUP_WRITELOG_REQ(Signal* signal);

View file

@ -202,29 +202,6 @@ Dbtup::receive_attrinfo(Signal* signal, Uint32 op,
}
}
void Dbtup::execTUP_ALLOCREQ(Signal* signal)
{
OperationrecPtr regOperPtr;
jamEntry();
regOperPtr.i= signal->theData[0];
c_operation_pool.getPtr(regOperPtr);
regOperPtr.p->op_struct.tuple_state= TUPLE_INITIAL_INSERT;
//ndbout_c("execTUP_ALLOCREQ");
signal->theData[0]= 0;
signal->theData[1]= ~0 >> MAX_TUPLES_BITS;
signal->theData[2]= (1 << MAX_TUPLES_BITS) - 1;
return;
mem_error:
jam();
signal->theData[0]= ZMEM_NOMEM_ERROR;
return;
}
void
Dbtup::setChecksum(Tuple_header* tuple_ptr,
Tablerec* regTabPtr)
@ -455,13 +432,13 @@ Dbtup::load_diskpage(Signal* signal,
ptrCheckGuard(tabptr, cnoOfTablerec, tablerec);
Tablerec* regTabPtr = tabptr.p;
if(regOperPtr->op_struct.tuple_state == TUPLE_INITIAL_INSERT)
if(local_key == ~(Uint32)0)
{
jam();
regOperPtr->op_struct.m_wait_log_buffer= 1;
regOperPtr->op_struct.m_load_diskpage_on_commit= 1;
return 1;
}
}
jam();
Uint32 page_idx= local_key & MAX_TUPLES_PER_PAGE;
@ -663,7 +640,7 @@ void Dbtup::execTUPKEYREQ(Signal* signal)
regOperPtr->savepointId= sig1;
regOperPtr->op_struct.primary_replica= sig2;
regOperPtr->m_tuple_location.m_page_idx= sig3;
Uint32 pageidx = regOperPtr->m_tuple_location.m_page_idx= sig3;
sig1= tupKeyReq->opRef;
sig2= tupKeyReq->tcOpIndex;
@ -673,7 +650,7 @@ void Dbtup::execTUPKEYREQ(Signal* signal)
req_struct.tc_operation_ptr= sig1;
req_struct.TC_index= sig2;
req_struct.TC_ref= sig3;
req_struct.frag_page_id= sig4;
Uint32 pageid = req_struct.frag_page_id= sig4;
req_struct.m_use_rowid = (TrequestInfo >> 11) & 1;
sig1= tupKeyReq->attrBufLen;
@ -706,7 +683,8 @@ void Dbtup::execTUPKEYREQ(Signal* signal)
copyAttrinfo(regOperPtr, &cinBuffer[0]);
if(Roptype == ZINSERT && get_tuple_state(regOperPtr)== TUPLE_INITIAL_INSERT)
Uint32 localkey = (pageid << MAX_TUPLES_BITS) + pageidx;
if(Roptype == ZINSERT && localkey == ~0)
{
// No tuple allocatated yet
goto do_insert;
@ -1159,49 +1137,6 @@ Dbtup::prepare_initial_insert(KeyReqStruct *req_struct,
disk_undo ? (Tuple_header::DISK_ALLOC | Tuple_header::DISK_INLINE) : 0;
}
void
Dbtup::fix_disk_insert_no_mem_insert(KeyReqStruct *req_struct,
Operationrec* regOperPtr,
Tablerec* regTabPtr)
{
regOperPtr->m_undo_buffer_space= sizeof(Dbtup::Disk_undo::Alloc);
req_struct->check_offset[DD]= regTabPtr->get_check_offset(DD);
const Uint32 cnt1= regTabPtr->m_attributes[MM].m_no_of_varsize;
const Uint32 cnt2= regTabPtr->m_attributes[DD].m_no_of_varsize;
Uint32 *ptr= req_struct->m_tuple_ptr->get_var_part_ptr(regTabPtr);
if(cnt1)
{
// Disk part is 32-bit aligned
char *varptr = req_struct->m_var_data[MM].m_data_ptr;
ptr= ALIGN_WORD(varptr + regTabPtr->m_offsets[MM].m_max_var_offset);
}
else
{
ptr -= Tuple_header::HeaderSize;
}
req_struct->m_disk_ptr= (Tuple_header*)ptr;
if(cnt2)
{
KeyReqStruct::Var_data *dst= &req_struct->m_var_data[DD];
ptr=((Tuple_header*)ptr)->m_data+regTabPtr->m_offsets[DD].m_varpart_offset;
dst->m_data_ptr= (char*)(((Uint16*)ptr)+cnt2+1);
dst->m_offset_array_ptr= req_struct->var_pos_array + (cnt1 << 1);
dst->m_var_len_offset= cnt2;
dst->m_max_var_offset= regTabPtr->m_offsets[DD].m_max_var_offset;
}
// Set all null bits
memset(req_struct->m_disk_ptr->m_null_bits+
regTabPtr->m_offsets[DD].m_null_offset, 0xFF,
4*regTabPtr->m_offsets[DD].m_null_words);
req_struct->m_tuple_ptr->m_header_bits =
(Tuple_header::DISK_ALLOC | Tuple_header::DISK_INLINE);
}
int Dbtup::handleInsertReq(Signal* signal,
Ptr<Operationrec> regOperPtr,
Ptr<Fragrecord> fragPtr,
@ -1215,8 +1150,8 @@ int Dbtup::handleInsertReq(Signal* signal,
Tuple_header *tuple_ptr;
bool disk = regTabPtr->m_no_of_disk_attributes > 0;
bool mem_insert = get_tuple_state(regOperPtr.p) == TUPLE_INITIAL_INSERT;
bool disk_insert = regOperPtr.p->is_first_operation() && disk;
bool mem_insert = regOperPtr.p->is_first_operation();
bool disk_insert = mem_insert && disk;
bool varsize = regTabPtr->m_attributes[MM].m_no_of_varsize;
bool rowid = req_struct->m_use_rowid;
Uint32 real_page_id = regOperPtr.p->m_tuple_location.m_page_no;
@ -1244,21 +1179,16 @@ int Dbtup::handleInsertReq(Signal* signal,
if(mem_insert)
{
jam();
ndbassert(regOperPtr.p->is_first_operation()); // disk insert
prepare_initial_insert(req_struct, regOperPtr.p, regTabPtr);
}
else
{
if (!regOperPtr.p->is_first_operation())
{
Operationrec* prevOp= req_struct->prevOpPtr.p;
ndbassert(prevOp->op_struct.op_type == ZDELETE);
tup_version= prevOp->tupVersion + 1;
if(!prevOp->is_first_operation())
org= (Tuple_header*)c_undo_buffer.get_ptr(&prevOp->m_copy_tuple_location);
}
Operationrec* prevOp= req_struct->prevOpPtr.p;
ndbassert(prevOp->op_struct.op_type == ZDELETE);
tup_version= prevOp->tupVersion + 1;
if(!prevOp->is_first_operation())
org= (Tuple_header*)c_undo_buffer.get_ptr(&prevOp->m_copy_tuple_location);
if (regTabPtr->need_expand())
expand_tuple(req_struct, sizes, org, regTabPtr, !disk_insert);
else
@ -1268,11 +1198,6 @@ int Dbtup::handleInsertReq(Signal* signal,
if (disk_insert)
{
int res;
if (unlikely(!mem_insert))
{
sizes[DD] = sizes[DD+2] = regTabPtr->m_offsets[DD].m_fix_header_size;
fix_disk_insert_no_mem_insert(req_struct, regOperPtr.p, regTabPtr);
}
if (ERROR_INSERTED(4015))
{
@ -1381,6 +1306,7 @@ int Dbtup::handleInsertReq(Signal* signal,
}
if (unlikely(ptr == 0))
{
jam();
goto alloc_rowid_error;
}
}
@ -1396,7 +1322,7 @@ int Dbtup::handleInsertReq(Signal* signal,
(varsize ? Tuple_header::CHAINED_ROW : 0);
regOperPtr.p->m_tuple_location.m_page_no = real_page_id;
}
else if(!rowid || !regOperPtr.p->is_first_operation())
else
{
int ret;
if (ERROR_INSERTED(4020))
@ -1417,20 +1343,6 @@ int Dbtup::handleInsertReq(Signal* signal,
req_struct->m_use_rowid = false;
base->m_header_bits &= ~(Uint32)Tuple_header::FREE;
}
else
{
if ((req_struct->m_row_id.m_page_no == frag_page_id &&
req_struct->m_row_id.m_page_idx == regOperPtr.p->m_tuple_location.m_page_idx))
{
ndbout_c("no mem insert but rowid (same)");
base->m_header_bits &= ~(Uint32)Tuple_header::FREE;
}
else
{
// no mem insert, but rowid
ndbrequire(false);
}
}
base->m_header_bits |= Tuple_header::ALLOC &
(regOperPtr.p->is_first_operation() ? ~0 : 1);

View file

@ -89,7 +89,6 @@ Dbtup::Dbtup(Block_context& ctx, Pgman* pgman)
addRecSignal(GSN_DROP_TAB_REQ, &Dbtup::execDROP_TAB_REQ);
addRecSignal(GSN_TUP_ALLOCREQ, &Dbtup::execTUP_ALLOCREQ);
addRecSignal(GSN_TUP_DEALLOCREQ, &Dbtup::execTUP_DEALLOCREQ);
addRecSignal(GSN_TUP_WRITELOG_REQ, &Dbtup::execTUP_WRITELOG_REQ);

View file

@ -53,7 +53,10 @@ Dbtup::execACC_SCANREQ(Signal* signal)
Fragrecord& frag = *fragPtr.p;
// flags
Uint32 bits = 0;
if (frag.m_lcp_scan_op == RNIL) {
if (!AccScanReq::getLcpScanFlag(req->requestInfo) ||
tablePtr.p->m_no_of_disk_attributes == 0)
{
// seize from pool and link to per-fragment list
LocalDLList<ScanOp> list(c_scanOpPool, frag.m_scanList);
if (! list.seize(scanPtr)) {
@ -63,23 +66,25 @@ Dbtup::execACC_SCANREQ(Signal* signal)
if (!AccScanReq::getNoDiskScanFlag(req->requestInfo)
&& tablePtr.p->m_no_of_disk_attributes)
{
bits |= ScanOp::SCAN_DD;
bits |= ScanOp::SCAN_DD;
}
bool mm = (bits & ScanOp::SCAN_DD);
if (tablePtr.p->m_attributes[mm].m_no_of_varsize > 0) {
bits |= ScanOp::SCAN_VS;
// disk pages have fixed page format
ndbrequire(! (bits & ScanOp::SCAN_DD));
// disk pages have fixed page format
ndbrequire(! (bits & ScanOp::SCAN_DD));
}
if (! AccScanReq::getReadCommittedFlag(req->requestInfo)) {
if (AccScanReq::getLockMode(req->requestInfo) == 0)
bits |= ScanOp::SCAN_LOCK_SH;
else
bits |= ScanOp::SCAN_LOCK_EX;
if (AccScanReq::getLockMode(req->requestInfo) == 0)
bits |= ScanOp::SCAN_LOCK_SH;
else
bits |= ScanOp::SCAN_LOCK_EX;
}
} else {
jam();
// LCP scan and disk
ndbrequire(frag.m_lcp_scan_op == c_lcp_scan_op);
c_scanOpPool.getPtr(scanPtr, frag.m_lcp_scan_op);
bits |= ScanOp::SCAN_LCP;
@ -156,7 +161,7 @@ Dbtup::execNEXT_SCANREQ(Signal* signal)
conf->scanPtr = scan.m_userPtr;
unsigned signalLength = 1;
sendSignal(scanPtr.p->m_userRef, GSN_NEXT_SCANCONF,
signal, signalLength, JBB);
signal, signalLength, JBB);
return;
}
break;
@ -171,7 +176,7 @@ Dbtup::execNEXT_SCANREQ(Signal* signal)
lockReq->requestInfo = AccLockReq::AbortWithConf;
lockReq->accOpPtr = scan.m_accLockOp;
EXECUTE_DIRECT(DBACC, GSN_ACC_LOCKREQ,
signal, AccLockReq::UndoSignalLength);
signal, AccLockReq::UndoSignalLength);
jamEntry();
ndbrequire(lockReq->returnCode == AccLockReq::Success);
scan.m_state = ScanOp::Aborting;
@ -182,10 +187,10 @@ Dbtup::execNEXT_SCANREQ(Signal* signal)
ndbrequire(scan.m_accLockOp != RNIL);
AccLockReq* const lockReq = (AccLockReq*)signal->getDataPtrSend();
lockReq->returnCode = RNIL;
lockReq->requestInfo = AccLockReq::Unlock;
lockReq->requestInfo = AccLockReq::Abort;
lockReq->accOpPtr = scan.m_accLockOp;
EXECUTE_DIRECT(DBACC, GSN_ACC_LOCKREQ,
signal, AccLockReq::UndoSignalLength);
signal, AccLockReq::UndoSignalLength);
jamEntry();
ndbrequire(lockReq->returnCode == AccLockReq::Success);
scan.m_accLockOp = RNIL;
@ -433,7 +438,7 @@ Dbtup::execACCKEYCONF(Signal* signal)
jam();
AccLockReq* const lockReq = (AccLockReq*)signal->getDataPtrSend();
lockReq->returnCode = RNIL;
lockReq->requestInfo = AccLockReq::Unlock;
lockReq->requestInfo = AccLockReq::Abort;
lockReq->accOpPtr = scan.m_accLockOp;
EXECUTE_DIRECT(DBACC, GSN_ACC_LOCKREQ, signal, AccLockReq::UndoSignalLength);
jamEntry();
@ -582,12 +587,15 @@ Dbtup::scanNext(Signal* signal, ScanOpPtr scanPtr)
Fragrecord& frag = *fragPtr.p;
// tuple found
Tuple_header* th = 0;
Uint32 thbits = 0;
Uint32 loop_count = 0;
Uint32 scanGCI = scanPtr.p->m_scanGCI;
Uint32 foundGCI;
bool mm = (bits & ScanOp::SCAN_DD);
bool lcp = (bits & ScanOp::SCAN_LCP);
const bool mm = (bits & ScanOp::SCAN_DD);
const bool lcp = (bits & ScanOp::SCAN_LCP);
const bool dirty = (bits & ScanOp::SCAN_LOCK) == 0;
Uint32 lcp_list = fragPtr.p->m_lcp_keep_list;
Uint32 size = table.m_offsets[mm].m_fix_header_size +
(bits & ScanOp::SCAN_VS ? Tuple_header::HeaderSize + 1: 0);
@ -750,22 +758,22 @@ Dbtup::scanNext(Signal* signal, ScanOpPtr scanPtr)
{
pos.m_get = ScanPos::Get_next_tuple_fs;
th = (Tuple_header*)&page->m_data[key.m_page_idx];
thbits = th->m_header_bits;
if (likely(! (bits & ScanOp::SCAN_NR)))
{
if (! (th->m_header_bits & Tuple_header::FREE)) {
goto found_tuple;
}
else
jam();
if (! (thbits & Tuple_header::FREE))
{
jam();
// skip free tuple
}
if (! ((thbits & Tuple_header::ALLOC) && dirty))
goto found_tuple;
}
}
else
{
if ((foundGCI = *th->get_mm_gci(tablePtr.p)) > scanGCI)
{
if (! (th->m_header_bits & Tuple_header::FREE))
if (! (thbits & Tuple_header::FREE))
{
jam();
goto found_tuple;
@ -775,9 +783,11 @@ Dbtup::scanNext(Signal* signal, ScanOpPtr scanPtr)
goto found_deleted_rowid;
}
}
else
else if (thbits != Fix_page::FREE_RECORD &&
th->m_operation_ptr_i != RNIL)
{
jam();
goto found_tuple; // Locked tuple...
// skip free tuple
}
}
@ -793,8 +803,7 @@ Dbtup::scanNext(Signal* signal, ScanOpPtr scanPtr)
jam();
{
// caller has already set pos.m_get to next tuple
if (! (bits & ScanOp::SCAN_LCP &&
th->m_header_bits & Tuple_header::LCP_SKIP)) {
if (! (bits & ScanOp::SCAN_LCP && thbits & Tuple_header::LCP_SKIP)) {
Local_key& key_mm = pos.m_key_mm;
if (! (bits & ScanOp::SCAN_DD)) {
key_mm = pos.m_key;
@ -810,7 +819,11 @@ Dbtup::scanNext(Signal* signal, ScanOpPtr scanPtr)
} else {
jam();
// clear it so that it will show up in next LCP
th->m_header_bits &= ~(Uint32)Tuple_header::LCP_SKIP;
th->m_header_bits = thbits & ~(Uint32)Tuple_header::LCP_SKIP;
if (tablePtr.p->m_bits & Tablerec::TR_Checksum) {
jam();
setChecksum(th, tablePtr.p);
}
}
}
break;
@ -833,7 +846,7 @@ Dbtup::scanNext(Signal* signal, ScanOpPtr scanPtr)
th = (Tuple_header*)(mmpage->m_data + key_mm.m_page_idx);
if ((foundGCI = *th->get_mm_gci(tablePtr.p)) > scanGCI)
{
if (! (th->m_header_bits & Tuple_header::FREE))
if (! (thbits & Tuple_header::FREE))
break;
}
}
@ -893,7 +906,7 @@ found_lcp_keep:
NextScanConf* const conf = (NextScanConf*)signal->getDataPtrSend();
conf->scanPtr = scan.m_userPtr;
conf->accOperationPtr = RNIL + 1;
conf->accOperationPtr = (Uint32)-1;
conf->fragId = frag.fragmentId;
conf->localKey[0] = lcp_list;
conf->localKey[1] = 0;

View file

@ -321,7 +321,7 @@ Dbtux::execNEXT_SCANREQ(Signal* signal)
conf->scanPtr = scan.m_userPtr;
unsigned signalLength = 1;
sendSignal(scanPtr.p->m_userRef, GSN_NEXT_SCANCONF,
signal, signalLength, JBB);
signal, signalLength, JBB);
return;
}
break;
@ -344,7 +344,8 @@ Dbtux::execNEXT_SCANREQ(Signal* signal)
lockReq->returnCode = RNIL;
lockReq->requestInfo = AccLockReq::AbortWithConf;
lockReq->accOpPtr = scan.m_accLockOp;
EXECUTE_DIRECT(DBACC, GSN_ACC_LOCKREQ, signal, AccLockReq::UndoSignalLength);
EXECUTE_DIRECT(DBACC, GSN_ACC_LOCKREQ, signal,
AccLockReq::UndoSignalLength);
jamEntry();
ndbrequire(lockReq->returnCode == AccLockReq::Success);
scan.m_state = ScanOp::Aborting;
@ -355,9 +356,10 @@ Dbtux::execNEXT_SCANREQ(Signal* signal)
ndbrequire(scan.m_accLockOp != RNIL);
AccLockReq* const lockReq = (AccLockReq*)signal->getDataPtrSend();
lockReq->returnCode = RNIL;
lockReq->requestInfo = AccLockReq::Unlock;
lockReq->requestInfo = AccLockReq::Abort;
lockReq->accOpPtr = scan.m_accLockOp;
EXECUTE_DIRECT(DBACC, GSN_ACC_LOCKREQ, signal, AccLockReq::UndoSignalLength);
EXECUTE_DIRECT(DBACC, GSN_ACC_LOCKREQ, signal,
AccLockReq::UndoSignalLength);
jamEntry();
ndbrequire(lockReq->returnCode == AccLockReq::Success);
scan.m_accLockOp = RNIL;
@ -612,7 +614,7 @@ Dbtux::execACCKEYCONF(Signal* signal)
jam();
AccLockReq* const lockReq = (AccLockReq*)signal->getDataPtrSend();
lockReq->returnCode = RNIL;
lockReq->requestInfo = AccLockReq::Unlock;
lockReq->requestInfo = AccLockReq::Abort;
lockReq->accOpPtr = scan.m_accLockOp;
EXECUTE_DIRECT(DBACC, GSN_ACC_LOCKREQ, signal, AccLockReq::UndoSignalLength);
jamEntry();

View file

@ -38,6 +38,7 @@
#include <mgmapi_config_parameters.h>
int global_flag_send_heartbeat_now= 0;
int global_flag_skip_invalidate_cache = 0;
// Just a C wrapper for threadMain
extern "C"
@ -458,11 +459,14 @@ ClusterMgr::reportNodeFailed(NodeId nodeId){
theNode.nfCompleteRep = false;
if(noOfAliveNodes == 0)
{
theFacade.m_globalDictCache.lock();
theFacade.m_globalDictCache.invalidate_all();
theFacade.m_globalDictCache.unlock();
m_connect_count ++;
m_cluster_state = CS_waiting_for_clean_cache;
if (!global_flag_skip_invalidate_cache)
{
theFacade.m_globalDictCache.lock();
theFacade.m_globalDictCache.invalidate_all();
theFacade.m_globalDictCache.unlock();
m_connect_count ++;
m_cluster_state = CS_waiting_for_clean_cache;
}
NFCompleteRep rep;
for(Uint32 i = 1; i<MAX_NODES; i++){
if(theNodes[i].defined && theNodes[i].nfCompleteRep == false){

View file

@ -478,10 +478,14 @@ int NdbScanOperation::nextResultImpl(bool fetchAllowed, bool forceSend)
*/
PollGuard poll_guard(tp, &theNdb->theImpl->theWaiter,
theNdb->theNdbBlockNumber);
if(theError.code)
return -1;
Uint32 seq = theNdbCon->theNodeSequence;
const Uint32 seq = theNdbCon->theNodeSequence;
if(theError.code)
{
goto err4;
}
if(seq == tp->getNodeSequence(nodeId) && send_next_scan(idx, false) == 0)
{
@ -564,6 +568,10 @@ int NdbScanOperation::nextResultImpl(bool fetchAllowed, bool forceSend)
if(theError.code == 0)
setErrorCode(4028); // seq changed = Node fail
break;
case -4:
err4:
setErrorCode(theError.code);
break;
}
theNdbCon->theTransactionIsStarted = false;

View file

@ -108,6 +108,8 @@ public:
NDBT_ResultRow& get_row(Uint32 idx) { return *rows[idx];}
int execute_async(Ndb*, NdbTransaction::ExecType, NdbTransaction::AbortOption = NdbTransaction::AbortOnError);
int execute_async_prepare(Ndb*, NdbTransaction::ExecType, NdbTransaction::AbortOption = NdbTransaction::AbortOnError);
int wait_async(Ndb*, int timeout = -1);
protected:

View file

@ -1090,11 +1090,6 @@ runMassiveRollback4(NDBT_Context* ctx, NDBT_Step* step){
ok = false;
break;
}
if (hugoOps.execute_NoCommit(pNdb) != 0)
{
ok = false;
break;
}
}
hugoOps.execute_Rollback(pNdb);
CHECK(hugoOps.closeTransaction(pNdb) == 0);
@ -1199,6 +1194,62 @@ runTupErrors(NDBT_Context* ctx, NDBT_Step* step){
return NDBT_OK;
}
int
runInsertError(NDBT_Context* ctx, NDBT_Step* step){
int result = NDBT_OK;
HugoOperations hugoOp1(*ctx->getTab());
HugoOperations hugoOp2(*ctx->getTab());
Ndb* pNdb = GETNDB(step);
NdbRestarter restarter;
restarter.insertErrorInAllNodes(4017);
const Uint32 LOOPS = 10;
for (Uint32 i = 0; i<LOOPS; i++)
{
CHECK(hugoOp1.startTransaction(pNdb) == 0);
CHECK(hugoOp1.pkInsertRecord(pNdb, 1) == 0);
CHECK(hugoOp2.startTransaction(pNdb) == 0);
CHECK(hugoOp2.pkReadRecord(pNdb, 1, 1) == 0);
CHECK(hugoOp1.execute_async_prepare(pNdb, NdbTransaction::Commit) == 0);
CHECK(hugoOp2.execute_async_prepare(pNdb, NdbTransaction::Commit) == 0);
hugoOp1.wait_async(pNdb);
hugoOp2.wait_async(pNdb);
CHECK(hugoOp1.closeTransaction(pNdb) == 0);
CHECK(hugoOp2.closeTransaction(pNdb) == 0);
}
restarter.insertErrorInAllNodes(0);
return result;
}
int
runInsertError2(NDBT_Context* ctx, NDBT_Step* step){
int result = NDBT_OK;
HugoOperations hugoOp1(*ctx->getTab());
Ndb* pNdb = GETNDB(step);
NdbRestarter restarter;
restarter.insertErrorInAllNodes(4017);
const Uint32 LOOPS = 1;
for (Uint32 i = 0; i<LOOPS; i++)
{
CHECK(hugoOp1.startTransaction(pNdb) == 0);
CHECK(hugoOp1.pkInsertRecord(pNdb, 1) == 0);
CHECK(hugoOp1.pkDeleteRecord(pNdb, 1) == 0);
hugoOp1.execute_NoCommit(pNdb);
CHECK(hugoOp1.closeTransaction(pNdb) == 0);
}
restarter.insertErrorInAllNodes(0);
return NDBT_OK;
}
NDBT_TESTSUITE(testBasic);
TESTCASE("PkInsert",
"Verify that we can insert and delete from this table using PK"
@ -1449,16 +1500,16 @@ TESTCASE("MassiveTransaction",
INITIALIZER(runLoadTable2);
FINALIZER(runClearTable2);
}
TESTCASE("Fill",
"Verify what happens when we fill the db" ){
INITIALIZER(runFillTable);
INITIALIZER(runPkRead);
FINALIZER(runClearTable2);
}
TESTCASE("TupError",
"Verify what happens when we fill the db" ){
INITIALIZER(runTupErrors);
}
TESTCASE("InsertError", "" ){
INITIALIZER(runInsertError);
}
TESTCASE("InsertError2", "" ){
INITIALIZER(runInsertError2);
}
NDBT_TESTSUITE_END(testBasic);
#if 0
@ -1469,6 +1520,12 @@ TESTCASE("ReadConsistency",
STEP(runReadOne);
FINALIZER(runClearTable2);
}
TESTCASE("Fill",
"Verify what happens when we fill the db" ){
INITIALIZER(runFillTable);
INITIALIZER(runPkRead);
FINALIZER(runClearTable2);
}
#endif
int main(int argc, const char** argv){

View file

@ -696,7 +696,10 @@ runBug18612(NDBT_Context* ctx, NDBT_Step* step){
do {
int tmp = restarter.getRandomNodeOtherNodeGroup(node1, rand());
if (tmp == -1)
break;
{
ctx->stopTest();
return NDBT_OK;
}
node1 = tmp;
} while(nodesmask.get(node1));
@ -876,12 +879,15 @@ int runBug20185(NDBT_Context* ctx, NDBT_Step* step){
HugoOperations hugoOps(*ctx->getTab());
Ndb* pNdb = GETNDB(step);
const int masterNode = restarter.getMasterNodeId();
int dump[] = { 7090, 20 } ;
if (restarter.dumpStateAllNodes(dump, 2))
return NDBT_FAILED;
NdbSleep_MilliSleep(3000);
retry:
if(hugoOps.startTransaction(pNdb) != 0)
return NDBT_FAILED;
@ -891,8 +897,14 @@ int runBug20185(NDBT_Context* ctx, NDBT_Step* step){
if (hugoOps.execute_NoCommit(pNdb) != 0)
return NDBT_FAILED;
int nodeId;
const int node = hugoOps.getTransaction()->getConnectedNodeId();
if (node != masterNode)
{
hugoOps.closeTransaction(pNdb);
goto retry;
}
int nodeId;
do {
nodeId = restarter.getDbNodeId(rand() % restarter.getNumDbNodes());
} while (nodeId == node);

View file

@ -665,9 +665,9 @@ main(int argc, const char** argv){
for(Uint32 i = 0; i < 12; i++)
{
if(i == 6 || i == 8 || i == 10)
if(false && (i == 6 || i == 8 || i == 10))
continue;
BaseString name("bug_9749");
name.appfmt("_%d", i);
NDBT_TestCaseImpl1 *pt = new NDBT_TestCaseImpl1(&ts,

View file

@ -286,15 +286,26 @@ int runRandScanRead(NDBT_Context* ctx, NDBT_Step* step){
int records = ctx->getNumRecords();
int parallelism = ctx->getProperty("Parallelism", 240);
int abort = ctx->getProperty("AbortProb", 5);
int tupscan = ctx->getProperty("TupScan", (Uint32)0);
int i = 0;
HugoTransactions hugoTrans(*ctx->getTab());
while (i<loops && !ctx->isTestStopped()) {
g_info << i << ": ";
NdbOperation::LockMode lm = (NdbOperation::LockMode)(rand() % 3);
int scan_flags = 0;
if (tupscan == 1)
scan_flags |= NdbScanOperation::SF_TupScan;
else if (tupscan == 2 && ((rand() & 0x800)))
{
scan_flags |= NdbScanOperation::SF_TupScan;
}
if (hugoTrans.scanReadRecords(GETNDB(step),
records, abort, parallelism,
lm) != 0){
lm,
scan_flags) != 0){
return NDBT_FAILED;
}
i++;
@ -1320,6 +1331,16 @@ TESTCASE("ScanRead488",
STEPS(runRandScanRead, 70);
FINALIZER(runClearTable);
}
TESTCASE("ScanRead488T",
"Verify scan requirement: It's only possible to have 11 concurrent "\
"scans per fragment running in Ndb kernel at the same time. "\
"When this limit is exceeded the scan will be aborted with errorcode "\
"488."){
TC_PROPERTY("TupScan", 1);
INITIALIZER(runLoadTable);
STEPS(runRandScanRead, 70);
FINALIZER(runClearTable);
}
TESTCASE("ScanRead488O",
"Verify scan requirement: It's only possible to have 11 concurrent "\
"scans per fragment running in Ndb kernel at the same time. "\
@ -1336,6 +1357,7 @@ TESTCASE("ScanRead488_Mixed",
"scans per fragment running in Ndb kernel at the same time. "\
"When this limit is exceeded the scan will be aborted with errorcode "\
"488."){
TC_PROPERTY("TupScan", 2);
INITIALIZER(createOrderedPkIndex);
INITIALIZER(runLoadTable);
STEPS(runRandScanRead, 50);

View file

@ -199,7 +199,7 @@ max-time: 500
cmd: testBasicAsynch
args: -n PkDeleteAsynch
max-time: 500
max-time: 1000
cmd: testBasic
args: -n MassiveRollback T1 T7 D1 D2
@ -219,6 +219,14 @@ max-time: 500
cmd: testBasic
args: -n TupError
max-time: 500
cmd: testBasic
args: -n InsertError T1
max-time: 500
cmd: testBasic
args: -n InsertError2 T1
max-time: 500
cmd: testTimeout
args: T1
@ -273,6 +281,10 @@ max-time: 500
cmd: testScan
args: -n ScanRead488O -l 10 T6 D1 D2
max-time: 1000
cmd: testScan
args: -n ScanRead488T -l 10 T6 D1 D2
max-time: 1000
cmd: testScan
args: -n ScanRead488_Mixed -l 10 T6 D1 D2

View file

@ -471,16 +471,33 @@ HugoOperations::execute_async(Ndb* pNdb, NdbTransaction::ExecType et,
return NDBT_OK;
}
int
HugoOperations::execute_async_prepare(Ndb* pNdb, NdbTransaction::ExecType et,
NdbTransaction::AbortOption eao){
m_async_reply= 0;
pTrans->executeAsynchPrepare(et,
HugoOperations_async_callback,
this,
eao);
return NDBT_OK;
}
int
HugoOperations::wait_async(Ndb* pNdb, int timeout)
{
pNdb->pollNdb(1000);
if(m_async_reply)
volatile int * wait = &m_async_reply;
while (!* wait)
{
if(m_async_return)
ndbout << "ERROR: " << pNdb->getNdbError(m_async_return) << endl;
return m_async_return;
pNdb->sendPollNdb(1000);
if(* wait)
{
if(m_async_return)
ndbout << "ERROR: " << pNdb->getNdbError(m_async_return) << endl;
return m_async_return;
}
}
ndbout_c("wait returned nothing...");
return -1;

View file

@ -1110,6 +1110,7 @@ static int opt_timer;
static char * opt_remote_mgm = NULL;
static char * opt_testname = NULL;
static int opt_verbose;
static int opt_seed = 0;
static struct my_option my_long_options[] =
{
@ -1129,6 +1130,9 @@ static struct my_option my_long_options[] =
{ "loops", 'l', "Number of loops",
(gptr*) &opt_loops, (gptr*) &opt_loops, 0,
GET_INT, REQUIRED_ARG, 5, 0, 0, 0, 0, 0 },
{ "seed", 1024, "Random seed",
(gptr*) &opt_seed, (gptr*) &opt_seed, 0,
GET_INT, REQUIRED_ARG, 0, 0, 0, 0, 0, 0 },
{ "testname", 'n', "Name of test to run",
(gptr*) &opt_testname, (gptr*) &opt_testname, 0,
GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0 },
@ -1145,6 +1149,8 @@ static struct my_option my_long_options[] =
{ 0, 0, 0, 0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0}
};
extern int global_flag_skip_invalidate_cache;
static void usage()
{
ndb_std_print_version();
@ -1224,6 +1230,16 @@ int NDBT_TestSuite::execute(int argc, const char** argv){
{
return NDBT_ProgramExit(NDBT_FAILED);
}
if (opt_seed == 0)
{
opt_seed = NdbTick_CurrentMillisecond();
}
ndbout_c("random seed: %u", opt_seed);
srand(opt_seed);
srandom(opt_seed);
global_flag_skip_invalidate_cache = 1;
{
Ndb ndb(&con, "TEST_DB");