mariadb/ndb/test/run-test/main.cpp

942 lines
23 KiB
C++

/* Copyright (C) 2003 MySQL AB
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
#include <ndb_global.h>
#include <getarg.h>
#include <BaseString.hpp>
#include <Parser.hpp>
#include <NdbOut.hpp>
#include <Properties.hpp>
#include <NdbAutoPtr.hpp>
#include "run-test.hpp"
#include <SysLogHandler.hpp>
#include <FileLogHandler.hpp>
#include <mgmapi.h>
#include "CpcClient.hpp"
/**
pseudo code for run-test.bin
define autotest_wrapper process at each host
start ndb-processes
for each testcase
do
start mysqld processes
start replication processes
start test programs
wait until test program finished or max time passed
stop test program
stop replication processes
stop mysqld processes
write report data-file
if test failed and ! last test
restart ndb processes
drop all tables created by test
done
stop ndb processes
undefine wrapper processes
*/
/**
 * Global variables
 *
 * Most of these are filled in by parse_args() from the command line
 * (see the args table) or by setup_config() from the process config file.
 */
// Program name: used as the logger category and in the usage message.
static const char progname[] = "ndb_atrt";
// Helper scripts run through system(): gather/analyze in gather_result(),
// clear in setup_hosts()/setup_test_case(), setup in setup_hosts()/start_process().
static const char * g_gather_progname = "atrt-gather-result.sh";
static const char * g_analyze_progname = "atrt-analyze-result.sh";
static const char * g_clear_progname = "atrt-clear-result.sh";
static const char * g_setup_progname = "atrt-setup.sh";
// --setup-path; parse_args() falls back to the current working directory.
static const char * g_setup_path = 0;
// --process-config; file parsed by setup_config().
static const char * g_process_config_filename = "d.txt";
// --log-file; when set, logging goes to this file instead of the console.
static const char * g_log_filename = 0;
// --testcase-file / -f; when unset, test cases are read from stdin.
static const char * g_test_case_filename = 0;
// --report-file; defaults to "report.txt" when -R is given without a name.
static const char * g_report_filename = 0;
// Default user for hosts: $USER, or the "user:" line of the process config.
static const char * g_default_user = 0;
// Default base directory for hosts: the "basedir:" line of the process config.
static const char * g_default_base_dir = 0;
// Management port assigned to every process: the "baseport:" config line.
static int g_default_base_port = 0;
// -R flag: request a report file.
static int g_report = 0;
// -v counter: each occurrence lowers the minimum log level by one step.
static int g_verbosity = 0;
// Open report file, or 0 when no report is written.
static FILE * g_report_file = 0;
// Stream the test cases are read from.
static FILE * g_test_case_file = stdin;
Logger g_logger;
atrt_config g_config;
// Mode flags (-b / -r / -i); parse_args() rejects more than one being set.
static int g_mode_bench = 0;
static int g_mode_regression = 0;
static int g_mode_interactive = 0;
// NOTE(review): g_mode is not referenced in the visible part of this file.
static int g_mode = 0;
/**
 * Command line option table handed to getarg()/arg_printusage()
 * in parse_args(). Each entry binds a long and/or short option to the
 * global variable that receives its value.
 */
static
struct getargs args[] = {
  { "process-config", 0, arg_string, &g_process_config_filename, 0, 0 },
  { "setup-path", 0, arg_string, &g_setup_path, 0, 0 },
  { 0, 'v', arg_counter, &g_verbosity, 0, 0 },
  { "log-file", 0, arg_string, &g_log_filename, 0, 0 },
  { "testcase-file", 'f', arg_string, &g_test_case_filename, 0, 0 },
  { 0, 'R', arg_flag, &g_report, 0, 0 },
  { "report-file", 0, arg_string, &g_report_filename, 0, 0 },
  { "interactive", 'i', arg_flag, &g_mode_interactive, 0, 0 },
  { "regression", 'r', arg_flag, &g_mode_regression, 0, 0 },
  { "bench", 'b', arg_flag, &g_mode_bench, 0, 0 },
};

/**
 * Number of entries in args. Derived from the table itself so it cannot
 * go stale when an option is added or removed.
 */
const int arg_count = sizeof(args) / sizeof(args[0]);
/**
 * Entry point of the test runner.
 *
 * Parses the command line, reads the process configuration, connects to
 * every host, starts the management server(s) and then runs the test
 * cases read from g_test_case_file one by one:
 *  - (re)start the ndb nodes when the previous test failed (and before
 *    the first test),
 *  - start server and client processes,
 *  - poll once per second until the clients finish, an ndb/server process
 *    dies, or the test case's max time has elapsed,
 *  - stop clients and servers, gather the result and optionally append
 *    a line to the report file.
 *
 * Any setup or infrastructure failure jumps to the common cleanup code,
 * which closes the open files and stops all processes.
 *
 * @return always 0
 */
