mirror of
https://github.com/MariaDB/server.git
synced 2025-01-22 14:54:20 +01:00
361 lines
9 KiB
C++
361 lines
9 KiB
C++
/* Copyright (C) 2003 MySQL AB
|
|
|
|
This program is free software; you can redistribute it and/or modify
|
|
it under the terms of the GNU General Public License as published by
|
|
the Free Software Foundation; either version 2 of the License, or
|
|
(at your option) any later version.
|
|
|
|
This program is distributed in the hope that it will be useful,
|
|
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
GNU General Public License for more details.
|
|
|
|
You should have received a copy of the GNU General Public License
|
|
along with this program; if not, write to the Free Software
|
|
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
|
|
|
|
#include <ndb_global.h>
|
|
|
|
#include <NdbTCP.h>
|
|
#include <NdbOut.hpp>
|
|
#include <NdbThread.h>
|
|
#include <NdbSleep.h>
|
|
#include <Properties.hpp>
|
|
#include <LocalConfig.hpp>
|
|
#include <Config.hpp>
|
|
#include <InitConfigFileParser.hpp>
|
|
#include <IPCConfig.hpp>
|
|
|
|
static void
|
|
fatal(char const* fmt, ...)
|
|
{
|
|
va_list ap;
|
|
char buf[200];
|
|
va_start(ap, fmt);
|
|
BaseString::vsnprintf(buf, sizeof(buf), fmt, ap);
|
|
va_end(ap);
|
|
ndbout << "FATAL: " << buf << endl;
|
|
sleep(1);
|
|
exit(1);
|
|
}
|
|
|
|
static void
|
|
debug(char const* fmt, ...)
|
|
{
|
|
va_list ap;
|
|
char buf[200];
|
|
va_start(ap, fmt);
|
|
BaseString::vsnprintf(buf, sizeof(buf), fmt, ap);
|
|
va_end(ap);
|
|
ndbout << buf << endl;
|
|
}
|
|
|
|
// node
|
|
struct Node {
|
|
enum Type { MGM = 1, DB = 2, API = 3 };
|
|
Type type;
|
|
unsigned id; // node id
|
|
static Node* list;
|
|
static unsigned count;
|
|
static Node* find(unsigned n) {
|
|
for (unsigned i = 0; i < count; i++) {
|
|
if (list[i].id == n)
|
|
return &list[i];
|
|
}
|
|
return 0;
|
|
}
|
|
};
|
|
|
|
unsigned Node::count = 0;
|
|
Node* Node::list = 0;
|
|
|
|
struct Copy {
|
|
int rfd; // read from
|
|
int wfd; // write to
|
|
unsigned char* buf;
|
|
unsigned bufsiz;
|
|
NdbThread* thread;
|
|
void run();
|
|
char info[20];
|
|
};
|
|
|
|
// connection between nodes 0-server side 1-client side
|
|
// we are client to 0 and server to 1
|
|
struct Conn {
|
|
Node* node[2]; // the nodes
|
|
unsigned port; // server port
|
|
unsigned proxy; // proxy port
|
|
static unsigned count;
|
|
static unsigned proxycount;
|
|
static Conn* list;
|
|
NdbThread* thread; // thread handling this connection
|
|
void run(); // run the connection
|
|
int sockfd[2]; // socket 0-on server side 1-client side
|
|
void conn0(); // connect to side 0
|
|
void conn1(); // connect to side 0
|
|
char info[20];
|
|
Copy copy[2]; // 0-to-1 and 1-to-0
|
|
};
|
|
|
|
unsigned Conn::count = 0;
|
|
unsigned Conn::proxycount = 0;
|
|
Conn* Conn::list = 0;
|
|
|
|
// global data
|
|
static char* hostname = 0;
|
|
static struct sockaddr_in hostaddr;
|
|
static char* localcfgfile = 0;
|
|
static char* initcfgfile = 0;
|
|
static unsigned ownnodeid = 0;
|
|
|
|
static void
|
|
properr(const Properties* props, const char* name, int i = -1)
|
|
{
|
|
if (i < 0) {
|
|
fatal("get %s failed: errno = %d", name, props->getPropertiesErrno());
|
|
} else {
|
|
fatal("get %s_%d failed: errno = %d", name, i, props->getPropertiesErrno());
|
|
}
|
|
}
|
|
|
|
// read config and load it into our structs
|
|
static void
|
|
getcfg()
|
|
{
|
|
LocalConfig lcfg;
|
|
if (! lcfg.read(localcfgfile)) {
|
|
fatal("read %s failed", localcfgfile);
|
|
}
|
|
ownnodeid = lcfg._ownNodeId;
|
|
debug("ownnodeid = %d", ownnodeid);
|
|
InitConfigFileParser pars(initcfgfile);
|
|
Config icfg;
|
|
if (! pars.getConfig(icfg)) {
|
|
fatal("parse %s failed", initcfgfile);
|
|
}
|
|
Properties* ccfg = icfg.getConfig(ownnodeid);
|
|
if (ccfg == 0) {
|
|
const char* err = "unknown error";
|
|
fatal("getConfig: %s", err);
|
|
}
|
|
ccfg->put("NodeId", ownnodeid);
|
|
ccfg->put("NodeType", "MGM");
|
|
if (! ccfg->get("NoOfNodes", &Node::count)) {
|
|
properr(ccfg, "NoOfNodes", -1);
|
|
}
|
|
debug("Node::count = %d", Node::count);
|
|
Node::list = new Node[Node::count];
|
|
for (unsigned i = 0; i < Node::count; i++) {
|
|
Node& node = Node::list[i];
|
|
const Properties* nodecfg;
|
|
if (! ccfg->get("Node", 1+i, &nodecfg)) {
|
|
properr(ccfg, "Node", 1+i);
|
|
}
|
|
const char* type;
|
|
if (! nodecfg->get("Type", &type)) {
|
|
properr(nodecfg, "Type");
|
|
}
|
|
if (strcmp(type, "MGM") == 0) {
|
|
node.type = Node::MGM;
|
|
} else if (strcmp(type, "DB") == 0) {
|
|
node.type = Node::DB;
|
|
} else if (strcmp(type, "API") == 0) {
|
|
node.type = Node::API;
|
|
} else {
|
|
fatal("prop %s_%d bad Type = %s", "Node", 1+i, type);
|
|
}
|
|
if (! nodecfg->get("NodeId", &node.id)) {
|
|
properr(nodecfg, "NodeId");
|
|
}
|
|
debug("node id=%d type=%d", node.id, node.type);
|
|
}
|
|
IPCConfig ipccfg(ccfg);
|
|
if (ipccfg.init() != 0) {
|
|
fatal("ipccfg init failed");
|
|
}
|
|
if (! ccfg->get("NoOfConnections", &Conn::count)) {
|
|
properr(ccfg, "NoOfConnections");
|
|
}
|
|
debug("Conn::count = %d", Conn::count);
|
|
Conn::list = new Conn[Conn::count];
|
|
for (unsigned i = 0; i < Conn::count; i++) {
|
|
Conn& conn = Conn::list[i];
|
|
const Properties* conncfg;
|
|
if (! ccfg->get("Connection", i, &conncfg)) {
|
|
properr(ccfg, "Connection", i);
|
|
}
|
|
unsigned n;
|
|
if (! conncfg->get("NodeId1", &n)) {
|
|
properr(conncfg, "NodeId1");
|
|
}
|
|
if ((conn.node[0] = Node::find(n)) == 0) {
|
|
fatal("node %d not found", n);
|
|
}
|
|
if (! conncfg->get("NodeId2", &n)) {
|
|
properr(conncfg, "NodeId2");
|
|
}
|
|
if ((conn.node[1] = Node::find(n)) == 0) {
|
|
fatal("node %d not found", n);
|
|
}
|
|
if (! conncfg->get("PortNumber", &conn.port)) {
|
|
properr(conncfg, "PortNumber");
|
|
}
|
|
conn.proxy = 0;
|
|
const char* proxy;
|
|
if (conncfg->get("Proxy", &proxy)) {
|
|
conn.proxy = atoi(proxy);
|
|
if (conn.proxy > 0) {
|
|
Conn::proxycount++;
|
|
}
|
|
}
|
|
sprintf(conn.info, "conn %d-%d", conn.node[0]->id, conn.node[1]->id);
|
|
}
|
|
}
|
|
|
|
void
|
|
Conn::conn0()
|
|
{
|
|
int fd;
|
|
while (1) {
|
|
if ((fd = socket(PF_INET, SOCK_STREAM, 0)) == -1) {
|
|
fatal("%s: create client socket failed: %s", info, strerror(errno));
|
|
}
|
|
struct sockaddr_in servaddr;
|
|
memset(&servaddr, 0, sizeof(servaddr));
|
|
servaddr.sin_family = AF_INET;
|
|
servaddr.sin_port = htons(port);
|
|
servaddr.sin_addr = hostaddr.sin_addr;
|
|
#if 0 // coredump
|
|
if (Ndb_getInAddr(&servaddr.sin_addr, hostname) != 0) {
|
|
fatal("%s: hostname %s lookup failed", info, hostname);
|
|
}
|
|
#endif
|
|
if (connect(fd, (struct sockaddr*)&servaddr, sizeof(servaddr)) == 0)
|
|
break;
|
|
if (errno != ECONNREFUSED) {
|
|
fatal("%s: connect failed: %s", info, strerror(errno));
|
|
}
|
|
close(fd);
|
|
NdbSleep_MilliSleep(100);
|
|
}
|
|
sockfd[0] = fd;
|
|
debug("%s: side 0 connected", info);
|
|
}
|
|
|
|
void
|
|
Conn::conn1()
|
|
{
|
|
int servfd;
|
|
if ((servfd = socket(PF_INET, SOCK_STREAM, 0)) == -1) {
|
|
fatal("%s: create server socket failed: %s", info, strerror(errno));
|
|
}
|
|
struct sockaddr_in servaddr;
|
|
memset(&servaddr, 0, sizeof(servaddr));
|
|
servaddr.sin_family = AF_INET;
|
|
servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
|
|
servaddr.sin_port = htons(proxy);
|
|
const int on = 1;
|
|
setsockopt(servfd, SOL_SOCKET, SO_REUSEADDR, (const char*)&on, sizeof(on));
|
|
if (bind(servfd, (struct sockaddr*) &servaddr, sizeof(servaddr)) == -1) {
|
|
fatal("%s: bind %d failed: %s", info, proxy, strerror(errno));
|
|
}
|
|
if (listen(servfd, 1) == -1) {
|
|
fatal("%s: listen %d failed: %s", info, proxy, strerror(errno));
|
|
}
|
|
int fd;
|
|
if ((fd = accept(servfd, 0, 0)) == -1) {
|
|
fatal("%s: accept failed: %s", info, strerror(errno));
|
|
}
|
|
sockfd[1] = fd;
|
|
close(servfd);
|
|
debug("%s: side 1 connected", info);
|
|
}
|
|
|
|
void
|
|
Copy::run()
|
|
{
|
|
debug("%s: start", info);
|
|
int n, m;
|
|
while (1) {
|
|
n = read(rfd, buf, sizeof(buf));
|
|
if (n < 0)
|
|
fatal("read error: %s", strerror(errno));
|
|
m = write(wfd, buf, n);
|
|
if (m != n)
|
|
fatal("write error: %s", strerror(errno));
|
|
}
|
|
debug("%s: stop", info);
|
|
}
|
|
|
|
extern "C" void*
|
|
copyrun_C(void* copy)
|
|
{
|
|
((Copy*) copy)->run();
|
|
return 0;
|
|
}
|
|
|
|
void
|
|
Conn::run()
|
|
{
|
|
debug("%s: start", info);
|
|
conn1();
|
|
conn0();
|
|
const unsigned siz = 32 * 1024;
|
|
for (int i = 0; i < 2; i++) {
|
|
Copy& copy = this->copy[i];
|
|
copy.rfd = sockfd[i];
|
|
copy.wfd = sockfd[1-i];
|
|
copy.buf = new unsigned char[siz];
|
|
copy.bufsiz = siz;
|
|
sprintf(copy.info, "copy %d-%d", this->node[i]->id, this->node[1-i]->id);
|
|
copy.thread = NdbThread_Create(copyrun_C, (void**)©,
|
|
8192, "copyrun", NDB_THREAD_PRIO_LOW);
|
|
if (copy.thread == 0) {
|
|
fatal("%s: create thread %d failed errno=%d", i, errno);
|
|
}
|
|
}
|
|
debug("%s: stop", info);
|
|
}
|
|
|
|
extern "C" void*
|
|
connrun_C(void* conn)
|
|
{
|
|
((Conn*) conn)->run();
|
|
return 0;
|
|
}
|
|
|
|
static void
|
|
start()
|
|
{
|
|
NdbThread_SetConcurrencyLevel(3 * Conn::proxycount + 2);
|
|
for (unsigned i = 0; i < Conn::count; i++) {
|
|
Conn& conn = Conn::list[i];
|
|
if (! conn.proxy)
|
|
continue;
|
|
conn.thread = NdbThread_Create(connrun_C, (void**)&conn,
|
|
8192, "connrun", NDB_THREAD_PRIO_LOW);
|
|
if (conn.thread == 0) {
|
|
fatal("create thread %d failed errno=%d", i, errno);
|
|
}
|
|
}
|
|
sleep(3600);
|
|
}
|
|
|
|
int
|
|
main(int av, char** ac)
|
|
{
|
|
ndb_init();
|
|
debug("start");
|
|
hostname = "ndb-srv7";
|
|
if (Ndb_getInAddr(&hostaddr.sin_addr, hostname) != 0) {
|
|
fatal("hostname %s lookup failed", hostname);
|
|
}
|
|
localcfgfile = "Ndb.cfg";
|
|
initcfgfile = "config.txt";
|
|
getcfg();
|
|
start();
|
|
debug("done");
|
|
return 0;
|
|
}
|
|
|
|
// vim: set sw=4 noet:
|