Merge tulin@bk-internal.mysql.com:/home/bk/mysql-5.0

into  poseidon.ndb.mysql.com:/home/tomas/mysql-5.0
This commit is contained in:
unknown 2006-04-26 23:10:36 +02:00
commit 58629f016b
9 changed files with 238 additions and 68 deletions

View file

@ -232,6 +232,12 @@ extern "C" {
/** Could not connect to socker */
NDB_MGM_COULD_NOT_CONNECT_TO_SOCKET = 1011,
/* Alloc node id failures */
/** Generic error, retry may succeed */
NDB_MGM_ALLOCID_ERROR = 1101,
/** Non retriable error */
NDB_MGM_ALLOCID_CONFIG_MISMATCH = 1102,
/* Service errors - Start/Stop Node or System */
/** Start failed */
NDB_MGM_START_FAILED = 2001,
@ -999,7 +1005,7 @@ extern "C" {
void ndb_mgm_destroy_configuration(struct ndb_mgm_configuration *);
int ndb_mgm_alloc_nodeid(NdbMgmHandle handle,
unsigned version, int nodetype);
unsigned version, int nodetype, int log_event);
/**
* End Session

View file

@ -349,12 +349,14 @@ ConfigRetriever::allocNodeId(int no_retries, int retry_delay_in_seconds)
if(!ndb_mgm_connect(m_handle, 0, 0, 0))
goto next;
res= ndb_mgm_alloc_nodeid(m_handle, m_version, m_node_type);
res= ndb_mgm_alloc_nodeid(m_handle, m_version, m_node_type,
no_retries == 0 /* only log last retry */);
if(res >= 0)
return _ownNodeId= (Uint32)res;
next:
if (no_retries == 0)
int error = ndb_mgm_get_latest_error(m_handle);
if (no_retries == 0 || error == NDB_MGM_ALLOCID_CONFIG_MISMATCH)
break;
no_retries--;
NdbSleep_SecSleep(retry_delay_in_seconds);

View file

@ -286,7 +286,8 @@ Configuration::fetch_configuration(){
if (globalData.ownId)
cr.setNodeId(globalData.ownId);
globalData.ownId = cr.allocNodeId(2 /*retry*/,3 /*delay*/);
globalData.ownId = cr.allocNodeId(globalData.ownId ? 10 : 2 /*retry*/,
3 /*delay*/);
if(globalData.ownId == 0){
ERROR_SET(fatal, NDBD_EXIT_INVALID_CONFIG,

View file

@ -1868,7 +1868,8 @@ const char *ndb_mgm_get_connectstring(NdbMgmHandle handle, char *buf, int buf_sz
extern "C"
int
ndb_mgm_alloc_nodeid(NdbMgmHandle handle, unsigned int version, int nodetype)
ndb_mgm_alloc_nodeid(NdbMgmHandle handle, unsigned int version, int nodetype,
int log_event)
{
CHECK_HANDLE(handle, 0);
CHECK_CONNECTED(handle, 0);
@ -1888,9 +1889,11 @@ ndb_mgm_alloc_nodeid(NdbMgmHandle handle, unsigned int version, int nodetype)
args.put("endian", (endian_check.c[sizeof(long)-1])?"big":"little");
if (handle->m_name)
args.put("name", handle->m_name);
args.put("log_event", log_event);
const ParserRow<ParserDummy> reply[]= {
MGM_CMD("get nodeid reply", NULL, ""),
MGM_ARG("error_code", Int, Optional, "Error code"),
MGM_ARG("nodeid", Int, Optional, "Error message"),
MGM_ARG("result", String, Mandatory, "Error message"),
MGM_END()
@ -1903,14 +1906,16 @@ ndb_mgm_alloc_nodeid(NdbMgmHandle handle, unsigned int version, int nodetype)
nodeid= -1;
do {
const char * buf;
if(!prop->get("result", &buf) || strcmp(buf, "Ok") != 0){
if (!prop->get("result", &buf) || strcmp(buf, "Ok") != 0)
{
const char *hostname= ndb_mgm_get_connected_host(handle);
unsigned port= ndb_mgm_get_connected_port(handle);
BaseString err;
Uint32 error_code= NDB_MGM_ALLOCID_ERROR;
err.assfmt("Could not alloc node id at %s port %d: %s",
hostname, port, buf);
setError(handle, NDB_MGM_COULD_NOT_CONNECT_TO_SOCKET, __LINE__,
err.c_str());
prop->get("error_code", &error_code);
setError(handle, error_code, __LINE__, err.c_str());
break;
}
Uint32 _nodeid;

View file

@ -507,9 +507,10 @@ MgmtSrvr::MgmtSrvr(SocketServer *socket_server,
if (_ownNodeId == 0) // we did not get node id from other server
{
NodeId tmp= m_config_retriever->get_configuration_nodeid();
int error_code;
if (!alloc_node_id(&tmp, NDB_MGM_NODE_TYPE_MGM,
0, 0, error_string)){
0, 0, error_code, error_string)){
ndbout << "Unable to obtain requested nodeid: "
<< error_string.c_str() << endl;
require(false);
@ -1118,31 +1119,16 @@ int MgmtSrvr::sendSTOP_REQ(const Vector<NodeId> &node_ids,
const NFCompleteRep * const rep =
CAST_CONSTPTR(NFCompleteRep, signal->getDataPtr());
#ifdef VM_TRACE
ndbout_c("Node %d fail completed", rep->failedNodeId);
ndbout_c("sendSTOP_REQ Node %d fail completed", rep->failedNodeId);
#endif
nodes.clear(rep->failedNodeId); // clear the failed node
if (singleUserNodeId == 0)
stoppedNodes.set(rep->failedNodeId);
break;
}
case GSN_NODE_FAILREP:{
const NodeFailRep * const rep =
CAST_CONSTPTR(NodeFailRep, signal->getDataPtr());
NodeBitmask failedNodes;
failedNodes.assign(NodeBitmask::Size, rep->theNodes);
#ifdef VM_TRACE
{
ndbout << "Failed nodes:";
for (unsigned i = 0; i < 32*NodeBitmask::Size; i++)
if(failedNodes.get(i))
ndbout << " " << i;
ndbout << endl;
}
#endif
failedNodes.bitAND(nodes);
if (!failedNodes.isclear())
{
nodes.bitANDC(failedNodes); // clear the failed nodes
if (singleUserNodeId == 0)
stoppedNodes.bitOR(failedNodes);
}
break;
}
default:
@ -1263,11 +1249,47 @@ int MgmtSrvr::restartNodes(const Vector<NodeId> &node_ids,
abort,
false,
true,
nostart,
true,
initialStart);
if (ret)
return ret;
if (stopCount)
*stopCount = nodes.count();
return ret;
// start up the nodes again
int waitTime = 12000;
NDB_TICKS maxTime = NdbTick_CurrentMillisecond() + waitTime;
for (unsigned i = 0; i < node_ids.size(); i++)
{
NodeId nodeId= node_ids[i];
enum ndb_mgm_node_status s;
s = NDB_MGM_NODE_STATUS_NO_CONTACT;
#ifdef VM_TRACE
ndbout_c("Waiting for %d not started", nodeId);
#endif
while (s != NDB_MGM_NODE_STATUS_NOT_STARTED && waitTime > 0)
{
Uint32 startPhase = 0, version = 0, dynamicId = 0, nodeGroup = 0;
Uint32 connectCount = 0;
bool system;
const char *address;
status(nodeId, &s, &version, &startPhase,
&system, &dynamicId, &nodeGroup, &connectCount, &address);
NdbSleep_MilliSleep(100);
waitTime = (maxTime - NdbTick_CurrentMillisecond());
}
}
if (nostart)
return 0;
for (unsigned i = 0; i < node_ids.size(); i++)
{
int result = start(node_ids[i]);
}
return 0;
}
/*
@ -1918,7 +1940,8 @@ MgmtSrvr::alloc_node_id(NodeId * nodeId,
enum ndb_mgm_node_type type,
struct sockaddr *client_addr,
SOCKET_SIZE_TYPE *client_addr_len,
BaseString &error_string)
int &error_code, BaseString &error_string,
int log_event)
{
DBUG_ENTER("MgmtSrvr::alloc_node_id");
DBUG_PRINT("enter", ("nodeid=%d, type=%d, client_addr=%d",
@ -1927,6 +1950,7 @@ MgmtSrvr::alloc_node_id(NodeId * nodeId,
if (*nodeId == 0) {
error_string.appfmt("no-nodeid-checks set in management server.\n"
"node id must be set explicitly in connectstring");
error_code = NDB_MGM_ALLOCID_CONFIG_MISMATCH;
DBUG_RETURN(false);
}
DBUG_RETURN(true);
@ -1951,8 +1975,10 @@ MgmtSrvr::alloc_node_id(NodeId * nodeId,
if(NdbMutex_Lock(m_configMutex))
{
// should not happen
error_string.appfmt("unable to lock configuration mutex");
return false;
error_code = NDB_MGM_ALLOCID_ERROR;
DBUG_RETURN(false);
}
ndb_mgm_configuration_iterator
iter(* _config->m_configValues, CFG_SECTION_NODE);
@ -2023,6 +2049,7 @@ MgmtSrvr::alloc_node_id(NodeId * nodeId,
"or specifying unique host names in config file.",
id_found, tmp);
NdbMutex_Unlock(m_configMutex);
error_code = NDB_MGM_ALLOCID_CONFIG_MISMATCH;
DBUG_RETURN(false);
}
if (config_hostname == 0) {
@ -2031,6 +2058,7 @@ MgmtSrvr::alloc_node_id(NodeId * nodeId,
"or specifying unique host names in config file,\n"
"or specifying just one mgmt server in config file.",
tmp);
error_code = NDB_MGM_ALLOCID_CONFIG_MISMATCH;
DBUG_RETURN(false);
}
id_found= tmp; // mgmt server matched, check for more matches
@ -2072,8 +2100,9 @@ MgmtSrvr::alloc_node_id(NodeId * nodeId,
char tmp_str[128];
m_reserved_nodes.getText(tmp_str);
g_eventLogger.info("Mgmt server state: nodeid %d reserved for ip %s, m_reserved_nodes %s.",
id_found, get_connect_address(id_found), tmp_str);
g_eventLogger.info("Mgmt server state: nodeid %d reserved for ip %s, "
"m_reserved_nodes %s.",
id_found, get_connect_address(id_found), tmp_str);
DBUG_RETURN(true);
}
@ -2093,26 +2122,48 @@ MgmtSrvr::alloc_node_id(NodeId * nodeId,
type_c_string.assfmt("%s(%s)", alias, str);
}
if (*nodeId == 0) {
if (*nodeId == 0)
{
if (found_matching_id)
{
if (found_matching_type)
{
if (found_free_node)
{
error_string.appfmt("Connection done from wrong host ip %s.",
(client_addr)?
inet_ntoa(((struct sockaddr_in *)
inet_ntoa(((struct sockaddr_in *)
(client_addr))->sin_addr):"");
error_code = NDB_MGM_ALLOCID_ERROR;
}
else
{
error_string.appfmt("No free node id found for %s.",
type_string.c_str());
error_code = NDB_MGM_ALLOCID_ERROR;
}
}
else
{
error_string.appfmt("No %s node defined in config file.",
type_string.c_str());
error_code = NDB_MGM_ALLOCID_CONFIG_MISMATCH;
}
}
else
{
error_string.append("No nodes defined in config file.");
} else {
error_code = NDB_MGM_ALLOCID_CONFIG_MISMATCH;
}
}
else
{
if (found_matching_id)
{
if (found_matching_type)
if (found_free_node) {
{
if (found_free_node)
{
// have to split these into two since inet_ntoa overwrites itself
error_string.appfmt("Connection with id %d done from wrong host ip %s,",
*nodeId, inet_ntoa(((struct sockaddr_in *)
@ -2120,27 +2171,44 @@ MgmtSrvr::alloc_node_id(NodeId * nodeId,
error_string.appfmt(" expected %s(%s).", config_hostname,
r_config_addr ?
"lookup failed" : inet_ntoa(config_addr));
} else
error_code = NDB_MGM_ALLOCID_CONFIG_MISMATCH;
}
else
{
error_string.appfmt("Id %d already allocated by another node.",
*nodeId);
error_code = NDB_MGM_ALLOCID_ERROR;
}
}
else
{
error_string.appfmt("Id %d configured as %s, connect attempted as %s.",
*nodeId, type_c_string.c_str(),
type_string.c_str());
error_code = NDB_MGM_ALLOCID_CONFIG_MISMATCH;
}
}
else
{
error_string.appfmt("No node defined with id=%d in config file.",
*nodeId);
error_code = NDB_MGM_ALLOCID_CONFIG_MISMATCH;
}
}
g_eventLogger.warning("Allocate nodeid (%d) failed. Connection from ip %s. "
"Returned error string \"%s\"",
*nodeId,
client_addr != 0 ? inet_ntoa(((struct sockaddr_in *)(client_addr))->sin_addr) : "<none>",
error_string.c_str());
NodeBitmask connected_nodes2;
get_connected_nodes(connected_nodes2);
if (log_event || error_code == NDB_MGM_ALLOCID_CONFIG_MISMATCH)
{
g_eventLogger.warning("Allocate nodeid (%d) failed. Connection from ip %s."
" Returned error string \"%s\"",
*nodeId,
client_addr != 0
? inet_ntoa(((struct sockaddr_in *)
(client_addr))->sin_addr)
: "<none>",
error_string.c_str());
NodeBitmask connected_nodes2;
get_connected_nodes(connected_nodes2);
BaseString tmp_connected, tmp_not_connected;
for(Uint32 i = 0; i < MAX_NODES; i++)
{
@ -2378,6 +2446,8 @@ MgmtSrvr::repCommand(Uint32* repReqId, Uint32 request, bool waitCompleted)
MgmtSrvr::Allocated_resources::Allocated_resources(MgmtSrvr &m)
: m_mgmsrv(m)
{
m_reserved_nodes.clear();
m_alloc_timeout= 0;
}
MgmtSrvr::Allocated_resources::~Allocated_resources()
@ -2396,9 +2466,22 @@ MgmtSrvr::Allocated_resources::~Allocated_resources()
}
void
MgmtSrvr::Allocated_resources::reserve_node(NodeId id)
MgmtSrvr::Allocated_resources::reserve_node(NodeId id, NDB_TICKS timeout)
{
m_reserved_nodes.set(id);
m_alloc_timeout= NdbTick_CurrentMillisecond() + timeout;
}
bool
MgmtSrvr::Allocated_resources::is_timed_out(NDB_TICKS tick)
{
if (m_alloc_timeout && tick > m_alloc_timeout)
{
g_eventLogger.info("Mgmt server state: nodeid %d timed out.",
get_nodeid());
return true;
}
return false;
}
NodeId

View file

@ -106,7 +106,8 @@ public:
~Allocated_resources();
// methods to reserve/allocate resources which
// will be freed when running destructor
void reserve_node(NodeId id);
void reserve_node(NodeId id, NDB_TICKS timeout);
bool is_timed_out(NDB_TICKS tick);
bool is_reserved(NodeId nodeId) { return m_reserved_nodes.get(nodeId); }
bool is_reserved(NodeBitmask mask) { return !mask.bitAND(m_reserved_nodes).isclear(); }
bool isclear() { return m_reserved_nodes.isclear(); }
@ -114,6 +115,7 @@ public:
private:
MgmtSrvr &m_mgmsrv;
NodeBitmask m_reserved_nodes;
NDB_TICKS m_alloc_timeout;
};
NdbMutex *m_node_id_mutex;
@ -432,8 +434,10 @@ public:
*/
bool getNextNodeId(NodeId * _nodeId, enum ndb_mgm_node_type type) const ;
bool alloc_node_id(NodeId * _nodeId, enum ndb_mgm_node_type type,
struct sockaddr *client_addr, SOCKET_SIZE_TYPE *client_addr_len,
BaseString &error_string);
struct sockaddr *client_addr,
SOCKET_SIZE_TYPE *client_addr_len,
int &error_code, BaseString &error_string,
int log_event = 1);
/**
*

View file

@ -137,6 +137,8 @@ ParserRow<MgmApiSession> commands[] = {
MGM_ARG("public key", String, Mandatory, "Public key"),
MGM_ARG("endian", String, Optional, "Endianness"),
MGM_ARG("name", String, Optional, "Name of connection"),
MGM_ARG("timeout", Int, Optional, "Timeout in seconds"),
MGM_ARG("log_event", Int, Optional, "Log failure in cluster log"),
MGM_CMD("get version", &MgmApiSession::getVersion, ""),
@ -259,6 +261,15 @@ ParserRow<MgmApiSession> commands[] = {
MGM_END()
};
struct PurgeStruct
{
NodeBitmask free_nodes;/* free nodes as reported
* by ndbd in apiRegReqConf
*/
BaseString *str;
NDB_TICKS tick;
};
MgmApiSession::MgmApiSession(class MgmtSrvr & mgm, NDB_SOCKET_TYPE sock)
: SocketServer::Session(sock), m_mgmsrv(mgm)
{
@ -408,12 +419,15 @@ MgmApiSession::get_nodeid(Parser_t::Context &,
{
const char *cmd= "get nodeid reply";
Uint32 version, nodeid= 0, nodetype= 0xff;
Uint32 timeout= 20; // default seconds timeout
const char * transporter;
const char * user;
const char * password;
const char * public_key;
const char * endian= NULL;
const char * name= NULL;
Uint32 log_event= 1;
bool log_event_version;
union { long l; char c[sizeof(long)]; } endian_check;
args.get("version", &version);
@ -425,6 +439,9 @@ MgmApiSession::get_nodeid(Parser_t::Context &,
args.get("public key", &public_key);
args.get("endian", &endian);
args.get("name", &name);
args.get("timeout", &timeout);
/* for backwards compatability keep track if client uses new protocol */
log_event_version= args.get("log_event", &log_event);
endian_check.l = 1;
if(endian
@ -464,14 +481,38 @@ MgmApiSession::get_nodeid(Parser_t::Context &,
NodeId tmp= nodeid;
if(tmp == 0 || !m_allocated_resources->is_reserved(tmp)){
BaseString error_string;
if (!m_mgmsrv.alloc_node_id(&tmp, (enum ndb_mgm_node_type)nodetype,
&addr, &addrlen, error_string)){
int error_code;
NDB_TICKS tick= 0;
/* only report error on second attempt as not to clog the cluster log */
while (!m_mgmsrv.alloc_node_id(&tmp, (enum ndb_mgm_node_type)nodetype,
&addr, &addrlen, error_code, error_string,
tick == 0 ? 0 : log_event))
{
/* NDB_MGM_ALLOCID_CONFIG_MISMATCH is a non retriable error */
if (tick == 0 && error_code != NDB_MGM_ALLOCID_CONFIG_MISMATCH)
{
// attempt to free any timed out reservations
tick= NdbTick_CurrentMillisecond();
struct PurgeStruct ps;
m_mgmsrv.get_connected_nodes(ps.free_nodes);
// invert connected_nodes to get free nodes
ps.free_nodes.bitXORC(NodeBitmask());
ps.str= 0;
ps.tick= tick;
m_mgmsrv.get_socket_server()->
foreachSession(stop_session_if_timed_out,&ps);
error_string = "";
continue;
}
const char *alias;
const char *str;
alias= ndb_mgm_get_node_type_alias_string((enum ndb_mgm_node_type)
nodetype, &str);
m_output->println(cmd);
m_output->println("result: %s", error_string.c_str());
/* only use error_code protocol if client knows about it */
if (log_event_version)
m_output->println("error_code: %d", error_code);
m_output->println("");
return;
}
@ -491,7 +532,7 @@ MgmApiSession::get_nodeid(Parser_t::Context &,
m_output->println("nodeid: %u", tmp);
m_output->println("result: Ok");
m_output->println("");
m_allocated_resources->reserve_node(tmp);
m_allocated_resources->reserve_node(tmp, timeout*1000);
if (name)
g_eventLogger.info("Node %d: %s", tmp, name);
@ -1480,14 +1521,6 @@ done:
m_output->println("");
}
struct PurgeStruct
{
NodeBitmask free_nodes;/* free nodes as reported
* by ndbd in apiRegReqConf
*/
BaseString *str;
};
void
MgmApiSession::stop_session_if_not_connected(SocketServer::Session *_s, void *data)
{
@ -1495,7 +1528,20 @@ MgmApiSession::stop_session_if_not_connected(SocketServer::Session *_s, void *da
struct PurgeStruct &ps= *(struct PurgeStruct *)data;
if (s->m_allocated_resources->is_reserved(ps.free_nodes))
{
ps.str->appfmt(" %d", s->m_allocated_resources->get_nodeid());
if (ps.str)
ps.str->appfmt(" %d", s->m_allocated_resources->get_nodeid());
s->stopSession();
}
}
void
MgmApiSession::stop_session_if_timed_out(SocketServer::Session *_s, void *data)
{
MgmApiSession *s= (MgmApiSession *)_s;
struct PurgeStruct &ps= *(struct PurgeStruct *)data;
if (s->m_allocated_resources->is_reserved(ps.free_nodes) &&
s->m_allocated_resources->is_timed_out(ps.tick))
{
s->stopSession();
}
}

View file

@ -30,6 +30,7 @@
class MgmApiSession : public SocketServer::Session
{
static void stop_session_if_timed_out(SocketServer::Session *_s, void *data);
static void stop_session_if_not_connected(SocketServer::Session *_s, void *data);
private:
typedef Parser<MgmApiSession> Parser_t;

View file

@ -5650,12 +5650,24 @@ ndb_get_table_statistics(Ndb* ndb, const char * table,
{
DBUG_ENTER("ndb_get_table_statistics");
DBUG_PRINT("enter", ("table: %s", table));
NdbTransaction* pTrans= ndb->startTransaction();
do
NdbTransaction* pTrans;
int retries= 10;
int retry_sleep= 30 * 1000; /* 30 milliseconds */
do
{
pTrans= ndb->startTransaction();
if (pTrans == NULL)
{
if (ndb->getNdbError().status == NdbError::TemporaryError &&
retries--)
{
my_sleep(retry_sleep);
continue;
}
break;
}
NdbScanOperation* pOp= pTrans->getNdbScanOperation(table);
if (pOp == NULL)
break;
@ -5678,8 +5690,18 @@ ndb_get_table_statistics(Ndb* ndb, const char * table,
NdbTransaction::AbortOnError,
TRUE);
if (check == -1)
{
if (pTrans->getNdbError().status == NdbError::TemporaryError &&
retries--)
{
ndb->closeTransaction(pTrans);
pTrans= 0;
my_sleep(retry_sleep);
continue;
}
break;
}
Uint32 count= 0;
Uint64 sum_rows= 0;
Uint64 sum_commits= 0;
@ -5713,7 +5735,7 @@ ndb_get_table_statistics(Ndb* ndb, const char * table,
sum_mem, count));
DBUG_RETURN(0);
} while (0);
} while(1);
if (pTrans)
ndb->closeTransaction(pTrans);