From ed8ed72f2600e9422c1b1952533b6bd3f08eb155 Mon Sep 17 00:00:00 2001 From: crs Date: Wed, 26 Jun 2002 16:31:48 +0000 Subject: [PATCH] synergy hook DLL will now restart itself if a client tries to init() it while it's already running. fixed an uninitialized pointer bug in CServer and some cleanup-on-error code in CMSWindowsPrimaryScreen. also added timeout to read() on IInputStream and a heartbeat sent by clients so the server can disconnect clients that are dead but never reset the TCP connection. previously the server would keep these dead clients around forever and if the user was locked on the client screen for some reason then the server would have to be rebooted (or the server would have to be killed via a remote login). --- client/CClient.cpp | 26 ++++++++++-- http/CHTTPProtocol.cpp | 4 +- io/CBufferedInputStream.cpp | 15 ++++--- io/CBufferedInputStream.h | 4 +- io/CInputStreamFilter.h | 2 +- io/IInputStream.h | 7 +++- server/CMSWindowsPrimaryScreen.cpp | 30 ++++++++------ server/CServer.cpp | 1 + server/CServerProtocol1_0.cpp | 41 +++++++++++++++---- server/CSynergyHook.cpp | 17 +++++--- synergy/CInputPacketStream.cpp | 66 ++++++++++++++++++++++-------- synergy/CInputPacketStream.h | 8 ++-- synergy/CProtocolUtil.cpp | 2 +- synergy/ProtocolTypes.h | 9 ++++ 14 files changed, 169 insertions(+), 63 deletions(-) diff --git a/client/CClient.cpp b/client/CClient.cpp index 453d6d45..83d2164f 100644 --- a/client/CClient.cpp +++ b/client/CClient.cpp @@ -13,6 +13,7 @@ #include "CTimerThread.h" #include "XThread.h" #include "CLog.h" +#include "CStopwatch.h" #include "TMethodJob.h" #include @@ -301,23 +302,36 @@ CClient::runSession(void*) m_compressMouse = false; // handle messages from server + CStopwatch heartbeat; for (;;) { // if no input is pending then flush compressed mouse motion if (input->getSize() == 0) { flushCompressedMouse(); } - // wait for reply + // wait for a message log((CLOG_DEBUG2 "waiting for message")); UInt8 code[4]; - UInt32 n = input->read(code, 4); + UInt32 n = input->read(code, 4, kHeartRate); - // verify we got an entire code + // check if server hungup if (n == 0) { log((CLOG_NOTE "server disconnected")); - // server hungup break; } + + // check for time out + if (n == (UInt32)-1 || heartbeat.getTime() > kHeartRate) { + // send heartbeat + CProtocolUtil::writef(m_output, kMsgCNoop); + heartbeat.reset(); + if (n == (UInt32)-1) { + // no message to process + continue; + } + } + + // verify we got an entire code if (n != 4) { // client sent an incomplete message log((CLOG_ERR "incomplete message from server")); @@ -347,6 +361,10 @@ CClient::runSession(void*) else if (memcmp(code, kMsgDKeyRepeat, 4) == 0) { onKeyRepeat(); } + else if (memcmp(code, kMsgCNoop, 4) == 0) { + // accept and discard no-op + continue; + } else if (memcmp(code, kMsgCEnter, 4) == 0) { onEnter(); } diff --git a/http/CHTTPProtocol.cpp b/http/CHTTPProtocol.cpp index e49bdd00..cea0d64c 100644 --- a/http/CHTTPProtocol.cpp +++ b/http/CHTTPProtocol.cpp @@ -468,7 +468,7 @@ CHTTPProtocol::readLine(IInputStream* stream, CString& tmpBuffer) // read more from stream char buffer[4096]; - UInt32 n = stream->read(buffer, sizeof(buffer)); + UInt32 n = stream->read(buffer, sizeof(buffer), -1.0); if (n == 0) { // stream is empty. return what's leftover. CString line = tmpBuffer; @@ -514,7 +514,7 @@ CHTTPProtocol::readBlock(IInputStream* stream, if (n > numBytes) { n = numBytes; } - n = stream->read(buffer, n); + n = stream->read(buffer, n, -1.0); // if stream is empty then return what we've got so far if (n == 0) { diff --git a/io/CBufferedInputStream.cpp b/io/CBufferedInputStream.cpp index 28c6e7a5..06f553ea 100644 --- a/io/CBufferedInputStream.cpp +++ b/io/CBufferedInputStream.cpp @@ -2,6 +2,7 @@ #include "CLock.h" #include "CMutex.h" #include "CThread.h" +#include "CStopwatch.h" #include "IJob.h" #include "XIO.h" #include @@ -43,15 +44,19 @@ CBufferedInputStream::hangup() } UInt32 -CBufferedInputStream::readNoLock(void* dst, UInt32 n) +CBufferedInputStream::readNoLock(void* dst, UInt32 n, double timeout) { if (m_closed) { throw XIOClosed(); } - // wait for data (or hangup) + // wait for data, hangup, or timeout + CStopwatch timer(true); while (!m_hungup && m_empty == true) { - m_empty.wait(); + if (!m_empty.wait(timer, timeout)) { + // timed out + return (UInt32)-1; + } } // read data @@ -98,10 +103,10 @@ CBufferedInputStream::close() } UInt32 -CBufferedInputStream::read(void* dst, UInt32 n) +CBufferedInputStream::read(void* dst, UInt32 n, double timeout) { CLock lock(m_mutex); - return readNoLock(dst, n); + return readNoLock(dst, n, timeout); } UInt32 diff --git a/io/CBufferedInputStream.h b/io/CBufferedInputStream.h index f47658f3..22ded242 100644 --- a/io/CBufferedInputStream.h +++ b/io/CBufferedInputStream.h @@ -26,7 +26,7 @@ public: void hangup(); // same as read() but caller must lock the mutex - UInt32 readNoLock(void*, UInt32 count); + UInt32 readNoLock(void*, UInt32 count, double timeout); // accessors @@ -36,7 +36,7 @@ public: // IInputStream overrides // these all lock the mutex for their duration virtual void close(); - virtual UInt32 read(void*, UInt32 count); + virtual UInt32 read(void*, UInt32 count, double timeout); virtual UInt32 getSize() const; private: diff --git a/io/CInputStreamFilter.h b/io/CInputStreamFilter.h index 85487226..bb25a261 100644 --- a/io/CInputStreamFilter.h +++ b/io/CInputStreamFilter.h @@ -14,7 +14,7 @@ public: // IInputStream overrides virtual void close() = 0; - virtual UInt32 read(void*, UInt32 maxCount) = 0; + virtual UInt32 read(void*, UInt32 maxCount, double timeout) = 0; virtual UInt32 getSize() const = 0; protected: diff --git a/io/IInputStream.h b/io/IInputStream.h index f025ed40..fc5abaa3 100644 --- a/io/IInputStream.h +++ b/io/IInputStream.h @@ -13,8 +13,11 @@ public: // read up to maxCount bytes into buffer, return number read. // blocks if no data is currently available. if buffer is NULL - // then the data is discarded. - virtual UInt32 read(void* buffer, UInt32 maxCount) = 0; + // then the data is discarded. returns (UInt32)-1 if there's + // no data for timeout seconds; if timeout < 0 then it blocks + // until data is available. + // (cancellation point) + virtual UInt32 read(void* buffer, UInt32 maxCount, double timeout) = 0; // accessors diff --git a/server/CMSWindowsPrimaryScreen.cpp b/server/CMSWindowsPrimaryScreen.cpp index e35d2811..1631ab7f 100644 --- a/server/CMSWindowsPrimaryScreen.cpp +++ b/server/CMSWindowsPrimaryScreen.cpp @@ -386,21 +386,27 @@ CMSWindowsPrimaryScreen::onOpenDisplay() // initialize hook library m_init(m_threadID); - // install the screen saver hook - if (m_installScreenSaver != NULL) { - m_installScreenSaver(); - } + try { + // install the screen saver hook + if (m_installScreenSaver != NULL) { + m_installScreenSaver(); + } - // get the input desktop and switch to it - if (m_is95Family) { - if (!openDesktop()) { - throw XScreenOpenFailure(); + // get the input desktop and switch to it + if (m_is95Family) { + if (!openDesktop()) { + throw XScreenOpenFailure(); + } + } + else { + if (!switchDesktop(openInputDesktop())) { + throw XScreenOpenFailure(); + } } } - else { - if (!switchDesktop(openInputDesktop())) { - throw XScreenOpenFailure(); - } + catch (...) { + m_cleanup(); + throw; } } diff --git a/server/CServer.cpp b/server/CServer.cpp index 6ae97436..5e7004ba 100644 --- a/server/CServer.cpp +++ b/server/CServer.cpp @@ -42,6 +42,7 @@ CServer::CServer(const CString& serverName) : m_active(NULL), m_primaryInfo(NULL), m_seqNum(0), + m_activeSaver(NULL), m_httpServer(NULL), m_httpAvailable(&m_mutex, s_httpMaxSimultaneousRequests) { diff --git a/server/CServerProtocol1_0.cpp b/server/CServerProtocol1_0.cpp index 54eaf81e..ad6f559d 100644 --- a/server/CServerProtocol1_0.cpp +++ b/server/CServerProtocol1_0.cpp @@ -8,6 +8,7 @@ #include "IOutputStream.h" #include "CThread.h" #include "CLog.h" +#include "CStopwatch.h" #include // @@ -29,22 +30,35 @@ CServerProtocol1_0::~CServerProtocol1_0() void CServerProtocol1_0::run() { - // handle messages until the client hangs up + // handle messages until the client hangs up or stops sending heartbeats + CStopwatch heartTimer; for (;;) { CThread::testCancel(); // wait for a message UInt8 code[4]; - UInt32 n = getInputStream()->read(code, 4); + UInt32 n = getInputStream()->read(code, 4, kHeartRate); CThread::testCancel(); - // verify we got an entire code + // check if client hungup if (n == 0) { log((CLOG_NOTE "client \"%s\" disconnected", getClient().c_str())); - - // client hungup return; } + + // check if client has stopped sending heartbeats + if (n == (UInt32)-1) { + if (heartTimer.getTime() > kHeartDeath) { + log((CLOG_NOTE "client \"%s\" is dead", getClient().c_str())); + return; + } + continue; + } + + // got a message so reset heartbeat monitor + heartTimer.reset(); + + // verify we got an entire code if (n != 4) { log((CLOG_ERR "incomplete message from \"%s\": %d bytes", getClient().c_str(), n)); @@ -57,6 +71,10 @@ CServerProtocol1_0::run() if (memcmp(code, kMsgDInfo, 4) == 0) { recvInfo(); } + else if (memcmp(code, kMsgCNoop, 4) == 0) { + // discard no-ops + continue; + } else if (memcmp(code, kMsgCClipboard, 4) == 0) { recvGrabClipboard(); } @@ -83,8 +101,17 @@ CServerProtocol1_0::queryInfo() // wait for and verify reply UInt8 code[4]; - UInt32 n = getInputStream()->read(code, 4); - if (n != 4 && memcmp(code, kMsgDInfo, 4) != 0) { + for (;;) { + UInt32 n = getInputStream()->read(code, 4, -1.0); + if (n == 4) { + if (memcmp(code, kMsgCNoop, 4) == 0) { + // discard heartbeats + continue; + } + if (memcmp(code, kMsgDInfo, 4) == 0) { + break; + } + } throw XBadClient(); } diff --git a/server/CSynergyHook.cpp b/server/CSynergyHook.cpp index 8440013b..262c27a6 100644 --- a/server/CSynergyHook.cpp +++ b/server/CSynergyHook.cpp @@ -468,9 +468,13 @@ init(DWORD threadID) { assert(g_hinstance != NULL); - // see if already initialized + // see if already initialized. if it is we'll shut down and + // reinitialize. this allows the hook DLL to be reclaimed by + // a new synergyd if the previous one died unexpectedly. if (g_threadID != 0) { - return 0; + uninstallScreenSaver(); + uninstall(); + cleanup(); } // save thread id. we'll post messages to this thread's @@ -624,9 +628,10 @@ uninstall(void) UnhookWindowsHookEx(g_getMessage); g_getMessage = NULL; } - g_keyboard = NULL; - g_mouse = NULL; - g_cbt = NULL; + g_keyboard = NULL; + g_mouse = NULL; + g_cbt = NULL; + g_wheelSupport = kWheelNone; // show the cursor restoreCursor(); @@ -664,7 +669,7 @@ uninstallScreenSaver(void) assert(g_hinstance != NULL); // uninstall hook unless the mouse wheel hook is installed - if (g_getMessage != NULL && g_threadID == 0) { + if (g_getMessage != NULL && g_wheelSupport == kWheelNone) { UnhookWindowsHookEx(g_getMessage); g_getMessage = NULL; } diff --git a/synergy/CInputPacketStream.cpp b/synergy/CInputPacketStream.cpp index c29a6a04..7b1b792c 100644 --- a/synergy/CInputPacketStream.cpp +++ b/synergy/CInputPacketStream.cpp @@ -1,5 +1,6 @@ #include "CInputPacketStream.h" #include "CLock.h" +#include "CStopwatch.h" // // CInputPacketStream @@ -26,14 +27,21 @@ CInputPacketStream::close() } UInt32 -CInputPacketStream::read(void* buffer, UInt32 n) +CInputPacketStream::read(void* buffer, UInt32 n, double timeout) { CLock lock(&m_mutex); - // wait for entire message to be read. return immediately if - // stream hungup. - if (!waitForFullMessage()) { + // wait for entire message to be read. return if stream + // hungup or timeout. + switch (waitForFullMessage(timeout)) { + case kData: + break; + + case kHungup: return 0; + + case kTimedout: + return (UInt32)-1; } // limit number of bytes to read to the number of bytes left in the @@ -43,7 +51,7 @@ CInputPacketStream::read(void* buffer, UInt32 n) } // now read from our buffer - n = m_buffer.readNoLock(buffer, n); + n = m_buffer.readNoLock(buffer, n, -1.0); assert(n <= m_size); m_size -= n; @@ -60,48 +68,70 @@ CInputPacketStream::getSize() const UInt32 CInputPacketStream::getSizeNoLock() const { + CStopwatch timer(true); while (!hasFullMessage() && getStream()->getSize() > 0) { // read more data - if (!getMoreMessage()) { + if (getMoreMessage(-1.0) != kData) { // stream hungup - return false; + return 0; } } return m_size; } -bool -CInputPacketStream::waitForFullMessage() const +CInputPacketStream::EResult +CInputPacketStream::waitForFullMessage(double timeout) const { + CStopwatch timer(true); while (!hasFullMessage()) { + // compute remaining timeout + double t = timeout - timer.getTime(); + if (timeout >= 0.0 && t <= 0.0) { + // timeout + return kTimedout; + } + // read more data - if (!getMoreMessage()) { + switch (getMoreMessage(t)) { + case kData: + break; + + case kHungup: // stream hungup - return false; + return kHungup; + + case kTimedout: + // stream timed out + return kTimedout; } } - return true; + return kData; } -bool -CInputPacketStream::getMoreMessage() const +CInputPacketStream::EResult +CInputPacketStream::getMoreMessage(double timeout) const { // read more data char buffer[4096]; - UInt32 n = getStream()->read(buffer, sizeof(buffer)); + UInt32 n = getStream()->read(buffer, sizeof(buffer), timeout); + + // return if stream timed out + if (n == (UInt32)-1) { + return kTimedout; + } // return if stream hungup if (n == 0) { m_buffer.hangup(); - return false; + return kHungup; } // append to our buffer m_buffer.write(buffer, n); - return true; + return kData; } bool @@ -117,7 +147,7 @@ CInputPacketStream::hasFullMessage() const // save payload length UInt8 buffer[4]; - UInt32 n = m_buffer.readNoLock(buffer, sizeof(buffer)); + UInt32 n = m_buffer.readNoLock(buffer, sizeof(buffer), -1.0); assert(n == 4); m_size = ((UInt32)buffer[0] << 24) | ((UInt32)buffer[1] << 16) | diff --git a/synergy/CInputPacketStream.h b/synergy/CInputPacketStream.h index 03fd32be..c8b123d1 100644 --- a/synergy/CInputPacketStream.h +++ b/synergy/CInputPacketStream.h @@ -16,13 +16,15 @@ public: // IInputStream overrides virtual void close(); - virtual UInt32 read(void*, UInt32 maxCount); + virtual UInt32 read(void*, UInt32 maxCount, double timeout); virtual UInt32 getSize() const; private: + enum EResult { kData, kHungup, kTimedout }; + UInt32 getSizeNoLock() const; - bool waitForFullMessage() const; - bool getMoreMessage() const; + EResult waitForFullMessage(double timeout) const; + EResult getMoreMessage(double timeout) const; bool hasFullMessage() const; private: diff --git a/synergy/CProtocolUtil.cpp b/synergy/CProtocolUtil.cpp index 1b3102b2..9426a2f6 100644 --- a/synergy/CProtocolUtil.cpp +++ b/synergy/CProtocolUtil.cpp @@ -348,7 +348,7 @@ CProtocolUtil::read(IInputStream* stream, void* vbuffer, UInt32 count) UInt8* buffer = reinterpret_cast(vbuffer); while (count > 0) { // read more - UInt32 n = stream->read(buffer, count); + UInt32 n = stream->read(buffer, count, -1.0); // bail if stream has hungup if (n == 0) { diff --git a/synergy/ProtocolTypes.h b/synergy/ProtocolTypes.h index 3d2223e2..872a1d7e 100644 --- a/synergy/ProtocolTypes.h +++ b/synergy/ProtocolTypes.h @@ -10,6 +10,12 @@ static const SInt16 kProtocolMinorVersion = 1; // contact port number static const UInt16 kDefaultPort = 24800; +// time between heartbeats (in seconds) +static const double kHeartRate = 2.0; + +// time without a heartbeat that we call death +static const double kHeartDeath = 3.0 * kHeartRate; + // // message codes (trailing NUL is not part of code). in comments, $n // refers to the n'th argument (counting from one). message codes are @@ -24,6 +30,9 @@ static const UInt16 kDefaultPort = 24800; // command codes // +// no operation; secondary -> primary +static const char kMsgCNoop[] = "CNOP"; + // close connection; primary -> secondary static const char kMsgCClose[] = "CBYE";