mirror of
https://github.com/MariaDB/server.git
synced 2025-02-01 03:21:53 +01:00
Merge perch.ndb.mysql.com:/home/jonas/src/51-work
into perch.ndb.mysql.com:/home/jonas/src/mysql-5.1-new-ndb
This commit is contained in:
commit
b0b69bac4a
21 changed files with 439 additions and 145 deletions
|
@ -127,6 +127,7 @@ extern const GlobalSignalNumber NO_OF_SIGNAL_NAMES;
|
||||||
/* 68 not unused */
|
/* 68 not unused */
|
||||||
/* 69 not unused */
|
/* 69 not unused */
|
||||||
/* 70 unused */
|
/* 70 unused */
|
||||||
|
#define GSN_UPDATE_FRAG_DIST_KEY_ORD 70
|
||||||
#define GSN_ACC_ABORTREQ 71
|
#define GSN_ACC_ABORTREQ 71
|
||||||
#define GSN_ACC_CHECK_SCAN 72
|
#define GSN_ACC_CHECK_SCAN 72
|
||||||
#define GSN_ACC_COMMITCONF 73
|
#define GSN_ACC_COMMITCONF 73
|
||||||
|
|
|
@ -30,7 +30,7 @@ class CopyFragReq {
|
||||||
*/
|
*/
|
||||||
friend class Dblqh;
|
friend class Dblqh;
|
||||||
public:
|
public:
|
||||||
STATIC_CONST( SignalLength = 8 );
|
STATIC_CONST( SignalLength = 9 );
|
||||||
|
|
||||||
private:
|
private:
|
||||||
Uint32 userPtr;
|
Uint32 userPtr;
|
||||||
|
@ -41,6 +41,8 @@ private:
|
||||||
Uint32 schemaVersion;
|
Uint32 schemaVersion;
|
||||||
Uint32 distributionKey;
|
Uint32 distributionKey;
|
||||||
Uint32 gci;
|
Uint32 gci;
|
||||||
|
Uint32 nodeCount;
|
||||||
|
Uint32 nodeList[1];
|
||||||
};
|
};
|
||||||
|
|
||||||
class CopyFragConf {
|
class CopyFragConf {
|
||||||
|
@ -85,4 +87,13 @@ private:
|
||||||
Uint32 errorCode;
|
Uint32 errorCode;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
struct UpdateFragDistKeyOrd
|
||||||
|
{
|
||||||
|
Uint32 tableId;
|
||||||
|
Uint32 fragId;
|
||||||
|
Uint32 fragDistributionKey;
|
||||||
|
|
||||||
|
STATIC_CONST( SignalLength = 3 );
|
||||||
|
};
|
||||||
|
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -230,6 +230,8 @@ extern "C" {
|
||||||
NDB_MGM_SERVER_NOT_CONNECTED = 1010,
|
NDB_MGM_SERVER_NOT_CONNECTED = 1010,
|
||||||
/** Could not connect to socker */
|
/** Could not connect to socker */
|
||||||
NDB_MGM_COULD_NOT_CONNECT_TO_SOCKET = 1011,
|
NDB_MGM_COULD_NOT_CONNECT_TO_SOCKET = 1011,
|
||||||
|
/** Could not bind local address */
|
||||||
|
NDB_MGM_BIND_ADDRESS = 1012,
|
||||||
|
|
||||||
/* Alloc node id failures */
|
/* Alloc node id failures */
|
||||||
/** Generic error, retry may succeed */
|
/** Generic error, retry may succeed */
|
||||||
|
@ -514,6 +516,15 @@ extern "C" {
|
||||||
const char *ndb_mgm_get_connected_host(NdbMgmHandle handle);
|
const char *ndb_mgm_get_connected_host(NdbMgmHandle handle);
|
||||||
const char *ndb_mgm_get_connectstring(NdbMgmHandle handle, char *buf, int buf_sz);
|
const char *ndb_mgm_get_connectstring(NdbMgmHandle handle, char *buf, int buf_sz);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Set local bindaddress
|
||||||
|
* @param arg - Srting of form "host[:port]"
|
||||||
|
* @note must be called before connect
|
||||||
|
* @note Error on binding local address will not be reported until connect
|
||||||
|
* @return 0 on success
|
||||||
|
*/
|
||||||
|
int ndb_mgm_set_bindaddress(NdbMgmHandle, const char * arg);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Gets the connectstring used for a connection
|
* Gets the connectstring used for a connection
|
||||||
*
|
*
|
||||||
|
|
|
@ -28,7 +28,8 @@
|
||||||
class ConfigRetriever {
|
class ConfigRetriever {
|
||||||
public:
|
public:
|
||||||
ConfigRetriever(const char * _connect_string,
|
ConfigRetriever(const char * _connect_string,
|
||||||
Uint32 version, Uint32 nodeType);
|
Uint32 version, Uint32 nodeType,
|
||||||
|
const char * _bind_address = 0);
|
||||||
~ConfigRetriever();
|
~ConfigRetriever();
|
||||||
|
|
||||||
int do_connect(int no_retries, int retry_delay_in_seconds, int verbose);
|
int do_connect(int no_retries, int retry_delay_in_seconds, int verbose);
|
||||||
|
|
|
@ -67,5 +67,7 @@ char ndb_version_string_buf[NDB_VERSION_STRING_BUF_SZ];
|
||||||
#define NDBD_DICT_LOCK_VERSION_5 MAKE_VERSION(5,0,23)
|
#define NDBD_DICT_LOCK_VERSION_5 MAKE_VERSION(5,0,23)
|
||||||
#define NDBD_DICT_LOCK_VERSION_5_1 MAKE_VERSION(5,1,12)
|
#define NDBD_DICT_LOCK_VERSION_5_1 MAKE_VERSION(5,1,12)
|
||||||
|
|
||||||
|
#define NDBD_UPDATE_FRAG_DIST_KEY_50 MAKE_VERSION(5,0,26)
|
||||||
|
#define NDBD_UPDATE_FRAG_DIST_KEY_51 MAKE_VERSION(5,1,12)
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
|
|
@ -658,7 +658,10 @@ private:
|
||||||
// Release all cursor operations in connection
|
// Release all cursor operations in connection
|
||||||
void releaseOps(NdbOperation*);
|
void releaseOps(NdbOperation*);
|
||||||
void releaseScanOperations(NdbIndexScanOperation*);
|
void releaseScanOperations(NdbIndexScanOperation*);
|
||||||
void releaseScanOperation(NdbIndexScanOperation*);
|
bool releaseScanOperation(NdbIndexScanOperation** listhead,
|
||||||
|
NdbIndexScanOperation** listtail,
|
||||||
|
NdbIndexScanOperation* op);
|
||||||
|
void releaseExecutedScanOperation(NdbIndexScanOperation*);
|
||||||
|
|
||||||
// Set the transaction identity of the transaction
|
// Set the transaction identity of the transaction
|
||||||
void setTransactionId(Uint64 aTransactionId);
|
void setTransactionId(Uint64 aTransactionId);
|
||||||
|
|
|
@ -37,7 +37,8 @@ public:
|
||||||
};
|
};
|
||||||
unsigned short get_port() { return m_port; };
|
unsigned short get_port() { return m_port; };
|
||||||
char *get_server_name() { return m_server_name; };
|
char *get_server_name() { return m_server_name; };
|
||||||
NDB_SOCKET_TYPE connect();
|
int bind(const char* toaddress, unsigned short toport);
|
||||||
|
NDB_SOCKET_TYPE connect(const char* toaddress = 0, unsigned short port = 0);
|
||||||
bool close();
|
bool close();
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
|
@ -633,5 +633,6 @@ const GsnName SignalNames [] = {
|
||||||
,{ GSN_DICT_LOCK_REF, "DICT_LOCK_REF" }
|
,{ GSN_DICT_LOCK_REF, "DICT_LOCK_REF" }
|
||||||
,{ GSN_DICT_UNLOCK_ORD, "DICT_UNLOCK_ORD" }
|
,{ GSN_DICT_UNLOCK_ORD, "DICT_UNLOCK_ORD" }
|
||||||
|
|
||||||
|
,{ GSN_UPDATE_FRAG_DIST_KEY_ORD, "UPDATE_FRAG_DIST_KEY_ORD" }
|
||||||
};
|
};
|
||||||
const unsigned short NO_OF_SIGNAL_NAMES = sizeof(SignalNames)/sizeof(GsnName);
|
const unsigned short NO_OF_SIGNAL_NAMES = sizeof(SignalNames)/sizeof(GsnName);
|
||||||
|
|
|
@ -45,7 +45,8 @@
|
||||||
//****************************************************************************
|
//****************************************************************************
|
||||||
|
|
||||||
ConfigRetriever::ConfigRetriever(const char * _connect_string,
|
ConfigRetriever::ConfigRetriever(const char * _connect_string,
|
||||||
Uint32 version, Uint32 node_type)
|
Uint32 version, Uint32 node_type,
|
||||||
|
const char * _bindaddress)
|
||||||
{
|
{
|
||||||
DBUG_ENTER("ConfigRetriever::ConfigRetriever");
|
DBUG_ENTER("ConfigRetriever::ConfigRetriever");
|
||||||
|
|
||||||
|
@ -69,6 +70,15 @@ ConfigRetriever::ConfigRetriever(const char * _connect_string,
|
||||||
setError(CR_ERROR, tmp.c_str());
|
setError(CR_ERROR, tmp.c_str());
|
||||||
DBUG_VOID_RETURN;
|
DBUG_VOID_RETURN;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (_bindaddress)
|
||||||
|
{
|
||||||
|
if (ndb_mgm_set_bindaddress(m_handle, _bindaddress))
|
||||||
|
{
|
||||||
|
setError(CR_ERROR, ndb_mgm_get_latest_error_desc(m_handle));
|
||||||
|
DBUG_VOID_RETURN;
|
||||||
|
}
|
||||||
|
}
|
||||||
resetError();
|
resetError();
|
||||||
DBUG_VOID_RETURN;
|
DBUG_VOID_RETURN;
|
||||||
}
|
}
|
||||||
|
|
|
@ -60,9 +60,6 @@ Transporter::Transporter(TransporterRegistry &t_reg,
|
||||||
}
|
}
|
||||||
strncpy(localHostName, lHostName, sizeof(localHostName));
|
strncpy(localHostName, lHostName, sizeof(localHostName));
|
||||||
|
|
||||||
if (strlen(lHostName) > 0)
|
|
||||||
Ndb_getInAddr(&localHostAddress, lHostName);
|
|
||||||
|
|
||||||
DBUG_PRINT("info",("rId=%d lId=%d isServer=%d rHost=%s lHost=%s s_port=%d",
|
DBUG_PRINT("info",("rId=%d lId=%d isServer=%d rHost=%s lHost=%s s_port=%d",
|
||||||
remoteNodeId, localNodeId, isServer,
|
remoteNodeId, localNodeId, isServer,
|
||||||
remoteHostName, localHostName,
|
remoteHostName, localHostName,
|
||||||
|
@ -128,9 +125,22 @@ Transporter::connect_client() {
|
||||||
return true;
|
return true;
|
||||||
|
|
||||||
if(isMgmConnection)
|
if(isMgmConnection)
|
||||||
|
{
|
||||||
sockfd= m_transporter_registry.connect_ndb_mgmd(m_socket_client);
|
sockfd= m_transporter_registry.connect_ndb_mgmd(m_socket_client);
|
||||||
|
}
|
||||||
else
|
else
|
||||||
|
{
|
||||||
|
if (!m_socket_client->init())
|
||||||
|
{
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
if (strlen(localHostName) > 0)
|
||||||
|
{
|
||||||
|
if (m_socket_client->bind(localHostName, 0) != 0)
|
||||||
|
return false;
|
||||||
|
}
|
||||||
sockfd= m_socket_client->connect();
|
sockfd= m_socket_client->connect();
|
||||||
|
}
|
||||||
|
|
||||||
return connect_client(sockfd);
|
return connect_client(sockfd);
|
||||||
}
|
}
|
||||||
|
|
|
@ -25,7 +25,7 @@ SocketClient::SocketClient(const char *server_name, unsigned short port, SocketA
|
||||||
{
|
{
|
||||||
m_auth= sa;
|
m_auth= sa;
|
||||||
m_port= port;
|
m_port= port;
|
||||||
m_server_name= strdup(server_name);
|
m_server_name= server_name ? strdup(server_name) : 0;
|
||||||
m_sockfd= NDB_INVALID_SOCKET;
|
m_sockfd= NDB_INVALID_SOCKET;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -45,12 +45,15 @@ SocketClient::init()
|
||||||
if (m_sockfd != NDB_INVALID_SOCKET)
|
if (m_sockfd != NDB_INVALID_SOCKET)
|
||||||
NDB_CLOSE_SOCKET(m_sockfd);
|
NDB_CLOSE_SOCKET(m_sockfd);
|
||||||
|
|
||||||
|
if (m_server_name)
|
||||||
|
{
|
||||||
memset(&m_servaddr, 0, sizeof(m_servaddr));
|
memset(&m_servaddr, 0, sizeof(m_servaddr));
|
||||||
m_servaddr.sin_family = AF_INET;
|
m_servaddr.sin_family = AF_INET;
|
||||||
m_servaddr.sin_port = htons(m_port);
|
m_servaddr.sin_port = htons(m_port);
|
||||||
// Convert ip address presentation format to numeric format
|
// Convert ip address presentation format to numeric format
|
||||||
if (Ndb_getInAddr(&m_servaddr.sin_addr, m_server_name))
|
if (Ndb_getInAddr(&m_servaddr.sin_addr, m_server_name))
|
||||||
return false;
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
m_sockfd= socket(AF_INET, SOCK_STREAM, 0);
|
m_sockfd= socket(AF_INET, SOCK_STREAM, 0);
|
||||||
if (m_sockfd == NDB_INVALID_SOCKET) {
|
if (m_sockfd == NDB_INVALID_SOCKET) {
|
||||||
|
@ -62,8 +65,45 @@ SocketClient::init()
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int
|
||||||
|
SocketClient::bind(const char* bindaddress, unsigned short localport)
|
||||||
|
{
|
||||||
|
if (m_sockfd == NDB_INVALID_SOCKET)
|
||||||
|
return -1;
|
||||||
|
|
||||||
|
struct sockaddr_in local;
|
||||||
|
memset(&local, 0, sizeof(local));
|
||||||
|
local.sin_family = AF_INET;
|
||||||
|
local.sin_port = htons(localport);
|
||||||
|
// Convert ip address presentation format to numeric format
|
||||||
|
if (Ndb_getInAddr(&local.sin_addr, bindaddress))
|
||||||
|
{
|
||||||
|
return errno ? errno : EINVAL;
|
||||||
|
}
|
||||||
|
|
||||||
|
const int on = 1;
|
||||||
|
if (setsockopt(m_sockfd, SOL_SOCKET, SO_REUSEADDR,
|
||||||
|
(const char*)&on, sizeof(on)) == -1) {
|
||||||
|
|
||||||
|
int ret = errno;
|
||||||
|
NDB_CLOSE_SOCKET(m_sockfd);
|
||||||
|
m_sockfd= NDB_INVALID_SOCKET;
|
||||||
|
return errno;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (::bind(m_sockfd, (struct sockaddr*)&local, sizeof(local)) == -1)
|
||||||
|
{
|
||||||
|
int ret = errno;
|
||||||
|
NDB_CLOSE_SOCKET(m_sockfd);
|
||||||
|
m_sockfd= NDB_INVALID_SOCKET;
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
NDB_SOCKET_TYPE
|
NDB_SOCKET_TYPE
|
||||||
SocketClient::connect()
|
SocketClient::connect(const char *toaddress, unsigned short toport)
|
||||||
{
|
{
|
||||||
if (m_sockfd == NDB_INVALID_SOCKET)
|
if (m_sockfd == NDB_INVALID_SOCKET)
|
||||||
{
|
{
|
||||||
|
@ -74,6 +114,21 @@ SocketClient::connect()
|
||||||
return NDB_INVALID_SOCKET;
|
return NDB_INVALID_SOCKET;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (toaddress)
|
||||||
|
{
|
||||||
|
if (m_server_name)
|
||||||
|
free(m_server_name);
|
||||||
|
m_server_name = strdup(toaddress);
|
||||||
|
m_port = toport;
|
||||||
|
memset(&m_servaddr, 0, sizeof(m_servaddr));
|
||||||
|
m_servaddr.sin_family = AF_INET;
|
||||||
|
m_servaddr.sin_port = htons(toport);
|
||||||
|
// Convert ip address presentation format to numeric format
|
||||||
|
if (Ndb_getInAddr(&m_servaddr.sin_addr, m_server_name))
|
||||||
|
return NDB_INVALID_SOCKET;
|
||||||
|
}
|
||||||
|
|
||||||
const int r = ::connect(m_sockfd, (struct sockaddr*) &m_servaddr, sizeof(m_servaddr));
|
const int r = ::connect(m_sockfd, (struct sockaddr*) &m_servaddr, sizeof(m_servaddr));
|
||||||
if (r == -1) {
|
if (r == -1) {
|
||||||
NDB_CLOSE_SOCKET(m_sockfd);
|
NDB_CLOSE_SOCKET(m_sockfd);
|
||||||
|
|
|
@ -2928,10 +2928,10 @@ void Dbdict::checkSchemaStatus(Signal* signal)
|
||||||
|
|
||||||
// On NR get index from master because index state is not on file
|
// On NR get index from master because index state is not on file
|
||||||
Uint32 type= oldEntry->m_tableType;
|
Uint32 type= oldEntry->m_tableType;
|
||||||
const bool file = c_systemRestart || !DictTabInfo::isIndex(type);
|
const bool file = (* newEntry == * oldEntry) &&
|
||||||
|
(c_systemRestart || !DictTabInfo::isIndex(type));
|
||||||
newEntry->m_info_words= oldEntry->m_info_words;
|
newEntry->m_info_words= oldEntry->m_info_words;
|
||||||
restartCreateTab(signal, tableId, oldEntry, newEntry, file);
|
restartCreateTab(signal, tableId, oldEntry, newEntry, file);
|
||||||
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
ndbrequire(ok);
|
ndbrequire(ok);
|
||||||
|
|
|
@ -1305,9 +1305,9 @@ void Dbdih::execNDB_STTOR(Signal* signal)
|
||||||
if (isMaster()) {
|
if (isMaster()) {
|
||||||
jam();
|
jam();
|
||||||
systemRestartTakeOverLab(signal);
|
systemRestartTakeOverLab(signal);
|
||||||
if (anyActiveTakeOver() && false) {
|
if (anyActiveTakeOver())
|
||||||
|
{
|
||||||
jam();
|
jam();
|
||||||
ndbout_c("1 - anyActiveTakeOver == true");
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -2347,6 +2347,8 @@ Dbdih::systemRestartTakeOverLab(Signal* signal)
|
||||||
// NOT ACTIVE NODES THAT HAVE NOT YET BEEN TAKEN OVER NEEDS TAKE OVER
|
// NOT ACTIVE NODES THAT HAVE NOT YET BEEN TAKEN OVER NEEDS TAKE OVER
|
||||||
// IMMEDIATELY. IF WE ARE ALIVE WE TAKE OVER OUR OWN NODE.
|
// IMMEDIATELY. IF WE ARE ALIVE WE TAKE OVER OUR OWN NODE.
|
||||||
/*-------------------------------------------------------------------*/
|
/*-------------------------------------------------------------------*/
|
||||||
|
infoEvent("Take over of node %d started",
|
||||||
|
nodePtr.i);
|
||||||
startTakeOver(signal, RNIL, nodePtr.i, nodePtr.i);
|
startTakeOver(signal, RNIL, nodePtr.i, nodePtr.i);
|
||||||
}//if
|
}//if
|
||||||
break;
|
break;
|
||||||
|
@ -2459,6 +2461,12 @@ void Dbdih::nodeRestartTakeOver(Signal* signal, Uint32 startNodeId)
|
||||||
*--------------------------------------------------------------------*/
|
*--------------------------------------------------------------------*/
|
||||||
Uint32 takeOverNode = Sysfile::getTakeOverNode(startNodeId,
|
Uint32 takeOverNode = Sysfile::getTakeOverNode(startNodeId,
|
||||||
SYSFILE->takeOver);
|
SYSFILE->takeOver);
|
||||||
|
if(takeOverNode == 0){
|
||||||
|
jam();
|
||||||
|
warningEvent("Bug in take-over code restarting");
|
||||||
|
takeOverNode = startNodeId;
|
||||||
|
}
|
||||||
|
|
||||||
startTakeOver(signal, RNIL, startNodeId, takeOverNode);
|
startTakeOver(signal, RNIL, startNodeId, takeOverNode);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
@ -2613,6 +2621,13 @@ void Dbdih::startTakeOver(Signal* signal,
|
||||||
startNode);
|
startNode);
|
||||||
takeOverPtr.p->toMasterStatus = TakeOverRecord::TO_START_COPY;
|
takeOverPtr.p->toMasterStatus = TakeOverRecord::TO_START_COPY;
|
||||||
|
|
||||||
|
if (getNodeState().getSystemRestartInProgress())
|
||||||
|
{
|
||||||
|
jam();
|
||||||
|
checkToCopy();
|
||||||
|
checkToCopyCompleted(signal);
|
||||||
|
return;
|
||||||
|
}
|
||||||
cstartGcpNow = true;
|
cstartGcpNow = true;
|
||||||
}//Dbdih::startTakeOver()
|
}//Dbdih::startTakeOver()
|
||||||
|
|
||||||
|
@ -3264,7 +3279,10 @@ void Dbdih::execCREATE_FRAGCONF(Signal* signal)
|
||||||
copyFragReq->schemaVersion = tabPtr.p->schemaVersion;
|
copyFragReq->schemaVersion = tabPtr.p->schemaVersion;
|
||||||
copyFragReq->distributionKey = fragPtr.p->distributionKey;
|
copyFragReq->distributionKey = fragPtr.p->distributionKey;
|
||||||
copyFragReq->gci = gci;
|
copyFragReq->gci = gci;
|
||||||
sendSignal(ref, GSN_COPY_FRAGREQ, signal, CopyFragReq::SignalLength, JBB);
|
copyFragReq->nodeCount = extractNodeInfo(fragPtr.p,
|
||||||
|
copyFragReq->nodeList);
|
||||||
|
sendSignal(ref, GSN_COPY_FRAGREQ, signal,
|
||||||
|
CopyFragReq::SignalLength + copyFragReq->nodeCount, JBB);
|
||||||
} else {
|
} else {
|
||||||
ndbrequire(takeOverPtr.p->toMasterStatus == TakeOverRecord::COMMIT_CREATE);
|
ndbrequire(takeOverPtr.p->toMasterStatus == TakeOverRecord::COMMIT_CREATE);
|
||||||
jam();
|
jam();
|
||||||
|
@ -3514,6 +3532,18 @@ void Dbdih::toCopyCompletedLab(Signal * signal, TakeOverRecordPtr takeOverPtr)
|
||||||
signal->theData[1] = takeOverPtr.p->toStartingNode;
|
signal->theData[1] = takeOverPtr.p->toStartingNode;
|
||||||
sendSignal(CMVMI_REF, GSN_EVENT_REP, signal, 2, JBB);
|
sendSignal(CMVMI_REF, GSN_EVENT_REP, signal, 2, JBB);
|
||||||
|
|
||||||
|
if (getNodeState().getSystemRestartInProgress())
|
||||||
|
{
|
||||||
|
jam();
|
||||||
|
infoEvent("Take over of node %d complete", takeOverPtr.p->toStartingNode);
|
||||||
|
setNodeActiveStatus(takeOverPtr.p->toStartingNode, Sysfile::NS_Active);
|
||||||
|
takeOverPtr.p->toMasterStatus = TakeOverRecord::WAIT_LCP;
|
||||||
|
takeOverCompleted(takeOverPtr.p->toStartingNode);
|
||||||
|
checkToCopy();
|
||||||
|
checkToCopyCompleted(signal);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
c_lcpState.immediateLcpStart = true;
|
c_lcpState.immediateLcpStart = true;
|
||||||
takeOverPtr.p->toMasterStatus = TakeOverRecord::WAIT_LCP;
|
takeOverPtr.p->toMasterStatus = TakeOverRecord::WAIT_LCP;
|
||||||
|
|
||||||
|
@ -3620,16 +3650,12 @@ void Dbdih::execEND_TOCONF(Signal* signal)
|
||||||
}//if
|
}//if
|
||||||
endTakeOver(takeOverPtr.i);
|
endTakeOver(takeOverPtr.i);
|
||||||
|
|
||||||
ndbout_c("2 - endTakeOver");
|
|
||||||
if (cstartPhase == ZNDB_SPH4) {
|
if (cstartPhase == ZNDB_SPH4) {
|
||||||
jam();
|
jam();
|
||||||
ndbrequire(false);
|
|
||||||
if (anyActiveTakeOver()) {
|
if (anyActiveTakeOver()) {
|
||||||
jam();
|
jam();
|
||||||
ndbout_c("4 - anyActiveTakeOver == true");
|
|
||||||
return;
|
return;
|
||||||
}//if
|
}//if
|
||||||
ndbout_c("5 - anyActiveTakeOver == false -> ndbsttorry10Lab");
|
|
||||||
ndbsttorry10Lab(signal, __LINE__);
|
ndbsttorry10Lab(signal, __LINE__);
|
||||||
return;
|
return;
|
||||||
}//if
|
}//if
|
||||||
|
@ -10017,15 +10043,19 @@ void Dbdih::startNextChkpt(Signal* signal)
|
||||||
nodePtr.i = replicaPtr.p->procNode;
|
nodePtr.i = replicaPtr.p->procNode;
|
||||||
ptrCheckGuard(nodePtr, MAX_NDB_NODES, nodeRecord);
|
ptrCheckGuard(nodePtr, MAX_NDB_NODES, nodeRecord);
|
||||||
|
|
||||||
|
if (c_lcpState.m_participatingLQH.get(nodePtr.i))
|
||||||
|
{
|
||||||
if (replicaPtr.p->lcpOngoingFlag &&
|
if (replicaPtr.p->lcpOngoingFlag &&
|
||||||
replicaPtr.p->lcpIdStarted < lcpId) {
|
replicaPtr.p->lcpIdStarted < lcpId)
|
||||||
|
{
|
||||||
jam();
|
jam();
|
||||||
//-------------------------------------------------------------------
|
//-------------------------------------------------------------------
|
||||||
// We have found a replica on a node that performs local checkpoint
|
// We have found a replica on a node that performs local checkpoint
|
||||||
// that is alive and that have not yet been started.
|
// that is alive and that have not yet been started.
|
||||||
//-------------------------------------------------------------------
|
//-------------------------------------------------------------------
|
||||||
|
|
||||||
if (nodePtr.p->noOfStartedChkpt < 2) {
|
if (nodePtr.p->noOfStartedChkpt < 2)
|
||||||
|
{
|
||||||
jam();
|
jam();
|
||||||
/**
|
/**
|
||||||
* Send LCP_FRAG_ORD to LQH
|
* Send LCP_FRAG_ORD to LQH
|
||||||
|
@ -10043,7 +10073,9 @@ void Dbdih::startNextChkpt(Signal* signal)
|
||||||
nodePtr.p->noOfStartedChkpt = i + 1;
|
nodePtr.p->noOfStartedChkpt = i + 1;
|
||||||
|
|
||||||
sendLCP_FRAG_ORD(signal, nodePtr.p->startedChkpt[i]);
|
sendLCP_FRAG_ORD(signal, nodePtr.p->startedChkpt[i]);
|
||||||
} else if (nodePtr.p->noOfQueuedChkpt < 2) {
|
}
|
||||||
|
else if (nodePtr.p->noOfQueuedChkpt < 2)
|
||||||
|
{
|
||||||
jam();
|
jam();
|
||||||
/**
|
/**
|
||||||
* Put LCP_FRAG_ORD "in queue"
|
* Put LCP_FRAG_ORD "in queue"
|
||||||
|
@ -10059,10 +10091,13 @@ void Dbdih::startNextChkpt(Signal* signal)
|
||||||
nodePtr.p->queuedChkpt[i].fragId = curr.fragmentId;
|
nodePtr.p->queuedChkpt[i].fragId = curr.fragmentId;
|
||||||
nodePtr.p->queuedChkpt[i].replicaPtr = replicaPtr.i;
|
nodePtr.p->queuedChkpt[i].replicaPtr = replicaPtr.i;
|
||||||
nodePtr.p->noOfQueuedChkpt = i + 1;
|
nodePtr.p->noOfQueuedChkpt = i + 1;
|
||||||
} else {
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
jam();
|
jam();
|
||||||
|
|
||||||
if(save){
|
if(save)
|
||||||
|
{
|
||||||
/**
|
/**
|
||||||
* Stop increasing value on first that was "full"
|
* Stop increasing value on first that was "full"
|
||||||
*/
|
*/
|
||||||
|
@ -10071,7 +10106,8 @@ void Dbdih::startNextChkpt(Signal* signal)
|
||||||
}
|
}
|
||||||
|
|
||||||
busyNodes.set(nodePtr.i);
|
busyNodes.set(nodePtr.i);
|
||||||
if(busyNodes.count() == lcpNodes){
|
if(busyNodes.count() == lcpNodes)
|
||||||
|
{
|
||||||
/**
|
/**
|
||||||
* There were no possibility to start the local checkpoint
|
* There were no possibility to start the local checkpoint
|
||||||
* and it was not possible to queue it up. In this case we
|
* and it was not possible to queue it up. In this case we
|
||||||
|
@ -10084,6 +10120,7 @@ void Dbdih::startNextChkpt(Signal* signal)
|
||||||
}//if
|
}//if
|
||||||
}
|
}
|
||||||
}//while
|
}//while
|
||||||
|
}
|
||||||
curr.fragmentId++;
|
curr.fragmentId++;
|
||||||
if (curr.fragmentId >= tabPtr.p->totalfragments) {
|
if (curr.fragmentId >= tabPtr.p->totalfragments) {
|
||||||
jam();
|
jam();
|
||||||
|
|
|
@ -2147,6 +2147,7 @@ private:
|
||||||
void execSTORED_PROCCONF(Signal* signal);
|
void execSTORED_PROCCONF(Signal* signal);
|
||||||
void execSTORED_PROCREF(Signal* signal);
|
void execSTORED_PROCREF(Signal* signal);
|
||||||
void execCOPY_FRAGREQ(Signal* signal);
|
void execCOPY_FRAGREQ(Signal* signal);
|
||||||
|
void execUPDATE_FRAG_DIST_KEY_ORD(Signal*);
|
||||||
void execCOPY_ACTIVEREQ(Signal* signal);
|
void execCOPY_ACTIVEREQ(Signal* signal);
|
||||||
void execCOPY_STATEREQ(Signal* signal);
|
void execCOPY_STATEREQ(Signal* signal);
|
||||||
void execLQH_TRANSREQ(Signal* signal);
|
void execLQH_TRANSREQ(Signal* signal);
|
||||||
|
|
|
@ -302,6 +302,9 @@ Dblqh::Dblqh(Block_context& ctx):
|
||||||
addRecSignal(GSN_RESTORE_LCP_REF, &Dblqh::execRESTORE_LCP_REF);
|
addRecSignal(GSN_RESTORE_LCP_REF, &Dblqh::execRESTORE_LCP_REF);
|
||||||
addRecSignal(GSN_RESTORE_LCP_CONF, &Dblqh::execRESTORE_LCP_CONF);
|
addRecSignal(GSN_RESTORE_LCP_CONF, &Dblqh::execRESTORE_LCP_CONF);
|
||||||
|
|
||||||
|
addRecSignal(GSN_UPDATE_FRAG_DIST_KEY_ORD,
|
||||||
|
&Dblqh::execUPDATE_FRAG_DIST_KEY_ORD);
|
||||||
|
|
||||||
initData();
|
initData();
|
||||||
|
|
||||||
#ifdef VM_TRACE
|
#ifdef VM_TRACE
|
||||||
|
|
|
@ -9913,6 +9913,7 @@ void Dblqh::execCOPY_FRAGREQ(Signal* signal)
|
||||||
const CopyFragReq * const copyFragReq = (CopyFragReq *)&signal->theData[0];
|
const CopyFragReq * const copyFragReq = (CopyFragReq *)&signal->theData[0];
|
||||||
tabptr.i = copyFragReq->tableId;
|
tabptr.i = copyFragReq->tableId;
|
||||||
ptrCheckGuard(tabptr, ctabrecFileSize, tablerec);
|
ptrCheckGuard(tabptr, ctabrecFileSize, tablerec);
|
||||||
|
Uint32 i;
|
||||||
const Uint32 fragId = copyFragReq->fragId;
|
const Uint32 fragId = copyFragReq->fragId;
|
||||||
const Uint32 copyPtr = copyFragReq->userPtr;
|
const Uint32 copyPtr = copyFragReq->userPtr;
|
||||||
const Uint32 userRef = copyFragReq->userRef;
|
const Uint32 userRef = copyFragReq->userRef;
|
||||||
|
@ -9925,7 +9926,19 @@ void Dblqh::execCOPY_FRAGREQ(Signal* signal)
|
||||||
ndbrequire(cfirstfreeTcConrec != RNIL);
|
ndbrequire(cfirstfreeTcConrec != RNIL);
|
||||||
ndbrequire(fragptr.p->m_scanNumberMask.get(NR_ScanNo));
|
ndbrequire(fragptr.p->m_scanNumberMask.get(NR_ScanNo));
|
||||||
|
|
||||||
fragptr.p->fragDistributionKey = copyFragReq->distributionKey;
|
Uint32 key = fragptr.p->fragDistributionKey = copyFragReq->distributionKey;
|
||||||
|
|
||||||
|
Uint32 checkversion = NDB_VERSION >= MAKE_VERSION(5,1,0) ?
|
||||||
|
NDBD_UPDATE_FRAG_DIST_KEY_51 : NDBD_UPDATE_FRAG_DIST_KEY_50;
|
||||||
|
|
||||||
|
Uint32 nodeCount = copyFragReq->nodeCount;
|
||||||
|
NdbNodeBitmask nodemask;
|
||||||
|
if (getNodeInfo(refToNode(userRef)).m_version >= checkversion)
|
||||||
|
{
|
||||||
|
ndbrequire(nodeCount <= MAX_REPLICAS);
|
||||||
|
for (i = 0; i<nodeCount; i++)
|
||||||
|
nodemask.set(copyFragReq->nodeList[i]);
|
||||||
|
}
|
||||||
|
|
||||||
if (DictTabInfo::isOrderedIndex(tabptr.p->tableType)) {
|
if (DictTabInfo::isOrderedIndex(tabptr.p->tableType)) {
|
||||||
jam();
|
jam();
|
||||||
|
@ -10009,9 +10022,42 @@ void Dblqh::execCOPY_FRAGREQ(Signal* signal)
|
||||||
req->savePointId = tcConnectptr.p->savePointId;
|
req->savePointId = tcConnectptr.p->savePointId;
|
||||||
sendSignal(scanptr.p->scanBlockref, GSN_ACC_SCANREQ, signal,
|
sendSignal(scanptr.p->scanBlockref, GSN_ACC_SCANREQ, signal,
|
||||||
AccScanReq::SignalLength, JBB);
|
AccScanReq::SignalLength, JBB);
|
||||||
|
|
||||||
|
if (! nodemask.isclear())
|
||||||
|
{
|
||||||
|
ndbrequire(nodemask.get(getOwnNodeId()));
|
||||||
|
ndbrequire(nodemask.get(nodeId)); // cpy dest
|
||||||
|
nodemask.clear(getOwnNodeId());
|
||||||
|
nodemask.clear(nodeId);
|
||||||
|
|
||||||
|
UpdateFragDistKeyOrd*
|
||||||
|
ord = (UpdateFragDistKeyOrd*)signal->getDataPtrSend();
|
||||||
|
ord->tableId = tabptr.i;
|
||||||
|
ord->fragId = fragId;
|
||||||
|
ord->fragDistributionKey = key;
|
||||||
|
i = 0;
|
||||||
|
while ((i = nodemask.find(i+1)) != NdbNodeBitmask::NotFound)
|
||||||
|
{
|
||||||
|
if (getNodeInfo(i).m_version >= checkversion)
|
||||||
|
sendSignal(calcLqhBlockRef(i), GSN_UPDATE_FRAG_DIST_KEY_ORD,
|
||||||
|
signal, UpdateFragDistKeyOrd::SignalLength, JBB);
|
||||||
|
}
|
||||||
|
}
|
||||||
return;
|
return;
|
||||||
}//Dblqh::execCOPY_FRAGREQ()
|
}//Dblqh::execCOPY_FRAGREQ()
|
||||||
|
|
||||||
|
void
|
||||||
|
Dblqh::execUPDATE_FRAG_DIST_KEY_ORD(Signal * signal)
|
||||||
|
{
|
||||||
|
jamEntry();
|
||||||
|
UpdateFragDistKeyOrd* ord =(UpdateFragDistKeyOrd*)signal->getDataPtr();
|
||||||
|
|
||||||
|
tabptr.i = ord->tableId;
|
||||||
|
ptrCheckGuard(tabptr, ctabrecFileSize, tablerec);
|
||||||
|
ndbrequire(getFragmentrec(signal, ord->fragId));
|
||||||
|
fragptr.p->fragDistributionKey = ord->fragDistributionKey;
|
||||||
|
}
|
||||||
|
|
||||||
void Dblqh::accScanConfCopyLab(Signal* signal)
|
void Dblqh::accScanConfCopyLab(Signal* signal)
|
||||||
{
|
{
|
||||||
AccScanConf * const accScanConf = (AccScanConf *)&signal->theData[0];
|
AccScanConf * const accScanConf = (AccScanConf *)&signal->theData[0];
|
||||||
|
@ -18292,6 +18338,18 @@ Dblqh::execDUMP_STATE_ORD(Signal* signal)
|
||||||
if(tabPtr.p->tableStatus != Tablerec::NOT_DEFINED){
|
if(tabPtr.p->tableStatus != Tablerec::NOT_DEFINED){
|
||||||
infoEvent("Table %d Status: %d Usage: %d",
|
infoEvent("Table %d Status: %d Usage: %d",
|
||||||
i, tabPtr.p->tableStatus, tabPtr.p->usageCount);
|
i, tabPtr.p->tableStatus, tabPtr.p->usageCount);
|
||||||
|
|
||||||
|
for (Uint32 j = 0; j<MAX_FRAG_PER_NODE; j++)
|
||||||
|
{
|
||||||
|
FragrecordPtr fragPtr;
|
||||||
|
if ((fragPtr.i = tabPtr.p->fragrec[j]) != RNIL)
|
||||||
|
{
|
||||||
|
c_fragment_pool.getPtr(fragPtr);
|
||||||
|
infoEvent(" frag: %d distKey: %u",
|
||||||
|
tabPtr.p->fragid[j],
|
||||||
|
fragPtr.p->fragDistributionKey);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return;
|
return;
|
||||||
|
|
|
@ -1003,13 +1003,6 @@ Dbtc::handleFailedApiNode(Signal* signal,
|
||||||
TloopCount += 64;
|
TloopCount += 64;
|
||||||
break;
|
break;
|
||||||
case CS_CONNECTED:
|
case CS_CONNECTED:
|
||||||
/*********************************************************************/
|
|
||||||
// The api record is connected to failed node. We need to release the
|
|
||||||
// connection and set it in a disconnected state.
|
|
||||||
/*********************************************************************/
|
|
||||||
jam();
|
|
||||||
releaseApiCon(signal, apiConnectptr.i);
|
|
||||||
break;
|
|
||||||
case CS_REC_COMMITTING:
|
case CS_REC_COMMITTING:
|
||||||
case CS_RECEIVING:
|
case CS_RECEIVING:
|
||||||
case CS_STARTED:
|
case CS_STARTED:
|
||||||
|
|
|
@ -58,7 +58,8 @@ NDB_STD_OPTS_VARS;
|
||||||
// XXX should be my_bool ???
|
// XXX should be my_bool ???
|
||||||
static int _daemon, _no_daemon, _foreground, _initial, _no_start;
|
static int _daemon, _no_daemon, _foreground, _initial, _no_start;
|
||||||
static int _initialstart;
|
static int _initialstart;
|
||||||
static const char* _nowait_nodes;
|
static const char* _nowait_nodes = 0;
|
||||||
|
static const char* _bind_address = 0;
|
||||||
|
|
||||||
extern Uint32 g_start_type;
|
extern Uint32 g_start_type;
|
||||||
extern NdbNodeBitmask g_nowait_nodes;
|
extern NdbNodeBitmask g_nowait_nodes;
|
||||||
|
@ -98,6 +99,10 @@ static struct my_option my_long_options[] =
|
||||||
"Perform initial start",
|
"Perform initial start",
|
||||||
(gptr*) &_initialstart, (gptr*) &_initialstart, 0,
|
(gptr*) &_initialstart, (gptr*) &_initialstart, 0,
|
||||||
GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0 },
|
GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0 },
|
||||||
|
{ "bind-address", OPT_NOWAIT_NODES,
|
||||||
|
"Local bind address",
|
||||||
|
(gptr*) &_bind_address, (gptr*) &_bind_address, 0,
|
||||||
|
GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0 },
|
||||||
{ 0, 0, 0, 0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0}
|
{ 0, 0, 0, 0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0}
|
||||||
};
|
};
|
||||||
static void short_usage_sub(void)
|
static void short_usage_sub(void)
|
||||||
|
@ -257,7 +262,9 @@ Configuration::fetch_configuration(){
|
||||||
|
|
||||||
m_mgmd_port= 0;
|
m_mgmd_port= 0;
|
||||||
m_config_retriever= new ConfigRetriever(getConnectString(),
|
m_config_retriever= new ConfigRetriever(getConnectString(),
|
||||||
NDB_VERSION, NODE_TYPE_DB);
|
NDB_VERSION,
|
||||||
|
NODE_TYPE_DB,
|
||||||
|
_bind_address);
|
||||||
|
|
||||||
if (m_config_retriever->hasError())
|
if (m_config_retriever->hasError())
|
||||||
{
|
{
|
||||||
|
|
|
@ -107,6 +107,7 @@ struct ndb_mgm_handle {
|
||||||
int mgmd_version_major;
|
int mgmd_version_major;
|
||||||
int mgmd_version_minor;
|
int mgmd_version_minor;
|
||||||
int mgmd_version_build;
|
int mgmd_version_build;
|
||||||
|
char * m_bindaddress;
|
||||||
};
|
};
|
||||||
|
|
||||||
#define SET_ERROR(h, e, s) setError(h, e, __LINE__, s)
|
#define SET_ERROR(h, e, s) setError(h, e, __LINE__, s)
|
||||||
|
@ -168,6 +169,7 @@ ndb_mgm_create_handle()
|
||||||
h->cfg_i = -1;
|
h->cfg_i = -1;
|
||||||
h->errstream = stdout;
|
h->errstream = stdout;
|
||||||
h->m_name = 0;
|
h->m_name = 0;
|
||||||
|
h->m_bindaddress = 0;
|
||||||
|
|
||||||
strncpy(h->last_error_desc, "No error", NDB_MGM_MAX_ERR_DESC_SIZE);
|
strncpy(h->last_error_desc, "No error", NDB_MGM_MAX_ERR_DESC_SIZE);
|
||||||
|
|
||||||
|
@ -215,6 +217,22 @@ ndb_mgm_set_connectstring(NdbMgmHandle handle, const char * mgmsrv)
|
||||||
DBUG_RETURN(0);
|
DBUG_RETURN(0);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
extern "C"
|
||||||
|
int
|
||||||
|
ndb_mgm_set_bindaddress(NdbMgmHandle handle, const char * arg)
|
||||||
|
{
|
||||||
|
DBUG_ENTER("ndb_mgm_set_bindaddress");
|
||||||
|
if (handle->m_bindaddress)
|
||||||
|
free(handle->m_bindaddress);
|
||||||
|
|
||||||
|
if (arg)
|
||||||
|
handle->m_bindaddress = strdup(arg);
|
||||||
|
else
|
||||||
|
handle->m_bindaddress = 0;
|
||||||
|
|
||||||
|
DBUG_RETURN(0);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Destroy a handle
|
* Destroy a handle
|
||||||
*/
|
*/
|
||||||
|
@ -241,6 +259,8 @@ ndb_mgm_destroy_handle(NdbMgmHandle * handle)
|
||||||
#endif
|
#endif
|
||||||
(*handle)->cfg.~LocalConfig();
|
(*handle)->cfg.~LocalConfig();
|
||||||
my_free((*handle)->m_name, MYF(MY_ALLOW_ZERO_PTR));
|
my_free((*handle)->m_name, MYF(MY_ALLOW_ZERO_PTR));
|
||||||
|
if ((*handle)->m_bindaddress)
|
||||||
|
free((*handle)->m_bindaddress);
|
||||||
my_free((char*)* handle,MYF(MY_ALLOW_ZERO_PTR));
|
my_free((char*)* handle,MYF(MY_ALLOW_ZERO_PTR));
|
||||||
* handle = 0;
|
* handle = 0;
|
||||||
DBUG_VOID_RETURN;
|
DBUG_VOID_RETURN;
|
||||||
|
@ -433,6 +453,7 @@ ndb_mgm_connect(NdbMgmHandle handle, int no_retries,
|
||||||
BaseString::snprintf(logname, 64, "mgmapi.log");
|
BaseString::snprintf(logname, 64, "mgmapi.log");
|
||||||
handle->logfile = fopen(logname, "w");
|
handle->logfile = fopen(logname, "w");
|
||||||
#endif
|
#endif
|
||||||
|
char buf[1024];
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Do connect
|
* Do connect
|
||||||
|
@ -440,6 +461,50 @@ ndb_mgm_connect(NdbMgmHandle handle, int no_retries,
|
||||||
LocalConfig &cfg= handle->cfg;
|
LocalConfig &cfg= handle->cfg;
|
||||||
NDB_SOCKET_TYPE sockfd= NDB_INVALID_SOCKET;
|
NDB_SOCKET_TYPE sockfd= NDB_INVALID_SOCKET;
|
||||||
Uint32 i;
|
Uint32 i;
|
||||||
|
int binderror = 0;
|
||||||
|
SocketClient s(0, 0);
|
||||||
|
if (!s.init())
|
||||||
|
{
|
||||||
|
fprintf(handle->errstream,
|
||||||
|
"Unable to create socket, "
|
||||||
|
"while trying to connect with connect string: %s\n",
|
||||||
|
cfg.makeConnectString(buf,sizeof(buf)));
|
||||||
|
|
||||||
|
setError(handle, NDB_MGM_COULD_NOT_CONNECT_TO_SOCKET, __LINE__,
|
||||||
|
"Unable to create socket, "
|
||||||
|
"while trying to connect with connect string: %s\n",
|
||||||
|
cfg.makeConnectString(buf,sizeof(buf)));
|
||||||
|
DBUG_RETURN(-1);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (handle->m_bindaddress)
|
||||||
|
{
|
||||||
|
BaseString::snprintf(buf, sizeof(buf), handle->m_bindaddress);
|
||||||
|
unsigned short portno = 0;
|
||||||
|
char * port = strchr(buf, ':');
|
||||||
|
if (port != 0)
|
||||||
|
{
|
||||||
|
portno = atoi(port+1);
|
||||||
|
* port = 0;
|
||||||
|
}
|
||||||
|
int err;
|
||||||
|
if ((err = s.bind(buf, portno)) != 0)
|
||||||
|
{
|
||||||
|
fprintf(handle->errstream,
|
||||||
|
"Unable to bind local address %s errno: %d, "
|
||||||
|
"while trying to connect with connect string: %s\n",
|
||||||
|
handle->m_bindaddress, err,
|
||||||
|
cfg.makeConnectString(buf,sizeof(buf)));
|
||||||
|
|
||||||
|
setError(handle, NDB_MGM_BIND_ADDRESS, __LINE__,
|
||||||
|
"Unable to bind local address %s errno: %d, "
|
||||||
|
"while trying to connect with connect string: %s\n",
|
||||||
|
handle->m_bindaddress, err,
|
||||||
|
cfg.makeConnectString(buf,sizeof(buf)));
|
||||||
|
DBUG_RETURN(-1);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
while (sockfd == NDB_INVALID_SOCKET)
|
while (sockfd == NDB_INVALID_SOCKET)
|
||||||
{
|
{
|
||||||
// do all the mgmt servers
|
// do all the mgmt servers
|
||||||
|
@ -447,8 +512,7 @@ ndb_mgm_connect(NdbMgmHandle handle, int no_retries,
|
||||||
{
|
{
|
||||||
if (cfg.ids[i].type != MgmId_TCP)
|
if (cfg.ids[i].type != MgmId_TCP)
|
||||||
continue;
|
continue;
|
||||||
SocketClient s(cfg.ids[i].name.c_str(), cfg.ids[i].port);
|
sockfd = s.connect(cfg.ids[i].name.c_str(), cfg.ids[i].port);
|
||||||
sockfd = s.connect();
|
|
||||||
if (sockfd != NDB_INVALID_SOCKET)
|
if (sockfd != NDB_INVALID_SOCKET)
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
@ -456,19 +520,17 @@ ndb_mgm_connect(NdbMgmHandle handle, int no_retries,
|
||||||
break;
|
break;
|
||||||
#ifndef DBUG_OFF
|
#ifndef DBUG_OFF
|
||||||
{
|
{
|
||||||
char buf[1024];
|
|
||||||
DBUG_PRINT("info",("Unable to connect with connect string: %s",
|
DBUG_PRINT("info",("Unable to connect with connect string: %s",
|
||||||
cfg.makeConnectString(buf,sizeof(buf))));
|
cfg.makeConnectString(buf,sizeof(buf))));
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
if (verbose > 0) {
|
if (verbose > 0) {
|
||||||
char buf[1024];
|
fprintf(handle->errstream,
|
||||||
fprintf(handle->errstream, "Unable to connect with connect string: %s\n",
|
"Unable to connect with connect string: %s\n",
|
||||||
cfg.makeConnectString(buf,sizeof(buf)));
|
cfg.makeConnectString(buf,sizeof(buf)));
|
||||||
verbose= -1;
|
verbose= -1;
|
||||||
}
|
}
|
||||||
if (no_retries == 0) {
|
if (no_retries == 0) {
|
||||||
char buf[1024];
|
|
||||||
setError(handle, NDB_MGM_COULD_NOT_CONNECT_TO_SOCKET, __LINE__,
|
setError(handle, NDB_MGM_COULD_NOT_CONNECT_TO_SOCKET, __LINE__,
|
||||||
"Unable to connect with connect string: %s",
|
"Unable to connect with connect string: %s",
|
||||||
cfg.makeConnectString(buf,sizeof(buf)));
|
cfg.makeConnectString(buf,sizeof(buf)));
|
||||||
|
|
|
@ -694,9 +694,27 @@ void NdbScanOperation::close(bool forceSend, bool releaseOp)
|
||||||
theNdbCon = NULL;
|
theNdbCon = NULL;
|
||||||
m_transConnection = NULL;
|
m_transConnection = NULL;
|
||||||
|
|
||||||
if (releaseOp && tTransCon) {
|
if (tTransCon)
|
||||||
|
{
|
||||||
NdbIndexScanOperation* tOp = (NdbIndexScanOperation*)this;
|
NdbIndexScanOperation* tOp = (NdbIndexScanOperation*)this;
|
||||||
tTransCon->releaseScanOperation(tOp);
|
|
||||||
|
bool ret = true;
|
||||||
|
if (theStatus != WaitResponse)
|
||||||
|
{
|
||||||
|
/**
|
||||||
|
* Not executed yet
|
||||||
|
*/
|
||||||
|
ret =
|
||||||
|
tTransCon->releaseScanOperation(&tTransCon->m_theFirstScanOperation,
|
||||||
|
&tTransCon->m_theLastScanOperation,
|
||||||
|
tOp);
|
||||||
|
}
|
||||||
|
else if (releaseOp)
|
||||||
|
{
|
||||||
|
ret = tTransCon->releaseScanOperation(&tTransCon->m_firstExecutedScanOp,
|
||||||
|
0, tOp);
|
||||||
|
}
|
||||||
|
assert(ret);
|
||||||
}
|
}
|
||||||
|
|
||||||
tCon->theScanningOp = 0;
|
tCon->theScanningOp = 0;
|
||||||
|
|
|
@ -978,52 +978,61 @@ void releaseScanOperation();
|
||||||
Remark: Release scan op when hupp'ed trans closed (save memory)
|
Remark: Release scan op when hupp'ed trans closed (save memory)
|
||||||
******************************************************************************/
|
******************************************************************************/
|
||||||
void
|
void
|
||||||
NdbTransaction::releaseScanOperation(NdbIndexScanOperation* cursorOp)
|
NdbTransaction::releaseExecutedScanOperation(NdbIndexScanOperation* cursorOp)
|
||||||
{
|
{
|
||||||
DBUG_ENTER("NdbTransaction::releaseScanOperation");
|
DBUG_ENTER("NdbTransaction::releaseExecutedScanOperation");
|
||||||
DBUG_PRINT("enter", ("this=0x%x op=0x%x", (UintPtr)this, (UintPtr)cursorOp));
|
DBUG_PRINT("enter", ("this=0x%x op=0x%x", (UintPtr)this, (UintPtr)cursorOp));
|
||||||
|
|
||||||
// here is one reason to make op lists doubly linked
|
releaseScanOperation(&m_firstExecutedScanOp, 0, cursorOp);
|
||||||
if (cursorOp->m_executed)
|
|
||||||
|
DBUG_VOID_RETURN;
|
||||||
|
}//NdbTransaction::releaseExecutedScanOperation()
|
||||||
|
|
||||||
|
bool
|
||||||
|
NdbTransaction::releaseScanOperation(NdbIndexScanOperation** listhead,
|
||||||
|
NdbIndexScanOperation** listtail,
|
||||||
|
NdbIndexScanOperation* op)
|
||||||
{
|
{
|
||||||
if (m_firstExecutedScanOp == cursorOp) {
|
if (* listhead == op)
|
||||||
m_firstExecutedScanOp = (NdbIndexScanOperation*)cursorOp->theNext;
|
{
|
||||||
cursorOp->release();
|
* listhead = (NdbIndexScanOperation*)op->theNext;
|
||||||
theNdb->releaseScanOperation(cursorOp);
|
if (listtail && *listtail == op)
|
||||||
} else if (m_firstExecutedScanOp != NULL) {
|
{
|
||||||
NdbIndexScanOperation* tOp = m_firstExecutedScanOp;
|
assert(* listhead == 0);
|
||||||
while (tOp->theNext != NULL) {
|
* listtail = 0;
|
||||||
if (tOp->theNext == cursorOp) {
|
|
||||||
tOp->theNext = cursorOp->theNext;
|
|
||||||
cursorOp->release();
|
|
||||||
theNdb->releaseScanOperation(cursorOp);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
tOp = (NdbIndexScanOperation*)tOp->theNext;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
if (m_theFirstScanOperation == cursorOp) {
|
NdbIndexScanOperation* tmp = * listhead;
|
||||||
m_theFirstScanOperation = (NdbIndexScanOperation*)cursorOp->theNext;
|
while (tmp != NULL)
|
||||||
cursorOp->release();
|
{
|
||||||
theNdb->releaseScanOperation(cursorOp);
|
if (tmp->theNext == op)
|
||||||
} else if (m_theFirstScanOperation != NULL) {
|
{
|
||||||
NdbIndexScanOperation* tOp = m_theFirstScanOperation;
|
tmp->theNext = (NdbIndexScanOperation*)op->theNext;
|
||||||
while (tOp->theNext != NULL) {
|
if (listtail && *listtail == op)
|
||||||
if (tOp->theNext == cursorOp) {
|
{
|
||||||
tOp->theNext = cursorOp->theNext;
|
assert(op->theNext == 0);
|
||||||
cursorOp->release();
|
*listtail = tmp;
|
||||||
theNdb->releaseScanOperation(cursorOp);
|
}
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
tOp = (NdbIndexScanOperation*)tOp->theNext;
|
tmp = (NdbIndexScanOperation*)tmp->theNext;
|
||||||
}
|
}
|
||||||
|
if (tmp == NULL)
|
||||||
|
op = NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (op != NULL)
|
||||||
|
{
|
||||||
|
op->release();
|
||||||
|
theNdb->releaseScanOperation(op);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
return false;
|
||||||
}
|
}
|
||||||
DBUG_VOID_RETURN;
|
|
||||||
}//NdbTransaction::releaseScanOperation()
|
|
||||||
|
|
||||||
/*****************************************************************************
|
/*****************************************************************************
|
||||||
NdbOperation* getNdbOperation(const char* aTableName);
|
NdbOperation* getNdbOperation(const char* aTableName);
|
||||||
|
|
Loading…
Add table
Reference in a new issue