From a841b2858f178a0da41efa520e87b73fb8b24189 Mon Sep 17 00:00:00 2001 From: Povilas Kanapickas Date: Sat, 17 Aug 2019 16:17:50 +0300 Subject: [PATCH] Make ownership of SocketMultiplexerJob explicit --- src/lib/net/ISocketMultiplexerJob.h | 31 +++++++--- src/lib/net/SecureSocket.cpp | 43 +++++++------- src/lib/net/SecureSocket.h | 12 +--- src/lib/net/SocketMultiplexer.cpp | 72 +++++++++++------------ src/lib/net/SocketMultiplexer.h | 7 +-- src/lib/net/TCPListenSocket.cpp | 27 ++++----- src/lib/net/TCPListenSocket.h | 6 +- src/lib/net/TCPSocket.cpp | 61 ++++++++++--------- src/lib/net/TCPSocket.h | 17 +++--- src/lib/net/TSocketMultiplexerMethodJob.h | 18 +++--- 10 files changed, 148 insertions(+), 146 deletions(-) diff --git a/src/lib/net/ISocketMultiplexerJob.h b/src/lib/net/ISocketMultiplexerJob.h index ddd3ba54..c27fce25 100644 --- a/src/lib/net/ISocketMultiplexerJob.h +++ b/src/lib/net/ISocketMultiplexerJob.h @@ -20,6 +20,19 @@ #include "arch/IArchNetwork.h" #include "common/IInterface.h" +#include + +class ISocketMultiplexerJob; + +struct MultiplexerJobStatus +{ + MultiplexerJobStatus(bool cont, std::unique_ptr&& nj) : + continue_servicing(cont), new_job(std::move(nj)) + {} + + bool continue_servicing = false; + std::unique_ptr new_job; +}; //! Socket multiplexer job /*! @@ -32,21 +45,20 @@ public: //! Handle socket event /*! - Called by a socket multiplexer when the socket becomes readable, - writable, or has an error. It should return itself if the same - job can continue to service events, a new job if the socket must - be serviced differently, or NULL if the socket should no longer - be serviced. The socket is readable if \p readable is true, - writable if \p writable is true, and in error if \p error is - true. + Called by a socket multiplexer when the socket becomes readable, writable, or has an error. + The socket is readable if \p readable is true, writable if \p writable is true, and in error + if \p error is true. + + The method returns false as the continue_servicing member of the returned struct if the socket + should no longer be served and true otherwise. Additionally, if the new_job member of the + returned pair is not empty, the socket should be serviced differently with the specified job. This call must not attempt to directly change the job for this socket by calling \c addSocket() or \c removeSocket() on the multiplexer. It must instead return the new job. It can, however, add or remove jobs for other sockets. */ - virtual ISocketMultiplexerJob* - run(bool readable, bool writable, bool error) = 0; + virtual MultiplexerJobStatus run(bool readable, bool writable, bool error) = 0; //@} //! @name accessors @@ -72,5 +84,6 @@ public: */ virtual bool isWritable() const = 0; + virtual bool isCursor() const { return false; } //@} }; diff --git a/src/lib/net/SecureSocket.cpp b/src/lib/net/SecureSocket.cpp index 76b3662f..99f626e8 100644 --- a/src/lib/net/SecureSocket.cpp +++ b/src/lib/net/SecureSocket.cpp @@ -83,7 +83,7 @@ SecureSocket::~SecureSocket() // take socket from multiplexer ASAP otherwise the race condition // could cause events to get called on a dead object. TCPSocket // will do this, too, but the double-call is harmless - setJob(NULL); + removeJob(); freeSSLResources(); // removing sleep() because I have no idea why you would want to do it @@ -125,13 +125,12 @@ SecureSocket::connect(const NetworkAddress& addr) TCPSocket::connect(addr); } -ISocketMultiplexerJob* -SecureSocket::newJob() +std::unique_ptr SecureSocket::newJob() { // after TCP connection is established, SecureSocket will pick up // connected event and do secureConnect if (m_connected && !m_secureReady) { - return NULL; + return {}; } return TCPSocket::newJob(); @@ -140,7 +139,7 @@ SecureSocket::newJob() void SecureSocket::secureConnect() { - setJob(new TSocketMultiplexerMethodJob( + setJob(std::make_unique>( this, &SecureSocket::serviceConnect, getSocket(), isReadable(), isWritable())); } @@ -148,7 +147,7 @@ SecureSocket::secureConnect() void SecureSocket::secureAccept() { - setJob(new TSocketMultiplexerMethodJob( + setJob(std::make_unique>( this, &SecureSocket::serviceAccept, getSocket(), isReadable(), isWritable())); } @@ -740,10 +739,11 @@ SecureSocket::verifyCertFingerprint() return isValid; } -ISocketMultiplexerJob* -SecureSocket::serviceConnect(ISocketMultiplexerJob* job, - bool, bool write, bool error) +MultiplexerJobStatus SecureSocket::serviceConnect(ISocketMultiplexerJob* job, + bool read, bool write, bool error) { + (void) read; + Lock lock(&getMutex()); int status = 0; @@ -755,25 +755,28 @@ SecureSocket::serviceConnect(ISocketMultiplexerJob* job, // If status < 0, error happened if (status < 0) { - return NULL; + return {false, {}}; } // If status > 0, success if (status > 0) { sendEvent(m_events->forIDataSocket().secureConnected()); - return newJob(); + return {true, newJob()}; } // Retry case - return new TSocketMultiplexerMethodJob( + return { + true, + std::make_unique>( this, &SecureSocket::serviceConnect, - getSocket(), isReadable(), isWritable()); + getSocket(), isReadable(), isWritable()) + }; } -ISocketMultiplexerJob* -SecureSocket::serviceAccept(ISocketMultiplexerJob* job, - bool, bool write, bool error) +MultiplexerJobStatus SecureSocket::serviceAccept(ISocketMultiplexerJob* job, + bool read, bool write, bool error) { + (void) read; Lock lock(&getMutex()); int status = 0; @@ -784,19 +787,19 @@ SecureSocket::serviceAccept(ISocketMultiplexerJob* job, #endif // If status < 0, error happened if (status < 0) { - return NULL; + return {false, {}}; } // If status > 0, success if (status > 0) { sendEvent(m_events->forClientListener().accepted()); - return newJob(); + return {true, newJob()}; } // Retry case - return new TSocketMultiplexerMethodJob( + return {true, std::make_unique>( this, &SecureSocket::serviceAccept, - getSocket(), isReadable(), isWritable()); + getSocket(), isReadable(), isWritable())}; } void diff --git a/src/lib/net/SecureSocket.h b/src/lib/net/SecureSocket.h index b2948c5e..773b5088 100644 --- a/src/lib/net/SecureSocket.h +++ b/src/lib/net/SecureSocket.h @@ -44,8 +44,7 @@ public: // IDataSocket overrides virtual void connect(const NetworkAddress&); - ISocketMultiplexerJob* - newJob(); + std::unique_ptr newJob() override; bool isFatal() const { return m_fatal; } void isFatal(bool b) { m_fatal = b; } bool isSecureReady(); @@ -74,13 +73,8 @@ private: bool separator = true); bool verifyCertFingerprint(); - ISocketMultiplexerJob* - serviceConnect(ISocketMultiplexerJob*, - bool, bool, bool); - - ISocketMultiplexerJob* - serviceAccept(ISocketMultiplexerJob*, - bool, bool, bool); + MultiplexerJobStatus serviceConnect(ISocketMultiplexerJob*, bool, bool, bool); + MultiplexerJobStatus serviceAccept(ISocketMultiplexerJob*, bool, bool, bool); void showSecureConnectInfo(); void showSecureLibInfo(); diff --git a/src/lib/net/SocketMultiplexer.cpp b/src/lib/net/SocketMultiplexer.cpp index c4bc64a3..cdb9039e 100644 --- a/src/lib/net/SocketMultiplexer.cpp +++ b/src/lib/net/SocketMultiplexer.cpp @@ -33,6 +33,20 @@ // SocketMultiplexer // +class CursorMultiplexerJob : public ISocketMultiplexerJob { +public: + MultiplexerJobStatus run(bool readable, bool writable, bool error) override + { + return {false, {}}; + } + + ArchSocket getSocket() const override { return {}; } + bool isReadable() const override { return false; } + bool isWritable() const override { return false; } + bool isCursor() const override { return true; } +}; + + SocketMultiplexer::SocketMultiplexer() : m_mutex(new Mutex), m_thread(NULL), @@ -43,12 +57,6 @@ SocketMultiplexer::SocketMultiplexer() : m_jobListLocker(NULL), m_jobListLockLocker(NULL) { - // this pointer just has to be unique and not NULL. it will - // never be dereferenced. it's used to identify cursor nodes - // in the jobs list. - // TODO: Remove this evilness - m_cursorMark = reinterpret_cast(this); - // start thread m_thread = new Thread(new TMethodJob( this, &SocketMultiplexer::serviceThread)); @@ -66,16 +74,9 @@ SocketMultiplexer::~SocketMultiplexer() delete m_jobListLocker; delete m_jobListLockLocker; delete m_mutex; - - // clean up jobs - for (SocketJobMap::iterator i = m_socketJobMap.begin(); - i != m_socketJobMap.end(); ++i) { - delete *(i->second); - } } -void -SocketMultiplexer::addSocket(ISocket* socket, ISocketMultiplexerJob* job) +void SocketMultiplexer::addSocket(ISocket* socket, std::unique_ptr&& job) { assert(socket != NULL); assert(job != NULL); @@ -95,16 +96,12 @@ SocketMultiplexer::addSocket(ISocket* socket, ISocketMultiplexerJob* job) // we *must* put the job at the end so the order of jobs in // the list continue to match the order of jobs in pfds in // serviceThread(). - JobCursor j = m_socketJobs.insert(m_socketJobs.end(), job); + JobCursor j = m_socketJobs.insert(m_socketJobs.end(), std::move(job)); m_update = true; m_socketJobMap.insert(std::make_pair(socket, j)); } else { - JobCursor j = i->second; - if (*j != job) { - delete *j; - *j = job; - } + *(i->second) = std::move(job); m_update = true; } @@ -131,10 +128,9 @@ SocketMultiplexer::removeSocket(ISocket* socket) // to match the order of jobs in pfds in serviceThread(). SocketJobMap::iterator i = m_socketJobMap.find(socket); if (i != m_socketJobMap.end()) { - if (*(i->second) != NULL) { - delete *(i->second); - *(i->second) = NULL; - m_update = true; + if (*(i->second)) { + i->second->reset(); + m_update = true; } } @@ -173,14 +169,13 @@ SocketMultiplexer::serviceThread(void*) JobCursor cursor = newCursor(); JobCursor jobCursor = nextCursor(cursor); while (jobCursor != m_socketJobs.end()) { - ISocketMultiplexerJob* job = *jobCursor; - if (job != NULL) { - pfd.m_socket = job->getSocket(); + if (*jobCursor) { + pfd.m_socket = (*jobCursor)->getSocket(); pfd.m_events = 0; - if (job->isReadable()) { + if ((*jobCursor)->isReadable()) { pfd.m_events |= IArchNetwork::kPOLLIN; } - if (job->isWritable()) { + if ((*jobCursor)->isWritable()) { pfd.m_events |= IArchNetwork::kPOLLOUT; } pfds.push_back(pfd); @@ -221,15 +216,16 @@ SocketMultiplexer::serviceThread(void*) IArchNetwork::kPOLLNVAL)) != 0); // run job - ISocketMultiplexerJob* job = *jobCursor; - ISocketMultiplexerJob* newJob = job->run(read, write, error); + MultiplexerJobStatus status = (*jobCursor)->run(read, write, error); - // save job, if different - if (newJob != job) { + if (!status.continue_servicing) { Lock lock(m_mutex); - delete job; - *jobCursor = newJob; - m_update = true; + jobCursor->reset(); + m_update = true; + } else if (status.new_job) { + Lock lock(m_mutex); + *jobCursor = std::move(status.new_job); + m_update = true; } ++i; } @@ -262,7 +258,7 @@ SocketMultiplexer::JobCursor SocketMultiplexer::newCursor() { Lock lock(m_mutex); - return m_socketJobs.insert(m_socketJobs.begin(), m_cursorMark); + return m_socketJobs.insert(m_socketJobs.begin(), std::make_unique()); } SocketMultiplexer::JobCursor @@ -272,7 +268,7 @@ SocketMultiplexer::nextCursor(JobCursor cursor) JobCursor j = m_socketJobs.end(); JobCursor i = cursor; while (++i != m_socketJobs.end()) { - if (*i != m_cursorMark) { + if (*i && !(*i)->isCursor()) { // found a real job (as opposed to a cursor) j = i; diff --git a/src/lib/net/SocketMultiplexer.h b/src/lib/net/SocketMultiplexer.h index 4aa39fc4..98915580 100644 --- a/src/lib/net/SocketMultiplexer.h +++ b/src/lib/net/SocketMultiplexer.h @@ -21,6 +21,7 @@ #include "arch/IArchNetwork.h" #include "common/stdlist.h" #include "common/stdmap.h" +#include template class CondVar; @@ -41,7 +42,7 @@ public: //! @name manipulators //@{ - void addSocket(ISocket*, ISocketMultiplexerJob*); + void addSocket(ISocket*, std::unique_ptr&& job); void removeSocket(ISocket*); @@ -58,7 +59,7 @@ public: private: // list of jobs. we use a list so we can safely iterate over it // while other threads modify it. - typedef std::list SocketJobs; + using SocketJobs = std::list>; typedef SocketJobs::iterator JobCursor; typedef std::map SocketJobMap; @@ -106,6 +107,4 @@ private: SocketJobs m_socketJobs; SocketJobMap m_socketJobMap; - ISocketMultiplexerJob* - m_cursorMark; }; diff --git a/src/lib/net/TCPListenSocket.cpp b/src/lib/net/TCPListenSocket.cpp index 8e1540e9..26f03b05 100644 --- a/src/lib/net/TCPListenSocket.cpp +++ b/src/lib/net/TCPListenSocket.cpp @@ -69,10 +69,11 @@ TCPListenSocket::bind(const NetworkAddress& addr) ARCH->setReuseAddrOnSocket(m_socket, true); ARCH->bindSocket(m_socket, addr.getAddress()); ARCH->listenOnSocket(m_socket); - m_socketMultiplexer->addSocket(this, - new TSocketMultiplexerMethodJob( - this, &TCPListenSocket::serviceListening, - m_socket, true, false)); + + auto new_job = std::make_unique>( + this, &TCPListenSocket::serviceListening, m_socket, true, false); + + m_socketMultiplexer->addSocket(this, std::move(new_job)); } catch (XArchNetworkAddressInUse& e) { throw XSocketAddressInUse(e.what()); @@ -135,24 +136,22 @@ TCPListenSocket::accept() void TCPListenSocket::setListeningJob() { - m_socketMultiplexer->addSocket(this, - new TSocketMultiplexerMethodJob( - this, &TCPListenSocket::serviceListening, - m_socket, true, false)); + auto new_job = std::make_unique>( + this, &TCPListenSocket::serviceListening, m_socket, true, false); + m_socketMultiplexer->addSocket(this, std::move(new_job)); } -ISocketMultiplexerJob* -TCPListenSocket::serviceListening(ISocketMultiplexerJob* job, - bool read, bool, bool error) +MultiplexerJobStatus TCPListenSocket::serviceListening(ISocketMultiplexerJob* job, + bool read, bool, bool error) { if (error) { close(); - return NULL; + return {false, {}}; } if (read) { m_events->addEvent(Event(m_events->forIListenSocket().connecting(), this, NULL)); // stop polling on this socket until the client accepts - return NULL; + return {false, {}}; } - return job; + return {true, {}}; } diff --git a/src/lib/net/TCPListenSocket.h b/src/lib/net/TCPListenSocket.h index 10603562..f3ababc2 100644 --- a/src/lib/net/TCPListenSocket.h +++ b/src/lib/net/TCPListenSocket.h @@ -19,10 +19,10 @@ #pragma once #include "net/IListenSocket.h" +#include "net/ISocketMultiplexerJob.h" #include "arch/IArchNetwork.h" class Mutex; -class ISocketMultiplexerJob; class IEventQueue; class SocketMultiplexer; @@ -48,9 +48,7 @@ protected: void setListeningJob(); public: - ISocketMultiplexerJob* - serviceListening(ISocketMultiplexerJob*, - bool, bool, bool); + MultiplexerJobStatus serviceListening(ISocketMultiplexerJob*, bool, bool, bool); protected: ArchSocket m_socket; diff --git a/src/lib/net/TCPSocket.cpp b/src/lib/net/TCPSocket.cpp index d454aa1f..4f4251ad 100644 --- a/src/lib/net/TCPSocket.cpp +++ b/src/lib/net/TCPSocket.cpp @@ -388,40 +388,42 @@ TCPSocket::doWrite() return kRetry; } -void -TCPSocket::setJob(ISocketMultiplexerJob* job) +void TCPSocket::removeJob() { // multiplexer will delete the old job - if (job == NULL) { - m_socketMultiplexer->removeSocket(this); - } - else { - m_socketMultiplexer->addSocket(this, job); + m_socketMultiplexer->removeSocket(this); +} + +void TCPSocket::setJob(std::unique_ptr&& job) +{ + if (job.get() == nullptr) { + removeJob(); + } else { + m_socketMultiplexer->addSocket(this, std::move(job)); } } -ISocketMultiplexerJob* -TCPSocket::newJob() +std::unique_ptr TCPSocket::newJob() { // note -- must have m_mutex locked on entry if (m_socket == NULL) { - return NULL; + return {}; } else if (!m_connected) { assert(!m_readable); if (!(m_readable || m_writable)) { - return NULL; + return {}; } - return new TSocketMultiplexerMethodJob( + return std::make_unique>( this, &TCPSocket::serviceConnecting, m_socket, m_readable, m_writable); } else { if (!(m_readable || (m_writable && (m_outputBuffer.getSize() > 0)))) { - return NULL; + return {}; } - return new TSocketMultiplexerMethodJob( + return std::make_unique>( this, &TCPSocket::serviceConnected, m_socket, m_readable, m_writable && (m_outputBuffer.getSize() > 0)); @@ -488,9 +490,7 @@ TCPSocket::onDisconnected() m_connected = false; } -ISocketMultiplexerJob* -TCPSocket::serviceConnecting(ISocketMultiplexerJob* job, - bool, bool write, bool error) +MultiplexerJobStatus TCPSocket::serviceConnecting(ISocketMultiplexerJob* job, bool, bool write, bool error) { Lock lock(&m_mutex); @@ -519,29 +519,36 @@ TCPSocket::serviceConnecting(ISocketMultiplexerJob* job, catch (XArchNetwork& e) { sendConnectionFailedEvent(e.what()); onDisconnected(); - return newJob(); + auto new_job = newJob(); + if (new_job) + return {true, std::move(new_job)}; + else + return {false, {}}; } } if (write) { sendEvent(m_events->forIDataSocket().connected()); onConnected(); - return newJob(); + auto new_job = newJob(); + if (new_job) + return {true, std::move(new_job)}; + else + return {false, {}}; } - return job; + return {true, {}}; } -ISocketMultiplexerJob* -TCPSocket::serviceConnected(ISocketMultiplexerJob* job, - bool read, bool write, bool error) +MultiplexerJobStatus TCPSocket::serviceConnected(ISocketMultiplexerJob* job, + bool read, bool write, bool error) { Lock lock(&m_mutex); if (error) { sendEvent(m_events->forISocket().disconnected()); onDisconnected(); - return newJob(); + return {true, newJob()}; } EJobResult writeResult = kRetry; @@ -594,10 +601,10 @@ TCPSocket::serviceConnected(ISocketMultiplexerJob* job, } if (writeResult == kBreak || readResult == kBreak) { - return NULL; + return {false, {}}; } else if (writeResult == kNew || readResult == kNew) { - return newJob(); + return {true, newJob()}; } else { - return job; + return {true, {}}; } } diff --git a/src/lib/net/TCPSocket.h b/src/lib/net/TCPSocket.h index 1006f88e..28891353 100644 --- a/src/lib/net/TCPSocket.h +++ b/src/lib/net/TCPSocket.h @@ -19,14 +19,15 @@ #pragma once #include "net/IDataSocket.h" +#include "net/ISocketMultiplexerJob.h" #include "io/StreamBuffer.h" #include "mt/CondVar.h" #include "mt/Mutex.h" #include "arch/IArchNetwork.h" +#include class Mutex; class Thread; -class ISocketMultiplexerJob; class IEventQueue; class SocketMultiplexer; @@ -59,8 +60,7 @@ public: virtual void connect(const NetworkAddress&); - virtual ISocketMultiplexerJob* - newJob(); + virtual std::unique_ptr newJob(); protected: enum EJobResult { @@ -74,7 +74,8 @@ protected: virtual EJobResult doRead(); virtual EJobResult doWrite(); - void setJob(ISocketMultiplexerJob*); + void removeJob(); + void setJob(std::unique_ptr&& job); bool isReadable() { return m_readable; } bool isWritable() { return m_writable; } @@ -93,12 +94,8 @@ private: void onOutputShutdown(); void onDisconnected(); - ISocketMultiplexerJob* - serviceConnecting(ISocketMultiplexerJob*, - bool, bool, bool); - ISocketMultiplexerJob* - serviceConnected(ISocketMultiplexerJob*, - bool, bool, bool); + MultiplexerJobStatus serviceConnecting(ISocketMultiplexerJob*, bool, bool, bool); + MultiplexerJobStatus serviceConnected(ISocketMultiplexerJob*, bool, bool, bool); protected: bool m_readable; diff --git a/src/lib/net/TSocketMultiplexerMethodJob.h b/src/lib/net/TSocketMultiplexerMethodJob.h index 90efbe7d..9e74cdd2 100644 --- a/src/lib/net/TSocketMultiplexerMethodJob.h +++ b/src/lib/net/TSocketMultiplexerMethodJob.h @@ -28,8 +28,7 @@ A socket multiplexer job class that invokes a member function. template class TSocketMultiplexerMethodJob : public ISocketMultiplexerJob { public: - typedef ISocketMultiplexerJob* - (T::*Method)(ISocketMultiplexerJob*, bool, bool, bool); + using Method = MultiplexerJobStatus (T::*)(ISocketMultiplexerJob*, bool, bool, bool); //! run() invokes \c object->method(arg) TSocketMultiplexerMethodJob(T* object, Method method, @@ -37,11 +36,10 @@ public: virtual ~TSocketMultiplexerMethodJob(); // IJob overrides - virtual ISocketMultiplexerJob* - run(bool readable, bool writable, bool error); - virtual ArchSocket getSocket() const; - virtual bool isReadable() const; - virtual bool isWritable() const; + virtual MultiplexerJobStatus run(bool readable, bool writable, bool error) override; + virtual ArchSocket getSocket() const override; + virtual bool isReadable() const override; + virtual bool isWritable() const override; private: T* m_object; @@ -74,14 +72,12 @@ TSocketMultiplexerMethodJob::~TSocketMultiplexerMethodJob() } template -inline -ISocketMultiplexerJob* -TSocketMultiplexerMethodJob::run(bool read, bool write, bool error) +inline MultiplexerJobStatus TSocketMultiplexerMethodJob::run(bool read, bool write, bool error) { if (m_object != NULL) { return (m_object->*m_method)(this, read, write, error); } - return NULL; + return {false, {}}; } template