lib/net: Use standard mutex primitives in socket multiplexer

This commit is contained in:
Povilas Kanapickas 2021-11-03 02:58:29 +02:00
parent 27de30e761
commit cbf22ad589
2 changed files with 33 additions and 44 deletions

View File

@ -20,8 +20,6 @@
#include "net/ISocketMultiplexerJob.h" #include "net/ISocketMultiplexerJob.h"
#include "mt/CondVar.h" #include "mt/CondVar.h"
#include "mt/Lock.h"
#include "mt/Mutex.h"
#include "mt/Thread.h" #include "mt/Thread.h"
#include "arch/Arch.h" #include "arch/Arch.h"
#include "arch/XArch.h" #include "arch/XArch.h"
@ -47,12 +45,8 @@ public:
SocketMultiplexer::SocketMultiplexer() : SocketMultiplexer::SocketMultiplexer() :
m_mutex(new Mutex),
m_thread(NULL), m_thread(NULL),
m_update(false), m_update(false),
m_jobsReady(new CondVar<bool>(m_mutex, false)),
m_jobListLock(new CondVar<bool>(m_mutex, false)),
m_jobListLockLocked(new CondVar<bool>(m_mutex, false)),
m_jobListLocker(NULL), m_jobListLocker(NULL),
m_jobListLockLocker(NULL) m_jobListLockLocker(NULL)
{ {
@ -66,12 +60,8 @@ SocketMultiplexer::~SocketMultiplexer()
m_thread->unblockPollSocket(); m_thread->unblockPollSocket();
m_thread->wait(); m_thread->wait();
delete m_thread; delete m_thread;
delete m_jobsReady;
delete m_jobListLock;
delete m_jobListLockLocked;
delete m_jobListLocker; delete m_jobListLocker;
delete m_jobListLockLocker; delete m_jobListLockLocker;
delete m_mutex;
} }
void SocketMultiplexer::addSocket(ISocket* socket, std::unique_ptr<ISocketMultiplexerJob>&& job) void SocketMultiplexer::addSocket(ISocket* socket, std::unique_ptr<ISocketMultiplexerJob>&& job)
@ -147,10 +137,8 @@ void SocketMultiplexer::service_thread()
// wait until there are jobs to handle // wait until there are jobs to handle
{ {
Lock lock(m_mutex); std::unique_lock<std::mutex> lock(mutex_);
while (!(bool)*m_jobsReady) { cv_jobs_ready_.wait(lock, [this](){ return are_jobs_ready_; });
m_jobsReady->wait();
}
} }
// lock the job list // lock the job list
@ -216,11 +204,11 @@ void SocketMultiplexer::service_thread()
MultiplexerJobStatus status = (*jobCursor)->run(read, write, error); MultiplexerJobStatus status = (*jobCursor)->run(read, write, error);
if (!status.continue_servicing) { if (!status.continue_servicing) {
Lock lock(m_mutex); std::lock_guard<std::mutex> lock(mutex_);
jobCursor->reset(); jobCursor->reset();
m_update = true; m_update = true;
} else if (status.new_job) { } else if (status.new_job) {
Lock lock(m_mutex); std::lock_guard<std::mutex> lock(mutex_);
*jobCursor = std::move(status.new_job); *jobCursor = std::move(status.new_job);
m_update = true; m_update = true;
} }
@ -254,14 +242,14 @@ void SocketMultiplexer::service_thread()
SocketMultiplexer::JobCursor SocketMultiplexer::JobCursor
SocketMultiplexer::newCursor() SocketMultiplexer::newCursor()
{ {
Lock lock(m_mutex); std::lock_guard<std::mutex> lock(mutex_);
return m_socketJobs.insert(m_socketJobs.begin(), std::make_unique<CursorMultiplexerJob>()); return m_socketJobs.insert(m_socketJobs.begin(), std::make_unique<CursorMultiplexerJob>());
} }
SocketMultiplexer::JobCursor SocketMultiplexer::JobCursor
SocketMultiplexer::nextCursor(JobCursor cursor) SocketMultiplexer::nextCursor(JobCursor cursor)
{ {
Lock lock(m_mutex); std::lock_guard<std::mutex> lock(mutex_);
JobCursor j = m_socketJobs.end(); JobCursor j = m_socketJobs.end();
JobCursor i = cursor; JobCursor i = cursor;
while (++i != m_socketJobs.end()) { while (++i != m_socketJobs.end()) {
@ -280,52 +268,48 @@ SocketMultiplexer::nextCursor(JobCursor cursor)
void void
SocketMultiplexer::deleteCursor(JobCursor cursor) SocketMultiplexer::deleteCursor(JobCursor cursor)
{ {
Lock lock(m_mutex); std::lock_guard<std::mutex> lock(mutex_);
m_socketJobs.erase(cursor); m_socketJobs.erase(cursor);
} }
void void
SocketMultiplexer::lockJobListLock() SocketMultiplexer::lockJobListLock()
{ {
Lock lock(m_mutex); std::unique_lock<std::mutex> lock(mutex_);
// wait for the lock on the lock // wait for the lock on the lock
while (*m_jobListLockLocked) { cv_job_list_lock_locked_.wait(lock, [this](){ return !is_job_list_lock_lock_locked_; });
m_jobListLockLocked->wait();
}
// take ownership of the lock on the lock // take ownership of the lock on the lock
*m_jobListLockLocked = true; is_job_list_lock_lock_locked_ = true;
m_jobListLockLocker = new Thread(Thread::getCurrentThread()); m_jobListLockLocker = new Thread(Thread::getCurrentThread());
} }
void void
SocketMultiplexer::lockJobList() SocketMultiplexer::lockJobList()
{ {
Lock lock(m_mutex); std::unique_lock<std::mutex> lock(mutex_);
// make sure we're the one that called lockJobListLock() // make sure we're the one that called lockJobListLock()
assert(*m_jobListLockLocker == Thread::getCurrentThread()); assert(*m_jobListLockLocker == Thread::getCurrentThread());
// wait for the job list lock // wait for the job list lock
while (*m_jobListLock) { cv_jobs_list_lock_.wait(lock, [this]() { return !is_jobs_list_lock_locked_; });
m_jobListLock->wait();
}
// take ownership of the lock // take ownership of the lock
*m_jobListLock = true; is_jobs_list_lock_locked_ = true;
m_jobListLocker = m_jobListLockLocker; m_jobListLocker = m_jobListLockLocker;
m_jobListLockLocker = NULL; m_jobListLockLocker = NULL;
// release the lock on the lock // release the lock on the lock
*m_jobListLockLocked = false; is_job_list_lock_lock_locked_ = false;
m_jobListLockLocked->broadcast(); cv_job_list_lock_locked_.notify_all();
} }
void void
SocketMultiplexer::unlockJobList() SocketMultiplexer::unlockJobList()
{ {
Lock lock(m_mutex); std::lock_guard<std::mutex> lock(mutex_);
// make sure we're the one that called lockJobList() // make sure we're the one that called lockJobList()
assert(*m_jobListLocker == Thread::getCurrentThread()); assert(*m_jobListLocker == Thread::getCurrentThread());
@ -333,13 +317,13 @@ SocketMultiplexer::unlockJobList()
// release the lock // release the lock
delete m_jobListLocker; delete m_jobListLocker;
m_jobListLocker = NULL; m_jobListLocker = NULL;
*m_jobListLock = false; is_jobs_list_lock_locked_ = false;
m_jobListLock->signal(); cv_jobs_list_lock_.notify_one();
// set new jobs ready state // set new jobs ready state
bool isReady = !m_socketJobMap.empty(); bool isReady = !m_socketJobMap.empty();
if (*m_jobsReady != isReady) { if (are_jobs_ready_ != isReady) {
*m_jobsReady = isReady; are_jobs_ready_ = isReady;
m_jobsReady->signal(); cv_jobs_ready_.notify_one();
} }
} }

View File

@ -21,11 +21,10 @@
#include "arch/IArchNetwork.h" #include "arch/IArchNetwork.h"
#include "common/stdlist.h" #include "common/stdlist.h"
#include "common/stdmap.h" #include "common/stdmap.h"
#include <condition_variable>
#include <memory> #include <memory>
#include <mutex>
template <class T>
class CondVar;
class Mutex;
class Thread; class Thread;
class ISocket; class ISocket;
class ISocketMultiplexerJob; class ISocketMultiplexerJob;
@ -96,12 +95,18 @@ private:
void unlockJobList(); void unlockJobList();
private: private:
Mutex* m_mutex; std::mutex mutex_;
Thread* m_thread; Thread* m_thread;
bool m_update; bool m_update;
CondVar<bool>* m_jobsReady; std::condition_variable cv_jobs_ready_;
CondVar<bool>* m_jobListLock; bool are_jobs_ready_ = false;
CondVar<bool>* m_jobListLockLocked;
std::condition_variable cv_jobs_list_lock_;
bool is_jobs_list_lock_locked_ = false;
std::condition_variable cv_job_list_lock_locked_;
bool is_job_list_lock_lock_locked_ = false;
Thread* m_jobListLocker; Thread* m_jobListLocker;
Thread* m_jobListLockLocker; Thread* m_jobListLockLocker;