diff --git a/client/CClient.cpp b/client/CClient.cpp index 453d6d45..83d2164f 100644 --- a/client/CClient.cpp +++ b/client/CClient.cpp @@ -13,6 +13,7 @@ #include "CTimerThread.h" #include "XThread.h" #include "CLog.h" +#include "CStopwatch.h" #include "TMethodJob.h" #include @@ -301,23 +302,36 @@ CClient::runSession(void*) m_compressMouse = false; // handle messages from server + CStopwatch heartbeat; for (;;) { // if no input is pending then flush compressed mouse motion if (input->getSize() == 0) { flushCompressedMouse(); } - // wait for reply + // wait for a message log((CLOG_DEBUG2 "waiting for message")); UInt8 code[4]; - UInt32 n = input->read(code, 4); + UInt32 n = input->read(code, 4, kHeartRate); - // verify we got an entire code + // check if server hungup if (n == 0) { log((CLOG_NOTE "server disconnected")); - // server hungup break; } + + // check for time out + if (n == (UInt32)-1 || heartbeat.getTime() > kHeartRate) { + // send heartbeat + CProtocolUtil::writef(m_output, kMsgCNoop); + heartbeat.reset(); + if (n == (UInt32)-1) { + // no message to process + continue; + } + } + + // verify we got an entire code if (n != 4) { // client sent an incomplete message log((CLOG_ERR "incomplete message from server")); @@ -347,6 +361,10 @@ CClient::runSession(void*) else if (memcmp(code, kMsgDKeyRepeat, 4) == 0) { onKeyRepeat(); } + else if (memcmp(code, kMsgCNoop, 4) == 0) { + // accept and discard no-op + continue; + } else if (memcmp(code, kMsgCEnter, 4) == 0) { onEnter(); } diff --git a/http/CHTTPProtocol.cpp b/http/CHTTPProtocol.cpp index e49bdd00..cea0d64c 100644 --- a/http/CHTTPProtocol.cpp +++ b/http/CHTTPProtocol.cpp @@ -468,7 +468,7 @@ CHTTPProtocol::readLine(IInputStream* stream, CString& tmpBuffer) // read more from stream char buffer[4096]; - UInt32 n = stream->read(buffer, sizeof(buffer)); + UInt32 n = stream->read(buffer, sizeof(buffer), -1.0); if (n == 0) { // stream is empty. return what's leftover. CString line = tmpBuffer; @@ -514,7 +514,7 @@ CHTTPProtocol::readBlock(IInputStream* stream, if (n > numBytes) { n = numBytes; } - n = stream->read(buffer, n); + n = stream->read(buffer, n, -1.0); // if stream is empty then return what we've got so far if (n == 0) { diff --git a/io/CBufferedInputStream.cpp b/io/CBufferedInputStream.cpp index 28c6e7a5..06f553ea 100644 --- a/io/CBufferedInputStream.cpp +++ b/io/CBufferedInputStream.cpp @@ -2,6 +2,7 @@ #include "CLock.h" #include "CMutex.h" #include "CThread.h" +#include "CStopwatch.h" #include "IJob.h" #include "XIO.h" #include @@ -43,15 +44,19 @@ CBufferedInputStream::hangup() } UInt32 -CBufferedInputStream::readNoLock(void* dst, UInt32 n) +CBufferedInputStream::readNoLock(void* dst, UInt32 n, double timeout) { if (m_closed) { throw XIOClosed(); } - // wait for data (or hangup) + // wait for data, hangup, or timeout + CStopwatch timer(true); while (!m_hungup && m_empty == true) { - m_empty.wait(); + if (!m_empty.wait(timer, timeout)) { + // timed out + return (UInt32)-1; + } } // read data @@ -98,10 +103,10 @@ CBufferedInputStream::close() } UInt32 -CBufferedInputStream::read(void* dst, UInt32 n) +CBufferedInputStream::read(void* dst, UInt32 n, double timeout) { CLock lock(m_mutex); - return readNoLock(dst, n); + return readNoLock(dst, n, timeout); } UInt32 diff --git a/io/CBufferedInputStream.h b/io/CBufferedInputStream.h index f47658f3..22ded242 100644 --- a/io/CBufferedInputStream.h +++ b/io/CBufferedInputStream.h @@ -26,7 +26,7 @@ public: void hangup(); // same as read() but caller must lock the mutex - UInt32 readNoLock(void*, UInt32 count); + UInt32 readNoLock(void*, UInt32 count, double timeout); // accessors @@ -36,7 +36,7 @@ public: // IInputStream overrides // these all lock the mutex for their duration virtual void close(); - virtual UInt32 read(void*, UInt32 count); + virtual UInt32 read(void*, UInt32 count, double timeout); virtual UInt32 getSize() const; private: diff --git a/io/CInputStreamFilter.h b/io/CInputStreamFilter.h index 85487226..bb25a261 100644 --- a/io/CInputStreamFilter.h +++ b/io/CInputStreamFilter.h @@ -14,7 +14,7 @@ public: // IInputStream overrides virtual void close() = 0; - virtual UInt32 read(void*, UInt32 maxCount) = 0; + virtual UInt32 read(void*, UInt32 maxCount, double timeout) = 0; virtual UInt32 getSize() const = 0; protected: diff --git a/io/IInputStream.h b/io/IInputStream.h index f025ed40..fc5abaa3 100644 --- a/io/IInputStream.h +++ b/io/IInputStream.h @@ -13,8 +13,11 @@ public: // read up to maxCount bytes into buffer, return number read. // blocks if no data is currently available. if buffer is NULL - // then the data is discarded. - virtual UInt32 read(void* buffer, UInt32 maxCount) = 0; + // then the data is discarded. returns (UInt32)-1 if there's + // no data for timeout seconds; if timeout < 0 then it blocks + // until data is available. + // (cancellation point) + virtual UInt32 read(void* buffer, UInt32 maxCount, double timeout) = 0; // accessors diff --git a/server/CMSWindowsPrimaryScreen.cpp b/server/CMSWindowsPrimaryScreen.cpp index e35d2811..1631ab7f 100644 --- a/server/CMSWindowsPrimaryScreen.cpp +++ b/server/CMSWindowsPrimaryScreen.cpp @@ -386,21 +386,27 @@ CMSWindowsPrimaryScreen::onOpenDisplay() // initialize hook library m_init(m_threadID); - // install the screen saver hook - if (m_installScreenSaver != NULL) { - m_installScreenSaver(); - } + try { + // install the screen saver hook + if (m_installScreenSaver != NULL) { + m_installScreenSaver(); + } - // get the input desktop and switch to it - if (m_is95Family) { - if (!openDesktop()) { - throw XScreenOpenFailure(); + // get the input desktop and switch to it + if (m_is95Family) { + if (!openDesktop()) { + throw XScreenOpenFailure(); + } + } + else { + if (!switchDesktop(openInputDesktop())) { + throw XScreenOpenFailure(); + } } } - else { - if (!switchDesktop(openInputDesktop())) { - throw XScreenOpenFailure(); - } + catch (...) { + m_cleanup(); + throw; } } diff --git a/server/CServer.cpp b/server/CServer.cpp index 6ae97436..5e7004ba 100644 --- a/server/CServer.cpp +++ b/server/CServer.cpp @@ -42,6 +42,7 @@ CServer::CServer(const CString& serverName) : m_active(NULL), m_primaryInfo(NULL), m_seqNum(0), + m_activeSaver(NULL), m_httpServer(NULL), m_httpAvailable(&m_mutex, s_httpMaxSimultaneousRequests) { diff --git a/server/CServerProtocol1_0.cpp b/server/CServerProtocol1_0.cpp index 54eaf81e..ad6f559d 100644 --- a/server/CServerProtocol1_0.cpp +++ b/server/CServerProtocol1_0.cpp @@ -8,6 +8,7 @@ #include "IOutputStream.h" #include "CThread.h" #include "CLog.h" +#include "CStopwatch.h" #include // @@ -29,22 +30,35 @@ CServerProtocol1_0::~CServerProtocol1_0() void CServerProtocol1_0::run() { - // handle messages until the client hangs up + // handle messages until the client hangs up or stops sending heartbeats + CStopwatch heartTimer; for (;;) { CThread::testCancel(); // wait for a message UInt8 code[4]; - UInt32 n = getInputStream()->read(code, 4); + UInt32 n = getInputStream()->read(code, 4, kHeartRate); CThread::testCancel(); - // verify we got an entire code + // check if client hungup if (n == 0) { log((CLOG_NOTE "client \"%s\" disconnected", getClient().c_str())); - - // client hungup return; } + + // check if client has stopped sending heartbeats + if (n == (UInt32)-1) { + if (heartTimer.getTime() > kHeartDeath) { + log((CLOG_NOTE "client \"%s\" is dead", getClient().c_str())); + return; + } + continue; + } + + // got a message so reset heartbeat monitor + heartTimer.reset(); + + // verify we got an entire code if (n != 4) { log((CLOG_ERR "incomplete message from \"%s\": %d bytes", getClient().c_str(), n)); @@ -57,6 +71,10 @@ CServerProtocol1_0::run() if (memcmp(code, kMsgDInfo, 4) == 0) { recvInfo(); } + else if (memcmp(code, kMsgCNoop, 4) == 0) { + // discard no-ops + continue; + } else if (memcmp(code, kMsgCClipboard, 4) == 0) { recvGrabClipboard(); } @@ -83,8 +101,17 @@ CServerProtocol1_0::queryInfo() // wait for and verify reply UInt8 code[4]; - UInt32 n = getInputStream()->read(code, 4); - if (n != 4 && memcmp(code, kMsgDInfo, 4) != 0) { + for (;;) { + UInt32 n = getInputStream()->read(code, 4, -1.0); + if (n == 4) { + if (memcmp(code, kMsgCNoop, 4) == 0) { + // discard heartbeats + continue; + } + if (memcmp(code, kMsgDInfo, 4) == 0) { + break; + } + } throw XBadClient(); } diff --git a/server/CSynergyHook.cpp b/server/CSynergyHook.cpp index 8440013b..262c27a6 100644 --- a/server/CSynergyHook.cpp +++ b/server/CSynergyHook.cpp @@ -468,9 +468,13 @@ init(DWORD threadID) { assert(g_hinstance != NULL); - // see if already initialized + // see if already initialized. if it is we'll shut down and + // reinitialize. this allows the hook DLL to be reclaimed by + // a new synergyd if the previous one died unexpectedly. if (g_threadID != 0) { - return 0; + uninstallScreenSaver(); + uninstall(); + cleanup(); } // save thread id. we'll post messages to this thread's @@ -624,9 +628,10 @@ uninstall(void) UnhookWindowsHookEx(g_getMessage); g_getMessage = NULL; } - g_keyboard = NULL; - g_mouse = NULL; - g_cbt = NULL; + g_keyboard = NULL; + g_mouse = NULL; + g_cbt = NULL; + g_wheelSupport = kWheelNone; // show the cursor restoreCursor(); @@ -664,7 +669,7 @@ uninstallScreenSaver(void) assert(g_hinstance != NULL); // uninstall hook unless the mouse wheel hook is installed - if (g_getMessage != NULL && g_threadID == 0) { + if (g_getMessage != NULL && g_wheelSupport == kWheelNone) { UnhookWindowsHookEx(g_getMessage); g_getMessage = NULL; } diff --git a/synergy/CInputPacketStream.cpp b/synergy/CInputPacketStream.cpp index c29a6a04..7b1b792c 100644 --- a/synergy/CInputPacketStream.cpp +++ b/synergy/CInputPacketStream.cpp @@ -1,5 +1,6 @@ #include "CInputPacketStream.h" #include "CLock.h" +#include "CStopwatch.h" // // CInputPacketStream @@ -26,14 +27,21 @@ CInputPacketStream::close() } UInt32 -CInputPacketStream::read(void* buffer, UInt32 n) +CInputPacketStream::read(void* buffer, UInt32 n, double timeout) { CLock lock(&m_mutex); - // wait for entire message to be read. return immediately if - // stream hungup. - if (!waitForFullMessage()) { + // wait for entire message to be read. return if stream + // hungup or timeout. + switch (waitForFullMessage(timeout)) { + case kData: + break; + + case kHungup: return 0; + + case kTimedout: + return (UInt32)-1; } // limit number of bytes to read to the number of bytes left in the @@ -43,7 +51,7 @@ CInputPacketStream::read(void* buffer, UInt32 n) } // now read from our buffer - n = m_buffer.readNoLock(buffer, n); + n = m_buffer.readNoLock(buffer, n, -1.0); assert(n <= m_size); m_size -= n; @@ -60,48 +68,70 @@ CInputPacketStream::getSize() const UInt32 CInputPacketStream::getSizeNoLock() const { + CStopwatch timer(true); while (!hasFullMessage() && getStream()->getSize() > 0) { // read more data - if (!getMoreMessage()) { + if (getMoreMessage(-1.0) != kData) { // stream hungup - return false; + return 0; } } return m_size; } -bool -CInputPacketStream::waitForFullMessage() const +CInputPacketStream::EResult +CInputPacketStream::waitForFullMessage(double timeout) const { + CStopwatch timer(true); while (!hasFullMessage()) { + // compute remaining timeout + double t = timeout - timer.getTime(); + if (timeout >= 0.0 && t <= 0.0) { + // timeout + return kTimedout; + } + // read more data - if (!getMoreMessage()) { + switch (getMoreMessage(t)) { + case kData: + break; + + case kHungup: // stream hungup - return false; + return kHungup; + + case kTimedout: + // stream timed out + return kTimedout; } } - return true; + return kData; } -bool -CInputPacketStream::getMoreMessage() const +CInputPacketStream::EResult +CInputPacketStream::getMoreMessage(double timeout) const { // read more data char buffer[4096]; - UInt32 n = getStream()->read(buffer, sizeof(buffer)); + UInt32 n = getStream()->read(buffer, sizeof(buffer), timeout); + + // return if stream timed out + if (n == (UInt32)-1) { + return kTimedout; + } // return if stream hungup if (n == 0) { m_buffer.hangup(); - return false; + return kHungup; } // append to our buffer m_buffer.write(buffer, n); - return true; + return kData; } bool @@ -117,7 +147,7 @@ CInputPacketStream::hasFullMessage() const // save payload length UInt8 buffer[4]; - UInt32 n = m_buffer.readNoLock(buffer, sizeof(buffer)); + UInt32 n = m_buffer.readNoLock(buffer, sizeof(buffer), -1.0); assert(n == 4); m_size = ((UInt32)buffer[0] << 24) | ((UInt32)buffer[1] << 16) | diff --git a/synergy/CInputPacketStream.h b/synergy/CInputPacketStream.h index 03fd32be..c8b123d1 100644 --- a/synergy/CInputPacketStream.h +++ b/synergy/CInputPacketStream.h @@ -16,13 +16,15 @@ public: // IInputStream overrides virtual void close(); - virtual UInt32 read(void*, UInt32 maxCount); + virtual UInt32 read(void*, UInt32 maxCount, double timeout); virtual UInt32 getSize() const; private: + enum EResult { kData, kHungup, kTimedout }; + UInt32 getSizeNoLock() const; - bool waitForFullMessage() const; - bool getMoreMessage() const; + EResult waitForFullMessage(double timeout) const; + EResult getMoreMessage(double timeout) const; bool hasFullMessage() const; private: diff --git a/synergy/CProtocolUtil.cpp b/synergy/CProtocolUtil.cpp index 1b3102b2..9426a2f6 100644 --- a/synergy/CProtocolUtil.cpp +++ b/synergy/CProtocolUtil.cpp @@ -348,7 +348,7 @@ CProtocolUtil::read(IInputStream* stream, void* vbuffer, UInt32 count) UInt8* buffer = reinterpret_cast(vbuffer); while (count > 0) { // read more - UInt32 n = stream->read(buffer, count); + UInt32 n = stream->read(buffer, count, -1.0); // bail if stream has hungup if (n == 0) { diff --git a/synergy/ProtocolTypes.h b/synergy/ProtocolTypes.h index 3d2223e2..872a1d7e 100644 --- a/synergy/ProtocolTypes.h +++ b/synergy/ProtocolTypes.h @@ -10,6 +10,12 @@ static const SInt16 kProtocolMinorVersion = 1; // contact port number static const UInt16 kDefaultPort = 24800; +// time between heartbeats (in seconds) +static const double kHeartRate = 2.0; + +// time without a heartbeat that we call death +static const double kHeartDeath = 3.0 * kHeartRate; + // // message codes (trailing NUL is not part of code). in comments, $n // refers to the n'th argument (counting from one). message codes are @@ -24,6 +30,9 @@ static const UInt16 kDefaultPort = 24800; // command codes // +// no operation; secondary -> primary +static const char kMsgCNoop[] = "CNOP"; + // close connection; primary -> secondary static const char kMsgCClose[] = "CBYE";