mirror of
https://github.com/MariaDB/server.git
synced 2025-01-25 00:04:33 +01:00
a5463c252a
configure.in: Auto merged include/my_global.h: Auto merged ndb/src/common/transporter/TransporterRegistry.cpp: Auto merged ndb/src/mgmclient/CommandInterpreter.cpp: Auto merged ndb/src/mgmsrv/ConfigInfo.cpp: Auto merged ndb/src/mgmsrv/MgmtSrvr.cpp: Auto merged BitKeeper/deleted/.del-acinclude.m4~f4ab416bac5003: Auto merged ndb/src/ndbapi/ClusterMgr.cpp: Auto merged ndb/src/ndbapi/TransporterFacade.cpp: Auto merged ndb/src/ndbapi/ndb_cluster_connection.cpp: Auto merged ndb/test/ndbapi/flexAsynch.cpp: Auto merged ndb/test/ndbapi/flexBench.cpp: Auto merged ndb/test/ndbapi/flexHammer.cpp: Auto merged ndb/test/ndbapi/flexTT.cpp: Auto merged ndb/test/ndbapi/flex_bench_mysql.cpp: Auto merged ndb/test/src/NDBT_Test.cpp: corrected automerge error
937 lines
29 KiB
C++
937 lines
29 KiB
C++
/* Copyright (C) 2003 MySQL AB
|
|
|
|
This program is free software; you can redistribute it and/or modify
|
|
it under the terms of the GNU General Public License as published by
|
|
the Free Software Foundation; either version 2 of the License, or
|
|
(at your option) any later version.
|
|
|
|
This program is distributed in the hope that it will be useful,
|
|
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
GNU General Public License for more details.
|
|
|
|
You should have received a copy of the GNU General Public License
|
|
along with this program; if not, write to the Free Software
|
|
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
|
|
|
|
|
|
#include <ndb_global.h>
|
|
|
|
#include <NdbApi.hpp>
|
|
#include <NdbSchemaCon.hpp>
|
|
#include <NdbMain.h>
|
|
#include <md5_hash.hpp>
|
|
|
|
#include <NdbThread.h>
|
|
#include <NdbSleep.h>
|
|
#include <NdbTick.h>
|
|
#include <NdbOut.hpp>
|
|
#include <NdbTimer.hpp>
|
|
|
|
#include <NdbTest.hpp>
|
|
#include <NDBT_Error.hpp>
|
|
|
|
#define MAX_PARTS 4
|
|
#define MAX_SEEK 16
|
|
#define MAXSTRLEN 16
|
|
#define MAXATTR 64
|
|
#define MAXTABLES 64
|
|
#define MAXTHREADS 128
|
|
#define MAXPAR 1024
|
|
#define MAXATTRSIZE 1000
|
|
#define PKSIZE 1
|
|
|
|
|
|
#ifdef NDB_WIN32
|
|
inline long lrand48(void) { return rand(); };
|
|
#endif
|
|
|
|
|
|
enum StartType {
|
|
stIdle,
|
|
stInsert,
|
|
stRead,
|
|
stUpdate,
|
|
stDelete,
|
|
stStop
|
|
} ;
|
|
|
|
struct ThreadNdb
|
|
{
|
|
int threadNo;
|
|
Ndb* threadNdb;
|
|
Uint32 threadBase;
|
|
Uint32 threadLoopCounter;
|
|
Uint32 threadNextStart;
|
|
Uint32 threadStop;
|
|
Uint32 threadLoopStop;
|
|
Uint32 threadIncrement;
|
|
Uint32 threadNoCompleted;
|
|
bool threadCompleted;
|
|
StartType threadStartType;
|
|
};
|
|
|
|
struct TransNdb
|
|
{
|
|
char transRecord[128];
|
|
Ndb* transNdb;
|
|
StartType transStartType;
|
|
Uint32 vpn_number;
|
|
Uint32 vpn_identity;
|
|
Uint32 transErrorCount;
|
|
NdbOperation* transOperation;
|
|
ThreadNdb* transThread;
|
|
};
|
|
|
|
extern "C" { static void* threadLoop(void*); }
|
|
static void setAttrNames(void);
|
|
static void setTableNames(void);
|
|
static int readArguments(int argc, const char** argv);
|
|
static int createTables(Ndb*);
|
|
static bool defineOperation(NdbConnection* aTransObject, TransNdb*,
|
|
Uint32 vpn_nb, Uint32 vpn_id);
|
|
static bool executeTransaction(TransNdb* transNdbRef);
|
|
static StartType random_choice();
|
|
static void execute(StartType aType);
|
|
static bool executeThread(ThreadNdb*, TransNdb*);
|
|
static void executeCallback(int result, NdbConnection* NdbObject,
|
|
void* aObject);
|
|
static bool error_handler(const NdbError & err) ;
|
|
static Uint32 getKey(Uint32, Uint32) ;
|
|
static void input_error();
|
|
|
|
ErrorData * flexTTErrorData;
|
|
|
|
static NdbThread* threadLife[MAXTHREADS];
|
|
static int tNodeId;
|
|
static int ThreadReady[MAXTHREADS];
|
|
static StartType ThreadStart[MAXTHREADS];
|
|
static char tableName[1][MAXSTRLEN+1];
|
|
static char attrName[5][MAXSTRLEN+1];
|
|
|
|
// Program Parameters
|
|
static bool tInsert = false;
|
|
static bool tDelete = false;
|
|
static bool tReadUpdate = true;
|
|
static int tUpdateFreq = 20;
|
|
static bool tLocal = false;
|
|
static int tLocalPart = 0;
|
|
static int tMinEvents = 0;
|
|
static int tSendForce = 0;
|
|
static int tNoOfLoops = 1;
|
|
static Uint32 tNoOfThreads = 1;
|
|
static Uint32 tNoOfParallelTrans = 32;
|
|
static Uint32 tNoOfTransactions = 500;
|
|
static Uint32 tLoadFactor = 80;
|
|
static bool tempTable = false;
|
|
static bool startTransGuess = true;
|
|
|
|
//Program Flags
|
|
static int theSimpleFlag = 0;
|
|
static int theDirtyFlag = 0;
|
|
static int theWriteFlag = 0;
|
|
static int theTableCreateFlag = 1;
|
|
|
|
#define START_REAL_TIME
|
|
#define STOP_REAL_TIME
|
|
#define START_TIMER { NdbTimer timer; timer.doStart();
|
|
#define STOP_TIMER timer.doStop();
|
|
#define PRINT_TIMER(text, trans, opertrans) timer.printTransactionStatistics(text, trans, opertrans); };
|
|
|
|
static void
|
|
resetThreads(){
|
|
|
|
for (int i = 0; i < tNoOfThreads ; i++) {
|
|
ThreadReady[i] = 0;
|
|
ThreadStart[i] = stIdle;
|
|
}//for
|
|
}
|
|
|
|
static void
|
|
waitForThreads(void)
|
|
{
|
|
int cont = 0;
|
|
do {
|
|
cont = 0;
|
|
NdbSleep_MilliSleep(20);
|
|
for (int i = 0; i < tNoOfThreads ; i++) {
|
|
if (ThreadReady[i] == 0) {
|
|
cont = 1;
|
|
}//if
|
|
}//for
|
|
} while (cont == 1);
|
|
}
|
|
|
|
static void
|
|
tellThreads(StartType what)
|
|
{
|
|
for (int i = 0; i < tNoOfThreads ; i++)
|
|
ThreadStart[i] = what;
|
|
}
|
|
|
|
static Ndb_cluster_connection *g_cluster_connection= 0;
|
|
|
|
NDB_COMMAND(flexTT, "flexTT", "flexTT", "flexTT", 65535)
|
|
{
|
|
ndb_init();
|
|
ThreadNdb* pThreadData;
|
|
int returnValue = NDBT_OK;
|
|
int i;
|
|
flexTTErrorData = new ErrorData;
|
|
flexTTErrorData->resetErrorCounters();
|
|
|
|
if (readArguments(argc, argv) != 0){
|
|
input_error();
|
|
return NDBT_ProgramExit(NDBT_WRONGARGS);
|
|
}
|
|
|
|
pThreadData = new ThreadNdb[MAXTHREADS];
|
|
|
|
ndbout << endl << "FLEXTT - Starting normal mode" << endl;
|
|
ndbout << "Perform TimesTen benchmark" << endl;
|
|
ndbout << " " << tNoOfThreads << " number of concurrent threads " << endl;
|
|
ndbout << " " << tNoOfParallelTrans;
|
|
ndbout << " number of parallel transaction per thread " << endl;
|
|
ndbout << " " << tNoOfTransactions << " transaction(s) per round " << endl;
|
|
ndbout << " " << tNoOfLoops << " iterations " << endl;
|
|
ndbout << " " << "Update Frequency is " << tUpdateFreq << "%" << endl;
|
|
ndbout << " " << "Load Factor is " << tLoadFactor << "%" << endl;
|
|
if (tLocal == true) {
|
|
ndbout << " " << "We only use Local Part = ";
|
|
ndbout << tLocalPart << endl;
|
|
}//if
|
|
if (tempTable == true) {
|
|
ndbout << " Tables are without logging " << endl;
|
|
} else {
|
|
ndbout << " Tables are with logging " << endl;
|
|
}//if
|
|
if (startTransGuess == true) {
|
|
ndbout << " Transactions are executed with hint provided" << endl;
|
|
} else {
|
|
ndbout << " Transactions are executed with round robin scheme" << endl;
|
|
}//if
|
|
if (tSendForce == 0) {
|
|
ndbout << " No force send is used, adaptive algorithm used" << endl;
|
|
} else if (tSendForce == 1) {
|
|
ndbout << " Force send used" << endl;
|
|
} else {
|
|
ndbout << " No force send is used, adaptive algorithm disabled" << endl;
|
|
}//if
|
|
|
|
ndbout << endl;
|
|
|
|
/* print Setting */
|
|
flexTTErrorData->printSettings(ndbout);
|
|
|
|
NdbThread_SetConcurrencyLevel(2 + tNoOfThreads);
|
|
|
|
setAttrNames();
|
|
setTableNames();
|
|
|
|
Ndb_cluster_connection con;
|
|
if(con.connect(12, 5, 1) != 0)
|
|
{
|
|
return NDBT_ProgramExit(NDBT_FAILED);
|
|
}
|
|
g_cluster_connection= &con;
|
|
|
|
Ndb * pNdb = new Ndb(g_cluster_connection, "TEST_DB");
|
|
pNdb->init();
|
|
tNodeId = pNdb->getNodeId();
|
|
|
|
ndbout << " NdbAPI node with id = " << pNdb->getNodeId() << endl;
|
|
ndbout << endl;
|
|
|
|
ndbout << "Waiting for ndb to become ready..." <<endl;
|
|
if (pNdb->waitUntilReady(2000) != 0){
|
|
ndbout << "NDB is not ready" << endl;
|
|
ndbout << "Benchmark failed!" << endl;
|
|
returnValue = NDBT_FAILED;
|
|
}
|
|
|
|
if(returnValue == NDBT_OK){
|
|
if (createTables(pNdb) != 0){
|
|
returnValue = NDBT_FAILED;
|
|
}
|
|
}
|
|
|
|
if(returnValue == NDBT_OK){
|
|
/****************************************************************
|
|
* Create NDB objects. *
|
|
****************************************************************/
|
|
resetThreads();
|
|
for (i = 0; i < tNoOfThreads ; i++) {
|
|
pThreadData[i].threadNo = i;
|
|
threadLife[i] = NdbThread_Create(threadLoop,
|
|
(void**)&pThreadData[i],
|
|
32768,
|
|
"flexAsynchThread",
|
|
NDB_THREAD_PRIO_LOW);
|
|
}//for
|
|
ndbout << endl << "All NDB objects and table created" << endl << endl;
|
|
int noOfTransacts = tNoOfParallelTrans * tNoOfTransactions *
|
|
tNoOfThreads * tNoOfLoops;
|
|
/****************************************************************
|
|
* Execute program. *
|
|
****************************************************************/
|
|
/****************************************************************
|
|
* Perform inserts. *
|
|
****************************************************************/
|
|
|
|
if (tInsert == true) {
|
|
tInsert = false;
|
|
tReadUpdate = false;
|
|
START_TIMER;
|
|
execute(stInsert);
|
|
STOP_TIMER;
|
|
PRINT_TIMER("insert", noOfTransacts, 1);
|
|
}//if
|
|
/****************************************************************
|
|
* Perform read + updates. *
|
|
****************************************************************/
|
|
|
|
if (tReadUpdate == true) {
|
|
START_TIMER;
|
|
execute(stRead);
|
|
STOP_TIMER;
|
|
PRINT_TIMER("update + read", noOfTransacts, 1);
|
|
}//if
|
|
/****************************************************************
|
|
* Perform delete. *
|
|
****************************************************************/
|
|
|
|
if (tDelete == true) {
|
|
tDelete = false;
|
|
START_TIMER;
|
|
execute(stDelete);
|
|
STOP_TIMER;
|
|
PRINT_TIMER("delete", noOfTransacts, 1);
|
|
}//if
|
|
ndbout << "--------------------------------------------------" << endl;
|
|
|
|
execute(stStop);
|
|
void * tmp;
|
|
for(i = 0; i<tNoOfThreads; i++){
|
|
NdbThread_WaitFor(threadLife[i], &tmp);
|
|
NdbThread_Destroy(&threadLife[i]);
|
|
}
|
|
}
|
|
delete [] pThreadData;
|
|
delete pNdb;
|
|
|
|
//printing errorCounters
|
|
flexTTErrorData->printErrorCounters(ndbout);
|
|
|
|
return NDBT_ProgramExit(returnValue);
|
|
}//main()
|
|
|
|
|
|
static void execute(StartType aType)
|
|
{
|
|
resetThreads();
|
|
tellThreads(aType);
|
|
waitForThreads();
|
|
}//execute()
|
|
|
|
static void*
|
|
threadLoop(void* ThreadData)
|
|
{
|
|
Ndb* localNdb;
|
|
ThreadNdb* tabThread = (ThreadNdb*)ThreadData;
|
|
int loc_threadNo = tabThread->threadNo;
|
|
|
|
void * mem = malloc(sizeof(TransNdb)*tNoOfParallelTrans);
|
|
TransNdb* pTransData = (TransNdb*)mem;
|
|
|
|
localNdb = new Ndb(g_cluster_connection, "TEST_DB");
|
|
localNdb->init(1024);
|
|
localNdb->waitUntilReady();
|
|
|
|
if (tLocal == false) {
|
|
tabThread->threadIncrement = 1;
|
|
} else {
|
|
tabThread->threadIncrement = MAX_SEEK;
|
|
}//if
|
|
tabThread->threadBase = (loc_threadNo << 16) + tNodeId;
|
|
tabThread->threadNdb = localNdb;
|
|
tabThread->threadStop = tNoOfParallelTrans * tNoOfTransactions;
|
|
tabThread->threadStop *= tabThread->threadIncrement;
|
|
tabThread->threadLoopStop = tNoOfLoops;
|
|
Uint32 i, j;
|
|
for (i = 0; i < tNoOfParallelTrans; i++) {
|
|
pTransData[i].transNdb = localNdb;
|
|
pTransData[i].transThread = tabThread;
|
|
pTransData[i].transOperation = NULL;
|
|
pTransData[i].transStartType = stIdle;
|
|
pTransData[i].vpn_number = tabThread->threadBase;
|
|
pTransData[i].vpn_identity = 0;
|
|
pTransData[i].transErrorCount = 0;
|
|
for (j = 0; j < 128; j++) {
|
|
pTransData[i].transRecord[j] = 0x30;
|
|
}//for
|
|
}//for
|
|
|
|
for (;;){
|
|
while (ThreadStart[loc_threadNo] == stIdle) {
|
|
NdbSleep_MilliSleep(10);
|
|
}//while
|
|
|
|
// Check if signal to exit is received
|
|
if (ThreadStart[loc_threadNo] == stStop) {
|
|
break;
|
|
}//if
|
|
|
|
tabThread->threadStartType = ThreadStart[loc_threadNo];
|
|
tabThread->threadLoopCounter = 0;
|
|
tabThread->threadCompleted = false;
|
|
tabThread->threadNoCompleted = 0;
|
|
tabThread->threadNextStart = 0;
|
|
|
|
ThreadStart[loc_threadNo] = stIdle;
|
|
if(!executeThread(tabThread, pTransData)){
|
|
break;
|
|
}
|
|
ThreadReady[loc_threadNo] = 1;
|
|
}//for
|
|
|
|
free(mem);
|
|
delete localNdb;
|
|
ThreadReady[loc_threadNo] = 1;
|
|
|
|
return NULL; // Thread exits
|
|
}//threadLoop()
|
|
|
|
static
|
|
bool
|
|
executeThread(ThreadNdb* tabThread, TransNdb* atransDataArrayPtr) {
|
|
Uint32 i;
|
|
for (i = 0; i < tNoOfParallelTrans; i++) {
|
|
TransNdb* transNdbPtr = &atransDataArrayPtr[i];
|
|
transNdbPtr->vpn_identity = i * tabThread->threadIncrement;
|
|
transNdbPtr->transStartType = tabThread->threadStartType;
|
|
if (executeTransaction(transNdbPtr) == false) {
|
|
return false;
|
|
}//if
|
|
}//for
|
|
tabThread->threadNextStart = tNoOfParallelTrans * tabThread->threadIncrement;
|
|
do {
|
|
tabThread->threadNdb->sendPollNdb(3000, tMinEvents, tSendForce);
|
|
} while (tabThread->threadCompleted == false);
|
|
return true;
|
|
}//executeThread()
|
|
|
|
static
|
|
bool executeTransaction(TransNdb* transNdbRef)
|
|
{
|
|
NdbConnection* MyTrans;
|
|
ThreadNdb* tabThread = transNdbRef->transThread;
|
|
Ndb* aNdbObject = transNdbRef->transNdb;
|
|
Uint32 threadBase = tabThread->threadBase;
|
|
Uint32 startKey = transNdbRef->vpn_identity;
|
|
if (tLocal == true) {
|
|
startKey = getKey(startKey, threadBase);
|
|
}//if
|
|
if (startTransGuess == true) {
|
|
Uint32 tKey[2];
|
|
tKey[0] = startKey;
|
|
tKey[1] = threadBase;
|
|
MyTrans = aNdbObject->startTransaction((Uint32)0, //Priority
|
|
(const char*)&tKey[0], //Main PKey
|
|
(Uint32)8); //Key Length
|
|
} else {
|
|
MyTrans = aNdbObject->startTransaction();
|
|
}//if
|
|
if (MyTrans == NULL) {
|
|
error_handler(aNdbObject->getNdbError());
|
|
ndbout << endl << "Unable to recover! Quiting now" << endl ;
|
|
return false;
|
|
}//if
|
|
//-------------------------------------------------------
|
|
// Define the operation, but do not execute it yet.
|
|
//-------------------------------------------------------
|
|
if (!defineOperation(MyTrans, transNdbRef, startKey, threadBase))
|
|
return false;
|
|
|
|
return true;
|
|
}//executeTransaction()
|
|
|
|
|
|
static
|
|
Uint32
|
|
getKey(Uint32 aBase, Uint32 aThreadBase) {
|
|
Uint32 Tfound = aBase;
|
|
Uint32 hash;
|
|
Uint64 Tkey64;
|
|
Uint32* tKey32 = (Uint32*)&Tkey64;
|
|
tKey32[0] = aThreadBase;
|
|
for (int i = aBase; i < (aBase + MAX_SEEK); i++) {
|
|
tKey32[1] = (Uint32)i;
|
|
hash = md5_hash((Uint64*)&Tkey64, (Uint32)2);
|
|
hash = (hash >> 6) & (MAX_PARTS - 1);
|
|
if (hash == tLocalPart) {
|
|
Tfound = i;
|
|
break;
|
|
}//if
|
|
}//for
|
|
return Tfound;
|
|
}//getKey()
|
|
|
|
static void
|
|
executeCallback(int result, NdbConnection* NdbObject, void* aObject)
|
|
{
|
|
TransNdb* transNdbRef = (TransNdb*)aObject;
|
|
ThreadNdb* tabThread = transNdbRef->transThread;
|
|
Ndb* tNdb = transNdbRef->transNdb;
|
|
Uint32 vpn_id = transNdbRef->vpn_identity;
|
|
Uint32 vpn_nb = tabThread->threadBase;
|
|
|
|
if (result == -1) {
|
|
// Add complete error handling here
|
|
int retCode = flexTTErrorData->handleErrorCommon(NdbObject->getNdbError());
|
|
if (retCode == 1) {
|
|
if (NdbObject->getNdbError().code != 626 &&
|
|
NdbObject->getNdbError().code != 630) {
|
|
ndbout_c("execute: %s", NdbObject->getNdbError().message);
|
|
ndbout_c("Error code = %d", NdbObject->getNdbError().code);
|
|
}
|
|
} else if (retCode == 2) {
|
|
ndbout << "4115 should not happen in flexTT" << endl;
|
|
} else if (retCode == 3) {
|
|
/* What can we do here? */
|
|
ndbout_c("execute: %s", NdbObject->getNdbError().message);
|
|
}//if(retCode == 3)
|
|
transNdbRef->transErrorCount++;
|
|
const NdbError & err = NdbObject->getNdbError();
|
|
switch (err.classification) {
|
|
case NdbError::NoDataFound:
|
|
case NdbError::ConstraintViolation:
|
|
ndbout << "Error with vpn_id = " << vpn_id << " and vpn_nb = ";
|
|
ndbout << vpn_nb << endl;
|
|
ndbout << err << endl;
|
|
goto checkCompleted;
|
|
case NdbError::OverloadError:
|
|
NdbSleep_MilliSleep(10);
|
|
case NdbError::NodeRecoveryError:
|
|
case NdbError::UnknownResultError:
|
|
case NdbError::TimeoutExpired:
|
|
break;
|
|
default:
|
|
goto checkCompleted;
|
|
}//if
|
|
if ((transNdbRef->transErrorCount > 10) ||
|
|
(tabThread->threadNoCompleted > 0)) {
|
|
goto checkCompleted;
|
|
}//if
|
|
} else {
|
|
if (tabThread->threadNoCompleted == 0) {
|
|
transNdbRef->transErrorCount = 0;
|
|
transNdbRef->vpn_identity = tabThread->threadNextStart;
|
|
if (tabThread->threadNextStart == tabThread->threadStop) {
|
|
tabThread->threadLoopCounter++;
|
|
transNdbRef->vpn_identity = 0;
|
|
tabThread->threadNextStart = 0;
|
|
if (tabThread->threadLoopCounter == tNoOfLoops) {
|
|
goto checkCompleted;
|
|
}//if
|
|
}//if
|
|
tabThread->threadNextStart += tabThread->threadIncrement;
|
|
} else {
|
|
goto checkCompleted;
|
|
}//if
|
|
}//if
|
|
tNdb->closeTransaction(NdbObject);
|
|
executeTransaction(transNdbRef);
|
|
return;
|
|
|
|
checkCompleted:
|
|
tNdb->closeTransaction(NdbObject);
|
|
tabThread->threadNoCompleted++;
|
|
if (tabThread->threadNoCompleted == tNoOfParallelTrans) {
|
|
tabThread->threadCompleted = true;
|
|
}//if
|
|
return;
|
|
}//executeCallback()
|
|
|
|
static
|
|
StartType
|
|
random_choice()
|
|
{
|
|
//----------------------------------------------------
|
|
// Generate a random key between 0 and tNoOfRecords - 1
|
|
//----------------------------------------------------
|
|
UintR random_number = lrand48() % 100;
|
|
if (random_number < tUpdateFreq)
|
|
return stUpdate;
|
|
else
|
|
return stRead;
|
|
}//random_choice()
|
|
|
|
static bool
|
|
defineOperation(NdbConnection* localNdbConnection, TransNdb* transNdbRef,
|
|
unsigned int vpn_id, unsigned int vpn_nb)
|
|
{
|
|
NdbOperation* localNdbOperation;
|
|
StartType TType = transNdbRef->transStartType;
|
|
|
|
//-------------------------------------------------------
|
|
// Set-up the attribute values for this operation.
|
|
//-------------------------------------------------------
|
|
localNdbOperation = localNdbConnection->getNdbOperation(tableName[0]);
|
|
if (localNdbOperation == NULL) {
|
|
error_handler(localNdbConnection->getNdbError());
|
|
return false;
|
|
}//if
|
|
switch (TType) {
|
|
case stInsert: // Insert case
|
|
if (theWriteFlag == 1 && theDirtyFlag == 1) {
|
|
localNdbOperation->dirtyWrite();
|
|
} else if (theWriteFlag == 1) {
|
|
localNdbOperation->writeTuple();
|
|
} else {
|
|
localNdbOperation->insertTuple();
|
|
}//if
|
|
break;
|
|
case stRead: // Read Case
|
|
TType = random_choice();
|
|
if (TType == stRead) {
|
|
if (theSimpleFlag == 1) {
|
|
localNdbOperation->simpleRead();
|
|
} else if (theDirtyFlag == 1) {
|
|
localNdbOperation->dirtyRead();
|
|
} else {
|
|
localNdbOperation->readTuple();
|
|
}//if
|
|
} else {
|
|
if (theWriteFlag == 1 && theDirtyFlag == 1) {
|
|
localNdbOperation->dirtyWrite();
|
|
} else if (theWriteFlag == 1) {
|
|
localNdbOperation->writeTuple();
|
|
} else if (theDirtyFlag == 1) {
|
|
localNdbOperation->dirtyUpdate();
|
|
} else {
|
|
localNdbOperation->updateTuple();
|
|
}//if
|
|
}//if
|
|
break;
|
|
case stDelete: // Delete Case
|
|
localNdbOperation->deleteTuple();
|
|
break;
|
|
default:
|
|
error_handler(localNdbOperation->getNdbError());
|
|
}//switch
|
|
localNdbOperation->equal((Uint32)0,vpn_id);
|
|
localNdbOperation->equal((Uint32)1,vpn_nb);
|
|
char* attrValue = &transNdbRef->transRecord[0];
|
|
switch (TType) {
|
|
case stInsert: // Insert case
|
|
localNdbOperation->setValue((Uint32)2, attrValue);
|
|
localNdbOperation->setValue((Uint32)3, attrValue);
|
|
localNdbOperation->setValue((Uint32)4, attrValue);
|
|
break;
|
|
case stUpdate: // Update Case
|
|
localNdbOperation->setValue((Uint32)3, attrValue);
|
|
break;
|
|
case stRead: // Read Case
|
|
localNdbOperation->getValue((Uint32)2, attrValue);
|
|
localNdbOperation->getValue((Uint32)3, attrValue);
|
|
localNdbOperation->getValue((Uint32)4, attrValue);
|
|
break;
|
|
case stDelete: // Delete Case
|
|
break;
|
|
default:
|
|
error_handler(localNdbOperation->getNdbError());
|
|
}//switch
|
|
localNdbConnection->executeAsynchPrepare(Commit, &executeCallback,
|
|
(void*)transNdbRef);
|
|
return true;
|
|
}//defineOperation()
|
|
|
|
|
|
static void setAttrNames()
|
|
{
|
|
BaseString::snprintf(attrName[0], MAXSTRLEN, "VPN_ID");
|
|
BaseString::snprintf(attrName[1], MAXSTRLEN, "VPN_NB");
|
|
BaseString::snprintf(attrName[2], MAXSTRLEN, "DIRECTORY_NB");
|
|
BaseString::snprintf(attrName[3], MAXSTRLEN, "LAST_CALL_PARTY");
|
|
BaseString::snprintf(attrName[4], MAXSTRLEN, "DESCR");
|
|
}
|
|
|
|
|
|
static void setTableNames()
|
|
{
|
|
BaseString::snprintf(tableName[0], MAXSTRLEN, "VPN_USERS");
|
|
}
|
|
|
|
static
|
|
int
|
|
createTables(Ndb* pMyNdb){
|
|
|
|
NdbSchemaCon *MySchemaTransaction;
|
|
NdbSchemaOp *MySchemaOp;
|
|
int check;
|
|
|
|
if (theTableCreateFlag == 0) {
|
|
ndbout << "Creating Table: vpn_users " << "..." << endl;
|
|
MySchemaTransaction = NdbSchemaCon::startSchemaTrans(pMyNdb);
|
|
|
|
if(MySchemaTransaction == NULL &&
|
|
(!error_handler(MySchemaTransaction->getNdbError())))
|
|
return -1;
|
|
|
|
MySchemaOp = MySchemaTransaction->getNdbSchemaOp();
|
|
if(MySchemaOp == NULL &&
|
|
(!error_handler(MySchemaTransaction->getNdbError())))
|
|
return -1;
|
|
|
|
check = MySchemaOp->createTable( tableName[0]
|
|
,8 // Table Size
|
|
,TupleKey // Key Type
|
|
,40 // Nr of Pages
|
|
,All
|
|
,6
|
|
,(tLoadFactor - 5)
|
|
,tLoadFactor
|
|
,1
|
|
,!tempTable
|
|
);
|
|
|
|
if (check == -1 &&
|
|
(!error_handler(MySchemaTransaction->getNdbError())))
|
|
return -1;
|
|
|
|
check = MySchemaOp->createAttribute( (char*)attrName[0],
|
|
TupleKey,
|
|
32,
|
|
1,
|
|
UnSigned,
|
|
MMBased,
|
|
NotNullAttribute );
|
|
|
|
if (check == -1 &&
|
|
(!error_handler(MySchemaTransaction->getNdbError())))
|
|
return -1;
|
|
check = MySchemaOp->createAttribute( (char*)attrName[1],
|
|
TupleKey,
|
|
32,
|
|
1,
|
|
UnSigned,
|
|
MMBased,
|
|
NotNullAttribute );
|
|
|
|
if (check == -1 &&
|
|
(!error_handler(MySchemaTransaction->getNdbError())))
|
|
return -1;
|
|
check = MySchemaOp->createAttribute( (char*)attrName[2],
|
|
NoKey,
|
|
8,
|
|
10,
|
|
UnSigned,
|
|
MMBased,
|
|
NotNullAttribute );
|
|
if (check == -1 &&
|
|
(!error_handler(MySchemaTransaction->getNdbError())))
|
|
return -1;
|
|
|
|
check = MySchemaOp->createAttribute( (char*)attrName[3],
|
|
NoKey,
|
|
8,
|
|
10,
|
|
UnSigned,
|
|
MMBased,
|
|
NotNullAttribute );
|
|
if (check == -1 &&
|
|
(!error_handler(MySchemaTransaction->getNdbError())))
|
|
return -1;
|
|
|
|
check = MySchemaOp->createAttribute( (char*)attrName[4],
|
|
NoKey,
|
|
8,
|
|
100,
|
|
UnSigned,
|
|
MMBased,
|
|
NotNullAttribute );
|
|
if (check == -1 &&
|
|
(!error_handler(MySchemaTransaction->getNdbError())))
|
|
return -1;
|
|
|
|
if (MySchemaTransaction->execute() == -1 &&
|
|
(!error_handler(MySchemaTransaction->getNdbError())))
|
|
return -1;
|
|
|
|
NdbSchemaCon::closeSchemaTrans(MySchemaTransaction);
|
|
}//if
|
|
|
|
return 0;
|
|
}
|
|
|
|
bool error_handler(const NdbError& err){
|
|
ndbout << err << endl ;
|
|
switch(err.classification){
|
|
case NdbError::NodeRecoveryError:
|
|
case NdbError::SchemaError:
|
|
case NdbError::TimeoutExpired:
|
|
ndbout << endl << "Attempting to recover and continue now..." << endl ;
|
|
return true ; // return true to retry
|
|
}
|
|
return false;
|
|
}
|
|
#if 0
|
|
bool error_handler(const char* error_string, int error_int) {
|
|
ndbout << error_string << endl ;
|
|
if ((4008 == error_int) ||
|
|
(677 == error_int) ||
|
|
(891 == error_int) ||
|
|
(1221 == error_int) ||
|
|
(721 == error_int) ||
|
|
(266 == error_int)) {
|
|
ndbout << endl << "Attempting to recover and continue now..." << endl ;
|
|
return true ; // return true to retry
|
|
}
|
|
return false ; // return false to abort
|
|
}
|
|
#endif
|
|
|
|
static
|
|
int
|
|
readArguments(int argc, const char** argv){
|
|
|
|
int i = 1;
|
|
while (argc > 1){
|
|
if (strcmp(argv[i], "-t") == 0){
|
|
tNoOfThreads = atoi(argv[i+1]);
|
|
if ((tNoOfThreads < 1) || (tNoOfThreads > MAXTHREADS)){
|
|
ndbout_c("Invalid no of threads");
|
|
return -1;
|
|
}
|
|
} else if (strcmp(argv[i], "-p") == 0){
|
|
tNoOfParallelTrans = atoi(argv[i+1]);
|
|
if ((tNoOfParallelTrans < 1) || (tNoOfParallelTrans > MAXPAR)){
|
|
ndbout_c("Invalid no of parallell transactions");
|
|
return -1;
|
|
}
|
|
} else if (strcmp(argv[i], "-o") == 0) {
|
|
tNoOfTransactions = atoi(argv[i+1]);
|
|
if (tNoOfTransactions < 1){
|
|
ndbout_c("Invalid no of transactions");
|
|
return -1;
|
|
}
|
|
} else if (strcmp(argv[i], "-l") == 0){
|
|
tNoOfLoops = atoi(argv[i+1]);
|
|
if (tNoOfLoops < 1) {
|
|
ndbout_c("Invalid no of loops");
|
|
return -1;
|
|
}
|
|
} else if (strcmp(argv[i], "-e") == 0){
|
|
tMinEvents = atoi(argv[i+1]);
|
|
if ((tMinEvents < 1) || (tMinEvents > tNoOfParallelTrans)) {
|
|
ndbout_c("Invalid no of loops");
|
|
return -1;
|
|
}
|
|
} else if (strcmp(argv[i], "-local") == 0){
|
|
tLocalPart = atoi(argv[i+1]);
|
|
tLocal = true;
|
|
startTransGuess = true;
|
|
if ((tLocalPart < 0) || (tLocalPart > MAX_PARTS)){
|
|
ndbout_c("Invalid local part");
|
|
return -1;
|
|
}
|
|
} else if (strcmp(argv[i], "-ufreq") == 0){
|
|
tUpdateFreq = atoi(argv[i+1]);
|
|
if ((tUpdateFreq < 0) || (tUpdateFreq > 100)){
|
|
ndbout_c("Invalid Update Frequency");
|
|
return -1;
|
|
}
|
|
} else if (strcmp(argv[i], "-load_factor") == 0){
|
|
tLoadFactor = atoi(argv[i+1]);
|
|
if ((tLoadFactor < 40) || (tLoadFactor >= 100)){
|
|
ndbout_c("Invalid LoadFactor");
|
|
return -1;
|
|
}
|
|
} else if (strcmp(argv[i], "-d") == 0){
|
|
tDelete = true;
|
|
argc++;
|
|
i--;
|
|
} else if (strcmp(argv[i], "-i") == 0){
|
|
tInsert = true;
|
|
argc++;
|
|
i--;
|
|
} else if (strcmp(argv[i], "-simple") == 0){
|
|
theSimpleFlag = 1;
|
|
argc++;
|
|
i--;
|
|
} else if (strcmp(argv[i], "-adaptive") == 0){
|
|
tSendForce = 0;
|
|
argc++;
|
|
i--;
|
|
} else if (strcmp(argv[i], "-force") == 0){
|
|
tSendForce = 1;
|
|
argc++;
|
|
i--;
|
|
} else if (strcmp(argv[i], "-non_adaptive") == 0){
|
|
tSendForce = 2;
|
|
argc++;
|
|
i--;
|
|
} else if (strcmp(argv[i], "-write") == 0){
|
|
theWriteFlag = 1;
|
|
argc++;
|
|
i--;
|
|
} else if (strcmp(argv[i], "-dirty") == 0){
|
|
theDirtyFlag = 1;
|
|
argc++;
|
|
i--;
|
|
} else if (strcmp(argv[i], "-table_create") == 0){
|
|
theTableCreateFlag = 0;
|
|
tInsert = true;
|
|
argc++;
|
|
i--;
|
|
} else if (strcmp(argv[i], "-temp") == 0){
|
|
tempTable = true;
|
|
argc++;
|
|
i--;
|
|
} else if (strcmp(argv[i], "-no_hint") == 0){
|
|
startTransGuess = false;
|
|
argc++;
|
|
i--;
|
|
} else {
|
|
return -1;
|
|
}
|
|
|
|
argc -= 2;
|
|
i = i + 2;
|
|
}//while
|
|
if (tLocal == true) {
|
|
if (startTransGuess == false) {
|
|
ndbout_c("Not valid to use no_hint with local");
|
|
}//if
|
|
}//if
|
|
return 0;
|
|
}
|
|
|
|
static
|
|
void
|
|
input_error(){
|
|
|
|
ndbout_c("FLEXTT");
|
|
ndbout_c(" Perform benchmark of insert, update and delete transactions");
|
|
ndbout_c("");
|
|
ndbout_c("Arguments:");
|
|
ndbout_c(" -t Number of threads to start, default 1");
|
|
ndbout_c(" -p Number of parallel transactions per thread, default 32");
|
|
ndbout_c(" -o Number of transactions per loop, default 500");
|
|
ndbout_c(" -ufreq Number Update Frequency in percent (0 -> 100), rest is read");
|
|
ndbout_c(" -load_factor Number Fill level in index in percent (40 -> 99)");
|
|
ndbout_c(" -l Number of loops to run, default 1, 0=infinite");
|
|
ndbout_c(" -i Start by inserting all records");
|
|
ndbout_c(" -d End by deleting all records (only one loop)");
|
|
ndbout_c(" -simple Use simple read to read from database");
|
|
ndbout_c(" -dirty Use dirty read to read from database");
|
|
ndbout_c(" -write Use writeTuple in insert and update");
|
|
ndbout_c(" -n Use standard table names");
|
|
ndbout_c(" -table_create Create tables in db");
|
|
ndbout_c(" -temp Create table(s) without logging");
|
|
ndbout_c(" -no_hint Don't give hint on where to execute transaction coordinator");
|
|
ndbout_c(" -adaptive Use adaptive send algorithm (default)");
|
|
ndbout_c(" -force Force send when communicating");
|
|
ndbout_c(" -non_adaptive Send at a 10 millisecond interval");
|
|
ndbout_c(" -local Number of part, only use keys in one part out of 16");
|
|
}
|