--- src/common.h +++ src/common.h @@ -42,6 +42,8 @@ #include #include #include +#include +#include #include #include #include --- src/conn_pool.cpp +++ src/conn_pool.cpp @@ -271,11 +271,11 @@ ++iter) { msgs.push_back(**iter); } - ResultCode result = TRY_LATER; + ResultCode::type result = ResultCode::TRY_LATER; try { result = resendClient->Log(msgs); - if (result == OK) { + if (result == ResultCode::OK) { if (g_Handler) { g_Handler->incrementCounter("sent", size); } --- src/file.cpp +++ src/file.cpp @@ -74,9 +74,21 @@ } bool StdFile::openWrite() { + // if file is a fifo, temporarily open it for read + int fd = -1; + struct stat st; + int s = stat(filename.c_str(), &st); + if (s != -1 && S_ISFIFO(st.st_mode)) + fd = ::open(filename.c_str(), O_RDONLY | O_NONBLOCK); + // open file for write in append mode ios_base::openmode mode = fstream::out | fstream::app; - return open(mode); + bool r = open(mode); + + // close fifo + if (fd != -1) + ::close(fd); + return r; } bool StdFile::openTruncate() { --- src/scribe_server.cpp +++ src/scribe_server.cpp @@ -55,6 +55,8 @@ if (-1 == setrlimit(RLIMIT_NOFILE, &r_fd)) { LOG_OPER("setrlimit error (setting max fd size)"); } + + signal(SIGPIPE, SIG_IGN); int next_option; const char* const short_options = "hp:c:"; @@ -110,7 +112,7 @@ } TNonblockingServer server(processor, binaryProtocolFactory, - g_Handler->port, thread_manager); + g_Handler->host, g_Handler->port, thread_manager); LOG_OPER("Starting scribe server on port %lu", g_Handler->port); fflush(stderr); @@ -409,13 +411,13 @@ } -ResultCode scribeHandler::Log(const vector& messages) { - ResultCode result; +ResultCode::type scribeHandler::Log(const vector& messages) { + ResultCode::type result; scribeHandlerLock.acquireRead(); if (throttleRequest(messages)) { - result = TRY_LATER; + result = ResultCode::TRY_LATER; goto end; } @@ -463,7 +465,7 @@ addMessage(*msg_iter, store_list); } - result = OK; + result = ResultCode::OK; end: scribeHandlerLock.release(); @@ -583,6 +585,8 @@ throw runtime_error("No port number configured"); } + config.getString("host", host); + // check if config sets the size to use for the ThreadManager unsigned long int num_threads; if (config.getUnsigned("num_thrift_server_threads", num_threads)) { --- src/scribe_server.h +++ src/scribe_server.h @@ -42,7 +42,7 @@ void initialize(); void reinitialize(); - scribe::thrift::ResultCode Log(const std::vector& messages); + scribe::thrift::ResultCode::type Log(const std::vector& messages); void getVersion(std::string& _return) {_return = "2.2";} facebook::fb303::fb_status getStatus(); @@ -51,6 +51,7 @@ void setStatusDetails(const std::string& new_status_details); unsigned long int port; // it's long because that's all I implemented in the conf class + std::string host; // number of threads processing new Thrift connections size_t numThriftServerThreads;