int
main(int argc, const char ** argv){
  bool restart = true;
  int lineno = 1;
  int test_no = 1;

  const int p_ndb = atrt_process::NDB_MGM | atrt_process::NDB_DB;
  const int p_servers = atrt_process::MYSQL_SERVER | atrt_process::NDB_REP;
  const int p_clients = atrt_process::MYSQL_CLIENT | atrt_process::NDB_API;

  g_logger.setCategory(progname);
  g_logger.enable(Logger::LL_ALL);
  g_logger.createConsoleHandler();

  if(!parse_args(argc, argv))
    goto end;

  g_logger.info("Starting...");
  if(!setup_config(g_config))
    goto end;

  g_logger.info("Connecting to hosts");
  if(!connect_hosts(g_config))
    goto end;

  if(!setup_hosts(g_config))
    goto end;

  if(!start_processes(g_config, atrt_process::NDB_MGM))
    goto end;

  if(!connect_ndb_mgm(g_config)){
    goto end;
  }

  /**
   * Main loop
   */
  while(!feof(g_test_case_file)){
    /**
     * Do we need to restart ndb
     */
    if(restart){
      g_logger.info("(Re)starting ndb processes");
      if(!stop_processes(g_config, atrt_process::NDB_DB))
        goto end;

      if(!wait_ndb(g_config, NDB_MGM_NODE_STATUS_NO_CONTACT))
        goto end;

      if(!start_processes(g_config, atrt_process::NDB_DB))
        goto end;

      if(!wait_ndb(g_config, NDB_MGM_NODE_STATUS_STARTED))
        goto end;

      g_logger.info("Ndb start completed");
    }

    atrt_testcase test_case;
    if(!read_test_case(g_test_case_file, test_case, lineno))
      goto end;

    g_logger.info("#%d - %s %s",
                  test_no,
                  test_case.m_command.c_str(), test_case.m_args.c_str());

    // Assign processes to programs
    if(!setup_test_case(g_config, test_case))
      goto end;

    if(!start_processes(g_config, p_servers))
      goto end;

    if(!start_processes(g_config, p_clients))
      goto end;

    int result = 0;

    const time_t start = time(0);
    time_t now = start;
    do {
      if(!update_status(g_config, atrt_process::ALL))
        goto end;

      // is_running() returns 2 only when every matching process is running.
      if(is_running(g_config, p_ndb) != 2){
        result = ERR_NDB_FAILED;
        break;
      }

      if(is_running(g_config, p_servers) != 2){
        result = ERR_SERVERS_FAILED;
        break;
      }

      // All clients have exited: the test program is done.
      if(is_running(g_config, p_clients) == 0){
        break;
      }

      now = time(0);
      if(now > (start + test_case.m_max_time)){
        result = ERR_MAX_TIME_ELAPSED;
        break;
      }
      sleep(1);
    } while(true);

    const time_t elapsed = time(0) - start;

    if(!stop_processes(g_config, p_clients))
      goto end;

    if(!stop_processes(g_config, p_servers))
      goto end;

    if(!gather_result(g_config, &result))
      goto end;

    g_logger.info("#%d %s(%d)",
                  test_no,
                  (result == 0 ? "OK" : "FAILED"), result);

    if(g_report_file != 0){
      // time_t is not guaranteed to be int-sized; print it as a long.
      fprintf(g_report_file, "%s %s ; %d ; %d ; %ld\n",
              test_case.m_command.c_str(),
              test_case.m_args.c_str(),
              test_no, result, static_cast<long>(elapsed));
      fflush(g_report_file);
    }

    // Keep the result directory: always in bench mode, only for failed
    // tests in regression mode.
    if(g_mode_bench || (g_mode_regression && result)){
      BaseString tmp;
      tmp.assfmt("result.%d", test_no);
      if(rename("result", tmp.c_str()) != 0){
        g_logger.critical("Failed to rename %s as %s",
                          "result", tmp.c_str());
        goto end;
      }
    }

    if(g_mode_interactive && result){
      g_logger.info
        ("Encountered failed test in interactive mode - terminating");
      break;
    }

    // A failed test may have left the cluster in a bad state.
    restart = (result != 0);
    test_no++;
  }

 end:
  if(g_report_file != 0){
    fclose(g_report_file);
    g_report_file = 0;
  }

  if(g_test_case_file != 0 && g_test_case_file != stdin){
    fclose(g_test_case_file);
    g_test_case_file = 0;
  }

  stop_processes(g_config, atrt_process::ALL);
  return 0;
}
/**
 * Parse the command line into the g_* globals and open the files it names.
 *
 * Side effects:
 *  - redirects logging to --log-file when given and applies the -v level,
 *  - defaults the setup path to the current working directory,
 *  - opens the report file (for writing) and the test case file (for reading),
 *  - defaults the mode to interactive when none was requested,
 *  - initialises g_default_user from the USER environment variable.
 *
 * @return true on success; false on a usage error, a file that cannot be
 *         opened, or conflicting mode flags
 */
