ndb - bug#26450

partial backport from 6.2 + add fix of bug


storage/ndb/include/kernel/GlobalSignalNumbers.h:
  add prep_copy_frag
storage/ndb/include/kernel/signaldata/AccScan.hpp:
  add new argument specifying which page to scan to
storage/ndb/include/kernel/signaldata/CopyFrag.hpp:
  add new argument specifying which page to scan to
storage/ndb/include/ndb_version.h.in:
  add versioning checks for prep_copy_frag
storage/ndb/src/common/debugger/signaldata/SignalNames.cpp:
  add prep_copy_frag
storage/ndb/src/kernel/blocks/ERROR_codes.txt:
  new error codes
storage/ndb/src/kernel/blocks/dbdih/Dbdih.hpp:
  add new to-step
storage/ndb/src/kernel/blocks/dbdih/DbdihInit.cpp:
  add new to-step
storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp:
  add new to-step
storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp:
  add new to-step
storage/ndb/src/kernel/blocks/dblqh/DblqhInit.cpp:
  add new to-step
storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp:
  add new to-step
storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp:
  add new argument specifying which page to scan to
storage/ndb/src/kernel/blocks/dbtup/DbtupMeta.cpp:
  add utility to get max page used by fragment
storage/ndb/src/kernel/blocks/dbtup/DbtupScan.cpp:
  add NR scan to > frag.noOfPages
storage/ndb/test/ndbapi/testSystemRestart.cpp:
  add testcase
storage/ndb/test/run-test/daily-basic-tests.txt:
  add testcase
storage/ndb/test/src/NdbRestarts.cpp:
  add testcase
This commit is contained in:
unknown 2007-10-23 11:24:34 +02:00
parent 406e143304
commit c61548f18c
18 changed files with 452 additions and 22 deletions

View file

@ -195,9 +195,11 @@ extern const GlobalSignalNumber NO_OF_SIGNAL_NAMES;
/* 132 not unused */
/* 133 not unused */
#define GSN_CM_HEARTBEAT 134 /* distr. */
/* 135 unused */
/* 136 unused */
/* 137 unused */
#define GSN_PREPARE_COPY_FRAG_REQ 135
#define GSN_PREPARE_COPY_FRAG_REF 136
#define GSN_PREPARE_COPY_FRAG_CONF 137
#define GSN_CM_NODEINFOCONF 138 /* distr. */
#define GSN_CM_NODEINFOREF 139 /* distr. */
#define GSN_CM_NODEINFOREQ 140 /* distr. */

View file

@ -49,6 +49,7 @@ private:
Uint32 savePointId;
Uint32 gci;
};
Uint32 maxPage;
/**
* Previously there where also a scan type

View file

@ -29,7 +29,7 @@ class CopyFragReq {
*/
friend class Dblqh;
public:
STATIC_CONST( SignalLength = 9 );
STATIC_CONST( SignalLength = 10 );
private:
Uint32 userPtr;
@ -42,6 +42,7 @@ private:
Uint32 gci;
Uint32 nodeCount;
Uint32 nodeList[1];
//Uint32 maxPage; is stored in nodeList[nodeCount]
};
class CopyFragConf {
@ -95,4 +96,42 @@ struct UpdateFragDistKeyOrd
STATIC_CONST( SignalLength = 3 );
};
struct PrepareCopyFragReq
{
STATIC_CONST( SignalLength = 6 );
Uint32 senderRef;
Uint32 senderData;
Uint32 tableId;
Uint32 fragId;
Uint32 copyNodeId;
Uint32 startingNodeId;
};
struct PrepareCopyFragRef
{
Uint32 senderRef;
Uint32 senderData;
Uint32 tableId;
Uint32 fragId;
Uint32 copyNodeId;
Uint32 startingNodeId;
Uint32 errorCode;
STATIC_CONST( SignalLength = 7 );
};
struct PrepareCopyFragConf
{
STATIC_CONST( SignalLength = 7 );
Uint32 senderRef;
Uint32 senderData;
Uint32 tableId;
Uint32 fragId;
Uint32 copyNodeId;
Uint32 startingNodeId;
Uint32 maxPageNo;
};
#endif

View file

@ -88,5 +88,52 @@ Uint32 ndbGetOwnVersion();
#define NDBD_NODE_VERSION_REP NDB_MAKE_VERSION(6,1,1)
#define NDBD_PREPARE_COPY_FRAG_VERSION NDB_MAKE_VERSION(6,2,1)
#define NDBD_PREPARE_COPY_FRAG_V2_51 NDB_MAKE_VERSION(5,1,23)
#define NDBD_PREPARE_COPY_FRAG_V2_62 NDB_MAKE_VERSION(6,2,8)
#define NDBD_PREPARE_COPY_FRAG_V2_63 NDB_MAKE_VERSION(6,3,6)
/**
* 0 = NO PREP COPY FRAG SUPPORT
* 1 = NO MAX PAGE SUPPORT
* 2 = LATEST VERSION
*/
static
inline
int
ndb_check_prep_copy_frag_version(Uint32 version)
{
if (version == NDB_VERSION_D)
return 2;
const Uint32 major = (version >> 16) & 0xFF;
const Uint32 minor = (version >> 8) & 0xFF;
if (major >= 6)
{
if (minor == 2)
{
if (version >= NDBD_PREPARE_COPY_FRAG_V2_62)
return 2;
if (version >= NDBD_PREPARE_COPY_FRAG_VERSION)
return 1;
return 0;
}
else if (minor == 3)
{
if (version >= NDBD_PREPARE_COPY_FRAG_V2_63)
return 2;
return 1;
}
return 2;
}
else if (major == 5 && minor == 1)
{
if (version >= NDBD_PREPARE_COPY_FRAG_V2_51)
return 2;
}
return 0;
}
#endif

View file

@ -640,5 +640,9 @@ const GsnName SignalNames [] = {
,{ GSN_ROUTE_ORD, "ROUTE_ORD" }
,{ GSN_NODE_VERSION_REP, "NODE_VERSION_REP" }
,{ GSN_PREPARE_COPY_FRAG_REQ, "PREPARE_COPY_FRAG_REQ" }
,{ GSN_PREPARE_COPY_FRAG_REF, "PREPARE_COPY_FRAG_REF" }
,{ GSN_PREPARE_COPY_FRAG_CONF, "PREPARE_COPY_FRAG_CONF" }
};
const unsigned short NO_OF_SIGNAL_NAMES = sizeof(SignalNames)/sizeof(GsnName);

View file

@ -3,7 +3,7 @@ Next NDBCNTR 1002
Next NDBFS 2000
Next DBACC 3002
Next DBTUP 4029
Next DBLQH 5045
Next DBLQH 5047
Next DBDICT 6008
Next DBDIH 7193
Next DBTC 8054
@ -186,6 +186,8 @@ handling in DBTC to ensure that node failures are also well handled in
time-out handling. They can also be used to test multiple node failure
handling.
5045: Crash in PREPARE_COPY_FRAG_REQ
5046: Crash if LQHKEYREQ (NrCopy) comes when frag-state is incorrect
ERROR CODES FOR TESTING TIME-OUT HANDLING IN DBLQH
-------------------------------------------------

View file

@ -545,7 +545,8 @@ public:
TO_WAIT_ENDING = 21,
ENDING = 22,
STARTING_LOCAL_FRAGMENTS = 24
STARTING_LOCAL_FRAGMENTS = 24,
PREPARE_COPY = 25
};
enum ToSlaveStatus {
TO_SLAVE_IDLE = 0,
@ -556,6 +557,7 @@ public:
TO_SLAVE_COPY_COMPLETED = 5
};
Uint32 startGci;
Uint32 maxPage;
Uint32 toCopyNode;
Uint32 toCurrentFragid;
Uint32 toCurrentReplica;
@ -672,6 +674,8 @@ private:
void execNODE_FAILREP(Signal *);
void execCOPY_FRAGCONF(Signal *);
void execCOPY_FRAGREF(Signal *);
void execPREPARE_COPY_FRAG_REF(Signal*);
void execPREPARE_COPY_FRAG_CONF(Signal*);
void execDIADDTABREQ(Signal *);
void execDIGETNODESREQ(Signal *);
void execDIRELEASEREQ(Signal *);
@ -1114,6 +1118,7 @@ private:
void sendStartTo(Signal *, Uint32 takeOverPtr);
void startNextCopyFragment(Signal *, Uint32 takeOverPtr);
void toCopyFragLab(Signal *, Uint32 takeOverPtr);
void toStartCopyFrag(Signal *, TakeOverRecordPtr);
void startHsAddFragConfLab(Signal *);
void prepareSendCreateFragReq(Signal *, Uint32 takeOverPtr);
void sendUpdateTo(Signal *, Uint32 takeOverPtr, Uint32 updateState);

View file

@ -259,6 +259,11 @@ Dbdih::Dbdih(Block_context& ctx):
addRecSignal(GSN_START_FRAGREF,
&Dbdih::execSTART_FRAGREF);
addRecSignal(GSN_PREPARE_COPY_FRAG_REF,
&Dbdih::execPREPARE_COPY_FRAG_REF);
addRecSignal(GSN_PREPARE_COPY_FRAG_CONF,
&Dbdih::execPREPARE_COPY_FRAG_CONF);
apiConnectRecord = 0;
connectRecord = 0;

View file

@ -3155,6 +3155,94 @@ void Dbdih::toCopyFragLab(Signal* signal,
TakeOverRecordPtr takeOverPtr;
RETURN_IF_TAKE_OVER_INTERRUPTED(takeOverPtrI, takeOverPtr);
/**
* Inform starting node that TakeOver is about to start
*/
Uint32 nodeId = takeOverPtr.p->toStartingNode;
Uint32 version = getNodeInfo(nodeId).m_version;
if (ndb_check_prep_copy_frag_version(version))
{
jam();
TabRecordPtr tabPtr;
tabPtr.i = takeOverPtr.p->toCurrentTabref;
ptrCheckGuard(tabPtr, ctabFileSize, tabRecord);
FragmentstorePtr fragPtr;
getFragstore(tabPtr.p, takeOverPtr.p->toCurrentFragid, fragPtr);
Uint32 nodes[MAX_REPLICAS];
extractNodeInfo(fragPtr.p, nodes);
PrepareCopyFragReq* req= (PrepareCopyFragReq*)signal->getDataPtrSend();
req->senderRef = reference();
req->senderData = takeOverPtrI;
req->tableId = takeOverPtr.p->toCurrentTabref;
req->fragId = takeOverPtr.p->toCurrentFragid;
req->copyNodeId = nodes[0]; // Src
req->startingNodeId = takeOverPtr.p->toStartingNode; // Dst
Uint32 ref = calcLqhBlockRef(takeOverPtr.p->toStartingNode);
sendSignal(ref, GSN_PREPARE_COPY_FRAG_REQ, signal,
PrepareCopyFragReq::SignalLength, JBB);
takeOverPtr.p->toMasterStatus = TakeOverRecord::PREPARE_COPY;
return;
}
takeOverPtr.p->maxPage = RNIL;
toStartCopyFrag(signal, takeOverPtr);
}
void
Dbdih::execPREPARE_COPY_FRAG_REF(Signal* signal)
{
jamEntry();
PrepareCopyFragRef ref = *(PrepareCopyFragRef*)signal->getDataPtr();
TakeOverRecordPtr takeOverPtr;
RETURN_IF_TAKE_OVER_INTERRUPTED(ref.senderData, takeOverPtr);
ndbrequire(takeOverPtr.p->toMasterStatus == TakeOverRecord::PREPARE_COPY);
/**
* Treat this as copy frag ref
*/
CopyFragRef * cfref = (CopyFragRef*)signal->getDataPtrSend();
cfref->userPtr = ref.senderData;
cfref->startingNodeId = ref.startingNodeId;
cfref->errorCode = ref.errorCode;
cfref->tableId = ref.tableId;
cfref->fragId = ref.fragId;
cfref->sendingNodeId = ref.copyNodeId;
takeOverPtr.p->toMasterStatus = TakeOverRecord::COPY_FRAG;
execCOPY_FRAGREF(signal);
}
void
Dbdih::execPREPARE_COPY_FRAG_CONF(Signal* signal)
{
PrepareCopyFragConf conf = *(PrepareCopyFragConf*)signal->getDataPtr();
TakeOverRecordPtr takeOverPtr;
RETURN_IF_TAKE_OVER_INTERRUPTED(conf.senderData, takeOverPtr);
Uint32 version = getNodeInfo(refToNode(conf.senderRef)).m_version;
if (ndb_check_prep_copy_frag_version(version) >= 2)
{
jam();
takeOverPtr.p->maxPage = conf.maxPageNo;
}
else
{
jam();
takeOverPtr.p->maxPage = RNIL;
}
toStartCopyFrag(signal, takeOverPtr);
}
void
Dbdih::toStartCopyFrag(Signal* signal, TakeOverRecordPtr takeOverPtr)
{
CreateReplicaRecordPtr createReplicaPtr;
createReplicaPtr.i = 0;
ptrAss(createReplicaPtr, createReplicaRecord);
@ -3178,8 +3266,8 @@ void Dbdih::toCopyFragLab(Signal* signal,
createReplicaPtr.p->hotSpareUse = true;
createReplicaPtr.p->dataNodeId = takeOverPtr.p->toStartingNode;
prepareSendCreateFragReq(signal, takeOverPtrI);
}//Dbdih::toCopyFragLab()
prepareSendCreateFragReq(signal, takeOverPtr.i);
}//Dbdih::toStartCopy()
void Dbdih::prepareSendCreateFragReq(Signal* signal, Uint32 takeOverPtrI)
{
@ -3412,10 +3500,12 @@ void Dbdih::execCREATE_FRAGCONF(Signal* signal)
copyFragReq->schemaVersion = tabPtr.p->schemaVersion;
copyFragReq->distributionKey = fragPtr.p->distributionKey;
copyFragReq->gci = gci;
copyFragReq->nodeCount = extractNodeInfo(fragPtr.p,
copyFragReq->nodeList);
Uint32 len = copyFragReq->nodeCount =
extractNodeInfo(fragPtr.p,
copyFragReq->nodeList);
copyFragReq->nodeList[len] = takeOverPtr.p->maxPage;
sendSignal(ref, GSN_COPY_FRAGREQ, signal,
CopyFragReq::SignalLength + copyFragReq->nodeCount, JBB);
CopyFragReq::SignalLength + len, JBB);
} else {
ndbrequire(takeOverPtr.p->toMasterStatus == TakeOverRecord::COMMIT_CREATE);
jam();
@ -4576,13 +4666,22 @@ void Dbdih::checkTakeOverInMasterStartNodeFailure(Signal* signal,
ok = true;
jam();
//-----------------------------------------------------------------------
// The starting node will discover the problem. We will receive either
// The copying node will discover the problem. We will receive either
// COPY_FRAGREQ or COPY_FRAGCONF and then we can release the take over
// record and end the process. If the copying node should also die then
// we will try to send prepare create fragment and will then discover
// that the starting node has failed.
//-----------------------------------------------------------------------
break;
case TakeOverRecord::PREPARE_COPY:
ok = true;
jam();
/**
* We're waiting for the starting node...which just died...
* endTakeOver
*/
endTakeOver(takeOverPtr.i);
break;
case TakeOverRecord::COPY_ACTIVE:
ok = true;
jam();

View file

@ -2144,6 +2144,7 @@ private:
void execSTORED_PROCCONF(Signal* signal);
void execSTORED_PROCREF(Signal* signal);
void execCOPY_FRAGREQ(Signal* signal);
void execPREPARE_COPY_FRAG_REQ(Signal* signal);
void execUPDATE_FRAG_DIST_KEY_ORD(Signal*);
void execCOPY_ACTIVEREQ(Signal* signal);
void execCOPY_STATEREQ(Signal* signal);

View file

@ -310,6 +310,9 @@ Dblqh::Dblqh(Block_context& ctx):
addRecSignal(GSN_UPDATE_FRAG_DIST_KEY_ORD,
&Dblqh::execUPDATE_FRAG_DIST_KEY_ORD);
addRecSignal(GSN_PREPARE_COPY_FRAG_REQ,
&Dblqh::execPREPARE_COPY_FRAG_REQ);
initData();
#ifdef VM_TRACE

View file

@ -3670,6 +3670,7 @@ void Dblqh::execLQHKEYREQ(Signal* signal)
{
ndbout_c("fragptr.p->fragStatus: %d",
fragptr.p->fragStatus);
CRASH_INSERTION(5046);
}
ndbassert(fragptr.p->fragStatus == Fragrecord::ACTIVE_CREATION);
fragptr.p->m_copy_started_state = Fragrecord::AC_NR_COPY;
@ -10083,6 +10084,86 @@ Dblqh::calculateHash(Uint32 tableId, const Uint32* src)
return md5_hash(Tmp, keyLen);
}//Dblqh::calculateHash()
/**
* PREPARE COPY FRAG REQ
*/
void
Dblqh::execPREPARE_COPY_FRAG_REQ(Signal* signal)
{
jamEntry();
PrepareCopyFragReq req = *(PrepareCopyFragReq*)signal->getDataPtr();
CRASH_INSERTION(5045);
tabptr.i = req.tableId;
ptrCheckGuard(tabptr, ctabrecFileSize, tablerec);
Uint32 max_page = RNIL;
if (getOwnNodeId() != req.startingNodeId)
{
jam();
/**
* This is currently dead code...
* but is provided so we can impl. a better scan+delete on
* starting node wo/ having to change running node
*/
ndbrequire(getOwnNodeId() == req.copyNodeId);
c_tup->get_frag_info(req.tableId, req.fragId, &max_page);
PrepareCopyFragConf* conf = (PrepareCopyFragConf*)signal->getDataPtrSend();
conf->senderData = req.senderData;
conf->senderRef = reference();
conf->tableId = req.tableId;
conf->fragId = req.fragId;
conf->copyNodeId = req.copyNodeId;
conf->startingNodeId = req.startingNodeId;
conf->maxPageNo = max_page;
sendSignal(req.senderRef, GSN_PREPARE_COPY_FRAG_CONF,
signal, PrepareCopyFragConf::SignalLength, JBB);
return;
}
if (! DictTabInfo::isOrderedIndex(tabptr.p->tableType))
{
jam();
ndbrequire(getFragmentrec(signal, req.fragId));
/**
*
*/
if (cstartType == NodeState::ST_SYSTEM_RESTART)
{
jam();
signal->theData[0] = fragptr.p->tabRef;
signal->theData[1] = fragptr.p->fragId;
sendSignal(DBACC_REF, GSN_EXPANDCHECK2, signal, 2, JBB);
}
/**
*
*/
fragptr.p->m_copy_started_state = Fragrecord::AC_IGNORED;
fragptr.p->fragStatus = Fragrecord::ACTIVE_CREATION;
fragptr.p->logFlag = Fragrecord::STATE_FALSE;
c_tup->get_frag_info(req.tableId, req.fragId, &max_page);
}
PrepareCopyFragConf* conf = (PrepareCopyFragConf*)signal->getDataPtrSend();
conf->senderData = req.senderData;
conf->senderRef = reference();
conf->tableId = req.tableId;
conf->fragId = req.fragId;
conf->copyNodeId = req.copyNodeId;
conf->startingNodeId = req.startingNodeId;
conf->maxPageNo = max_page;
sendSignal(req.senderRef, GSN_PREPARE_COPY_FRAG_CONF,
signal, PrepareCopyFragConf::SignalLength, JBB);
}
/* *************************************** */
/* COPY_FRAGREQ: Start copying a fragment */
/* *************************************** */
@ -10118,6 +10199,13 @@ void Dblqh::execCOPY_FRAGREQ(Signal* signal)
for (i = 0; i<nodeCount; i++)
nodemask.set(copyFragReq->nodeList[i]);
}
Uint32 maxPage = copyFragReq->nodeList[nodeCount];
Uint32 version = getNodeInfo(refToNode(userRef)).m_version;
if (ndb_check_prep_copy_frag_version(version) < 2)
{
jam();
maxPage = RNIL;
}
if (DictTabInfo::isOrderedIndex(tabptr.p->tableType)) {
jam();
@ -10193,14 +10281,15 @@ void Dblqh::execCOPY_FRAGREQ(Signal* signal)
req->requestInfo = 0;
AccScanReq::setLockMode(req->requestInfo, 0);
AccScanReq::setReadCommittedFlag(req->requestInfo, 0);
AccScanReq::setNRScanFlag(req->requestInfo, gci ? 1 : 0);
AccScanReq::setNRScanFlag(req->requestInfo, 1);
AccScanReq::setNoDiskScanFlag(req->requestInfo, 1);
req->transId1 = tcConnectptr.p->transid[0];
req->transId2 = tcConnectptr.p->transid[1];
req->savePointId = tcConnectptr.p->savePointId;
req->maxPage = maxPage;
sendSignal(scanptr.p->scanBlockref, GSN_ACC_SCANREQ, signal,
AccScanReq::SignalLength, JBB);
AccScanReq::SignalLength + 1, JBB);
if (! nodemask.isclear())
{

View file

@ -518,6 +518,7 @@ typedef Ptr<Fragoperrec> FragoperrecPtr;
Uint32 m_savePointId;
Uint32 m_scanGCI;
};
Uint32 m_endPage;
// lock waited for or obtained and not yet passed to LQH
Uint32 m_accLockOp;
@ -1576,6 +1577,8 @@ public:
void nr_delete_page_callback(Signal*, Uint32 op, Uint32 page);
void nr_delete_log_buffer_callback(Signal*, Uint32 op, Uint32 page);
bool get_frag_info(Uint32 tableId, Uint32 fragId, Uint32* maxPage);
private:
BLOCK_DEFINES(Dbtup);

