mirror of
https://github.com/MariaDB/server.git
synced 2025-01-19 13:32:33 +01:00
be15e3bc15
into mysql.com:/home/kent/bk/main/mysql-5.1
994 lines
31 KiB
C++
994 lines
31 KiB
C++
/* Copyright (C) 2003 MySQL AB
|
|
|
|
This program is free software; you can redistribute it and/or modify
|
|
it under the terms of the GNU General Public License as published by
|
|
the Free Software Foundation; version 2 of the License.
|
|
|
|
This program is distributed in the hope that it will be useful,
|
|
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
GNU General Public License for more details.
|
|
|
|
You should have received a copy of the GNU General Public License
|
|
along with this program; if not, write to the Free Software
|
|
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
|
|
|
|
|
|
|
|
#include <ndb_global.h>
|
|
#include "NdbApi.hpp"
|
|
#include <NdbSchemaCon.hpp>
|
|
#include <NdbMain.h>
|
|
#include <md5_hash.hpp>
|
|
|
|
#include <NdbThread.h>
|
|
#include <NdbSleep.h>
|
|
#include <NdbTick.h>
|
|
#include <NdbOut.hpp>
|
|
#include <NdbTimer.hpp>
|
|
#include <NDBT_Error.hpp>
|
|
|
|
#include <NdbTest.hpp>
|
|
|
|
/*
 * Compile-time limits of the benchmark.
 */

/* Number of parts getKey() distinguishes; used there as the bit mask
   (MAX_PARTS - 1), so it has to stay a power of two. */
#define MAX_PARTS 4
/* Number of consecutive key values getKey() probes, and the key spacing
   used between transactions when running in local mode. */
#define MAX_SEEK 16
/* Size handed to snprintf when generating table and attribute names
   (the name buffers themselves are MAXSTRLEN+1 bytes). */
#define MAXSTRLEN 16
/* Number of attribute names generated; upper bound accepted for -a. */
#define MAXATTR 64
/* Number of table names generated. */
#define MAXTABLES 64
/* Upper bound accepted for -t; size of the per-thread arrays. */
#define MAXTHREADS 128
/* Upper bound accepted for -p. */
#define MAXPAR 1024
/* Upper bound accepted for -s; number of words in the value buffer
   used by defineOperation(). */
#define MAXATTRSIZE 1000
/* Array size of the primary key attribute passed to createAttribute(). */
#define PKSIZE 2
|
|
|
|
/*
 * Command posted by the main thread to each worker through ThreadStart[].
 */
enum StartType {
  stIdle,    /* nothing to do; workers sleep and poll again */
  stInsert,  /* run the insert phase */
  stRead,    /* run the read phase */
  stUpdate,  /* run the update phase */
  stDelete,  /* run the delete phase */
  stStop     /* leave the worker loop */
} ;
|
|
|
|
extern "C" { static void* threadLoop(void*); }
|
|
static void setAttrNames(void);
|
|
static void setTableNames(void);
|
|
static int readArguments(int argc, const char** argv);
|
|
static int createTables(Ndb*);
|
|
static void defineOperation(NdbConnection* aTransObject, StartType aType,
|
|
Uint32 base, Uint32 aIndex);
|
|
static void execute(StartType aType);
|
|
static bool executeThread(StartType aType, Ndb* aNdbObject, unsigned int);
|
|
static void executeCallback(int result, NdbConnection* NdbObject,
|
|
void* aObject);
|
|
static bool error_handler(const NdbError & err);
|
|
static Uint32 getKey(Uint32, Uint32) ;
|
|
static void input_error();
|
|
|
|
|
|
static int retry_opt = 3 ;
|
|
static int failed = 0 ;
|
|
|
|
ErrorData * flexAsynchErrorData;
|
|
|
|
/*
 * Per-worker argument block handed to threadLoop().
 */
struct ThreadNdb
{
  int NoOfOps;   /* not referenced anywhere else in this file */
  int ThreadNo;  /* worker index into ThreadReady[] / ThreadStart[] */
};
|
|
|
|
static NdbThread* threadLife[MAXTHREADS];
|
|
static int tNodeId;
|
|
static int ThreadReady[MAXTHREADS];
|
|
static StartType ThreadStart[MAXTHREADS];
|
|
static char tableName[MAXTABLES][MAXSTRLEN+1];
|
|
static char attrName[MAXATTR][MAXSTRLEN+1];
|
|
|
|
// Program Parameters (defaults; overridden by readArguments())
static bool tLocal = false;                   // -local given
static int tLocalPart = 0;                    // part number given with -local
static int tSendForce = 0;                    // 0 = -adaptive, 1 = -force, 2 = -non_adaptive
static int tNoOfLoops = 1;                    // -l, 0 means loop forever
static int tAttributeSize = 1;                // -s, 32 bit words per attribute
static unsigned int tNoOfThreads = 1;         // -t
static unsigned int tNoOfParallelTrans = 32;  // -p
static unsigned int tNoOfAttributes = 25;     // -a
static unsigned int tNoOfTransactions = 500;  // -o
static unsigned int tNoOfOpsPerTrans = 1;     // -c
static unsigned int tLoadFactor = 80;         // -load_factor, percent
static bool tempTable = false;                // -temp
static bool startTransGuess = true;           // cleared by -no_hint

