diff --git a/lib/net/CSocketMultiplexer.cpp b/lib/net/CSocketMultiplexer.cpp index 1a9e11b6..742198f9 100644 --- a/lib/net/CSocketMultiplexer.cpp +++ b/lib/net/CSocketMultiplexer.cpp @@ -32,10 +32,13 @@ CSocketMultiplexer* CSocketMultiplexer::s_instance = NULL; CSocketMultiplexer::CSocketMultiplexer() : m_mutex(new CMutex), - m_pollable(new CCondVar(m_mutex, false)), - m_polling(new CCondVar(m_mutex, false)), m_thread(NULL), - m_update(false) + m_update(false), + m_jobsReady(new CCondVar(m_mutex, false)), + m_jobListLock(new CCondVar(m_mutex, false)), + m_jobListLockLocked(new CCondVar(m_mutex, false)), + m_jobListLocker(NULL), + m_jobListLockLocker(NULL) { assert(s_instance == NULL); @@ -57,8 +60,11 @@ CSocketMultiplexer::~CSocketMultiplexer() m_thread->unblockPollSocket(); m_thread->wait(); delete m_thread; - delete m_polling; - delete m_pollable; + delete m_jobsReady; + delete m_jobListLock; + delete m_jobListLockLocked; + delete m_jobListLocker; + delete m_jobListLockLocker; delete m_mutex; // clean up jobs @@ -82,18 +88,14 @@ CSocketMultiplexer::addSocket(ISocket* socket, ISocketMultiplexerJob* job) assert(socket != NULL); assert(job != NULL); - CLock lock(m_mutex); - - // prevent service thread from restarting poll - *m_pollable = false; + // prevent other threads from locking the job list + lockJobListLock(); // break thread out of poll m_thread->unblockPollSocket(); - // wait for poll to finish - while (*m_polling) { - m_polling->wait(); - } + // lock the job list + lockJobList(); // insert/replace job CSocketJobMap::iterator i = m_socketJobMap.find(socket); @@ -114,9 +116,8 @@ CSocketMultiplexer::addSocket(ISocket* socket, ISocketMultiplexerJob* job) m_update = true; } - // there must be at least one socket so we can poll now - *m_pollable = true; - m_pollable->broadcast(); + // unlock the job list + unlockJobList(); } void @@ -124,18 +125,14 @@ CSocketMultiplexer::removeSocket(ISocket* socket) { assert(socket != NULL); - CLock lock(m_mutex); - - // prevent service thread from restarting poll - *m_pollable = false; + // prevent other threads from locking the job list + lockJobListLock(); // break thread out of poll m_thread->unblockPollSocket(); - // wait until thread finishes poll - while (*m_polling) { - m_polling->wait(); - } + // lock the job list + lockJobList(); // remove job. rather than removing it from the map we put NULL // in the list instead so the order of jobs in the list continues @@ -149,8 +146,8 @@ CSocketMultiplexer::removeSocket(ISocket* socket) } } - *m_pollable = true; - m_pollable->broadcast(); + // unlock the job list + unlockJobList(); } void @@ -162,22 +159,18 @@ CSocketMultiplexer::serviceThread(void*) // service the connections for (;;) { CThread::testCancel(); + + // wait until there are jobs to handle { CLock lock(m_mutex); - - // wait until pollable - while (!(bool)*m_pollable) { - m_pollable->wait(); + while (!(bool)*m_jobsReady) { + m_jobsReady->wait(); } - - // now we're polling - *m_polling = true; - m_polling->broadcast(); } - // we're now the only thread that can access m_sockets and - // m_update because m_polling and m_pollable are both true. - // therefore, we don't need to hold the mutex. + // lock the job list + lockJobListLock(); + lockJobList(); // collect poll entries if (m_update) { @@ -251,7 +244,6 @@ CSocketMultiplexer::serviceThread(void*) } // delete any removed socket jobs - CLock lock(m_mutex); for (CSocketJobMap::iterator i = m_socketJobMap.begin(); i != m_socketJobMap.end();) { if (*(i->second) == NULL) { @@ -262,13 +254,9 @@ CSocketMultiplexer::serviceThread(void*) ++i; } } - if (m_socketJobMap.empty()) { - *m_pollable = false; - } - // done polling - *m_polling = false; - m_polling->broadcast(); + // unlock the job list + unlockJobList(); } } @@ -304,3 +292,63 @@ CSocketMultiplexer::deleteCursor(CJobCursor cursor) CLock lock(m_mutex); m_socketJobs.erase(cursor); } + +void +CSocketMultiplexer::lockJobListLock() +{ + CLock lock(m_mutex); + + // wait for the lock on the lock + while (*m_jobListLockLocked) { + m_jobListLockLocked->wait(); + } + + // take ownership of the lock on the lock + *m_jobListLockLocked = true; + m_jobListLockLocker = new CThread(CThread::getCurrentThread()); +} + +void +CSocketMultiplexer::lockJobList() +{ + CLock lock(m_mutex); + + // make sure we're the one that called lockJobListLock() + assert(*m_jobListLockLocker == CThread::getCurrentThread()); + + // wait for the job list lock + while (*m_jobListLock) { + m_jobListLock->wait(); + } + + // take ownership of the lock + *m_jobListLock = true; + m_jobListLocker = m_jobListLockLocker; + m_jobListLockLocker = NULL; + + // release the lock on the lock + *m_jobListLockLocked = false; + m_jobListLockLocked->broadcast(); +} + +void +CSocketMultiplexer::unlockJobList() +{ + CLock lock(m_mutex); + + // make sure we're the one that called lockJobList() + assert(*m_jobListLocker == CThread::getCurrentThread()); + + // release the lock + delete m_jobListLocker; + m_jobListLocker = NULL; + *m_jobListLock = false; + m_jobListLock->signal(); + + // set new jobs ready state + bool isReady = !m_socketJobMap.empty(); + if (*m_jobsReady != isReady) { + *m_jobsReady = isReady; + m_jobsReady->signal(); + } +} diff --git a/lib/net/CSocketMultiplexer.h b/lib/net/CSocketMultiplexer.h index 8abd57ba..bb96ca70 100644 --- a/lib/net/CSocketMultiplexer.h +++ b/lib/net/CSocketMultiplexer.h @@ -78,12 +78,28 @@ private: CJobCursor nextCursor(CJobCursor); void deleteCursor(CJobCursor); + // lock out locking the job list. this blocks if another thread + // has already locked out locking. once it returns, only the + // calling thread will be able to lock the job list after any + // current lock is released. + void lockJobListLock(); + + // lock the job list. this blocks if the job list is already + // locked. the calling thread must have called requestJobLock. + void lockJobList(); + + // unlock the job list and the lock out on locking. + void unlockJobList(); + private: CMutex* m_mutex; - CCondVar* m_pollable; - CCondVar* m_polling; CThread* m_thread; bool m_update; + CCondVar* m_jobsReady; + CCondVar* m_jobListLock; + CCondVar* m_jobListLockLocked; + CThread* m_jobListLocker; + CThread* m_jobListLockLocker; CSocketJobs m_socketJobs; CSocketJobMap m_socketJobMap;