mariadb/ndb/test/ndbapi/bench/asyncGenerator.cpp

572 lines
15 KiB
C++
Raw Normal View History

/* Copyright (C) 2003 MySQL AB
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
/***************************************************************
* I N C L U D E D F I L E S *
***************************************************************/
#include <ndb_global.h>
#include "dbGenerator.h"
#include <NdbApi.hpp>
#include <NdbOut.hpp>
#include <NdbSleep.h>
/***************************************************************
* L O C A L C O N S T A N T S *
***************************************************************/
/***************************************************************
* L O C A L D A T A S T R U C T U R E S *
***************************************************************/
/***************************************************************
* L O C A L F U N C T I O N S *
***************************************************************/
static void getRandomSubscriberNumber(SubscriberNumber number);
static void getRandomServerId(ServerId *serverId);
static void getRandomChangedBy(ChangedBy changedBy);
static void getRandomChangedTime(ChangedTime changedTime);
static void clearTransaction(TransactionDefinition *trans);
static void initGeneratorStatistics(GeneratorStatistics *gen);
static void doOneTransaction(ThreadData * td,
int parallellism,
int millisSendPoll,
int minEventSendPoll,
int forceSendPoll);
static void doTransaction_T1(Ndb * pNDB, ThreadData * td, int async);
static void doTransaction_T2(Ndb * pNDB, ThreadData * td, int async);
static void doTransaction_T3(Ndb * pNDB, ThreadData * td, int async);
static void doTransaction_T4(Ndb * pNDB, ThreadData * td, int async);
static void doTransaction_T5(Ndb * pNDB, ThreadData * td, int async);
/***************************************************************
* L O C A L D A T A *
***************************************************************/
static SequenceValues transactionDefinition[] = {
{25, 1},
{25, 2},
{20, 3},
{15, 4},
{15, 5},
{0, 0}
};
static SequenceValues rollbackDefinition[] = {
{98, 0},
{2 , 1},
{0, 0}
};
static int maxsize = 0;
/***************************************************************
* P U B L I C D A T A *
***************************************************************/
/***************************************************************
****************************************************************
* L O C A L F U N C T I O N S C O D E S E C T I O N *
****************************************************************
***************************************************************/
static void getRandomSubscriberNumber(SubscriberNumber number)
{
uint32 tmp;
char sbuf[SUBSCRIBER_NUMBER_LENGTH + 1];
tmp = myRandom48(NO_OF_SUBSCRIBERS);
sprintf(sbuf, "%.*d", SUBSCRIBER_NUMBER_LENGTH, tmp);
memcpy(number, sbuf, SUBSCRIBER_NUMBER_LENGTH);
}
static void getRandomServerId(ServerId *serverId)
{
*serverId = myRandom48(NO_OF_SERVERS);
}
static void getRandomChangedBy(ChangedBy changedBy)
{
memset(changedBy, myRandom48(26)+'A', CHANGED_BY_LENGTH);
changedBy[CHANGED_BY_LENGTH] = 0;
}
static void getRandomChangedTime(ChangedTime changedTime)
{
memset(changedTime, myRandom48(26)+'A', CHANGED_TIME_LENGTH);
changedTime[CHANGED_TIME_LENGTH] = 0;
}
static void clearTransaction(TransactionDefinition *trans)
{
trans->count = 0;
trans->branchExecuted = 0;
trans->rollbackExecuted = 0;
trans->latencyCounter = myRandom48(127);
trans->latency.reset();
}
static int listFull(SessionList *list)
{
return(list->numberInList == SESSION_LIST_LENGTH);
}
static int listEmpty(SessionList *list)
{
return(list->numberInList == 0);
}
static void insertSession(SessionList *list,
SubscriberNumber number,
ServerId serverId)
{
SessionElement *e;
if( listFull(list) ) return;
e = &list->list[list->writeIndex];
strcpy(e->subscriberNumber, number);
e->serverId = serverId;
list->writeIndex = (list->writeIndex + 1) % SESSION_LIST_LENGTH;
list->numberInList++;
if( list->numberInList > maxsize )
maxsize = list->numberInList;
}
static SessionElement *getNextSession(SessionList *list)
{
if( listEmpty(list) ) return(0);
return(&list->list[list->readIndex]);
}
static void deleteSession(SessionList *list)
{
if( listEmpty(list) ) return;
list->readIndex = (list->readIndex + 1) % SESSION_LIST_LENGTH;
list->numberInList--;
}
static void initGeneratorStatistics(GeneratorStatistics *gen)
{
int i;
if( initSequence(&gen->transactionSequence,
transactionDefinition) != 0 ) {
ndbout_c("could not set the transaction types");
exit(0);
}
if( initSequence(&gen->rollbackSequenceT4,
rollbackDefinition) != 0 ) {
ndbout_c("could not set the rollback sequence");
exit(0);
}
if( initSequence(&gen->rollbackSequenceT5,
rollbackDefinition) != 0 ) {
ndbout_c("could not set the rollback sequence");
exit(0);
}
for(i = 0; i < NUM_TRANSACTION_TYPES; i++ )
clearTransaction(&gen->transactions[i]);
gen->totalTransactions = 0;
gen->activeSessions.numberInList = 0;
gen->activeSessions.readIndex = 0;
gen->activeSessions.writeIndex = 0;
}
static
void
doOneTransaction(ThreadData * td, int p, int millis, int minEvents, int force)
{
int i;
unsigned int transactionType;
int async = 1;
if (p == 1) {
async = 0;
}//if
for(i = 0; i<p; i++){
if(td[i].runState == Runnable){
transactionType = getNextRandom(&td[i].generator.transactionSequence);
switch(transactionType) {
case 1:
doTransaction_T1(td[i].pNDB, &td[i], async);
break;
case 2:
doTransaction_T2(td[i].pNDB, &td[i], async);
break;
case 3:
doTransaction_T3(td[i].pNDB, &td[i], async);
break;
case 4:
doTransaction_T4(td[i].pNDB, &td[i], async);
break;
case 5:
doTransaction_T5(td[i].pNDB, &td[i], async);
break;
default:
ndbout_c("Unknown transaction type: %d", transactionType);
}
}
}
if (async == 1) {
td[0].pNDB->sendPollNdb(millis, minEvents, force);
}//if
}
static
void
doTransaction_T1(Ndb * pNDB, ThreadData * td, int async)
{
/*----------------*/
/* Init arguments */
/*----------------*/
getRandomSubscriberNumber(td->transactionData.number);
getRandomChangedBy(td->transactionData.changed_by);
BaseString::snprintf(td->transactionData.changed_time,
sizeof(td->transactionData.changed_time),
"%ld - %d", td->changedTime++, myRandom48(65536*1024));
//getRandomChangedTime(td->transactionData.changed_time);
td->transactionData.location = td->transactionData.changed_by[0];
/*-----------------*/
/* Run transaction */
/*-----------------*/
td->runState = Running;
td->generator.transactions[0].startLatency();
start_T1(pNDB, td, async);
}
static
void
doTransaction_T2(Ndb * pNDB, ThreadData * td, int async)
{
/*----------------*/
/* Init arguments */
/*----------------*/
getRandomSubscriberNumber(td->transactionData.number);
/*-----------------*/
/* Run transaction */
/*-----------------*/
td->runState = Running;
td->generator.transactions[1].startLatency();
start_T2(pNDB, td, async);
}
static
void
doTransaction_T3(Ndb * pNDB, ThreadData * td, int async)
{
SessionElement *se;
/*----------------*/
/* Init arguments */
/*----------------*/
se = getNextSession(&td->generator.activeSessions);
if( se ) {
strcpy(td->transactionData.number, se->subscriberNumber);
td->transactionData.server_id = se->serverId;
td->transactionData.sessionElement = 1;
} else {
getRandomSubscriberNumber(td->transactionData.number);
getRandomServerId(&td->transactionData.server_id);
td->transactionData.sessionElement = 0;
}
td->transactionData.server_bit = (1 << td->transactionData.server_id);
/*-----------------*/
/* Run transaction */
/*-----------------*/
td->runState = Running;
td->generator.transactions[2].startLatency();
start_T3(pNDB, td, async);
}
static
void
doTransaction_T4(Ndb * pNDB, ThreadData * td, int async)
{
/*----------------*/
/* Init arguments */
/*----------------*/
getRandomSubscriberNumber(td->transactionData.number);
getRandomServerId(&td->transactionData.server_id);
td->transactionData.server_bit = (1 << td->transactionData.server_id);
td->transactionData.do_rollback =
getNextRandom(&td->generator.rollbackSequenceT4);
#if 0
memset(td->transactionData.session_details,
myRandom48(26)+'A', SESSION_DETAILS_LENGTH);
#endif
td->transactionData.session_details[SESSION_DETAILS_LENGTH] = 0;
/*-----------------*/
/* Run transaction */
/*-----------------*/
td->runState = Running;
td->generator.transactions[3].startLatency();
start_T4(pNDB, td, async);
}
static
void
doTransaction_T5(Ndb * pNDB, ThreadData * td, int async)
{
SessionElement * se;
se = getNextSession(&td->generator.activeSessions);
if( se ) {
strcpy(td->transactionData.number, se->subscriberNumber);
td->transactionData.server_id = se->serverId;
td->transactionData.sessionElement = 1;
}
else {
getRandomSubscriberNumber(td->transactionData.number);
getRandomServerId(&td->transactionData.server_id);
td->transactionData.sessionElement = 0;
}
td->transactionData.server_bit = (1 << td->transactionData.server_id);
td->transactionData.do_rollback
= getNextRandom(&td->generator.rollbackSequenceT5);
/*-----------------*/
/* Run transaction */
/*-----------------*/
td->runState = Running;
td->generator.transactions[4].startLatency();
start_T5(pNDB, td, async);
}
void
complete_T1(ThreadData * data){
data->generator.transactions[0].stopLatency();
data->generator.transactions[0].count++;
data->runState = Runnable;
data->generator.totalTransactions++;
}
void
complete_T2(ThreadData * data){
data->generator.transactions[1].stopLatency();
data->generator.transactions[1].count++;
data->runState = Runnable;
data->generator.totalTransactions++;
}
void
complete_T3(ThreadData * data){
data->generator.transactions[2].stopLatency();
data->generator.transactions[2].count++;
if(data->transactionData.branchExecuted)
data->generator.transactions[2].branchExecuted++;
data->runState = Runnable;
data->generator.totalTransactions++;
}
void
complete_T4(ThreadData * data){
data->generator.transactions[3].stopLatency();
data->generator.transactions[3].count++;
if(data->transactionData.branchExecuted)
data->generator.transactions[3].branchExecuted++;
if(data->transactionData.do_rollback)
data->generator.transactions[3].rollbackExecuted++;
if(data->transactionData.branchExecuted &&
!data->transactionData.do_rollback){
insertSession(&data->generator.activeSessions,
data->transactionData.number,
data->transactionData.server_id);
}
data->runState = Runnable;
data->generator.totalTransactions++;
}
void
complete_T5(ThreadData * data){
data->generator.transactions[4].stopLatency();
data->generator.transactions[4].count++;
if(data->transactionData.branchExecuted)
data->generator.transactions[4].branchExecuted++;
if(data->transactionData.do_rollback)
data->generator.transactions[4].rollbackExecuted++;
if(data->transactionData.sessionElement &&
!data->transactionData.do_rollback){
deleteSession(&data->generator.activeSessions);
}
data->runState = Runnable;
data->generator.totalTransactions++;
}
/***************************************************************
****************************************************************
* P U B L I C F U N C T I O N S C O D E S E C T I O N *
****************************************************************
***************************************************************/
void
asyncGenerator(ThreadData *data,
int parallellism,
int millisSendPoll,
int minEventSendPoll,
int forceSendPoll)
{
ThreadData * startUp;
GeneratorStatistics *st;
double periodStop;
double benchTimeStart;
double benchTimeEnd;
int i, j, done;
myRandom48Init(data->randomSeed);
for(i = 0; i<parallellism; i++){
initGeneratorStatistics(&data[i].generator);
}
startUp = (ThreadData*)malloc(parallellism * sizeof(ThreadData));
memcpy(startUp, data, (parallellism * sizeof(ThreadData)));
/*----------------*/
/* warm up period */
/*----------------*/
periodStop = userGetTime() + (double)data[0].warmUpSeconds;
while(userGetTime() < periodStop){
doOneTransaction(startUp, parallellism,
millisSendPoll, minEventSendPoll, forceSendPoll);
}
ndbout_c("Waiting for startup to finish");
/**
* Wait for all transactions
*/
done = 0;
while(!done){
done = 1;
for(i = 0; i<parallellism; i++){
if(startUp[i].runState != Runnable){
done = 0;
break;
}
}
if(!done){
startUp[0].pNDB->sendPollNdb();
}
}
ndbout_c("Benchmark period starts");
/*-------------------------*/
/* normal benchmark period */
/*-------------------------*/
benchTimeStart = userGetTime();
periodStop = benchTimeStart + (double)data[0].testSeconds;
while(userGetTime() < periodStop)
doOneTransaction(data, parallellism,
millisSendPoll, minEventSendPoll, forceSendPoll);
benchTimeEnd = userGetTime();
ndbout_c("Benchmark period done");
/**
* Wait for all transactions
*/
done = 0;
while(!done){
done = 1;
for(i = 0; i<parallellism; i++){
if(data[i].runState != Runnable){
done = 0;
break;
}
}
if(!done){
data[0].pNDB->sendPollNdb();
}
}
/*------------------*/
/* cool down period */
/*------------------*/
periodStop = userGetTime() + (double)data[0].coolDownSeconds;
while(userGetTime() < periodStop){
doOneTransaction(startUp, parallellism,
millisSendPoll, minEventSendPoll, forceSendPoll);
}
done = 0;
while(!done){
done = 1;
for(i = 0; i<parallellism; i++){
if(startUp[i].runState != Runnable){
done = 0;
break;
}
}
if(!done){
startUp[0].pNDB->sendPollNdb();
}
}
/*---------------------------------------------------------*/
/* add the times for all transaction for inner loop timing */
/*---------------------------------------------------------*/
for(j = 0; j<parallellism; j++){
st = &data[j].generator;
st->outerLoopTime = benchTimeEnd - benchTimeStart;
st->outerTps = getTps(st->totalTransactions, st->outerLoopTime);
}
/* ndbout_c("maxsize = %d\n",maxsize); */
free(startUp);
}