From 2a04d49e7762747c5d7207bb92f057ef2b7b68b2 Mon Sep 17 00:00:00 2001 From: unknown Date: Wed, 26 Apr 2006 16:57:41 +0200 Subject: [PATCH] ndb: added timeout handling to alloc node id to avoid the usage of purge stale sessions --- ndb/src/mgmsrv/MgmtSrvr.cpp | 17 ++++++++++- ndb/src/mgmsrv/MgmtSrvr.hpp | 4 ++- ndb/src/mgmsrv/Services.cpp | 57 +++++++++++++++++++++++++++++-------- ndb/src/mgmsrv/Services.hpp | 1 + 4 files changed, 65 insertions(+), 14 deletions(-) diff --git a/ndb/src/mgmsrv/MgmtSrvr.cpp b/ndb/src/mgmsrv/MgmtSrvr.cpp index 9b518ba938b..c7d0c11eec4 100644 --- a/ndb/src/mgmsrv/MgmtSrvr.cpp +++ b/ndb/src/mgmsrv/MgmtSrvr.cpp @@ -2378,6 +2378,8 @@ MgmtSrvr::repCommand(Uint32* repReqId, Uint32 request, bool waitCompleted) MgmtSrvr::Allocated_resources::Allocated_resources(MgmtSrvr &m) : m_mgmsrv(m) { + m_reserved_nodes.clear(); + m_alloc_timeout= 0; } MgmtSrvr::Allocated_resources::~Allocated_resources() @@ -2396,9 +2398,22 @@ MgmtSrvr::Allocated_resources::~Allocated_resources() } void -MgmtSrvr::Allocated_resources::reserve_node(NodeId id) +MgmtSrvr::Allocated_resources::reserve_node(NodeId id, NDB_TICKS timeout) { m_reserved_nodes.set(id); + m_alloc_timeout= NdbTick_CurrentMillisecond() + timeout; +} + +bool +MgmtSrvr::Allocated_resources::is_timed_out(NDB_TICKS tick) +{ + if (m_alloc_timeout && tick > m_alloc_timeout) + { + g_eventLogger.info("Mgmt server state: nodeid %d timed out.", + get_nodeid()); + return true; + } + return false; } NodeId diff --git a/ndb/src/mgmsrv/MgmtSrvr.hpp b/ndb/src/mgmsrv/MgmtSrvr.hpp index fe1603a1953..e2eb33d1198 100644 --- a/ndb/src/mgmsrv/MgmtSrvr.hpp +++ b/ndb/src/mgmsrv/MgmtSrvr.hpp @@ -106,7 +106,8 @@ public: ~Allocated_resources(); // methods to reserve/allocate resources which // will be freed when running destructor - void reserve_node(NodeId id); + void reserve_node(NodeId id, NDB_TICKS timeout); + bool is_timed_out(NDB_TICKS tick); bool is_reserved(NodeId nodeId) { return m_reserved_nodes.get(nodeId); } bool is_reserved(NodeBitmask mask) { return !mask.bitAND(m_reserved_nodes).isclear(); } bool isclear() { return m_reserved_nodes.isclear(); } @@ -114,6 +115,7 @@ public: private: MgmtSrvr &m_mgmsrv; NodeBitmask m_reserved_nodes; + NDB_TICKS m_alloc_timeout; }; NdbMutex *m_node_id_mutex; diff --git a/ndb/src/mgmsrv/Services.cpp b/ndb/src/mgmsrv/Services.cpp index a80827abd8f..ee97235912a 100644 --- a/ndb/src/mgmsrv/Services.cpp +++ b/ndb/src/mgmsrv/Services.cpp @@ -137,6 +137,7 @@ ParserRow commands[] = { MGM_ARG("public key", String, Mandatory, "Public key"), MGM_ARG("endian", String, Optional, "Endianness"), MGM_ARG("name", String, Optional, "Name of connection"), + MGM_ARG("timeout", Int, Optional, "Timeout in seconds"), MGM_CMD("get version", &MgmApiSession::getVersion, ""), @@ -259,6 +260,15 @@ ParserRow commands[] = { MGM_END() }; +struct PurgeStruct +{ + NodeBitmask free_nodes;/* free nodes as reported + * by ndbd in apiRegReqConf + */ + BaseString *str; + NDB_TICKS tick; +}; + MgmApiSession::MgmApiSession(class MgmtSrvr & mgm, NDB_SOCKET_TYPE sock) : SocketServer::Session(sock), m_mgmsrv(mgm) { @@ -408,6 +418,7 @@ MgmApiSession::get_nodeid(Parser_t::Context &, { const char *cmd= "get nodeid reply"; Uint32 version, nodeid= 0, nodetype= 0xff; + Uint32 timeout= 20; // default seconds timeout const char * transporter; const char * user; const char * password; @@ -425,6 +436,7 @@ MgmApiSession::get_nodeid(Parser_t::Context &, args.get("public key", &public_key); args.get("endian", &endian); args.get("name", &name); + args.get("timeout", &timeout); endian_check.l = 1; if(endian @@ -464,8 +476,24 @@ MgmApiSession::get_nodeid(Parser_t::Context &, NodeId tmp= nodeid; if(tmp == 0 || !m_allocated_resources->is_reserved(tmp)){ BaseString error_string; - if (!m_mgmsrv.alloc_node_id(&tmp, (enum ndb_mgm_node_type)nodetype, - &addr, &addrlen, error_string)){ + NDB_TICKS tick= 0; + while (!m_mgmsrv.alloc_node_id(&tmp, (enum ndb_mgm_node_type)nodetype, + &addr, &addrlen, error_string)) + { + if (tick == 0) + { + // attempt to free any timed out reservations + tick= NdbTick_CurrentMillisecond(); + struct PurgeStruct ps; + m_mgmsrv.get_connected_nodes(ps.free_nodes); + // invert connected_nodes to get free nodes + ps.free_nodes.bitXORC(NodeBitmask()); + ps.str= 0; + ps.tick= tick; + m_mgmsrv.get_socket_server()-> + foreachSession(stop_session_if_timed_out,&ps); + continue; + } const char *alias; const char *str; alias= ndb_mgm_get_node_type_alias_string((enum ndb_mgm_node_type) @@ -491,7 +519,7 @@ MgmApiSession::get_nodeid(Parser_t::Context &, m_output->println("nodeid: %u", tmp); m_output->println("result: Ok"); m_output->println(""); - m_allocated_resources->reserve_node(tmp); + m_allocated_resources->reserve_node(tmp, timeout*1000); if (name) g_eventLogger.info("Node %d: %s", tmp, name); @@ -1480,14 +1508,6 @@ done: m_output->println(""); } -struct PurgeStruct -{ - NodeBitmask free_nodes;/* free nodes as reported - * by ndbd in apiRegReqConf - */ - BaseString *str; -}; - void MgmApiSession::stop_session_if_not_connected(SocketServer::Session *_s, void *data) { @@ -1495,7 +1515,20 @@ MgmApiSession::stop_session_if_not_connected(SocketServer::Session *_s, void *da struct PurgeStruct &ps= *(struct PurgeStruct *)data; if (s->m_allocated_resources->is_reserved(ps.free_nodes)) { - ps.str->appfmt(" %d", s->m_allocated_resources->get_nodeid()); + if (ps.str) + ps.str->appfmt(" %d", s->m_allocated_resources->get_nodeid()); + s->stopSession(); + } +} + +void +MgmApiSession::stop_session_if_timed_out(SocketServer::Session *_s, void *data) +{ + MgmApiSession *s= (MgmApiSession *)_s; + struct PurgeStruct &ps= *(struct PurgeStruct *)data; + if (s->m_allocated_resources->is_reserved(ps.free_nodes) && + s->m_allocated_resources->is_timed_out(ps.tick)) + { s->stopSession(); } } diff --git a/ndb/src/mgmsrv/Services.hpp b/ndb/src/mgmsrv/Services.hpp index f97223750a1..975202b96df 100644 --- a/ndb/src/mgmsrv/Services.hpp +++ b/ndb/src/mgmsrv/Services.hpp @@ -30,6 +30,7 @@ class MgmApiSession : public SocketServer::Session { + static void stop_session_if_timed_out(SocketServer::Session *_s, void *data); static void stop_session_if_not_connected(SocketServer::Session *_s, void *data); private: typedef Parser Parser_t;