bool
parse_args(int argc, const char** argv){
  int optind = 0;
  if(getarg(args, arg_count, argc, argv, &optind)) {
    arg_printusage(args, arg_count, progname, "");
    return false;
  }

  if(g_log_filename != 0){
    g_logger.removeConsoleHandler();
    g_logger.addHandler(new FileLogHandler(g_log_filename));
  }

  {
    // Every -v lowers the minimum level one step below WARNING,
    // but never below DEBUG.
    int tmp = Logger::LL_WARNING - g_verbosity;
    tmp = (tmp < Logger::LL_DEBUG ? Logger::LL_DEBUG : tmp);
    g_logger.disable(Logger::LL_ALL);
    g_logger.enable((Logger::LoggerLevel)tmp, Logger::LL_ALERT);
  }

  if(!g_process_config_filename){
    g_logger.critical("Process config not specified!");
    return false;
  }

  if(!g_setup_path){
    char buf[1024];
    if(getcwd(buf, sizeof(buf))){
      g_setup_path = strdup(buf);
      g_logger.info("Setup path not specified, using %s", buf);
    } else {
      g_logger.critical("Setup path not specified!\n");
      return false;
    }
  }

  // -R without --report-file: fall back to the default report name.
  if(g_report && !g_report_filename){
    g_report_filename = "report.txt";
  }

  if(g_report_filename){
    g_report_file = fopen(g_report_filename, "w");
    if(g_report_file == 0){
      g_logger.critical("Unable to create report file: %s", g_report_filename);
      return false;
    }
  }

  if(g_test_case_filename){
    g_test_case_file = fopen(g_test_case_filename, "r");
    if(g_test_case_file == 0){
      g_logger.critical("Unable to open file: %s", g_test_case_filename);
      return false;
    }
  }

  int sum = g_mode_interactive + g_mode_regression + g_mode_bench;
  if(sum == 0){
    g_mode_interactive = 1;
  }

  if(sum > 1){
    g_logger.critical
      ("Only one of bench/regression/interactive can be specified");
    return false;
  }

  // getenv() returns NULL when USER is unset; strdup(NULL) would crash.
  // The process config file may still override this via a "user:" line.
  const char * env_user = getenv("USER");
  if(env_user == 0){
    g_logger.warning("USER is not set in the environment, "
                     "using an empty default user");
    env_user = "";
  }
  g_default_user = strdup(env_user);

  return true;
}
/**
 * Look up a host by name.
 *
 * @param host   hostname to search for
 * @param hosts  list of known hosts
 * @return pointer to the first entry whose m_hostname equals host,
 *         or 0 when there is none
 */
static
atrt_host *
find(const BaseString& host, Vector<atrt_host> & hosts){
  size_t pos = 0;
  while(pos < hosts.size()){
    atrt_host & candidate = hosts[pos];
    if(candidate.m_hostname == host)
      return &candidate;
    pos++;
  }
  return 0;
}
/**
 * Read the process configuration file (g_process_config_filename) and
 * populate config.m_hosts and config.m_processes.
 *
 * Each non-empty, non-comment ('#') line has the form "key: value":
 *  - "basedir", "baseport" and "user" set the corresponding defaults,
 *  - "mgm", "ndb" and "api" are followed by a whitespace separated list of
 *    hosts, each optionally written as "host:basedir"; one process of that
 *    type is defined per listed host.
 *
 * Lines that cannot be split into key and value are skipped with a warning.
 *
 * @param config  configuration to fill in
 * @return true on success; false when the file cannot be opened, a host has
 *         no base dir, base dirs for one host disagree, or the process type
 *         is unknown
 */
