mirror of
https://github.com/MariaDB/server.git
synced 2025-01-18 21:12:26 +01:00
Merge whalegate.ndb.mysql.com:/home/tomas/mysql-5.1-new-ndb
into whalegate.ndb.mysql.com:/home/tomas/mysql-5.1-new-ndb-merge
This commit is contained in:
commit
0c0f6dfa03
9 changed files with 866 additions and 23 deletions
|
@ -179,7 +179,8 @@ Dbtup::dealloc_tuple(Signal* signal,
|
|||
&disk, tmpptr, gci);
|
||||
}
|
||||
|
||||
if (! (bits & Tuple_header::LCP_SKIP) && lcpScan_ptr_i != RNIL)
|
||||
if (! (bits & (Tuple_header::LCP_SKIP | Tuple_header::ALLOC)) &&
|
||||
lcpScan_ptr_i != RNIL)
|
||||
{
|
||||
ScanOpPtr scanOp;
|
||||
c_scanOpPool.getPtr(scanOp, lcpScan_ptr_i);
|
||||
|
|
|
@ -20,7 +20,7 @@
|
|||
#include <NDBT.hpp>
|
||||
#include <HugoCalculator.hpp>
|
||||
#include <HugoOperations.hpp>
|
||||
|
||||
class NDBT_Stats;
|
||||
|
||||
class HugoTransactions : public HugoOperations {
|
||||
public:
|
||||
|
@ -109,10 +109,24 @@ public:
|
|||
void setRetryMax(int retryMax = 100) { m_retryMax = retryMax; }
|
||||
|
||||
Uint32 m_latest_gci;
|
||||
|
||||
void setStatsLatency(NDBT_Stats* stats) { m_stats_latency = stats; }
|
||||
|
||||
// allows multiple threads to update separate batches
|
||||
void setThrInfo(int thr_count, int thr_no) {
|
||||
m_thr_count = thr_count;
|
||||
m_thr_no = thr_no;
|
||||
}
|
||||
|
||||
protected:
|
||||
NDBT_ResultRow row;
|
||||
int m_defaultScanUpdateMethod;
|
||||
int m_retryMax;
|
||||
|
||||
NDBT_Stats* m_stats_latency;
|
||||
|
||||
int m_thr_count; // 0 if no separation between threads
|
||||
int m_thr_no;
|
||||
};
|
||||
|
||||
|
||||
|
|
226
storage/ndb/test/include/NDBT_Thread.hpp
Normal file
226
storage/ndb/test/include/NDBT_Thread.hpp
Normal file
|
@ -0,0 +1,226 @@
|
|||
/* Copyright (C) 2003 MySQL AB
|
||||
|
||||
This program is free software; you can redistribute it and/or modify
|
||||
it under the terms of the GNU General Public License as published by
|
||||
the Free Software Foundation; version 2 of the License.
|
||||
|
||||
This program is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU General Public License
|
||||
along with this program; if not, write to the Free Software
|
||||
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
|
||||
|
||||
#ifndef NDB_THREAD_HPP
|
||||
#define NDB_THREAD_HPP
|
||||
|
||||
#include <NdbMutex.h>
|
||||
#include <NdbCondition.h>
|
||||
#include <NdbThread.h>
|
||||
|
||||
// NDBT_Thread ctor -> NDBT_Thread_run -> thr.run()
|
||||
extern "C" {
|
||||
static void* NDBT_Thread_run(void* arg);
|
||||
}
|
||||
|
||||
// Function to run in a thread.
|
||||
|
||||
typedef void NDBT_ThreadFunc(class NDBT_Thread&);
|
||||
|
||||
/*
|
||||
* NDBT_Thread
|
||||
*
|
||||
* Represents a thread. The thread pauses at startup.
|
||||
* Main process sets a function to run. When the function
|
||||
* returns, the thread pauses again to wait for a command.
|
||||
* This allows main process to sync with the thread and
|
||||
* exchange data with it.
|
||||
*
|
||||
* Input to thread is typically options. The input area
|
||||
* is read-only in the thread. Output from thread is
|
||||
* results such as statistics. Error code is handled
|
||||
* separately.
|
||||
*
|
||||
* Pointer to Ndb object and method to create it are
|
||||
* provided for convenience.
|
||||
*/
|
||||
|
||||
class NDBT_ThreadSet;
|
||||
|
||||
class NDBT_Thread {
|
||||
public:
|
||||
NDBT_Thread();
|
||||
NDBT_Thread(NDBT_ThreadSet* thread_set, int thread_no);
|
||||
void create(NDBT_ThreadSet* thread_set, int thread_no);
|
||||
~NDBT_Thread();
|
||||
|
||||
// if part of a set
|
||||
inline NDBT_ThreadSet& get_thread_set() const {
|
||||
assert(m_thread_set != 0);
|
||||
return *m_thread_set;
|
||||
}
|
||||
inline int get_thread_no() const {
|
||||
return m_thread_no;
|
||||
}
|
||||
|
||||
// { Wait -> Start -> Stop }+ -> Exit
|
||||
enum State {
|
||||
Wait = 1, // wait for command
|
||||
Start, // run current function
|
||||
Stop, // stopped (paused) when current function done
|
||||
Exit // exit thread
|
||||
};
|
||||
|
||||
// tell thread to start running current function
|
||||
void start();
|
||||
// wait for thread to stop when function is done
|
||||
void stop();
|
||||
// tell thread to exit
|
||||
void exit();
|
||||
// collect thread after exit
|
||||
void join();
|
||||
|
||||
// set function to run
|
||||
inline void set_func(NDBT_ThreadFunc* func) {
|
||||
m_func = func;
|
||||
}
|
||||
|
||||
// input area
|
||||
inline void set_input(const void* input) {
|
||||
m_input = input;
|
||||
}
|
||||
inline const void* get_input() const {
|
||||
return m_input;
|
||||
}
|
||||
|
||||
// output area
|
||||
inline void set_output(void* output) {
|
||||
m_output = output;
|
||||
}
|
||||
inline void* get_output() const {
|
||||
return m_output;
|
||||
}
|
||||
template <class T> inline void set_output() {
|
||||
set_output(new T);
|
||||
}
|
||||
inline void delete_output() {
|
||||
delete m_output;
|
||||
m_output = 0;
|
||||
}
|
||||
|
||||
// thread-specific Ndb object
|
||||
inline class Ndb* get_ndb() const {
|
||||
return m_ndb;
|
||||
}
|
||||
int connect(class Ndb_cluster_connection*, const char* db = "TEST_DB");
|
||||
void disconnect();
|
||||
|
||||
// error code (OS, Ndb, other)
|
||||
void clear_err() {
|
||||
m_err = 0;
|
||||
}
|
||||
void set_err(int err) {
|
||||
m_err = err;
|
||||
}
|
||||
int get_err() const {
|
||||
return m_err;
|
||||
}
|
||||
|
||||
private:
|
||||
friend class NDBT_ThreadSet;
|
||||
friend void* NDBT_Thread_run(void* arg);
|
||||
|
||||
enum { Magic = 0xabacadae };
|
||||
Uint32 m_magic;
|
||||
|
||||
State m_state;
|
||||
NDBT_ThreadSet* m_thread_set;
|
||||
int m_thread_no;
|
||||
|
||||
NDBT_ThreadFunc* m_func;
|
||||
const void* m_input;
|
||||
void* m_output;
|
||||
class Ndb* m_ndb;
|
||||
int m_err;
|
||||
|
||||
// run the thread
|
||||
void run();
|
||||
|
||||
void lock() {
|
||||
NdbMutex_Lock(m_mutex);
|
||||
}
|
||||
void unlock() {
|
||||
NdbMutex_Unlock(m_mutex);
|
||||
}
|
||||
|
||||
void wait() {
|
||||
NdbCondition_Wait(m_cond, m_mutex);
|
||||
}
|
||||
void signal() {
|
||||
NdbCondition_Signal(m_cond);
|
||||
}
|
||||
|
||||
NdbMutex* m_mutex;
|
||||
NdbCondition* m_cond;
|
||||
NdbThread* m_thread;
|
||||
void* m_status;
|
||||
};
|
||||
|
||||
/*
|
||||
* A set of threads, indexed from 0 to count-1. Methods
|
||||
* are applied to each thread (serially). Input area is
|
||||
* common to all threads. Output areas are allocated
|
||||
* separately according to a template class.
|
||||
*/
|
||||
|
||||
class NDBT_ThreadSet {
|
||||
public:
|
||||
NDBT_ThreadSet(int count);
|
||||
~NDBT_ThreadSet();
|
||||
|
||||
inline int get_count() const {
|
||||
return m_count;
|
||||
}
|
||||
inline NDBT_Thread& get_thread(int n) {
|
||||
assert(n < m_count && m_thread[n] != 0);
|
||||
return *m_thread[n];
|
||||
}
|
||||
|
||||
// tell each thread to start running
|
||||
void start();
|
||||
// wait for each thread to stop
|
||||
void stop();
|
||||
// tell each thread to exit
|
||||
void exit();
|
||||
// collect each thread after exit
|
||||
void join();
|
||||
|
||||
// set function to run in each thread
|
||||
void set_func(NDBT_ThreadFunc* func);
|
||||
|
||||
// set input area (same instance in each thread)
|
||||
void set_input(const void* input);
|
||||
|
||||
// set output areas
|
||||
template <class T> inline void set_output() {
|
||||
for (int n = 0; n < m_count; n++) {
|
||||
NDBT_Thread& thr = *m_thread[n];
|
||||
thr.set_output<T>();
|
||||
}
|
||||
}
|
||||
void delete_output();
|
||||
|
||||
// thread-specific Ndb objects
|
||||
int connect(class Ndb_cluster_connection*, const char* db = "TEST_DB");
|
||||
void disconnect();
|
||||
|
||||
int get_err() const;
|
||||
|
||||
private:
|
||||
int m_count;
|
||||
NDBT_Thread** m_thread;
|
||||
};
|
||||
|
||||
#endif
|
|
@ -14,8 +14,9 @@
|
|||
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
|
||||
|
||||
#include "HugoTransactions.hpp"
|
||||
#include <NDBT_Stats.hpp>
|
||||
#include <NdbSleep.h>
|
||||
|
||||
#include <NdbTick.h>
|
||||
|
||||
HugoTransactions::HugoTransactions(const NdbDictionary::Table& _tab,
|
||||
const NdbDictionary::Index* idx):
|
||||
|
@ -24,6 +25,10 @@ HugoTransactions::HugoTransactions(const NdbDictionary::Table& _tab,
|
|||
|
||||
m_defaultScanUpdateMethod = 3;
|
||||
setRetryMax();
|
||||
m_stats_latency = 0;
|
||||
|
||||
m_thr_count = 0;
|
||||
m_thr_no = -1;
|
||||
}
|
||||
|
||||
HugoTransactions::~HugoTransactions(){
|
||||
|
@ -820,6 +825,16 @@ HugoTransactions::pkReadRecords(Ndb* pNdb,
|
|||
return NDBT_FAILED;
|
||||
}
|
||||
|
||||
MicroSecondTimer timer_start;
|
||||
MicroSecondTimer timer_stop;
|
||||
bool timer_active =
|
||||
m_stats_latency != 0 &&
|
||||
r >= batch && // first batch is "warmup"
|
||||
r + batch != records; // last batch is usually partial
|
||||
|
||||
if (timer_active)
|
||||
NdbTick_getMicroTimer(&timer_start);
|
||||
|
||||
if(pkReadRecord(pNdb, r, batch, lm) != NDBT_OK)
|
||||
{
|
||||
ERR(pTrans->getNdbError());
|
||||
|
@ -892,6 +907,12 @@ HugoTransactions::pkReadRecords(Ndb* pNdb,
|
|||
}
|
||||
|
||||
closeTransaction(pNdb);
|
||||
|
||||
if (timer_active) {
|
||||
NdbTick_getMicroTimer(&timer_stop);
|
||||
NDB_TICKS ticks = NdbTick_getMicrosPassed(timer_start, timer_stop);
|
||||
m_stats_latency->addObservation((double)ticks);
|
||||
}
|
||||
}
|
||||
deallocRows();
|
||||
g_info << reads << " records read" << endl;
|
||||
|
@ -913,9 +934,17 @@ HugoTransactions::pkUpdateRecords(Ndb* pNdb,
|
|||
allocRows(batch);
|
||||
|
||||
g_info << "|- Updating records (batch=" << batch << ")..." << endl;
|
||||
int batch_no = 0;
|
||||
while (r < records){
|
||||
if(r + batch > records)
|
||||
batch = records - r;
|
||||
|
||||
if (m_thr_count != 0 && m_thr_no != batch_no % m_thr_count)
|
||||
{
|
||||
r += batch;
|
||||
batch_no++;
|
||||
continue;
|
||||
}
|
||||
|
||||
if (retryAttempt >= m_retryMax){
|
||||
g_info << "ERROR: has retried this operation " << retryAttempt
|
||||
|
@ -963,6 +992,16 @@ HugoTransactions::pkUpdateRecords(Ndb* pNdb,
|
|||
return NDBT_FAILED;
|
||||
}
|
||||
|
||||
MicroSecondTimer timer_start;
|
||||
MicroSecondTimer timer_stop;
|
||||
bool timer_active =
|
||||
m_stats_latency != 0 &&
|
||||
r >= batch && // first batch is "warmup"
|
||||
r + batch != records; // last batch is usually partial
|
||||
|
||||
if (timer_active)
|
||||
NdbTick_getMicroTimer(&timer_start);
|
||||
|
||||
if(pIndexScanOp)
|
||||
{
|
||||
int rows_found = 0;
|
||||
|
@ -1039,8 +1078,15 @@ HugoTransactions::pkUpdateRecords(Ndb* pNdb,
|
|||
}
|
||||
|
||||
closeTransaction(pNdb);
|
||||
|
||||
|
||||
if (timer_active) {
|
||||
NdbTick_getMicroTimer(&timer_stop);
|
||||
NDB_TICKS ticks = NdbTick_getMicrosPassed(timer_start, timer_stop);
|
||||
m_stats_latency->addObservation((double)ticks);
|
||||
}
|
||||
|
||||
r += batch; // Read next record
|
||||
batch_no++;
|
||||
}
|
||||
|
||||
deallocRows();
|
||||
|
@ -1228,10 +1274,18 @@ HugoTransactions::pkDelRecords(Ndb* pNdb,
|
|||
int check;
|
||||
|
||||
g_info << "|- Deleting records..." << endl;
|
||||
int batch_no = 0;
|
||||
while (r < records){
|
||||
if(r + batch > records)
|
||||
batch = records - r;
|
||||
|
||||
if (m_thr_count != 0 && m_thr_no != batch_no % m_thr_count)
|
||||
{
|
||||
r += batch;
|
||||
batch_no++;
|
||||
continue;
|
||||
}
|
||||
|
||||
if (retryAttempt >= m_retryMax){
|
||||
g_info << "ERROR: has retried this operation " << retryAttempt
|
||||
<< " times, failing!" << endl;
|
||||
|
@ -1255,6 +1309,16 @@ HugoTransactions::pkDelRecords(Ndb* pNdb,
|
|||
return NDBT_FAILED;
|
||||
}
|
||||
|
||||
MicroSecondTimer timer_start;
|
||||
MicroSecondTimer timer_stop;
|
||||
bool timer_active =
|
||||
m_stats_latency != 0 &&
|
||||
r >= batch && // first batch is "warmup"
|
||||
r + batch != records; // last batch is usually partial
|
||||
|
||||
if (timer_active)
|
||||
NdbTick_getMicroTimer(&timer_start);
|
||||
|
||||
if(pkDeleteRecord(pNdb, r, batch) != NDBT_OK)
|
||||
{
|
||||
ERR(pTrans->getNdbError());
|
||||
|
@ -1303,9 +1367,15 @@ HugoTransactions::pkDelRecords(Ndb* pNdb,
|
|||
m_latest_gci = pTrans->getGCI();
|
||||
}
|
||||
closeTransaction(pNdb);
|
||||
|
||||
r += batch; // Read next record
|
||||
|
||||
if (timer_active) {
|
||||
NdbTick_getMicroTimer(&timer_stop);
|
||||
NDB_TICKS ticks = NdbTick_getMicrosPassed(timer_start, timer_stop);
|
||||
m_stats_latency->addObservation((double)ticks);
|
||||
}
|
||||
|
||||
r += batch; // Read next record
|
||||
batch_no++;
|
||||
}
|
||||
|
||||
g_info << "|- " << deleted << " records deleted" << endl;
|
||||
|
|
|
@ -24,7 +24,7 @@ libNDBT_a_SOURCES = \
|
|||
NdbRestarter.cpp NdbRestarts.cpp NDBT_Output.cpp \
|
||||
NdbBackup.cpp NdbConfig.cpp NdbGrep.cpp NDBT_Table.cpp \
|
||||
NdbSchemaCon.cpp NdbSchemaOp.cpp getarg.c \
|
||||
CpcClient.cpp NdbMixRestarter.cpp
|
||||
CpcClient.cpp NdbMixRestarter.cpp NDBT_Thread.cpp
|
||||
|
||||
INCLUDES_LOC = -I$(top_srcdir)/storage/ndb/src/common/mgmcommon -I$(top_srcdir)/storage/ndb/include/mgmcommon -I$(top_srcdir)/storage/ndb/include/kernel -I$(top_srcdir)/storage/ndb/src/mgmapi
|
||||
|
||||
|
|
283
storage/ndb/test/src/NDBT_Thread.cpp
Normal file
283
storage/ndb/test/src/NDBT_Thread.cpp
Normal file
|
@ -0,0 +1,283 @@
|
|||
/* Copyright (C) 2003 MySQL AB
|
||||
|
||||
This program is free software; you can redistribute it and/or modify
|
||||
it under the terms of the GNU General Public License as published by
|
||||
the Free Software Foundation; version 2 of the License.
|
||||
|
||||
This program is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU General Public License
|
||||
along with this program; if not, write to the Free Software
|
||||
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
|
||||
|
||||
#include <ndb_global.h>
|
||||
#include <NDBT_Thread.hpp>
|
||||
#include <NdbApi.hpp>
|
||||
|
||||
NDBT_Thread::NDBT_Thread()
|
||||
{
|
||||
create(0, -1);
|
||||
}
|
||||
|
||||
NDBT_Thread::NDBT_Thread(NDBT_ThreadSet* thread_set, int thread_no)
|
||||
{
|
||||
create(thread_set, thread_no);
|
||||
}
|
||||
|
||||
void
|
||||
NDBT_Thread::create(NDBT_ThreadSet* thread_set, int thread_no)
|
||||
{
|
||||
m_magic = NDBT_Thread::Magic;
|
||||
|
||||
m_state = Wait;
|
||||
m_thread_set = thread_set;
|
||||
m_thread_no = thread_no;
|
||||
m_func = 0;
|
||||
m_input = 0;
|
||||
m_output = 0;
|
||||
m_ndb = 0;
|
||||
m_err = 0;
|
||||
|
||||
m_mutex = NdbMutex_Create();
|
||||
assert(m_mutex != 0);
|
||||
m_cond = NdbCondition_Create();
|
||||
assert(m_cond != 0);
|
||||
|
||||
char buf[20];
|
||||
sprintf(buf, "NDBT_%04u");
|
||||
const char* name = strdup(buf);
|
||||
assert(name != 0);
|
||||
|
||||
unsigned stacksize = 512 * 1024;
|
||||
NDB_THREAD_PRIO prio = NDB_THREAD_PRIO_LOW;
|
||||
m_thread = NdbThread_Create(NDBT_Thread_run,
|
||||
(void**)this, stacksize, name, prio);
|
||||
assert(m_thread != 0);
|
||||
}
|
||||
|
||||
NDBT_Thread::~NDBT_Thread()
|
||||
{
|
||||
if (m_thread != 0) {
|
||||
NdbThread_Destroy(&m_thread);
|
||||
m_thread = 0;
|
||||
}
|
||||
if (m_cond != 0) {
|
||||
NdbCondition_Destroy(m_cond);
|
||||
m_cond = 0;
|
||||
}
|
||||
if (m_mutex != 0) {
|
||||
NdbMutex_Destroy(m_mutex);
|
||||
m_mutex = 0;
|
||||
}
|
||||
}
|
||||
|
||||
static void*
|
||||
NDBT_Thread_run(void* arg)
|
||||
{
|
||||
assert(arg != 0);
|
||||
NDBT_Thread& thr = *(NDBT_Thread*)arg;
|
||||
assert(thr.m_magic == NDBT_Thread::Magic);
|
||||
thr.run();
|
||||
return 0;
|
||||
}
|
||||
|
||||
void
|
||||
NDBT_Thread::run()
|
||||
{
|
||||
while (1) {
|
||||
lock();
|
||||
while (m_state != Start && m_state != Exit) {
|
||||
wait();
|
||||
}
|
||||
if (m_state == Exit) {
|
||||
unlock();
|
||||
break;
|
||||
}
|
||||
(*m_func)(*this);
|
||||
m_state = Stop;
|
||||
signal();
|
||||
unlock();
|
||||
}
|
||||
}
|
||||
|
||||
// methods for main process
|
||||
|
||||
void
|
||||
NDBT_Thread::start()
|
||||
{
|
||||
lock();
|
||||
m_state = Start;
|
||||
signal();
|
||||
unlock();
|
||||
}
|
||||
|
||||
void
|
||||
NDBT_Thread::stop()
|
||||
{
|
||||
lock();
|
||||
while (m_state != Stop)
|
||||
wait();
|
||||
m_state = Wait;
|
||||
unlock();
|
||||
}
|
||||
|
||||
void
|
||||
NDBT_Thread::exit()
|
||||
{
|
||||
lock();
|
||||
m_state = Exit;
|
||||
signal();
|
||||
unlock();
|
||||
};
|
||||
|
||||
void
|
||||
NDBT_Thread::join()
|
||||
{
|
||||
NdbThread_WaitFor(m_thread, &m_status);
|
||||
m_thread = 0;
|
||||
}
|
||||
|
||||
int
|
||||
NDBT_Thread::connect(class Ndb_cluster_connection* ncc, const char* db)
|
||||
{
|
||||
m_ndb = new Ndb(ncc, db);
|
||||
if (m_ndb->init() == -1 ||
|
||||
m_ndb->waitUntilReady() == -1) {
|
||||
m_err = m_ndb->getNdbError().code;
|
||||
return -1;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
void
|
||||
NDBT_Thread::disconnect()
|
||||
{
|
||||
delete m_ndb;
|
||||
m_ndb = 0;
|
||||
}
|
||||
|
||||
// set of threads
|
||||
|
||||
NDBT_ThreadSet::NDBT_ThreadSet(int count)
|
||||
{
|
||||
m_count = count;
|
||||
m_thread = new NDBT_Thread* [count];
|
||||
for (int n = 0; n < count; n++) {
|
||||
m_thread[n] = new NDBT_Thread(this, n);
|
||||
}
|
||||
}
|
||||
|
||||
NDBT_ThreadSet::~NDBT_ThreadSet()
|
||||
{
|
||||
delete_output();
|
||||
for (int n = 0; n < m_count; n++) {
|
||||
delete m_thread[n];
|
||||
m_thread[n] = 0;
|
||||
}
|
||||
delete [] m_thread;
|
||||
}
|
||||
|
||||
void
|
||||
NDBT_ThreadSet::start()
|
||||
{
|
||||
for (int n = 0; n < m_count; n++) {
|
||||
NDBT_Thread& thr = *m_thread[n];
|
||||
thr.start();
|
||||
}
|
||||
}
|
||||
|
||||
void
|
||||
NDBT_ThreadSet::stop()
|
||||
{
|
||||
for (int n = 0; n < m_count; n++) {
|
||||
NDBT_Thread& thr = *m_thread[n];
|
||||
thr.stop();
|
||||
}
|
||||
}
|
||||
|
||||
void
|
||||
NDBT_ThreadSet::exit()
|
||||
{
|
||||
for (int n = 0; n < m_count; n++) {
|
||||
NDBT_Thread& thr = *m_thread[n];
|
||||
thr.exit();
|
||||
}
|
||||
}
|
||||
|
||||
void
|
||||
NDBT_ThreadSet::join()
|
||||
{
|
||||
for (int n = 0; n < m_count; n++) {
|
||||
NDBT_Thread& thr = *m_thread[n];
|
||||
thr.join();
|
||||
}
|
||||
}
|
||||
|
||||
void
|
||||
NDBT_ThreadSet::set_func(NDBT_ThreadFunc* func)
|
||||
{
|
||||
for (int n = 0; n < m_count; n++) {
|
||||
NDBT_Thread& thr = *m_thread[n];
|
||||
thr.set_func(func);
|
||||
}
|
||||
}
|
||||
|
||||
void
|
||||
NDBT_ThreadSet::set_input(const void* input)
|
||||
{
|
||||
for (int n = 0; n < m_count; n++) {
|
||||
NDBT_Thread& thr = *m_thread[n];
|
||||
thr.set_input(input);
|
||||
}
|
||||
}
|
||||
|
||||
void
|
||||
NDBT_ThreadSet::delete_output()
|
||||
{
|
||||
for (int n = 0; n < m_count; n++) {
|
||||
if (m_thread[n] != 0) {
|
||||
NDBT_Thread& thr = *m_thread[n];
|
||||
thr.delete_output();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
int
|
||||
NDBT_ThreadSet::connect(class Ndb_cluster_connection* ncc, const char* db)
|
||||
{
|
||||
for (int n = 0; n < m_count; n++) {
|
||||
assert(m_thread[n] != 0);
|
||||
NDBT_Thread& thr = *m_thread[n];
|
||||
if (thr.connect(ncc, db) == -1)
|
||||
return -1;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
void
|
||||
NDBT_ThreadSet::disconnect()
|
||||
{
|
||||
for (int n = 0; n < m_count; n++) {
|
||||
if (m_thread[n] != 0) {
|
||||
NDBT_Thread& thr = *m_thread[n];
|
||||
thr.disconnect();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
int
|
||||
NDBT_ThreadSet::get_err() const
|
||||
{
|
||||
for (int n = 0; n < m_count; n++) {
|
||||
if (m_thread[n] != 0) {
|
||||
NDBT_Thread& thr = *m_thread[n];
|
||||
int err = thr.get_err();
|
||||
if (err != 0)
|
||||
return err;
|
||||
}
|
||||
}
|
||||
return 0;
|
||||
}
|
|
@ -20,22 +20,41 @@
|
|||
#include <NdbApi.hpp>
|
||||
#include <NdbMain.h>
|
||||
#include <NDBT.hpp>
|
||||
#include <NDBT_Thread.hpp>
|
||||
#include <NDBT_Stats.hpp>
|
||||
#include <NdbSleep.h>
|
||||
#include <getarg.h>
|
||||
|
||||
#include <HugoTransactions.hpp>
|
||||
|
||||
static NDBT_ThreadFunc hugoPkDelete;
|
||||
|
||||
struct ThrInput {
|
||||
const NdbDictionary::Table* pTab;
|
||||
int records;
|
||||
int batch;
|
||||
int stats;
|
||||
};
|
||||
|
||||
struct ThrOutput {
|
||||
NDBT_Stats latency;
|
||||
};
|
||||
|
||||
int main(int argc, const char** argv){
|
||||
ndb_init();
|
||||
|
||||
int _records = 0;
|
||||
int _loops = 1;
|
||||
int _batch = 0;
|
||||
int _threads = 1;
|
||||
int _stats = 0;
|
||||
int _batch = 1;
|
||||
const char* _tabname = NULL;
|
||||
int _help = 0;
|
||||
|
||||
struct getargs args[] = {
|
||||
{ "loops", 'l', arg_integer, &_loops, "number of times to run this program(0=infinite loop)", "loops" },
|
||||
{ "threads", 't', arg_integer, &_threads, "number of threads (default 1)", "threads" },
|
||||
{ "stats", 's', arg_flag, &_stats, "report latency per batch", "stats" },
|
||||
// { "batch", 'b', arg_integer, &_batch, "batch value", "batch" },
|
||||
{ "records", 'r', arg_integer, &_records, "Number of records", "records" },
|
||||
{ "usage", '?', arg_flag, &_help, "Print help", "" }
|
||||
|
@ -81,12 +100,57 @@ int main(int argc, const char** argv){
|
|||
return NDBT_ProgramExit(NDBT_WRONGARGS);
|
||||
}
|
||||
|
||||
HugoTransactions hugoTrans(*pTab);
|
||||
// threads
|
||||
NDBT_ThreadSet ths(_threads);
|
||||
|
||||
// create Ndb object for each thread
|
||||
if (ths.connect(&con, "TEST_DB") == -1) {
|
||||
ndbout << "connect failed: err=" << ths.get_err() << endl;
|
||||
return NDBT_ProgramExit(NDBT_FAILED);
|
||||
}
|
||||
|
||||
// input is options
|
||||
ThrInput input;
|
||||
ths.set_input(&input);
|
||||
input.pTab = pTab;
|
||||
input.records = _records;
|
||||
input.batch = _batch;
|
||||
input.stats = _stats;
|
||||
|
||||
// output is stats
|
||||
ThrOutput output;
|
||||
ths.set_output<ThrOutput>();
|
||||
|
||||
int i = 0;
|
||||
while (i<_loops || _loops==0) {
|
||||
while (i < _loops || _loops == 0) {
|
||||
ndbout << i << ": ";
|
||||
if (hugoTrans.pkDelRecords(&MyNdb, _records) != 0){
|
||||
return NDBT_ProgramExit(NDBT_FAILED);
|
||||
|
||||
ths.set_func(hugoPkDelete);
|
||||
ths.start();
|
||||
ths.stop();
|
||||
|
||||
if (ths.get_err())
|
||||
NDBT_ProgramExit(NDBT_FAILED);
|
||||
|
||||
if (_stats) {
|
||||
NDBT_Stats latency;
|
||||
|
||||
// add stats from each thread
|
||||
int n;
|
||||
for (n = 0; n < ths.get_count(); n++) {
|
||||
NDBT_Thread& thr = ths.get_thread(n);
|
||||
ThrOutput* output = (ThrOutput*)thr.get_output();
|
||||
latency += output->latency;
|
||||
}
|
||||
|
||||
ndbout
|
||||
<< "latency per batch (us): "
|
||||
<< " samples=" << latency.getCount()
|
||||
<< " min=" << (int)latency.getMin()
|
||||
<< " max=" << (int)latency.getMax()
|
||||
<< " mean=" << (int)latency.getMean()
|
||||
<< " stddev=" << (int)latency.getStddev()
|
||||
<< endl;
|
||||
}
|
||||
i++;
|
||||
}
|
||||
|
@ -94,3 +158,23 @@ int main(int argc, const char** argv){
|
|||
return NDBT_ProgramExit(NDBT_OK);
|
||||
}
|
||||
|
||||
static void hugoPkDelete(NDBT_Thread& thr)
|
||||
{
|
||||
const ThrInput* input = (const ThrInput*)thr.get_input();
|
||||
ThrOutput* output = (ThrOutput*)thr.get_output();
|
||||
|
||||
HugoTransactions hugoTrans(*input->pTab);
|
||||
output->latency.reset();
|
||||
if (input->stats)
|
||||
hugoTrans.setStatsLatency(&output->latency);
|
||||
|
||||
NDBT_ThreadSet& ths = thr.get_thread_set();
|
||||
hugoTrans.setThrInfo(ths.get_count(), thr.get_thread_no());
|
||||
|
||||
int ret;
|
||||
ret = hugoTrans.pkDelRecords(thr.get_ndb(),
|
||||
input->records,
|
||||
input->batch);
|
||||
if (ret != 0)
|
||||
thr.set_err(ret);
|
||||
}
|
||||
|
|
|
@ -20,17 +20,33 @@
|
|||
#include <NdbApi.hpp>
|
||||
#include <NdbMain.h>
|
||||
#include <NDBT.hpp>
|
||||
#include <NDBT_Thread.hpp>
|
||||
#include <NDBT_Stats.hpp>
|
||||
#include <NdbSleep.h>
|
||||
#include <getarg.h>
|
||||
|
||||
#include <HugoTransactions.hpp>
|
||||
|
||||
static NDBT_ThreadFunc hugoPkRead;
|
||||
|
||||
struct ThrInput {
|
||||
const NdbDictionary::Table* pTab;
|
||||
int records;
|
||||
int batch;
|
||||
int stats;
|
||||
};
|
||||
|
||||
struct ThrOutput {
|
||||
NDBT_Stats latency;
|
||||
};
|
||||
|
||||
int main(int argc, const char** argv){
|
||||
ndb_init();
|
||||
|
||||
int _records = 0;
|
||||
int _loops = 1;
|
||||
int _threads = 1;
|
||||
int _stats = 0;
|
||||
int _abort = 0;
|
||||
int _batch = 1;
|
||||
const char* _tabname = NULL;
|
||||
|
@ -39,6 +55,8 @@ int main(int argc, const char** argv){
|
|||
struct getargs args[] = {
|
||||
{ "aborts", 'a', arg_integer, &_abort, "percent of transactions that are aborted", "abort%" },
|
||||
{ "loops", 'l', arg_integer, &_loops, "number of times to run this program(0=infinite loop)", "loops" },
|
||||
{ "threads", 't', arg_integer, &_threads, "number of threads (default 1)", "threads" },
|
||||
{ "stats", 's', arg_flag, &_stats, "report latency per batch", "stats" },
|
||||
{ "batch", 'b', arg_integer, &_batch, "batch value(not 0)", "batch" },
|
||||
{ "records", 'r', arg_integer, &_records, "Number of records", "records" },
|
||||
{ "usage", '?', arg_flag, &_help, "Print help", "" }
|
||||
|
@ -64,6 +82,7 @@ int main(int argc, const char** argv){
|
|||
{
|
||||
return NDBT_ProgramExit(NDBT_FAILED);
|
||||
}
|
||||
|
||||
Ndb MyNdb(&con, "TEST_DB" );
|
||||
|
||||
if(MyNdb.init() != 0){
|
||||
|
@ -81,12 +100,57 @@ int main(int argc, const char** argv){
|
|||
return NDBT_ProgramExit(NDBT_WRONGARGS);
|
||||
}
|
||||
|
||||
HugoTransactions hugoTrans(*pTab);
|
||||
// threads
|
||||
NDBT_ThreadSet ths(_threads);
|
||||
|
||||
// create Ndb object for each thread
|
||||
if (ths.connect(&con, "TEST_DB") == -1) {
|
||||
ndbout << "connect failed: err=" << ths.get_err() << endl;
|
||||
return NDBT_ProgramExit(NDBT_FAILED);
|
||||
}
|
||||
|
||||
// input is options
|
||||
ThrInput input;
|
||||
ths.set_input(&input);
|
||||
input.pTab = pTab;
|
||||
input.records = _records;
|
||||
input.batch = _batch;
|
||||
input.stats = _stats;
|
||||
|
||||
// output is stats
|
||||
ThrOutput output;
|
||||
ths.set_output<ThrOutput>();
|
||||
|
||||
int i = 0;
|
||||
while (i<_loops || _loops==0) {
|
||||
while (i < _loops || _loops == 0) {
|
||||
ndbout << i << ": ";
|
||||
if (hugoTrans.pkReadRecords(&MyNdb, _records, _batch) != 0){
|
||||
return NDBT_ProgramExit(NDBT_FAILED);
|
||||
|
||||
ths.set_func(hugoPkRead);
|
||||
ths.start();
|
||||
ths.stop();
|
||||
|
||||
if (ths.get_err())
|
||||
NDBT_ProgramExit(NDBT_FAILED);
|
||||
|
||||
if (_stats) {
|
||||
NDBT_Stats latency;
|
||||
|
||||
// add stats from each thread
|
||||
int n;
|
||||
for (n = 0; n < ths.get_count(); n++) {
|
||||
NDBT_Thread& thr = ths.get_thread(n);
|
||||
ThrOutput* output = (ThrOutput*)thr.get_output();
|
||||
latency += output->latency;
|
||||
}
|
||||
|
||||
ndbout
|
||||
<< "latency per batch (us): "
|
||||
<< " samples=" << latency.getCount()
|
||||
<< " min=" << (int)latency.getMin()
|
||||
<< " max=" << (int)latency.getMax()
|
||||
<< " mean=" << (int)latency.getMean()
|
||||
<< " stddev=" << (int)latency.getStddev()
|
||||
<< endl;
|
||||
}
|
||||
i++;
|
||||
}
|
||||
|
@ -94,3 +158,20 @@ int main(int argc, const char** argv){
|
|||
return NDBT_ProgramExit(NDBT_OK);
|
||||
}
|
||||
|
||||
static void hugoPkRead(NDBT_Thread& thr)
|
||||
{
|
||||
const ThrInput* input = (const ThrInput*)thr.get_input();
|
||||
ThrOutput* output = (ThrOutput*)thr.get_output();
|
||||
|
||||
HugoTransactions hugoTrans(*input->pTab);
|
||||
output->latency.reset();
|
||||
if (input->stats)
|
||||
hugoTrans.setStatsLatency(&output->latency);
|
||||
|
||||
int ret;
|
||||
ret = hugoTrans.pkReadRecords(thr.get_ndb(),
|
||||
input->records,
|
||||
input->batch);
|
||||
if (ret != 0)
|
||||
thr.set_err(ret);
|
||||
}
|
||||
|
|
|
@ -20,24 +20,43 @@
|
|||
#include <NdbApi.hpp>
|
||||
#include <NdbMain.h>
|
||||
#include <NDBT.hpp>
|
||||
#include <NDBT_Thread.hpp>
|
||||
#include <NDBT_Stats.hpp>
|
||||
#include <NdbSleep.h>
|
||||
#include <getarg.h>
|
||||
|
||||
#include <HugoTransactions.hpp>
|
||||
|
||||
static NDBT_ThreadFunc hugoPkUpdate;
|
||||
|
||||
struct ThrInput {
|
||||
const NdbDictionary::Table* pTab;
|
||||
int records;
|
||||
int batch;
|
||||
int stats;
|
||||
};
|
||||
|
||||
struct ThrOutput {
|
||||
NDBT_Stats latency;
|
||||
};
|
||||
|
||||
int main(int argc, const char** argv){
|
||||
ndb_init();
|
||||
|
||||
int _records = 0;
|
||||
int _loops = 1;
|
||||
int _threads = 1;
|
||||
int _stats = 0;
|
||||
int _abort = 0;
|
||||
int _batch = 0;
|
||||
int _batch = 1;
|
||||
const char* _tabname = NULL, *db = 0;
|
||||
int _help = 0;
|
||||
|
||||
struct getargs args[] = {
|
||||
{ "aborts", 'a', arg_integer, &_abort, "percent of transactions that are aborted", "abort%" },
|
||||
{ "loops", 'l', arg_integer, &_loops, "number of times to run this program(0=infinite loop)", "loops" },
|
||||
{ "threads", 't', arg_integer, &_threads, "number of threads (default 1)", "threads" },
|
||||
{ "stats", 's', arg_flag, &_stats, "report latency per batch", "stats" },
|
||||
// { "batch", 'b', arg_integer, &_batch, "batch value", "batch" },
|
||||
{ "records", 'r', arg_integer, &_records, "Number of records", "records" },
|
||||
{ "usage", '?', arg_flag, &_help, "Print help", "" },
|
||||
|
@ -83,16 +102,81 @@ int main(int argc, const char** argv){
|
|||
return NDBT_ProgramExit(NDBT_WRONGARGS);
|
||||
}
|
||||
|
||||
HugoTransactions hugoTrans(*pTab);
|
||||
// threads
|
||||
NDBT_ThreadSet ths(_threads);
|
||||
|
||||
// create Ndb object for each thread
|
||||
if (ths.connect(&con, "TEST_DB") == -1) {
|
||||
ndbout << "connect failed: err=" << ths.get_err() << endl;
|
||||
return NDBT_ProgramExit(NDBT_FAILED);
|
||||
}
|
||||
|
||||
// input is options
|
||||
ThrInput input;
|
||||
ths.set_input(&input);
|
||||
input.pTab = pTab;
|
||||
input.records = _records;
|
||||
input.batch = _batch;
|
||||
input.stats = _stats;
|
||||
|
||||
// output is stats
|
||||
ThrOutput output;
|
||||
ths.set_output<ThrOutput>();
|
||||
|
||||
int i = 0;
|
||||
while (i<_loops || _loops==0) {
|
||||
ndbout << "loop " << i << ": ";
|
||||
if (hugoTrans.pkUpdateRecords(&MyNdb,
|
||||
_records) != 0){
|
||||
return NDBT_ProgramExit(NDBT_FAILED);
|
||||
while (i < _loops || _loops == 0) {
|
||||
ndbout << i << ": ";
|
||||
|
||||
ths.set_func(hugoPkUpdate);
|
||||
ths.start();
|
||||
ths.stop();
|
||||
|
||||
if (ths.get_err())
|
||||
NDBT_ProgramExit(NDBT_FAILED);
|
||||
|
||||
if (_stats) {
|
||||
NDBT_Stats latency;
|
||||
|
||||
// add stats from each thread
|
||||
int n;
|
||||
for (n = 0; n < ths.get_count(); n++) {
|
||||
NDBT_Thread& thr = ths.get_thread(n);
|
||||
ThrOutput* output = (ThrOutput*)thr.get_output();
|
||||
latency += output->latency;
|
||||
}
|
||||
|
||||
ndbout
|
||||
<< "latency per batch (us): "
|
||||
<< " samples=" << latency.getCount()
|
||||
<< " min=" << (int)latency.getMin()
|
||||
<< " max=" << (int)latency.getMax()
|
||||
<< " mean=" << (int)latency.getMean()
|
||||
<< " stddev=" << (int)latency.getStddev()
|
||||
<< endl;
|
||||
}
|
||||
i++;
|
||||
}
|
||||
|
||||
return NDBT_ProgramExit(NDBT_OK);
|
||||
}
|
||||
|
||||
static void hugoPkUpdate(NDBT_Thread& thr)
|
||||
{
|
||||
const ThrInput* input = (const ThrInput*)thr.get_input();
|
||||
ThrOutput* output = (ThrOutput*)thr.get_output();
|
||||
|
||||
HugoTransactions hugoTrans(*input->pTab);
|
||||
output->latency.reset();
|
||||
if (input->stats)
|
||||
hugoTrans.setStatsLatency(&output->latency);
|
||||
|
||||
NDBT_ThreadSet& ths = thr.get_thread_set();
|
||||
hugoTrans.setThrInfo(ths.get_count(), thr.get_thread_no());
|
||||
|
||||
int ret;
|
||||
ret = hugoTrans.pkUpdateRecords(thr.get_ndb(),
|
||||
input->records,
|
||||
input->batch);
|
||||
if (ret != 0)
|
||||
thr.set_err(ret);
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue