mirror of
https://github.com/MariaDB/server.git
synced 2025-01-22 06:44:16 +01:00
Merge willster.(none):/home/stewart/Documents/MySQL/5.0/main
into willster.(none):/home/stewart/Documents/MySQL/5.0/bug13985
This commit is contained in:
commit
c709602124
6 changed files with 137 additions and 10 deletions
|
@ -173,8 +173,15 @@ private:
|
|||
bool rep_connected;
|
||||
#endif
|
||||
struct NdbThread* m_event_thread;
|
||||
NdbMutex *m_print_mutex;
|
||||
};
|
||||
|
||||
struct event_thread_param {
|
||||
NdbMgmHandle *m;
|
||||
NdbMutex **p;
|
||||
};
|
||||
|
||||
NdbMutex* print_mutex;
|
||||
|
||||
/*
|
||||
* Facade object for CommandInterpreter
|
||||
|
@ -395,6 +402,7 @@ CommandInterpreter::CommandInterpreter(const char *_host,int verbose)
|
|||
m_connected= false;
|
||||
m_event_thread= 0;
|
||||
try_reconnect = 0;
|
||||
m_print_mutex= NdbMutex_Create();
|
||||
#ifdef HAVE_GLOBAL_REPLICATION
|
||||
rep_host = NULL;
|
||||
m_repserver = NULL;
|
||||
|
@ -408,6 +416,7 @@ CommandInterpreter::CommandInterpreter(const char *_host,int verbose)
|
|||
CommandInterpreter::~CommandInterpreter()
|
||||
{
|
||||
disconnect();
|
||||
NdbMutex_Destroy(m_print_mutex);
|
||||
}
|
||||
|
||||
static bool
|
||||
|
@ -444,11 +453,13 @@ CommandInterpreter::printError()
|
|||
|
||||
static int do_event_thread;
|
||||
static void*
|
||||
event_thread_run(void* m)
|
||||
event_thread_run(void* p)
|
||||
{
|
||||
DBUG_ENTER("event_thread_run");
|
||||
|
||||
NdbMgmHandle handle= *(NdbMgmHandle*)m;
|
||||
struct event_thread_param param= *(struct event_thread_param*)p;
|
||||
NdbMgmHandle handle= *(param.m);
|
||||
NdbMutex* printmutex= *(param.p);
|
||||
|
||||
int filter[] = { 15, NDB_MGM_EVENT_CATEGORY_BACKUP,
|
||||
1, NDB_MGM_EVENT_CATEGORY_STARTUP,
|
||||
|
@ -466,7 +477,11 @@ event_thread_run(void* m)
|
|||
{
|
||||
const char ping_token[]= "<PING>";
|
||||
if (memcmp(ping_token,tmp,sizeof(ping_token)-1))
|
||||
ndbout << tmp;
|
||||
if(tmp && strlen(tmp))
|
||||
{
|
||||
Guard g(printmutex);
|
||||
ndbout << tmp;
|
||||
}
|
||||
}
|
||||
} while(do_event_thread);
|
||||
NDB_CLOSE_SOCKET(fd);
|
||||
|
@ -519,8 +534,11 @@ CommandInterpreter::connect()
|
|||
assert(m_event_thread == 0);
|
||||
assert(do_event_thread == 0);
|
||||
do_event_thread= 0;
|
||||
struct event_thread_param p;
|
||||
p.m= &m_mgmsrv2;
|
||||
p.p= &m_print_mutex;
|
||||
m_event_thread = NdbThread_Create(event_thread_run,
|
||||
(void**)&m_mgmsrv2,
|
||||
(void**)&p,
|
||||
32768,
|
||||
"CommandInterpreted_event_thread",
|
||||
NDB_THREAD_PRIO_LOW);
|
||||
|
@ -607,6 +625,7 @@ CommandInterpreter::execute(const char *_line, int _try_reconnect,
|
|||
int result= execute_impl(_line);
|
||||
if (error)
|
||||
*error= m_error;
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
|
@ -920,6 +939,7 @@ CommandInterpreter::executeForAll(const char * cmd, ExecuteFunction fun,
|
|||
ndbout_c("Trying to start all nodes of system.");
|
||||
ndbout_c("Use ALL STATUS to see the system start-up phases.");
|
||||
} else {
|
||||
Guard g(m_print_mutex);
|
||||
struct ndb_mgm_cluster_state *cl= ndb_mgm_get_status(m_mgmsrv);
|
||||
if(cl == 0){
|
||||
ndbout_c("Unable get status from management server");
|
||||
|
|
|
@ -1455,6 +1455,12 @@ MgmtSrvr::exitSingleUser(int * stopCount, bool abort)
|
|||
|
||||
#include <ClusterMgr.hpp>
|
||||
|
||||
void
|
||||
MgmtSrvr::updateStatus(NodeBitmask nodes)
|
||||
{
|
||||
theFacade->theClusterMgr->forceHB(nodes);
|
||||
}
|
||||
|
||||
int
|
||||
MgmtSrvr::status(int nodeId,
|
||||
ndb_mgm_node_status * _status,
|
||||
|
@ -1979,6 +1985,25 @@ MgmtSrvr::get_connected_nodes(NodeBitmask &connected_nodes) const
|
|||
}
|
||||
}
|
||||
|
||||
void
|
||||
MgmtSrvr::get_connected_ndb_nodes(NodeBitmask &connected_nodes) const
|
||||
{
|
||||
NodeBitmask ndb_nodes;
|
||||
if (theFacade && theFacade->theClusterMgr)
|
||||
{
|
||||
for(Uint32 i = 0; i < MAX_NODES; i++)
|
||||
{
|
||||
if (getNodeType(i) == NDB_MGM_NODE_TYPE_NDB)
|
||||
{
|
||||
ndb_nodes.set(i);
|
||||
const ClusterMgr::Node &node= theFacade->theClusterMgr->getNodeInfo(i);
|
||||
connected_nodes.bitOR(node.m_state.m_connected_nodes);
|
||||
}
|
||||
}
|
||||
}
|
||||
connected_nodes.bitAND(ndb_nodes);
|
||||
}
|
||||
|
||||
bool
|
||||
MgmtSrvr::alloc_node_id(NodeId * nodeId,
|
||||
enum ndb_mgm_node_type type,
|
||||
|
|
|
@ -488,8 +488,11 @@ public:
|
|||
|
||||
const char *get_connect_address(Uint32 node_id);
|
||||
void get_connected_nodes(NodeBitmask &connected_nodes) const;
|
||||
void get_connected_ndb_nodes(NodeBitmask &connected_nodes) const;
|
||||
SocketServer *get_socket_server() { return m_socket_server; }
|
||||
|
||||
void updateStatus(NodeBitmask nodes);
|
||||
|
||||
//**************************************************************************
|
||||
private:
|
||||
//**************************************************************************
|
||||
|
|
|
@ -982,6 +982,9 @@ printNodeStatus(OutputStream *output,
|
|||
MgmtSrvr &mgmsrv,
|
||||
enum ndb_mgm_node_type type) {
|
||||
NodeId nodeId = 0;
|
||||
NodeBitmask hbnodes;
|
||||
mgmsrv.get_connected_ndb_nodes(hbnodes);
|
||||
mgmsrv.updateStatus(hbnodes);
|
||||
while(mgmsrv.getNextNodeId(&nodeId, type)) {
|
||||
enum ndb_mgm_node_status status;
|
||||
Uint32 startPhase = 0,
|
||||
|
|
|
@ -39,6 +39,8 @@
|
|||
|
||||
int global_flag_send_heartbeat_now= 0;
|
||||
|
||||
//#define DEBUG_REG
|
||||
|
||||
// Just a C wrapper for threadMain
|
||||
extern "C"
|
||||
void*
|
||||
|
@ -67,6 +69,8 @@ ClusterMgr::ClusterMgr(TransporterFacade & _facade):
|
|||
DBUG_ENTER("ClusterMgr::ClusterMgr");
|
||||
ndbSetOwnVersion();
|
||||
clusterMgrThreadMutex = NdbMutex_Create();
|
||||
waitForHBCond= NdbCondition_Create();
|
||||
waitingForHB= false;
|
||||
noOfAliveNodes= 0;
|
||||
noOfConnectedNodes= 0;
|
||||
theClusterMgrThread= 0;
|
||||
|
@ -77,7 +81,8 @@ ClusterMgr::ClusterMgr(TransporterFacade & _facade):
|
|||
ClusterMgr::~ClusterMgr()
|
||||
{
|
||||
DBUG_ENTER("ClusterMgr::~ClusterMgr");
|
||||
doStop();
|
||||
doStop();
|
||||
NdbCondition_Destroy(waitForHBCond);
|
||||
NdbMutex_Destroy(clusterMgrThreadMutex);
|
||||
DBUG_VOID_RETURN;
|
||||
}
|
||||
|
@ -163,6 +168,56 @@ ClusterMgr::doStop( ){
|
|||
DBUG_VOID_RETURN;
|
||||
}
|
||||
|
||||
void
|
||||
ClusterMgr::forceHB(NodeBitmask waitFor)
|
||||
{
|
||||
theFacade.lock_mutex();
|
||||
|
||||
if(waitingForHB)
|
||||
{
|
||||
NdbCondition_WaitTimeout(waitForHBCond, theFacade.theMutexPtr, 1000);
|
||||
theFacade.unlock_mutex();
|
||||
return;
|
||||
}
|
||||
|
||||
global_flag_send_heartbeat_now= 1;
|
||||
waitingForHB= true;
|
||||
|
||||
waitForHBFromNodes= waitFor;
|
||||
#ifdef DEBUG_REG
|
||||
char buf[128];
|
||||
ndbout << "Waiting for HB from " << waitForHBFromNodes.getText(buf) << endl;
|
||||
#endif
|
||||
NdbApiSignal signal(numberToRef(API_CLUSTERMGR, theFacade.ownId()));
|
||||
|
||||
signal.theVerId_signalNumber = GSN_API_REGREQ;
|
||||
signal.theReceiversBlockNumber = QMGR;
|
||||
signal.theTrace = 0;
|
||||
signal.theLength = ApiRegReq::SignalLength;
|
||||
|
||||
ApiRegReq * req = CAST_PTR(ApiRegReq, signal.getDataPtrSend());
|
||||
req->ref = numberToRef(API_CLUSTERMGR, theFacade.ownId());
|
||||
req->version = NDB_VERSION;
|
||||
|
||||
int nodeId= 0;
|
||||
for(int i=0;
|
||||
NodeBitmask::NotFound!=(nodeId= waitForHBFromNodes.find(i));
|
||||
i= nodeId+1)
|
||||
{
|
||||
#ifdef DEBUG_REG
|
||||
ndbout << "FORCE HB to " << nodeId << endl;
|
||||
#endif
|
||||
theFacade.sendSignalUnCond(&signal, nodeId);
|
||||
}
|
||||
|
||||
NdbCondition_WaitTimeout(waitForHBCond, theFacade.theMutexPtr, 1000);
|
||||
waitingForHB= false;
|
||||
#ifdef DEBUG_REG
|
||||
ndbout << "Still waiting for HB from " << waitForHBFromNodes.getText(buf) << endl;
|
||||
#endif
|
||||
theFacade.unlock_mutex();
|
||||
}
|
||||
|
||||
void
|
||||
ClusterMgr::threadMain( ){
|
||||
NdbApiSignal signal(numberToRef(API_CLUSTERMGR, theFacade.ownId()));
|
||||
|
@ -226,7 +281,7 @@ ClusterMgr::threadMain( ){
|
|||
if (theNode.m_info.m_type == NodeInfo::REP) {
|
||||
signal.theReceiversBlockNumber = API_CLUSTERMGR;
|
||||
}
|
||||
#if 0
|
||||
#ifdef DEBUG_REG
|
||||
ndbout_c("ClusterMgr: Sending API_REGREQ to node %d", (int)nodeId);
|
||||
#endif
|
||||
theFacade.sendSignalUnCond(&signal, nodeId);
|
||||
|
@ -278,7 +333,7 @@ ClusterMgr::execAPI_REGREQ(const Uint32 * theData){
|
|||
const ApiRegReq * const apiRegReq = (ApiRegReq *)&theData[0];
|
||||
const NodeId nodeId = refToNode(apiRegReq->ref);
|
||||
|
||||
#if 0
|
||||
#ifdef DEBUG_REG
|
||||
ndbout_c("ClusterMgr: Recd API_REGREQ from node %d", nodeId);
|
||||
#endif
|
||||
|
||||
|
@ -319,7 +374,7 @@ ClusterMgr::execAPI_REGCONF(const Uint32 * theData){
|
|||
const ApiRegConf * const apiRegConf = (ApiRegConf *)&theData[0];
|
||||
const NodeId nodeId = refToNode(apiRegConf->qmgrRef);
|
||||
|
||||
#if 0
|
||||
#ifdef DEBUG_REG
|
||||
ndbout_c("ClusterMgr: Recd API_REGCONF from node %d", nodeId);
|
||||
#endif
|
||||
|
||||
|
@ -351,6 +406,17 @@ ClusterMgr::execAPI_REGCONF(const Uint32 * theData){
|
|||
if (node.m_info.m_type != NodeInfo::REP) {
|
||||
node.hbFrequency = (apiRegConf->apiHeartbeatFrequency * 10) - 50;
|
||||
}
|
||||
|
||||
if(waitingForHB)
|
||||
{
|
||||
waitForHBFromNodes.clear(nodeId);
|
||||
|
||||
if(waitForHBFromNodes.isclear())
|
||||
{
|
||||
waitingForHB= false;
|
||||
NdbCondition_Broadcast(waitForHBCond);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void
|
||||
|
@ -379,6 +445,10 @@ ClusterMgr::execAPI_REGREF(const Uint32 * theData){
|
|||
default:
|
||||
break;
|
||||
}
|
||||
|
||||
waitForHBFromNodes.clear(nodeId);
|
||||
if(waitForHBFromNodes.isclear())
|
||||
NdbCondition_Signal(waitForHBCond);
|
||||
}
|
||||
|
||||
void
|
||||
|
|
|
@ -49,7 +49,9 @@ public:
|
|||
|
||||
void doStop();
|
||||
void startThread();
|
||||
|
||||
|
||||
void forceHB(NodeBitmask waitFor);
|
||||
|
||||
private:
|
||||
void threadMain();
|
||||
|
||||
|
@ -85,7 +87,11 @@ private:
|
|||
Uint32 noOfConnectedNodes;
|
||||
Node theNodes[MAX_NODES];
|
||||
NdbThread* theClusterMgrThread;
|
||||
|
||||
|
||||
NodeBitmask waitForHBFromNodes; // used in forcing HBs
|
||||
NdbCondition* waitForHBCond;
|
||||
bool waitingForHB;
|
||||
|
||||
/**
|
||||
* Used for controlling start/stop of the thread
|
||||
*/
|
||||
|
|
Loading…
Add table
Reference in a new issue