bool
setup_config(atrt_config& config){

  FILE * f = fopen(g_process_config_filename, "r");
  if(!f){
    g_logger.critical("Failed to open process config file: %s",
                      g_process_config_filename);
    return false;
  }
  bool result = true;

  int lineno = 0;
  char buf[2048];
  while(fgets(buf, 2048, f)){
    lineno++;

    BaseString tmp(buf);
    tmp.trim(" \t\n\r");

    if(tmp.length() == 0 || tmp.c_str()[0] == '#')
      continue;

    Vector<BaseString> split1;
    if(tmp.split(split1, ":", 2) != 2){
      g_logger.warning("Invalid line %d in %s - ignoring",
                       lineno, g_process_config_filename);
      continue;
    }

    if(split1[0].trim() == "basedir"){
      g_default_base_dir = strdup(split1[1].trim().c_str());
      continue;
    }

    if(split1[0].trim() == "baseport"){
      g_default_base_port = atoi(split1[1].trim().c_str());
      continue;
    }

    if(split1[0].trim() == "user"){
      g_default_user = strdup(split1[1].trim().c_str());
      continue;
    }

    Vector<BaseString> hosts;
    if(split1[1].trim().split(hosts) <= 0){
      g_logger.warning("Invalid line %d in %s - ignoring",
                       lineno, g_process_config_filename);
      continue;
    }

    // 1 - Check hosts
    // hostnames[i] is the bare host name of hosts[i], i.e. with any
    // ":basedir" suffix removed; it is the key used in config.m_hosts.
    Vector<BaseString> hostnames;
    for(size_t i = 0; i<hosts.size(); i++){
      Vector<BaseString> parts;
      hosts[i].split(parts, ":");
      if(parts.size() == 0){
        g_logger.critical("%s:%d: Empty host entry",
                          g_process_config_filename, lineno);
        result = false;
        goto end;
      }
      BaseString hostname = parts[0].trim();
      BaseString base_dir;
      if(parts.size() >= 2)
        base_dir = parts[1];

      if(base_dir.empty() && g_default_base_dir == 0){
        g_logger.critical("Basedir not specified...");
        result = false;
        goto end;
      }

      atrt_host * host_ptr;
      if((host_ptr = find(hostname, config.m_hosts)) == 0){
        atrt_host host;
        host.m_index = config.m_hosts.size();
        host.m_cpcd = new SimpleCpcClient(hostname.c_str(), 1234);
        host.m_base_dir = (base_dir.empty() ? g_default_base_dir : base_dir);
        host.m_user = g_default_user;
        host.m_hostname = hostname.c_str();
        config.m_hosts.push_back(host);
      } else {
        // The host is already known: an explicit base dir must agree
        // with the one registered earlier.
        if(!base_dir.empty() && !(base_dir == host_ptr->m_base_dir)){
          g_logger.critical("Inconsistent base dir definition for host %s"
                            ", \"%s\" != \"%s\"", hostname.c_str(),
                            base_dir.c_str(), host_ptr->m_base_dir.c_str());
          result = false;
          goto end;
        }
      }
      hostnames.push_back(hostname);
    }

    // 2 - Define one process per listed host
    for(size_t i = 0; i<hostnames.size(); i++){
      // Look up by bare host name; the raw token may carry ":basedir".
      atrt_host * host = find(hostnames[i], config.m_hosts);
      if(host == 0){
        g_logger.critical("%s:%d: Unknown host: %s",
                          g_process_config_filename, lineno,
                          hostnames[i].c_str());
        result = false;
        goto end;
      }

      const int index = config.m_processes.size() + 1;

      atrt_process proc;
      proc.m_index = index;
      proc.m_host = host;
      proc.m_proc.m_id = -1;
      proc.m_proc.m_type = "temporary";
      proc.m_proc.m_owner = "atrt";
      proc.m_proc.m_group = "group";
      proc.m_proc.m_cwd.assign(host->m_base_dir).append("/run/");
      proc.m_proc.m_env.assign("LD_LIBRARY_PATH=").append(host->m_base_dir).append("/lib");
      proc.m_proc.m_stdout = "log.out";
      proc.m_proc.m_stderr = "2>&1";
      proc.m_proc.m_runas = proc.m_host->m_user;
      proc.m_proc.m_ulimit = "c:unlimited";
      proc.m_hostname = proc.m_host->m_hostname;
      proc.m_ndb_mgm_port = g_default_base_port;

      if(split1[0] == "mgm"){
        proc.m_type = atrt_process::NDB_MGM;
        proc.m_proc.m_name.assfmt("%d-%s", index, "ndb_mgm");
        proc.m_proc.m_path.assign(host->m_base_dir).append("/bin/mgmtsrvr");
        proc.m_proc.m_args = "-n -c initconfig.txt";
        proc.m_proc.m_cwd.appfmt("%d.ndb_mgm", index);
      } else if(split1[0] == "ndb"){
        proc.m_type = atrt_process::NDB_DB;
        proc.m_proc.m_name.assfmt("%d-%s", index, "ndb_db");
        proc.m_proc.m_path.assign(host->m_base_dir).append("/bin/ndb");
        proc.m_proc.m_args = "-i -n";
        proc.m_proc.m_cwd.appfmt("%d.ndb_db", index);
      } else if(split1[0] == "api"){
        // Path and arguments are filled in per test case by setup_test_case().
        proc.m_type = atrt_process::NDB_API;
        proc.m_proc.m_name.assfmt("%d-%s", index, "ndb_api");
        proc.m_proc.m_path = "";
        proc.m_proc.m_args = "";
        proc.m_proc.m_cwd.appfmt("%d.ndb_api", index);
      } else {
        g_logger.critical("%s:%d: Unhandled process type: %s",
                          g_process_config_filename, lineno,
                          split1[0].c_str());
        result = false;
        goto end;
      }
      config.m_processes.push_back(proc);
    }
  }

 end:
  fclose(f);

  // NOTE(review): presumably config.m_hosts relocates its elements when it
  // grows, which would leave m_host pointers taken on earlier lines
  // dangling - confirm against Vector. Re-resolving by name is safe either way.
  for(size_t i = 0; i<config.m_processes.size(); i++){
    atrt_process & proc = config.m_processes[i];
    proc.m_host = find(proc.m_hostname, config.m_hosts);
  }
  return result;
}
/**
 * Connect the cpc client of every configured host.
 *
 * @param config  configuration whose m_hosts are connected in order
 * @return true when all connections succeed; false (after logging an
 *         error) on the first host that cannot be reached
 */
bool
connect_hosts(atrt_config& config){
  size_t n = 0;
  while(n < config.m_hosts.size()){
    atrt_host & host = config.m_hosts[n];
    const bool connected = (host.m_cpcd->connect() == 0);
    if(!connected){
      g_logger.error("Unable to connect to cpc %s:%d",
                     host.m_cpcd->getHost(),
                     host.m_cpcd->getPort());
      return false;
    }
    g_logger.debug("Connected to %s:%d",
                   host.m_cpcd->getHost(),
                   host.m_cpcd->getPort());
    n++;
  }
  return true;
}
/**
 * Create a management API handle for one process and connect it to
 * "<hostname>:<mgm port>", retrying once per second until the connect
 * time limit of 30 seconds has passed.
 *
 * @param proc  process to connect; on success its m_ndb_mgm_handle is set
 * @return true when connected, false when the handle cannot be created
 *         or the connection attempts time out
 */
bool
connect_ndb_mgm(atrt_process & proc){
  NdbMgmHandle mgm = ndb_mgm_create_handle();
  if(mgm == 0){
    g_logger.critical("Unable to create mgm handle");
    return false;
  }

  BaseString connect_string = proc.m_hostname;
  connect_string.appfmt(":%d", proc.m_ndb_mgm_port);

  const time_t max_connect_time = 30;
  const time_t deadline = time(0) + max_connect_time;
  for(;;){
    if(ndb_mgm_connect(mgm, connect_string.c_str()) != -1){
      proc.m_ndb_mgm_handle = mgm;
      return true;
    }
    sleep(1);
    if(!(time(0) < deadline))
      break;
  }

  g_logger.critical("Unable to connect to ndb mgm %s", connect_string.c_str());
  return false;
}
/**
 * Connect a management handle for every NDB_MGM process in the
 * configuration.
 *
 * @return false as soon as one connection fails, true otherwise
 */
bool
connect_ndb_mgm(atrt_config& config){
  const size_t total = config.m_processes.size();
  for(size_t n = 0; n < total; n++){
    atrt_process & candidate = config.m_processes[n];
    if((candidate.m_type & atrt_process::NDB_MGM) == 0)
      continue;
    if(!connect_ndb_mgm(candidate))
      return false;
  }
  return true;
}
/**
 * Swap the NO_CONTACT and UNKNOWN node status values; every other
 * value is returned unchanged.
 */
static int remap(int i){
  const bool no_contact = (i == NDB_MGM_NODE_STATUS_NO_CONTACT);
  if(no_contact){
    return NDB_MGM_NODE_STATUS_UNKNOWN;
  }
  const bool unknown = (i == NDB_MGM_NODE_STATUS_UNKNOWN);
  if(unknown){
    return NDB_MGM_NODE_STATUS_NO_CONTACT;
  }
  return i;
}
/**
 * Wait until the lowest (remapped) status among all NDB data nodes equals
 * the requested goal.
 *
 * The cluster state is polled through the management handle of the first
 * NDB_MGM process for at most 360 seconds. When the goal is STARTED the
 * function first waits for NOT_STARTED and then sends a start command.
 *
 * Statuses are compared after remap().
 * NOTE(review): presumably the remapping orders NO_CONTACT below the other
 * states so that startup progress is monotonic - confirm against mgmapi.h.
 *
 * @param config  configuration holding the management process
 * @param goal    NDB_MGM_NODE_STATUS_* value to wait for
 * @return true when the goal is reached; false when no mgm handle exists,
 *         polling fails, a node reports a status outside the expected
 *         range, the lowest status moves backwards, or the wait times out
 */
