This commit is contained in:
pekka@mysql.com 2005-04-23 17:55:21 +02:00
commit 850211eb73
6 changed files with 234 additions and 77 deletions

View file

@ -1779,6 +1779,10 @@ private:
Operationrec* const regOperPtr, Operationrec* const regOperPtr,
Tablerec* const regTabPtr); Tablerec* const regTabPtr);
int addTuxEntries(Signal* signal,
Operationrec* regOperPtr,
Tablerec* regTabPtr);
// these crash the node on error // these crash the node on error
void executeTuxCommitTriggers(Signal* signal, void executeTuxCommitTriggers(Signal* signal,
@ -1789,6 +1793,10 @@ private:
Operationrec* regOperPtr, Operationrec* regOperPtr,
Tablerec* const regTabPtr); Tablerec* const regTabPtr);
void removeTuxEntries(Signal* signal,
Operationrec* regOperPtr,
Tablerec* regTabPtr);
// ***************************************************************** // *****************************************************************
// Error Handling routines. // Error Handling routines.
// ***************************************************************** // *****************************************************************

View file

@ -973,25 +973,7 @@ Dbtup::executeTuxInsertTriggers(Signal* signal,
req->pageOffset = regOperPtr->pageOffset; req->pageOffset = regOperPtr->pageOffset;
req->tupVersion = tupVersion; req->tupVersion = tupVersion;
req->opInfo = TuxMaintReq::OpAdd; req->opInfo = TuxMaintReq::OpAdd;
// loop over index list return addTuxEntries(signal, regOperPtr, regTabPtr);
const ArrayList<TupTriggerData>& triggerList = regTabPtr->tuxCustomTriggers;
TriggerPtr triggerPtr;
triggerList.first(triggerPtr);
while (triggerPtr.i != RNIL) {
ljam();
req->indexId = triggerPtr.p->indexId;
req->errorCode = RNIL;
EXECUTE_DIRECT(DBTUX, GSN_TUX_MAINT_REQ,
signal, TuxMaintReq::SignalLength);
ljamEntry();
if (req->errorCode != 0) {
ljam();
terrorCode = req->errorCode;
return -1;
}
triggerList.next(triggerPtr);
}
return 0;
} }
int int
@ -1012,9 +994,18 @@ Dbtup::executeTuxUpdateTriggers(Signal* signal,
req->pageOffset = regOperPtr->pageOffset; req->pageOffset = regOperPtr->pageOffset;
req->tupVersion = tupVersion; req->tupVersion = tupVersion;
req->opInfo = TuxMaintReq::OpAdd; req->opInfo = TuxMaintReq::OpAdd;
// loop over index list return addTuxEntries(signal, regOperPtr, regTabPtr);
}
int
Dbtup::addTuxEntries(Signal* signal,
Operationrec* regOperPtr,
Tablerec* regTabPtr)
{
TuxMaintReq* const req = (TuxMaintReq*)signal->getDataPtrSend();
const ArrayList<TupTriggerData>& triggerList = regTabPtr->tuxCustomTriggers; const ArrayList<TupTriggerData>& triggerList = regTabPtr->tuxCustomTriggers;
TriggerPtr triggerPtr; TriggerPtr triggerPtr;
Uint32 failPtrI;
triggerList.first(triggerPtr); triggerList.first(triggerPtr);
while (triggerPtr.i != RNIL) { while (triggerPtr.i != RNIL) {
ljam(); ljam();
@ -1026,11 +1017,29 @@ Dbtup::executeTuxUpdateTriggers(Signal* signal,
if (req->errorCode != 0) { if (req->errorCode != 0) {
ljam(); ljam();
terrorCode = req->errorCode; terrorCode = req->errorCode;
return -1; failPtrI = triggerPtr.i;
goto fail;
} }
triggerList.next(triggerPtr); triggerList.next(triggerPtr);
} }
return 0; return 0;
fail:
req->opInfo = TuxMaintReq::OpRemove;
triggerList.first(triggerPtr);
while (triggerPtr.i != failPtrI) {
ljam();
req->indexId = triggerPtr.p->indexId;
req->errorCode = RNIL;
EXECUTE_DIRECT(DBTUX, GSN_TUX_MAINT_REQ,
signal, TuxMaintReq::SignalLength);
ljamEntry();
ndbrequire(req->errorCode == 0);
triggerList.next(triggerPtr);
}
#ifdef VM_TRACE
ndbout << "aborted partial tux update: op " << hex << regOperPtr << endl;
#endif
return -1;
} }
int int
@ -1049,7 +1058,6 @@ Dbtup::executeTuxCommitTriggers(Signal* signal,
{ {
TuxMaintReq* const req = (TuxMaintReq*)signal->getDataPtrSend(); TuxMaintReq* const req = (TuxMaintReq*)signal->getDataPtrSend();
// get version // get version
// XXX could add prevTupVersion to Operationrec
Uint32 tupVersion; Uint32 tupVersion;
if (regOperPtr->optype == ZINSERT) { if (regOperPtr->optype == ZINSERT) {
if (! regOperPtr->deleteInsertFlag) if (! regOperPtr->deleteInsertFlag)
@ -1087,21 +1095,7 @@ Dbtup::executeTuxCommitTriggers(Signal* signal,
req->pageOffset = regOperPtr->pageOffset; req->pageOffset = regOperPtr->pageOffset;
req->tupVersion = tupVersion; req->tupVersion = tupVersion;
req->opInfo = TuxMaintReq::OpRemove; req->opInfo = TuxMaintReq::OpRemove;
// loop over index list removeTuxEntries(signal, regOperPtr, regTabPtr);
const ArrayList<TupTriggerData>& triggerList = regTabPtr->tuxCustomTriggers;
TriggerPtr triggerPtr;
triggerList.first(triggerPtr);
while (triggerPtr.i != RNIL) {
ljam();
req->indexId = triggerPtr.p->indexId;
req->errorCode = RNIL;
EXECUTE_DIRECT(DBTUX, GSN_TUX_MAINT_REQ,
signal, TuxMaintReq::SignalLength);
ljamEntry();
// commit must succeed
ndbrequire(req->errorCode == 0);
triggerList.next(triggerPtr);
}
} }
void void
@ -1132,7 +1126,15 @@ Dbtup::executeTuxAbortTriggers(Signal* signal,
req->pageOffset = regOperPtr->pageOffset; req->pageOffset = regOperPtr->pageOffset;
req->tupVersion = tupVersion; req->tupVersion = tupVersion;
req->opInfo = TuxMaintReq::OpRemove; req->opInfo = TuxMaintReq::OpRemove;
// loop over index list removeTuxEntries(signal, regOperPtr, regTabPtr);
}
void
Dbtup::removeTuxEntries(Signal* signal,
Operationrec* regOperPtr,
Tablerec* regTabPtr)
{
TuxMaintReq* const req = (TuxMaintReq*)signal->getDataPtrSend();
const ArrayList<TupTriggerData>& triggerList = regTabPtr->tuxCustomTriggers; const ArrayList<TupTriggerData>& triggerList = regTabPtr->tuxCustomTriggers;
TriggerPtr triggerPtr; TriggerPtr triggerPtr;
triggerList.first(triggerPtr); triggerList.first(triggerPtr);
@ -1143,7 +1145,7 @@ Dbtup::executeTuxAbortTriggers(Signal* signal,
EXECUTE_DIRECT(DBTUX, GSN_TUX_MAINT_REQ, EXECUTE_DIRECT(DBTUX, GSN_TUX_MAINT_REQ,
signal, TuxMaintReq::SignalLength); signal, TuxMaintReq::SignalLength);
ljamEntry(); ljamEntry();
// abort must succeed // must succeed
ndbrequire(req->errorCode == 0); ndbrequire(req->errorCode == 0);
triggerList.next(triggerPtr); triggerList.next(triggerPtr);
} }

View file

@ -135,6 +135,24 @@ abort DELETE none -
1) alternatively, store prevTupVersion in operation record. 1) alternatively, store prevTupVersion in operation record.
Abort from ordered index error
------------------------------
Obviously, index update failure causes operation failure.
The operation is then aborted later by TC.
The problem here is with multiple indexes. Some may have been
updated successfully before the one that failed. Therefore
the trigger code aborts the successful ones already in
the prepare phase.
In other words, multiple indexes are treated as one.
Abort from any cause
--------------------
[ hairy stuff ]
Read attributes, query status Read attributes, query status
----------------------------- -----------------------------
@ -170,14 +188,11 @@ used to decide if the scan can see the tuple.
This signal may also be called during any phase since commit/abort This signal may also be called during any phase since commit/abort
of all operations is not done in one time-slice. of all operations is not done in one time-slice.
Commit and abort
----------------
[ hairy stuff ]
Problems Problems
-------- --------
Current abort code can destroy a tuple version too early. This Current abort code can destroy a tuple version too early. This
happens in test case "ticuur" (insert-commit-update-update-rollback), happens in test case "ticuur" (insert-commit-update-update-rollback),
if abort of first update arrives before abort of second update. if abort of first update arrives before abort of second update.
vim: set textwidth=68:

View file

@ -23,6 +23,11 @@
int int
Dbtux::allocNode(Signal* signal, NodeHandle& node) Dbtux::allocNode(Signal* signal, NodeHandle& node)
{ {
if (ERROR_INSERTED(12007)) {
jam();
CLEAR_ERROR_INSERT_VALUE;
return TuxMaintReq::NoMemError;
}
Frag& frag = node.m_frag; Frag& frag = node.m_frag;
Uint32 pageId = NullTupLoc.getPageId(); Uint32 pageId = NullTupLoc.getPageId();
Uint32 pageOffset = NullTupLoc.getPageOffset(); Uint32 pageOffset = NullTupLoc.getPageOffset();
@ -34,6 +39,12 @@ Dbtux::allocNode(Signal* signal, NodeHandle& node)
node.m_loc = TupLoc(pageId, pageOffset); node.m_loc = TupLoc(pageId, pageOffset);
node.m_node = reinterpret_cast<TreeNode*>(node32); node.m_node = reinterpret_cast<TreeNode*>(node32);
ndbrequire(node.m_loc != NullTupLoc && node.m_node != 0); ndbrequire(node.m_loc != NullTupLoc && node.m_node != 0);
} else {
switch (errorCode) {
case 827:
errorCode = TuxMaintReq::NoMemError;
break;
}
} }
return errorCode; return errorCode;
} }

