Merge baker@bk-internal.mysql.com:/home/bk/mysql-5.0

into avenger.(none):/export/brian/mysql/merge/5.0
This commit is contained in:
brian@avenger.(none) 2004-11-24 13:34:35 -08:00
commit 71f31d1c94
73 changed files with 1175 additions and 763 deletions

View file

@ -374,6 +374,7 @@ $ENV{"LD_LIBRARY_PATH"}= ("$test_dir/lib" .
if ($opt_stage <= 5 && !$opt_no_test && !$opt_no_mysqltest)
{
my $flags= "";
my $force= "";
$flags.= " --with-ndbcluster" if ($opt_with_cluster);
$flags.= " --force" if (!$opt_one_error);
log_timestamp();

View file

@ -26,8 +26,8 @@ bin_PROGRAMS = mysql mysqladmin mysqlcheck mysqlshow \
mysqldump mysqlimport mysqltest mysqlbinlog mysqlmanagerc mysqlmanager-pwgen
noinst_HEADERS = sql_string.h completion_hash.h my_readline.h \
client_priv.h
mysqladmin_SOURCES = mysqladmin.cc
mysql_SOURCES = mysql.cc readline.cc sql_string.cc completion_hash.cc
mysqladmin_SOURCES = mysqladmin.cc
mysql_LDADD = @readline_link@ @TERMCAP_LIB@ $(LDADD) $(CXXLDFLAGS)
mysqlbinlog_LDADD = $(LDADD) $(CXXLDFLAGS)
mysql_DEPENDENCIES= $(LIBRARIES) $(pkglib_LTLIBRARIES) $(DEPLIB)

View file

@ -110,6 +110,9 @@ esac
case "$target" in
i[[4567]]86-*-*)
CFLAGS="$CFLAGS -DUNIV_INTEL_X86";;
# The compiler on Linux/S390 does not seem to have inlining
s390-*-*)
CFLAGS="$CFLAGS -DUNIV_MUST_NOT_INLINE";;
esac
AC_OUTPUT(Makefile os/Makefile ut/Makefile btr/Makefile dnl

View file

@ -19,6 +19,9 @@ select 'a ' = 'a\t', 'a ' < 'a\t', 'a ' > 'a\t';
select 'a a' > 'a', 'a \t' < 'a';
'a a' > 'a' 'a \t' < 'a'
1 1
select 'c' like '\_' as want0;
want0
0
CREATE TABLE t (
c char(20) NOT NULL
) ENGINE=MyISAM DEFAULT CHARSET=utf8 COLLATE=utf8_unicode_ci;

View file

@ -814,3 +814,6 @@ t2 CREATE TABLE `t2` (
) ENGINE=MyISAM DEFAULT CHARSET=latin1
drop table t2;
drop table t1;
select 'c' like '\_' as want0;
want0
0

View file

@ -24,6 +24,11 @@ select 'a ' = 'a\t', 'a ' < 'a\t', 'a ' > 'a\t';
select 'a a' > 'a', 'a \t' < 'a';
#
# Bug #6787 LIKE not working properly with _ and utf8 data
#
select 'c' like '\_' as want0;
#
# Bug #5679 utf8_unicode_ci LIKE--trailing % doesn't equal zero characters
#

View file

@ -660,3 +660,9 @@ create table t2 select concat(a,_utf8'') as a, concat(b,_utf8'')as b from t1;
show create table t2;
drop table t2;
drop table t1;
#
# Bug #6787 LIKE not working properly with _ and utf8 data
#
select 'c' like '\_' as want0;

35
ndb/docs/wl2077.txt Normal file
View file

@ -0,0 +1,35 @@
100' * (select 1 from T1 (1M rows) where key = rand());
1 host, 1 ndbd, api co-hosted
results in 1000 rows / sec
wo/reset bounds w/ rb
4.1-read committed a) 4.9 b) 7.4
4.1-read hold lock c) 4.7 d) 6.7
wl2077-read committed 6.4 (+30%) 10.8 (+45%)
wl2077-read hold lock 4.6 (-1%) 6.7 (+ 0%)
-- Comparision e)
serial pk: 10.9'
batched (1000): 59'
serial uniq index: 8.4'
batched (1000): 33'
index range (1000): 186'
----
load) testScanPerf -c 1 -d 1 T1
a) testScanPerf -s 100000 -c 0 -d 0 -a 1 -l 0 -r 2 -q 0 T1
b) testScanPerf -s 100000 -c 0 -d 0 -a 1 -l 0 -r 2 -q 1 T1
c) testScanPerf -s 100000 -c 0 -d 0 -a 1 -l 1 -r 2 -q 0 T1
d) testScanPerf -s 100000 -c 0 -d 0 -a 1 -l 1 -r 2 -q 1 T1
e) testReadPerf -i 25 -c 0 -d 0 T1
--- music join 1db-co 2db-co
4.1 13s 14s
4.1 wo/ blobs 1.7s 3.2s
wl2077 12s 14s
wl2077 wo/ blobs 1.2s (-30%) 2.5s (-22%)

View file

@ -28,15 +28,14 @@ ndbapi/NdbIndexScanOperation.hpp \
ndbapi/ndberror.h
mgmapiinclude_HEADERS = \
mgmapi/LocalConfig.hpp \
mgmapi/mgmapi.h \
mgmapi/mgmapi_debug.h
mgmapi/mgmapi_debug.h \
mgmapi/mgmapi_config_parameters.h \
mgmapi/mgmapi_config_parameters_debug.h
noinst_HEADERS = \
ndb_global.h \
ndb_net.h \
mgmapi/mgmapi_config_parameters.h \
mgmapi/mgmapi_config_parameters_debug.h
ndb_net.h
EXTRA_DIST = debugger editline kernel logger mgmcommon \
portlib transporter util

View file

@ -356,11 +356,26 @@ extern "C" {
/**
* Create a handle to a management server
*
* @return A management handle<br>
* or NULL if no management handle could be created.
* @return A management handle<br>
* or NULL if no management handle could be created.
*/
NdbMgmHandle ndb_mgm_create_handle();
/**
* Set connecst string to management server
*
* @param handle Management handle
* @param connect_string Connect string to the management server,
*
* @return -1 on error.
*/
int ndb_mgm_set_connectstring(NdbMgmHandle handle,
const char *connect_string);
int ndb_mgm_get_configuration_nodeid(NdbMgmHandle handle);
int ndb_mgm_get_connected_port(NdbMgmHandle handle);
const char *ndb_mgm_get_connected_host(NdbMgmHandle handle);
/**
* Destroy a management server handle
*
@ -378,11 +393,10 @@ extern "C" {
* Connect to a management server
*
* @param handle Management handle.
* @param mgmsrv Hostname and port of the management server,
* "hostname:port".
* @return -1 on error.
*/
int ndb_mgm_connect(NdbMgmHandle handle, const char * mgmsrv);
int ndb_mgm_connect(NdbMgmHandle handle, int no_retries,
int retry_delay_in_seconds, int verbose);
/**
* Disconnect from a management server
@ -709,9 +723,7 @@ extern "C" {
void ndb_mgm_destroy_configuration(struct ndb_mgm_configuration *);
int ndb_mgm_alloc_nodeid(NdbMgmHandle handle,
unsigned version,
unsigned *pnodeid,
int nodetype);
unsigned version, int nodetype);
/**
* Config iterator
*/

View file

@ -20,7 +20,6 @@
#include <ndb_types.h>
#include <mgmapi.h>
#include <BaseString.hpp>
#include <LocalConfig.hpp>
/**
* @class ConfigRetriever
@ -28,10 +27,11 @@
*/
class ConfigRetriever {
public:
ConfigRetriever(LocalConfig &local_config, Uint32 version, Uint32 nodeType);
ConfigRetriever(const char * _connect_string,
Uint32 version, Uint32 nodeType);
~ConfigRetriever();
int do_connect(int exit_on_connect_failure= false);
int do_connect(int no_retries, int retry_delay_in_seconds, int verbose);
/**
* Get configuration for current node.
@ -46,12 +46,14 @@ public:
*/
struct ndb_mgm_configuration * getConfig();
void resetError();
int hasError();
const char * getErrorString();
/**
* @return Node id of this node (as stated in local config or connectString)
*/
Uint32 allocNodeId();
Uint32 allocNodeId(int no_retries, int retry_delay_in_seconds);
/**
* Get config using socket
@ -68,22 +70,26 @@ public:
*/
bool verifyConfig(const struct ndb_mgm_configuration *, Uint32 nodeid);
Uint32 get_mgmd_port() const {return m_mgmd_port;};
const char *get_mgmd_host() const {return m_mgmd_host;};
Uint32 get_mgmd_port() const;
const char *get_mgmd_host() const;
Uint32 get_configuration_nodeid() const;
private:
BaseString errorString;
enum ErrorType {
CR_ERROR = 0,
CR_RETRY = 1
CR_NO_ERROR = 0,
CR_ERROR = 1,
CR_RETRY = 2
};
ErrorType latestErrorType;
void setError(ErrorType, const char * errorMsg);
struct LocalConfig& _localConfig;
Uint32 _ownNodeId;
Uint32 _ownNodeId;
/*
Uint32 m_mgmd_port;
const char *m_mgmd_host;
*/
Uint32 m_version;
Uint32 m_node_type;

View file

@ -113,7 +113,7 @@ public:
* Reset bounds and put operation in list that will be
* sent on next execute
*/
int reset_bounds();
int reset_bounds(bool forceSend = false);
bool getSorted() const { return m_ordered; }
private:
@ -127,8 +127,8 @@ private:
virtual NdbRecAttr* getValue_impl(const NdbColumnImpl*, char*);
void fix_get_values();
int next_result_ordered(bool fetchAllowed);
int send_next_scan_ordered(Uint32 idx);
int next_result_ordered(bool fetchAllowed, bool forceSend = false);
int send_next_scan_ordered(Uint32 idx, bool forceSend = false);
int compare(Uint32 key, Uint32 cols, const NdbReceiver*, const NdbReceiver*);
Uint32 m_sort_columns;

View file

@ -89,17 +89,17 @@ public:
* - 1: if there are no more tuples to scan.
* - 2: if there are no more cached records in NdbApi
*/
int nextResult(bool fetchAllowed = true);
int nextResult(bool fetchAllowed = true, bool forceSend = false);
/**
* Close result set (scan)
*/
void close();
void close(bool forceSend = false);
/**
* Restart
*/
int restart();
int restart(bool forceSend = false);
/**
* Transfer scan operation to an updating transaction. Use this function

View file

@ -90,11 +90,11 @@ protected:
NdbScanOperation(Ndb* aNdb);
virtual ~NdbScanOperation();
int nextResult(bool fetchAllowed = true);
int nextResult(bool fetchAllowed = true, bool forceSend = false);
virtual void release();
void closeScan();
int close_impl(class TransporterFacade*);
void closeScan(bool forceSend = false);
int close_impl(class TransporterFacade*, bool forceSend = false);
// Overloaded methods from NdbCursorOperation
int executeCursor(int ProcessorId);
@ -103,6 +103,7 @@ protected:
int init(const NdbTableImpl* tab, NdbConnection* myConnection);
int prepareSend(Uint32 TC_ConnectPtr, Uint64 TransactionId);
int doSend(int ProcessorId);
void checkForceSend(bool forceSend);
virtual void setErrorCode(int aErrorCode);
virtual void setErrorCodeAbort(int aErrorCode);
@ -138,7 +139,7 @@ protected:
Uint32 m_sent_receivers_count; // NOTE needs mutex to access
NdbReceiver** m_sent_receivers; // receive thread puts them here
int send_next_scan(Uint32 cnt, bool close);
int send_next_scan(Uint32 cnt, bool close, bool forceSend = false);
void receiver_delivered(NdbReceiver*);
void receiver_completed(NdbReceiver*);
void execCLOSE_SCAN_REP();
@ -148,7 +149,7 @@ protected:
Uint32 m_ordered;
int restart();
int restart(bool forceSend = false);
};
inline

View file

@ -19,7 +19,6 @@
#define CLUSTER_CONNECTION_HPP
class TransporterFacade;
class LocalConfig;
class ConfigRetriever;
class NdbThread;
@ -38,7 +37,6 @@ private:
void connect_thread();
char *m_connect_string;
TransporterFacade *m_facade;
LocalConfig *m_local_config;
ConfigRetriever *m_config_retriever;
NdbThread *m_connect_thread;
int (*m_connect_callback)(void);

View file

@ -32,10 +32,13 @@
0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0 }, \
{ "version", 'V', "Output version information and exit.", 0, 0, 0, \
GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0 }, \
{ "connect-string", 'c', \
{ "ndb-connectstring", 'c', \
"Set connect string for connecting to ndb_mgmd. " \
"<constr>=\"host=<hostname:port>[;nodeid=<id>]\". " \
"Overides specifying entries in NDB_CONNECTSTRING and config file", \
"Syntax: \"[nodeid=<id>;][host=]<hostname>[:<port>]\". " \
"Overides specifying entries in NDB_CONNECTSTRING and Ndb.cfg", \
(gptr*) &opt_connect_str, (gptr*) &opt_connect_str, 0, \
GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0 },\
{ "connect-string", 'c', "same as --ndb-connectstring",\
(gptr*) &opt_connect_str, (gptr*) &opt_connect_str, 0, \
GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0 }
#else
@ -46,11 +49,14 @@
0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0 }, \
{ "version", 'V', "Output version information and exit.", 0, 0, 0, \
GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0 }, \
{ "connect-string", 'c', \
{ "ndb-connectstring", 'c', \
"Set connect string for connecting to ndb_mgmd. " \
"<constr>=\"host=<hostname:port>[;nodeid=<id>]\". " \
"Overides specifying entries in NDB_CONNECTSTRING and config file", \
"Syntax: \"[nodeid=<id>;][host=]<hostname>[:<port>]\". " \
"Overides specifying entries in NDB_CONNECTSTRING and Ndb.cfg", \
(gptr*) &opt_connect_str, (gptr*) &opt_connect_str, 0, \
GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0 },\
{ "connect-string", 'c', "same as --ndb-connectstring",\
(gptr*) &opt_connect_str, (gptr*) &opt_connect_str, 0,\
GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0 }
#endif

View file

@ -30,13 +30,14 @@ printSCANTABREQ(FILE * output, const Uint32 * theData, Uint32 len, Uint16 receiv
fprintf(output, " apiConnectPtr: H\'%.8x",
sig->apiConnectPtr);
fprintf(output, " requestInfo: H\'%.8x:\n", requestInfo);
fprintf(output, " Parallellism: %u, Batch: %u LockMode: %u, Keyinfo: %u Holdlock: %u, RangeScan: %u\n",
fprintf(output, " Parallellism: %u, Batch: %u LockMode: %u, Keyinfo: %u Holdlock: %u, RangeScan: %u ReadCommitted: %u\n",
sig->getParallelism(requestInfo),
sig->getScanBatch(requestInfo),
sig->getLockMode(requestInfo),
sig->getKeyinfoFlag(requestInfo),
sig->getHoldLockFlag(requestInfo),
sig->getRangeScanFlag(requestInfo),
sig->getKeyinfoFlag(requestInfo));
sig->getReadCommittedFlag(requestInfo));
Uint32 keyLen = (sig->attrLenKeyLen >> 16);
Uint32 attrLen = (sig->attrLenKeyLen & 0xFFFF);

View file

@ -20,7 +20,6 @@
#include <ConfigRetriever.hpp>
#include <SocketServer.hpp>
#include "LocalConfig.hpp"
#include <NdbSleep.h>
#include <NdbOut.hpp>
@ -45,90 +44,62 @@
//****************************************************************************
//****************************************************************************
ConfigRetriever::ConfigRetriever(LocalConfig &local_config,
ConfigRetriever::ConfigRetriever(const char * _connect_string,
Uint32 version, Uint32 node_type)
: _localConfig(local_config)
{
m_handle= 0;
m_version = version;
m_node_type = node_type;
_ownNodeId = _localConfig._ownNodeId;
_ownNodeId= 0;
m_handle= ndb_mgm_create_handle();
if (m_handle == 0) {
setError(CR_ERROR, "Unable to allocate mgm handle");
return;
}
if (ndb_mgm_set_connectstring(m_handle, _connect_string))
{
setError(CR_ERROR, ndb_mgm_get_latest_error_desc(m_handle));
return;
}
resetError();
}
ConfigRetriever::~ConfigRetriever(){
ConfigRetriever::~ConfigRetriever()
{
if (m_handle) {
ndb_mgm_disconnect(m_handle);
ndb_mgm_destroy_handle(&m_handle);
}
}
Uint32
ConfigRetriever::get_configuration_nodeid() const
{
return ndb_mgm_get_configuration_nodeid(m_handle);
}
Uint32 ConfigRetriever::get_mgmd_port() const
{
return ndb_mgm_get_connected_port(m_handle);
}
const char *ConfigRetriever::get_mgmd_host() const
{
return ndb_mgm_get_connected_host(m_handle);
}
//****************************************************************************
//****************************************************************************
int
ConfigRetriever::do_connect(int exit_on_connect_failure){
m_mgmd_port= 0;
m_mgmd_host= 0;
if(!m_handle)
m_handle= ndb_mgm_create_handle();
if (m_handle == 0) {
setError(CR_ERROR, "Unable to allocate mgm handle");
return -1;
}
int retry = 1;
int retry_max = 12; // Max number of retry attempts
int retry_interval= 5; // Seconds between each retry
while(retry < retry_max){
Uint32 type = CR_ERROR;
BaseString tmp;
for (unsigned int i = 0; i<_localConfig.ids.size(); i++){
MgmtSrvrId * m = &_localConfig.ids[i];
DBUG_PRINT("info",("trying %s:%d",
m->name.c_str(),
m->port));
switch(m->type){
case MgmId_TCP:
tmp.assfmt("%s:%d", m->name.c_str(), m->port);
if (ndb_mgm_connect(m_handle, tmp.c_str()) == 0) {
m_mgmd_port= m->port;
m_mgmd_host= m->name.c_str();
DBUG_PRINT("info",("connected to ndb_mgmd at %s:%d",
m_mgmd_host,
m_mgmd_port));
return 0;
}
setError(CR_RETRY, ndb_mgm_get_latest_error_desc(m_handle));
case MgmId_File:
break;
}
}
if(latestErrorType == CR_RETRY){
DBUG_PRINT("info",("CR_RETRY"));
if (exit_on_connect_failure)
return 1;
REPORT_WARNING("Failed to retrieve cluster configuration");
ndbout << "(Cause of failure: " << getErrorString() << ")" << endl;
ndbout << "Attempt " << retry << " of " << retry_max << ". "
<< "Trying again in "<< retry_interval <<" seconds..."
<< endl << endl;
NdbSleep_SecSleep(retry_interval);
} else {
break;
}
retry++;
}
ndb_mgm_destroy_handle(&m_handle);
m_handle= 0;
m_mgmd_port= 0;
m_mgmd_host= 0;
return -1;
ConfigRetriever::do_connect(int no_retries,
int retry_delay_in_seconds, int verbose)
{
return
(ndb_mgm_connect(m_handle,no_retries,retry_delay_in_seconds,verbose)==0) ?
0 : -1;
}
//****************************************************************************
@ -140,22 +111,9 @@ ConfigRetriever::getConfig() {
struct ndb_mgm_configuration * p = 0;
if(m_handle != 0){
if(m_handle != 0)
p = getConfig(m_handle);
} else {
for (unsigned int i = 0; i<_localConfig.ids.size(); i++){
MgmtSrvrId * m = &_localConfig.ids[i];
switch(m->type){
case MgmId_File:
p = getConfig(m->name.c_str());
break;
case MgmId_TCP:
break;
}
if(p)
break;
}
}
if(p == 0)
return 0;
@ -227,6 +185,16 @@ ConfigRetriever::setError(ErrorType et, const char * s){
latestErrorType = et;
}
void
ConfigRetriever::resetError(){
setError(CR_NO_ERROR,0);
}
int
ConfigRetriever::hasError()
{
return latestErrorType != CR_NO_ERROR;
}
const char *
ConfigRetriever::getErrorString(){
@ -341,16 +309,23 @@ ConfigRetriever::verifyConfig(const struct ndb_mgm_configuration * conf, Uint32
}
Uint32
ConfigRetriever::allocNodeId(){
unsigned nodeid= _ownNodeId;
if(m_handle != 0){
int res= ndb_mgm_alloc_nodeid(m_handle, m_version, &nodeid, m_node_type);
if(res != 0) {
setError(CR_ERROR, ndb_mgm_get_latest_error_desc(m_handle));
return 0;
ConfigRetriever::allocNodeId(int no_retries, int retry_delay_in_seconds)
{
_ownNodeId= 0;
if(m_handle != 0)
{
while (1)
{
int res= ndb_mgm_alloc_nodeid(m_handle, m_version, m_node_type);
if(res >= 0)
return _ownNodeId= (Uint32)res;
if (no_retries == 0)
break;
no_retries--;
NdbSleep_SecSleep(retry_delay_in_seconds);
}
}
return _ownNodeId= nodeid;
setError(CR_ERROR, ndb_mgm_get_latest_error_desc(m_handle));
} else
setError(CR_ERROR, "management server handle not initialized");
return 0;
}

View file

@ -70,7 +70,6 @@ struct NdbUpGradeCompatible {
#ifndef TEST_VERSION
struct NdbUpGradeCompatible ndbCompatibleTable_full[] = {
{ MAKE_VERSION(3,5,2), MAKE_VERSION(3,5,1), UG_Exact },
{ MAKE_VERSION(4,1,8), MAKE_VERSION(3,5,4), UG_Exact }, /* Aligned version with MySQL */
{ 0, 0, UG_Null }
};

View file

@ -147,7 +147,6 @@ public:
Uint32 nfConnect;
Uint32 table;
Uint32 userpointer;
Uint32 nodeCount;
BlockReference userblockref;
};
typedef Ptr<ConnectRecord> ConnectRecordPtr;

