From b3291bc2b524d96f3bde9eddae74754298929d07 Mon Sep 17 00:00:00 2001 From: crs Date: Sun, 14 Oct 2001 18:29:43 +0000 Subject: [PATCH] fixed timeout bug in CThreadRep::wait() (negative timeout wouldn't wait forever). also fixed early return from sleep due to signal. now forcing client to initialize CThread to ensure global mutex gets initialized before threads are used. --- base/CLog.h | 3 +++ mt/CMutex.cpp | 8 ++++++ mt/CThread.cpp | 5 ++++ mt/CThread.h | 5 ++++ mt/CThreadRep.cpp | 47 ++++++++++++++++++++++------------ mt/CThreadRep.h | 7 +++-- mt/CTimerThread.cpp | 3 +++ synergy/CClient.cpp | 15 +++++++++-- synergy/CClient.h | 3 +++ synergy/CProtocolUtil.cpp | 5 ++++ synergy/CServerProtocol1_0.cpp | 6 +++++ synergy/client.cpp | 3 +++ synergy/server.cpp | 3 +++ 13 files changed, 92 insertions(+), 21 deletions(-) diff --git a/base/CLog.h b/base/CLog.h index 0d44c3e7..3116142b 100644 --- a/base/CLog.h +++ b/base/CLog.h @@ -16,12 +16,15 @@ class CLog { #if defined(NOLOGGING) #define log(_a1) +#define logc(_a1, _a2) #define CLOG_TRACE #elif defined(NDEBUG) #define log(_a1) CLog::print _a1 +#define logc(_a1, _a2) if (_a1) CLog::print _a2 #define CLOG_TRACE #else #define log(_a1) CLog::printt _a1 +#define logc(_a1, _a2) if (_a1) CLog::printt _a2 #define CLOG_TRACE __FILE__, __LINE__, #endif diff --git a/mt/CMutex.cpp b/mt/CMutex.cpp index 3b203d50..311df19a 100644 --- a/mt/CMutex.cpp +++ b/mt/CMutex.cpp @@ -1,4 +1,5 @@ #include "CMutex.h" +#include "CLog.h" #include // @@ -40,10 +41,15 @@ void CMutex::init() m_mutex = reinterpret_cast(mutex); } +#include +#include +#include +#include void CMutex::fini() { pthread_mutex_t* mutex = reinterpret_cast(m_mutex); int status = pthread_mutex_destroy(mutex); + logc(status != 0, (CLOG_ERR "pthread_mutex_destroy status %d", status)); assert(status == 0); delete mutex; } @@ -67,6 +73,7 @@ void CMutex::lock() const break; default: + log((CLOG_ERR "pthread_mutex_lock status %d", status)); assert(0 && "unexpected error"); } } @@ -86,6 +93,7 @@ void CMutex::unlock() const break; default: + log((CLOG_ERR "pthread_mutex_unlock status %d", status)); assert(0 && "unexpected error"); } } diff --git a/mt/CThread.cpp b/mt/CThread.cpp index 17fd85c1..0b2a098a 100644 --- a/mt/CThread.cpp +++ b/mt/CThread.cpp @@ -58,6 +58,11 @@ CThread& CThread::operator=(const CThread& thread) return *this; } +void CThread::init() +{ + CThreadRep::initThreads(); +} + void CThread::sleep(double timeout) { CThreadPtr currentRep(CThreadRep::getCurrentThreadRep()); diff --git a/mt/CThread.h b/mt/CThread.h index b5cf6d49..f505ad00 100644 --- a/mt/CThread.h +++ b/mt/CThread.h @@ -28,6 +28,11 @@ class CThread { // start a new thread. CThread& operator=(const CThread&); + // initialize the thread library. this must be called before + // any other thread methods or creating a thread object. it + // is harmless to call init() multiple times. + static void init(); + // the calling thread sleeps for the given number of seconds. if // timeout <= 0.0 then the call returns immediately. if timeout // == 0.0 then the calling thread yields the CPU. diff --git a/mt/CThreadRep.cpp b/mt/CThreadRep.cpp index 296b50da..87847f29 100644 --- a/mt/CThreadRep.cpp +++ b/mt/CThreadRep.cpp @@ -1,7 +1,8 @@ #include "CThreadRep.h" #include "CThread.h" -#include "XThread.h" +#include "CMutex.h" #include "CLock.h" +#include "XThread.h" #include "IJob.h" #include @@ -16,7 +17,7 @@ class XThreadUnavailable { }; // CThreadRep // -CMutex CThreadRep::s_mutex; +CMutex* CThreadRep::s_mutex = NULL; CThreadRep* CThreadRep::s_head = NULL; CThreadRep::CThreadRep() : m_prev(NULL), @@ -26,6 +27,7 @@ CThreadRep::CThreadRep() : m_prev(NULL), m_userData(NULL) { // note -- s_mutex must be locked on entry + assert(s_mutex != NULL); // initialize stuff init(); @@ -65,6 +67,7 @@ CThreadRep::CThreadRep(IJob* job, void* userData) : m_userData(userData) { assert(m_job != NULL); + assert(s_mutex != NULL); // create a thread rep for the main thread if the current thread // is unknown. note that this might cause multiple "main" threads @@ -75,7 +78,7 @@ CThreadRep::CThreadRep(IJob* job, void* userData) : init(); // hold mutex while we create the thread - CLock lock(&s_mutex); + CLock lock(s_mutex); // start the thread. throw if it doesn't start. #if defined(CONFIG_PTHREADS) @@ -124,15 +127,22 @@ CThreadRep::~CThreadRep() fini(); } +void CThreadRep::initThreads() +{ + if (s_mutex == NULL) { + s_mutex = new CMutex; + } +} + void CThreadRep::ref() { - CLock lock(&s_mutex); + CLock lock(s_mutex); ++m_refCount; } void CThreadRep::unref() { - CLock lock(&s_mutex); + CLock lock(s_mutex); if (--m_refCount == 0) { delete this; } @@ -140,7 +150,7 @@ void CThreadRep::unref() bool CThreadRep::enableCancel(bool enable) { - CLock lock(&s_mutex); + CLock lock(s_mutex); const bool old = m_cancellable; m_cancellable = enable; return old; @@ -148,7 +158,7 @@ bool CThreadRep::enableCancel(bool enable) bool CThreadRep::isCancellable() const { - CLock lock(&s_mutex); + CLock lock(s_mutex); return (m_cancellable && !m_cancelling); } @@ -166,6 +176,8 @@ void* CThreadRep::getUserData() const CThreadRep* CThreadRep::getCurrentThreadRep() { + assert(s_mutex != NULL); + #if defined(CONFIG_PTHREADS) const pthread_t thread = pthread_self(); #elif defined(CONFIG_PLATFORM_WIN32) @@ -173,7 +185,7 @@ CThreadRep* CThreadRep::getCurrentThreadRep() #endif // lock list while we search - CLock lock(&s_mutex); + CLock lock(s_mutex); // search CThreadRep* scan = s_head; @@ -207,7 +219,7 @@ void CThreadRep::doThreadFunc() setPriority(1); // wait for parent to initialize this object - { CLock lock(&s_mutex); } + { CLock lock(s_mutex); } void* result = NULL; try { @@ -263,12 +275,13 @@ void CThreadRep::sleep(double timeout) struct timespec t; t.tv_sec = (long)timeout; t.tv_nsec = (long)(1000000000.0 * (timeout - (double)t.tv_sec)); - nanosleep(&t, NULL); + while (nanosleep(&t, &t) < 0) + testCancel(); } void CThreadRep::cancel() { - CLock lock(&s_mutex); + CLock lock(s_mutex); if (m_cancellable && !m_cancelling) { m_cancel = true; @@ -281,7 +294,7 @@ void CThreadRep::testCancel() { { // prevent further cancellation - CLock lock(&s_mutex); + CLock lock(s_mutex); if (!m_cancel || !m_cancellable || m_cancelling) return; @@ -303,14 +316,14 @@ bool CThreadRep::wait(CThreadRep* target, double timeout) if (target->isExited()) return true; - if (timeout > 0.0) { + if (timeout != 0.0) { CStopwatch timer; do { sleep(0.05); testCancel(); if (target->isExited()) return true; - } while (timer.getTime() <= timeout); + } while (timeout < 0.0 || timer.getTime() <= timeout); } return false; @@ -323,7 +336,7 @@ void CThreadRep::setPriority(int) bool CThreadRep::isExited() const { - CLock lock(&s_mutex); + CLock lock(s_mutex); return m_exit; } @@ -341,7 +354,7 @@ void* CThreadRep::threadFunc(void* arg) rep->unref(); // mark as terminated - CLock lock(&s_mutex); + CLock lock(s_mutex); rep->m_exit = true; // terminate the thread @@ -400,7 +413,7 @@ void CThreadRep::testCancel() { // ignore if disabled or already cancelling - CLock lock(&s_mutex); + CLock lock(s_mutex); if (!m_cancellable || m_cancelling) return; diff --git a/mt/CThreadRep.h b/mt/CThreadRep.h index 89bb6131..3cfce4b9 100644 --- a/mt/CThreadRep.h +++ b/mt/CThreadRep.h @@ -1,7 +1,6 @@ #ifndef CTHREADREP_H #define CTHREADREP_H -#include "CMutex.h" #include "BasicTypes.h" #if defined(CONFIG_PTHREADS) @@ -10,6 +9,7 @@ #include #endif +class CMutex; class IJob; class CThreadRep { @@ -18,6 +18,9 @@ class CThreadRep { // manipulators + // initialize the thread library + static void initThreads(); + // change ref count void ref(); void unref(); @@ -92,7 +95,7 @@ class CThreadRep { CThreadRep& operator=(const CThreadRep&); private: - static CMutex s_mutex; + static CMutex* s_mutex; static CThreadRep* s_head; CThreadRep* m_prev; diff --git a/mt/CTimerThread.cpp b/mt/CTimerThread.cpp index afe2925c..d1d7d48f 100644 --- a/mt/CTimerThread.cpp +++ b/mt/CTimerThread.cpp @@ -1,6 +1,7 @@ #include "CTimerThread.h" #include "CThread.h" #include "TMethodJob.h" +#include "CLog.h" #include // @@ -24,7 +25,9 @@ CTimerThread::~CTimerThread() void CTimerThread::timer(void*) { + log((CLOG_DEBUG "timeout in %f seconds", m_timeout)); CThread::sleep(m_timeout); + log((CLOG_DEBUG "timeout")); m_callingThread->cancel(); } diff --git a/synergy/CClient.cpp b/synergy/CClient.cpp index 6fb1d215..8cd8fb61 100644 --- a/synergy/CClient.cpp +++ b/synergy/CClient.cpp @@ -4,8 +4,10 @@ #include "CProtocolUtil.h" #include "ISecondaryScreen.h" #include "ProtocolTypes.h" +#include "CThread.h" #include "CTimerThread.h" #include "XSynergy.h" +#include "TMethodJob.h" #include "CLog.h" #include @@ -19,15 +21,24 @@ CClient::CClient(const CString& clientName) : m_output(NULL), m_screen(NULL) { + // do nothing } CClient::~CClient() { + // do nothing +} + +void CClient::run(const CNetworkAddress& serverAddress) +{ + m_serverAddress = &serverAddress; + CThread thread(new TMethodJob(this, &CClient::runSession)); + thread.wait(); } #include "CTCPSocket.h" #include "CXWindowsSecondaryScreen.h" -void CClient::run(const CNetworkAddress& serverAddress) +void CClient::runSession(void*) { log((CLOG_DEBUG "starting client \"%s\"", m_name.c_str())); @@ -41,7 +52,7 @@ void CClient::run(const CNetworkAddress& serverAddress) // create socket and attempt to connect to server log((CLOG_DEBUG "connecting to server")); socket.reset(new CTCPSocket()); // FIXME -- use factory - socket->connect(serverAddress); + socket->connect(*m_serverAddress); log((CLOG_INFO "connected to server")); // get the input and output streams diff --git a/synergy/CClient.h b/synergy/CClient.h index 825e973f..ddcd3a09 100644 --- a/synergy/CClient.h +++ b/synergy/CClient.h @@ -22,6 +22,8 @@ class CClient { private: + void runSession(void*); + // message handlers void onEnter(); void onLeave(); @@ -53,6 +55,7 @@ class CClient { IInputStream* m_input; IOutputStream* m_output; ISecondaryScreen* m_screen; + const CNetworkAddress* m_serverAddress; }; #endif diff --git a/synergy/CProtocolUtil.cpp b/synergy/CProtocolUtil.cpp index ec0cc2cc..e22d24c4 100644 --- a/synergy/CProtocolUtil.cpp +++ b/synergy/CProtocolUtil.cpp @@ -52,6 +52,7 @@ void CProtocolUtil::readf(IInputStream* stream, { assert(stream != NULL); assert(fmt != NULL); + log((CLOG_DEBUG "readf(%s)", fmt)); va_list args; va_start(args, fmt); @@ -93,6 +94,7 @@ void CProtocolUtil::readf(IInputStream* stream, static_cast(buffer[3]); break; } + log((CLOG_DEBUG "readf: read %d byte integer: %d (0x%x)", len, *v, *v)); break; } @@ -129,6 +131,7 @@ void CProtocolUtil::readf(IInputStream* stream, } throw; } + log((CLOG_DEBUG "readf: read %d byte string: %.*s", len, len, sBuffer)); // save the data CString* dst = va_arg(args, CString*); @@ -159,6 +162,7 @@ void CProtocolUtil::readf(IInputStream* stream, // verify match if (buffer[0] != *fmt) { + log((CLOG_DEBUG "readf: format mismatch: %c vs %c", *fmt, buffer[0])); throw XIOReadMismatch(); } @@ -322,6 +326,7 @@ void CProtocolUtil::read(IInputStream* stream, // bail if stream has hungup if (n == 0) { + log((CLOG_DEBUG "unexpected disconnect in readf(), %d bytes left", count)); throw XIOEndOfStream(); } diff --git a/synergy/CServerProtocol1_0.cpp b/synergy/CServerProtocol1_0.cpp index 746e0c1a..de28aa81 100644 --- a/synergy/CServerProtocol1_0.cpp +++ b/synergy/CServerProtocol1_0.cpp @@ -32,10 +32,14 @@ void CServerProtocol1_0::run() // verify we got an entire code if (n == 0) { + log((CLOG_NOTE "client \"%s\" disconnected", getClient().c_str())); + // client hungup return; } if (n != 4) { + log((CLOG_ERR "incomplete message from \"%s\": %d bytes", getClient().c_str(), n)); + // client sent an incomplete message throw XBadClient(); } @@ -47,6 +51,8 @@ void CServerProtocol1_0::run() } // FIXME -- more message here else { + log((CLOG_ERR "unknown message from client \"%s\"", getClient().c_str())); + // unknown message throw XBadClient(); } diff --git a/synergy/client.cpp b/synergy/client.cpp index b1c3c16b..4ada77d9 100644 --- a/synergy/client.cpp +++ b/synergy/client.cpp @@ -1,9 +1,12 @@ #include "CClient.h" #include "CNetworkAddress.h" +#include "CThread.h" #include int main(int argc, char** argv) { + CThread::init(); + if (argc != 2) { fprintf(stderr, "usage: %s \n", argv[0]); return 1; diff --git a/synergy/server.cpp b/synergy/server.cpp index 29c8466b..ef1b578f 100644 --- a/synergy/server.cpp +++ b/synergy/server.cpp @@ -1,9 +1,12 @@ #include "CServer.h" #include "CScreenMap.h" +#include "CThread.h" #include int main(int argc, char** argv) { + CThread::init(); + if (argc != 1) { fprintf(stderr, "usage: %s\n", argv[0]); return 1;