mariadb/ndb/test/ndbapi/flexAsynch.cpp
unknown f5b95d0be1 Bug #10776: Failure to compile ndb ReadNodesConf.cpp on AIX 5.2
mysqld hasn't been built on AIX with ndb-everything in quite a while.
this allowed a variety of changes to be added that broke the AIX build
for both the GNU and IBM compilers (but the IBM suite in particular).
Changeset lets build to complete on AIX 5.2 for users of the GNU and
the IBM suite both. Tudo bem?


config/ac-macros/large_file.m4:
  Bug #10776: Failure to compile ndb ReadNodesConf.cpp on AIX 5.2 (2)
  
  we no longer declare anything large-file on AIX.  the
  GNU C++ compiler declares _LARGE_FILE_API all of its
  own, and either way we're now pulling in <standards.h>
  when on AIX, which defines _LARGE_FILE_API (if not
  already defined).
configure.in:
  Bug #10776: Failure to compile ndb ReadNodesConf.cpp on AIX 5.2 (1)
  
  build NDB binaries as static on AIX. because that actually
  *works*.
  
  when building dynamic, with the IBM compiler (xlC_r), and
  the build breaks on AIX due to missing symbols
  (__vec__delete2 et al.), try adding -lhC to the Makefile.
include/mysql.h:
  Bug #10776: Failure to compile ndb ReadNodesConf.cpp on AIX 5.2 (2)
  
  we're now pulling in <standards.h> when on AIX, which
  defines _LARGE_FILE_API (if not already defined).
ndb/src/common/util/File.cpp:
  Bug #10776: Failure to compile ndb ReadNodesConf.cpp on AIX 5.2 (3)
  
  do not de-scope the standards, for they may be funky macros
ndb/src/mgmclient/Makefile.am:
  Bug #10776: Failure to compile ndb ReadNodesConf.cpp on AIX 5.2 (4)
  
  make IBM C++ compiler happy on AIX
ndb/src/mgmsrv/Makefile.am:
  Bug #10776: Failure to compile ndb ReadNodesConf.cpp on AIX 5.2 (5)
  
  GNU compiler has no sense of humour about this