//Program Flags
static int theTestFlag = 0;          // -test; not read anywhere else in this file
static int theSimpleFlag = 0;        // -simple
static int theDirtyFlag = 0;         // -dirty
static int theWriteFlag = 0;         // -write
static int theStdTableNameFlag = 0;  // -n
static int theTableCreateFlag = 0;   // -no_table_create
|
|
|
|
/* Real-time markers; they expand to nothing. */
#define START_REAL_TIME
#define STOP_REAL_TIME

/*
 * Timing helpers that must be used as a START_TIMER / STOP_TIMER /
 * PRINT_TIMER triple: START_TIMER opens a scope holding a local NdbTimer
 * named `timer`, and PRINT_TIMER closes that scope again.
 */
#define START_TIMER { NdbTimer timer; timer.doStart();
#define STOP_TIMER timer.doStop();
#define PRINT_TIMER(text, trans, opertrans) timer.printTransactionStatistics(text, trans, opertrans); };
|
|
|
|
static void
|
|
resetThreads(){
|
|
|
|
for (int i = 0; i < tNoOfThreads ; i++) {
|
|
ThreadReady[i] = 0;
|
|
ThreadStart[i] = stIdle;
|
|
}//for
|
|
}
|
|
|
|
static void
|
|
waitForThreads(void)
|
|
{
|
|
int cont = 0;
|
|
do {
|
|
cont = 0;
|
|
NdbSleep_MilliSleep(20);
|
|
for (int i = 0; i < tNoOfThreads ; i++) {
|
|
if (ThreadReady[i] == 0) {
|
|
cont = 1;
|
|
}//if
|
|
}//for
|
|
} while (cont == 1);
|
|
}
|
|
|
|
static void
|
|
tellThreads(StartType what)
|
|
{
|
|
for (int i = 0; i < tNoOfThreads ; i++)
|
|
ThreadStart[i] = what;
|
|
}
|
|
|
|
/* Cluster connection shared by the main thread and all workers; set by
   the main function once the connection is established. */
static Ndb_cluster_connection *g_cluster_connection= 0;
|
|
|
|
/*
 * Program entry point.
 *
 * Parses the command line, prints the chosen settings, connects to the
 * cluster, creates the table and starts the worker threads.  Each loop
 * iteration then runs the phases insert, read, update, read and delete.
 * A phase that reports failed transactions is re-run up to retry_opt
 * times.  With tNoOfLoops == 0 the loop never terminates.
 *
 * Returns the value of NDBT_ProgramExit() for NDBT_OK, NDBT_FAILED or
 * NDBT_WRONGARGS.
 */