bool
wait_ndb(atrt_config& config, int goal){

  goal = remap(goal);

  /**
   * Get mgm handle for cluster
   */
  NdbMgmHandle handle = 0;
  for(size_t i = 0; i<config.m_processes.size(); i++){
    atrt_process & proc = config.m_processes[i];
    if((proc.m_type & atrt_process::NDB_MGM) != 0){
      handle = proc.m_ndb_mgm_handle;
      break;
    }
  }
  if(handle == 0){
    g_logger.critical("Unable to find mgm handle");
    return false;
  }

  if(goal == NDB_MGM_NODE_STATUS_STARTED){
    /**
     * 1) wait NOT_STARTED
     * 2) send start
     * 3) wait STARTED
     */
    if(!wait_ndb(config, NDB_MGM_NODE_STATUS_NOT_STARTED))
      return false;

    ndb_mgm_start(handle, 0, 0);
  }

  struct ndb_mgm_cluster_state * state;

  time_t now = time(0);
  time_t end = now + 360;
  // min:  lowest status seen in the previous poll
  // min2: lowest status seen in the current poll, capped at goal
  int min = remap(NDB_MGM_NODE_STATUS_NO_CONTACT);
  int min2 = goal;

  while(now < end){
    /**
     * 1) retrieve current state
     */
    state = ndb_mgm_get_status(handle);
    if(state == 0){
      g_logger.critical("Unable to poll db state");
      return false;
    }
    // Hands the state to NdbAutoPtr for the rest of this iteration.
    NdbAutoPtr<void> tmp(state);

    min2 = goal;
    for(int i = 0; i<state->no_of_nodes; i++){
      if(state->node_states[i].node_type == NDB_MGM_NODE_TYPE_NDB){
        const int s = remap(state->node_states[i].node_status);
        min2 = (min2 < s ? min2 : s );

        if(s < remap(NDB_MGM_NODE_STATUS_NO_CONTACT) ||
           s > NDB_MGM_NODE_STATUS_STARTED){
          g_logger.critical("Strange DB status during start: %d %d", i, min2);
          return false;
        }
      }
    }

    // The cluster as a whole must never move backwards.
    if(min2 < min){
      g_logger.critical("wait ndb failed %d %d %d", min, min2, goal);
      return false;
    }

    if(min2 == goal){
      return true;
    }

    min = min2;
    now = time(0);
  }

  g_logger.critical("wait ndb timed out %d %d %d", min, min2, goal);

  return false;
}
/**
 * Set up, define and start a single process on its host.
 *
 * The setup script is first run locally through system() with the host
 * name, the source directory below g_setup_path and the process's working
 * directory. The process is then defined and started through the host's
 * cpc client.
 *
 * @param proc  process to start; must not already be started (m_id == -1)
 * @return true on success, false (after logging) on any failure
 */
bool
start_process(atrt_process & proc){
  if(proc.m_proc.m_id != -1){
    g_logger.critical("starting already started process: %d", proc.m_index);
    return false;
  }

  // Part of the working directory that follows "<base_dir>/run".
  const BaseString run_dir("/run");
  BaseString relative =
    proc.m_proc.m_cwd.substr(proc.m_host->m_base_dir.length() +
                             run_dir.length());

  BaseString command(g_setup_progname);
  command.appfmt(" %s %s/%s/ %s",
                 proc.m_host->m_hostname.c_str(),
                 g_setup_path,
                 relative.c_str(),
                 proc.m_proc.m_cwd.c_str());

  if(system(command.c_str()) != 0){
    g_logger.critical("Failed to setup process");
    return false;
  }

  {
    Properties define_reply;
    const int rc = proc.m_host->m_cpcd->define_process(proc.m_proc,
                                                       define_reply);
    if(rc != 0){
      BaseString error_text;
      define_reply.get("errormessage", error_text);
      g_logger.error("Unable to define process: %s", error_text.c_str());
      return false;
    }
  }

  {
    Properties start_reply;
    const int rc = proc.m_host->m_cpcd->start_process(proc.m_proc.m_id,
                                                      start_reply);
    if(rc != 0){
      BaseString error_text;
      start_reply.get("errormessage", error_text);
      g_logger.error("Unable to start process: %s", error_text.c_str());
      return false;
    }
  }
  return true;
}
/**
 * Start every process whose type matches the given bit mask.
 *
 * @param config  configuration holding the processes
 * @param types   bitwise OR of atrt_process type flags
 * @return false as soon as one process fails to start, true otherwise
 */
bool
start_processes(atrt_config& config, int types){
  size_t n = 0;
  while(n < config.m_processes.size()){
    atrt_process & candidate = config.m_processes[n];
    n++;
    if((types & candidate.m_type) == 0)
      continue;
    if(!start_process(candidate))
      return false;
  }
  return true;
}
/**
 * Stop and undefine a single process through its host's cpc client.
 *
 * A process that was never started (m_id == -1) is left alone. A failed
 * stop request is tolerated when the reply carries status 4.
 * NOTE(review): status 4 presumably means the process was not running -
 * confirm against the cpcd protocol.
 *
 * @param proc  process to stop; m_id is reset to -1 once it is undefined
 * @return true on success (or nothing to do), false after logging an error
 */
