diff --git a/storage/ndb/include/util/InputStream.hpp b/storage/ndb/include/util/InputStream.hpp index 4aabf2d1160..2b0e6cfc5b9 100644 --- a/storage/ndb/include/util/InputStream.hpp +++ b/storage/ndb/include/util/InputStream.hpp @@ -49,6 +49,7 @@ extern FileInputStream Stdin; class SocketInputStream : public InputStream { NDB_SOCKET_TYPE m_socket; unsigned m_timeout_ms; + unsigned m_timeout_remain; bool m_startover; bool m_timedout; public: diff --git a/storage/ndb/include/util/OutputStream.hpp b/storage/ndb/include/util/OutputStream.hpp index 072d4288229..13975cada3e 100644 --- a/storage/ndb/include/util/OutputStream.hpp +++ b/storage/ndb/include/util/OutputStream.hpp @@ -46,6 +46,7 @@ class SocketOutputStream : public OutputStream { NDB_SOCKET_TYPE m_socket; unsigned m_timeout_ms; bool m_timedout; + unsigned m_timeout_remain; public: SocketOutputStream(NDB_SOCKET_TYPE socket, unsigned write_timeout_ms = 1000); virtual ~SocketOutputStream() {} diff --git a/storage/ndb/include/util/socket_io.h b/storage/ndb/include/util/socket_io.h index a988f4a1e8d..f76b6790b19 100644 --- a/storage/ndb/include/util/socket_io.h +++ b/storage/ndb/include/util/socket_io.h @@ -28,15 +28,20 @@ extern "C" { int read_socket(NDB_SOCKET_TYPE, int timeout_ms, char *, int len); - int readln_socket(NDB_SOCKET_TYPE socket, int timeout_millis, + int readln_socket(NDB_SOCKET_TYPE socket, int timeout_millis, int *time, char * buf, int buflen, NdbMutex *mutex); - int write_socket(NDB_SOCKET_TYPE, int timeout_ms, const char[], int len); + int write_socket(NDB_SOCKET_TYPE, int timeout_ms, int *time, + const char[], int len); - int print_socket(NDB_SOCKET_TYPE, int timeout_ms, const char *, ...); - int println_socket(NDB_SOCKET_TYPE, int timeout_ms, const char *, ...); - int vprint_socket(NDB_SOCKET_TYPE, int timeout_ms, const char *, va_list); - int vprintln_socket(NDB_SOCKET_TYPE, int timeout_ms, const char *, va_list); + int print_socket(NDB_SOCKET_TYPE, int timeout_ms, int *time, + const char *, ...); + int println_socket(NDB_SOCKET_TYPE, int timeout_ms, int *time, + const char *, ...); + int vprint_socket(NDB_SOCKET_TYPE, int timeout_ms, int *time, + const char *, va_list); + int vprintln_socket(NDB_SOCKET_TYPE, int timeout_ms, int *time, + const char *, va_list); #ifdef __cplusplus } diff --git a/storage/ndb/src/common/util/InputStream.cpp b/storage/ndb/src/common/util/InputStream.cpp index ee7f2d5c8b2..2337344d91a 100644 --- a/storage/ndb/src/common/util/InputStream.cpp +++ b/storage/ndb/src/common/util/InputStream.cpp @@ -37,7 +37,8 @@ SocketInputStream::SocketInputStream(NDB_SOCKET_TYPE socket, unsigned read_timeout_ms) : m_socket(socket) { m_startover= true; - m_timeout_ms = read_timeout_ms; + m_timeout_remain= m_timeout_ms = read_timeout_ms; + m_timedout= false; } @@ -55,9 +56,13 @@ SocketInputStream::gets(char * buf, int bufLen) { else offset= strlen(buf); - int res = readln_socket(m_socket, m_timeout_ms, buf+offset, bufLen-offset, m_mutex); + int time= 0; + int res = readln_socket(m_socket, m_timeout_remain, &time, + buf+offset, bufLen-offset, m_mutex); - if(res == 0) + if(res >= 0) + m_timeout_remain-=time; + if(res == 0 || m_timeout_remain<=0) { m_timedout= true; buf[0]=0; diff --git a/storage/ndb/src/common/util/OutputStream.cpp b/storage/ndb/src/common/util/OutputStream.cpp index ebc352b1b50..0943e47e33f 100644 --- a/storage/ndb/src/common/util/OutputStream.cpp +++ b/storage/ndb/src/common/util/OutputStream.cpp @@ -44,7 +44,7 @@ FileOutputStream::println(const char * fmt, ...){ SocketOutputStream::SocketOutputStream(NDB_SOCKET_TYPE socket, unsigned write_timeout_ms){ m_socket = socket; - m_timeout_ms = write_timeout_ms; + m_timeout_remain= m_timeout_ms = write_timeout_ms; m_timedout= false; } @@ -55,12 +55,18 @@ SocketOutputStream::print(const char * fmt, ...){ if(timedout()) return -1; + int time= 0; va_start(ap, fmt); - const int ret = vprint_socket(m_socket, m_timeout_ms, fmt, ap); + int ret = vprint_socket(m_socket, m_timeout_ms, &time, fmt, ap); va_end(ap); - if (errno==ETIMEDOUT) + if(ret >= 0) + m_timeout_remain-=time; + if(errno==ETIMEDOUT || m_timeout_remain<=0) + { m_timedout= true; + ret= -1; + } return ret; } @@ -71,12 +77,18 @@ SocketOutputStream::println(const char * fmt, ...){ if(timedout()) return -1; + int time= 0; va_start(ap, fmt); - const int ret = vprintln_socket(m_socket, m_timeout_ms, fmt, ap); + int ret = vprintln_socket(m_socket, m_timeout_ms, &time, fmt, ap); va_end(ap); - if (errno==ETIMEDOUT) + if(ret >= 0) + m_timeout_remain-=time; + if (errno==ETIMEDOUT || m_timeout_remain<=0) + { m_timedout= true; + ret= -1; + } return ret; } diff --git a/storage/ndb/src/common/util/socket_io.cpp b/storage/ndb/src/common/util/socket_io.cpp index d19c792e20f..dfdcd19412f 100644 --- a/storage/ndb/src/common/util/socket_io.cpp +++ b/storage/ndb/src/common/util/socket_io.cpp @@ -18,6 +18,7 @@ #include #include #include +#include extern "C" int @@ -47,7 +48,7 @@ read_socket(NDB_SOCKET_TYPE socket, int timeout_millis, extern "C" int -readln_socket(NDB_SOCKET_TYPE socket, int timeout_millis, +readln_socket(NDB_SOCKET_TYPE socket, int timeout_millis, int *time, char * buf, int buflen, NdbMutex *mutex){ if(buflen <= 1) return 0; @@ -62,7 +63,10 @@ readln_socket(NDB_SOCKET_TYPE socket, int timeout_millis, if(mutex) NdbMutex_Unlock(mutex); + Uint64 tick= NdbTick_CurrentMillisecond(); const int selectRes = select(socket + 1, &readset, 0, 0, &timeout); + + *time= NdbTick_CurrentMillisecond() - tick; if(mutex) NdbMutex_Lock(mutex); @@ -126,9 +130,13 @@ readln_socket(NDB_SOCKET_TYPE socket, int timeout_millis, FD_ZERO(&readset); FD_SET(socket, &readset); - timeout.tv_sec = (timeout_millis / 1000); - timeout.tv_usec = (timeout_millis % 1000) * 1000; + timeout.tv_sec = ((timeout_millis - *time) / 1000); + timeout.tv_usec = ((timeout_millis - *time) % 1000) * 1000; + + tick= NdbTick_CurrentMillisecond(); const int selectRes = select(socket + 1, &readset, 0, 0, &timeout); + *time= NdbTick_CurrentMillisecond() - tick; + if(selectRes != 1){ return -1; } @@ -139,7 +147,7 @@ readln_socket(NDB_SOCKET_TYPE socket, int timeout_millis, extern "C" int -write_socket(NDB_SOCKET_TYPE socket, int timeout_millis, +write_socket(NDB_SOCKET_TYPE socket, int timeout_millis, int *time, const char buf[], int len){ fd_set writeset; FD_ZERO(&writeset); @@ -148,7 +156,11 @@ write_socket(NDB_SOCKET_TYPE socket, int timeout_millis, timeout.tv_sec = (timeout_millis / 1000); timeout.tv_usec = (timeout_millis % 1000) * 1000; + + Uint64 tick= NdbTick_CurrentMillisecond(); const int selectRes = select(socket + 1, 0, &writeset, 0, &timeout); + *time= NdbTick_CurrentMillisecond() - tick; + if(selectRes != 1){ return -1; } @@ -167,9 +179,13 @@ write_socket(NDB_SOCKET_TYPE socket, int timeout_millis, FD_ZERO(&writeset); FD_SET(socket, &writeset); - timeout.tv_sec = 1; - timeout.tv_usec = 0; + timeout.tv_sec = ((timeout_millis - *time) / 1000); + timeout.tv_usec = ((timeout_millis - *time) % 1000) * 1000; + + Uint64 tick= NdbTick_CurrentMillisecond(); const int selectRes2 = select(socket + 1, 0, &writeset, 0, &timeout); + *time= NdbTick_CurrentMillisecond() - tick; + if(selectRes2 != 1){ return -1; } @@ -180,11 +196,11 @@ write_socket(NDB_SOCKET_TYPE socket, int timeout_millis, extern "C" int -print_socket(NDB_SOCKET_TYPE socket, int timeout_millis, +print_socket(NDB_SOCKET_TYPE socket, int timeout_millis, int *time, const char * fmt, ...){ va_list ap; va_start(ap, fmt); - int ret = vprint_socket(socket, timeout_millis, fmt, ap); + int ret = vprint_socket(socket, timeout_millis, time, fmt, ap); va_end(ap); return ret; @@ -192,18 +208,18 @@ print_socket(NDB_SOCKET_TYPE socket, int timeout_millis, extern "C" int -println_socket(NDB_SOCKET_TYPE socket, int timeout_millis, +println_socket(NDB_SOCKET_TYPE socket, int timeout_millis, int *time, const char * fmt, ...){ va_list ap; va_start(ap, fmt); - int ret = vprintln_socket(socket, timeout_millis, fmt, ap); + int ret = vprintln_socket(socket, timeout_millis, time, fmt, ap); va_end(ap); return ret; } extern "C" int -vprint_socket(NDB_SOCKET_TYPE socket, int timeout_millis, +vprint_socket(NDB_SOCKET_TYPE socket, int timeout_millis, int *time, const char * fmt, va_list ap){ char buf[1000]; char *buf2 = buf; @@ -221,7 +237,7 @@ vprint_socket(NDB_SOCKET_TYPE socket, int timeout_millis, } else return 0; - int ret = write_socket(socket, timeout_millis, buf2, size); + int ret = write_socket(socket, timeout_millis, time, buf2, size); if(buf2 != buf) free(buf2); return ret; @@ -229,7 +245,7 @@ vprint_socket(NDB_SOCKET_TYPE socket, int timeout_millis, extern "C" int -vprintln_socket(NDB_SOCKET_TYPE socket, int timeout_millis, +vprintln_socket(NDB_SOCKET_TYPE socket, int timeout_millis, int *time, const char * fmt, va_list ap){ char buf[1000]; char *buf2 = buf; @@ -249,7 +265,7 @@ vprintln_socket(NDB_SOCKET_TYPE socket, int timeout_millis, } buf2[size-1]='\n'; - int ret = write_socket(socket, timeout_millis, buf2, size); + int ret = write_socket(socket, timeout_millis, time, buf2, size); if(buf2 != buf) free(buf2); return ret; diff --git a/storage/ndb/src/mgmsrv/Services.cpp b/storage/ndb/src/mgmsrv/Services.cpp index d39c4945f69..c658d5f0d6f 100644 --- a/storage/ndb/src/mgmsrv/Services.cpp +++ b/storage/ndb/src/mgmsrv/Services.cpp @@ -1334,20 +1334,21 @@ Ndb_mgmd_event_service::log(int eventType, const Uint32* theData, NodeId nodeId) { if(threshold <= m_clients[i].m_logLevel.getLogLevel(cat)) { - NDB_SOCKET_TYPE fd= m_clients[i].m_socket; - if(fd != NDB_INVALID_SOCKET) + if(m_clients[i].m_socket==NDB_INVALID_SOCKET) + continue; + + SocketOutputStream out(m_clients[i].m_socket); + + int r; + if (m_clients[i].m_parsable) + r= out.println(str.c_str()); + else + r= out.println(m_text); + + if (r<0) { - int r; - if (m_clients[i].m_parsable) - r= println_socket(fd, - MAX_WRITE_TIMEOUT, str.c_str()); - else - r= println_socket(fd, - MAX_WRITE_TIMEOUT, m_text); - if (r == -1) { - copy.push_back(fd); - m_clients.erase(i, false); - } + copy.push_back(m_clients[i].m_socket); + m_clients.erase(i, false); } } } @@ -1398,14 +1399,16 @@ Ndb_mgmd_event_service::check_listeners() m_clients.lock(); for(i= m_clients.size() - 1; i >= 0; i--) { - int fd= m_clients[i].m_socket; - DBUG_PRINT("info",("%d %d",i,fd)); - char buf[1]; - buf[0]=0; - if (fd != NDB_INVALID_SOCKET && - println_socket(fd,MAX_WRITE_TIMEOUT,"") == -1) + if(m_clients[i].m_socket==NDB_INVALID_SOCKET) + continue; + + SocketOutputStream out(m_clients[i].m_socket); + + DBUG_PRINT("info",("%d %d",i,m_clients[i].m_socket)); + + if(out.println("") < 0) { - NDB_CLOSE_SOCKET(fd); + NDB_CLOSE_SOCKET(m_clients[i].m_socket); m_clients.erase(i, false); n=1; }