NDB_COMMAND(flexAsynch, "flexAsynch", "flexAsynch", "flexAsynch", 65535)
{
  ndb_init();
  ThreadNdb* pThreadData;
  int tLoops = 0;
  int i;            // remaining redo attempts inside a phase
  unsigned int t;   // worker thread index
  int returnValue = NDBT_OK;

  flexAsynchErrorData = new ErrorData;
  flexAsynchErrorData->resetErrorCounters();

  if (readArguments(argc, argv) != 0){
    input_error();
    return NDBT_ProgramExit(NDBT_WRONGARGS);
  }

  pThreadData = new ThreadNdb[MAXTHREADS];

  ndbout << endl << "FLEXASYNCH - Starting normal mode" << endl;
  ndbout << "Perform benchmark of insert, update and delete transactions";
  ndbout << endl;
  ndbout << " " << tNoOfThreads << " number of concurrent threads " << endl;
  ndbout << " " << tNoOfParallelTrans;
  ndbout << " number of parallel operation per thread " << endl;
  ndbout << " " << tNoOfTransactions << " transaction(s) per round " << endl;
  ndbout << " " << tNoOfLoops << " iterations " << endl;
  ndbout << " " << "Load Factor is " << tLoadFactor << "%" << endl;
  ndbout << " " << tNoOfAttributes << " attributes per table " << endl;
  ndbout << " " << tAttributeSize;
  ndbout << " is the number of 32 bit words per attribute " << endl;
  if (tempTable == true) {
    ndbout << " Tables are without logging " << endl;
  } else {
    ndbout << " Tables are with logging " << endl;
  }//if
  if (startTransGuess == true) {
    ndbout << " Transactions are executed with hint provided" << endl;
  } else {
    ndbout << " Transactions are executed with round robin scheme" << endl;
  }//if
  if (tSendForce == 0) {
    ndbout << " No force send is used, adaptive algorithm used" << endl;
  } else if (tSendForce == 1) {
    ndbout << " Force send used" << endl;
  } else {
    ndbout << " No force send is used, adaptive algorithm disabled" << endl;
  }//if

  ndbout << endl;

  NdbThread_SetConcurrencyLevel(2 + tNoOfThreads);

  /* print Setting */
  flexAsynchErrorData->printSettings(ndbout);

  setAttrNames();
  setTableNames();

  Ndb_cluster_connection con;
  if(con.connect(12, 5, 1) != 0)
  {
    delete [] pThreadData;   // do not leak the argument blocks on this path
    return NDBT_ProgramExit(NDBT_FAILED);
  }
  g_cluster_connection= &con;

  Ndb * pNdb = new Ndb(g_cluster_connection, "TEST_DB");
  if (pNdb->init() != 0) {
    // An Ndb object that failed to initialise cannot be used any further.
    ndbout << pNdb->getNdbError() << endl;
    ndbout << "Benchmark failed!" << endl;
    returnValue = NDBT_FAILED;
  }
  tNodeId = pNdb->getNodeId();

  ndbout << " NdbAPI node with id = " << pNdb->getNodeId() << endl;
  ndbout << endl;

  if(returnValue == NDBT_OK){
    ndbout << "Waiting for ndb to become ready..." <<endl;
    if (pNdb->waitUntilReady(10000) != 0){
      ndbout << "NDB is not ready" << endl;
      ndbout << "Benchmark failed!" << endl;
      returnValue = NDBT_FAILED;
    }
  }

  if(returnValue == NDBT_OK){
    if (createTables(pNdb) != 0){
      returnValue = NDBT_FAILED;
    }
  }

  if(returnValue == NDBT_OK){
    /****************************************************************
     *  Create NDB objects.                                         *
     ****************************************************************/
    resetThreads();
    for (t = 0; t < tNoOfThreads ; t++) {
      pThreadData[t].ThreadNo = (int)t;
      threadLife[t] = NdbThread_Create(threadLoop,
                                       (void**)&pThreadData[t],
                                       32768,
                                       "flexAsynchThread",
                                       NDB_THREAD_PRIO_LOW);
    }//for
    ndbout << endl << "All NDB objects and table created" << endl << endl;
    int noOfTransacts = tNoOfParallelTrans*tNoOfTransactions*tNoOfThreads;
    /****************************************************************
     *  Execute program.                                            *
     ****************************************************************/

    for(;;) {

      int loopCount = tLoops + 1 ;
      ndbout << endl << "Loop # " << loopCount << endl << endl ;

      /****************************************************************
       *  Perform inserts.                                            *
       ****************************************************************/

      failed = 0 ;

      START_TIMER;
      execute(stInsert);
      STOP_TIMER;
      PRINT_TIMER("insert", noOfTransacts, tNoOfOpsPerTrans);

      if (0 < failed) {
        i = retry_opt ;
        int ci = 1 ;
        while (0 < failed && 0 < i){
          ndbout << failed << " of the transactions returned errors!"
                 << endl << endl;
          ndbout << "Attempting to redo the failed transactions now..."
                 << endl ;
          ndbout << "Redo attempt " << ci <<" out of " << retry_opt
                 << endl << endl;
          failed = 0 ;
          START_TIMER;
          execute(stInsert);
          STOP_TIMER;
          PRINT_TIMER("insert", noOfTransacts, tNoOfOpsPerTrans);
          i-- ;
          ci++;
        }
        if(0 == failed ){
          ndbout << endl <<"Redo attempt succeeded" << endl << endl;
        }else{
          ndbout << endl <<"Redo attempt failed, moving on now..." << endl
                 << endl;
        }//if
      }//if

      /****************************************************************
       *  Perform read.                                               *
       ****************************************************************/

      failed = 0 ;

      START_TIMER;
      execute(stRead);
      STOP_TIMER;
      PRINT_TIMER("read", noOfTransacts, tNoOfOpsPerTrans);

      if (0 < failed) {
        i = retry_opt ;
        int cr = 1;
        while (0 < failed && 0 < i){
          ndbout << failed << " of the transactions returned errors!"<<endl ;
          ndbout << endl;
          ndbout <<"Attempting to redo the failed transactions now..." << endl;
          ndbout << endl;
          ndbout <<"Redo attempt " << cr <<" out of ";
          ndbout << retry_opt << endl << endl;
          failed = 0 ;
          START_TIMER;
          execute(stRead);
          STOP_TIMER;
          PRINT_TIMER("read", noOfTransacts, tNoOfOpsPerTrans);
          i-- ;
          cr++ ;
        }//while
        if(0 == failed ) {
          ndbout << endl <<"Redo attempt succeeded" << endl << endl ;
        }else{
          ndbout << endl <<"Redo attempt failed, moving on now..." << endl << endl ;
        }//if
      }//if


      /****************************************************************
       *  Perform update.                                             *
       ****************************************************************/

      failed = 0 ;

      START_TIMER;
      execute(stUpdate);
      STOP_TIMER;
      PRINT_TIMER("update", noOfTransacts, tNoOfOpsPerTrans) ;

      if (0 < failed) {
        i = retry_opt ;
        int cu = 1 ;
        while (0 < failed && 0 < i){
          ndbout << failed << " of the transactions returned errors!"<<endl ;
          ndbout << endl;
          ndbout <<"Attempting to redo the failed transactions now..." << endl;
          ndbout << endl <<"Redo attempt " << cu <<" out of ";
          ndbout << retry_opt << endl << endl;
          failed = 0 ;
          START_TIMER;
          execute(stUpdate);
          STOP_TIMER;
          PRINT_TIMER("update", noOfTransacts, tNoOfOpsPerTrans);
          i-- ;
          cu++ ;
        }//while
        if(0 == failed ){
          ndbout << endl <<"Redo attempt succeeded" << endl << endl;
        } else {
          ndbout << endl;
          ndbout <<"Redo attempt failed, moving on now..." << endl << endl;
        }//if
      }//if

      /****************************************************************
       *  Perform read.                                               *
       ****************************************************************/

      failed = 0 ;

      START_TIMER;
      execute(stRead);
      STOP_TIMER;
      PRINT_TIMER("read", noOfTransacts, tNoOfOpsPerTrans);

      if (0 < failed) {
        i = retry_opt ;
        int cr2 = 1 ;
        while (0 < failed && 0 < i){
          ndbout << failed << " of the transactions returned errors!"<<endl ;
          ndbout << endl;
          ndbout <<"Attempting to redo the failed transactions now..." << endl;
          ndbout << endl <<"Redo attempt " << cr2 <<" out of ";
          ndbout << retry_opt << endl << endl;
          failed = 0 ;
          START_TIMER;
          execute(stRead);
          STOP_TIMER;
          PRINT_TIMER("read", noOfTransacts, tNoOfOpsPerTrans);
          i-- ;
          cr2++ ;
        }//while
        if(0 == failed ){
          ndbout << endl <<"Redo attempt succeeded" << endl << endl;
        }else{
          ndbout << endl;
          ndbout << "Redo attempt failed, moving on now..." << endl << endl;
        }//if
      }//if


      /****************************************************************
       *  Perform delete.                                             *
       ****************************************************************/

      failed = 0 ;

      START_TIMER;
      execute(stDelete);
      STOP_TIMER;
      PRINT_TIMER("delete", noOfTransacts, tNoOfOpsPerTrans);

      if (0 < failed) {
        i = retry_opt ;
        int cd = 1 ;
        while (0 < failed && 0 < i){
          ndbout << failed << " of the transactions returned errors!"<< endl ;
          ndbout << endl;
          ndbout <<"Attempting to redo the failed transactions now:" << endl ;
          ndbout << endl <<"Redo attempt " << cd <<" out of ";
          ndbout << retry_opt << endl << endl;
          failed = 0 ;
          START_TIMER;
          execute(stDelete);
          STOP_TIMER;
          // The redo statistics belong to the delete phase (they used to
          // be printed under the label "read").
          PRINT_TIMER("delete", noOfTransacts, tNoOfOpsPerTrans);
          i-- ;
          cd++ ;
        }//while
        if(0 == failed ){
          ndbout << endl <<"Redo attempt succeeded" << endl << endl ;
        }else{
          ndbout << endl;
          ndbout << "Redo attempt failed, moving on now..." << endl << endl;
        }//if
      }//if

      tLoops++;
      ndbout << "--------------------------------------------------" << endl;

      // tNoOfLoops == 0 means "run until interrupted".
      if(tNoOfLoops != 0){
        if(tNoOfLoops <= tLoops)
          break ;
      }
    }//for

    // Ask every worker to leave its loop, then reap the threads.
    execute(stStop);
    void * tmp;
    for(t = 0; t<tNoOfThreads; t++){
      NdbThread_WaitFor(threadLife[t], &tmp);
      NdbThread_Destroy(&threadLife[t]);
    }
  }
  delete [] pThreadData;
  delete pNdb;

  //printing errorCounters
  flexAsynchErrorData->printErrorCounters(ndbout);

  return NDBT_ProgramExit(returnValue);
}//main()
|
|
|
|
|
|
/*
 * Run one phase on all workers: clear the hand-shake slots, post the
 * command aType to every worker and block until all of them have
 * reported ready.
 */