View file

@ -7080,24 +7080,22 @@ void Dbdih::execDIGETPRIMREQ(Signal* signal)
ndbrequire(tabPtr.p->tabStatus == TabRecord::TS_ACTIVE);
connectPtr.i = signal->theData[0];
if(connectPtr.i != RNIL){
if(connectPtr.i != RNIL)
{
jam();
ptrCheckGuard(connectPtr, cconnectFileSize, connectRecord);
ndbrequire(connectPtr.p->connectState == ConnectRecord::INUSE);
getFragstore(tabPtr.p, fragId, fragPtr);
connectPtr.p->nodeCount = extractNodeInfo(fragPtr.p, connectPtr.p->nodes);
signal->theData[0] = connectPtr.p->userpointer;
signal->theData[1] = passThrough;
signal->theData[2] = connectPtr.p->nodes[0];
sendSignal(connectPtr.p->userblockref, GSN_DIGETPRIMCONF, signal, 3, JBB);
return;
}//if
//connectPtr.i == RNIL -> question without connect record
}
else
{
jam();
signal->theData[0] = RNIL;
}
Uint32 nodes[MAX_REPLICAS];
getFragstore(tabPtr.p, fragId, fragPtr);
Uint32 count = extractNodeInfo(fragPtr.p, nodes);
signal->theData[0] = RNIL;
signal->theData[1] = passThrough;
signal->theData[2] = nodes[0];
signal->theData[3] = nodes[1];

View file

@ -550,6 +550,11 @@ public:
UintR scanErrorCounter;
UintR scanLocalFragid;
UintR scanSchemaVersion;
/**
* This is _always_ main table, even in range scan
* in which case scanTcrec->fragmentptr is different
*/
Uint32 fragPtrI;
UintR scanStoredProcId;
ScanState scanState;
@ -2925,4 +2930,23 @@ Dblqh::ScanRecord::check_scan_batch_completed() const
(max_bytes > 0 && (m_curr_batch_size_bytes >= max_bytes));
}
inline
void
Dblqh::i_get_acc_ptr(ScanRecord* scanP, Uint32* &acc_ptr, Uint32 index)
{
if (index == 0) {
acc_ptr= (Uint32*)&scanP->scan_acc_op_ptr[0];
} else {
Uint32 attr_buf_index, attr_buf_rec;
AttrbufPtr regAttrPtr;
jam();
attr_buf_rec= (index + 31) / 32;
attr_buf_index= (index - 1) & 31;
regAttrPtr.i= scanP->scan_acc_op_ptr[attr_buf_rec];
ptrCheckGuard(regAttrPtr, cattrinbufFileSize, attrbuf);
acc_ptr= (Uint32*)&regAttrPtr.p->attrbuf[attr_buf_index];
}
}
#endif

View file