View file

@ -179,10 +179,11 @@ ErrorBundle ErrorCodes[] = {
*/ */
{ 623, IS, "623" }, { 623, IS, "623" },
{ 624, IS, "624" }, { 624, IS, "624" },
{ 625, IS, "Out of memory in Ndb Kernel, index part (increase IndexMemory)" }, { 625, IS, "Out of memory in Ndb Kernel, hash index part (increase IndexMemory)" },
{ 640, IS, "Too many hash indexes (should not happen)" }, { 640, IS, "Too many hash indexes (should not happen)" },
{ 826, IS, "Too many tables and attributes (increase MaxNoOfAttributes or MaxNoOfTables)" }, { 826, IS, "Too many tables and attributes (increase MaxNoOfAttributes or MaxNoOfTables)" },
{ 827, IS, "Out of memory in Ndb Kernel, data part (increase DataMemory)" }, { 827, IS, "Out of memory in Ndb Kernel, table data (increase DataMemory)" },
{ 902, IS, "Out of memory in Ndb Kernel, ordered index data (increase DataMemory)" },
{ 902, IS, "Out of memory in Ndb Kernel, data part (increase DataMemory)" }, { 902, IS, "Out of memory in Ndb Kernel, data part (increase DataMemory)" },
{ 903, IS, "Too many ordered indexes (increase MaxNoOfOrderedIndexes)" }, { 903, IS, "Too many ordered indexes (increase MaxNoOfOrderedIndexes)" },
{ 904, IS, "Out of fragment records (increase MaxNoOfOrderedIndexes)" }, { 904, IS, "Out of fragment records (increase MaxNoOfOrderedIndexes)" },

View file

@ -259,6 +259,8 @@ struct Par : public Opt {
bool m_verify; bool m_verify;
// deadlock possible // deadlock possible
bool m_deadlock; bool m_deadlock;
// abort percentabge
unsigned m_abortpct;
NdbOperation::LockMode m_lockmode; NdbOperation::LockMode m_lockmode;
// ordered range scan // ordered range scan
bool m_ordered; bool m_ordered;
@ -281,6 +283,7 @@ struct Par : public Opt {
m_randomkey(false), m_randomkey(false),
m_verify(false), m_verify(false),
m_deadlock(false), m_deadlock(false),
m_abortpct(0),
m_lockmode(NdbOperation::LM_Read), m_lockmode(NdbOperation::LM_Read),
m_ordered(false), m_ordered(false),
m_descending(false) { m_descending(false) {
@ -1143,7 +1146,7 @@ struct Con {
NdbScanFilter* m_scanfilter; NdbScanFilter* m_scanfilter;
enum ScanMode { ScanNo = 0, Committed, Latest, Exclusive }; enum ScanMode { ScanNo = 0, Committed, Latest, Exclusive };
ScanMode m_scanmode; ScanMode m_scanmode;
enum ErrType { ErrNone = 0, ErrDeadlock, ErrOther }; enum ErrType { ErrNone = 0, ErrDeadlock, ErrNospace, ErrOther };
ErrType m_errtype; ErrType m_errtype;
Con() : Con() :
m_ndb(0), m_dic(0), m_tx(0), m_op(0), m_indexop(0), m_ndb(0), m_dic(0), m_tx(0), m_op(0), m_indexop(0),
@ -1172,7 +1175,7 @@ struct Con {
int endFilter(); int endFilter();
int setFilter(int num, int cond, const void* value, unsigned len); int setFilter(int num, int cond, const void* value, unsigned len);
int execute(ExecType t); int execute(ExecType t);
int execute(ExecType t, bool& deadlock); int execute(ExecType t, bool& deadlock, bool& nospace);
int readTuples(Par par); int readTuples(Par par);
int readIndexTuples(Par par); int readIndexTuples(Par par);
int executeScan(); int executeScan();
@ -1354,17 +1357,21 @@ Con::execute(ExecType t)
} }
int int
Con::execute(ExecType t, bool& deadlock) Con::execute(ExecType t, bool& deadlock, bool& nospace)
{ {
int ret = execute(t); int ret = execute(t);
if (ret != 0) { if (ret != 0 && deadlock && m_errtype == ErrDeadlock) {
if (deadlock && m_errtype == ErrDeadlock) { LL3("caught deadlock");
LL3("caught deadlock"); ret = 0;
ret = 0;
}
} else { } else {
deadlock = false; deadlock = false;
} }
if (ret != 0 && nospace && m_errtype == ErrNospace) {
LL3("caught nospace");
ret = 0;
} else {
nospace = false;
}
CHK(ret == 0); CHK(ret == 0);
return 0; return 0;
} }
@ -1475,6 +1482,8 @@ Con::printerror(NdbOut& out)
// 631 is new, occurs only on 4 db nodes, needs to be checked out // 631 is new, occurs only on 4 db nodes, needs to be checked out
if (code == 266 || code == 274 || code == 296 || code == 297 || code == 499 || code == 631) if (code == 266 || code == 274 || code == 296 || code == 297 || code == 499 || code == 631)
m_errtype = ErrDeadlock; m_errtype = ErrDeadlock;
if (code == 826 || code == 827 || code == 902)
m_errtype = ErrNospace;
} }
if (m_op && m_op->getNdbError().code != 0) { if (m_op && m_op->getNdbError().code != 0) {
LL0(++any << " op : error " << m_op->getNdbError()); LL0(++any << " op : error " << m_op->getNdbError());
@ -1644,6 +1653,36 @@ createindex(Par par)
// data sets // data sets
static unsigned
urandom(unsigned n)
{
if (n == 0)
return 0;
unsigned i = random() % n;
return i;
}
static int
irandom(unsigned n)
{
if (n == 0)
return 0;
int i = random() % n;
if (random() & 0x1)
i = -i;
return i;
}
static bool
randompct(unsigned pct)
{
if (pct == 0)
return false;
if (pct >= 100)
return true;
return urandom(100) < pct;
}
// Val - typed column value // Val - typed column value
struct Val { struct Val {
@ -2480,8 +2519,8 @@ struct Set {
void dbsave(unsigned i); void dbsave(unsigned i);
void calc(Par par, unsigned i, unsigned mask = 0); void calc(Par par, unsigned i, unsigned mask = 0);
bool pending(unsigned i, unsigned mask) const; bool pending(unsigned i, unsigned mask) const;
void notpending(unsigned i); void notpending(unsigned i, ExecType et = Commit);
void notpending(const Lst& lst); void notpending(const Lst& lst, ExecType et = Commit);
void dbdiscard(unsigned i); void dbdiscard(unsigned i);
void dbdiscard(const Lst& lst); void dbdiscard(const Lst& lst);
const Row& dbrow(unsigned i) const; const Row& dbrow(unsigned i) const;
@ -2831,7 +2870,33 @@ Set::putval(unsigned i, bool force, unsigned n)
return 0; return 0;
} }
// verify void
Set::notpending(unsigned i, ExecType et)
{
assert(m_row[i] != 0);
Row& row = *m_row[i];
if (et == Commit) {
if (row.m_pending == Row::InsOp)
row.m_exist = true;
if (row.m_pending == Row::DelOp)
row.m_exist = false;
} else {
if (row.m_pending == Row::InsOp)
row.m_exist = false;
if (row.m_pending == Row::DelOp)
row.m_exist = true;
}
row.m_pending = Row::NoOp;
}
void
Set::notpending(const Lst& lst, ExecType et)
{
for (unsigned j = 0; j < lst.m_cnt; j++) {
unsigned i = lst.m_arr[j];
notpending(i, et);
}
}
int int
Set::verify(Par par, const Set& set2) const Set::verify(Par par, const Set& set2) const
@ -3213,14 +3278,20 @@ pkinsert(Par par)
lst.push(i); lst.push(i);
if (lst.cnt() == par.m_batch) { if (lst.cnt() == par.m_batch) {
bool deadlock = par.m_deadlock; bool deadlock = par.m_deadlock;
CHK(con.execute(Commit, deadlock) == 0); bool nospace = true;
ExecType et = randompct(par.m_abortpct) ? Rollback : Commit;
CHK(con.execute(et, deadlock, nospace) == 0);
con.closeTransaction(); con.closeTransaction();
if (deadlock) { if (deadlock) {
LL1("pkinsert: stop on deadlock [at 1]"); LL1("pkinsert: stop on deadlock [at 1]");
return 0; return 0;
} }
if (nospace) {
LL1("pkinsert: cnt=" << j << " stop on nospace");
return 0;
}
set.lock(); set.lock();
set.notpending(lst); set.notpending(lst, et);
set.unlock(); set.unlock();
lst.reset(); lst.reset();
CHK(con.startTransaction() == 0); CHK(con.startTransaction() == 0);
@ -3228,14 +3299,20 @@ pkinsert(Par par)
} }
if (lst.cnt() != 0) { if (lst.cnt() != 0) {
bool deadlock = par.m_deadlock; bool deadlock = par.m_deadlock;
CHK(con.execute(Commit, deadlock) == 0); bool nospace = true;
ExecType et = randompct(par.m_abortpct) ? Rollback : Commit;
CHK(con.execute(et, deadlock, nospace) == 0);
con.closeTransaction(); con.closeTransaction();
if (deadlock) { if (deadlock) {
LL1("pkinsert: stop on deadlock [at 2]"); LL1("pkinsert: stop on deadlock [at 2]");
return 0; return 0;
} }
if (nospace) {
LL1("pkinsert: end: stop on nospace");
return 0;
}
set.lock(); set.lock();
set.notpending(lst); set.notpending(lst, et);
set.unlock(); set.unlock();
return 0; return 0;
} }
@ -3253,6 +3330,7 @@ pkupdate(Par par)
CHK(con.startTransaction() == 0); CHK(con.startTransaction() == 0);
Lst lst; Lst lst;
bool deadlock = false; bool deadlock = false;
bool nospace = false;
for (unsigned j = 0; j < par.m_rows; j++) { for (unsigned j = 0; j < par.m_rows; j++) {
unsigned j2 = ! par.m_randomkey ? j : urandom(par.m_rows); unsigned j2 = ! par.m_randomkey ? j : urandom(par.m_rows);
unsigned i = thrrow(par, j2); unsigned i = thrrow(par, j2);
@ -3269,28 +3347,38 @@ pkupdate(Par par)
lst.push(i); lst.push(i);
if (lst.cnt() == par.m_batch) { if (lst.cnt() == par.m_batch) {
deadlock = par.m_deadlock; deadlock = par.m_deadlock;
CHK(con.execute(Commit, deadlock) == 0); nospace = true;
ExecType et = randompct(par.m_abortpct) ? Rollback : Commit;
CHK(con.execute(et, deadlock, nospace) == 0);
if (deadlock) { if (deadlock) {
LL1("pkupdate: stop on deadlock [at 1]"); LL1("pkupdate: stop on deadlock [at 1]");
break; break;
} }
if (nospace) {
LL1("pkupdate: cnt=" << j << " stop on nospace [at 1]");
break;
}
con.closeTransaction(); con.closeTransaction();
set.lock(); set.lock();
set.notpending(lst); set.notpending(lst, et);
set.dbdiscard(lst); set.dbdiscard(lst);
set.unlock(); set.unlock();
lst.reset(); lst.reset();
CHK(con.startTransaction() == 0); CHK(con.startTransaction() == 0);
} }
} }
if (! deadlock && lst.cnt() != 0) { if (! deadlock && ! nospace && lst.cnt() != 0) {
deadlock = par.m_deadlock; deadlock = par.m_deadlock;
CHK(con.execute(Commit, deadlock) == 0); nospace = true;
ExecType et = randompct(par.m_abortpct) ? Rollback : Commit;
CHK(con.execute(et, deadlock, nospace) == 0);
if (deadlock) { if (deadlock) {
LL1("pkupdate: stop on deadlock [at 1]"); LL1("pkupdate: stop on deadlock [at 2]");
} else if (nospace) {
LL1("pkupdate: end: stop on nospace [at 2]");
} else { } else {
set.lock(); set.lock();
set.notpending(lst); set.notpending(lst, et);
set.dbdiscard(lst); set.dbdiscard(lst);
set.unlock(); set.unlock();
} }
@ -3309,6 +3397,7 @@ pkdelete(Par par)
CHK(con.startTransaction() == 0); CHK(con.startTransaction() == 0);
Lst lst; Lst lst;
bool deadlock = false; bool deadlock = false;
bool nospace = false;
for (unsigned j = 0; j < par.m_rows; j++) { for (unsigned j = 0; j < par.m_rows; j++) {
unsigned j2 = ! par.m_randomkey ? j : urandom(par.m_rows); unsigned j2 = ! par.m_randomkey ? j : urandom(par.m_rows);
unsigned i = thrrow(par, j2); unsigned i = thrrow(par, j2);
@ -3323,27 +3412,31 @@ pkdelete(Par par)
lst.push(i); lst.push(i);
if (lst.cnt() == par.m_batch) { if (lst.cnt() == par.m_batch) {
deadlock = par.m_deadlock; deadlock = par.m_deadlock;
CHK(con.execute(Commit, deadlock) == 0); nospace = true;
ExecType et = randompct(par.m_abortpct) ? Rollback : Commit;
CHK(con.execute(et, deadlock, nospace) == 0);
if (deadlock) { if (deadlock) {
LL1("pkdelete: stop on deadlock [at 1]"); LL1("pkdelete: stop on deadlock [at 1]");
break; break;
} }
con.closeTransaction(); con.closeTransaction();
set.lock(); set.lock();
set.notpending(lst); set.notpending(lst, et);
set.unlock(); set.unlock();
lst.reset(); lst.reset();
CHK(con.startTransaction() == 0); CHK(con.startTransaction() == 0);
} }
} }
if (! deadlock && lst.cnt() != 0) { if (! deadlock && ! nospace && lst.cnt() != 0) {
deadlock = par.m_deadlock; deadlock = par.m_deadlock;
CHK(con.execute(Commit, deadlock) == 0); nospace = true;
ExecType et = randompct(par.m_abortpct) ? Rollback : Commit;
CHK(con.execute(et, deadlock, nospace) == 0);
if (deadlock) { if (deadlock) {
LL1("pkdelete: stop on deadlock [at 2]"); LL1("pkdelete: stop on deadlock [at 2]");
} else { } else {
set.lock(); set.lock();
set.notpending(lst); set.notpending(lst, et);
set.unlock(); set.unlock();
} }
} }
@ -4094,6 +4187,10 @@ readverify(Par par)
if (par.m_noverify) if (par.m_noverify)
return 0; return 0;
par.m_verify = true; par.m_verify = true;
if (par.m_abortpct != 0) {
LL2("skip verify in this version"); // implement in 5.0 version
par.m_verify = false;
}
par.m_lockmode = NdbOperation::LM_CommittedRead; par.m_lockmode = NdbOperation::LM_CommittedRead;
CHK(pkread(par) == 0); CHK(pkread(par) == 0);
CHK(scanreadall(par) == 0); CHK(scanreadall(par) == 0);
@ -4106,6 +4203,10 @@ readverifyfull(Par par)
if (par.m_noverify) if (par.m_noverify)
return 0; return 0;
par.m_verify = true; par.m_verify = true;
if (par.m_abortpct != 0) {
LL2("skip verify in this version"); // implement in 5.0 version
par.m_verify = false;
}
par.m_lockmode = NdbOperation::LM_CommittedRead; par.m_lockmode = NdbOperation::LM_CommittedRead;
const Tab& tab = par.tab(); const Tab& tab = par.tab();
if (par.m_no == 0) { if (par.m_no == 0) {
@ -4457,11 +4558,11 @@ runstep(Par par, const char* fname, TFunc func, unsigned mode)
for (n = 0; n < threads; n++) { for (n = 0; n < threads; n++) {
LL4("start " << n); LL4("start " << n);
Thr& thr = *g_thrlist[n]; Thr& thr = *g_thrlist[n];
thr.m_par.m_tab = par.m_tab; Par oldpar = thr.m_par;
thr.m_par.m_set = par.m_set; // update parameters
thr.m_par.m_tmr = par.m_tmr; thr.m_par = par;
thr.m_par.m_lno = par.m_lno; thr.m_par.m_no = oldpar.m_no;
thr.m_par.m_slno = par.m_slno; thr.m_par.m_con = oldpar.m_con;
thr.m_func = func; thr.m_func = func;
thr.start(); thr.start();
} }
@ -4590,6 +4691,24 @@ tbusybuild(Par par)
return 0; return 0;
} }
static int
trollback(Par par)
{
par.m_abortpct = 50;
RUNSTEP(par, droptable, ST);
RUNSTEP(par, createtable, ST);
RUNSTEP(par, invalidatetable, MT);
RUNSTEP(par, pkinsert, MT);
RUNSTEP(par, createindex, ST);
RUNSTEP(par, invalidateindex, MT);
RUNSTEP(par, readverify, ST);
for (par.m_slno = 0; par.m_slno < par.m_subloop; par.m_slno++) {
RUNSTEP(par, mixedoperations, MT);
RUNSTEP(par, readverify, ST);
}
return 0;
}
static int static int
ttimebuild(Par par) ttimebuild(Par par)
{ {
@ -4712,6 +4831,7 @@ tcaselist[] = {
TCase("d", tpkopsread, "pk operations and scan reads"), TCase("d", tpkopsread, "pk operations and scan reads"),
TCase("e", tmixedops, "pk operations and scan operations"), TCase("e", tmixedops, "pk operations and scan operations"),
TCase("f", tbusybuild, "pk operations and index build"), TCase("f", tbusybuild, "pk operations and index build"),
TCase("g", trollback, "operations with random rollbacks"),
TCase("t", ttimebuild, "time index build"), TCase("t", ttimebuild, "time index build"),
TCase("u", ttimemaint, "time index maintenance"), TCase("u", ttimemaint, "time index maintenance"),
TCase("v", ttimescan, "time full scan table vs index on pk"), TCase("v", ttimescan, "time full scan table vs index on pk"),