View file

@ -1464,3 +1464,22 @@ Dbtup::complete_restore_lcp(Uint32 tableId, Uint32 fragId)
tabDesc += 2;
}
}
bool
Dbtup::get_frag_info(Uint32 tableId, Uint32 fragId, Uint32* maxPage)
{
jamEntry();
TablerecPtr tabPtr;
tabPtr.i= tableId;
ptrCheckGuard(tabPtr, cnoOfTablerec, tablerec);
FragrecordPtr fragPtr;
getFragmentrec(fragPtr, fragId, tabPtr.p);
if (maxPage)
{
* maxPage = fragPtr.p->noOfPages;
}
return true;
}

View file

@ -95,7 +95,23 @@ Dbtup::execACC_SCANREQ(Signal* signal)
}
}
bits |= AccScanReq::getNRScanFlag(req->requestInfo) ? ScanOp::SCAN_NR : 0;
if (AccScanReq::getNRScanFlag(req->requestInfo))
{
jam();
bits |= ScanOp::SCAN_NR;
scanPtr.p->m_endPage = req->maxPage;
if (req->maxPage != RNIL && req->maxPage > frag.noOfPages)
{
ndbout_c("%u %u endPage: %u (noOfPages: %u)",
tablePtr.i, fragId,
req->maxPage, fragPtr.p->noOfPages);
}
}
else
{
jam();
scanPtr.p->m_endPage = RNIL;
}
// set up scan op
new (scanPtr.p) ScanOp();
@ -540,7 +556,7 @@ Dbtup::scanFirst(Signal*, ScanOpPtr scanPtr)
ptrCheckGuard(fragPtr, cnoOfFragrec, fragrecord);
Fragrecord& frag = *fragPtr.p;
// in the future should not pre-allocate pages
if (frag.noOfPages == 0) {
if (frag.noOfPages == 0 && ((bits & ScanOp::SCAN_NR) == 0)) {
jam();
scan.m_state = ScanOp::Last;
return;
@ -632,11 +648,23 @@ Dbtup::scanNext(Signal* signal, ScanOpPtr scanPtr)
key.m_page_no++;
if (key.m_page_no >= frag.noOfPages) {
jam();
if ((bits & ScanOp::SCAN_NR) && (scan.m_endPage != RNIL))
{
jam();
if (key.m_page_no < scan.m_endPage)
{
jam();
ndbout_c("scanning page %u", key.m_page_no);
goto cont;
}
}
// no more pages, scan ends
pos.m_get = ScanPos::Get_undef;
scan.m_state = ScanOp::Last;
return true;
}
cont:
key.m_page_idx = 0;
pos.m_get = ScanPos::Get_page_mm;
// clear cached value
@ -649,7 +677,13 @@ Dbtup::scanNext(Signal* signal, ScanOpPtr scanPtr)
{
if (pos.m_realpid_mm == RNIL) {
jam();
pos.m_realpid_mm = getRealpid(fragPtr.p, key.m_page_no);
if (key.m_page_no < frag.noOfPages)
pos.m_realpid_mm = getRealpid(fragPtr.p, key.m_page_no);
else
{
ndbassert(bits & ScanOp::SCAN_NR);
goto nopage;
}
}
PagePtr pagePtr;
c_page_pool.getPtr(pagePtr, pos.m_realpid_mm);
@ -657,9 +691,18 @@ Dbtup::scanNext(Signal* signal, ScanOpPtr scanPtr)
if (pagePtr.p->page_state == ZEMPTY_MM) {
// skip empty page
jam();
pos.m_get = ScanPos::Get_next_page_mm;
break; // incr loop count
if (! (bits & ScanOp::SCAN_NR))
{
pos.m_get = ScanPos::Get_next_page_mm;
break; // incr loop count
}
else
{
jam();
pos.m_realpid_mm = RNIL;
}
}
nopage:
pos.m_page = pagePtr.p;
pos.m_get = ScanPos::Get_tuple;
}
@ -820,11 +863,11 @@ Dbtup::scanNext(Signal* signal, ScanOpPtr scanPtr)
{
pos.m_get = ScanPos::Get_next_tuple_fs;
th = (Tuple_header*)&page->m_data[key.m_page_idx];
thbits = th->m_header_bits;
if (likely(! (bits & ScanOp::SCAN_NR)))
{
jam();
thbits = th->m_header_bits;
if (! (thbits & Tuple_header::FREE))
{
goto found_tuple;
@ -832,7 +875,15 @@ Dbtup::scanNext(Signal* signal, ScanOpPtr scanPtr)
}
else
{
if ((foundGCI = *th->get_mm_gci(tablePtr.p)) > scanGCI)
if (pos.m_realpid_mm == RNIL)
{
jam();
foundGCI = 0;
goto found_deleted_rowid;
}
thbits = th->m_header_bits;
if ((foundGCI = *th->get_mm_gci(tablePtr.p)) > scanGCI ||
foundGCI == 0)
{
if (! (thbits & Tuple_header::FREE))
{
@ -904,7 +955,8 @@ Dbtup::scanNext(Signal* signal, ScanOpPtr scanPtr)
Fix_page *mmpage = (Fix_page*)c_page_pool.getPtr(pos.m_realpid_mm);
th = (Tuple_header*)(mmpage->m_data + key_mm.m_page_idx);
if ((foundGCI = *th->get_mm_gci(tablePtr.p)) > scanGCI)
if ((foundGCI = *th->get_mm_gci(tablePtr.p)) > scanGCI ||
foundGCI == 0)
{
if (! (thbits & Tuple_header::FREE))
break;

View file

@ -1501,6 +1501,54 @@ int runSR_DD_2(NDBT_Context* ctx, NDBT_Step* step)
return result;
}
int
runBug27434(NDBT_Context* ctx, NDBT_Step* step)
{
int result = NDBT_OK;
NdbRestarter restarter;
Ndb* pNdb = GETNDB(step);
const Uint32 nodeCount = restarter.getNumDbNodes();
if (nodeCount < 2)
return NDBT_OK;
int args[] = { DumpStateOrd::DihMaxTimeBetweenLCP };
int dump[] = { DumpStateOrd::DihStartLcpImmediately };
int filter[] = { 15, NDB_MGM_EVENT_CATEGORY_CHECKPOINT, 0 };
NdbLogEventHandle handle =
ndb_mgm_create_logevent_handle(restarter.handle, filter);
struct ndb_logevent event;
do {
int node1 = restarter.getDbNodeId(rand() % nodeCount);
CHECK(restarter.restartOneDbNode(node1, false, true, true) == 0);
NdbSleep_SecSleep(3);
CHECK(restarter.waitNodesNoStart(&node1, 1) == 0);
CHECK(restarter.dumpStateAllNodes(args, 1) == 0);
for (Uint32 i = 0; i<3; i++)
{
CHECK(restarter.dumpStateAllNodes(dump, 1) == 0);
while(ndb_logevent_get_next(handle, &event, 0) >= 0 &&
event.type != NDB_LE_LocalCheckpointStarted);
while(ndb_logevent_get_next(handle, &event, 0) >= 0 &&
event.type != NDB_LE_LocalCheckpointCompleted);
}
restarter.restartAll(false, true, true);
NdbSleep_SecSleep(3);
CHECK(restarter.waitClusterNoStart() == 0);
restarter.insertErrorInNode(node1, 5046);
restarter.startAll();
CHECK(restarter.waitClusterStarted() == 0);
} while(false);
return result;
}
NDBT_TESTSUITE(testSystemRestart);
TESTCASE("SR1",
"Basic system restart test. Focus on testing restart from REDO log.\n"
@ -1681,6 +1729,12 @@ TESTCASE("Bug24664",
STEP(runBug24664);
FINALIZER(runClearTable);
}
TESTCASE("Bug27434",
"")
{
INITIALIZER(runWaitStarted);
STEP(runBug27434);
}
TESTCASE("SR_DD_1", "")
{
TC_PROPERTY("ALL", 1);

View file

@ -880,6 +880,10 @@ max-time: 1000
cmd: testNodeRestart
args: -n Bug27466 T1
max-time: 1500
cmd: testSystemRestart
args: -n Bug27434 T1
max-time: 1000
cmd: test_event
args: -l 10 -n Bug27169 T1

View file

@ -607,6 +607,7 @@ NFDuringNR_codes[] = {
5026,
7139,
7132,
5045,
//LCP
8000,