/* Copyright (C) 2000 MySQL AB & MySQL Finland AB & TCX DataKonsult AB This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ /* MySQL server management daemon Written by: Sasha Pachev */ #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #ifdef HAVE_SYS_WAIT_H #include #endif #define MANAGER_VERSION "1.2" #define MANAGER_GREETING "MySQL Server Management Daemon v. 1.2" #define LOG_ERR 1 #define LOG_WARN 2 #define LOG_INFO 3 #define LOG_DEBUG 4 #define CHILD_START 1 #define CHILD_STOP 2 #ifndef MANAGER_PORT #define MANAGER_PORT 23546 #endif #ifndef MANAGER_CONNECT_RETRIES #define MANAGER_CONNECT_RETRIES 5 #endif #ifndef MANAGER_MAX_CMD_LEN #define MANAGER_MAX_CMD_LEN 16384 #endif #ifndef MANAGER_LOG_FILE #define MANAGER_LOG_FILE "/var/log/mysqlmanager.log" #endif #ifndef MANAGER_BACK_LOG #define MANAGER_BACK_LOG 50 #endif #ifndef MAX_USER_NAME #define MAX_USER_NAME 16 #endif #ifndef MANAGER_PW_FILE #define MANAGER_PW_FILE "/etc/mysqlmanager.passwd" #endif #ifndef MAX_HOST #define MAX_HOST 128 #endif #ifndef MAX_LAUNCHER_MSG #define MAX_LAUNCHER_MSG 256 #endif #define MAX_RETRY_COUNT 100 /* Variable naming convention - if starts with manager_, either is set directly by the user, or used closely in ocnjunction with a variable set by the user */ #if defined(__i386__) && defined(HAVE_LINUXTHREADS) #define DO_STACKTRACE 1 #endif uint manager_port; FILE* errfp; const char* manager_log_file = MANAGER_LOG_FILE; pthread_mutex_t lock_log,lock_shutdown,lock_exec_hash,lock_launch_thd; pthread_cond_t cond_launch_thd; pthread_t loop_th,launch_msg_th; int manager_sock = -1; uchar* stack_bottom=0; struct sockaddr_in manager_addr; ulong manager_bind_addr; int manager_back_log; int in_shutdown = 0, shutdown_requested=0; int manager_connect_retries; const char* manager_greeting = MANAGER_GREETING; uint manager_max_cmd_len; const char* manager_pw_file=MANAGER_PW_FILE; my_bool one_thread; /* for debugging */ typedef enum {PARAM_STDOUT,PARAM_STDERR} PARAM_TYPE; /* messages */ #define MAX_CLIENT_MSG_LEN 256 #define NET_BLOCK 2048 #define MD5_LEN 16 #define ESCAPE_CHAR '\\' #define EOL_CHAR '\n' /* access flags */ #define PRIV_SHUTDOWN 1 struct manager_thd { NET net; char user[MAX_USER_NAME+1]; int priv_flags; char* cmd_buf; int fatal,finished; }; struct manager_user { char user[MAX_USER_NAME+1]; char md5_pass[MD5_LEN]; int user_len; const char* error; }; HASH exec_hash,user_hash; struct manager_exec* cur_launch_exec=0; static struct manager_thd* manager_thd_new(Vio* vio); static struct manager_exec* manager_exec_new(char* arg_start,char* arg_end); static void manager_exec_print(NET* net,struct manager_exec* e); static void manager_thd_free(struct manager_thd* thd); static void manager_exec_free(void* e); static void manager_exec_connect(struct manager_exec* e); static int manager_exec_launch(struct manager_exec* e); static struct manager_exec* manager_exec_by_pid(pid_t pid); static struct manager_user* manager_user_new(char* buf); static void manager_user_free(void* u); static char* arg_strmov(char* dest, const char* src, int n); static byte* get_exec_key(const byte* e, uint* len, my_bool __attribute__((unused)) t); static byte* get_user_key(const byte* u, uint* len, my_bool __attribute__((unused)) t); static uint tokenize_args(char* arg_start,char** arg_end); static void init_arg_array(char* arg_str,char** args,uint arg_count); static int hex_val(char c); static int open_and_dup(int fd,char* path); static void update_req_len(struct manager_exec* e); typedef int (*manager_cmd_handler)(struct manager_thd*,char*,char*); static void handle_child(int __attribute__((unused)) sig); static void handle_sigpipe(int __attribute__((unused)) sig); /* exec() in a threaded application is full of problems. To solve this, we fork off a launcher at the very start and communicate with it through a pipe */ static void fork_launcher(); static void run_launcher_loop(); int to_launcher_pipe[2],from_launcher_pipe[2]; pid_t launcher_pid; int in_segfault=0; const char* pid_file = "/var/run/mysqlmanager.pid"; int created_pid_file = 0; struct manager_cmd { const char* name; const char* help; manager_cmd_handler handler_func; int len; }; struct manager_exec { char* ident; int ident_len; const char* error; char* bin_path; char** args; char con_user[16]; char con_pass[16]; int con_port; pid_t pid; int exit_code; pthread_mutex_t lock; pthread_cond_t cond; pthread_t th; char con_sock[FN_REFLEN]; char con_host[MAX_HOST]; char stderr_path[FN_REFLEN]; char stdout_path[FN_REFLEN]; MYSQL mysql; char* data_buf; int req_len; int start_wait_timeout; int stderr_path_size,stdout_path_size,data_buf_size; int num_args; }; static int set_exec_param(struct manager_thd* thd, char* args_start, char* args_end, PARAM_TYPE param_type); #define HANDLE_DECL(com) \ static int com(struct manager_thd* thd, char* args_start,char* args_end) #define HANDLE_NOARG_DECL(com) \ static int com(struct manager_thd *thd,\ char *args_start __attribute__((unused)),\ char* args_end __attribute__((unused))) HANDLE_NOARG_DECL(handle_ping); HANDLE_NOARG_DECL(handle_quit); HANDLE_NOARG_DECL(handle_help); HANDLE_NOARG_DECL(handle_shutdown); HANDLE_DECL(handle_def_exec); HANDLE_DECL(handle_start_exec); HANDLE_DECL(handle_stop_exec); HANDLE_DECL(handle_set_exec_con); HANDLE_DECL(handle_set_exec_stdout); HANDLE_DECL(handle_set_exec_stderr); HANDLE_NOARG_DECL(handle_show_exec); HANDLE_DECL(handle_query); struct manager_cmd commands[] = { {"ping", "Check if this server is alive", handle_ping,4}, {"quit", "Finish session", handle_quit,4}, {"shutdown", "Shutdown this server", handle_shutdown,8}, {"def_exec", "Define executable entry", handle_def_exec,8}, {"start_exec", "Launch process defined by executable entry", handle_start_exec,10}, {"stop_exec", "Stop process defined by executable entry", handle_stop_exec,9}, {"set_exec_con", "Set connection parameters for executable entry", handle_set_exec_con,12}, {"set_exec_stdout", "Set stdout path for executable entry", handle_set_exec_stdout,15}, {"set_exec_stderr", "Set stderr path for executable entry", handle_set_exec_stderr,15}, {"query","Run query against MySQL server",handle_query,5}, {"show_exec","Show defined executable entries",handle_show_exec,9}, {"help", "Print this message", handle_help,4}, {0,0,0,0} }; static struct my_option my_long_options[] = { #ifndef DBUG_OFF {"debug", '#', "Output debug log. Often this is 'd:t:o,filename'", 0, 0, 0, GET_STR, OPT_ARG, 0, 0, 0, 0, 0, 0}, #endif {"help", '?', "Display this help and exit.", 0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0}, {"port", 'P', "Port number to listen on.", (gptr*) &manager_port, (gptr*) &manager_port, 0, GET_UINT, REQUIRED_ARG, MANAGER_PORT, 0, 0, 0, 0, 0}, {"log", 'l', "Path to log file.", (gptr*) &manager_log_file, (gptr*) &manager_log_file, 0, GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0}, {"bind-address", 'b', "Address to listen on.", (gptr*) &manager_bind_addr, (gptr*) &manager_bind_addr, 0, GET_ULONG, REQUIRED_ARG, INADDR_ANY, 0, 0, 0, 0, 0}, {"tcp-backlog", 'B', "Size of TCP/IP listen queue.", (gptr*) &manager_back_log, (gptr*) &manager_back_log, 0, GET_INT, REQUIRED_ARG, MANAGER_BACK_LOG, 0, 0, 0, 0, 0}, {"greeting", 'g', "Set greeting on connect", (gptr*) &manager_greeting, (gptr*) &manager_greeting, 0, GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0}, {"max-command-len", 'm', "Maximum command length", (gptr*) &manager_max_cmd_len, (gptr*) &manager_max_cmd_len, 0, GET_UINT, REQUIRED_ARG, MANAGER_MAX_CMD_LEN, 0, 0, 0, 0, 0}, {"one-thread", 'd', "Use one thread ( for debugging)", (gptr*) &one_thread, (gptr*) &one_thread, 0, GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0}, {"connect-retries", 'C', "Number of attempts to establish MySQL connection", (gptr*) &manager_connect_retries, (gptr*) &manager_connect_retries, 0, GET_INT, REQUIRED_ARG, MANAGER_CONNECT_RETRIES, 0, 0, 0, 0, 0}, {"password-file", 'p', "Password file for manager", (gptr*) &manager_pw_file, (gptr*) &manager_pw_file, 0, GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0}, {"pid-file", 'f', "Pid file to use", (gptr*) &pid_file, (gptr*) &pid_file, 0, GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0}, {"version", 'V', "Output version information and exit.", 0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0}, { 0, 0, 0, 0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0} }; static void die(const char* fmt,...); static void print_time(FILE* fp); static void clean_up(); static struct manager_cmd* lookup_cmd(char* s,int len); static int client_msg(NET* net,int err_code,const char* fmt,...); static int client_msg_pre(NET* net,int err_code,const char* fmt,...); static int client_msg_raw(NET* net,int err_code,int pre,const char* fmt, va_list args); static int authenticate(struct manager_thd* thd); /* returns pointer to end of line */ static char* read_line(struct manager_thd* thd); static pthread_handler_decl(process_connection, arg); static pthread_handler_decl(process_launcher_messages, arg); static int exec_line(struct manager_thd* thd,char* buf,char* buf_end); #ifdef DO_STACKTRACE void print_stacktrace(); #endif static void log_msg(const char* fmt, int msg_type, va_list args); /* No 'inline' here becasue functions with ... can't do that portable */ #define LOG_MSG_FUNC(type,TYPE) static void type \ (const char* fmt,...) { \ va_list args; \ va_start(args,fmt); \ log_msg(fmt,TYPE,args);\ } LOG_MSG_FUNC(log_err,LOG_ERR) LOG_MSG_FUNC(log_warn,LOG_WARN) LOG_MSG_FUNC(log_info,LOG_INFO) #ifndef DBUG_OFF LOG_MSG_FUNC(log_debug,LOG_DEBUG) #else void log_debug(const char* __attribute__((unused)) fmt,...) {} #endif static void handle_sigterm(int sig __attribute__((unused))) { log_info("Got SIGTERM"); if (!one_thread) { kill(launcher_pid,SIGTERM); pthread_kill(loop_th,SIGTERM); } clean_up(); exit(0); } #ifdef DO_STACKTRACE static void handle_segfault(int sig) { if (in_segfault) exit(1); in_segfault=1; fprintf(errfp,"Got fatal signal %d\n",sig); print_stacktrace(); exit(1); } #endif static void handle_sigpipe(int __attribute__((unused)) sig) { signal(SIGPIPE,handle_sigpipe); } #ifdef DO_STACKTRACE #define MAX_DEPTH 25 #define SIGRETURN_FRAME_COUNT 1 void print_stacktrace() { uchar** fp; int i; LINT_INIT(fp); fprintf(errfp,"Fatal errror, stacktrace follows:\n"); #ifdef __i386__ __asm__ __volatile__("movl %%ebp,%0" :"=r"(fp) :"r"(fp)); #endif if (!fp) { fprintf(errfp,"frame points is NULL, cannot trace stack\n"); return; } for (i=0;inet,MANAGER_CLIENT_ERR, "Unrecognized command '%s', type help to see list of supported\ commands", buf)) thd->fatal=1; return 1; } for (;phandler_func(thd,p,buf_end); } static struct manager_cmd* lookup_cmd(char* s,int len) { struct manager_cmd* cmd = commands; for (;cmd->name;cmd++) { if (cmd->len == len && !memcmp(cmd->name,s,len)) return cmd; } return 0; } HANDLE_NOARG_DECL(handle_ping) { client_msg(&thd->net,MANAGER_OK,"Server management daemon is alive"); return 0; } HANDLE_NOARG_DECL(handle_quit) { client_msg(&thd->net,MANAGER_OK,"Goodbye"); thd->finished=1; return 0; } HANDLE_NOARG_DECL(handle_help) { struct manager_cmd* cmd = commands; NET* net = &thd->net; client_msg_pre(net,MANAGER_INFO,"Available commands:"); for (;cmd->name;cmd++) { client_msg_pre(net,MANAGER_INFO,"%s - %s", cmd->name, cmd->help); } client_msg_pre(net,MANAGER_INFO,"End of help"); return 0; } HANDLE_NOARG_DECL(handle_shutdown) { client_msg(&thd->net,MANAGER_OK,"Shutdown started, goodbye"); thd->finished=1; shutdown_requested = 1; if (!one_thread) { kill(launcher_pid,SIGTERM); pthread_kill(loop_th,SIGTERM); } return 0; } HANDLE_DECL(handle_set_exec_con) { int num_args; const char* error=0; struct manager_exec* e; char* arg_p; if ((num_args=tokenize_args(args_start,&args_end))<2) { error="Too few arguments"; goto err; } arg_p=args_start; pthread_mutex_lock(&lock_exec_hash); if (!(e=(struct manager_exec*)hash_search(&exec_hash,arg_p, strlen(arg_p)))) { pthread_mutex_unlock(&lock_exec_hash); error="Exec definition entry does not exist"; goto err; } arg_p+=strlen(arg_p)+1; arg_p+=(strnmov(e->con_user,arg_p,sizeof(e->con_user))-e->con_user)+1; if (num_args >= 3) { arg_p+=(strnmov(e->con_host,arg_p,sizeof(e->con_host))-e->con_host)+1; if (num_args == 4) { if (!(e->con_port=atoi(arg_p))) strnmov(e->con_sock,arg_p,sizeof(e->con_sock)); else e->con_sock[0]=0; } else if (num_args > 4) { pthread_mutex_unlock(&lock_exec_hash); error="Too many arguments"; goto err; } } pthread_mutex_unlock(&lock_exec_hash); client_msg(&thd->net,MANAGER_OK,"Entry updated"); return 0; err: client_msg(&thd->net,MANAGER_CLIENT_ERR,error); return 1; } HANDLE_DECL(handle_set_exec_stdout) { return set_exec_param(thd,args_start,args_end,PARAM_STDOUT); } HANDLE_DECL(handle_set_exec_stderr) { return set_exec_param(thd,args_start,args_end,PARAM_STDERR); } static int set_exec_param(struct manager_thd* thd, char* args_start, char* args_end, PARAM_TYPE param_type) { int num_args; const char* error=0; struct manager_exec* e; char* arg_p; char* param; int param_size; if ((num_args=tokenize_args(args_start,&args_end))<2) { error="Too few arguments"; goto err; } arg_p=args_start; pthread_mutex_lock(&lock_exec_hash); if (!(e=(struct manager_exec*)hash_search(&exec_hash,arg_p, strlen(arg_p)))) { pthread_mutex_unlock(&lock_exec_hash); error="Exec definition entry does not exist"; goto err; } arg_p+=strlen(arg_p)+1; param_size=strlen(arg_p)+1; switch (param_type) { case PARAM_STDOUT: param=e->stdout_path; e->req_len+=(param_size-e->stdout_path_size); e->stdout_path_size=param_size; break; case PARAM_STDERR: param=e->stderr_path; e->req_len+=(param_size-e->stderr_path_size); e->stderr_path_size=param_size; break; default: error="Internal error"; goto err; } strnmov(param,arg_p,FN_REFLEN); pthread_mutex_unlock(&lock_exec_hash); client_msg(&thd->net,MANAGER_OK,"Entry updated"); return 0; err: client_msg(&thd->net,MANAGER_CLIENT_ERR,error); return 1; } HANDLE_DECL(handle_start_exec) { int num_args; struct manager_exec* e; int ident_len; const char* error=0; struct timespec t; if ((num_args=tokenize_args(args_start,&args_end))<1) { error="Too few arguments"; goto err; } ident_len=strlen(args_start); pthread_mutex_lock(&lock_exec_hash); if (!(e=(struct manager_exec*)hash_search(&exec_hash,args_start, ident_len))) { pthread_mutex_unlock(&lock_exec_hash); error="Exec definition entry does not exist"; goto err; } pthread_mutex_unlock(&lock_exec_hash); manager_exec_launch(e); if ((error=e->error)) goto err; pthread_mutex_lock(&e->lock); t.tv_sec=time(0)+(e->start_wait_timeout=atoi(args_start+ident_len+1)); t.tv_nsec=0; if (!e->pid) pthread_cond_timedwait(&e->cond,&e->lock,&t); if (!e->pid) { pthread_mutex_unlock(&e->lock); error="Process failed to start withing alotted time"; goto err; } mysql_close(&e->mysql); manager_exec_connect(e); error=e->error; pthread_mutex_unlock(&e->lock); if (error) goto err; client_msg(&thd->net,MANAGER_OK,"'%s' started",e->ident); return 0; err: client_msg(&thd->net,MANAGER_CLIENT_ERR,error); return 1; } HANDLE_DECL(handle_stop_exec) { int num_args; struct timespec abstime; struct manager_exec* e; int ident_len; const char* error=0; if ((num_args=tokenize_args(args_start,&args_end))<2) { error="Too few arguments"; goto err; } ident_len=strlen(args_start); abstime.tv_sec=time(0)+atoi(args_start+1+ident_len); abstime.tv_nsec=0; pthread_mutex_lock(&lock_exec_hash); if (!(e=(struct manager_exec*)hash_search(&exec_hash,args_start, ident_len))) { pthread_mutex_unlock(&lock_exec_hash); error="Exec definition entry does not exist"; goto err; } pthread_mutex_unlock(&lock_exec_hash); pthread_mutex_lock(&e->lock); e->th=pthread_self(); if (!e->pid) { /* e->th=0; */ /* th may be a struct */ pthread_mutex_unlock(&e->lock); error="Process not running"; goto err; } if (mysql_shutdown(&e->mysql)) { /* e->th=0; */ /* th may be a struct */ pthread_mutex_unlock(&e->lock); error="Could not send shutdown command"; goto err; } if (e->pid) pthread_cond_timedwait(&e->cond,&e->lock,&abstime); if (e->pid) error="Process failed to terminate within alotted time"; /* e->th=0; */ /* th may be a struct */ pthread_mutex_unlock(&e->lock); if (!error) { client_msg(&thd->net,MANAGER_OK,"'%s' terminated",e->ident); return 0; } err: client_msg(&thd->net,MANAGER_CLIENT_ERR,error); return 1; } HANDLE_DECL(handle_query) { const char* error=0; struct manager_exec* e; MYSQL_RES* res=0; MYSQL_ROW row; MYSQL_FIELD* fields; int num_fields,i,ident_len; char* ident,*query; query=ident=args_start; while (!isspace(*query)) query++; if (query == ident) { error="Missing server identifier"; goto err; } ident_len=(int)(query-ident); while (querylock); if (!e->pid) { error="Process is not running"; pthread_mutex_unlock(&e->lock); goto err; } if (mysql_query(&e->mysql,query)) { error=mysql_error(&e->mysql); pthread_mutex_unlock(&e->lock); goto err; } if ((res=mysql_store_result(&e->mysql))) { char buf[MAX_CLIENT_MSG_LEN],*p,*buf_end; fields=mysql_fetch_fields(res); num_fields=mysql_num_fields(res); p=buf; buf_end=buf+sizeof(buf); for (i=0;inet,MANAGER_OK,buf); while ((row=mysql_fetch_row(res))) { p=buf; for (i=0;inet,MANAGER_OK,buf); } } pthread_mutex_unlock(&e->lock); client_msg(&thd->net,MANAGER_OK,"End"); return 0; err: client_msg(&thd->net,MANAGER_CLIENT_ERR,error); return 1; } HANDLE_DECL(handle_def_exec) { struct manager_exec* e=0,*old_e; const char* error=0; if (!(e=manager_exec_new(args_start,args_end))) { error="Out of memory"; goto err; } if (e->error) { error=e->error; goto err; } pthread_mutex_lock(&lock_exec_hash); if ((old_e=(struct manager_exec*)hash_search(&exec_hash,(byte*)e->ident, e->ident_len))) { strnmov(e->stdout_path,old_e->stdout_path,sizeof(e->stdout_path)); strnmov(e->stderr_path,old_e->stderr_path,sizeof(e->stderr_path)); strnmov(e->con_user,old_e->con_user,sizeof(e->con_user)); strnmov(e->con_host,old_e->con_host,sizeof(e->con_host)); strnmov(e->con_sock,old_e->con_sock,sizeof(e->con_sock)); e->con_port=old_e->con_port; update_req_len(e); hash_delete(&exec_hash,(byte*)old_e); } hash_insert(&exec_hash,(byte*)e); pthread_mutex_unlock(&lock_exec_hash); client_msg(&thd->net,MANAGER_OK,"Exec definition created"); return 0; err: client_msg(&thd->net,MANAGER_CLIENT_ERR,error); if (e) manager_exec_free(e); return 1; } HANDLE_NOARG_DECL(handle_show_exec) { uint i; client_msg_pre(&thd->net,MANAGER_INFO,"Exec_def\tPid\tExit_status\tCon_info\ \tStdout\tStderr\tArguments"); pthread_mutex_lock(&lock_exec_hash); for (i=0;inet,e); } pthread_mutex_unlock(&lock_exec_hash); client_msg(&thd->net,MANAGER_INFO,"End"); return 0; } static struct manager_exec* manager_exec_by_pid(pid_t pid) { struct manager_exec* e; uint i; pthread_mutex_lock(&lock_exec_hash); for (i=0;ipid==pid) { pthread_mutex_unlock(&lock_exec_hash); return e; } } pthread_mutex_unlock(&lock_exec_hash); return 0; } static void manager_exec_connect(struct manager_exec* e) { int i; int connect_retries; if (!(connect_retries=e->start_wait_timeout)) connect_retries=manager_connect_retries; for (i=0;imysql,e->con_host,e->con_user,e->con_pass,0, e->con_port,e->con_sock,0)) return; sleep(1); } e->error="Could not connect to MySQL server withing the number of tries"; } static int manager_exec_launch(struct manager_exec* e) { if (one_thread) { pid_t tmp_pid; switch ((tmp_pid=fork())) { case -1: e->error="Cannot fork"; return 1; case 0: { int err_code; close(manager_sock); err_code=execv(e->bin_path,e->args); exit(err_code); } default: e->pid=tmp_pid; manager_exec_connect(e); return 0; } } else { if (my_write(to_launcher_pipe[1],(byte*)&e->req_len, sizeof(int),MYF(MY_NABP))|| my_write(to_launcher_pipe[1],(byte*)&e->num_args, sizeof(int),MYF(MY_NABP)) || my_write(to_launcher_pipe[1],e->stdout_path,e->stdout_path_size, MYF(MY_NABP)) || my_write(to_launcher_pipe[1],e->stderr_path,e->stderr_path_size, MYF(MY_NABP)) || my_write(to_launcher_pipe[1],e->data_buf,e->data_buf_size, MYF(MY_NABP))) { e->error="Failed write request to launcher"; return 1; } } return 0; } static char* arg_strmov(char* dest, const char* src, int n) { char* dest_end = dest+n-1; char c; for (;destargs; p=arg_strmov(p,e->ident,(int)(buf_end-p)-1); *p++='\t'; if (p>buf_end-15) goto end; p=int10_to_str(e->pid,p,10); *p++='\t'; p=int10_to_str(e->exit_code,p,10); *p++='\t'; p=arg_strmov(p,e->con_user,(int)(buf_end-p)-1); *p++='@'; if (p==buf_end) goto end; p=arg_strmov(p,e->con_host,(int)(buf_end-p)-11); *p++=':'; if (p==buf_end-10) goto end; if (e->con_sock[0]) { p=arg_strmov(p,e->con_sock,(int)(buf_end-p)-1); } else { p=int10_to_str(e->con_port,p,10); } *p++='\t'; p=arg_strmov(p,e->stdout_path,(int)(buf_end-p)-1); if (p==buf_end-1) goto end; *p++='\t'; p=arg_strmov(p,e->stderr_path,(int)(buf_end-p)-1); if (p==buf_end-1) goto end; *p++='\t'; for (;pnet,MANAGER_INFO, manager_greeting); if (!(buf_end=read_line(thd))) return -1; for (buf=thd->cmd_buf,p=thd->user,p_end=p+MAX_USER_NAME; bufuser, (uint)(p-thd->user)))) return 1; for (;isspace(*buf) && bufmd5_pass,digest,MD5_LEN)) return 1; client_msg(&thd->net,MANAGER_OK,"OK"); return 0; } static void print_time(FILE* fp) { struct tm now; time_t t; time(&t); localtime_r(&t,&now); fprintf(fp,"[%d-%02d-%02d %02d:%02d:%02d] ", now.tm_year+1900, now.tm_mon+1,now.tm_mday,now.tm_hour,now.tm_min, now.tm_sec); } static void die(const char* fmt, ...) { va_list args; va_start(args,fmt); if (fmt) { if (errfp==stderr) fprintf(errfp, "%s: ", my_progname); else { print_time(errfp); fprintf(errfp,"Fatal error: "); } vfprintf(errfp, fmt, args); if (errno) fprintf(errfp, " errno=%d", errno); fprintf(errfp, "\n"); fflush(errfp); } va_end(args); clean_up(); exit(1); } void print_msg_type(int msg_type) { const char* msg; switch (msg_type) { case LOG_ERR: msg = "ERROR"; break; case LOG_WARN: msg = "WARNING"; break; case LOG_INFO: msg = "INFO"; break; #ifndef DBUG_OFF case LOG_DEBUG: msg = "DEBUG"; break; #endif default: msg = "UNKNOWN TYPE"; break; } fprintf(errfp," %s: ", msg); } static void log_msg(const char* fmt, int msg_type, va_list args) { pthread_mutex_lock(&lock_log); print_time(errfp); print_msg_type(msg_type); vfprintf(errfp,fmt,args); fputc('\n',errfp); fflush(errfp); pthread_mutex_unlock(&lock_log); } static pthread_handler_decl(process_launcher_messages, args __attribute__((unused))) { my_thread_init(); for (;!in_shutdown;) { pid_t pid; struct manager_exec* e; char buf[MAX_LAUNCHER_MSG]; if (read(from_launcher_pipe[0],buf,MAX_LAUNCHER_MSG)<0) { log_err("error reading launcher message"); sleep(1); continue; } switch (buf[0]) { case CHILD_START: { char* ident=buf+1; int ident_len=strlen(ident); memcpy(&pid,ident+ident_len+1,sizeof(pid)); log_debug("process message - ident=%s ident_len=%d pid=%d",ident, ident_len,pid); pthread_mutex_lock(&lock_exec_hash); log_debug("hash has %d records",exec_hash.records); e=(struct manager_exec*)hash_search(&exec_hash,ident,ident_len); if (e) { pthread_mutex_lock(&e->lock); e->pid=pid; pthread_cond_broadcast(&e->cond); pthread_mutex_unlock(&e->lock); } pthread_mutex_unlock(&lock_exec_hash); log_debug("unlocked mutex"); break; } case CHILD_STOP: memcpy(&pid,buf+1,sizeof(pid)); e=manager_exec_by_pid(pid); if (e) { pthread_mutex_lock(&e->lock); e->pid=0; memcpy(&e->exit_code,buf+1+sizeof(pid),sizeof(int)); pthread_cond_broadcast(&e->cond); pthread_mutex_unlock(&e->lock); } break; default: log_err("Got invalid launcher message"); break; } } return 0; } static pthread_handler_decl(process_connection,arg) { struct manager_thd* thd = (struct manager_thd*)arg; my_thread_init(); pthread_detach_this_thread(); for (;!thd->finished;) { char* buf_end; if ((!(buf_end=read_line(thd)) || exec_line(thd,thd->cmd_buf,buf_end)) && thd->fatal) { log_err("Thread aborted"); break; } } manager_thd_free(thd); pthread_exit(0); return 0; /* Don't get cc warning */ } static int client_msg_raw(NET* net, int err_code, int pre, const char* fmt, va_list args) { char buf[MAX_CLIENT_MSG_LEN],*p,*buf_end; p=buf; buf_end=buf+sizeof(buf); p=int10_to_str(err_code,p,10); if (pre) *p++='-'; *p++=' '; p+=my_vsnprintf(p,buf_end-p,fmt,args); if (p>buf_end-2) p=buf_end - 2; *p++='\r'; *p++='\n'; log_debug("message to client: %-.*s",p-buf-2,buf); if (my_net_write(net,buf,(uint)(p-buf)) || net_flush(net)) { p[-2]=0; log_err("Failed writing '%s' to client: errno=%d",buf,errno); net_end(net); return 1; } return 0; } static int client_msg(NET* net, int err_code, const char* fmt, ...) { va_list args; va_start(args,fmt); return client_msg_raw(net,err_code,0,fmt,args); } static int client_msg_pre(NET* net, int err_code, const char* fmt, ...) { va_list args; va_start(args,fmt); return client_msg_raw(net,err_code,1,fmt,args); } static char* read_line(struct manager_thd* thd) { int len; char* p, *buf_end; if ((len=my_net_read(&thd->net)) == (int)packet_error || !len) { log_err("Error reading command from client (Error: %d)", errno); thd->fatal=1; return 0; } buf_end=thd->cmd_buf+len; for (p=thd->cmd_buf;p0;) { char msg_buf[1+sizeof(int)+sizeof(int)]; msg_buf[0]=CHILD_STOP; memcpy(msg_buf+1,&child,sizeof(int)); memcpy(msg_buf+1+sizeof(int),&child_status,sizeof(int)); if (write(from_launcher_pipe[1],msg_buf,sizeof(msg_buf))!=sizeof(msg_buf)) log_err("launcher: error writing message on child exit"); } signal(SIGCHLD,handle_child); } static struct manager_thd* manager_thd_new(Vio* vio) { struct manager_thd* tmp; if (!(tmp=(struct manager_thd*)my_malloc(sizeof(*tmp), MYF(0)))) { log_err("Out of memory in manager_thd_new"); return 0; } my_net_init(&tmp->net,vio); tmp->user[0]=0; tmp->priv_flags=0; tmp->fatal=tmp->finished=0; tmp->cmd_buf= (char*) tmp->net.read_pos; return tmp; } static void manager_thd_free(struct manager_thd* thd) { NET* net=&thd->net; if (net->vio) { vio_delete(net->vio); net->vio=0; } net_end(&thd->net); } static void clean_up() { pthread_mutex_lock(&lock_shutdown); if (in_shutdown) { pthread_mutex_unlock(&lock_shutdown); return; } in_shutdown = 1; pthread_mutex_unlock(&lock_shutdown); log_info("Shutdown started"); if (manager_sock) close(manager_sock); log_info("Ended"); if (errfp != stderr) my_fclose(errfp, MYF(0)); hash_free(&exec_hash); if (created_pid_file) my_delete(pid_file, MYF(0)); } static void print_version(void) { printf("%s Ver %s Distrib %s, for %s (%s)\n",my_progname,MANAGER_VERSION, MYSQL_SERVER_VERSION,SYSTEM_TYPE,MACHINE_TYPE); } static void usage() { print_version(); printf("MySQL AB, by Sasha\n"); printf("This software comes with ABSOLUTELY NO WARRANTY\n\n"); printf("Manages instances of MySQL server.\n\n"); printf("Usage: %s [OPTIONS]\n\n", my_progname); my_print_help(my_long_options); my_print_variables(my_long_options); } static my_bool get_one_option(int optid, const struct my_option *opt __attribute__((unused)), char *argument) { switch (optid) { case '#': DBUG_PUSH(argument ? argument : "d:t:O,/tmp/mysqlmgrd.trace"); break; case 'V': print_version(); exit(0); case '?': usage(); exit(0); } return 0; } static int parse_args(int argc, char **argv) { int ho_error; if ((ho_error=handle_options(&argc, &argv, my_long_options, get_one_option))) exit(ho_error); return 0; } static int init_server() { int arg=1; log_info("Started"); if ((manager_sock=socket(PF_INET,SOCK_STREAM,0)) < 0) die("Could not create socket"); bzero((char*) &manager_addr, sizeof(manager_addr)); manager_addr.sin_family = AF_INET; manager_addr.sin_addr.s_addr = manager_bind_addr; manager_addr.sin_port = htons(manager_port); setsockopt(manager_sock,SOL_SOCKET, SO_REUSEADDR,(char*)&arg,sizeof(arg)); if (bind(manager_sock,(struct sockaddr*)&manager_addr, sizeof(manager_addr)) < 0) die("Could not bind"); if (listen(manager_sock,manager_back_log) < 0) die("Could not listen"); return 0; } static int run_server_loop() { pthread_t th; struct manager_thd *thd; int client_sock; Vio* vio; pthread_attr_t thr_attr; (void) pthread_attr_init(&thr_attr); #if !defined(HAVE_DEC_3_2_THREADS) pthread_attr_setscope(&thr_attr,PTHREAD_SCOPE_SYSTEM); (void) pthread_attr_setdetachstate(&thr_attr,PTHREAD_CREATE_DETACHED); #endif for (;!shutdown_requested;) { size_socket len=sizeof(struct sockaddr_in); if ((client_sock=accept(manager_sock,(struct sockaddr*)&manager_addr, &len)) <0) { if (shutdown_requested) break; if (errno != EAGAIN) { log_warn("Error in accept, errno=%d", errno); sleep(1); /* avoid tying up CPU if accept is broken */ } continue; } if (shutdown_requested) break; if (!(vio=vio_new(client_sock,VIO_TYPE_TCPIP,FALSE))) { log_err("Could not create I/O object"); close(client_sock); continue; } if (!(thd=manager_thd_new(vio))) { log_err("Could not create thread object"); vio_close(vio); continue; } if (authenticate(thd)) { client_msg(&thd->net,MANAGER_ACCESS, "Access denied"); manager_thd_free(thd); log_info("Client failed to authenticate"); continue; } if (shutdown_requested) { manager_thd_free(thd); break; } if (one_thread) { process_connection((void*)thd); manager_thd_free(thd); continue; } else if (pthread_create(&th,&thr_attr,process_connection,(void*)thd)) { client_msg(&thd->net,MANAGER_INTERNAL_ERR, "Could not create thread, errno=%d", errno); manager_thd_free(thd); continue; } } (void) pthread_attr_destroy(&thr_attr); return 0; } static FILE* open_log_stream() { FILE* fp; if (!(fp=my_fopen(manager_log_file, O_APPEND | FILE_BINARY, MYF(MY_WME)))) { clean_up(); exit(1); } return fp; } static byte* get_user_key(const byte* u, uint* len, my_bool __attribute__((unused)) t) { register const char* key; key = ((struct manager_user*)u)->user; *len = ((struct manager_user*)u)->user_len; return (byte*)key; } static byte* get_exec_key(const byte* e, uint* len, my_bool __attribute__((unused)) t) { register const char* key; key = ((struct manager_exec*)e)->ident; *len = ((struct manager_exec*)e)->ident_len; return (byte*)key; } static void init_arg_array(char* arg_str,char** args,uint arg_count) { char* p = arg_str; for (;arg_count>0;arg_count--) { *args++=p; p += strlen(p)+1; } *args=0; } static uint tokenize_args(char* arg_start,char** arg_end) { char* p, *p_write,*p_end; uint arg_count=0; int quoted=0,escaped=0,last_space=0; p_end=*arg_end; p_write=p=arg_start; for (; p < p_end ; p++) { char c = *p; switch (c) { case ' ': case '\r': case '\n': if (!quoted) { if (!last_space) { *p_write++=0; arg_count++; last_space=1; } } else *p_write++=c; escaped=0; break; case '"': if (!escaped) quoted=!quoted; else *p_write++=c; last_space=0; escaped=0; break; case '\\': if (!escaped) escaped=1; else { *p_write++=c; escaped=0; } last_space=0; break; default: escaped=last_space=0; *p_write++=c; break; } } if (!last_space && p_write>arg_start) arg_count++; *p_write=0; *arg_end=p_write; log_debug("arg_count: %d arg_start: '%s'",arg_count,arg_start); return arg_count; } static void update_req_len(struct manager_exec* e) { e->req_len=(e->data_buf_size+ (e->stdout_path_size=strlen(e->stdout_path)+1)+ (e->stderr_path_size=strlen(e->stderr_path)+1)); } static struct manager_exec* manager_exec_new(char* arg_start,char* arg_end) { struct manager_exec* tmp; char* first_arg; uint arg_len,num_args; num_args=tokenize_args(arg_start,&arg_end); arg_len=(uint)(arg_end-arg_start)+1; /* include \0 terminator*/ if (!(tmp=(struct manager_exec*)my_malloc(sizeof(*tmp)+arg_len+ sizeof(char*)*num_args, MYF(MY_ZEROFILL)))) return 0; if (num_args<2) { tmp->error="Too few arguments"; return tmp; } /* We have to allocate 'args' first as this must be alligned */ tmp->args=(char**)(tmp +1); tmp->data_buf= (char*) (tmp->args + num_args); memcpy(tmp->data_buf,arg_start,arg_len); tmp->data_buf_size=arg_len; tmp->num_args=num_args; tmp->ident=tmp->data_buf; tmp->ident_len=strlen(tmp->ident); first_arg=tmp->ident+tmp->ident_len+1; init_arg_array(first_arg,tmp->args,num_args-1); strmov(tmp->con_user,"root"); tmp->con_port=MYSQL_PORT; memcpy(tmp->con_host,"localhost",10); tmp->bin_path=tmp->args[0]; tmp->stdout_path_size=tmp->stderr_path_size=1; tmp->req_len=tmp->data_buf_size+2; pthread_mutex_init(&tmp->lock,0); pthread_cond_init(&tmp->cond,0); mysql_init(&tmp->mysql); return tmp; } static void manager_exec_free(void* e) { mysql_close(&((struct manager_exec*)e)->mysql); my_free(e,MYF(0)); } static int hex_val(char c) { if (isdigit(c)) return c-'0'; c=tolower(c); return c-'a'+10; } static struct manager_user* manager_user_new(char* buf) { struct manager_user* tmp; char* p,*user_end,*p_end; char c; if (!(tmp=(struct manager_user*)my_malloc(sizeof(*tmp),MYF(0)))) return 0; p=tmp->user; tmp->error=0; user_end=p+MAX_USER_NAME; for (;(c=*buf) && puser_len=p-tmp->user; buf++; break; } else *p++=c; } if (!c) tmp->error="Missing ':'"; if (p == user_end) tmp->error="Username too long"; if (tmp->error) return tmp; if (strlen(buf) < 2*MD5_LEN) { tmp->error="Invalid MD5 sum, too short"; return tmp; } p=tmp->md5_pass; p_end=p+MD5_LEN; for (; perror) { die("Error on line %d of '%s': %s",line_num,manager_pw_file, u->error); } else { hash_insert(&user_hash,(gptr)u); } } my_fclose(f, MYF(0)); } static void init_pid_file() { FILE* fp = my_fopen(pid_file, O_WRONLY | O_BINARY, MYF(MY_WME)); if (!fp) { clean_up(); exit(1); } created_pid_file=1; fprintf(fp, "%d\n", (int) getpid()); my_fclose(fp, MYF(0)); } static void init_globals() { pthread_attr_t thr_attr; if (hash_init(&exec_hash,1024,0,0,get_exec_key,manager_exec_free,MYF(0))) die("Exec hash initialization failed"); if (!one_thread) { (void) pthread_attr_init(&thr_attr); #if !defined(HAVE_DEC_3_2_THREADS) pthread_attr_setscope(&thr_attr,PTHREAD_SCOPE_SYSTEM); (void) pthread_attr_setdetachstate(&thr_attr,PTHREAD_CREATE_DETACHED); #endif fork_launcher(); if (pthread_create(&launch_msg_th,&thr_attr,process_launcher_messages,0)) die("Could not start launcher message handler thread"); /* (void) pthread_attr_destroy(&thr_attr); */ } init_user_hash(); init_pid_file(); loop_th=pthread_self(); signal(SIGPIPE,handle_sigpipe); signal(SIGTERM,handle_sigterm); } static int open_and_dup(int fd,char* path) { int old_fd; if ((old_fd=my_open(path,O_WRONLY|O_APPEND|O_CREAT,MYF(0)))<0) { log_err("Could not open '%s' for append, errno=%d",path,errno); return 1; } if (dup2(old_fd,fd)<0) { log_err("Failed in dup2(), errno=%d",errno); return 1; } my_close(old_fd,MYF(0)); return 0; } static void run_launcher_loop() { for (;;) { int req_len,ident_len,num_args; char* request_buf=0; pid_t pid; char* exec_path,*ident,*stdout_path,*stderr_path; char** args=0; if (my_read(to_launcher_pipe[0],(byte*)&req_len, sizeof(int),MYF(MY_NABP|MY_FULL_IO)) || my_read(to_launcher_pipe[0],(byte*)&num_args, sizeof(int),MYF(MY_NABP|MY_FULL_IO)) || !(request_buf=(char*)my_malloc(req_len+sizeof(pid)+2,MYF(0))) || !(args=(char**)my_malloc(num_args*sizeof(char*),MYF(0))) || my_read(to_launcher_pipe[0],request_buf,req_len, MYF(MY_NABP|MY_FULL_IO))) { log_err("launcher: Error reading request"); my_free((gptr)request_buf,MYF(MY_ALLOW_ZERO_PTR)); my_free((gptr)args,MYF(MY_ALLOW_ZERO_PTR)); sleep(1); continue; } stdout_path=request_buf; stderr_path=stdout_path+strlen(stdout_path)+1; request_buf=stderr_path+strlen(stderr_path); /* black magic */ ident=request_buf+1; ident_len=strlen(ident); exec_path=ident+ident_len+1; log_debug("num_args=%d,req_len=%d,ident=%s,ident_len=%d,exec_path=%s,\ stdout_path=%s,stderr_path=%s", num_args, req_len,ident,ident_len,exec_path,stdout_path,stderr_path); init_arg_array(exec_path,args,num_args-1); switch ((pid=fork())) { case -1: log_err("launcher: cannot fork"); sleep(1); break; case 0: if (open_and_dup(1,stdout_path) || open_and_dup(2,stderr_path)) exit(1); if (execv(exec_path,args)) log_err("launcher: cannot exec %s",exec_path); exit(1); default: request_buf[0]=CHILD_START; memcpy(request_buf+ident_len+2,&pid,sizeof(pid)); if (write(from_launcher_pipe[1],request_buf,ident_len+2+sizeof(pid))<0) log_err("launcher: error sending launch status report"); break; } my_free((gptr)(stdout_path),MYF(0)); my_free((gptr)args,MYF(0)); } } static void fork_launcher() { if (pipe(to_launcher_pipe) || pipe(from_launcher_pipe)) die("Could not create launcher pipes"); switch ((launcher_pid=fork())) { case 0: signal(SIGCHLD,handle_child); run_launcher_loop(); exit(0); case -1: die("Could not fork the launcher"); default: return; } } static int daemonize() { switch (fork()) { case -1: die("Cannot fork"); case 0: errfp = open_log_stream(); init_globals(); close(0); close(1); close(2); init_server(); run_server_loop(); clean_up(); break; default: break; } return 0; } int main(int argc, char** argv) { char c; stack_bottom= (uchar *) &c; MY_INIT(argv[0]); errfp = stderr; parse_args(argc,argv); pthread_mutex_init(&lock_log,0); pthread_mutex_init(&lock_shutdown,0); pthread_mutex_init(&lock_exec_hash,0); pthread_mutex_init(&lock_launch_thd,0); pthread_cond_init(&cond_launch_thd,0); #ifdef DO_STACKTRACE signal(SIGSEGV,handle_segfault); #endif if (one_thread) { init_globals(); init_server(); run_server_loop(); clean_up(); return 0; } else return daemonize(); }