mariadb/storage/ndb/test/newtonapi/perf_test/perf.cpp

648 lines
15 KiB
C++
Raw Normal View History

/* Copyright (C) 2003 MySQL AB
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
#include <ndb_global.h>
extern "C" {
#include <dba.h>
}
#include <NdbOut.hpp>
#include <NdbSleep.h>
#include <NdbTimer.hpp>
#include <NDBT_Stats.hpp>
#include <NDBT_ReturnCodes.h>
#include <NdbMain.h>
#include <time.h>
#undef min
#undef max
static const int NP_Insert = 0;
static const int NP_Update = 1;
static const int NP_WriteUpdate = 2;
static const int NP_WriteInsert = 3;
static const int NP_Delete = 4;
static const int NP_BulkRead = 5;
static const int NP_MAX = 5;
static const char * Operations[] = {
"Insert ",
"Update ",
"WriteUpd",
"WriteIns",
"Delete ",
"BulkRead"
};
/**
* Configuration variables
*/
static int NoOfTransactions = 10000;
static int ParallellTransactions = 1000;
static int OperationsPerTransaction = 10;
static int NoOfColumns = 20;
static int BytesPerInsert = 300;
static int BytesPerUpdate = 200;
static int LoopCount = 10;
/**
* Global variables
*/
static char TableName[255];
static DBA_ColumnDesc_t * ColumnDescriptions;
static DBA_ColumnBinding_t * InsertBindings;
static DBA_ColumnBinding_t * UpdateBindings; static int UpdateBindingColumns;
static DBA_ColumnBinding_t * DeleteBindings;
static char * TestData;
static DBA_Binding_t * InsertB;
static DBA_Binding_t * UpdateB;
static DBA_Binding_t * DeleteB;
/**
* Function prototypes
*/
static void sequence(int loops);
inline void * getPtr(int rowNo) { return TestData+rowNo*BytesPerInsert;}
inline void setPK(int rowNo, int pk){ * (int *)getPtr(rowNo) = pk; }
static void SetupTestData();
static void CleanupTestData();
static bool CreateTable();
static bool CleanTable();
static bool CreateBindings();
static void usage();
static
void
usage(){
int ForceSend, Interval;
DBA_GetParameter(0, &Interval);
DBA_GetParameter(3, &ForceSend);
ndbout << "newtonPerf" << endl
<< " -n Transactions per loop and operation ("
<< NoOfTransactions << ")" << endl
<< " -p parallell transactions (" << ParallellTransactions << ")"
<< endl
<< " -o operations per transaction (" << OperationsPerTransaction
<< ")" << endl
<< " -a no of columns (" << NoOfColumns << ")" << endl
<< " -b Table size in bytes (" << BytesPerInsert << ")" << endl
<< " -u Bytes per update (" << BytesPerUpdate << ")" << endl
<< " -l Loop count (" << LoopCount << ")" << endl
<< " -i Interval (" << Interval << "ms)" << endl
<< " -f Force send algorithm (" << ForceSend << ")" << endl
<< " -h Help" << endl;
}
static
bool
parseArgs(int argc, const char **argv){
bool a = false, b = false, u = false;
for(int i = 1; i<argc; i++){
if(argv[i][0] != '-'){
ndbout << "Invalid argument: " << argv[i] << endl;
return false;
}
if(argv[i][1] == 'h')
return false;
if(i == argc-1){
ndbout << "Expecting argument to " << argv[i] << endl;
return false;
}
switch(argv[i][1]){
case 'n':
NoOfTransactions = atoi(argv[i+1]);
break;
case 'p':
ParallellTransactions = atoi(argv[i+1]);
break;
case 'o':
OperationsPerTransaction = atoi(argv[i+1]);
break;
case 'a':
NoOfColumns = atoi(argv[i+1]);
a = true;
break;
case 'b':
BytesPerInsert = atoi(argv[i+1]);
b = true;
break;
case 'u':
BytesPerUpdate = atoi(argv[i+1]);
u = true;
break;
case 'l':
LoopCount = atoi(argv[i+1]);
break;
case 'f':
{
const int val = atoi(argv[i+1]);
if(DBA_SetParameter(3, val) != DBA_NO_ERROR){
ndbout << "Invalid force send algorithm: "
<< DBA_GetLatestErrorMsg()
<< "(" << DBA_GetLatestError() << ")" << endl;
return false;
}
}
break;
case 'i':
{
const int val = atoi(argv[i+1]);
if(DBA_SetParameter(0, val) != DBA_NO_ERROR){
ndbout << "Invalid NBP interval: "
<< DBA_GetLatestErrorMsg()
<< "(" << DBA_GetLatestError() << ")" << endl;
return false;
}
}
break;
default:
ndbout << "Invalid option: " << argv[i] << endl;
return false;
}
i++;
}
if(a && !b) BytesPerInsert = 15 * NoOfColumns;
if(!a && b) NoOfColumns = ((BytesPerInsert + 14) / 15)+1;
if(!u)
BytesPerUpdate = (2 * BytesPerInsert) / 3;
bool t = true;
if(NoOfColumns < 2) t = false;
if(BytesPerInsert < 8) t = false;
if(BytesPerUpdate < 8) t = false;
if(!t){
ndbout << "Invalid arguments combination of -a -b -u not working out"
<< endl;
return false;
}
return true;
}
NDB_COMMAND(newton_perf, "newton_perf",
"newton_perf", "newton_perf", 65535){
if(!parseArgs(argc, argv)){
usage();
return NDBT_ProgramExit(NDBT_WRONGARGS);
}
ndbout << "-----------" << endl;
usage();
ndbout << endl;
SetupTestData();
DBA_Open();
if(!CreateTable()){
DBA_Close();
CleanupTestData();
return 0;
}
if(!CreateBindings()){
DBA_Close();
CleanupTestData();
return 0;
}
CleanTable();
sequence(LoopCount);
DBA_Close();
CleanupTestData();
DBA_DestroyBinding(InsertB);
DBA_DestroyBinding(UpdateB);
DBA_DestroyBinding(DeleteB);
}
static
void
ErrorMsg(const char * s){
ndbout << s
<< ": " << DBA_GetLatestError() << "-" << DBA_GetLatestErrorMsg()
<< ", " << DBA_GetLatestNdbError()
<< endl;
}
static
int
m4(int i){
const int j = i - (i & 3);
return j;
}
static
void
SetupTestData(){
ndbout << "Creating testdata" << endl;
ColumnDescriptions = new DBA_ColumnDesc_t[NoOfColumns];
InsertBindings = new DBA_ColumnBinding_t[NoOfColumns];
const int sz = m4((BytesPerInsert - ((NoOfColumns+1)/2)*4)/(NoOfColumns/2));
int sum = 0;
UpdateBindingColumns = 0;
for(int i = 0; i<NoOfColumns; i++){
char tmp[16];
if((i % 2) == 0){
sprintf(tmp, "I%d", i);
ColumnDescriptions[i].DataType = DBA_INT;
ColumnDescriptions[i].Size = 4;
sum += 4;
} else {
sprintf(tmp, "S%d", i);
ColumnDescriptions[i].DataType = DBA_CHAR;
ColumnDescriptions[i].Size = sz;
sum += sz;
}
ColumnDescriptions[i].IsKey = 0;
ColumnDescriptions[i].Name = strdup(tmp);
InsertBindings[i].Name = strdup(tmp);
InsertBindings[i].DataType = ColumnDescriptions[i].DataType;
InsertBindings[i].Size = ColumnDescriptions[i].Size;
InsertBindings[i].Offset = sum - ColumnDescriptions[i].Size;
InsertBindings[i].Ptr = 0;
if(sum <= BytesPerUpdate)
UpdateBindingColumns++;
}
if(UpdateBindingColumns == 1)
UpdateBindingColumns++;
ColumnDescriptions[0].IsKey = 1;
assert(sum <= BytesPerInsert);
sprintf(TableName, "NEWTON_%d_%d", sum, NoOfColumns);
UpdateBindings = new DBA_ColumnBinding_t[UpdateBindingColumns];
memcpy(UpdateBindings, InsertBindings,
UpdateBindingColumns*sizeof(DBA_ColumnBinding_t));
DeleteBindings = new DBA_ColumnBinding_t[1];
memcpy(DeleteBindings, InsertBindings,
1*sizeof(DBA_ColumnBinding_t));
TestData = (char *)malloc(NoOfTransactions *
OperationsPerTransaction * BytesPerInsert);
assert(TestData != 0);
for(int i = 0; i<NoOfTransactions; i++)
for(int j = 0; j<OperationsPerTransaction; j++){
const int pk = i * OperationsPerTransaction + j;
setPK(pk, pk);
}
}
static
void
CleanupTestData(){
free(TestData);
for(int i = 0; i<NoOfColumns; i++){
free((char*)ColumnDescriptions[i].Name);
free((char*)InsertBindings[i].Name);
}
delete [] ColumnDescriptions;
delete [] InsertBindings;
delete [] UpdateBindings;
delete [] DeleteBindings;
}
static bool CleanReturnValue = true;
static int CleanCallbacks = 0;
static int CleanRows = 0;
extern "C"
void
CleanCallback(DBA_ReqId_t reqId, DBA_Error_t error, DBA_ErrorCode_t ec){
CleanCallbacks++;
if(error == DBA_NO_ERROR)
CleanRows++;
}
static
bool
CleanTable(){
ndbout << "Cleaning table..." << flush;
CleanReturnValue = true;
CleanCallbacks = 0;
CleanRows = 0;
for(int i = 0; i<NoOfTransactions * OperationsPerTransaction; i++){
DBA_ArrayDeleteRows(DeleteB,
getPtr(i), 1,
CleanCallback);
while((i-CleanCallbacks)>ParallellTransactions)
NdbSleep_MilliSleep(100);
}
while(CleanCallbacks != (NoOfTransactions * OperationsPerTransaction))
NdbSleep_SecSleep(1);
ndbout << CleanRows << " rows deleted" << endl;
return CleanReturnValue;
}
static
bool
CreateBindings(){
ndbout << "Creating bindings" << endl;
InsertB = UpdateB = DeleteB = 0;
InsertB = DBA_CreateBinding(TableName, NoOfColumns,
InsertBindings, BytesPerInsert);
if(InsertB == 0){
ErrorMsg("Failed to create insert bindings");
return false;
}
UpdateB = DBA_CreateBinding(TableName, UpdateBindingColumns,
UpdateBindings, BytesPerInsert);
if(UpdateB == 0){
ErrorMsg("Failed to create update bindings");
DBA_DestroyBinding(InsertB);
return false;
}
DeleteB = DBA_CreateBinding(TableName, 1,
DeleteBindings, BytesPerInsert);
if(DeleteB == 0){
ErrorMsg("Failed to create delete bindings");
DBA_DestroyBinding(InsertB);
DBA_DestroyBinding(UpdateB);
return false;
}
return true;
}
static
bool
CreateTable(){
ndbout << "Creating " << TableName << endl;
return DBA_CreateTable( TableName,
NoOfColumns,
ColumnDescriptions ) == DBA_NO_ERROR;
}
/**
*
*/
static NdbTimer SequenceTimer;
static int CurrentOp = NP_Insert;
static int SequenceSent = 0;
static int SequenceRecv = 0;
static NDBT_Stats SequenceStats[NP_MAX][4];
static NDBT_Stats SequenceLatency[NP_MAX];
static int HashMax;
static DBA_ReqId_t * ReqHash; // ReqId - Latency/Row
static int * ReqHashPos; // (row in StartTime)
static int SequenceLatencyPos;
static NDB_TICKS * StartTime;
static
inline
int
computeHashMax(int elements){
HashMax = 1;
while(HashMax < elements)
HashMax *= 2;
if(HashMax < 1024)
HashMax = 1024;
return HashMax;
}
static
inline
int
hash(DBA_ReqId_t request){
int r = (request >> 2) & (HashMax-1);
return r;
}
static
inline
void
addRequest(DBA_ReqId_t request, int pos){
int i = hash(request);
while(ReqHash[i] != 0)
i = ((i + 1) & (HashMax-1));
ReqHash[i] = request;
ReqHashPos[i] = pos;
}
static
inline
int
getRequest(DBA_ReqId_t request){
int i = hash(request);
while(ReqHash[i] != request)
i = ((i + 1) & (HashMax-1));
ReqHash[i] = 0;
return ReqHashPos[i];
}
extern "C"
void
SequenceCallback(DBA_ReqId_t reqId, DBA_Error_t error, DBA_ErrorCode_t ec){
int p = getRequest(reqId) - 1;
if(error != DBA_NO_ERROR){
ndbout << "p = " << p << endl;
ndbout << "DBA_GetErrorMsg(" << error << ") = "
<< DBA_GetErrorMsg(error) << endl;
ndbout << "DBA_GetNdbErrorMsg(" << ec << ") = "
<< DBA_GetNdbErrorMsg(ec) << endl;
assert(error == DBA_NO_ERROR);
}
SequenceRecv++;
if(SequenceRecv == NoOfTransactions){
SequenceTimer.doStop();
}
if((p & 127) == 127){
NDB_TICKS t = NdbTick_CurrentMillisecond() - StartTime[p];
SequenceLatency[CurrentOp].addObservation(t);
}
}
typedef DBA_ReqId_t (* DBA_ArrayFunction)( const DBA_Binding_t* pBindings,
const void * pData,
int NbRows,
DBA_AsyncCallbackFn_t CbFunc );
inline
int
min(int a, int b){
return a > b ? b : a;
}
static
void
SequenceOp(DBA_ArrayFunction func, const DBA_Binding_t* pBindings, int op){
SequenceSent = 0;
SequenceRecv = 0;
SequenceLatencyPos = 1;
CurrentOp = op;
SequenceTimer.doStart();
for(int i = 0; i<NoOfTransactions; ){
const int l1 = ParallellTransactions - (SequenceSent - SequenceRecv);
const int l2 = min(NoOfTransactions - i, l1);
for(int j = 0; j<l2; j++){
const DBA_ReqId_t r = func(pBindings,
getPtr(i*OperationsPerTransaction),
OperationsPerTransaction,
SequenceCallback);
assert(r != 0);
SequenceSent++;
addRequest(r, i + 1);
i++;
if((SequenceSent & 127) == 127){
NDB_TICKS t = NdbTick_CurrentMillisecond();
StartTime[i] = t;
}
}
if(l2 == 0)
NdbSleep_MilliSleep(10);
}
while(SequenceRecv != SequenceSent)
NdbSleep_SecSleep(1);
ndbout << "Performed " << NoOfTransactions << " " << Operations[op]
<< " in ";
double p = NoOfTransactions * 1000;
double t = SequenceTimer.elapsedTime();
double o = p * OperationsPerTransaction;
p /= t;
o /= t;
int _p = p;
int _o = o;
double b = 0;
switch(op){
case NP_Insert:
case NP_WriteInsert:
case NP_WriteUpdate:
case NP_BulkRead:
b = BytesPerInsert;
break;
case NP_Update:
b = BytesPerUpdate;
break;
case NP_Delete:
b = 4;
break;
default:
b = 0;
}
b *= NoOfTransactions * OperationsPerTransaction;
b /= t;
int _b = b;
SequenceStats[op][0].addObservation(t);
SequenceStats[op][1].addObservation(p);
SequenceStats[op][2].addObservation(o);
SequenceStats[op][3].addObservation(b);
int t2 = SequenceStats[op][0].getMean();
int p2 = SequenceStats[op][1].getMean();
int o2 = SequenceStats[op][2].getMean();
int b2 = SequenceStats[op][3].getMean();
ndbout << SequenceTimer.elapsedTime() << "(" << t2 << ")ms";
ndbout << " -> " << _p << "(" << p2 << ") T/s - " << _o
<< "(" << o2 << ") O/s - " << _b << "(" << b2 << ") Kb/s" << endl;
ndbout << " Latency (ms) Avg: " << (int)SequenceLatency[op].getMean()
<< " min: " << (int)SequenceLatency[op].getMin()
<< " max: " << (int)SequenceLatency[op].getMax()
<< " stddev: " << (int)SequenceLatency[op].getStddev()
<< " n: " << SequenceLatency[op].getCount() << endl;
}
/**
* Sequence
*/
static
void
sequence(int loops){
computeHashMax(ParallellTransactions);
ReqHash = new DBA_ReqId_t[HashMax];
ReqHashPos = new int[HashMax];
StartTime = new NDB_TICKS[NoOfTransactions];
for(int i = 0; i<NP_MAX; i++){
SequenceLatency[i].reset();
for(int j = 0; j<4; j++)
SequenceStats[i][j].reset();
}
for(int i = 0; i<loops; i++){
ndbout << "Loop #" << (i+1) << endl;
SequenceOp(DBA_ArrayInsertRows, InsertB, NP_Insert);
// BulkRead
SequenceOp(DBA_ArrayUpdateRows, UpdateB, NP_Update);
SequenceOp(DBA_ArrayWriteRows, InsertB, NP_WriteUpdate);
SequenceOp(DBA_ArrayDeleteRows, DeleteB, NP_Delete);
SequenceOp(DBA_ArrayWriteRows, InsertB, NP_WriteInsert);
SequenceOp(DBA_ArrayDeleteRows, DeleteB, NP_Delete);
ndbout << "-------------------" << endl << endl;
}
delete [] ReqHash;
delete [] ReqHashPos;
delete [] StartTime;
}