/* Copyright (C) 2003 MySQL AB This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include static void fatal(char const* fmt, ...) { va_list ap; char buf[200]; va_start(ap, fmt); vsnprintf(buf, sizeof(buf), fmt, ap); va_end(ap); ndbout << "FATAL: " << buf << endl; sleep(1); exit(1); } static void debug(char const* fmt, ...) { va_list ap; char buf[200]; va_start(ap, fmt); vsnprintf(buf, sizeof(buf), fmt, ap); va_end(ap); ndbout << buf << endl; } // node struct Node { enum Type { MGM = 1, DB = 2, API = 3 }; Type type; unsigned id; // node id static Node* list; static unsigned count; static Node* find(unsigned n) { for (unsigned i = 0; i < count; i++) { if (list[i].id == n) return &list[i]; } return 0; } }; unsigned Node::count = 0; Node* Node::list = 0; struct Copy { int rfd; // read from int wfd; // write to unsigned char* buf; unsigned bufsiz; NdbThread* thread; void run(); char info[20]; }; // connection between nodes 0-server side 1-client side // we are client to 0 and server to 1 struct Conn { Node* node[2]; // the nodes unsigned port; // server port unsigned proxy; // proxy port static unsigned count; static unsigned proxycount; static Conn* list; NdbThread* thread; // thread handling this connection void run(); // run the connection int sockfd[2]; // socket 0-on server side 1-client side void conn0(); // connect to side 0 void conn1(); // connect to side 0 char info[20]; Copy copy[2]; // 0-to-1 and 1-to-0 }; unsigned Conn::count = 0; unsigned Conn::proxycount = 0; Conn* Conn::list = 0; // global data static char* hostname = 0; static struct sockaddr_in hostaddr; static char* localcfgfile = 0; static char* initcfgfile = 0; static unsigned ownnodeid = 0; static void properr(const Properties* props, const char* name, int i = -1) { if (i < 0) { fatal("get %s failed: errno = %d", name, props->getPropertiesErrno()); } else { fatal("get %s_%d failed: errno = %d", name, i, props->getPropertiesErrno()); } } // read config and load it into our structs static void getcfg() { LocalConfig lcfg; if (! lcfg.read(localcfgfile)) { fatal("read %s failed", localcfgfile); } ownnodeid = lcfg._ownNodeId; debug("ownnodeid = %d", ownnodeid); InitConfigFileParser pars(initcfgfile); Config icfg; if (! pars.getConfig(icfg)) { fatal("parse %s failed", initcfgfile); } Properties* ccfg = icfg.getConfig(ownnodeid); if (ccfg == 0) { const char* err = "unknown error"; fatal("getConfig: %s", err); } ccfg->put("NodeId", ownnodeid); ccfg->put("NodeType", "MGM"); if (! ccfg->get("NoOfNodes", &Node::count)) { properr(ccfg, "NoOfNodes", -1); } debug("Node::count = %d", Node::count); Node::list = new Node[Node::count]; for (unsigned i = 0; i < Node::count; i++) { Node& node = Node::list[i]; const Properties* nodecfg; if (! ccfg->get("Node", 1+i, &nodecfg)) { properr(ccfg, "Node", 1+i); } const char* type; if (! nodecfg->get("Type", &type)) { properr(nodecfg, "Type"); } if (strcmp(type, "MGM") == 0) { node.type = Node::MGM; } else if (strcmp(type, "DB") == 0) { node.type = Node::DB; } else if (strcmp(type, "API") == 0) { node.type = Node::API; } else { fatal("prop %s_%d bad Type = %s", "Node", 1+i, type); } if (! nodecfg->get("NodeId", &node.id)) { properr(nodecfg, "NodeId"); } debug("node id=%d type=%d", node.id, node.type); } IPCConfig ipccfg(ccfg); if (ipccfg.init() != 0) { fatal("ipccfg init failed"); } if (! ccfg->get("NoOfConnections", &Conn::count)) { properr(ccfg, "NoOfConnections"); } debug("Conn::count = %d", Conn::count); Conn::list = new Conn[Conn::count]; for (unsigned i = 0; i < Conn::count; i++) { Conn& conn = Conn::list[i]; const Properties* conncfg; if (! ccfg->get("Connection", i, &conncfg)) { properr(ccfg, "Connection", i); } unsigned n; if (! conncfg->get("NodeId1", &n)) { properr(conncfg, "NodeId1"); } if ((conn.node[0] = Node::find(n)) == 0) { fatal("node %d not found", n); } if (! conncfg->get("NodeId2", &n)) { properr(conncfg, "NodeId2"); } if ((conn.node[1] = Node::find(n)) == 0) { fatal("node %d not found", n); } if (! conncfg->get("PortNumber", &conn.port)) { properr(conncfg, "PortNumber"); } conn.proxy = 0; const char* proxy; if (conncfg->get("Proxy", &proxy)) { conn.proxy = atoi(proxy); if (conn.proxy > 0) { Conn::proxycount++; } } sprintf(conn.info, "conn %d-%d", conn.node[0]->id, conn.node[1]->id); } } void Conn::conn0() { int fd; while (1) { if ((fd = socket(PF_INET, SOCK_STREAM, 0)) == -1) { fatal("%s: create client socket failed: %s", info, strerror(errno)); } struct sockaddr_in servaddr; memset(&servaddr, 0, sizeof(servaddr)); servaddr.sin_family = AF_INET; servaddr.sin_port = htons(port); servaddr.sin_addr = hostaddr.sin_addr; #if 0 // coredump if (Ndb_getInAddr(&servaddr.sin_addr, hostname) != 0) { fatal("%s: hostname %s lookup failed", info, hostname); } #endif if (connect(fd, (struct sockaddr*)&servaddr, sizeof(servaddr)) == 0) break; if (errno != ECONNREFUSED) { fatal("%s: connect failed: %s", info, strerror(errno)); } close(fd); NdbSleep_MilliSleep(100); } sockfd[0] = fd; debug("%s: side 0 connected", info); } void Conn::conn1() { int servfd; if ((servfd = socket(PF_INET, SOCK_STREAM, 0)) == -1) { fatal("%s: create server socket failed: %s", info, strerror(errno)); } struct sockaddr_in servaddr; memset(&servaddr, 0, sizeof(servaddr)); servaddr.sin_family = AF_INET; servaddr.sin_addr.s_addr = htonl(INADDR_ANY); servaddr.sin_port = htons(proxy); const int on = 1; setsockopt(servfd, SOL_SOCKET, SO_REUSEADDR, (const char*)&on, sizeof(on)); if (bind(servfd, (struct sockaddr*) &servaddr, sizeof(servaddr)) == -1) { fatal("%s: bind %d failed: %s", info, proxy, strerror(errno)); } if (listen(servfd, 1) == -1) { fatal("%s: listen %d failed: %s", info, proxy, strerror(errno)); } int fd; if ((fd = accept(servfd, 0, 0)) == -1) { fatal("%s: accept failed: %s", info, strerror(errno)); } sockfd[1] = fd; close(servfd); debug("%s: side 1 connected", info); } void Copy::run() { debug("%s: start", info); int n, m; while (1) { n = read(rfd, buf, sizeof(buf)); if (n < 0) fatal("read error: %s", strerror(errno)); m = write(wfd, buf, n); if (m != n) fatal("write error: %s", strerror(errno)); } debug("%s: stop", info); } extern "C" void* copyrun_C(void* copy) { ((Copy*) copy)->run(); NdbThread_Exit(0); return 0; } void Conn::run() { debug("%s: start", info); conn1(); conn0(); const unsigned siz = 32 * 1024; for (int i = 0; i < 2; i++) { Copy& copy = this->copy[i]; copy.rfd = sockfd[i]; copy.wfd = sockfd[1-i]; copy.buf = new unsigned char[siz]; copy.bufsiz = siz; sprintf(copy.info, "copy %d-%d", this->node[i]->id, this->node[1-i]->id); copy.thread = NdbThread_Create(copyrun_C, (void**)©, 8192, "copyrun", NDB_THREAD_PRIO_LOW); if (copy.thread == 0) { fatal("%s: create thread %d failed errno=%d", i, errno); } } debug("%s: stop", info); } extern "C" void* connrun_C(void* conn) { ((Conn*) conn)->run(); NdbThread_Exit(0); return 0; } static void start() { NdbThread_SetConcurrencyLevel(3 * Conn::proxycount + 2); for (unsigned i = 0; i < Conn::count; i++) { Conn& conn = Conn::list[i]; if (! conn.proxy) continue; conn.thread = NdbThread_Create(connrun_C, (void**)&conn, 8192, "connrun", NDB_THREAD_PRIO_LOW); if (conn.thread == 0) { fatal("create thread %d failed errno=%d", i, errno); } } sleep(3600); } int main(int av, char** ac) { debug("start"); hostname = "ndb-srv7"; if (Ndb_getInAddr(&hostaddr.sin_addr, hostname) != 0) { fatal("hostname %s lookup failed", hostname); } localcfgfile = "Ndb.cfg"; initcfgfile = "config.txt"; getcfg(); start(); debug("done"); return 0; } // vim: set sw=4 noet: