From e2ee2371e0c09f65826dddf61d3ce6f4bdc6c152 Mon Sep 17 00:00:00 2001 From: crs Date: Fri, 21 Jun 2002 15:18:01 +0000 Subject: [PATCH] some cleanup. also fixed a race condition when adding threads to the thread list: the child thread would add itself to the list which means there could be a time interval in the parent where the child thread exists but isn't on the list. the parent now does the adding and removing. --- server/CServer.cpp | 230 +++++++++++++++++++-------------------------- server/CServer.h | 43 +++------ 2 files changed, 111 insertions(+), 162 deletions(-) diff --git a/server/CServer.cpp b/server/CServer.cpp index aca6a6c4..22c79ee5 100644 --- a/server/CServer.cpp +++ b/server/CServer.cpp @@ -38,7 +38,6 @@ const SInt32 CServer::s_httpMaxSimultaneousRequests = 3; CServer::CServer(const CString& serverName) : m_name(serverName), - m_cleanupSize(&m_mutex, 0), m_primary(NULL), m_active(NULL), m_primaryInfo(NULL), @@ -81,12 +80,13 @@ CServer::run() } // start listening for new clients - CThread(new TMethodJob(this, &CServer::acceptClients)); + startThread(new TMethodJob(this, &CServer::acceptClients)); // start listening for HTTP requests if (m_config.getHTTPAddress().isValid()) { m_httpServer = new CHTTPServer(this); - CThread(new TMethodJob(this, &CServer::acceptHTTPClients)); + startThread(new TMethodJob(this, + &CServer::acceptHTTPClients)); } // handle events @@ -95,7 +95,7 @@ CServer::run() // clean up log((CLOG_NOTE "stopping server")); - cleanupThreads(); + stopThreads(); delete m_httpServer; m_httpServer = NULL; closePrimaryScreen(); @@ -105,7 +105,7 @@ CServer::run() // clean up log((CLOG_NOTE "stopping server")); - cleanupThreads(); + stopThreads(); delete m_httpServer; m_httpServer = NULL; if (m_primary != NULL) { @@ -115,7 +115,7 @@ CServer::run() catch (XThread&) { // clean up log((CLOG_NOTE "stopping server")); - cleanupThreads(); + stopThreads(); delete m_httpServer; m_httpServer = NULL; if (m_primary != NULL) { @@ -128,7 +128,7 @@ CServer::run() // clean up log((CLOG_NOTE "stopping server")); - cleanupThreads(); + stopThreads(); delete m_httpServer; m_httpServer = NULL; if (m_primary != NULL) { @@ -149,7 +149,7 @@ CServer::shutdown() { // stop all running threads but don't wait too long since some // threads may be unable to proceed until this thread returns. - cleanupThreads(3.0); + stopThreads(3.0); // done with the HTTP server delete m_httpServer; @@ -208,6 +208,9 @@ CServer::setConfig(const CConfig& config) index->wait(); } + // clean up thread list + reapThreads(); + // cut over CLock lock(&m_mutex); m_config = config; @@ -993,16 +996,85 @@ CServer::getNeighbor(CScreenInfo* src, return dst; } +void +CServer::startThread(IJob* job) +{ + CLock lock(&m_mutex); + doReapThreads(m_threads); + CThread* thread = new CThread(job); + m_threads.push_back(thread); + log((CLOG_DEBUG1 "started thread %p", thread)); +} + +void +CServer::stopThreads(double timeout) +{ + log((CLOG_DEBUG1 "stopping threads")); + + // swap thread list so nobody can mess with it + CThreadList threads; + { + CLock lock(&m_mutex); + threads.swap(m_threads); + } + + // cancel every thread + for (CThreadList::iterator index = threads.begin(); + index != threads.end(); ++index) { + CThread* thread = *index; + thread->cancel(); + } + + // now wait for the threads + CStopwatch timer(true); + while (threads.size() > 0 && (timeout < 0.0 || timer.getTime() < timeout)) { + doReapThreads(threads); + CThread::sleep(0.01); + } + + // delete remaining threads + for (CThreadList::iterator index = threads.begin(); + index != threads.end(); ++index) { + CThread* thread = *index; + log((CLOG_DEBUG1 "reaped running thread %p", thread)); + delete thread; + } + + log((CLOG_DEBUG1 "stopped threads")); +} + +void +CServer::reapThreads() +{ + CLock lock(&m_mutex); + doReapThreads(m_threads); +} + +void +CServer::doReapThreads(CThreadList& threads) +{ + for (CThreadList::iterator index = threads.begin(); + index != threads.end(); ) { + CThread* thread = *index; + if (thread->wait(0.0)) { + // thread terminated + index = threads.erase(index); + log((CLOG_DEBUG1 "reaped thread %p", thread)); + delete thread; + } + else { + // thread is running + ++index; + } + } +} + #include "CTCPListenSocket.h" void CServer::acceptClients(void*) { log((CLOG_DEBUG1 "starting to wait for clients")); - // add this thread to the list of threads to cancel. remove from - // list in d'tor. - CCleanupNote cleanupNote(this); - std::auto_ptr listen; try { // create socket listener @@ -1041,7 +1113,7 @@ CServer::acceptClients(void*) CThread::testCancel(); // start handshake thread - CThread(new TMethodJob( + startThread(new TMethodJob( this, &CServer::handshakeClient, socket)); } } @@ -1060,10 +1132,6 @@ CServer::handshakeClient(void* vsocket) assert(vsocket != NULL); std::auto_ptr socket(reinterpret_cast(vsocket)); - // add this thread to the list of threads to cancel. remove from - // list in d'tor. - CCleanupNote cleanupNote(this); - CString name(""); try { // get the input and output streams @@ -1088,8 +1156,8 @@ CServer::handshakeClient(void* vsocket) assign(input, new CInputPacketStream(srcInput, own), IInputStream); assign(output, new COutputPacketStream(srcOutput, own), IOutputStream); + bool connected = false; std::auto_ptr protocol; - std::auto_ptr connectedNote; try { { // give the client a limited time to complete the handshake @@ -1135,8 +1203,8 @@ CServer::handshakeClient(void* vsocket) IServerProtocol); // client is now pending - assign(connectedNote, new CConnectionNote(this, - name, protocol.get()), CConnectionNote); + addConnection(name, protocol.get()); + connected = true; // ask and wait for the client's info log((CLOG_DEBUG1 "waiting for info for client \"%s\"", name.c_str())); @@ -1173,9 +1241,18 @@ CServer::handshakeClient(void* vsocket) log((CLOG_WARN "protocol error from client \"%s\"", name.c_str())); CProtocolUtil::writef(output.get(), kMsgEBad); } + catch (...) { + if (connected) { + removeConnection(name); + } + throw; + } // flush any pending output output.get()->flush(); + if (connected) { + removeConnection(name); + } } catch (XBase& e) { // misc error @@ -1189,10 +1266,6 @@ CServer::acceptHTTPClients(void*) { log((CLOG_DEBUG1 "starting to wait for HTTP clients")); - // add this thread to the list of threads to cancel. remove from - // list in d'tor. - CCleanupNote cleanupNote(this); - std::auto_ptr listen; try { // create socket listener @@ -1241,7 +1314,7 @@ CServer::acceptHTTPClients(void*) CThread::testCancel(); // handle HTTP request - CThread(new TMethodJob( + startThread(new TMethodJob( this, &CServer::processHTTPRequest, socket)); } } @@ -1255,10 +1328,6 @@ CServer::acceptHTTPClients(void*) void CServer::processHTTPRequest(void* vsocket) { - // add this thread to the list of threads to cancel. remove from - // list in d'tor. - CCleanupNote cleanupNote(this); - IDataSocket* socket = reinterpret_cast(vsocket); try { // process the request and force delivery @@ -1439,73 +1508,6 @@ CServer::closePrimaryScreen() m_primary = NULL; } -void -CServer::addCleanupThread(const CThread& thread) -{ - CLock lock(&m_mutex); - m_cleanupList.insert(m_cleanupList.begin(), new CThread(thread)); - m_cleanupSize = m_cleanupSize + 1; -} - -void -CServer::removeCleanupThread(const CThread& thread) -{ - CLock lock(&m_mutex); - for (CThreadList::iterator index = m_cleanupList.begin(); - index != m_cleanupList.end(); ++index) { - if (**index == thread) { - CThread* thread = *index; - m_cleanupList.erase(index); - m_cleanupSize = m_cleanupSize - 1; - if (m_cleanupSize == 0) { - m_cleanupSize.broadcast(); - } - delete thread; - return; - } - } -} - -void -CServer::cleanupThreads(double timeout) -{ - log((CLOG_DEBUG1 "cleaning up threads")); - - // first cancel every thread except the current one (with mutex - // locked so the cleanup list won't change). - CLock lock(&m_mutex); - CThread current(CThread::getCurrentThread()); - SInt32 minCount = 0; - for (CThreadList::iterator index = m_cleanupList.begin(); - index != m_cleanupList.end(); ++index) { - CThread* thread = *index; - if (thread != ¤t) { - thread->cancel(); - } - else { - minCount = 1; - } - } - - // now wait for the threads (with mutex unlocked as each thread - // will remove itself from the list) - CStopwatch timer(true); - while (m_cleanupSize > minCount) { - m_cleanupSize.wait(timer, timeout); - } - - // delete remaining threads - for (CThreadList::iterator index = m_cleanupList.begin(); - index != m_cleanupList.end(); ++index) { - CThread* thread = *index; - delete thread; - } - m_cleanupList.clear(); - m_cleanupSize = 0; - - log((CLOG_DEBUG1 "cleaned up threads")); -} - CServer::CScreenInfo* CServer::addConnection(const CString& name, IServerProtocol* protocol) { @@ -1563,42 +1565,6 @@ CServer::removeConnection(const CString& name) } -// -// CServer::CCleanupNote -// - -CServer::CCleanupNote::CCleanupNote(CServer* server) : - m_server(server) -{ - assert(m_server != NULL); - m_server->addCleanupThread(CThread::getCurrentThread()); -} - -CServer::CCleanupNote::~CCleanupNote() -{ - m_server->removeCleanupThread(CThread::getCurrentThread()); -} - - -// -// CServer::CConnectionNote -// - -CServer::CConnectionNote::CConnectionNote(CServer* server, - const CString& name, IServerProtocol* protocol) : - m_server(server), - m_name(name) -{ - assert(m_server != NULL); - m_server->addConnection(m_name, protocol); -} - -CServer::CConnectionNote::~CConnectionNote() -{ - m_server->removeConnection(m_name); -} - - // // CServer::CScreenInfo // diff --git a/server/CServer.h b/server/CServer.h index 318ec4a7..a4e64678 100644 --- a/server/CServer.h +++ b/server/CServer.h @@ -89,25 +89,7 @@ protected: bool onCommandKey(KeyID, KeyModifierMask, bool down); private: - class CCleanupNote { - public: - CCleanupNote(CServer*); - ~CCleanupNote(); - - private: - CServer* m_server; - }; - - class CConnectionNote { - public: - CConnectionNote(CServer*, const CString&, IServerProtocol*); - ~CConnectionNote(); - - private: - bool m_pending; - CServer* m_server; - CString m_name; - }; + typedef std::list CThreadList; class CScreenInfo { public: @@ -176,8 +158,17 @@ private: // update the clipboard if owned by the primary screen void updatePrimaryClipboard(ClipboardID); - // cancel running threads - void cleanupThreads(double timeout = -1.0); + // start a thread, adding it to the list of threads + void startThread(IJob* adopted); + + // cancel running threads, waiting at most timeout seconds for + // them to finish. + void stopThreads(double timeout = -1.0); + + // reap threads, clearing finished threads from the thread list. + // doReapThreads does the work on the given thread list. + void reapThreads(); + void doReapThreads(CThreadList&); // thread method to accept incoming client connections void acceptClients(void*); @@ -191,18 +182,11 @@ private: // thread method to process HTTP requests void processHTTPRequest(void*); - // thread cleanup list maintenance - friend class CCleanupNote; - void addCleanupThread(const CThread& thread); - void removeCleanupThread(const CThread& thread); - // connection list maintenance - friend class CConnectionNote; CScreenInfo* addConnection(const CString& name, IServerProtocol*); void removeConnection(const CString& name); private: - typedef std::list CThreadList; typedef std::map CScreenList; class CClipboardInfo { public: @@ -225,8 +209,7 @@ private: ISocketFactory* m_socketFactory; ISecurityFactory* m_securityFactory; - CThreadList m_cleanupList; - CCondVar m_cleanupSize; + CThreadList m_threads; IPrimaryScreen* m_primary; CScreenList m_screens;