@ -3084,6 +3084,7 @@ void Dblqh::execATTRINFO(Signal* signal)
return;
break;
default:
ndbout_c("%d", regTcPtr->transactionState);
ndbrequire(false);
break;
}//switch
@ -7161,10 +7162,7 @@ void Dblqh::continueScanNextReqLab(Signal* signal)
// Update timer on tcConnectRecord
tcConnectptr.p->tcTimer = cLqhTimeOutCount;
init_acc_ptr_list(scanptr.p);
scanptr.p->m_curr_batch_size_rows = 0;
scanptr.p->m_curr_batch_size_bytes= 0;
scanptr.p->scanFlag = NextScanReq::ZSCAN_NEXT;
scanNextLoopLab(signal);
}//Dblqh::continueScanNextReqLab()
@ -7363,22 +7361,32 @@ void Dblqh::scanLockReleasedLab(Signal* signal)
tcConnectptr.i = scanptr.p->scanTcrec;
ptrCheckGuard(tcConnectptr, ctcConnectrecFileSize, tcConnectionrec);
releaseActiveFrag(signal);
if (scanptr.p->scanReleaseCounter == scanptr.p->m_curr_batch_size_rows) {
if ((scanptr.p->scanErrorCounter > 0) ||
(scanptr.p->scanCompletedStatus == ZTRUE)) {
jam();
scanptr.p->m_curr_batch_size_rows = 0;
scanptr.p->m_curr_batch_size_bytes = 0;
closeScanLab(signal);
} else if (scanptr.p->check_scan_batch_completed() &&
scanptr.p->scanLockHold != ZTRUE) {
jam();
scanptr.p->scanState = ScanRecord::WAIT_SCAN_NEXTREQ;
sendScanFragConf(signal, ZFALSE);
} else if (scanptr.p->m_last_row && !scanptr.p->scanLockHold) {
jam();
closeScanLab(signal);
return;
} else {
jam();
/*
We came here after releasing locks after receiving SCAN_NEXTREQ from TC. We only
come here when scanHoldLock == ZTRUE
*/
* We came here after releasing locks after
* receiving SCAN_NEXTREQ from TC. We only come here
* when scanHoldLock == ZTRUE
*/
scanptr.p->m_curr_batch_size_rows = 0;
scanptr.p->m_curr_batch_size_bytes = 0;
continueScanNextReqLab(signal);
}//if
} else if (scanptr.p->scanReleaseCounter < scanptr.p->m_curr_batch_size_rows) {
@ -7465,25 +7473,6 @@ Dblqh::init_acc_ptr_list(ScanRecord* scanP)
scanP->scan_acc_index = 0;
}
inline
void
Dblqh::i_get_acc_ptr(ScanRecord* scanP, Uint32* &acc_ptr, Uint32 index)
{
if (index == 0) {
acc_ptr= (Uint32*)&scanP->scan_acc_op_ptr[0];
} else {
Uint32 attr_buf_index, attr_buf_rec;
AttrbufPtr regAttrPtr;
jam();
attr_buf_rec= (index + 31) / 32;
attr_buf_index= (index - 1) & 31;
regAttrPtr.i= scanP->scan_acc_op_ptr[attr_buf_rec];
ptrCheckGuard(regAttrPtr, cattrinbufFileSize, attrbuf);
acc_ptr= (Uint32*)&regAttrPtr.p->attrbuf[attr_buf_index];
}
}
Uint32
Dblqh::get_acc_ptr_from_scan_record(ScanRecord* scanP,
Uint32 index,
@ -7714,6 +7703,9 @@ void Dblqh::abort_scan(Signal* signal, Uint32 scan_ptr_i, Uint32 errcode){
jam();
scanptr.i = scan_ptr_i;
c_scanRecordPool.getPtr(scanptr);
fragptr.i = tcConnectptr.p->fragmentptr;
ptrCheckGuard(fragptr, cfragrecFileSize, fragrecord);
finishScanrec(signal);
releaseScanrec(signal);
tcConnectptr.p->transactionState = TcConnectionrec::IDLE;
@ -8007,6 +7999,13 @@ void Dblqh::nextScanConfScanLab(Signal* signal)
/*************************************************************
* STOP THE SCAN PROCESS IF THIS HAS BEEN REQUESTED.
************************************************************ */
if (!scanptr.p->scanLockHold)
{
jam();
closeScanLab(signal);
return;
}
if (scanptr.p->scanCompletedStatus == ZTRUE) {
if ((scanptr.p->scanLockHold == ZTRUE) &&
(scanptr.p->m_curr_batch_size_rows > 0)) {
@ -8507,8 +8506,6 @@ void Dblqh::tupScanCloseConfLab(Signal* signal)
ScanFragRef::SignalLength, JBB);
} else {
jam();
scanptr.p->m_curr_batch_size_rows = 0;
scanptr.p->m_curr_batch_size_bytes= 0;
sendScanFragConf(signal, ZSCAN_FRAG_CLOSED);
}//if
finishScanrec(signal);
@ -8576,10 +8573,12 @@ Uint32 Dblqh::initScanrec(const ScanFragReq* scanFragReq)
/**
* Used for scan take over
*/
FragrecordPtr tFragPtr;
tFragPtr.i = fragptr.p->tableFragptr;
ptrCheckGuard(tFragPtr, cfragrecFileSize, fragrecord);
scanptr.p->fragPtrI = fragptr.p->tableFragptr;
{
FragrecordPtr tFragPtr;
tFragPtr.i = fragptr.p->tableFragptr;
ptrCheckGuard(tFragPtr, cfragrecFileSize, fragrecord);
scanptr.p->fragPtrI = fragptr.p->tableFragptr;
}
/**
* !idx uses 1 - (MAX_PARALLEL_SCANS_PER_FRAG - 1) = 1-11
@ -8588,8 +8587,8 @@ Uint32 Dblqh::initScanrec(const ScanFragReq* scanFragReq)
Uint32 start = (idx ? MAX_PARALLEL_SCANS_PER_FRAG : 1 );
Uint32 stop = (idx ? MAX_PARALLEL_INDEX_SCANS_PER_FRAG : MAX_PARALLEL_SCANS_PER_FRAG - 1);
stop += start;
Uint32 free = tFragPtr.p->m_scanNumberMask.find(start);
Uint32 free = fragptr.p->m_scanNumberMask.find(start);
if(free == Fragrecord::ScanNumberMask::NotFound || free >= stop){
jam();
@ -8603,16 +8602,16 @@ Uint32 Dblqh::initScanrec(const ScanFragReq* scanFragReq)
*/
scanptr.p->scanState = ScanRecord::IN_QUEUE;
LocalDLFifoList<ScanRecord> queue(c_scanRecordPool,
tFragPtr.p->m_queuedScans);
fragptr.p->m_queuedScans);
queue.add(scanptr);
return ZOK;
}
scanptr.p->scanNumber = free;
tFragPtr.p->m_scanNumberMask.clear(free);// Update mask
fragptr.p->m_scanNumberMask.clear(free);// Update mask
LocalDLList<ScanRecord> active(c_scanRecordPool, tFragPtr.p->m_activeScans);
LocalDLList<ScanRecord> active(c_scanRecordPool, fragptr.p->m_activeScans);
active.add(scanptr);
if(scanptr.p->scanKeyinfoFlag){
jam();
@ -8672,12 +8671,8 @@ void Dblqh::finishScanrec(Signal* signal)
{
release_acc_ptr_list(scanptr.p);
FragrecordPtr tFragPtr;
tFragPtr.i = scanptr.p->fragPtrI;
ptrCheckGuard(tFragPtr, cfragrecFileSize, fragrecord);
LocalDLFifoList<ScanRecord> queue(c_scanRecordPool,
tFragPtr.p->m_queuedScans);
fragptr.p->m_queuedScans);
if(scanptr.p->scanState == ScanRecord::IN_QUEUE){
jam();
@ -8695,11 +8690,11 @@ void Dblqh::finishScanrec(Signal* signal)
ndbrequire(tmp.p == scanptr.p);
}
LocalDLList<ScanRecord> scans(c_scanRecordPool, tFragPtr.p->m_activeScans);
LocalDLList<ScanRecord> scans(c_scanRecordPool, fragptr.p->m_activeScans);
scans.release(scanptr);
const Uint32 scanNumber = scanptr.p->scanNumber;
ndbrequire(!tFragPtr.p->m_scanNumberMask.get(scanNumber));
ndbrequire(!fragptr.p->m_scanNumberMask.get(scanNumber));
ScanRecordPtr restart;
/**
@ -8707,13 +8702,13 @@ void Dblqh::finishScanrec(Signal* signal)
*/
if(scanNumber == NR_ScanNo || !queue.first(restart)){
jam();
tFragPtr.p->m_scanNumberMask.set(scanNumber);
fragptr.p->m_scanNumberMask.set(scanNumber);
return;
}
if(ERROR_INSERTED(5034)){
jam();
tFragPtr.p->m_scanNumberMask.set(scanNumber);
fragptr.p->m_scanNumberMask.set(scanNumber);
return;
}
@ -8724,7 +8719,7 @@ void Dblqh::finishScanrec(Signal* signal)
ptrCheckGuard(tcConnectptr, ctcConnectrecFileSize, tcConnectionrec);
restart.p->scanNumber = scanNumber;
restart.p->scanState = ScanRecord::WAIT_ACC_SCAN;
queue.remove(restart);
scans.add(restart);
if(restart.p->scanKeyinfoFlag){
@ -8912,6 +8907,13 @@ void Dblqh::sendScanFragConf(Signal* signal, Uint32 scanCompleted)
conf->total_len= total_len;
sendSignal(tcConnectptr.p->clientBlockref, GSN_SCAN_FRAGCONF,
signal, ScanFragConf::SignalLength, JBB);
if(!scanptr.p->scanLockHold)
{
jam();
scanptr.p->m_curr_batch_size_rows = 0;
scanptr.p->m_curr_batch_size_bytes= 0;
}
}//Dblqh::sendScanFragConf()
/* ######################################################################### */

View file

@ -1054,9 +1054,8 @@ public:
// Id of the ScanRecord this fragment scan belongs to
Uint32 scanRec;
// The maximum number of operations that can be scanned before
// returning to TC
Uint16 scanFragConcurrency;
// The value of fragmentCompleted in the last received SCAN_FRAGCONF
Uint8 m_scan_frag_conf_status;
inline void startFragTimer(Uint32 timeVal){
scanFragTimer = timeVal;
@ -1193,8 +1192,10 @@ public:
// Number of operation records per scanned fragment
// Number of operations in first batch
// Max number of bytes per batch
Uint16 noOprecPerFrag;
Uint16 first_batch_size;
union {
Uint16 first_batch_size_rows;
Uint16 batch_size_rows;
};
Uint32 batch_byte_size;
Uint32 scanRequestInfo; // ScanFrag format

View file

@ -8646,9 +8646,9 @@ void Dbtc::initScanrec(ScanRecordPtr scanptr,
scanptr.p->scanTableref = tabptr.i;
scanptr.p->scanSchemaVersion = scanTabReq->tableSchemaVersion;
scanptr.p->scanParallel = scanParallel;
scanptr.p->noOprecPerFrag = noOprecPerFrag;
scanptr.p->first_batch_size= scanTabReq->first_batch_size;
scanptr.p->batch_byte_size= scanTabReq->batch_byte_size;
scanptr.p->first_batch_size_rows = scanTabReq->first_batch_size;
scanptr.p->batch_byte_size = scanTabReq->batch_byte_size;
scanptr.p->batch_size_rows = noOprecPerFrag;
Uint32 tmp = 0;
const UintR ri = scanTabReq->requestInfo;
@ -8672,7 +8672,6 @@ void Dbtc::initScanrec(ScanRecordPtr scanptr,
ndbrequire(list.seize(ptr));
ptr.p->scanRec = scanptr.i;
ptr.p->scanFragId = 0;
ptr.p->scanFragConcurrency = noOprecPerFrag;
ptr.p->m_apiPtr = cdata[i];
}//for
@ -8945,6 +8944,25 @@ void Dbtc::execDIGETPRIMCONF(Signal* signal)
scanptr.i = scanFragptr.p->scanRec;
ptrCheckGuard(scanptr, cscanrecFileSize, scanRecord);
/**
* This must be false as select count(*) otherwise
* can "pass" committing on backup fragments and
* get incorrect row count
*/
if(false && ScanFragReq::getReadCommittedFlag(scanptr.p->scanRequestInfo))
{
jam();
Uint32 max = 3+signal->theData[6];
Uint32 nodeid = getOwnNodeId();
for(Uint32 i = 3; i<max; i++)
if(signal->theData[i] == nodeid)
{
jam();
tnodeid = nodeid;
break;
}
}
{
/**
* Check table
@ -9141,6 +9159,7 @@ void Dbtc::execSCAN_FRAGCONF(Signal* signal)
const ScanFragConf * const conf = (ScanFragConf*)&signal->theData[0];
const Uint32 noCompletedOps = conf->completedOps;
const Uint32 status = conf->fragmentCompleted;
scanFragptr.i = conf->senderData;
c_scan_frag_pool.getPtr(scanFragptr);
@ -9163,11 +9182,9 @@ void Dbtc::execSCAN_FRAGCONF(Signal* signal)
ndbrequire(scanFragptr.p->scanFragState == ScanFragRec::LQH_ACTIVE);
const Uint32 status = conf->fragmentCompleted;
if(scanptr.p->scanState == ScanRecord::CLOSING_SCAN){
jam();
if(status == ZFALSE){
if(status == 0){
/**
* We have started closing = we sent a close -> ignore this
*/
@ -9184,11 +9201,11 @@ void Dbtc::execSCAN_FRAGCONF(Signal* signal)
return;
}
if(status == ZCLOSED && scanptr.p->scanNextFragId < scanptr.p->scanNoFrag){
if(noCompletedOps == 0 && status != 0 &&
scanptr.p->scanNextFragId < scanptr.p->scanNoFrag){
/**
* Start on next fragment
*/
ndbrequire(noCompletedOps == 0);
scanFragptr.p->scanFragState = ScanFragRec::WAIT_GET_PRIMCONF;
scanFragptr.p->startFragTimer(ctcTimer);
@ -9218,6 +9235,7 @@ void Dbtc::execSCAN_FRAGCONF(Signal* signal)
scanptr.p->m_queued_count++;
}
scanFragptr.p->m_scan_frag_conf_status = status;
scanFragptr.p->m_ops = noCompletedOps;
scanFragptr.p->m_totalLen = total_len;
scanFragptr.p->scanFragState = ScanFragRec::QUEUED_FOR_DELIVERY;
@ -9311,7 +9329,6 @@ void Dbtc::execSCAN_NEXTREQ(Signal* signal)
/*********************************************************************
* APPLICATION IS CLOSING THE SCAN.
**********************************************************************/
ndbrequire(len == 0);
close_scan_req(signal, scanptr, true);
return;
}//if
@ -9330,11 +9347,12 @@ void Dbtc::execSCAN_NEXTREQ(Signal* signal)
// Copy op ptrs so I dont overwrite them when sending...
memcpy(signal->getDataPtrSend()+25, signal->getDataPtr()+4, 4 * len);
ScanFragNextReq * nextReq = (ScanFragNextReq*)&signal->theData[0];
nextReq->closeFlag = ZFALSE;
nextReq->transId1 = apiConnectptr.p->transid[0];
nextReq->transId2 = apiConnectptr.p->transid[1];
nextReq->batch_size_bytes= scanP->batch_byte_size;
ScanFragNextReq tmp;
tmp.closeFlag = ZFALSE;
tmp.transId1 = apiConnectptr.p->transid[0];
tmp.transId2 = apiConnectptr.p->transid[1];
tmp.batch_size_rows = scanP->batch_size_rows;
tmp.batch_size_bytes = scanP->batch_byte_size;
ScanFragList running(c_scan_frag_pool, scanP->m_running_scan_frags);
ScanFragList delivered(c_scan_frag_pool, scanP->m_delivered_scan_frags);
@ -9344,15 +9362,37 @@ void Dbtc::execSCAN_NEXTREQ(Signal* signal)
c_scan_frag_pool.getPtr(scanFragptr);
ndbrequire(scanFragptr.p->scanFragState == ScanFragRec::DELIVERED);
scanFragptr.p->scanFragState = ScanFragRec::LQH_ACTIVE;
scanFragptr.p->startFragTimer(ctcTimer);
scanFragptr.p->m_ops = 0;
nextReq->senderData = scanFragptr.i;
nextReq->batch_size_rows= scanFragptr.p->scanFragConcurrency;
sendSignal(scanFragptr.p->lqhBlockref, GSN_SCAN_NEXTREQ, signal,
ScanFragNextReq::SignalLength, JBB);
if(scanFragptr.p->m_scan_frag_conf_status)
{
/**
* last scan was complete
*/
jam();
ndbrequire(scanptr.p->scanNextFragId < scanptr.p->scanNoFrag);
scanFragptr.p->scanFragState = ScanFragRec::WAIT_GET_PRIMCONF;
tcConnectptr.i = scanptr.p->scanTcrec;
ptrCheckGuard(tcConnectptr, ctcConnectFilesize, tcConnectRecord);
scanFragptr.p->scanFragId = scanptr.p->scanNextFragId++;
signal->theData[0] = tcConnectptr.p->dihConnectptr;
signal->theData[1] = scanFragptr.i;
signal->theData[2] = scanptr.p->scanTableref;
signal->theData[3] = scanFragptr.p->scanFragId;
sendSignal(cdihblockref, GSN_DIGETPRIMREQ, signal, 4, JBB);
}
else
{
jam();
scanFragptr.p->scanFragState = ScanFragRec::LQH_ACTIVE;
ScanFragNextReq * req = (ScanFragNextReq*)signal->getDataPtrSend();
* req = tmp;
req->senderData = scanFragptr.i;
sendSignal(scanFragptr.p->lqhBlockref, GSN_SCAN_NEXTREQ, signal,
ScanFragNextReq::SignalLength, JBB);
}
delivered.remove(scanFragptr);
running.add(scanFragptr);
}//for
@ -9416,7 +9456,7 @@ Dbtc::close_scan_req(Signal* signal, ScanRecordPtr scanPtr, bool req_received){
ndbrequire(curr.p->scanFragState == ScanFragRec::DELIVERED);
delivered.remove(curr);
if(curr.p->m_ops > 0){
if(curr.p->m_ops > 0 && curr.p->m_scan_frag_conf_status == 0){
jam();
running.add(curr);
curr.p->scanFragState = ScanFragRec::LQH_ACTIVE;
@ -9551,7 +9591,7 @@ void Dbtc::sendScanFragReq(Signal* signal,
req->transId1 = apiConnectptr.p->transid[0];
req->transId2 = apiConnectptr.p->transid[1];
req->clientOpPtr = scanFragP->m_apiPtr;
req->batch_size_rows= scanFragP->scanFragConcurrency;
req->batch_size_rows= scanP->batch_size_rows;
req->batch_size_bytes= scanP->batch_byte_size;
sendSignal(scanFragP->lqhBlockref, GSN_SCAN_FRAGREQ, signal,
ScanFragReq::SignalLength, JBB);
@ -9573,6 +9613,8 @@ void Dbtc::sendScanTabConf(Signal* signal, ScanRecordPtr scanPtr) {
jam();
ops += 21;
}
Uint32 left = scanPtr.p->scanNoFrag - scanPtr.p->scanNextFragId;
ScanTabConf * conf = (ScanTabConf*)&signal->theData[0];
conf->apiConnectPtr = apiConnectptr.p->ndbapiConnect;
@ -9588,24 +9630,25 @@ void Dbtc::sendScanTabConf(Signal* signal, ScanRecordPtr scanPtr) {
ScanFragRecPtr curr = ptr; // Remove while iterating...
queued.next(ptr);
bool done = curr.p->m_scan_frag_conf_status && --left;
* ops++ = curr.p->m_apiPtr;
* ops++ = curr.i;
* ops++ = done ? RNIL : curr.i;
* ops++ = (curr.p->m_totalLen << 10) + curr.p->m_ops;
queued.remove(curr);
if(curr.p->m_ops > 0){
if(!done){
delivered.add(curr);
curr.p->scanFragState = ScanFragRec::DELIVERED;
curr.p->stopFragTimer();
} else {
(* --ops) = ScanTabConf::EndOfData; ops++;
c_scan_frag_pool.release(curr);
curr.p->scanFragState = ScanFragRec::COMPLETED;
curr.p->stopFragTimer();
}
}
}
if(scanPtr.p->m_delivered_scan_frags.isEmpty() &&
scanPtr.p->m_running_scan_frags.isEmpty()){
conf->requestInfo = op_count | ScanTabConf::EndOfData;
@ -10424,9 +10467,8 @@ Dbtc::execDUMP_STATE_ORD(Signal* signal)
sfp.i,
sfp.p->scanFragState,
sfp.p->scanFragId);
infoEvent(" nodeid=%d, concurr=%d, timer=%d",
infoEvent(" nodeid=%d, timer=%d",
refToNode(sfp.p->lqhBlockref),
sfp.p->scanFragConcurrency,
sfp.p->scanFragTimer);
}
@ -10504,7 +10546,7 @@ Dbtc::execDUMP_STATE_ORD(Signal* signal)
sp.p->scanAiLength,
sp.p->scanParallel,
sp.p->scanReceivedOperations,
sp.p->noOprecPerFrag);
sp.p->batch_size_rows);
infoEvent(" schv=%d, tab=%d, sproc=%d",
sp.p->scanSchemaVersion,
sp.p->scanTableref,

View file

@ -1888,7 +1888,7 @@ SumaParticipant::SyncRecord::nextScan(Signal* signal){
req->requestInfo = 0;
req->savePointId = 0;
ScanFragReq::setLockMode(req->requestInfo, 0);
ScanFragReq::setHoldLockFlag(req->requestInfo, 0);
ScanFragReq::setHoldLockFlag(req->requestInfo, 1);
ScanFragReq::setKeyinfoFlag(req->requestInfo, 0);
ScanFragReq::setAttrLen(req->requestInfo, attrLen);
req->fragmentNoKeyLen = fd.m_fragDesc.m_fragmentNo;

View file

@ -19,7 +19,6 @@
#include <ndb_version.h>
#include "Configuration.hpp"
#include <LocalConfig.hpp>
#include <TransporterRegistry.hpp>
#include "vm/SimBlockList.hpp"
@ -69,16 +68,9 @@ int main(int argc, char** argv)
return NRT_Default;
}
LocalConfig local_config;
if (!local_config.init(theConfig->getConnectString(),0)){
local_config.printError();
local_config.printUsage();
return NRT_Default;
}
{ // Do configuration
signal(SIGPIPE, SIG_IGN);
theConfig->fetch_configuration(local_config);
theConfig->fetch_configuration();
}
chdir(NdbConfig_get_path(0));
@ -141,7 +133,7 @@ int main(int argc, char** argv)
exit(0);
}
g_eventLogger.info("Ndb has terminated (pid %d) restarting", child);
theConfig->fetch_configuration(local_config);
theConfig->fetch_configuration();
}
g_eventLogger.info("Angel pid: %d ndb pid: %d", getppid(), getpid());

View file

@ -17,7 +17,6 @@
#include <ndb_global.h>
#include <ndb_opts.h>
#include <LocalConfig.hpp>
#include "Configuration.hpp"
#include <ErrorHandlingMacros.hpp>
#include "GlobalData.hpp"
@ -35,6 +34,7 @@
#include <kernel_types.h>
#include <ndb_limits.h>
#include <ndbapi_limits.h>
#include "pc.hpp"
#include <LogLevel.hpp>
#include <NdbSleep.h>
@ -108,7 +108,7 @@ get_one_option(int optid, const struct my_option *opt __attribute__((unused)),
bool
Configuration::init(int argc, char** argv)
{
const char *load_default_groups[]= { "ndbd",0 };
const char *load_default_groups[]= { "mysql_cluster","ndbd",0 };
load_defaults("my",load_default_groups,&argc,&argv);
int ho_error;
@ -189,7 +189,7 @@ Configuration::closeConfiguration(){
}
void
Configuration::fetch_configuration(LocalConfig &local_config){
Configuration::fetch_configuration(){
/**
* Fetch configuration from management server
*/
@ -199,8 +199,17 @@ Configuration::fetch_configuration(LocalConfig &local_config){
m_mgmd_port= 0;
m_mgmd_host= 0;
m_config_retriever= new ConfigRetriever(local_config, NDB_VERSION, NODE_TYPE_DB);
if(m_config_retriever->do_connect() == -1){
m_config_retriever= new ConfigRetriever(getConnectString(),
NDB_VERSION, NODE_TYPE_DB);
if (m_config_retriever->hasError())
{
ERROR_SET(fatal, ERR_INVALID_CONFIG,
"Could not connect initialize handle to management server",
m_config_retriever->getErrorString());
}
if(m_config_retriever->do_connect(12,5,1) == -1){
const char * s = m_config_retriever->getErrorString();
if(s == 0)
s = "No error given!";
@ -215,13 +224,7 @@ Configuration::fetch_configuration(LocalConfig &local_config){
ConfigRetriever &cr= *m_config_retriever;
if((globalData.ownId = cr.allocNodeId()) == 0){
for(Uint32 i = 0; i<3; i++){
NdbSleep_SecSleep(3);
if((globalData.ownId = cr.allocNodeId()) != 0)
break;
}
}
globalData.ownId = cr.allocNodeId(2 /*retry*/,3 /*delay*/);
if(globalData.ownId == 0){
ERROR_SET(fatal, ERR_INVALID_CONFIG,
@ -452,6 +455,7 @@ Configuration::calcSizeAlt(ConfigValues * ownConfig){
unsigned int noOfTables = 0;
unsigned int noOfUniqueHashIndexes = 0;
unsigned int noOfOrderedIndexes = 0;
unsigned int noOfTriggers = 0;
unsigned int noOfReplicas = 0;
unsigned int noOfDBNodes = 0;
unsigned int noOfAPINodes = 0;
@ -476,6 +480,7 @@ Configuration::calcSizeAlt(ConfigValues * ownConfig){
{ CFG_DB_NO_TABLES, &noOfTables, false },
{ CFG_DB_NO_ORDERED_INDEXES, &noOfOrderedIndexes, false },
{ CFG_DB_NO_UNIQUE_HASH_INDEXES, &noOfUniqueHashIndexes, false },
{ CFG_DB_NO_TRIGGERS, &noOfTriggers, true },
{ CFG_DB_NO_REPLICAS, &noOfReplicas, false },
{ CFG_DB_NO_ATTRIBUTES, &noOfAttributes, false },
{ CFG_DB_NO_OPS, &noOfOperations, false },
@ -584,6 +589,18 @@ Configuration::calcSizeAlt(ConfigValues * ownConfig){
ConfigValues::Iterator it2(*ownConfig, db.m_config);
it2.set(CFG_DB_NO_TABLES, noOfTables);
it2.set(CFG_DB_NO_ATTRIBUTES, noOfAttributes);
{
Uint32 neededNoOfTriggers = /* types: Insert/Update/Delete/Custom */
3 * noOfUniqueHashIndexes + /* for unique hash indexes, I/U/D */
3 * NDB_MAX_ACTIVE_EVENTS + /* for events in suma, I/U/D */
3 * noOfTables + /* for backup, I/U/D */
noOfOrderedIndexes; /* for ordered indexes, C */
if (noOfTriggers < neededNoOfTriggers)
{
noOfTriggers= neededNoOfTriggers;
it2.set(CFG_DB_NO_TRIGGERS, noOfTriggers);
}
}
/**
* Do size calculations

View file

@ -21,7 +21,6 @@
#include <ndb_types.h>
class ConfigRetriever;
class LocalConfig;
class Configuration {
public:
@ -33,7 +32,7 @@ public:
*/
bool init(int argc, char** argv);
void fetch_configuration(LocalConfig &local_config);
void fetch_configuration();
void setupConfiguration();
void closeConfiguration();

View file

@ -14,7 +14,7 @@
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
#include <LocalConfig.hpp>
#include "LocalConfig.hpp"
#include <NdbEnv.h>
#include <NdbConfig.h>
#include <NdbAutoPtr.hpp>
@ -294,4 +294,19 @@ LocalConfig::readConnectString(const char * connectString,
return return_value;
}
char *
LocalConfig::makeConnectString(char *buf, int sz)
{
int p= BaseString::snprintf(buf,sz,"nodeid=%d", _ownNodeId);
for (int i = 0; (i < ids.size()) && (sz-p > 0); i++)
{
if (ids[i].type != MgmId_TCP)
continue;
p+=BaseString::snprintf(buf+p,sz-p,",%s:%d",
ids[i].name.c_str(), ids[i].port);
}
buf[sz-1]=0;
return buf;
}
template class Vector<MgmtSrvrId>;

View file

@ -61,6 +61,7 @@ struct LocalConfig {
bool parseHostName(const char *buf);
bool parseFileName(const char *buf);
bool parseString(const char *buf, BaseString &err);
char * makeConnectString(char *buf, int sz);
};
#endif // LocalConfig_H

View file

@ -20,6 +20,7 @@
#include <LocalConfig.hpp>
#include <NdbAutoPtr.hpp>
#include <NdbSleep.h>
#include <NdbTCP.h>
#include "mgmapi.h"
#include "mgmapi_debug.h"
@ -83,8 +84,8 @@ typedef Parser<ParserDummy> Parser_t;
#define NDB_MGM_MAX_ERR_DESC_SIZE 256
struct ndb_mgm_handle {
char * hostname;
unsigned short port;
char * connectstring;
int cfg_i;
int connected;
int last_error;
@ -95,7 +96,7 @@ struct ndb_mgm_handle {
NDB_SOCKET_TYPE socket;
char cfg_ptr[sizeof(LocalConfig)];
LocalConfig cfg;
#ifdef MGMAPI_LOG
FILE* logfile;
@ -148,14 +149,16 @@ ndb_mgm_create_handle()
h->connected = 0;
h->last_error = 0;
h->last_error_line = 0;
h->hostname = 0;
h->socket = NDB_INVALID_SOCKET;
h->read_timeout = 50000;
h->write_timeout = 100;
new (h->cfg_ptr) LocalConfig;
h->cfg_i = 0;
strncpy(h->last_error_desc, "No error", NDB_MGM_MAX_ERR_DESC_SIZE);
new (&(h->cfg)) LocalConfig;
h->cfg.init(0, 0);
#ifdef MGMAPI_LOG
h->logfile = 0;
#endif
@ -163,6 +166,23 @@ ndb_mgm_create_handle()
return h;
}
extern "C"
int
ndb_mgm_set_connectstring(NdbMgmHandle handle, const char * mgmsrv)
{
new (&(handle->cfg)) LocalConfig;
if (!handle->cfg.init(mgmsrv, 0) ||
handle->cfg.ids.size() == 0)
{
new (&(handle->cfg)) LocalConfig;
handle->cfg.init(0, 0); /* reset the LocalCongig */
SET_ERROR(handle, NDB_MGM_ILLEGAL_CONNECT_STRING, "");
return -1;
}
handle->cfg_i= 0;
return 0;
}
/**
* Destroy a handle
*/
@ -175,14 +195,13 @@ ndb_mgm_destroy_handle(NdbMgmHandle * handle)
if((* handle)->connected){
ndb_mgm_disconnect(* handle);
}
my_free((* handle)->hostname,MYF(MY_ALLOW_ZERO_PTR));
#ifdef MGMAPI_LOG
if ((* handle)->logfile != 0){
fclose((* handle)->logfile);
(* handle)->logfile = 0;
}
#endif
((LocalConfig*)((*handle)->cfg_ptr))->~LocalConfig();
(*handle)->cfg.~LocalConfig();
my_free((char*)* handle,MYF(MY_ALLOW_ZERO_PTR));
* handle = 0;
}
@ -314,7 +333,8 @@ ndb_mgm_call(NdbMgmHandle handle, const ParserRow<ParserDummy> *command_reply,
*/
extern "C"
int
ndb_mgm_connect(NdbMgmHandle handle, const char * mgmsrv)
ndb_mgm_connect(NdbMgmHandle handle, int no_retries,
int retry_delay_in_seconds, int verbose)
{
SET_ERROR(handle, NDB_MGM_NO_ERROR, "Executing: ndb_mgm_connect");
CHECK_HANDLE(handle, -1);
@ -331,36 +351,48 @@ ndb_mgm_connect(NdbMgmHandle handle, const char * mgmsrv)
/**
* Do connect
*/
LocalConfig *cfg= (LocalConfig*)(handle->cfg_ptr);
new (cfg) LocalConfig;
if (!cfg->init(mgmsrv, 0) ||
cfg->ids.size() == 0)
{
SET_ERROR(handle, NDB_MGM_ILLEGAL_CONNECT_STRING, "");
return -1;
}
LocalConfig &cfg= handle->cfg;
NDB_SOCKET_TYPE sockfd= NDB_INVALID_SOCKET;
Uint32 i;
for (i = 0; i < cfg->ids.size(); i++)
while (sockfd == NDB_INVALID_SOCKET)
{
if (cfg->ids[i].type != MgmId_TCP)
continue;
SocketClient s(cfg->ids[i].name.c_str(), cfg->ids[i].port);
sockfd = s.connect();
// do all the mgmt servers
for (i = 0; i < cfg.ids.size(); i++)
{
if (cfg.ids[i].type != MgmId_TCP)
continue;
SocketClient s(cfg.ids[i].name.c_str(), cfg.ids[i].port);
sockfd = s.connect();
if (sockfd != NDB_INVALID_SOCKET)
break;
}
if (sockfd != NDB_INVALID_SOCKET)
break;
}
if (sockfd == NDB_INVALID_SOCKET)
{
setError(handle, NDB_MGM_COULD_NOT_CONNECT_TO_SOCKET, __LINE__,
"Unable to connect using connectstring %s", mgmsrv);
return -1;
if (verbose > 0) {
char buf[1024];
ndbout_c("Unable to connect with connect string: %s",
cfg.makeConnectString(buf,sizeof(buf)));
verbose= -1;
}
if (no_retries == 0) {
char buf[1024];
setError(handle, NDB_MGM_COULD_NOT_CONNECT_TO_SOCKET, __LINE__,
"Unable to connect with connect string: %s",
cfg.makeConnectString(buf,sizeof(buf)));
return -1;
}
if (verbose == -1) {
ndbout << "retrying every " << retry_delay_in_seconds << " seconds:";
verbose= -2;
}
NdbSleep_SecSleep(retry_delay_in_seconds);
if (verbose == -2) {
ndbout << " " << no_retries;
}
no_retries--;
}
my_free(handle->hostname,MYF(MY_ALLOW_ZERO_PTR));
handle->hostname = my_strdup(cfg->ids[i].name.c_str(),MYF(MY_WME));
handle->port = cfg->ids[i].port;
handle->cfg_i = i;
handle->socket = sockfd;
handle->connected = 1;
@ -1068,7 +1100,9 @@ ndb_mgm_listen_event(NdbMgmHandle handle, int filter[])
};
CHECK_HANDLE(handle, -1);
SocketClient s(handle->hostname, handle->port);
const char *hostname= ndb_mgm_get_connected_host(handle);
int port= ndb_mgm_get_connected_port(handle);
SocketClient s(hostname, port);
const NDB_SOCKET_TYPE sockfd = s.connect();
if (sockfd < 0) {
setError(handle, NDB_MGM_COULD_NOT_CONNECT_TO_SOCKET, __LINE__,
@ -1613,16 +1647,37 @@ ndb_mgm_destroy_configuration(struct ndb_mgm_configuration *cfg)
extern "C"
int
ndb_mgm_alloc_nodeid(NdbMgmHandle handle, unsigned int version, unsigned *pnodeid, int nodetype)
ndb_mgm_get_configuration_nodeid(NdbMgmHandle handle)
{
CHECK_HANDLE(handle, 0);
return handle->cfg._ownNodeId;
}
extern "C"
int ndb_mgm_get_connected_port(NdbMgmHandle handle)
{
return handle->cfg.ids[handle->cfg_i].port;
}
extern "C"
const char *ndb_mgm_get_connected_host(NdbMgmHandle handle)
{
return handle->cfg.ids[handle->cfg_i].name.c_str();
}
extern "C"
int
ndb_mgm_alloc_nodeid(NdbMgmHandle handle, unsigned int version, int nodetype)
{
CHECK_HANDLE(handle, 0);
CHECK_CONNECTED(handle, 0);
int nodeid= handle->cfg._ownNodeId;
Properties args;
args.put("version", version);
args.put("nodetype", nodetype);
args.put("nodeid", *pnodeid);
args.put("nodeid", nodeid);
args.put("user", "mysqld");
args.put("password", "mysqld");
args.put("public key", "a public key");
@ -1638,26 +1693,29 @@ ndb_mgm_alloc_nodeid(NdbMgmHandle handle, unsigned int version, unsigned *pnodei
prop= ndb_mgm_call(handle, reply, "get nodeid", &args);
CHECK_REPLY(prop, -1);
int res= -1;
nodeid= -1;
do {
const char * buf;
if(!prop->get("result", &buf) || strcmp(buf, "Ok") != 0){
const char *hostname= ndb_mgm_get_connected_host(handle);
unsigned port= ndb_mgm_get_connected_port(handle);
BaseString err;
err.assfmt("Could not alloc node id at %s port %d: %s",
handle->hostname, handle->port, buf);
hostname, port, buf);
setError(handle, NDB_MGM_COULD_NOT_CONNECT_TO_SOCKET, __LINE__,
err.c_str());
break;
}
if(!prop->get("nodeid", pnodeid) != 0){
Uint32 _nodeid;
if(!prop->get("nodeid", &_nodeid) != 0){
ndbout_c("ERROR Message: <nodeid Unspecified>\n");
break;
}
res= 0;
nodeid= _nodeid;
}while(0);
delete prop;
return res;
return nodeid;
}
/*****************************************************************************

View file

@ -153,7 +153,6 @@ private:
NdbMgmHandle m_mgmsrv;
bool connected;
const char *host;
int try_reconnect;
#ifdef HAVE_GLOBAL_REPLICATION
NdbRepHandle m_repserver;
@ -379,15 +378,16 @@ CommandInterpreter::CommandInterpreter(const char *_host)
m_mgmsrv = ndb_mgm_create_handle();
if(m_mgmsrv == NULL) {
ndbout_c("Cannot create handle to management server.");
exit(-1);
}
if (ndb_mgm_set_connectstring(m_mgmsrv, _host))
{
printError();
exit(-1);
}
connected = false;
try_reconnect = 0;
if (_host)
host= my_strdup(_host,MYF(MY_WME));
else
host= 0;
#ifdef HAVE_GLOBAL_REPLICATION
rep_host = NULL;
m_repserver = NULL;
@ -402,8 +402,6 @@ CommandInterpreter::~CommandInterpreter()
{
connected = false;
ndb_mgm_destroy_handle(&m_mgmsrv);
my_free((char *)host,MYF(MY_ALLOW_ZERO_PTR));
host = NULL;
}
static bool
@ -438,18 +436,8 @@ bool
CommandInterpreter::connect()
{
if(!connected) {
int tries = try_reconnect; // tries == 0 => infinite
while(!connected) {
if(ndb_mgm_connect(m_mgmsrv, host) == -1) {
ndbout << "Cannot connect to management server (" << host << ").";
tries--;
if (tries == 0)
break;
ndbout << "Retrying in 5 seconds." << endl;
NdbSleep_SecSleep(5);
} else
connected = true;
}
if(!ndb_mgm_connect(m_mgmsrv, try_reconnect-1, 5, 1))
connected = true;
}
return connected;
}

View file

@ -30,9 +30,10 @@ extern "C" int add_history(const char *command); /* From readline directory */
#include <NdbMain.h>
#include <NdbHost.h>
#include <BaseString.hpp>
#include <NdbOut.hpp>
#include <mgmapi.h>
#include <ndb_version.h>
#include <LocalConfig.hpp>
#include "ndb_mgmclient.hpp"
@ -138,7 +139,7 @@ int main(int argc, char** argv){
NDB_INIT(argv[0]);
const char *_host = 0;
int _port = 0;
const char *load_default_groups[]= { "ndb_mgm",0 };
const char *load_default_groups[]= { "mysql_cluster","ndb_mgm",0 };
load_defaults("my",load_default_groups,&argc,&argv);
int ho_error;

View file

@ -399,16 +399,20 @@ MgmtSrvr::getPort() const {
}
/* Constructor */
MgmtSrvr::MgmtSrvr(NodeId nodeId,
SocketServer *socket_server,
const BaseString &configFilename,
LocalConfig &local_config,
Config * config):
int MgmtSrvr::init()
{
if ( _ownNodeId > 0)
return 0;
return -1;
}
MgmtSrvr::MgmtSrvr(SocketServer *socket_server,
const char *config_filename,
const char *connect_string) :
_blockNumber(1), // Hard coded block number since it makes it easy to send
// signals to other management servers.
m_socket_server(socket_server),
_ownReference(0),
m_local_config(local_config),
theSignalIdleList(NULL),
theWaitState(WAIT_SUBSCRIBE_CONF),
m_statisticsListner(this)
@ -416,6 +420,8 @@ MgmtSrvr::MgmtSrvr(NodeId nodeId,
DBUG_ENTER("MgmtSrvr::MgmtSrvr");
_ownNodeId= 0;
_config = NULL;
_isStopThread = false;
@ -426,12 +432,43 @@ MgmtSrvr::MgmtSrvr(NodeId nodeId,
theFacade = 0;
m_newConfig = NULL;
m_configFilename = configFilename;
m_configFilename.assign(config_filename);
m_nextConfigGenerationNumber = 0;
_config = (config == 0 ? readConfig() : config);
m_config_retriever= new ConfigRetriever(connect_string,
NDB_VERSION, NDB_MGM_NODE_TYPE_MGM);
// first try to allocate nodeid from another management server
if(m_config_retriever->do_connect(0,0,0) == 0)
{
int tmp_nodeid= 0;
tmp_nodeid= m_config_retriever->allocNodeId(0 /*retry*/,0 /*delay*/);
if (tmp_nodeid == 0)
{
ndbout_c(m_config_retriever->getErrorString());
exit(-1);
}
// read config from other managent server
_config= fetchConfig();
if (_config == 0)
{
ndbout << m_config_retriever->getErrorString() << endl;
exit(-1);
}
_ownNodeId= tmp_nodeid;
}
if (_ownNodeId == 0)
{
// read config locally
_config= readConfig();
if (_config == 0) {
ndbout << "Unable to read config file" << endl;
exit(-1);
}
}
theMgmtWaitForResponseCondPtr = NdbCondition_Create();
m_configMutex = NdbMutex_Create();
@ -443,9 +480,11 @@ MgmtSrvr::MgmtSrvr(NodeId nodeId,
nodeTypes[i] = (enum ndb_mgm_node_type)-1;
m_connect_address[i].s_addr= 0;
}
{
ndb_mgm_configuration_iterator * iter = ndb_mgm_create_configuration_iterator
(config->m_configValues, CFG_SECTION_NODE);
ndb_mgm_configuration_iterator
*iter = ndb_mgm_create_configuration_iterator(_config->m_configValues,
CFG_SECTION_NODE);
for(ndb_mgm_first(iter); ndb_mgm_valid(iter); ndb_mgm_next(iter)){
unsigned type, id;
if(ndb_mgm_get_int_parameter(iter, CFG_TYPE_OF_SECTION, &type) != 0)
@ -478,8 +517,6 @@ MgmtSrvr::MgmtSrvr(NodeId nodeId,
}
_props = NULL;
_ownNodeId= 0;
NodeId tmp= nodeId;
BaseString error_string;
if ((m_node_id_mutex = NdbMutex_Create()) == 0)
@ -488,43 +525,25 @@ MgmtSrvr::MgmtSrvr(NodeId nodeId,
exit(-1);
}
#if 0
char my_hostname[256];
struct sockaddr_in tmp_addr;
SOCKET_SIZE_TYPE addrlen= sizeof(tmp_addr);
if (!g_no_nodeid_checks) {
if (gethostname(my_hostname, sizeof(my_hostname))) {
ndbout << "error: gethostname() - " << strerror(errno) << endl;
exit(-1);
}
if (Ndb_getInAddr(&(((sockaddr_in*)&tmp_addr)->sin_addr),my_hostname)) {
ndbout << "error: Ndb_getInAddr(" << my_hostname << ") - "
<< strerror(errno) << endl;
if (_ownNodeId == 0) // we did not get node id from other server
{
NodeId tmp= m_config_retriever->get_configuration_nodeid();
if (!alloc_node_id(&tmp, NDB_MGM_NODE_TYPE_MGM,
0, 0, error_string)){
ndbout << "Unable to obtain requested nodeid: "
<< error_string.c_str() << endl;
exit(-1);
}
_ownNodeId = tmp;
}
if (!alloc_node_id(&tmp, NDB_MGM_NODE_TYPE_MGM,
(struct sockaddr *)&tmp_addr,
&addrlen, error_string)){
ndbout << "Unable to obtain requested nodeid: "
<< error_string.c_str() << endl;
exit(-1);
}
#else
if (!alloc_node_id(&tmp, NDB_MGM_NODE_TYPE_MGM,
0, 0, error_string)){
ndbout << "Unable to obtain requested nodeid: "
<< error_string.c_str() << endl;
exit(-1);
}
#endif
_ownNodeId = tmp;
{
DBUG_PRINT("info", ("verifyConfig"));
ConfigRetriever cr(m_local_config, NDB_VERSION, NDB_MGM_NODE_TYPE_MGM);
if (!cr.verifyConfig(config->m_configValues, _ownNodeId)) {
ndbout << cr.getErrorString() << endl;
if (!m_config_retriever->verifyConfig(_config->m_configValues,
_ownNodeId))
{
ndbout << m_config_retriever->getErrorString() << endl;
exit(-1);
}
}
@ -657,6 +676,8 @@ MgmtSrvr::~MgmtSrvr()
NdbThread_WaitFor(m_signalRecvThread, &res);
NdbThread_Destroy(&m_signalRecvThread);
}
if (m_config_retriever)
delete m_config_retriever;
}
//****************************************************************************

View file

@ -175,11 +175,10 @@ public:
/* Constructor */
MgmtSrvr(NodeId nodeId, /* Local nodeid */
SocketServer *socket_server,
const BaseString &config_filename, /* Where to save config */
LocalConfig &local_config, /* Ndb.cfg filename */
Config * config);
MgmtSrvr(SocketServer *socket_server,
const char *config_filename, /* Where to save config */
const char *connect_string);
int init();
NodeId getOwnNodeId() const {return _ownNodeId;};
/**
@ -538,7 +537,6 @@ private:
NdbMutex *m_configMutex;
const Config * _config;
Config * m_newConfig;
LocalConfig &m_local_config;
BaseString m_configFilename;
Uint32 m_nextConfigGenerationNumber;
@ -755,6 +753,9 @@ private:
Config *_props;
int send(class NdbApiSignal* signal, Uint32 node, Uint32 node_type);
ConfigRetriever *m_config_retriever;
public:
/**
* This method does not exist

View file

@ -272,30 +272,20 @@ MgmtSrvr::saveConfig(const Config *conf) {
Config *
MgmtSrvr::readConfig() {
Config *conf = NULL;
if(m_configFilename.length() != 0) {
/* Use config file */
InitConfigFileParser parser;
conf = parser.parseConfig(m_configFilename.c_str());
if(conf == NULL) {
/* Try to get configuration from other MGM server */
return fetchConfig();
}
}
Config *conf;
InitConfigFileParser parser;
conf = parser.parseConfig(m_configFilename.c_str());
return conf;
}
Config *
MgmtSrvr::fetchConfig() {
ConfigRetriever cr(m_local_config, NDB_VERSION, NODE_TYPE_MGM);
struct ndb_mgm_configuration * tmp = cr.getConfig();
struct ndb_mgm_configuration * tmp = m_config_retriever->getConfig();
if(tmp != 0){
Config * conf = new Config();
conf->m_configValues = tmp;
return conf;
}
return 0;
}

View file

@ -62,7 +62,6 @@ struct MgmGlobals {
int non_interactive;
int interactive;
const char * config_filename;
const char * local_config_filename;
/** Stuff found in environment or in local config */
NodeId localNodeId;
@ -70,9 +69,6 @@ struct MgmGlobals {
char * interface_name;
int port;
/** The configuration of the cluster */
Config * cluster_config;
/** The Mgmt Server */
MgmtSrvr * mgmObject;
@ -86,9 +82,6 @@ static MgmGlobals glob;
/******************************************************************************
* Function prototypes
******************************************************************************/
static bool readLocalConfig();
static bool readGlobalConfig();
/**
* Global variables
*/
@ -100,16 +93,28 @@ static char *opt_connect_str= 0;
static struct my_option my_long_options[] =
{
NDB_STD_OPTS("ndb_mgm"),
#ifndef DBUG_OFF
{ "debug", '#', "Output debug log. Often this is 'd:t:o,filename'.",
0, 0, 0, GET_STR, OPT_ARG, 0, 0, 0, 0, 0, 0 },
#endif
{ "usage", '?', "Display this help and exit.",
0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0 },
{ "help", '?', "Display this help and exit.",
0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0 },
{ "version", 'V', "Output version information and exit.", 0, 0, 0,
GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0 },
{ "connect-string", 1023,
"Set connect string for connecting to ndb_mgmd. "
"<constr>=\"host=<hostname:port>[;nodeid=<id>]\". "
"Overides specifying entries in NDB_CONNECTSTRING and config file",
(gptr*) &opt_connect_str, (gptr*) &opt_connect_str, 0,
GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0 },
{ "config-file", 'f', "Specify cluster configuration file",
(gptr*) &glob.config_filename, (gptr*) &glob.config_filename, 0,
GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0 },
{ "daemon", 'd', "Run ndb_mgmd in daemon mode (default)",
(gptr*) &glob.daemon, (gptr*) &glob.daemon, 0,
GET_BOOL, NO_ARG, 1, 0, 0, 0, 0, 0 },
{ "l", 'l', "Specify configuration file connect string (default Ndb.cfg if available)",
(gptr*) &glob.local_config_filename, (gptr*) &glob.local_config_filename, 0,
GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0 },
{ "interactive", 256, "Run interactive. Not supported but provided for testing purposes",
(gptr*) &glob.interactive, (gptr*) &glob.interactive, 0,
GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0 },
@ -173,7 +178,7 @@ int main(int argc, char** argv)
global_mgmt_server_check = 1;
glob.config_filename= "config.ini";
const char *load_default_groups[]= { "ndb_mgmd",0 };
const char *load_default_groups[]= { "mysql_cluster","ndb_mgmd",0 };
load_defaults("my",load_default_groups,&argc,&argv);
int ho_error;
@ -189,29 +194,16 @@ int main(int argc, char** argv)
MgmApiService * mapi = new MgmApiService();
/****************************
* Read configuration files *
****************************/
LocalConfig local_config;
if(!local_config.init(opt_connect_str,glob.local_config_filename)){
local_config.printError();
goto error_end;
}
glob.localNodeId = local_config._ownNodeId;
glob.mgmObject = new MgmtSrvr(glob.socketServer,
glob.config_filename,
opt_connect_str);
if (!readGlobalConfig())
if (glob.mgmObject->init())
goto error_end;
glob.mgmObject = new MgmtSrvr(glob.localNodeId, glob.socketServer,
BaseString(glob.config_filename),
local_config,
glob.cluster_config);
chdir(NdbConfig_get_path(0));
glob.cluster_config = 0;
glob.localNodeId= glob.mgmObject->getOwnNodeId();
if (glob.localNodeId == 0) {
goto error_end;
}
@ -322,9 +314,7 @@ MgmGlobals::MgmGlobals(){
// Default values
port = 0;
config_filename = NULL;
local_config_filename = NULL;
interface_name = 0;
cluster_config = 0;
daemon = 1;
non_interactive = 0;
interactive = 0;
@ -337,27 +327,6 @@ MgmGlobals::~MgmGlobals(){
delete socketServer;
if (mgmObject)
delete mgmObject;
if (cluster_config)
delete cluster_config;
if (interface_name)
free(interface_name);
}
/**
* @fn readGlobalConfig
* @param glob : Global variables
* @return true if success, false otherwise.
*/
static bool
readGlobalConfig() {
if(glob.config_filename == NULL)
return false;
/* Use config file */
InitConfigFileParser parser;
glob.cluster_config = parser.parseConfig(glob.config_filename);
if(glob.cluster_config == 0){
return false;
}
return true;
}

View file

@ -1122,8 +1122,11 @@ NdbConnection::getNdbIndexScanOperation(const NdbIndexImpl* index,
if (indexTable != 0){
NdbIndexScanOperation* tOp =
getNdbScanOperation((NdbTableImpl *) indexTable);
tOp->m_currentTable = table;
if(tOp) tOp->m_cursor_type = NdbScanOperation::IndexCursor;
if(tOp)
{
tOp->m_currentTable = table;
tOp->m_cursor_type = NdbScanOperation::IndexCursor;
}
return tOp;
} else {
setOperationErrorCodeAbort(theNdb->theError.code);
@ -1618,9 +1621,6 @@ from other transactions.
/**
* There's always a TCKEYCONF when using IgnoreError
*/
#ifdef VM_TRACE
ndbout_c("Not completing transaction 2");
#endif
return -1;
}
/**********************************************************************/
@ -1872,9 +1872,6 @@ NdbConnection::OpCompleteFailure(Uint8 abortOption, bool setFailure)
/**
* There's always a TCKEYCONF when using IgnoreError
*/
#ifdef VM_TRACE
ndbout_c("Not completing transaction");
#endif
return -1;
}

View file

@ -57,12 +57,18 @@ NdbConnection::receiveSCAN_TABREF(NdbApiSignal* aSignal){
if(checkState_TransId(&ref->transId1)){
theScanningOp->theError.code = ref->errorCode;
theScanningOp->execCLOSE_SCAN_REP();
if(!ref->closeNeeded){
theScanningOp->execCLOSE_SCAN_REP();
return 0;
}
assert(theScanningOp->m_sent_receivers_count);
/**
* Setup so that close_impl will actually perform a close
* and not "close scan"-optimze it away
*/
theScanningOp->m_conf_receivers_count++;
theScanningOp->m_conf_receivers[0] = theScanningOp->m_receivers[0];
theScanningOp->m_conf_receivers[0]->m_tcPtrI = ~0;
return 0;
} else {
#ifdef NDB_NO_DROPPED_SIGNAL
@ -97,7 +103,7 @@ NdbConnection::receiveSCAN_TABCONF(NdbApiSignal* aSignal,
theScanningOp->execCLOSE_SCAN_REP();
return 0;
}
for(Uint32 i = 0; i<len; i += 3){
Uint32 opCount, totalLen;
Uint32 ptrI = * ops++;
@ -109,24 +115,12 @@ NdbConnection::receiveSCAN_TABCONF(NdbApiSignal* aSignal,
void * tPtr = theNdb->int2void(ptrI);
assert(tPtr); // For now
NdbReceiver* tOp = theNdb->void2rec(tPtr);
if (tOp && tOp->checkMagicNumber()){
if(tOp->execSCANOPCONF(tcPtrI, totalLen, opCount)){
/**
*
*/
theScanningOp->receiver_delivered(tOp);
} else if(info == ScanTabConf::EndOfData){
if (tOp && tOp->checkMagicNumber())
{
if (tcPtrI == RNIL && opCount == 0)
theScanningOp->receiver_completed(tOp);
}
}
}
if (conf->requestInfo & ScanTabConf::EndOfData) {
if(theScanningOp->m_ordered)
theScanningOp->m_api_receivers_count = 0;
if(theScanningOp->m_api_receivers_count +
theScanningOp->m_conf_receivers_count +
theScanningOp->m_sent_receivers_count){
abort();
else if (tOp->execSCANOPCONF(tcPtrI, totalLen, opCount))
theScanningOp->receiver_delivered(tOp);
}
}
return 0;

View file

@ -44,10 +44,10 @@ void NdbResultSet::init()
{
}
int NdbResultSet::nextResult(bool fetchAllowed)
int NdbResultSet::nextResult(bool fetchAllowed, bool forceSend)
{
int res;
if ((res = m_operation->nextResult(fetchAllowed)) == 0) {
if ((res = m_operation->nextResult(fetchAllowed, forceSend)) == 0) {
// handle blobs
NdbBlob* tBlob = m_operation->theBlobList;
while (tBlob != 0) {
@ -67,9 +67,9 @@ int NdbResultSet::nextResult(bool fetchAllowed)
return res;
}
void NdbResultSet::close()
void NdbResultSet::close(bool forceSend)
{
m_operation->closeScan();
m_operation->closeScan(forceSend);
}
NdbOperation*
@ -98,6 +98,6 @@ NdbResultSet::deleteTuple(NdbConnection * takeOverTrans){
}
int
NdbResultSet::restart(){
return m_operation->restart();
NdbResultSet::restart(bool forceSend){
return m_operation->restart(forceSend);
}

View file

@ -35,6 +35,8 @@
#include <signaldata/AttrInfo.hpp>
#include <signaldata/TcKeyReq.hpp>
#define DEBUG_NEXT_RESULT 0
NdbScanOperation::NdbScanOperation(Ndb* aNdb) :
NdbOperation(aNdb),
m_resultSet(0),
@ -275,6 +277,9 @@ NdbScanOperation::fix_receivers(Uint32 parallel){
void
NdbScanOperation::receiver_delivered(NdbReceiver* tRec){
if(theError.code == 0){
if(DEBUG_NEXT_RESULT)
ndbout_c("receiver_delivered");
Uint32 idx = tRec->m_list_index;
Uint32 last = m_sent_receivers_count - 1;
if(idx != last){
@ -298,6 +303,9 @@ NdbScanOperation::receiver_delivered(NdbReceiver* tRec){
void
NdbScanOperation::receiver_completed(NdbReceiver* tRec){
if(theError.code == 0){
if(DEBUG_NEXT_RESULT)
ndbout_c("receiver_completed");
Uint32 idx = tRec->m_list_index;
Uint32 last = m_sent_receivers_count - 1;
if(idx != last){
@ -445,12 +453,12 @@ NdbScanOperation::executeCursor(int nodeId){
return -1;
}
#define DEBUG_NEXT_RESULT 0
int NdbScanOperation::nextResult(bool fetchAllowed)
int NdbScanOperation::nextResult(bool fetchAllowed, bool forceSend)
{
if(m_ordered)
return ((NdbIndexScanOperation*)this)->next_result_ordered(fetchAllowed);
return ((NdbIndexScanOperation*)this)->next_result_ordered(fetchAllowed,
forceSend);
/**
* Check current receiver
@ -487,7 +495,8 @@ int NdbScanOperation::nextResult(bool fetchAllowed)
TransporterFacade* tp = TransporterFacade::instance();
Guard guard(tp->theMutexPtr);
Uint32 seq = theNdbCon->theNodeSequence;
if(seq == tp->getNodeSequence(nodeId) && send_next_scan(idx, false) == 0){
if(seq == tp->getNodeSequence(nodeId) && send_next_scan(idx, false,
forceSend) == 0){
idx = m_current_api_receiver;
last = m_api_receivers_count;
@ -578,8 +587,9 @@ int NdbScanOperation::nextResult(bool fetchAllowed)
}
int
NdbScanOperation::send_next_scan(Uint32 cnt, bool stopScanFlag){
if(cnt > 0 || stopScanFlag){
NdbScanOperation::send_next_scan(Uint32 cnt, bool stopScanFlag,
bool forceSend){
if(cnt > 0){
NdbApiSignal tSignal(theNdb->theMyRef);
tSignal.setSignal(GSN_SCAN_NEXTREQ);
@ -595,38 +605,57 @@ NdbScanOperation::send_next_scan(Uint32 cnt, bool stopScanFlag){
*/
Uint32 last = m_sent_receivers_count;
Uint32 * prep_array = (cnt > 21 ? m_prepared_receivers : theData + 4);
Uint32 sent = 0;
for(Uint32 i = 0; i<cnt; i++){
NdbReceiver * tRec = m_api_receivers[i];
m_sent_receivers[last+i] = tRec;
tRec->m_list_index = last+i;
prep_array[i] = tRec->m_tcPtrI;
tRec->prepareSend();
if((prep_array[sent] = tRec->m_tcPtrI) != RNIL)
{
m_sent_receivers[last+sent] = tRec;
tRec->m_list_index = last+sent;
tRec->prepareSend();
sent++;
}
}
memcpy(&m_api_receivers[0], &m_api_receivers[cnt], cnt * sizeof(char*));
memmove(m_api_receivers, m_api_receivers+cnt,
(theParallelism-cnt) * sizeof(char*));
Uint32 nodeId = theNdbCon->theDBnode;
TransporterFacade * tp = TransporterFacade::instance();
int ret;
if(cnt > 21){
tSignal.setLength(4);
LinearSectionPtr ptr[3];
ptr[0].p = prep_array;
ptr[0].sz = cnt;
ret = tp->sendSignal(&tSignal, nodeId, ptr, 1);
} else {
tSignal.setLength(4+cnt);
ret = tp->sendSignal(&tSignal, nodeId);
int ret = 0;
if(sent)
{
Uint32 nodeId = theNdbCon->theDBnode;
TransporterFacade * tp = TransporterFacade::instance();
if(cnt > 21){
tSignal.setLength(4);
LinearSectionPtr ptr[3];
ptr[0].p = prep_array;
ptr[0].sz = sent;
ret = tp->sendSignal(&tSignal, nodeId, ptr, 1);
} else {
tSignal.setLength(4+sent);
ret = tp->sendSignal(&tSignal, nodeId);
}
}
if (!ret) checkForceSend(forceSend);
m_sent_receivers_count = last + cnt + stopScanFlag;
m_sent_receivers_count = last + sent;
m_api_receivers_count -= cnt;
m_current_api_receiver = 0;
return ret;
}
return 0;
}
void NdbScanOperation::checkForceSend(bool forceSend)
{
if (forceSend) {
TransporterFacade::instance()->forceSend(theNdb->theNdbBlockNumber);
} else {
TransporterFacade::instance()->checkForceSend(theNdb->theNdbBlockNumber);
}//if
}
int
NdbScanOperation::prepareSend(Uint32 TC_ConnectPtr, Uint64 TransactionId)
{
@ -642,7 +671,7 @@ NdbScanOperation::doSend(int ProcessorId)
return 0;
}
void NdbScanOperation::closeScan()
void NdbScanOperation::closeScan(bool forceSend)
{
if(m_transConnection){
if(DEBUG_NEXT_RESULT)
@ -657,7 +686,7 @@ void NdbScanOperation::closeScan()
TransporterFacade* tp = TransporterFacade::instance();
Guard guard(tp->theMutexPtr);
close_impl(tp);
close_impl(tp, forceSend);
} while(0);
@ -673,6 +702,7 @@ NdbScanOperation::execCLOSE_SCAN_REP(){
m_api_receivers_count = 0;
m_conf_receivers_count = 0;
m_sent_receivers_count = 0;
m_current_api_receiver = m_ordered ? theParallelism : 0;
}
void NdbScanOperation::release()
@ -1293,7 +1323,8 @@ NdbIndexScanOperation::compare(Uint32 skip, Uint32 cols,
}
int
NdbIndexScanOperation::next_result_ordered(bool fetchAllowed){
NdbIndexScanOperation::next_result_ordered(bool fetchAllowed,
bool forceSend){
Uint32 u_idx = 0, u_last = 0;
Uint32 s_idx = m_current_api_receiver; // first sorted
@ -1319,7 +1350,8 @@ NdbIndexScanOperation::next_result_ordered(bool fetchAllowed){
Guard guard(tp->theMutexPtr);
Uint32 seq = theNdbCon->theNodeSequence;
Uint32 nodeId = theNdbCon->theDBnode;
if(seq == tp->getNodeSequence(nodeId) && !send_next_scan_ordered(s_idx)){
if(seq == tp->getNodeSequence(nodeId) &&
!send_next_scan_ordered(s_idx, forceSend)){
Uint32 tmp = m_sent_receivers_count;
s_idx = m_current_api_receiver;
while(m_sent_receivers_count > 0 && !theError.code){
@ -1408,14 +1440,26 @@ NdbIndexScanOperation::next_result_ordered(bool fetchAllowed){
}
int
NdbIndexScanOperation::send_next_scan_ordered(Uint32 idx){
NdbIndexScanOperation::send_next_scan_ordered(Uint32 idx, bool forceSend){
if(idx == theParallelism)
return 0;
NdbReceiver* tRec = m_api_receivers[idx];
NdbApiSignal tSignal(theNdb->theMyRef);
tSignal.setSignal(GSN_SCAN_NEXTREQ);
Uint32 last = m_sent_receivers_count;
Uint32* theData = tSignal.getDataPtrSend();
Uint32* prep_array = theData + 4;
m_current_api_receiver = idx + 1;
if((prep_array[0] = tRec->m_tcPtrI) == RNIL)
{
if(DEBUG_NEXT_RESULT)
ndbout_c("receiver completed, don't send");
return 0;
}
theData[0] = theNdbCon->theTCConPtr;
theData[1] = 0;
Uint64 transId = theNdbCon->theTransactionId;
@ -1425,35 +1469,35 @@ NdbIndexScanOperation::send_next_scan_ordered(Uint32 idx){
/**
* Prepare ops
*/
Uint32 last = m_sent_receivers_count;
Uint32 * prep_array = theData + 4;
NdbReceiver * tRec = m_api_receivers[idx];
m_sent_receivers[last] = tRec;
tRec->m_list_index = last;
prep_array[0] = tRec->m_tcPtrI;
tRec->prepareSend();
m_sent_receivers_count = last + 1;
m_current_api_receiver = idx + 1;
Uint32 nodeId = theNdbCon->theDBnode;
TransporterFacade * tp = TransporterFacade::instance();
tSignal.setLength(4+1);
return tp->sendSignal(&tSignal, nodeId);
int ret= tp->sendSignal(&tSignal, nodeId);
if (!ret) checkForceSend(forceSend);
return ret;
}
int
NdbScanOperation::close_impl(TransporterFacade* tp){
NdbScanOperation::close_impl(TransporterFacade* tp, bool forceSend){
Uint32 seq = theNdbCon->theNodeSequence;
Uint32 nodeId = theNdbCon->theDBnode;
if(seq != tp->getNodeSequence(nodeId)){
if(seq != tp->getNodeSequence(nodeId))
{
theNdbCon->theReleaseOnClose = true;
return -1;
}
while(theError.code == 0 && m_sent_receivers_count){
/**
* Wait for outstanding
*/
while(theError.code == 0 && m_sent_receivers_count)
{
theNdb->theWaiter.m_node = nodeId;
theNdb->theWaiter.m_state = WAIT_SCAN;
int return_code = theNdb->receiveResponse(WAITFOR_SCAN_TIMEOUT);
@ -1471,18 +1515,52 @@ NdbScanOperation::close_impl(TransporterFacade* tp){
}
}
if(m_api_receivers_count+m_conf_receivers_count){
// Send close scan
if(send_next_scan(0, true) == -1){ // Close scan
theNdbCon->theReleaseOnClose = true;
return -1;
}
/**
* move all conf'ed into api
* so that send_next_scan can check if they needs to be closed
*/
Uint32 api = m_api_receivers_count;
Uint32 conf = m_conf_receivers_count;
if(m_ordered)
{
/**
* Ordered scan, keep the m_api_receivers "to the right"
*/
memmove(m_api_receivers, m_api_receivers+m_current_api_receiver,
(theParallelism - m_current_api_receiver) * sizeof(char*));
api = (theParallelism - m_current_api_receiver);
m_api_receivers_count = api;
}
if(DEBUG_NEXT_RESULT)
ndbout_c("close_impl: [order api conf sent curr parr] %d %d %d %d %d %d",
m_ordered, api, conf,
m_sent_receivers_count, m_current_api_receiver, theParallelism);
if(api+conf)
{
/**
* There's something to close
* setup m_api_receivers (for send_next_scan)
*/
memcpy(m_api_receivers+api, m_conf_receivers, conf * sizeof(char*));
m_api_receivers_count = api + conf;
m_conf_receivers_count = 0;
}
// Send close scan
if(send_next_scan(api+conf, true, forceSend) == -1)
{
theNdbCon->theReleaseOnClose = true;
return -1;
}
/**
* wait for close scan conf
*/
while(m_sent_receivers_count+m_api_receivers_count+m_conf_receivers_count){
while(m_sent_receivers_count+m_api_receivers_count+m_conf_receivers_count)
{
theNdb->theWaiter.m_node = nodeId;
theNdb->theWaiter.m_state = WAIT_SCAN;
int return_code = theNdb->receiveResponse(WAITFOR_SCAN_TIMEOUT);
@ -1499,6 +1577,7 @@ NdbScanOperation::close_impl(TransporterFacade* tp){
return -1;
}
}
return 0;
}
@ -1520,7 +1599,7 @@ NdbScanOperation::reset_receivers(Uint32 parallell, Uint32 ordered){
}
int
NdbScanOperation::restart()
NdbScanOperation::restart(bool forceSend)
{
TransporterFacade* tp = TransporterFacade::instance();
@ -1529,7 +1608,7 @@ NdbScanOperation::restart()
{
int res;
if((res= close_impl(tp)))
if((res= close_impl(tp, forceSend)))
{
return res;
}
@ -1548,13 +1627,13 @@ NdbScanOperation::restart()
}
int
NdbIndexScanOperation::reset_bounds(){
NdbIndexScanOperation::reset_bounds(bool forceSend){
int res;
{
TransporterFacade* tp = TransporterFacade::instance();
Guard guard(tp->theMutexPtr);
res= close_impl(tp);
res= close_impl(tp, forceSend);
}
if(!res)

View file

@ -45,7 +45,6 @@ Ndb_cluster_connection::Ndb_cluster_connection(const char *connect_string)
else
m_connect_string= 0;
m_config_retriever= 0;
m_local_config= 0;
m_connect_thread= 0;
m_connect_callback= 0;
@ -125,38 +124,31 @@ int Ndb_cluster_connection::connect(int reconnect)
do {
if (m_config_retriever == 0)
{
if (m_local_config == 0) {
m_local_config= new LocalConfig();
if (!m_local_config->init(m_connect_string,0)) {
ndbout_c("Configuration error: Unable to retrieve local config");
m_local_config->printError();
m_local_config->printUsage();
DBUG_RETURN(-1);
}
}
m_config_retriever=
new ConfigRetriever(*m_local_config, NDB_VERSION, NODE_TYPE_API);
new ConfigRetriever(m_connect_string, NDB_VERSION, NODE_TYPE_API);
if (m_config_retriever->hasError())
{
printf("Could not connect initialize handle to management server",
m_config_retriever->getErrorString());
DBUG_RETURN(-1);
}
}
else
if (reconnect == 0)
DBUG_RETURN(0);
if (reconnect)
{
int r= m_config_retriever->do_connect(1);
int r= m_config_retriever->do_connect(0,0,0);
if (r == 1)
DBUG_RETURN(1); // mgmt server not up yet
if (r == -1)
break;
}
else
if(m_config_retriever->do_connect() == -1)
if(m_config_retriever->do_connect(12,5,1) == -1)
break;
Uint32 nodeId = m_config_retriever->allocNodeId();
for(Uint32 i = 0; nodeId == 0 && i<5; i++){
NdbSleep_SecSleep(3);
nodeId = m_config_retriever->allocNodeId();
}
Uint32 nodeId = m_config_retriever->allocNodeId(4/*retries*/,3/*delay*/);
if(nodeId == 0)
break;
ndb_mgm_configuration * props = m_config_retriever->getConfig();
@ -200,8 +192,6 @@ Ndb_cluster_connection::~Ndb_cluster_connection()
my_free(m_connect_string,MYF(MY_ALLOW_ZERO_PTR));
if (m_config_retriever)
delete m_config_retriever;
if (m_local_config)
delete m_local_config;
DBUG_VOID_RETURN;
}

View file

@ -426,7 +426,8 @@ ErrorBundle ErrorCodes[] = {
{ 4267, IE, "Corrupted blob value" },
{ 4268, IE, "Error in blob head update forced rollback of transaction" },
{ 4268, IE, "Unknown blob error" },
{ 4269, IE, "No connection to ndb management server" }
{ 4269, IE, "No connection to ndb management server" },
{ 4335, AE, "Only one autoincrement column allowed per table. Having a table without primary key uses an autoincremented hidden key, i.e. a table without a primary key can not have an autoincremented column" }
};
static

View file

@ -36,15 +36,21 @@ public:
bool allowConstraintViolation = true,
int doSleep = 0,
bool oneTrans = false);
int scanReadRecords(Ndb*,
int records,
int abort = 0,
int parallelism = 0,
bool committed = false);
int scanReadCommittedRecords(Ndb*,
int records,
int abort = 0,
int parallelism = 0);
NdbOperation::LockMode = NdbOperation::LM_Read);
int scanReadRecords(Ndb*,
const NdbDictionary::Index*,
int records,
int abort = 0,
int parallelism = 0,
NdbOperation::LockMode = NdbOperation::LM_Read,
bool sorted = false);
int pkReadRecords(Ndb*,
int records,
int batchsize = 1,

View file

@ -87,8 +87,6 @@ protected:
bool connected;
BaseString addr;
BaseString host;
int port;
NdbMgmHandle handle;
ndb_mgm_configuration * m_config;
protected:

View file

@ -53,11 +53,11 @@ public:
int selectCount(Ndb*,
int parallelism = 0,
int* count_rows = NULL,
ScanLock lock = SL_Read,
NdbOperation::LockMode lm = NdbOperation::LM_CommittedRead,
NdbConnection* pTrans = NULL);
int scanReadRecords(Ndb*,
int parallelism,
bool exclusive,
NdbOperation::LockMode lm,
int records,
int noAttribs,
int* attrib_list,

View file

@ -391,8 +391,15 @@ run_read(){
void
print_result(){
int tmp = 1;
tmp *= g_paramters[P_RANGE].value;
tmp *= g_paramters[P_LOOPS].value;
int t, t2;
for(int i = 0; i<P_OP_TYPES; i++){
g_err.println("%s avg: %u us/row", g_ops[i],
(1000*g_times[i])/(g_paramters[P_RANGE].value*g_paramters[P_LOOPS].value));
g_err << g_ops[i] << " avg: "
<< (int)((1000*g_times[i])/tmp)
<< " us/row ("
<< (1000 * tmp)/g_times[i] << " rows / sec)" << endl;
}
}

View file

@ -242,8 +242,9 @@ int runScanReadCommitted(NDBT_Context* ctx, NDBT_Step* step){
HugoTransactions hugoTrans(*ctx->getTab());
while (i<loops && !ctx->isTestStopped()) {
g_info << i << ": ";
if (hugoTrans.scanReadCommittedRecords(GETNDB(step), records,
abort, parallelism) != 0){
if (hugoTrans.scanReadRecords(GETNDB(step), records,
abort, parallelism,
NdbOperation::LM_CommittedRead) != 0){
return NDBT_FAILED;
}
i++;
@ -639,7 +640,7 @@ int runCheckGetValue(NDBT_Context* ctx, NDBT_Step* step){
g_info << (unsigned)i << endl;
if(utilTrans.scanReadRecords(GETNDB(step),
parallelism,
false,
NdbOperation::LM_Read,
records,
alist.attriblist[i]->numAttribs,
alist.attriblist[i]->attribs) != 0){
@ -647,7 +648,7 @@ int runCheckGetValue(NDBT_Context* ctx, NDBT_Step* step){
}
if(utilTrans.scanReadRecords(GETNDB(step),
parallelism,
true,
NdbOperation::LM_Read,
records,
alist.attriblist[i]->numAttribs,
alist.attriblist[i]->attribs) != 0){

View file

@ -39,8 +39,9 @@ struct Parameter {
#define P_LOOPS 8
#define P_CREATE 9
#define P_LOAD 10
#define P_RESET 11
#define P_MAX 11
#define P_MAX 12
static
Parameter
@ -55,7 +56,8 @@ g_paramters[] = {
{ "size", 1000000, 1, ~0 },
{ "iterations", 3, 1, ~0 },
{ "create_drop", 1, 0, 1 },
{ "data", 1, 0, 1 }
{ "data", 1, 0, 1 },
{ "q-reset bounds", 0, 1, 0 }
};
static Ndb* g_ndb = 0;
@ -219,21 +221,29 @@ run_scan(){
NDB_TICKS start1, stop;
int sum_time= 0;
int sample_rows = 0;
NDB_TICKS sample_start = NdbTick_CurrentMillisecond();
Uint32 tot = g_paramters[P_ROWS].value;
if(g_paramters[P_BOUND].value == 2 || g_paramters[P_FILT].value == 2)
iter *= g_paramters[P_ROWS].value;
NdbScanOperation * pOp = 0;
NdbIndexScanOperation * pIOp = 0;
NdbConnection * pTrans = 0;
NdbResultSet * rs = 0;
int check = 0;
for(int i = 0; i<iter; i++){
start1 = NdbTick_CurrentMillisecond();
NdbConnection * pTrans = g_ndb->startTransaction();
pTrans = pTrans ? pTrans : g_ndb->startTransaction();
if(!pTrans){
g_err << "Failed to start transaction" << endl;
err(g_ndb->getNdbError());
return -1;
}
NdbScanOperation * pOp;
NdbIndexScanOperation * pIOp;
NdbResultSet * rs;
int par = g_paramters[P_PARRA].value;
int bat = g_paramters[P_BATCH].value;
NdbScanOperation::LockMode lm;
@ -256,9 +266,17 @@ run_scan(){
assert(pOp);
rs = pOp->readTuples(lm, bat, par);
} else {
pOp = pIOp = pTrans->getNdbIndexScanOperation(g_indexname, g_tablename);
bool ord = g_paramters[P_ACCESS].value == 2;
rs = pIOp->readTuples(lm, bat, par, ord);
if(g_paramters[P_RESET].value == 0 || pIOp == 0)
{
pOp= pIOp= pTrans->getNdbIndexScanOperation(g_indexname, g_tablename);
bool ord = g_paramters[P_ACCESS].value == 2;
rs = pIOp->readTuples(lm, bat, par, ord);
}
else
{
pIOp->reset_bounds();
}
switch(g_paramters[P_BOUND].value){
case 0: // All
break;
@ -268,20 +286,22 @@ run_scan(){
case 2: { // 1 row
default:
assert(g_table->getNoOfPrimaryKeys() == 1); // only impl. so far
abort();
#if 0
int tot = g_paramters[P_ROWS].value;
int row = rand() % tot;
#if 0
fix_eq_bound(pIOp, row);
#else
pIOp->setBound((Uint32)0, NdbIndexScanOperation::BoundEQ, &row);
#endif
break;
}
}
if(g_paramters[P_RESET].value == 1)
goto execute;
}
assert(pOp);
assert(rs);
int check = 0;
switch(g_paramters[P_FILT].value){
case 0: // All
check = pOp->interpret_exit_ok();
@ -313,7 +333,7 @@ run_scan(){
for(int i = 0; i<g_table->getNoOfColumns(); i++){
pOp->getValue(i);
}
execute:
int rows = 0;
check = pTrans->execute(NoCommit);
assert(check == 0);
@ -334,19 +354,29 @@ run_scan(){
return -1;
}
assert(check == 1);
g_info << "Found " << rows << " rows" << endl;
pTrans->close();
if(g_paramters[P_RESET].value == 0)
{
pTrans->close();
pTrans = 0;
}
stop = NdbTick_CurrentMillisecond();
int time_passed= (int)(stop - start1);
g_err.println("Time: %d ms = %u rows/sec", time_passed,
(1000*tot)/time_passed);
sample_rows += rows;
sum_time+= time_passed;
if(sample_rows >= tot)
{
int sample_time = (int)(stop - sample_start);
g_info << "Found " << sample_rows << " rows" << endl;
g_err.println("Time: %d ms = %u rows/sec", sample_time,
(1000*sample_rows)/sample_time);
sample_rows = 0;
sample_start = stop;
}
}
sum_time= sum_time / iter;
g_err.println("Avg time: %d ms = %u rows/sec", sum_time,
(1000*tot)/sum_time);
g_err.println("Avg time: %d ms = %u rows/sec", sum_time/iter,
(1000*tot*iter)/sum_time);
return 0;
}

View file

@ -222,6 +222,10 @@ max-time: 500
cmd: testScan
args: -n ScanRead488 -l 10 T6
max-time: 500
cmd: testScan
args: -n ScanRead488Timeout -l 10 T6
max-time: 600
cmd: testScan
args: -n ScanRead40 -l 100 T2

View file

@ -538,15 +538,19 @@ connect_ndb_mgm(atrt_process & proc){
}
BaseString tmp = proc.m_hostname;
tmp.appfmt(":%d", proc.m_ndb_mgm_port);
time_t start = time(0);
const time_t max_connect_time = 30;
do {
if(ndb_mgm_connect(handle, tmp.c_str()) != -1){
proc.m_ndb_mgm_handle = handle;
return true;
}
sleep(1);
} while(time(0) < (start + max_connect_time));
if (ndb_mgm_set_connectstring(handle,tmp.c_str()))
{
g_logger.critical("Unable to create parse connectstring");
return false;
}
if(ndb_mgm_connect(handle, 30, 1, 0) != -1)
{
proc.m_ndb_mgm_handle = handle;
return true;
}
g_logger.critical("Unable to connect to ndb mgm %s", tmp.c_str());
return false;
}

View file

@ -29,20 +29,13 @@ HugoTransactions::~HugoTransactions(){
deallocRows();
}
int HugoTransactions::scanReadCommittedRecords(Ndb* pNdb,
int records,
int abortPercent,
int parallelism){
return scanReadRecords(pNdb, records, abortPercent, parallelism, true);
}
int
HugoTransactions::scanReadRecords(Ndb* pNdb,
int records,
int abortPercent,
int parallelism,
bool committed){
NdbOperation::LockMode lm)
{
int retryAttempt = 0;
const int retryMax = 100;
@ -80,8 +73,163 @@ HugoTransactions::scanReadRecords(Ndb* pNdb,
}
NdbResultSet * rs;
rs = pOp ->readTuples(committed ? NdbScanOperation::LM_CommittedRead :
NdbScanOperation::LM_Read);
rs = pOp ->readTuples(lm);
if( rs == 0 ) {
ERR(pTrans->getNdbError());
pNdb->closeTransaction(pTrans);
return NDBT_FAILED;
}
check = pOp->interpret_exit_ok();
if( check == -1 ) {
ERR(pTrans->getNdbError());
pNdb->closeTransaction(pTrans);
return NDBT_FAILED;
}
for(a = 0; a<tab.getNoOfColumns(); a++){
if((row.attributeStore(a) =
pOp->getValue(tab.getColumn(a)->getName())) == 0) {
ERR(pTrans->getNdbError());
pNdb->closeTransaction(pTrans);
return NDBT_FAILED;
}
}
check = pTrans->execute(NoCommit);
if( check == -1 ) {
const NdbError err = pTrans->getNdbError();
if (err.status == NdbError::TemporaryError){
ERR(err);
pNdb->closeTransaction(pTrans);
NdbSleep_MilliSleep(50);
retryAttempt++;
continue;
}
ERR(err);
pNdb->closeTransaction(pTrans);
return NDBT_FAILED;
}
// Abort after 1-100 or 1-records rows
int ranVal = rand();
int abortCount = ranVal % (records == 0 ? 100 : records);
bool abortTrans = false;
if (abort > 0){
// Abort if abortCount is less then abortPercent
if (abortCount < abortPercent)
abortTrans = true;
}
int eof;
int rows = 0;
while((eof = rs->nextResult(true)) == 0){
rows++;
if (calc.verifyRowValues(&row) != 0){
pNdb->closeTransaction(pTrans);
return NDBT_FAILED;
}
if (abortCount == rows && abortTrans == true){
ndbout << "Scan is aborted" << endl;
g_info << "Scan is aborted" << endl;
rs->close();
if( check == -1 ) {
ERR(pTrans->getNdbError());
pNdb->closeTransaction(pTrans);
return NDBT_FAILED;
}
pNdb->closeTransaction(pTrans);
return NDBT_OK;
}
}
if (eof == -1) {
const NdbError err = pTrans->getNdbError();
if (err.status == NdbError::TemporaryError){
ERR_INFO(err);
pNdb->closeTransaction(pTrans);
NdbSleep_MilliSleep(50);
switch (err.code){
case 488:
case 245:
case 490:
// Too many active scans, no limit on number of retry attempts
break;
default:
retryAttempt++;
}
continue;
}
ERR(err);
pNdb->closeTransaction(pTrans);
return NDBT_FAILED;
}
pNdb->closeTransaction(pTrans);
g_info << rows << " rows have been read" << endl;
if (records != 0 && rows != records){
g_err << "Check expected number of records failed" << endl
<< " expected=" << records <<", " << endl
<< " read=" << rows << endl;
return NDBT_FAILED;
}
return NDBT_OK;
}
return NDBT_FAILED;
}
int
HugoTransactions::scanReadRecords(Ndb* pNdb,
const NdbDictionary::Index * pIdx,
int records,
int abortPercent,
int parallelism,
NdbOperation::LockMode lm,
bool sorted)
{
int retryAttempt = 0;
const int retryMax = 100;
int check, a;
NdbConnection *pTrans;
NdbIndexScanOperation *pOp;
while (true){
if (retryAttempt >= retryMax){
g_err << "ERROR: has retried this operation " << retryAttempt
<< " times, failing!" << endl;
return NDBT_FAILED;
}
pTrans = pNdb->startTransaction();
if (pTrans == NULL) {
const NdbError err = pNdb->getNdbError();
if (err.status == NdbError::TemporaryError){
ERR(err);
NdbSleep_MilliSleep(50);
retryAttempt++;
continue;
}
ERR(err);
return NDBT_FAILED;
}
pOp = pTrans->getNdbIndexScanOperation(pIdx->getName(), tab.getName());
if (pOp == NULL) {
ERR(pTrans->getNdbError());
pNdb->closeTransaction(pTrans);
return NDBT_FAILED;
}
NdbResultSet * rs;
rs = pOp ->readTuples(lm, 0, parallelism, sorted);
if( rs == 0 ) {
ERR(pTrans->getNdbError());

View file

@ -839,9 +839,9 @@ void NDBT_TestSuite::execute(Ndb* ndb, const NdbDictionary::Table* pTab,
continue;
}
pTab2 = pDict->getTable(pTab->getName());
} else {
} else if(!pTab2) {
pTab2 = pTab;
}
}
ctx = new NDBT_Context();
ctx->setTab(pTab2);

View file

@ -69,28 +69,19 @@ NdbBackup::getBackupDataDirForNode(int _node_id){
/**
* Fetch configuration from management server
*/
LocalConfig lc;
if (!lc.init(0,0)) {
abort();
}
ConfigRetriever cr(lc, 0, NODE_TYPE_API);
ndb_mgm_configuration * p = 0;
ndb_mgm_configuration *p;
if (connect())
return NULL;
BaseString tmp; tmp.assfmt("%s:%d", host.c_str(), port);
NdbMgmHandle handle = ndb_mgm_create_handle();
if(handle == 0 || ndb_mgm_connect(handle, tmp.c_str()) != 0 ||
(p = ndb_mgm_get_configuration(handle, 0)) == 0){
const char * s = 0;
if(p == 0 && handle != 0){
s = ndb_mgm_get_latest_error_msg(handle);
if(s == 0)
s = "No error given!";
if ((p = ndb_mgm_get_configuration(handle, 0)) == 0)
{
const char * s= ndb_mgm_get_latest_error_msg(handle);
if(s == 0)
s = "No error given!";
ndbout << "Could not fetch configuration" << endl;
ndbout << s << endl;
return NULL;
}
ndbout << "Could not fetch configuration" << endl;
ndbout << s << endl;
return NULL;
}
/**
@ -155,13 +146,14 @@ NdbBackup::execRestore(bool _restore_data,
ndbout << "scp res: " << res << endl;
BaseString::snprintf(buf, 255, "%sndb_restore -c \"host=%s\" -n %d -b %d %s %s .",
BaseString::snprintf(buf, 255, "%sndb_restore -c \"%s:%d\" -n %d -b %d %s %s .",
#if 1
"",
#else
"valgrind --leak-check=yes -v "
#endif
addr.c_str(),
ndb_mgm_get_connected_host(handle),
ndb_mgm_get_connected_port(handle),
_node_id,
_backup_id,
_restore_data?"-r":"",

View file

@ -18,7 +18,6 @@
#include <NdbOut.hpp>
#include <NdbSleep.h>
#include <NdbTick.h>
#include <LocalConfig.hpp>
#include <mgmapi_debug.h>
#include <NDBT_Output.hpp>
#include <random.h>
@ -33,42 +32,11 @@
NdbRestarter::NdbRestarter(const char* _addr):
connected(false),
port(-1),
handle(NULL),
m_config(0)
{
if (_addr == NULL){
LocalConfig lcfg;
if(!lcfg.init()){
lcfg.printError();
lcfg.printUsage();
g_err << "NdbRestarter - Error parsing local config file" << endl;
return;
}
if (lcfg.ids.size() == 0){
g_err << "NdbRestarter - No management servers configured in local config file" << endl;
return;
}
for (int i = 0; i<lcfg.ids.size(); i++){
MgmtSrvrId * m = &lcfg.ids[i];
switch(m->type){
case MgmId_TCP:
char buf[255];
snprintf(buf, 255, "%s:%d", m->name.c_str(), m->port);
addr.assign(buf);
host.assign(m->name.c_str());
port = m->port;
return;
break;
case MgmId_File:
break;
default:
break;
}
}
addr.assign("");
} else {
addr.assign(_addr);
}
@ -391,13 +359,22 @@ NdbRestarter::isConnected(){
int
NdbRestarter::connect(){
disconnect();
handle = ndb_mgm_create_handle();
if (handle == NULL){
g_err << "handle == NULL" << endl;
return -1;
}
g_info << "Connecting to mgmsrv at " << addr.c_str() << endl;
if (ndb_mgm_connect(handle, addr.c_str()) == -1) {
if (ndb_mgm_set_connectstring(handle,addr.c_str()))
{
MGMERR(handle);
g_err << "Connection to " << addr.c_str() << " failed" << endl;
return -1;
}
if (ndb_mgm_connect(handle, 0, 0, 0) == -1)
{
MGMERR(handle);
g_err << "Connection to " << addr.c_str() << " failed" << endl;
return -1;

View file

@ -619,7 +619,7 @@ UtilTransactions::addRowToInsert(Ndb* pNdb,
int
UtilTransactions::scanReadRecords(Ndb* pNdb,
int parallelism,
bool exclusive,
NdbOperation::LockMode lm,
int records,
int noAttribs,
int *attrib_list,
@ -669,10 +669,7 @@ UtilTransactions::scanReadRecords(Ndb* pNdb,
return NDBT_FAILED;
}
NdbResultSet * rs = pOp->readTuples(exclusive ?
NdbScanOperation::LM_Exclusive :
NdbScanOperation::LM_Read,
0, parallelism);
NdbResultSet * rs = pOp->readTuples(lm, 0, parallelism);
if( rs == 0 ) {
ERR(pTrans->getNdbError());
pNdb->closeTransaction(pTrans);
@ -761,7 +758,7 @@ int
UtilTransactions::selectCount(Ndb* pNdb,
int parallelism,
int* count_rows,
ScanLock lock,
NdbOperation::LockMode lm,
NdbConnection* pTrans){
int retryAttempt = 0;
@ -785,19 +782,7 @@ UtilTransactions::selectCount(Ndb* pNdb,
return NDBT_FAILED;
}
NdbResultSet * rs;
switch(lock){
case SL_ReadHold:
rs = pOp->readTuples(NdbScanOperation::LM_Read);
break;
case SL_Exclusive:
rs = pOp->readTuples(NdbScanOperation::LM_Exclusive);
break;
case SL_Read:
default:
rs = pOp->readTuples(NdbScanOperation::LM_CommittedRead);
}
NdbResultSet * rs = pOp->readTuples(lm);
if( rs == 0) {
ERR(pTrans->getNdbError());
pNdb->closeTransaction(pTrans);

View file

@ -30,7 +30,7 @@ main(int argc, const char** argv){
const char* _dbname = "TEST_DB";
int _help = 0;
int _ordered, _pk;
int _ordered = 0, _pk = 1;
struct getargs args[] = {
{ "database", 'd', arg_string, &_dbname, "dbname",

View file

@ -35,13 +35,17 @@ int main(int argc, const char** argv){
int _parallelism = 1;
const char* _tabname = NULL;
int _help = 0;
int lock = NdbOperation::LM_Read;
int sorted = 0;
struct getargs args[] = {
{ "aborts", 'a', arg_integer, &_abort, "percent of transactions that are aborted", "abort%" },
{ "loops", 'l', arg_integer, &_loops, "number of times to run this program(0=infinite loop)", "loops" },
{ "parallelism", 'p', arg_integer, &_parallelism, "parallelism(1-240)", "para" },
{ "records", 'r', arg_integer, &_records, "Number of records", "recs" },
{ "usage", '?', arg_flag, &_help, "Print help", "" }
{ "usage", '?', arg_flag, &_help, "Print help", "" },
{ "lock", 'm', arg_integer, &lock, "lock mode", "" },
{ "sorted", 's', arg_flag, &sorted, "sorted", "" }
};
int num_args = sizeof(args) / sizeof(args[0]);
int optind = 0;
@ -73,16 +77,48 @@ int main(int argc, const char** argv){
ndbout << " Table " << _tabname << " does not exist!" << endl;
return NDBT_ProgramExit(NDBT_WRONGARGS);
}
const NdbDictionary::Index * pIdx = 0;
if(optind+1 < argc)
{
pIdx = MyNdb.getDictionary()->getIndex(argv[optind+1], _tabname);
if(!pIdx)
ndbout << " Index " << argv[optind+1] << " not found" << endl;
else
if(pIdx->getType() != NdbDictionary::Index::UniqueOrderedIndex &&
pIdx->getType() != NdbDictionary::Index::OrderedIndex)
{
ndbout << " Index " << argv[optind+1] << " is not scannable" << endl;
pIdx = 0;
}
}
HugoTransactions hugoTrans(*pTab);
int i = 0;
while (i<_loops || _loops==0) {
ndbout << i << ": ";
if(hugoTrans.scanReadRecords(&MyNdb,
0,
_abort,
_parallelism) != 0){
return NDBT_ProgramExit(NDBT_FAILED);
if(!pIdx)
{
if(hugoTrans.scanReadRecords(&MyNdb,
0,
_abort,
_parallelism,
(NdbOperation::LockMode)lock) != 0)
{
return NDBT_ProgramExit(NDBT_FAILED);
}
}
else
{
if(hugoTrans.scanReadRecords(&MyNdb, pIdx,
0,
_abort,
_parallelism,
(NdbOperation::LockMode)lock,
sorted) != 0)
{
return NDBT_ProgramExit(NDBT_FAILED);
}
}
i++;
}

View file

@ -67,7 +67,7 @@ get_one_option(int optid, const struct my_option *opt __attribute__((unused)),
int main(int argc, char** argv){
NDB_INIT(argv[0]);
const char *load_default_groups[]= { "ndb_tools",0 };
const char *load_default_groups[]= { "mysql_cluster",0 };
load_defaults("my",load_default_groups,&argc,&argv);
int ho_error;
if ((ho_error=handle_options(&argc, &argv, my_long_options, get_one_option)))

View file

@ -67,7 +67,7 @@ get_one_option(int optid, const struct my_option *opt __attribute__((unused)),
int main(int argc, char** argv){
NDB_INIT(argv[0]);
const char *load_default_groups[]= { "ndb_tools",0 };
const char *load_default_groups[]= { "mysql_cluster",0 };
load_defaults("my",load_default_groups,&argc,&argv);
int ho_error;
if ((ho_error=handle_options(&argc, &argv, my_long_options, get_one_option)))

View file

@ -64,7 +64,7 @@ get_one_option(int optid, const struct my_option *opt __attribute__((unused)),
int main(int argc, char** argv){
NDB_INIT(argv[0]);
const char *load_default_groups[]= { "ndb_tools",0 };
const char *load_default_groups[]= { "mysql_cluster",0 };
load_defaults("my",load_default_groups,&argc,&argv);
int ho_error;
if ((ho_error=handle_options(&argc, &argv, my_long_options, get_one_option)))

View file

@ -64,7 +64,7 @@ get_one_option(int optid, const struct my_option *opt __attribute__((unused)),
int main(int argc, char** argv){
NDB_INIT(argv[0]);
const char *load_default_groups[]= { "ndb_tools",0 };
const char *load_default_groups[]= { "mysql_cluster",0 };
load_defaults("my",load_default_groups,&argc,&argv);
int ho_error;
if ((ho_error=handle_options(&argc, &argv, my_long_options, get_one_option)))

View file

@ -220,7 +220,7 @@ get_one_option(int optid, const struct my_option *opt __attribute__((unused)),
int main(int argc, char** argv){
NDB_INIT(argv[0]);
const char* _tabname;
const char *load_default_groups[]= { "ndb_tools",0 };
const char *load_default_groups[]= { "mysql_cluster",0 };
load_defaults("my",load_default_groups,&argc,&argv);
int ho_error;
if ((ho_error=handle_options(&argc, &argv, my_long_options, get_one_option)))

View file

@ -143,7 +143,7 @@ get_one_option(int optid, const struct my_option *opt __attribute__((unused)),
bool
readArguments(int *pargc, char*** pargv)
{
const char *load_default_groups[]= { "ndb_tools","ndb_restore",0 };
const char *load_default_groups[]= { "mysql_cluster","ndb_restore",0 };
load_defaults("my",load_default_groups,pargc,pargv);
if (handle_options(pargc, pargv, my_long_options, get_one_option))
{

View file

@ -50,7 +50,7 @@ static struct my_option my_long_options[] =
GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0 },
{ "parallelism", 'p', "parallelism",
(gptr*) &_parallelism, (gptr*) &_parallelism, 0,
GET_INT, REQUIRED_ARG, 240, 0, 0, 0, 0, 0 },
GET_INT, REQUIRED_ARG, 0, 0, 0, 0, 0, 0 },
{ "lock", 'l', "Read(0), Read-hold(1), Exclusive(2)",
(gptr*) &_lock, (gptr*) &_lock, 0,
GET_INT, REQUIRED_ARG, 0, 0, 0, 0, 0, 0 },
@ -105,7 +105,7 @@ get_one_option(int optid, const struct my_option *opt __attribute__((unused)),
int main(int argc, char** argv){
NDB_INIT(argv[0]);
const char *load_default_groups[]= { "ndb_tools",0 };
const char *load_default_groups[]= { "mysql_cluster",0 };
load_defaults("my",load_default_groups,&argc,&argv);
const char* _tabname;
int ho_error;
@ -133,13 +133,18 @@ int main(int argc, char** argv){
const NdbDictionary::Table* pTab = NDBT_Table::discoverTableFromDb(&MyNdb, _tabname);
const NdbDictionary::Index * pIdx = 0;
if(argc > 1){
pIdx = MyNdb.getDictionary()->getIndex(argv[0], _tabname);
pIdx = MyNdb.getDictionary()->getIndex(argv[1], _tabname);
}
if(pTab == NULL){
ndbout << " Table " << _tabname << " does not exist!" << endl;
return NDBT_ProgramExit(NDBT_WRONGARGS);
}
if(argc > 1 && pIdx == 0)
{
ndbout << " Index " << argv[1] << " does not exists" << endl;
}
if(_order && pIdx == NULL){
ndbout << " Order flag given without an index" << endl;

View file

@ -83,7 +83,7 @@ get_one_option(int optid, const struct my_option *opt __attribute__((unused)),
int main(int argc, char** argv){
NDB_INIT(argv[0]);
const char *load_default_groups[]= { "ndb_tools",0 };
const char *load_default_groups[]= { "mysql_cluster",0 };
load_defaults("my",load_default_groups,&argc,&argv);
int ho_error;
if ((ho_error=handle_options(&argc, &argv, my_long_options, get_one_option)))

View file

@ -23,7 +23,6 @@
#include <NdbOut.hpp>
#include <NdbSleep.h>
#include <kernel/ndb_limits.h>
#include <LocalConfig.hpp>
#include <NDBT.hpp>
@ -75,7 +74,7 @@ get_one_option(int optid, const struct my_option *opt __attribute__((unused)),
int main(int argc, char** argv){
NDB_INIT(argv[0]);
const char *load_default_groups[]= { "ndb_tools",0 };
const char *load_default_groups[]= { "mysql_cluster",0 };
load_defaults("my",load_default_groups,&argc,&argv);
const char* _hostName = NULL;
int ho_error;
@ -85,39 +84,8 @@ int main(int argc, char** argv){
char buf[255];
_hostName = argv[0];
if (_hostName == NULL){
LocalConfig lcfg;
if(!lcfg.init(opt_connect_str, 0))
{
lcfg.printError();
lcfg.printUsage();
g_err << "Error parsing local config file" << endl;
return NDBT_ProgramExit(NDBT_FAILED);
}
for (unsigned i = 0; i<lcfg.ids.size();i++)
{
MgmtSrvrId * m = &lcfg.ids[i];
switch(m->type){
case MgmId_TCP:
snprintf(buf, 255, "%s:%d", m->name.c_str(), m->port);
_hostName = buf;
break;
case MgmId_File:
break;
default:
break;
}
if (_hostName != NULL)
break;
}
if (_hostName == NULL)
{
g_err << "No management servers configured in local config file" << endl;
return NDBT_ProgramExit(NDBT_FAILED);
}
}
if (_hostName == 0)
_hostName= opt_connect_str;
if (_no_contact) {
if (waitClusterStatus(_hostName, NDB_MGM_NODE_STATUS_NO_CONTACT, _timeout) != 0)
@ -210,13 +178,19 @@ waitClusterStatus(const char* _addr,
int _nodes[MAX_NDB_NODES];
int _num_nodes = 0;
handle = ndb_mgm_create_handle();
handle = ndb_mgm_create_handle();
if (handle == NULL){
g_err << "handle == NULL" << endl;
return -1;
}
g_info << "Connecting to mgmsrv at " << _addr << endl;
if (ndb_mgm_connect(handle, _addr) == -1) {
if (ndb_mgm_set_connectstring(handle, _addr))
{
MGMERR(handle);
g_err << "Connectstring " << _addr << " invalid" << endl;
return -1;
}
if (ndb_mgm_connect(handle,0,0,1)) {
MGMERR(handle);
g_err << "Connection to " << _addr << " failed" << endl;
return -1;

View file

@ -1268,7 +1268,7 @@ inline int ha_ndbcluster::next_result(byte *buf)
m_ops_pending= 0;
m_blobs_pending= FALSE;
}
check= cursor->nextResult(contact_ndb);
check= cursor->nextResult(contact_ndb, m_force_send);
if (check == 0)
{
// One more record found
@ -1561,7 +1561,7 @@ int ha_ndbcluster::ordered_index_scan(const key_range *start_key,
DBUG_ASSERT(op->getSorted() == sorted);
DBUG_ASSERT(op->getLockMode() ==
(NdbOperation::LockMode)get_ndb_lock_type(m_lock.type));
if(op->reset_bounds())
if(op->reset_bounds(m_force_send))
DBUG_RETURN(ndb_err(m_active_trans));
}
@ -2394,7 +2394,7 @@ int ha_ndbcluster::index_last(byte *buf)
int res;
if((res= ordered_index_scan(0, 0, TRUE, buf)) == 0){
NdbResultSet *cursor= m_active_cursor;
while((res= cursor->nextResult(TRUE)) == 0);
while((res= cursor->nextResult(TRUE, m_force_send)) == 0);
if(res == 1){
unpack_record(buf);
table->status= 0;
@ -2480,7 +2480,7 @@ int ha_ndbcluster::rnd_init(bool scan)
{
if (!scan)
DBUG_RETURN(1);
int res= cursor->restart();
int res= cursor->restart(m_force_send);
DBUG_ASSERT(res == 0);
}
index_init(table->primary_key);
@ -2511,7 +2511,7 @@ int ha_ndbcluster::close_scan()
m_ops_pending= 0;
}
cursor->close();
cursor->close(m_force_send);
m_active_cursor= NULL;
DBUG_RETURN(0);
}
@ -3035,6 +3035,7 @@ int ha_ndbcluster::external_lock(THD *thd, int lock_type)
m_transaction_on= FALSE;
else
m_transaction_on= thd->variables.ndb_use_transactions;
// m_use_local_query_cache= thd->variables.ndb_use_local_query_cache;
m_active_trans= thd->transaction.all.ndb_tid ?
(NdbConnection*)thd->transaction.all.ndb_tid:
@ -3761,7 +3762,8 @@ ha_ndbcluster::ha_ndbcluster(TABLE *table_arg):
m_ha_not_exact_count(FALSE),
m_force_send(TRUE),
m_autoincrement_prefetch(32),
m_transaction_on(TRUE)
m_transaction_on(TRUE),
m_use_local_query_cache(FALSE)
{
int i;
@ -4449,7 +4451,7 @@ bool ha_ndbcluster::low_byte_first() const
}
bool ha_ndbcluster::has_transactions()
{
return TRUE;
return m_transaction_on;
}
const char* ha_ndbcluster::index_type(uint key_number)
{
@ -4466,7 +4468,10 @@ const char* ha_ndbcluster::index_type(uint key_number)
}
uint8 ha_ndbcluster::table_cache_type()
{
return HA_CACHE_TBL_NOCACHE;
if (m_use_local_query_cache)
return HA_CACHE_TBL_TRANSACT;
else
return HA_CACHE_TBL_NOCACHE;
}
/*
@ -4634,13 +4639,12 @@ ndb_get_table_statistics(Ndb* ndb, const char * table,
{
DBUG_ENTER("ndb_get_table_statistics");
DBUG_PRINT("enter", ("table: %s", table));
NdbConnection* pTrans= ndb->startTransaction();
do
{
NdbConnection* pTrans= ndb->startTransaction();
if (pTrans == NULL)
break;
NdbScanOperation* pOp= pTrans->getNdbScanOperation(table);
if (pOp == NULL)
break;
@ -4657,13 +4661,13 @@ ndb_get_table_statistics(Ndb* ndb, const char * table,
pOp->getValue(NdbDictionary::Column::ROW_COUNT, (char*)&rows);
pOp->getValue(NdbDictionary::Column::COMMIT_COUNT, (char*)&commits);
check= pTrans->execute(NoCommit);
check= pTrans->execute(NoCommit, AbortOnError, TRUE);
if (check == -1)
break;
Uint64 sum_rows= 0;
Uint64 sum_commits= 0;
while((check= rs->nextResult(TRUE)) == 0)
while((check= rs->nextResult(TRUE, TRUE)) == 0)
{
sum_rows+= rows;
sum_commits+= commits;
@ -4672,6 +4676,8 @@ ndb_get_table_statistics(Ndb* ndb, const char * table,
if (check == -1)
break;
rs->close(TRUE);
ndb->closeTransaction(pTrans);
if(row_count)
* row_count= sum_rows;
@ -4681,6 +4687,7 @@ ndb_get_table_statistics(Ndb* ndb, const char * table,
DBUG_RETURN(0);
} while(0);
ndb->closeTransaction(pTrans);
DBUG_PRINT("exit", ("failed"));
DBUG_RETURN(-1);
}

View file

@ -239,10 +239,12 @@ class ha_ndbcluster: public handler
char *m_blobs_buffer;
uint32 m_blobs_buffer_size;
uint m_dupkey;
// set from thread variables at external lock
bool m_ha_not_exact_count;
bool m_force_send;
ha_rows m_autoincrement_prefetch;
bool m_transaction_on;
bool m_use_local_query_cache;
void set_rec_per_key();
void records_update();

View file

@ -7288,6 +7288,7 @@ int my_wildcmp_uca(CHARSET_INFO *cs,
{
while (1)
{
my_bool escaped= 0;
if ((scan= mb_wc(cs, &w_wc, (const uchar*)wildstr,
(const uchar*)wildend)) <= 0)
return 1;
@ -7305,6 +7306,7 @@ int my_wildcmp_uca(CHARSET_INFO *cs,
(const uchar*)wildend)) <= 0)
return 1;
wildstr+= scan;
escaped= 1;
}
if ((scan= mb_wc(cs, &s_wc, (const uchar*)str,
@ -7312,7 +7314,7 @@ int my_wildcmp_uca(CHARSET_INFO *cs,
return 1;
str+= scan;
if (w_wc == (my_wc_t)w_one)
if (!escaped && w_wc == (my_wc_t)w_one)
{
result= 1; /* Found an anchor char */
}

View file

@ -1545,31 +1545,33 @@ int my_wildcmp_unicode(CHARSET_INFO *cs,
{
while (1)
{
my_bool escaped= 0;
if ((scan= mb_wc(cs, &w_wc, (const uchar*)wildstr,
(const uchar*)wildend)) <= 0)
return 1;
if (w_wc == (my_wc_t)escape)
{
wildstr+= scan;
if ((scan= mb_wc(cs,&w_wc, (const uchar*)wildstr,
(const uchar*)wildend)) <= 0)
return 1;
}
if (w_wc == (my_wc_t)w_many)
{
result= 1; /* Found an anchor char */
break;
}
wildstr+= scan;
if (w_wc == (my_wc_t)escape)
{
if ((scan= mb_wc(cs, &w_wc, (const uchar*)wildstr,
(const uchar*)wildend)) <= 0)
return 1;
wildstr+= scan;
escaped= 1;
}
if ((scan= mb_wc(cs, &s_wc, (const uchar*)str,
(const uchar*)str_end)) <=0)
(const uchar*)str_end)) <= 0)
return 1;
str+= scan;
if (w_wc == (my_wc_t)w_one)
if (!escaped && w_wc == (my_wc_t)w_one)
{
result= 1; /* Found an anchor char */
}

View file

@ -269,7 +269,7 @@ int main(int ac, char **av)
*/
int tmp= weight[i];
if (w == 2 && tmp)
tmp= (int)(0x100 - weight[i]);
tmp= (int)(0x20 - weight[i]);
printf("0x%04X", tmp);
@ -304,7 +304,7 @@ int main(int ac, char **av)
const char *comma= page < MY_UCA_NPAGES-1 ? "," : "";
const char *nline= (page+1) % 4 ? "" : "\n";
if (!pagemaxlen[page])
printf("NULL %s%s", comma , nline);
printf("NULL %s%s%s", w ? " ": "", comma , nline);
else
printf("page%03Xdata%s%s%s", page, pname[w], comma, nline);
}