Changed scheme used to lock the socket multiplexer's job list.

I think the new scheme is easier to understand.  It should have
exactly the same behavior.
This commit is contained in:
crs 2004-11-11 19:17:03 +00:00
parent 6ea96719ab
commit c135432040
2 changed files with 110 additions and 46 deletions

View File

@ -32,10 +32,13 @@ CSocketMultiplexer* CSocketMultiplexer::s_instance = NULL;
CSocketMultiplexer::CSocketMultiplexer() : CSocketMultiplexer::CSocketMultiplexer() :
m_mutex(new CMutex), m_mutex(new CMutex),
m_pollable(new CCondVar<bool>(m_mutex, false)),
m_polling(new CCondVar<bool>(m_mutex, false)),
m_thread(NULL), m_thread(NULL),
m_update(false) m_update(false),
m_jobsReady(new CCondVar<bool>(m_mutex, false)),
m_jobListLock(new CCondVar<bool>(m_mutex, false)),
m_jobListLockLocked(new CCondVar<bool>(m_mutex, false)),
m_jobListLocker(NULL),
m_jobListLockLocker(NULL)
{ {
assert(s_instance == NULL); assert(s_instance == NULL);
@ -57,8 +60,11 @@ CSocketMultiplexer::~CSocketMultiplexer()
m_thread->unblockPollSocket(); m_thread->unblockPollSocket();
m_thread->wait(); m_thread->wait();
delete m_thread; delete m_thread;
delete m_polling; delete m_jobsReady;
delete m_pollable; delete m_jobListLock;
delete m_jobListLockLocked;
delete m_jobListLocker;
delete m_jobListLockLocker;
delete m_mutex; delete m_mutex;
// clean up jobs // clean up jobs
@ -82,18 +88,14 @@ CSocketMultiplexer::addSocket(ISocket* socket, ISocketMultiplexerJob* job)
assert(socket != NULL); assert(socket != NULL);
assert(job != NULL); assert(job != NULL);
CLock lock(m_mutex); // prevent other threads from locking the job list
lockJobListLock();
// prevent service thread from restarting poll
*m_pollable = false;
// break thread out of poll // break thread out of poll
m_thread->unblockPollSocket(); m_thread->unblockPollSocket();
// wait for poll to finish // lock the job list
while (*m_polling) { lockJobList();
m_polling->wait();
}
// insert/replace job // insert/replace job
CSocketJobMap::iterator i = m_socketJobMap.find(socket); CSocketJobMap::iterator i = m_socketJobMap.find(socket);
@ -114,9 +116,8 @@ CSocketMultiplexer::addSocket(ISocket* socket, ISocketMultiplexerJob* job)
m_update = true; m_update = true;
} }
// there must be at least one socket so we can poll now // unlock the job list
*m_pollable = true; unlockJobList();
m_pollable->broadcast();
} }
void void
@ -124,18 +125,14 @@ CSocketMultiplexer::removeSocket(ISocket* socket)
{ {
assert(socket != NULL); assert(socket != NULL);
CLock lock(m_mutex); // prevent other threads from locking the job list
lockJobListLock();
// prevent service thread from restarting poll
*m_pollable = false;
// break thread out of poll // break thread out of poll
m_thread->unblockPollSocket(); m_thread->unblockPollSocket();
// wait until thread finishes poll // lock the job list
while (*m_polling) { lockJobList();
m_polling->wait();
}
// remove job. rather than removing it from the map we put NULL // remove job. rather than removing it from the map we put NULL
// in the list instead so the order of jobs in the list continues // in the list instead so the order of jobs in the list continues
@ -149,8 +146,8 @@ CSocketMultiplexer::removeSocket(ISocket* socket)
} }
} }
*m_pollable = true; // unlock the job list
m_pollable->broadcast(); unlockJobList();
} }
void void
@ -162,22 +159,18 @@ CSocketMultiplexer::serviceThread(void*)
// service the connections // service the connections
for (;;) { for (;;) {
CThread::testCancel(); CThread::testCancel();
// wait until there are jobs to handle
{ {
CLock lock(m_mutex); CLock lock(m_mutex);
while (!(bool)*m_jobsReady) {
// wait until pollable m_jobsReady->wait();
while (!(bool)*m_pollable) { }
m_pollable->wait();
} }
// now we're polling // lock the job list
*m_polling = true; lockJobListLock();
m_polling->broadcast(); lockJobList();
}
// we're now the only thread that can access m_sockets and
// m_update because m_polling and m_pollable are both true.
// therefore, we don't need to hold the mutex.
// collect poll entries // collect poll entries
if (m_update) { if (m_update) {
@ -251,7 +244,6 @@ CSocketMultiplexer::serviceThread(void*)
} }
// delete any removed socket jobs // delete any removed socket jobs
CLock lock(m_mutex);
for (CSocketJobMap::iterator i = m_socketJobMap.begin(); for (CSocketJobMap::iterator i = m_socketJobMap.begin();
i != m_socketJobMap.end();) { i != m_socketJobMap.end();) {
if (*(i->second) == NULL) { if (*(i->second) == NULL) {
@ -262,13 +254,9 @@ CSocketMultiplexer::serviceThread(void*)
++i; ++i;
} }
} }
if (m_socketJobMap.empty()) {
*m_pollable = false;
}
// done polling // unlock the job list
*m_polling = false; unlockJobList();
m_polling->broadcast();
} }
} }
@ -304,3 +292,63 @@ CSocketMultiplexer::deleteCursor(CJobCursor cursor)
CLock lock(m_mutex); CLock lock(m_mutex);
m_socketJobs.erase(cursor); m_socketJobs.erase(cursor);
} }
void
CSocketMultiplexer::lockJobListLock()
{
CLock lock(m_mutex);
// wait for the lock on the lock
while (*m_jobListLockLocked) {
m_jobListLockLocked->wait();
}
// take ownership of the lock on the lock
*m_jobListLockLocked = true;
m_jobListLockLocker = new CThread(CThread::getCurrentThread());
}
void
CSocketMultiplexer::lockJobList()
{
CLock lock(m_mutex);
// make sure we're the one that called lockJobListLock()
assert(*m_jobListLockLocker == CThread::getCurrentThread());
// wait for the job list lock
while (*m_jobListLock) {
m_jobListLock->wait();
}
// take ownership of the lock
*m_jobListLock = true;
m_jobListLocker = m_jobListLockLocker;
m_jobListLockLocker = NULL;
// release the lock on the lock
*m_jobListLockLocked = false;
m_jobListLockLocked->broadcast();
}
void
CSocketMultiplexer::unlockJobList()
{
CLock lock(m_mutex);
// make sure we're the one that called lockJobList()
assert(*m_jobListLocker == CThread::getCurrentThread());
// release the lock
delete m_jobListLocker;
m_jobListLocker = NULL;
*m_jobListLock = false;
m_jobListLock->signal();
// set new jobs ready state
bool isReady = !m_socketJobMap.empty();
if (*m_jobsReady != isReady) {
*m_jobsReady = isReady;
m_jobsReady->signal();
}
}

View File

@ -78,12 +78,28 @@ private:
CJobCursor nextCursor(CJobCursor); CJobCursor nextCursor(CJobCursor);
void deleteCursor(CJobCursor); void deleteCursor(CJobCursor);
// lock out locking the job list. this blocks if another thread
// has already locked out locking. once it returns, only the
// calling thread will be able to lock the job list after any
// current lock is released.
void lockJobListLock();
// lock the job list. this blocks if the job list is already
// locked. the calling thread must have called requestJobLock.
void lockJobList();
// unlock the job list and the lock out on locking.
void unlockJobList();
private: private:
CMutex* m_mutex; CMutex* m_mutex;
CCondVar<bool>* m_pollable;
CCondVar<bool>* m_polling;
CThread* m_thread; CThread* m_thread;
bool m_update; bool m_update;
CCondVar<bool>* m_jobsReady;
CCondVar<bool>* m_jobListLock;
CCondVar<bool>* m_jobListLockLocked;
CThread* m_jobListLocker;
CThread* m_jobListLockLocker;
CSocketJobs m_socketJobs; CSocketJobs m_socketJobs;
CSocketJobMap m_socketJobMap; CSocketJobMap m_socketJobMap;