static void execute(StartType aType)
{
  resetThreads();       // nobody ready, nothing pending
  tellThreads(aType);   // publish the command
  waitForThreads();     // returns once every worker has acknowledged
}//execute()
|
|
|
|
static void*
|
|
threadLoop(void* ThreadData)
|
|
{
|
|
Ndb* localNdb;
|
|
StartType tType;
|
|
ThreadNdb* tabThread = (ThreadNdb*)ThreadData;
|
|
int threadNo = tabThread->ThreadNo;
|
|
localNdb = new Ndb(g_cluster_connection, "TEST_DB");
|
|
localNdb->init(1024);
|
|
localNdb->waitUntilReady(10000);
|
|
unsigned int threadBase = (threadNo << 16) + tNodeId ;
|
|
|
|
for (;;){
|
|
while (ThreadStart[threadNo] == stIdle) {
|
|
NdbSleep_MilliSleep(10);
|
|
}//while
|
|
|
|
// Check if signal to exit is received
|
|
if (ThreadStart[threadNo] == stStop) {
|
|
break;
|
|
}//if
|
|
|
|
tType = ThreadStart[threadNo];
|
|
ThreadStart[threadNo] = stIdle;
|
|
if(!executeThread(tType, localNdb, threadBase)){
|
|
break;
|
|
}
|
|
ThreadReady[threadNo] = 1;
|
|
}//for
|
|
|
|
delete localNdb;
|
|
ThreadReady[threadNo] = 1;
|
|
|
|
return NULL;
|
|
}//threadLoop()
|
|
|
|
static
|
|
bool
|
|
executeThread(StartType aType, Ndb* aNdbObject, unsigned int threadBase) {
|
|
int i, j, k;
|
|
NdbConnection* tConArray[1024];
|
|
unsigned int tBase;
|
|
unsigned int tBase2;
|
|
|
|
for (i = 0; i < tNoOfTransactions; i++) {
|
|
if (tLocal == false) {
|
|
tBase = i * tNoOfParallelTrans * tNoOfOpsPerTrans;
|
|
} else {
|
|
tBase = i * tNoOfParallelTrans * MAX_SEEK;
|
|
}//if
|
|
START_REAL_TIME;
|
|
for (j = 0; j < tNoOfParallelTrans; j++) {
|
|
if (tLocal == false) {
|
|
tBase2 = tBase + (j * tNoOfOpsPerTrans);
|
|
} else {
|
|
tBase2 = tBase + (j * MAX_SEEK);
|
|
tBase2 = getKey(threadBase, tBase2);
|
|
}//if
|
|
if (startTransGuess == true) {
|
|
Uint64 Tkey64;
|
|
Uint32* Tkey32 = (Uint32*)&Tkey64;
|
|
Tkey32[0] = threadBase;
|
|
Tkey32[1] = tBase2;
|
|
tConArray[j] = aNdbObject->startTransaction((Uint32)0, //Priority
|
|
(const char*)&Tkey64, //Main PKey
|
|
(Uint32)4); //Key Length
|
|
} else {
|
|
tConArray[j] = aNdbObject->startTransaction();
|
|
}//if
|
|
if (tConArray[j] == NULL &&
|
|
!error_handler(aNdbObject->getNdbError()) ){
|
|
ndbout << endl << "Unable to recover! Quiting now" << endl ;
|
|
return false;
|
|
}//if
|
|
|
|
for (k = 0; k < tNoOfOpsPerTrans; k++) {
|
|
//-------------------------------------------------------
|
|
// Define the operation, but do not execute it yet.
|
|
//-------------------------------------------------------
|
|
defineOperation(tConArray[j], aType, threadBase, (tBase2 + k));
|
|
}//for
|
|
|
|
tConArray[j]->executeAsynchPrepare(Commit, &executeCallback, NULL);
|
|
}//for
|
|
STOP_REAL_TIME;
|
|
//-------------------------------------------------------
|
|
// Now we have defined a set of operations, it is now time
|
|
// to execute all of them.
|
|
//-------------------------------------------------------
|
|
int Tcomp = aNdbObject->sendPollNdb(3000, 0, 0);
|
|
while (Tcomp < tNoOfParallelTrans) {
|
|
int TlocalComp = aNdbObject->pollNdb(3000, 0);
|
|
Tcomp += TlocalComp;
|
|
}//while
|
|
for (j = 0 ; j < tNoOfParallelTrans ; j++) {
|
|
aNdbObject->closeTransaction(tConArray[j]);
|
|
}//for
|
|
}//for
|
|
return true;
|
|
}//executeThread()
|
|
|
|
static
|
|
Uint32
|
|
getKey(Uint32 aBase, Uint32 anIndex) {
|
|
Uint32 Tfound = anIndex;
|
|
Uint64 Tkey64;
|
|
Uint32* Tkey32 = (Uint32*)&Tkey64;
|
|
Tkey32[0] = aBase;
|
|
Uint32 hash;
|
|
for (Uint32 i = anIndex; i < (anIndex + MAX_SEEK); i++) {
|
|
Tkey32[1] = (Uint32)i;
|
|
hash = md5_hash((Uint64*)&Tkey64, (Uint32)2);
|
|
hash = (hash >> 6) & (MAX_PARTS - 1);
|
|
if (hash == tLocalPart) {
|
|
Tfound = i;
|
|
break;
|
|
}//if
|
|
}//for
|
|
return Tfound;
|
|
}//getKey()
|
|
|
|
static void
|
|
executeCallback(int result, NdbConnection* NdbObject, void* aObject)
|
|
{
|
|
if (result == -1) {
|
|
|
|
// Add complete error handling here
|
|
|
|
int retCode = flexAsynchErrorData->handleErrorCommon(NdbObject->getNdbError());
|
|
if (retCode == 1) {
|
|
if (NdbObject->getNdbError().code != 626 && NdbObject->getNdbError().code != 630){
|
|
ndbout_c("execute: %s", NdbObject->getNdbError().message);
|
|
ndbout_c("Error code = %d", NdbObject->getNdbError().code);}
|
|
} else if (retCode == 2) {
|
|
ndbout << "4115 should not happen in flexAsynch" << endl;
|
|
} else if (retCode == 3) {
|
|
/* What can we do here? */
|
|
ndbout_c("execute: %s", NdbObject->getNdbError().message);
|
|
}//if(retCode == 3)
|
|
|
|
// ndbout << "Error occured in poll:" << endl;
|
|
// ndbout << NdbObject->getNdbError() << endl;
|
|
failed++ ;
|
|
return;
|
|
}//if
|
|
return;
|
|
}//executeCallback()
|
|
|
|
|
|
|
|
static void
|
|
defineOperation(NdbConnection* localNdbConnection, StartType aType,
|
|
Uint32 threadBase, Uint32 aIndex)
|
|
{
|
|
NdbOperation* localNdbOperation;
|
|
unsigned int loopCountAttributes = tNoOfAttributes;
|
|
unsigned int countAttributes;
|
|
Uint32 attrValue[MAXATTRSIZE];
|
|
|
|
//-------------------------------------------------------
|
|
// Set-up the attribute values for this operation.
|
|
//-------------------------------------------------------
|
|
attrValue[0] = threadBase;
|
|
attrValue[1] = aIndex;
|
|
for (int k = 2; k < loopCountAttributes; k++) {
|
|
attrValue[k] = aIndex;
|
|
}//for
|
|
localNdbOperation = localNdbConnection->getNdbOperation(tableName[0]);
|
|
if (localNdbOperation == NULL) {
|
|
error_handler(localNdbConnection->getNdbError());
|
|
}//if
|
|
switch (aType) {
|
|
case stInsert: { // Insert case
|
|
if (theWriteFlag == 1 && theDirtyFlag == 1) {
|
|
localNdbOperation->dirtyWrite();
|
|
} else if (theWriteFlag == 1) {
|
|
localNdbOperation->writeTuple();
|
|
} else {
|
|
localNdbOperation->insertTuple();
|
|
}//if
|
|
break;
|
|
}//case
|
|
case stRead: { // Read Case
|
|
if (theSimpleFlag == 1) {
|
|
localNdbOperation->simpleRead();
|
|
} else if (theDirtyFlag == 1) {
|
|
localNdbOperation->dirtyRead();
|
|
} else {
|
|
localNdbOperation->readTuple();
|
|
}//if
|
|
break;
|
|
}//case
|
|
case stUpdate: { // Update Case
|
|
if (theWriteFlag == 1 && theDirtyFlag == 1) {
|
|
localNdbOperation->dirtyWrite();
|
|
} else if (theWriteFlag == 1) {
|
|
localNdbOperation->writeTuple();
|
|
} else if (theDirtyFlag == 1) {
|
|
localNdbOperation->dirtyUpdate();
|
|
} else {
|
|
localNdbOperation->updateTuple();
|
|
}//if
|
|
break;
|
|
}//case
|
|
case stDelete: { // Delete Case
|
|
localNdbOperation->deleteTuple();
|
|
break;
|
|
}//case
|
|
default: {
|
|
error_handler(localNdbOperation->getNdbError());
|
|
}//default
|
|
}//switch
|
|
localNdbOperation->equal((Uint32)0,(char*)&attrValue[0]);
|
|
switch (aType) {
|
|
case stInsert: // Insert case
|
|
case stUpdate: // Update Case
|
|
{
|
|
for (countAttributes = 1;
|
|
countAttributes < loopCountAttributes; countAttributes++) {
|
|
localNdbOperation->setValue(countAttributes,
|
|
(char*)&attrValue[0]);
|
|
}//for
|
|
break;
|
|
}//case
|
|
case stRead: { // Read Case
|
|
for (countAttributes = 1;
|
|
countAttributes < loopCountAttributes; countAttributes++) {
|
|
localNdbOperation->getValue(countAttributes,
|
|
(char*)&attrValue[0]);
|
|
}//for
|
|
break;
|
|
}//case
|
|
case stDelete: { // Delete Case
|
|
break;
|
|
}//case
|
|
default: {
|
|
//goto error_handler; < epaulsa
|
|
error_handler(localNdbOperation->getNdbError());
|
|
}//default
|
|
}//switch
|
|
return;
|
|
}//defineOperation()
|
|
|
|
static void setAttrNames()
|
|
{
|
|
int i;
|
|
|
|
for (i = 0; i < MAXATTR ; i++){
|
|
BaseString::snprintf(attrName[i], MAXSTRLEN, "COL%d", i);
|
|
}
|
|
}
|
|
|
|
|
|
static void setTableNames()
|
|
{
|
|
// Note! Uses only uppercase letters in table name's
|
|
// so that we can look at the tables wits SQL
|
|
int i;
|
|
for (i = 0; i < MAXTABLES ; i++){
|
|
if (theStdTableNameFlag==0){
|
|
BaseString::snprintf(tableName[i], MAXSTRLEN, "TAB%d_%d", i,
|
|
(int)(NdbTick_CurrentMillisecond()/1000));
|
|
} else {
|
|
BaseString::snprintf(tableName[i], MAXSTRLEN, "TAB%d", i);
|
|
}
|
|
}
|
|
}
|
|
|
|
static
|
|
int
|
|
createTables(Ndb* pMyNdb){
|
|
|
|
NdbSchemaCon *MySchemaTransaction;
|
|
NdbSchemaOp *MySchemaOp;
|
|
int check;
|
|
|
|
if (theTableCreateFlag == 0) {
|
|
for(int i=0; i < 1 ;i++) {
|
|
ndbout << "Creating " << tableName[i] << "..." << endl;
|
|
MySchemaTransaction = NdbSchemaCon::startSchemaTrans(pMyNdb);
|
|
|
|
if(MySchemaTransaction == NULL &&
|
|
(!error_handler(MySchemaTransaction->getNdbError())))
|
|
return -1;
|
|
|
|
MySchemaOp = MySchemaTransaction->getNdbSchemaOp();
|
|
if(MySchemaOp == NULL &&
|
|
(!error_handler(MySchemaTransaction->getNdbError())))
|
|
return -1;
|
|
|
|
|
|
check = MySchemaOp->createTable( tableName[i]
|
|
,8 // Table Size
|
|
,TupleKey // Key Type
|
|
,40 // Nr of Pages
|
|
,All
|
|
,6
|
|
,(tLoadFactor - 5)
|
|
,(tLoadFactor)
|
|
,1
|
|
,!tempTable
|
|
);
|
|
|
|
if (check == -1 &&
|
|
(!error_handler(MySchemaTransaction->getNdbError())))
|
|
return -1;
|
|
|
|
check = MySchemaOp->createAttribute( (char*)attrName[0],
|
|
TupleKey,
|
|
32,
|
|
PKSIZE,
|
|
UnSigned,
|
|
MMBased,
|
|
NotNullAttribute );
|
|
|
|
if (check == -1 &&
|
|
(!error_handler(MySchemaTransaction->getNdbError())))
|
|
return -1;
|
|
for (int j = 1; j < tNoOfAttributes ; j++){
|
|
check = MySchemaOp->createAttribute( (char*)attrName[j],
|
|
NoKey,
|
|
32,
|
|
tAttributeSize,
|
|
UnSigned,
|
|
MMBased,
|
|
NotNullAttribute );
|
|
if (check == -1 &&
|
|
(!error_handler(MySchemaTransaction->getNdbError())))
|
|
return -1;
|
|
}
|
|
|
|
if (MySchemaTransaction->execute() == -1 &&
|
|
(!error_handler(MySchemaTransaction->getNdbError())))
|
|
return -1;
|
|
|
|
NdbSchemaCon::closeSchemaTrans(MySchemaTransaction);
|
|
}
|
|
}
|
|
|
|
return 0;
|
|
}
|
|
|
|
static
|
|
bool error_handler(const NdbError & err){
|
|
ndbout << err << endl ;
|
|
switch(err.classification){
|
|
case NdbError::TemporaryResourceError:
|
|
case NdbError::OverloadError:
|
|
case NdbError::SchemaError:
|
|
ndbout << endl << "Attempting to recover and continue now..." << endl ;
|
|
return true;
|
|
}
|
|
return false ; // return false to abort
|
|
}
|
|
static
|
|
bool error_handler(const char* error_string, int error_int) {
|
|
ndbout << error_string << endl ;
|
|
if ((4008 == error_int) ||
|
|
(721 == error_int) ||
|
|
(266 == error_int)){
|
|
ndbout << endl << "Attempting to recover and continue now..." << endl ;
|
|
return true ; // return true to retry
|
|
}
|
|
return false ; // return false to abort
|
|
}
|
|
|
|
static
|
|
int
|
|
readArguments(int argc, const char** argv){
|
|
|
|
int i = 1;
|
|
while (argc > 1){
|
|
if (strcmp(argv[i], "-t") == 0){
|
|
tNoOfThreads = atoi(argv[i+1]);
|
|
if ((tNoOfThreads < 1) || (tNoOfThreads > MAXTHREADS)){
|
|
ndbout_c("Invalid no of threads");
|
|
return -1;
|
|
}
|
|
} else if (strcmp(argv[i], "-p") == 0){
|
|
tNoOfParallelTrans = atoi(argv[i+1]);
|
|
if ((tNoOfParallelTrans < 1) || (tNoOfParallelTrans > MAXPAR)){
|
|
ndbout_c("Invalid no of parallell transactions");
|
|
return -1;
|
|
}
|
|
} else if (strcmp(argv[i], "-load_factor") == 0){
|
|
tLoadFactor = atoi(argv[i+1]);
|
|
if ((tLoadFactor < 40) || (tLoadFactor > 99)){
|
|
ndbout_c("Invalid load factor");
|
|
return -1;
|
|
}
|
|
} else if (strcmp(argv[i], "-c") == 0) {
|
|
tNoOfOpsPerTrans = atoi(argv[i+1]);
|
|
if (tNoOfOpsPerTrans < 1){
|
|
ndbout_c("Invalid no of operations per transaction");
|
|
return -1;
|
|
}
|
|
} else if (strcmp(argv[i], "-o") == 0) {
|
|
tNoOfTransactions = atoi(argv[i+1]);
|
|
if (tNoOfTransactions < 1){
|
|
ndbout_c("Invalid no of transactions");
|
|
return -1;
|
|
}
|
|
} else if (strcmp(argv[i], "-a") == 0){
|
|
tNoOfAttributes = atoi(argv[i+1]);
|
|
if ((tNoOfAttributes < 2) || (tNoOfAttributes > MAXATTR)){
|
|
ndbout_c("Invalid no of attributes");
|
|
return -1;
|
|
}
|
|
} else if (strcmp(argv[i], "-n") == 0){
|
|
theStdTableNameFlag = 1;
|
|
argc++;
|
|
i--;
|
|
} else if (strcmp(argv[i], "-l") == 0){
|
|
tNoOfLoops = atoi(argv[i+1]);
|
|
if ((tNoOfLoops < 0) || (tNoOfLoops > 100000)){
|
|
ndbout_c("Invalid no of loops");
|
|
return -1;
|
|
}
|
|
} else if (strcmp(argv[i], "-s") == 0){
|
|
tAttributeSize = atoi(argv[i+1]);
|
|
if ((tAttributeSize < 1) || (tAttributeSize > MAXATTRSIZE)){
|
|
ndbout_c("Invalid attributes size");
|
|
return -1;
|
|
}
|
|
} else if (strcmp(argv[i], "-local") == 0){
|
|
tLocalPart = atoi(argv[i+1]);
|
|
tLocal = true;
|
|
startTransGuess = true;
|
|
if ((tLocalPart < 0) || (tLocalPart > MAX_PARTS)){
|
|
ndbout_c("Invalid local part");
|
|
return -1;
|
|
}
|
|
} else if (strcmp(argv[i], "-simple") == 0){
|
|
theSimpleFlag = 1;
|
|
argc++;
|
|
i--;
|
|
} else if (strcmp(argv[i], "-adaptive") == 0){
|
|
tSendForce = 0;
|
|
argc++;
|
|
i--;
|
|
} else if (strcmp(argv[i], "-force") == 0){
|
|
tSendForce = 1;
|
|
argc++;
|
|
i--;
|
|
} else if (strcmp(argv[i], "-non_adaptive") == 0){
|
|
tSendForce = 2;
|
|
argc++;
|
|
i--;
|
|
} else if (strcmp(argv[i], "-write") == 0){
|
|
theWriteFlag = 1;
|
|
argc++;
|
|
i--;
|
|
} else if (strcmp(argv[i], "-dirty") == 0){
|
|
theDirtyFlag = 1;
|
|
argc++;
|
|
i--;
|
|
} else if (strcmp(argv[i], "-test") == 0){
|
|
theTestFlag = 1;
|
|
argc++;
|
|
i--;
|
|
} else if (strcmp(argv[i], "-no_table_create") == 0){
|
|
theTableCreateFlag = 1;
|
|
argc++;
|
|
i--;
|
|
} else if (strcmp(argv[i], "-temp") == 0){
|
|
tempTable = true;
|
|
argc++;
|
|
i--;
|
|
} else if (strcmp(argv[i], "-no_hint") == 0){
|
|
startTransGuess = false;
|
|
argc++;
|
|
i--;
|
|
} else {
|
|
return -1;
|
|
}
|
|
|
|
argc -= 2;
|
|
i = i + 2;
|
|
}//while
|
|
if (tLocal == true) {
|
|
if (tNoOfOpsPerTrans != 1) {
|
|
ndbout_c("Not valid to have more than one op per trans with local");
|
|
}//if
|
|
if (startTransGuess == false) {
|
|
ndbout_c("Not valid to use no_hint with local");
|
|
}//if
|
|
}//if
|
|
return 0;
|
|
}
|
|
|
|
static
|
|
void
|
|
input_error(){
|
|
|
|
ndbout_c("FLEXASYNCH");
|
|
ndbout_c(" Perform benchmark of insert, update and delete transactions");
|
|
ndbout_c("");
|
|
ndbout_c("Arguments:");
|
|
ndbout_c(" -t Number of threads to start, default 1");
|
|
ndbout_c(" -p Number of parallel transactions per thread, default 32");
|
|
ndbout_c(" -o Number of transactions per loop, default 500");
|
|
ndbout_c(" -l Number of loops to run, default 1, 0=infinite");
|
|
ndbout_c(" -load_factor Number Load factor in index in percent (40 -> 99)");
|
|
ndbout_c(" -a Number of attributes, default 25");
|
|
ndbout_c(" -c Number of operations per transaction");
|
|
ndbout_c(" -s Size of each attribute, default 1 ");
|
|
ndbout_c(" (PK is always of size 1, independent of this value)");
|
|
ndbout_c(" -simple Use simple read to read from database");
|
|
ndbout_c(" -dirty Use dirty read to read from database");
|
|
ndbout_c(" -write Use writeTuple in insert and update");
|
|
ndbout_c(" -n Use standard table names");
|
|
ndbout_c(" -no_table_create Don't create tables in db");
|
|
ndbout_c(" -temp Create table(s) without logging");
|
|
ndbout_c(" -no_hint Don't give hint on where to execute transaction coordinator");
|
|
ndbout_c(" -adaptive Use adaptive send algorithm (default)");
|
|
ndbout_c(" -force Force send when communicating");
|
|
ndbout_c(" -non_adaptive Send at a 10 millisecond interval");
|
|
ndbout_c(" -local Number of part, only use keys in one part out of 16");
|
|
}
|
|
|
|
|
|
|