bool
stop_process(atrt_process & proc){
  if(proc.m_proc.m_id == -1){
    return true;
  }

  {
    Properties reply;
    if(proc.m_host->m_cpcd->stop_process(proc.m_proc.m_id, reply) != 0){
      // Initialised so that a reply without a "status" entry is treated
      // as a failure instead of reading an indeterminate value.
      Uint32 status = 0;
      reply.get("status", &status);
      if(status != 4){
        BaseString msg;
        reply.get("errormessage", msg);
        g_logger.error("Unable to stop process: %s(%d)", msg.c_str(), status);
        return false;
      }
    }
  }
  {
    Properties reply;
    if(proc.m_host->m_cpcd->undefine_process(proc.m_proc.m_id, reply) != 0){
      BaseString msg;
      reply.get("errormessage", msg);
      g_logger.error("Unable to undefine process: %s", msg.c_str());
      return false;
    }
    proc.m_proc.m_id = -1;
  }
  return true;
}
/**
 * Stop every process whose type matches the given bit mask.
 *
 * @param config  configuration holding the processes
 * @param types   bitwise OR of atrt_process type flags
 * @return false as soon as one process fails to stop, true otherwise
 */
bool
stop_processes(atrt_config& config, int types){
  size_t n = 0;
  while(n < config.m_processes.size()){
    atrt_process & candidate = config.m_processes[n];
    n++;
    if((types & candidate.m_type) == 0)
      continue;
    if(!stop_process(candidate))
      return false;
  }
  return true;
}
/**
 * Refresh m_proc.m_status of every configured process.
 *
 * The process list of each host is fetched once from its cpc client, then
 * every process is matched by id against the list of its own host. The
 * type mask parameter is not used.
 *
 * @return true when every process was found; false (after logging) as
 *         soon as one process is missing from its host's list
 */
bool
update_status(atrt_config& config, int){

  // One process list per host, indexed like config.m_hosts.
  Vector<Vector<SimpleCpcClient::Process> > per_host;
  Vector<SimpleCpcClient::Process> empty_list;
  per_host.fill(config.m_hosts.size(), empty_list);

  for(size_t h = 0; h < config.m_hosts.size(); h++){
    Properties reply;
    config.m_hosts[h].m_cpcd->list_processes(per_host[h], reply);
  }

  for(size_t n = 0; n < config.m_processes.size(); n++){
    atrt_process & current = config.m_processes[n];
    Vector<SimpleCpcClient::Process> & listed =
      per_host[current.m_host->m_index];

    size_t k = 0;
    while(k < listed.size() && !(current.m_proc.m_id == listed[k].m_id))
      k++;

    if(!(k < listed.size())){
      g_logger.error("update_status: not found");
      return false;
    }
    current.m_proc.m_status = listed[k].m_status;
  }
  return true;
}
/**
 * Summarise how many processes of the given types are running, based on
 * the m_status strings last stored by update_status().
 *
 * @param config  configuration holding the processes
 * @param types   bitwise OR of atrt_process type flags
 * @return 2 when all matching processes are running (also when none
 *         match), 0 when none of them is running, 1 otherwise
 */
int
is_running(atrt_config& config, int types){
  int matched = 0;
  int alive = 0;
  size_t n = 0;
  while(n < config.m_processes.size()){
    atrt_process & candidate = config.m_processes[n];
    n++;
    if((types & candidate.m_type) == 0)
      continue;
    matched++;
    if(candidate.m_proc.m_status == "running")
      alive++;
  }

  if(matched == alive)
    return 2;
  return (alive == 0) ? 0 : 1;
}
int
insert(const char * pair, Properties & p){
BaseString tmp(pair);
tmp.trim(" \t\n\r");
Vector<BaseString> split;
tmp.split(split, ":=", 2);
if(split.size() != 2)
return -1;
p.put(split[0].trim().c_str(), split[1].trim().c_str());
return 0;
}
/**
 * Read the next test case from file.
 *
 * Lines of the form "name: value" are collected until a line that is not
 * such a pair (for example an empty line) or end of file is reached; lines
 * starting with '#' are skipped. The pairs "cmd" (required), "args" and
 * "max-time" (default 60000) fill in tc.
 *
 * When reading from stdin and the first line is not a pair, that line is
 * taken as "<command> [<args>]" instead.
 *
 * @param file  stream to read from
 * @param tc    test case to fill in
 * @param line  line counter, incremented for every line consumed
 * @return true when a test case was read; false at end of input or when
 *         the "cmd" entry is missing
 */
