some cleanup. also fixed a race condition when adding threads

to the thread list:  the child thread would add itself to the
list which means there could be a time interval in the parent
where the child thread exists but isn't on the list.  the
parent now does the adding and removing.
This commit is contained in:
crs 2002-06-21 15:18:01 +00:00
parent b83c0c5928
commit e2ee2371e0
2 changed files with 111 additions and 162 deletions

View File

@ -38,7 +38,6 @@ const SInt32 CServer::s_httpMaxSimultaneousRequests = 3;
CServer::CServer(const CString& serverName) :
m_name(serverName),
m_cleanupSize(&m_mutex, 0),
m_primary(NULL),
m_active(NULL),
m_primaryInfo(NULL),
@ -81,12 +80,13 @@ CServer::run()
}
// start listening for new clients
CThread(new TMethodJob<CServer>(this, &CServer::acceptClients));
startThread(new TMethodJob<CServer>(this, &CServer::acceptClients));
// start listening for HTTP requests
if (m_config.getHTTPAddress().isValid()) {
m_httpServer = new CHTTPServer(this);
CThread(new TMethodJob<CServer>(this, &CServer::acceptHTTPClients));
startThread(new TMethodJob<CServer>(this,
&CServer::acceptHTTPClients));
}
// handle events
@ -95,7 +95,7 @@ CServer::run()
// clean up
log((CLOG_NOTE "stopping server"));
cleanupThreads();
stopThreads();
delete m_httpServer;
m_httpServer = NULL;
closePrimaryScreen();
@ -105,7 +105,7 @@ CServer::run()
// clean up
log((CLOG_NOTE "stopping server"));
cleanupThreads();
stopThreads();
delete m_httpServer;
m_httpServer = NULL;
if (m_primary != NULL) {
@ -115,7 +115,7 @@ CServer::run()
catch (XThread&) {
// clean up
log((CLOG_NOTE "stopping server"));
cleanupThreads();
stopThreads();
delete m_httpServer;
m_httpServer = NULL;
if (m_primary != NULL) {
@ -128,7 +128,7 @@ CServer::run()
// clean up
log((CLOG_NOTE "stopping server"));
cleanupThreads();
stopThreads();
delete m_httpServer;
m_httpServer = NULL;
if (m_primary != NULL) {
@ -149,7 +149,7 @@ CServer::shutdown()
{
// stop all running threads but don't wait too long since some
// threads may be unable to proceed until this thread returns.
cleanupThreads(3.0);
stopThreads(3.0);
// done with the HTTP server
delete m_httpServer;
@ -208,6 +208,9 @@ CServer::setConfig(const CConfig& config)
index->wait();
}
// clean up thread list
reapThreads();
// cut over
CLock lock(&m_mutex);
m_config = config;
@ -993,16 +996,85 @@ CServer::getNeighbor(CScreenInfo* src,
return dst;
}
void
CServer::startThread(IJob* job)
{
CLock lock(&m_mutex);
doReapThreads(m_threads);
CThread* thread = new CThread(job);
m_threads.push_back(thread);
log((CLOG_DEBUG1 "started thread %p", thread));
}
void
CServer::stopThreads(double timeout)
{
log((CLOG_DEBUG1 "stopping threads"));
// swap thread list so nobody can mess with it
CThreadList threads;
{
CLock lock(&m_mutex);
threads.swap(m_threads);
}
// cancel every thread
for (CThreadList::iterator index = threads.begin();
index != threads.end(); ++index) {
CThread* thread = *index;
thread->cancel();
}
// now wait for the threads
CStopwatch timer(true);
while (threads.size() > 0 && (timeout < 0.0 || timer.getTime() < timeout)) {
doReapThreads(threads);
CThread::sleep(0.01);
}
// delete remaining threads
for (CThreadList::iterator index = threads.begin();
index != threads.end(); ++index) {
CThread* thread = *index;
log((CLOG_DEBUG1 "reaped running thread %p", thread));
delete thread;
}
log((CLOG_DEBUG1 "stopped threads"));
}
void
CServer::reapThreads()
{
CLock lock(&m_mutex);
doReapThreads(m_threads);
}
void
CServer::doReapThreads(CThreadList& threads)
{
for (CThreadList::iterator index = threads.begin();
index != threads.end(); ) {
CThread* thread = *index;
if (thread->wait(0.0)) {
// thread terminated
index = threads.erase(index);
log((CLOG_DEBUG1 "reaped thread %p", thread));
delete thread;
}
else {
// thread is running
++index;
}
}
}
#include "CTCPListenSocket.h"
void
CServer::acceptClients(void*)
{
log((CLOG_DEBUG1 "starting to wait for clients"));
// add this thread to the list of threads to cancel. remove from
// list in d'tor.
CCleanupNote cleanupNote(this);
std::auto_ptr<IListenSocket> listen;
try {
// create socket listener
@ -1041,7 +1113,7 @@ CServer::acceptClients(void*)
CThread::testCancel();
// start handshake thread
CThread(new TMethodJob<CServer>(
startThread(new TMethodJob<CServer>(
this, &CServer::handshakeClient, socket));
}
}
@ -1060,10 +1132,6 @@ CServer::handshakeClient(void* vsocket)
assert(vsocket != NULL);
std::auto_ptr<IDataSocket> socket(reinterpret_cast<IDataSocket*>(vsocket));
// add this thread to the list of threads to cancel. remove from
// list in d'tor.
CCleanupNote cleanupNote(this);
CString name("<unknown>");
try {
// get the input and output streams
@ -1088,8 +1156,8 @@ CServer::handshakeClient(void* vsocket)
assign(input, new CInputPacketStream(srcInput, own), IInputStream);
assign(output, new COutputPacketStream(srcOutput, own), IOutputStream);
bool connected = false;
std::auto_ptr<IServerProtocol> protocol;
std::auto_ptr<CConnectionNote> connectedNote;
try {
{
// give the client a limited time to complete the handshake
@ -1135,8 +1203,8 @@ CServer::handshakeClient(void* vsocket)
IServerProtocol);
// client is now pending
assign(connectedNote, new CConnectionNote(this,
name, protocol.get()), CConnectionNote);
addConnection(name, protocol.get());
connected = true;
// ask and wait for the client's info
log((CLOG_DEBUG1 "waiting for info for client \"%s\"", name.c_str()));
@ -1173,9 +1241,18 @@ CServer::handshakeClient(void* vsocket)
log((CLOG_WARN "protocol error from client \"%s\"", name.c_str()));
CProtocolUtil::writef(output.get(), kMsgEBad);
}
catch (...) {
if (connected) {
removeConnection(name);
}
throw;
}
// flush any pending output
output.get()->flush();
if (connected) {
removeConnection(name);
}
}
catch (XBase& e) {
// misc error
@ -1189,10 +1266,6 @@ CServer::acceptHTTPClients(void*)
{
log((CLOG_DEBUG1 "starting to wait for HTTP clients"));
// add this thread to the list of threads to cancel. remove from
// list in d'tor.
CCleanupNote cleanupNote(this);
std::auto_ptr<IListenSocket> listen;
try {
// create socket listener
@ -1241,7 +1314,7 @@ CServer::acceptHTTPClients(void*)
CThread::testCancel();
// handle HTTP request
CThread(new TMethodJob<CServer>(
startThread(new TMethodJob<CServer>(
this, &CServer::processHTTPRequest, socket));
}
}
@ -1255,10 +1328,6 @@ CServer::acceptHTTPClients(void*)
void
CServer::processHTTPRequest(void* vsocket)
{
// add this thread to the list of threads to cancel. remove from
// list in d'tor.
CCleanupNote cleanupNote(this);
IDataSocket* socket = reinterpret_cast<IDataSocket*>(vsocket);
try {
// process the request and force delivery
@ -1439,73 +1508,6 @@ CServer::closePrimaryScreen()
m_primary = NULL;
}
void
CServer::addCleanupThread(const CThread& thread)
{
CLock lock(&m_mutex);
m_cleanupList.insert(m_cleanupList.begin(), new CThread(thread));
m_cleanupSize = m_cleanupSize + 1;
}
void
CServer::removeCleanupThread(const CThread& thread)
{
CLock lock(&m_mutex);
for (CThreadList::iterator index = m_cleanupList.begin();
index != m_cleanupList.end(); ++index) {
if (**index == thread) {
CThread* thread = *index;
m_cleanupList.erase(index);
m_cleanupSize = m_cleanupSize - 1;
if (m_cleanupSize == 0) {
m_cleanupSize.broadcast();
}
delete thread;
return;
}
}
}
void
CServer::cleanupThreads(double timeout)
{
log((CLOG_DEBUG1 "cleaning up threads"));
// first cancel every thread except the current one (with mutex
// locked so the cleanup list won't change).
CLock lock(&m_mutex);
CThread current(CThread::getCurrentThread());
SInt32 minCount = 0;
for (CThreadList::iterator index = m_cleanupList.begin();
index != m_cleanupList.end(); ++index) {
CThread* thread = *index;
if (thread != &current) {
thread->cancel();
}
else {
minCount = 1;
}
}
// now wait for the threads (with mutex unlocked as each thread
// will remove itself from the list)
CStopwatch timer(true);
while (m_cleanupSize > minCount) {
m_cleanupSize.wait(timer, timeout);
}
// delete remaining threads
for (CThreadList::iterator index = m_cleanupList.begin();
index != m_cleanupList.end(); ++index) {
CThread* thread = *index;
delete thread;
}
m_cleanupList.clear();
m_cleanupSize = 0;
log((CLOG_DEBUG1 "cleaned up threads"));
}
CServer::CScreenInfo*
CServer::addConnection(const CString& name, IServerProtocol* protocol)
{
@ -1563,42 +1565,6 @@ CServer::removeConnection(const CString& name)
}
//
// CServer::CCleanupNote
//
CServer::CCleanupNote::CCleanupNote(CServer* server) :
m_server(server)
{
assert(m_server != NULL);
m_server->addCleanupThread(CThread::getCurrentThread());
}
CServer::CCleanupNote::~CCleanupNote()
{
m_server->removeCleanupThread(CThread::getCurrentThread());
}
//
// CServer::CConnectionNote
//
CServer::CConnectionNote::CConnectionNote(CServer* server,
const CString& name, IServerProtocol* protocol) :
m_server(server),
m_name(name)
{
assert(m_server != NULL);
m_server->addConnection(m_name, protocol);
}
CServer::CConnectionNote::~CConnectionNote()
{
m_server->removeConnection(m_name);
}
//
// CServer::CScreenInfo
//

View File

@ -89,25 +89,7 @@ protected:
bool onCommandKey(KeyID, KeyModifierMask, bool down);
private:
class CCleanupNote {
public:
CCleanupNote(CServer*);
~CCleanupNote();
private:
CServer* m_server;
};
class CConnectionNote {
public:
CConnectionNote(CServer*, const CString&, IServerProtocol*);
~CConnectionNote();
private:
bool m_pending;
CServer* m_server;
CString m_name;
};
typedef std::list<CThread*> CThreadList;
class CScreenInfo {
public:
@ -176,8 +158,17 @@ private:
// update the clipboard if owned by the primary screen
void updatePrimaryClipboard(ClipboardID);
// cancel running threads
void cleanupThreads(double timeout = -1.0);
// start a thread, adding it to the list of threads
void startThread(IJob* adopted);
// cancel running threads, waiting at most timeout seconds for
// them to finish.
void stopThreads(double timeout = -1.0);
// reap threads, clearing finished threads from the thread list.
// doReapThreads does the work on the given thread list.
void reapThreads();
void doReapThreads(CThreadList&);
// thread method to accept incoming client connections
void acceptClients(void*);
@ -191,18 +182,11 @@ private:
// thread method to process HTTP requests
void processHTTPRequest(void*);
// thread cleanup list maintenance
friend class CCleanupNote;
void addCleanupThread(const CThread& thread);
void removeCleanupThread(const CThread& thread);
// connection list maintenance
friend class CConnectionNote;
CScreenInfo* addConnection(const CString& name, IServerProtocol*);
void removeConnection(const CString& name);
private:
typedef std::list<CThread*> CThreadList;
typedef std::map<CString, CScreenInfo*> CScreenList;
class CClipboardInfo {
public:
@ -225,8 +209,7 @@ private:
ISocketFactory* m_socketFactory;
ISecurityFactory* m_securityFactory;
CThreadList m_cleanupList;
CCondVar<SInt32> m_cleanupSize;
CThreadList m_threads;
IPrimaryScreen* m_primary;
CScreenList m_screens;