ndb/test/ndbapi/benchronja.cpp:
  Bug #10776: Failure to compile ndb ReadNodesConf.cpp on AIX 5.2 (6)
  
  MAXTHREADS collides with a #define from <sys/thread.h> on AIX
  (IBM compiler).  Call it NDB_MAXTHREADS instead.  Also explicitly
  #undef it here lest someone use it by habit and get really funny
  results.  (K&R says we may #undef non-existent symbols.)
ndb/test/ndbapi/flexAsynch.cpp:
  Bug #10776: Failure to compile ndb ReadNodesConf.cpp on AIX 5.2 (6)
  
  MAXTHREADS collides with a #define from <sys/thread.h> on AIX
  (IBM compiler).  Call it NDB_MAXTHREADS instead.  Also explicitly
  #undef it here lest someone use it by habit and get really funny
  results.  (K&R says we may #undef non-existent symbols.)
ndb/test/ndbapi/flexHammer.cpp:
  Bug #10776: Failure to compile ndb ReadNodesConf.cpp on AIX 5.2 (6)
  
  MAXTHREADS collides with a #define from <sys/thread.h> on AIX
  (IBM compiler).  Call it NDB_MAXTHREADS instead.  Also explicitly
  #undef it here lest someone use it by habit and get really funny
  results.  (K&R says we may #undef non-existent symbols.)
ndb/test/ndbapi/flexScan.cpp:
  Bug #10776: Failure to compile ndb ReadNodesConf.cpp on AIX 5.2 (6)
  
  MAXTHREADS collides with a #define from <sys/thread.h> on AIX
  (IBM compiler).  Call it NDB_MAXTHREADS instead.  Also explicitly
  #undef it here lest someone use it by habit and get really funny
  results.  (K&R says we may #undef non-existent symbols.)
ndb/test/ndbapi/flexTT.cpp:
  Bug #10776: Failure to compile ndb ReadNodesConf.cpp on AIX 5.2 (6)
  
  MAXTHREADS collides with a #define from <sys/thread.h> on AIX
  (IBM compiler).  Call it NDB_MAXTHREADS instead.  Also explicitly
  #undef it here lest someone use it by habit and get really funny
  results.  (K&R says we may #undef non-existent symbols.)
ndb/test/ndbapi/flexTimedAsynch.cpp:
  Bug #10776: Failure to compile ndb ReadNodesConf.cpp on AIX 5.2 (6)
  
  MAXTHREADS collides with a #define from <sys/thread.h> on AIX
  (IBM compiler).  Call it NDB_MAXTHREADS instead.  Also explicitly
  #undef it here lest someone use it by habit and get really funny
  results.  (K&R says we may #undef non-existent symbols.)
ndb/test/ndbapi/initronja.cpp:
  Bug #10776: Failure to compile ndb ReadNodesConf.cpp on AIX 5.2 (6)
  
  MAXTHREADS collides with a #define from <sys/thread.h> on AIX
  (IBM compiler).  Call it NDB_MAXTHREADS instead.  Also explicitly
  #undef it here lest someone use it by habit and get really funny
  results.  (K&R says we may #undef non-existent symbols.)
ndb/test/ndbapi/testOperations.cpp:
  Bug #10776: Failure to compile ndb ReadNodesConf.cpp on AIX 5.2 (7)
  
  IBM C compiler on AIX is not happy with the re-def.
ndb/test/ndbapi/testScanFilter.cpp:
  Bug #10776: Failure to compile ndb ReadNodesConf.cpp on AIX 5.2 (8)
  
  The IBM C++ compiler on AIX doesn't like initializing from pow().
  This works, but breaks a VAL (bool res_cal[TUPLE_NUM] ...) later on.
ndb/test/odbc/SQL99_test/SQL99_test.cpp:
  Bug #10776: Failure to compile ndb ReadNodesConf.cpp on AIX 5.2 (6)
  
  MAXTHREADS collides with a #define from <sys/thread.h> on AIX
  (IBM compiler).  Call it NDB_MAXTHREADS instead.  Also explicitly
  #undef it here lest someone use it by habit and get really funny
  results.  (K&R says we may #undef non-existent symbols.)
2007-08-01 04:56:58 +02:00

1001 lines
31 KiB
C++

/* Copyright (C) 2003 MySQL AB
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; version 2 of the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
#include <ndb_global.h>
#include "NdbApi.hpp"
#include <NdbSchemaCon.hpp>
#include <NdbMain.h>
#include <md5_hash.hpp>
#include <NdbThread.h>
#include <NdbSleep.h>
#include <NdbTick.h>
#include <NdbOut.hpp>
#include <NdbTimer.hpp>
#include <NDBT_Error.hpp>
#include <NdbTest.hpp>
#define MAX_PARTS 4
#define MAX_SEEK 16
#define MAXSTRLEN 16
#define MAXATTR 64
#define MAXTABLES 64
#define NDB_MAXTHREADS 128
/*
NDB_MAXTHREADS used to be just MAXTHREADS, which collides with a
#define from <sys/thread.h> on AIX (IBM compiler). We explicitly
#undef it here lest someone use it by habit and get really funny
results. K&R says we may #undef non-existent symbols, so let's go.
*/
#undef MAXTHREADS
#define MAXPAR 1024
#define MAXATTRSIZE 1000
#define PKSIZE 2
enum StartType {
stIdle,
stInsert,
stRead,
stUpdate,
stDelete,
stStop
} ;
extern "C" { static void* threadLoop(void*); }
static void setAttrNames(void);
static void setTableNames(void);
static int readArguments(int argc, const char** argv);
static int createTables(Ndb*);
static void defineOperation(NdbConnection* aTransObject, StartType aType,
Uint32 base, Uint32 aIndex);
static void execute(StartType aType);
static bool executeThread(StartType aType, Ndb* aNdbObject, unsigned int);
static void executeCallback(int result, NdbConnection* NdbObject,
void* aObject);
static bool error_handler(const NdbError & err);
static Uint32 getKey(Uint32, Uint32) ;
static void input_error();
static int retry_opt = 3 ;
static int failed = 0 ;
ErrorData * flexAsynchErrorData;
struct ThreadNdb
{
int NoOfOps;
int ThreadNo;
};
static NdbThread* threadLife[NDB_MAXTHREADS];
static int tNodeId;
static int ThreadReady[NDB_MAXTHREADS];
static StartType ThreadStart[NDB_MAXTHREADS];
static char tableName[MAXTABLES][MAXSTRLEN+1];
static char attrName[MAXATTR][MAXSTRLEN+1];
// Program Parameters
static bool tLocal = false;
static int tLocalPart = 0;
static int tSendForce = 0;
static int tNoOfLoops = 1;
static int tAttributeSize = 1;
static unsigned int tNoOfThreads = 1;
static unsigned int tNoOfParallelTrans = 32;
static unsigned int tNoOfAttributes = 25;
static unsigned int tNoOfTransactions = 500;
static unsigned int tNoOfOpsPerTrans = 1;
static unsigned int tLoadFactor = 80;
static bool tempTable = false;
static bool startTransGuess = true;
//Program Flags
static int theTestFlag = 0;
static int theSimpleFlag = 0;
static int theDirtyFlag = 0;
static int theWriteFlag = 0;
static int theStdTableNameFlag = 0;
static int theTableCreateFlag = 0;
#define START_REAL_TIME
#define STOP_REAL_TIME
#define START_TIMER { NdbTimer timer; timer.doStart();
#define STOP_TIMER timer.doStop();
#define PRINT_TIMER(text, trans, opertrans) timer.printTransactionStatistics(text, trans, opertrans); };
static void
resetThreads(){
for (int i = 0; i < tNoOfThreads ; i++) {
ThreadReady[i] = 0;
ThreadStart[i] = stIdle;
}//for
}
static void
waitForThreads(void)
{
int cont = 0;
do {
cont = 0;
NdbSleep_MilliSleep(20);
for (int i = 0; i < tNoOfThreads ; i++) {
if (ThreadReady[i] == 0) {
cont = 1;
}//if
}//for
} while (cont == 1);
}
static void
tellThreads(StartType what)
{
for (int i = 0; i < tNoOfThreads ; i++)
ThreadStart[i] = what;
}
static Ndb_cluster_connection *g_cluster_connection= 0;
NDB_COMMAND(flexAsynch, "flexAsynch", "flexAsynch", "flexAsynch", 65535)
{
ndb_init();
ThreadNdb* pThreadData;
int tLoops=0, i;
int returnValue = NDBT_OK;
flexAsynchErrorData = new ErrorData;
flexAsynchErrorData->resetErrorCounters();
if (readArguments(argc, argv) != 0){
input_error();
return NDBT_ProgramExit(NDBT_WRONGARGS);
}
pThreadData = new ThreadNdb[NDB_MAXTHREADS];
ndbout << endl << "FLEXASYNCH - Starting normal mode" << endl;
ndbout << "Perform benchmark of insert, update and delete transactions";
ndbout << endl;
ndbout << " " << tNoOfThreads << " number of concurrent threads " << endl;
ndbout << " " << tNoOfParallelTrans;
ndbout << " number of parallel operation per thread " << endl;
ndbout << " " << tNoOfTransactions << " transaction(s) per round " << endl;
ndbout << " " << tNoOfLoops << " iterations " << endl;
ndbout << " " << "Load Factor is " << tLoadFactor << "%" << endl;
ndbout << " " << tNoOfAttributes << " attributes per table " << endl;
ndbout << " " << tAttributeSize;
ndbout << " is the number of 32 bit words per attribute " << endl;
if (tempTable == true) {
ndbout << " Tables are without logging " << endl;
} else {
ndbout << " Tables are with logging " << endl;
}//if
if (startTransGuess == true) {
ndbout << " Transactions are executed with hint provided" << endl;
} else {
ndbout << " Transactions are executed with round robin scheme" << endl;
}//if
if (tSendForce == 0) {
ndbout << " No force send is used, adaptive algorithm used" << endl;
} else if (tSendForce == 1) {
ndbout << " Force send used" << endl;
} else {
ndbout << " No force send is used, adaptive algorithm disabled" << endl;
}//if
ndbout << endl;
NdbThread_SetConcurrencyLevel(2 + tNoOfThreads);
/* print Setting */
flexAsynchErrorData->printSettings(ndbout);
setAttrNames();
setTableNames();
Ndb_cluster_connection con;
if(con.connect(12, 5, 1) != 0)
{
return NDBT_ProgramExit(NDBT_FAILED);
}
g_cluster_connection= &con;
Ndb * pNdb = new Ndb(g_cluster_connection, "TEST_DB");
pNdb->init();
tNodeId = pNdb->getNodeId();
ndbout << " NdbAPI node with id = " << pNdb->getNodeId() << endl;
ndbout << endl;
ndbout << "Waiting for ndb to become ready..." <<endl;
if (pNdb->waitUntilReady(10000) != 0){
ndbout << "NDB is not ready" << endl;
ndbout << "Benchmark failed!" << endl;
returnValue = NDBT_FAILED;
}
if(returnValue == NDBT_OK){
if (createTables(pNdb) != 0){
returnValue = NDBT_FAILED;
}
}
if(returnValue == NDBT_OK){
/****************************************************************
* Create NDB objects. *
****************************************************************/
resetThreads();
for (i = 0; i < tNoOfThreads ; i++) {
pThreadData[i].ThreadNo = i
;
threadLife[i] = NdbThread_Create(threadLoop,
(void**)&pThreadData[i],
32768,
"flexAsynchThread",
NDB_THREAD_PRIO_LOW);
}//for
ndbout << endl << "All NDB objects and table created" << endl << endl;
int noOfTransacts = tNoOfParallelTrans*tNoOfTransactions*tNoOfThreads;
/****************************************************************
* Execute program. *
****************************************************************/
for(;;) {
int loopCount = tLoops + 1 ;
ndbout << endl << "Loop # " << loopCount << endl << endl ;
/****************************************************************
* Perform inserts. *
****************************************************************/
failed = 0 ;
START_TIMER;
execute(stInsert);
STOP_TIMER;
PRINT_TIMER("insert", noOfTransacts, tNoOfOpsPerTrans);
if (0 < failed) {
i = retry_opt ;
int ci = 1 ;
while (0 < failed && 0 < i){
ndbout << failed << " of the transactions returned errors!"
<< endl << endl;
ndbout << "Attempting to redo the failed transactions now..."
<< endl ;
ndbout << "Redo attempt " << ci <<" out of " << retry_opt
<< endl << endl;
failed = 0 ;
START_TIMER;
execute(stInsert);
STOP_TIMER;
PRINT_TIMER("insert", noOfTransacts, tNoOfOpsPerTrans);
i-- ;
ci++;
}
if(0 == failed ){
ndbout << endl <<"Redo attempt succeeded" << endl << endl;
}else{
ndbout << endl <<"Redo attempt failed, moving on now..." << endl
<< endl;
}//if
}//if
/****************************************************************
* Perform read. *
****************************************************************/
failed = 0 ;
START_TIMER;
execute(stRead);
STOP_TIMER;
PRINT_TIMER("read", noOfTransacts, tNoOfOpsPerTrans);
if (0 < failed) {
i = retry_opt ;
int cr = 1;
while (0 < failed && 0 < i){
ndbout << failed << " of the transactions returned errors!"<<endl ;
ndbout << endl;
ndbout <<"Attempting to redo the failed transactions now..." << endl;
ndbout << endl;
ndbout <<"Redo attempt " << cr <<" out of ";
ndbout << retry_opt << endl << endl;
failed = 0 ;
START_TIMER;
execute(stRead);
STOP_TIMER;
PRINT_TIMER("read", noOfTransacts, tNoOfOpsPerTrans);
i-- ;
cr++ ;
}//while
if(0 == failed ) {
ndbout << endl <<"Redo attempt succeeded" << endl << endl ;
}else{
ndbout << endl <<"Redo attempt failed, moving on now..." << endl << endl ;
}//if
}//if
/****************************************************************
* Perform update. *
****************************************************************/
failed = 0 ;
START_TIMER;
execute(stUpdate);
STOP_TIMER;
PRINT_TIMER("update", noOfTransacts, tNoOfOpsPerTrans) ;
if (0 < failed) {
i = retry_opt ;
int cu = 1 ;
while (0 < failed && 0 < i){
ndbout << failed << " of the transactions returned errors!"<<endl ;
ndbout << endl;
ndbout <<"Attempting to redo the failed transactions now..." << endl;
ndbout << endl <<"Redo attempt " << cu <<" out of ";
ndbout << retry_opt << endl << endl;
failed = 0 ;
START_TIMER;
execute(stUpdate);
STOP_TIMER;
PRINT_TIMER("update", noOfTransacts, tNoOfOpsPerTrans);
i-- ;
cu++ ;
}//while
if(0 == failed ){
ndbout << endl <<"Redo attempt succeeded" << endl << endl;
} else {
ndbout << endl;
ndbout <<"Redo attempt failed, moving on now..." << endl << endl;
}//if
}//if
/****************************************************************
* Perform read. *
****************************************************************/
failed = 0 ;
START_TIMER;
execute(stRead);
STOP_TIMER;
PRINT_TIMER("read", noOfTransacts, tNoOfOpsPerTrans);
if (0 < failed) {
i = retry_opt ;
int cr2 = 1 ;
while (0 < failed && 0 < i){
ndbout << failed << " of the transactions returned errors!"<<endl ;
ndbout << endl;
ndbout <<"Attempting to redo the failed transactions now..." << endl;
ndbout << endl <<"Redo attempt " << cr2 <<" out of ";
ndbout << retry_opt << endl << endl;
failed = 0 ;
START_TIMER;
execute(stRead);
STOP_TIMER;
PRINT_TIMER("read", noOfTransacts, tNoOfOpsPerTrans);
i-- ;
cr2++ ;
}//while
if(0 == failed ){
ndbout << endl <<"Redo attempt succeeded" << endl << endl;
}else{
ndbout << endl;
ndbout << "Redo attempt failed, moving on now..." << endl << endl;
}//if
}//if
/****************************************************************
* Perform delete. *
****************************************************************/
failed = 0 ;
START_TIMER;
execute(stDelete);
STOP_TIMER;
PRINT_TIMER("delete", noOfTransacts, tNoOfOpsPerTrans);
if (0 < failed) {
i = retry_opt ;
int cd = 1 ;
while (0 < failed && 0 < i){
ndbout << failed << " of the transactions returned errors!"<< endl ;
ndbout << endl;
ndbout <<"Attempting to redo the failed transactions now:" << endl ;
ndbout << endl <<"Redo attempt " << cd <<" out of ";
ndbout << retry_opt << endl << endl;
failed = 0 ;
START_TIMER;
execute(stDelete);
STOP_TIMER;
PRINT_TIMER("read", noOfTransacts, tNoOfOpsPerTrans);
i-- ;
cd++ ;
}//while
if(0 == failed ){
ndbout << endl <<"Redo attempt succeeded" << endl << endl ;
}else{
ndbout << endl;
ndbout << "Redo attempt failed, moving on now..." << endl << endl;
}//if
}//if
tLoops++;
ndbout << "--------------------------------------------------" << endl;
if(tNoOfLoops != 0){
if(tNoOfLoops <= tLoops)
break ;
}
}//for
execute(stStop);
void * tmp;
for(i = 0; i<tNoOfThreads; i++){
NdbThread_WaitFor(threadLife[i], &tmp);
NdbThread_Destroy(&threadLife[i]);
}
}
delete [] pThreadData;
delete pNdb;
//printing errorCounters
flexAsynchErrorData->printErrorCounters(ndbout);
return NDBT_ProgramExit(returnValue);
}//main()
static void execute(StartType aType)
{
resetThreads();
tellThreads(aType);
waitForThreads();
}//execute()
static void*
threadLoop(void* ThreadData)
{
Ndb* localNdb;
StartType tType;
ThreadNdb* tabThread = (ThreadNdb*)ThreadData;
int threadNo = tabThread->ThreadNo;
localNdb = new Ndb(g_cluster_connection, "TEST_DB");
localNdb->init(1024);
localNdb->waitUntilReady(10000);
unsigned int threadBase = (threadNo << 16) + tNodeId ;
for (;;){
while (ThreadStart[threadNo] == stIdle) {
NdbSleep_MilliSleep(10);
}//while
// Check if signal to exit is received
if (ThreadStart[threadNo] == stStop) {
break;
}//if
tType = ThreadStart[threadNo];
ThreadStart[threadNo] = stIdle;
if(!executeThread(tType, localNdb, threadBase)){
break;
}
ThreadReady[threadNo] = 1;
}//for
delete localNdb;
ThreadReady[threadNo] = 1;
return NULL;
}//threadLoop()
static
bool
executeThread(StartType aType, Ndb* aNdbObject, unsigned int threadBase) {
int i, j, k;
NdbConnection* tConArray[1024];
unsigned int tBase;
unsigned int tBase2;
for (i = 0; i < tNoOfTransactions; i++) {
if (tLocal == false) {
tBase = i * tNoOfParallelTrans * tNoOfOpsPerTrans;
} else {
tBase = i * tNoOfParallelTrans * MAX_SEEK;
}//if
START_REAL_TIME;
for (j = 0; j < tNoOfParallelTrans; j++) {
if (tLocal == false) {
tBase2 = tBase + (j * tNoOfOpsPerTrans);
} else {
tBase2 = tBase + (j * MAX_SEEK);
tBase2 = getKey(threadBase, tBase2);
}//if
if (startTransGuess == true) {
Uint64 Tkey64;
Uint32* Tkey32 = (Uint32*)&Tkey64;
Tkey32[0] = threadBase;
Tkey32[1] = tBase2;
tConArray[j] = aNdbObject->startTransaction((Uint32)0, //Priority
(const char*)&Tkey64, //Main PKey
(Uint32)4); //Key Length
} else {
tConArray[j] = aNdbObject->startTransaction();
}//if
if (tConArray[j] == NULL &&
!error_handler(aNdbObject->getNdbError()) ){
ndbout << endl << "Unable to recover! Quiting now" << endl ;
return false;
}//if
for (k = 0; k < tNoOfOpsPerTrans; k++) {
//-------------------------------------------------------
// Define the operation, but do not execute it yet.
//-------------------------------------------------------
defineOperation(tConArray[j], aType, threadBase, (tBase2 + k));
}//for
tConArray[j]->executeAsynchPrepare(Commit, &executeCallback, NULL);
}//for
STOP_REAL_TIME;
//-------------------------------------------------------
// Now we have defined a set of operations, it is now time
// to execute all of them.
//-------------------------------------------------------
int Tcomp = aNdbObject->sendPollNdb(3000, 0, 0);
while (Tcomp < tNoOfParallelTrans) {
int TlocalComp = aNdbObject->pollNdb(3000, 0);
Tcomp += TlocalComp;
}//while
for (j = 0 ; j < tNoOfParallelTrans ; j++) {
aNdbObject->closeTransaction(tConArray[j]);
}//for
}//for
return true;
}//executeThread()
static
Uint32
getKey(Uint32 aBase, Uint32 anIndex) {
Uint32 Tfound = anIndex;
Uint64 Tkey64;
Uint32* Tkey32 = (Uint32*)&Tkey64;
Tkey32[0] = aBase;
Uint32 hash;
for (Uint32 i = anIndex; i < (anIndex + MAX_SEEK); i++) {
Tkey32[1] = (Uint32)i;
hash = md5_hash((Uint64*)&Tkey64, (Uint32)2);
hash = (hash >> 6) & (MAX_PARTS - 1);
if (hash == tLocalPart) {
Tfound = i;
break;
}//if
}//for
return Tfound;
}//getKey()
static void
executeCallback(int result, NdbConnection* NdbObject, void* aObject)
{
if (result == -1) {
// Add complete error handling here
int retCode = flexAsynchErrorData->handleErrorCommon(NdbObject->getNdbError());
if (retCode == 1) {
if (NdbObject->getNdbError().code != 626 && NdbObject->getNdbError().code != 630){
ndbout_c("execute: %s", NdbObject->getNdbError().message);
ndbout_c("Error code = %d", NdbObject->getNdbError().code);}
} else if (retCode == 2) {
ndbout << "4115 should not happen in flexAsynch" << endl;
} else if (retCode == 3) {
/* What can we do here? */
ndbout_c("execute: %s", NdbObject->getNdbError().message);
}//if(retCode == 3)
// ndbout << "Error occured in poll:" << endl;
// ndbout << NdbObject->getNdbError() << endl;
failed++ ;
return;
}//if
return;
}//executeCallback()
static void
defineOperation(NdbConnection* localNdbConnection, StartType aType,
Uint32 threadBase, Uint32 aIndex)
{
NdbOperation* localNdbOperation;
unsigned int loopCountAttributes = tNoOfAttributes;
unsigned int countAttributes;
Uint32 attrValue[MAXATTRSIZE];
//-------------------------------------------------------
// Set-up the attribute values for this operation.
//-------------------------------------------------------
attrValue[0] = threadBase;
attrValue[1] = aIndex;
for (int k = 2; k < loopCountAttributes; k++) {
attrValue[k] = aIndex;
}//for
localNdbOperation = localNdbConnection->getNdbOperation(tableName[0]);
if (localNdbOperation == NULL) {
error_handler(localNdbConnection->getNdbError());
}//if
switch (aType) {
case stInsert: { // Insert case
if (theWriteFlag == 1 && theDirtyFlag == 1) {
localNdbOperation->dirtyWrite();
} else if (theWriteFlag == 1) {
localNdbOperation->writeTuple();
} else {
localNdbOperation->insertTuple();
}//if
break;
}//case
case stRead: { // Read Case
if (theSimpleFlag == 1) {
localNdbOperation->simpleRead();
} else if (theDirtyFlag == 1) {
localNdbOperation->dirtyRead();
} else {
localNdbOperation->readTuple();
}//if
break;
}//case
case stUpdate: { // Update Case
if (theWriteFlag == 1 && theDirtyFlag == 1) {
localNdbOperation->dirtyWrite();
} else if (theWriteFlag == 1) {
localNdbOperation->writeTuple();
} else if (theDirtyFlag == 1) {
localNdbOperation->dirtyUpdate();
} else {
localNdbOperation->updateTuple();
}//if
break;
}//case
case stDelete: { // Delete Case
localNdbOperation->deleteTuple();
break;
}//case
default: {
error_handler(localNdbOperation->getNdbError());
}//default
}//switch
localNdbOperation->equal((Uint32)0,(char*)&attrValue[0]);
switch (aType) {
case stInsert: // Insert case
case stUpdate: // Update Case
{
for (countAttributes = 1;
countAttributes < loopCountAttributes; countAttributes++) {
localNdbOperation->setValue(countAttributes,
(char*)&attrValue[0]);
}//for
break;
}//case
case stRead: { // Read Case
for (countAttributes = 1;
countAttributes < loopCountAttributes; countAttributes++) {
localNdbOperation->getValue(countAttributes,
(char*)&attrValue[0]);
}//for
break;
}//case
case stDelete: { // Delete Case
break;
}//case
default: {
//goto error_handler; < epaulsa
error_handler(localNdbOperation->getNdbError());
}//default
}//switch
return;
}//defineOperation()
static void setAttrNames()
{
int i;
for (i = 0; i < MAXATTR ; i++){
BaseString::snprintf(attrName[i], MAXSTRLEN, "COL%d", i);
}
}
static void setTableNames()
{
// Note! Uses only uppercase letters in table name's
// so that we can look at the tables wits SQL
int i;
for (i = 0; i < MAXTABLES ; i++){
if (theStdTableNameFlag==0){
BaseString::snprintf(tableName[i], MAXSTRLEN, "TAB%d_%d", i,
(int)(NdbTick_CurrentMillisecond()/1000));
} else {
BaseString::snprintf(tableName[i], MAXSTRLEN, "TAB%d", i);
}
}
}
static
int
createTables(Ndb* pMyNdb){
NdbSchemaCon *MySchemaTransaction;
NdbSchemaOp *MySchemaOp;
int check;
if (theTableCreateFlag == 0) {
for(int i=0; i < 1 ;i++) {
ndbout << "Creating " << tableName[i] << "..." << endl;
MySchemaTransaction = NdbSchemaCon::startSchemaTrans(pMyNdb);
if(MySchemaTransaction == NULL &&
(!error_handler(MySchemaTransaction->getNdbError())))
return -1;
MySchemaOp = MySchemaTransaction->getNdbSchemaOp();
if(MySchemaOp == NULL &&
(!error_handler(MySchemaTransaction->getNdbError())))
return -1;
check = MySchemaOp->createTable( tableName[i]
,8 // Table Size
,TupleKey // Key Type
,40 // Nr of Pages
,All
,6
,(tLoadFactor - 5)
,(tLoadFactor)
,1
,!tempTable
);
if (check == -1 &&
(!error_handler(MySchemaTransaction->getNdbError())))
return -1;
check = MySchemaOp->createAttribute( (char*)attrName[0],
TupleKey,
32,
PKSIZE,
UnSigned,
MMBased,
NotNullAttribute );
if (check == -1 &&
(!error_handler(MySchemaTransaction->getNdbError())))
return -1;
for (int j = 1; j < tNoOfAttributes ; j++){
check = MySchemaOp->createAttribute( (char*)attrName[j],
NoKey,
32,
tAttributeSize,
UnSigned,
MMBased,
NotNullAttribute );
if (check == -1 &&
(!error_handler(MySchemaTransaction->getNdbError())))
return -1;
}
if (MySchemaTransaction->execute() == -1 &&
(!error_handler(MySchemaTransaction->getNdbError())))
return -1;
NdbSchemaCon::closeSchemaTrans(MySchemaTransaction);
}
}
return 0;
}
static
bool error_handler(const NdbError & err){
ndbout << err << endl ;
switch(err.classification){
case NdbError::TemporaryResourceError:
case NdbError::OverloadError:
case NdbError::SchemaError:
ndbout << endl << "Attempting to recover and continue now..." << endl ;
return true;
}
return false ; // return false to abort
}
static
bool error_handler(const char* error_string, int error_int) {
ndbout << error_string << endl ;
if ((4008 == error_int) ||
(721 == error_int) ||
(266 == error_int)){
ndbout << endl << "Attempting to recover and continue now..." << endl ;
return true ; // return true to retry
}
return false ; // return false to abort
}
static
int
readArguments(int argc, const char** argv){
int i = 1;
while (argc > 1){
if (strcmp(argv[i], "-t") == 0){
tNoOfThreads = atoi(argv[i+1]);
if ((tNoOfThreads < 1) || (tNoOfThreads > NDB_MAXTHREADS)){
ndbout_c("Invalid no of threads");
return -1;
}
} else if (strcmp(argv[i], "-p") == 0){
tNoOfParallelTrans = atoi(argv[i+1]);
if ((tNoOfParallelTrans < 1) || (tNoOfParallelTrans > MAXPAR)){
ndbout_c("Invalid no of parallell transactions");
return -1;
}
} else if (strcmp(argv[i], "-load_factor") == 0){
tLoadFactor = atoi(argv[i+1]);
if ((tLoadFactor < 40) || (tLoadFactor > 99)){
ndbout_c("Invalid load factor");
return -1;
}
} else if (strcmp(argv[i], "-c") == 0) {
tNoOfOpsPerTrans = atoi(argv[i+1]);
if (tNoOfOpsPerTrans < 1){
ndbout_c("Invalid no of operations per transaction");
return -1;
}
} else if (strcmp(argv[i], "-o") == 0) {
tNoOfTransactions = atoi(argv[i+1]);
if (tNoOfTransactions < 1){
ndbout_c("Invalid no of transactions");
return -1;
}
} else if (strcmp(argv[i], "-a") == 0){
tNoOfAttributes = atoi(argv[i+1]);
if ((tNoOfAttributes < 2) || (tNoOfAttributes > MAXATTR)){
ndbout_c("Invalid no of attributes");
return -1;
}
} else if (strcmp(argv[i], "-n") == 0){
theStdTableNameFlag = 1;
argc++;
i--;
} else if (strcmp(argv[i], "-l") == 0){
tNoOfLoops = atoi(argv[i+1]);
if ((tNoOfLoops < 0) || (tNoOfLoops > 100000)){
ndbout_c("Invalid no of loops");
return -1;
}
} else if (strcmp(argv[i], "-s") == 0){
tAttributeSize = atoi(argv[i+1]);
if ((tAttributeSize < 1) || (tAttributeSize > MAXATTRSIZE)){
ndbout_c("Invalid attributes size");
return -1;
}
} else if (strcmp(argv[i], "-local") == 0){
tLocalPart = atoi(argv[i+1]);
tLocal = true;
startTransGuess = true;
if ((tLocalPart < 0) || (tLocalPart > MAX_PARTS)){
ndbout_c("Invalid local part");
return -1;
}
} else if (strcmp(argv[i], "-simple") == 0){
theSimpleFlag = 1;
argc++;
i--;
} else if (strcmp(argv[i], "-adaptive") == 0){
tSendForce = 0;
argc++;
i--;
} else if (strcmp(argv[i], "-force") == 0){
tSendForce = 1;
argc++;
i--;
} else if (strcmp(argv[i], "-non_adaptive") == 0){
tSendForce = 2;
argc++;
i--;
} else if (strcmp(argv[i], "-write") == 0){
theWriteFlag = 1;
argc++;
i--;
} else if (strcmp(argv[i], "-dirty") == 0){
theDirtyFlag = 1;
argc++;
i--;
} else if (strcmp(argv[i], "-test") == 0){
theTestFlag = 1;
argc++;
i--;
} else if (strcmp(argv[i], "-no_table_create") == 0){
theTableCreateFlag = 1;
argc++;
i--;
} else if (strcmp(argv[i], "-temp") == 0){
tempTable = true;
argc++;
i--;
} else if (strcmp(argv[i], "-no_hint") == 0){
startTransGuess = false;
argc++;
i--;
} else {
return -1;
}
argc -= 2;
i = i + 2;
}//while
if (tLocal == true) {
if (tNoOfOpsPerTrans != 1) {
ndbout_c("Not valid to have more than one op per trans with local");
}//if
if (startTransGuess == false) {
ndbout_c("Not valid to use no_hint with local");
}//if
}//if
return 0;
}
static
void
input_error(){
ndbout_c("FLEXASYNCH");
ndbout_c(" Perform benchmark of insert, update and delete transactions");
ndbout_c("");
ndbout_c("Arguments:");
ndbout_c(" -t Number of threads to start, default 1");
ndbout_c(" -p Number of parallel transactions per thread, default 32");
ndbout_c(" -o Number of transactions per loop, default 500");
ndbout_c(" -l Number of loops to run, default 1, 0=infinite");
ndbout_c(" -load_factor Number Load factor in index in percent (40 -> 99)");
ndbout_c(" -a Number of attributes, default 25");
ndbout_c(" -c Number of operations per transaction");
ndbout_c(" -s Size of each attribute, default 1 ");
ndbout_c(" (PK is always of size 1, independent of this value)");
ndbout_c(" -simple Use simple read to read from database");
ndbout_c(" -dirty Use dirty read to read from database");
ndbout_c(" -write Use writeTuple in insert and update");
ndbout_c(" -n Use standard table names");
ndbout_c(" -no_table_create Don't create tables in db");
ndbout_c(" -temp Create table(s) without logging");
ndbout_c(" -no_hint Don't give hint on where to execute transaction coordinator");
ndbout_c(" -adaptive Use adaptive send algorithm (default)");
ndbout_c(" -force Force send when communicating");
ndbout_c(" -non_adaptive Send at a 10 millisecond interval");
ndbout_c(" -local Number of part, only use keys in one part out of 16");
}