bool
read_test_case(FILE * file, atrt_testcase& tc, int& line){

  Properties p;
  int elements = 0;
  char buf[1024];
  buf[0] = 0;
  // Set when buf holds a line that insert() rejected, as opposed to the
  // loop ending on end of file.
  bool unparsed_line = false;

  while(fgets(buf, sizeof(buf), file)){
    line++;
    BaseString tmp = buf;

    if(tmp.length() > 0 && tmp.c_str()[0] == '#')
      continue;

    if(insert(tmp.c_str(), p) != 0){
      unparsed_line = true;
      break;
    }

    elements++;
  }

  if(elements == 0){
    // Without a rejected line there is nothing in buf to interpret.
    if(file == stdin && unparsed_line){
      BaseString tmp(buf);
      tmp.trim(" \t\n\r");
      Vector<BaseString> split;
      tmp.split(split, " ", 2);
      if(split.size() == 0)
        return false;
      tc.m_command = split[0];
      if(split.size() == 2)
        tc.m_args = split[1];
      else
        tc.m_args = "";
      tc.m_max_time = 60000;
      return true;
    }
    return false;
  }

  if(!p.get("cmd", tc.m_command)){
    g_logger.critical("Invalid test file: cmd is missing near line: %d", line);
    return false;
  }

  if(!p.get("args", tc.m_args))
    tc.m_args = "";

  const char * mt = 0;
  if(!p.get("max-time", &mt))
    tc.m_max_time = 60000;
  else
    tc.m_max_time = atoi(mt);

  return true;
}
/**
 * Prepare the configuration for running one test case.
 *
 * Runs the clear-result script, then points the first NDB_API process at
 * "<base_dir>/bin/<command>" with the test case's arguments.
 *
 * @param config  configuration holding the processes
 * @param tc      test case to run
 * @return true when an NDB_API process was configured; false when the
 *         clear script fails or no NDB_API process exists
 */
bool
setup_test_case(atrt_config& config, const atrt_testcase& tc){
  if(system(g_clear_progname) != 0){
    g_logger.critical("Failed to clear result");
    return false;
  }

  size_t n = 0;
  while(n < config.m_processes.size()){
    atrt_process & candidate = config.m_processes[n];
    n++;
    if(!(candidate.m_type == atrt_process::NDB_API))
      continue;

    // Only the first API process is assigned the test program.
    candidate.m_proc.m_path.assign(candidate.m_host->m_base_dir);
    candidate.m_proc.m_path.append("/bin/").append(tc.m_command);
    candidate.m_proc.m_args.assign(tc.m_args);
    return true;
  }
  return false;
}
/**
 * Collect and analyze the results of the test that just ran.
 *
 * The gather script is invoked with one "<hostname>:<cwd>" argument per
 * configured process, followed by the analyze script. The raw value
 * returned by system() for the analyze script is stored in *result.
 *
 * @param config  configuration holding the processes
 * @param result  receives the analyze script's system() status
 * @return false when gathering fails or the analyze script could not be
 *         run, true otherwise
 */
bool
gather_result(atrt_config& config, int * result){
  BaseString command(g_gather_progname);
  size_t n = 0;
  while(n < config.m_processes.size()){
    atrt_process & current = config.m_processes[n];
    command.appfmt(" %s:%s",
                   current.m_hostname.c_str(),
                   current.m_proc.m_cwd.c_str());
    n++;
  }

  if(system(command.c_str()) != 0){
    g_logger.critical("Failed to gather result");
    return false;
  }

  const int status = system(g_analyze_progname);
  const bool not_run = (status == -1) || (status == (127 << 8));
  if(not_run){
    g_logger.critical("Failed to analyze results");
    return false;
  }

  *result = status;
  return true;
}
/**
 * Prepare every configured host for a test run.
 *
 * Runs the clear-result script once, then the setup script once per host
 * with the host name, "<setup path>/" and "<base_dir>/run" as arguments.
 *
 * @param config  configuration holding the hosts
 * @return false (after logging) when a script returns non-zero,
 *         true otherwise
 */
bool
setup_hosts(atrt_config& config){
  if(system(g_clear_progname) != 0){
    g_logger.critical("Failed to clear result");
    return false;
  }

  size_t n = 0;
  while(n < config.m_hosts.size()){
    atrt_host & host = config.m_hosts[n];

    BaseString command(g_setup_progname);
    command.appfmt(" %s %s/ %s/run",
                   host.m_hostname.c_str(),
                   g_setup_path,
                   host.m_base_dir.c_str());

    const int status = system(command.c_str());
    if(status != 0){
      g_logger.critical("Failed to setup %s",
                        host.m_hostname.c_str());
      return false;
    }
    n++;
  }
  return true;
}