Make ownership of SocketMultiplexerJob explicit

This commit is contained in:
Povilas Kanapickas 2019-08-17 16:17:50 +03:00
parent 8dd6bc2c55
commit a841b2858f
10 changed files with 148 additions and 146 deletions

View File

@ -20,6 +20,19 @@
#include "arch/IArchNetwork.h"
#include "common/IInterface.h"
#include <memory>
class ISocketMultiplexerJob;
struct MultiplexerJobStatus
{
MultiplexerJobStatus(bool cont, std::unique_ptr<ISocketMultiplexerJob>&& nj) :
continue_servicing(cont), new_job(std::move(nj))
{}
bool continue_servicing = false;
std::unique_ptr<ISocketMultiplexerJob> new_job;
};
//! Socket multiplexer job
/*!
@ -32,21 +45,20 @@ public:
//! Handle socket event
/*!
Called by a socket multiplexer when the socket becomes readable,
writable, or has an error. It should return itself if the same
job can continue to service events, a new job if the socket must
be serviced differently, or NULL if the socket should no longer
be serviced. The socket is readable if \p readable is true,
writable if \p writable is true, and in error if \p error is
true.
Called by a socket multiplexer when the socket becomes readable, writable, or has an error.
The socket is readable if \p readable is true, writable if \p writable is true, and in error
if \p error is true.
The method returns false as the continue_servicing member of the returned struct if the socket
should no longer be served and true otherwise. Additionally, if the new_job member of the
returned pair is not empty, the socket should be serviced differently with the specified job.
This call must not attempt to directly change the job for this
socket by calling \c addSocket() or \c removeSocket() on the
multiplexer. It must instead return the new job. It can,
however, add or remove jobs for other sockets.
*/
virtual ISocketMultiplexerJob*
run(bool readable, bool writable, bool error) = 0;
virtual MultiplexerJobStatus run(bool readable, bool writable, bool error) = 0;
//@}
//! @name accessors
@ -72,5 +84,6 @@ public:
*/
virtual bool isWritable() const = 0;
virtual bool isCursor() const { return false; }
//@}
};

View File

@ -83,7 +83,7 @@ SecureSocket::~SecureSocket()
// take socket from multiplexer ASAP otherwise the race condition
// could cause events to get called on a dead object. TCPSocket
// will do this, too, but the double-call is harmless
setJob(NULL);
removeJob();
freeSSLResources();
// removing sleep() because I have no idea why you would want to do it
@ -125,13 +125,12 @@ SecureSocket::connect(const NetworkAddress& addr)
TCPSocket::connect(addr);
}
ISocketMultiplexerJob*
SecureSocket::newJob()
std::unique_ptr<ISocketMultiplexerJob> SecureSocket::newJob()
{
// after TCP connection is established, SecureSocket will pick up
// connected event and do secureConnect
if (m_connected && !m_secureReady) {
return NULL;
return {};
}
return TCPSocket::newJob();
@ -140,7 +139,7 @@ SecureSocket::newJob()
void
SecureSocket::secureConnect()
{
setJob(new TSocketMultiplexerMethodJob<SecureSocket>(
setJob(std::make_unique<TSocketMultiplexerMethodJob<SecureSocket>>(
this, &SecureSocket::serviceConnect,
getSocket(), isReadable(), isWritable()));
}
@ -148,7 +147,7 @@ SecureSocket::secureConnect()
void
SecureSocket::secureAccept()
{
setJob(new TSocketMultiplexerMethodJob<SecureSocket>(
setJob(std::make_unique<TSocketMultiplexerMethodJob<SecureSocket>>(
this, &SecureSocket::serviceAccept,
getSocket(), isReadable(), isWritable()));
}
@ -740,10 +739,11 @@ SecureSocket::verifyCertFingerprint()
return isValid;
}
ISocketMultiplexerJob*
SecureSocket::serviceConnect(ISocketMultiplexerJob* job,
bool, bool write, bool error)
MultiplexerJobStatus SecureSocket::serviceConnect(ISocketMultiplexerJob* job,
bool read, bool write, bool error)
{
(void) read;
Lock lock(&getMutex());
int status = 0;
@ -755,25 +755,28 @@ SecureSocket::serviceConnect(ISocketMultiplexerJob* job,
// If status < 0, error happened
if (status < 0) {
return NULL;
return {false, {}};
}
// If status > 0, success
if (status > 0) {
sendEvent(m_events->forIDataSocket().secureConnected());
return newJob();
return {true, newJob()};
}
// Retry case
return new TSocketMultiplexerMethodJob<SecureSocket>(
return {
true,
std::make_unique<TSocketMultiplexerMethodJob<SecureSocket>>(
this, &SecureSocket::serviceConnect,
getSocket(), isReadable(), isWritable());
getSocket(), isReadable(), isWritable())
};
}
ISocketMultiplexerJob*
SecureSocket::serviceAccept(ISocketMultiplexerJob* job,
bool, bool write, bool error)
MultiplexerJobStatus SecureSocket::serviceAccept(ISocketMultiplexerJob* job,
bool read, bool write, bool error)
{
(void) read;
Lock lock(&getMutex());
int status = 0;
@ -784,19 +787,19 @@ SecureSocket::serviceAccept(ISocketMultiplexerJob* job,
#endif
// If status < 0, error happened
if (status < 0) {
return NULL;
return {false, {}};
}
// If status > 0, success
if (status > 0) {
sendEvent(m_events->forClientListener().accepted());
return newJob();
return {true, newJob()};
}
// Retry case
return new TSocketMultiplexerMethodJob<SecureSocket>(
return {true, std::make_unique<TSocketMultiplexerMethodJob<SecureSocket>>(
this, &SecureSocket::serviceAccept,
getSocket(), isReadable(), isWritable());
getSocket(), isReadable(), isWritable())};
}
void

View File

@ -44,8 +44,7 @@ public:
// IDataSocket overrides
virtual void connect(const NetworkAddress&);
ISocketMultiplexerJob*
newJob();
std::unique_ptr<ISocketMultiplexerJob> newJob() override;
bool isFatal() const { return m_fatal; }
void isFatal(bool b) { m_fatal = b; }
bool isSecureReady();
@ -74,13 +73,8 @@ private:
bool separator = true);
bool verifyCertFingerprint();
ISocketMultiplexerJob*
serviceConnect(ISocketMultiplexerJob*,
bool, bool, bool);
ISocketMultiplexerJob*
serviceAccept(ISocketMultiplexerJob*,
bool, bool, bool);
MultiplexerJobStatus serviceConnect(ISocketMultiplexerJob*, bool, bool, bool);
MultiplexerJobStatus serviceAccept(ISocketMultiplexerJob*, bool, bool, bool);
void showSecureConnectInfo();
void showSecureLibInfo();

View File

@ -33,6 +33,20 @@
// SocketMultiplexer
//
class CursorMultiplexerJob : public ISocketMultiplexerJob {
public:
MultiplexerJobStatus run(bool readable, bool writable, bool error) override
{
return {false, {}};
}
ArchSocket getSocket() const override { return {}; }
bool isReadable() const override { return false; }
bool isWritable() const override { return false; }
bool isCursor() const override { return true; }
};
SocketMultiplexer::SocketMultiplexer() :
m_mutex(new Mutex),
m_thread(NULL),
@ -43,12 +57,6 @@ SocketMultiplexer::SocketMultiplexer() :
m_jobListLocker(NULL),
m_jobListLockLocker(NULL)
{
// this pointer just has to be unique and not NULL. it will
// never be dereferenced. it's used to identify cursor nodes
// in the jobs list.
// TODO: Remove this evilness
m_cursorMark = reinterpret_cast<ISocketMultiplexerJob*>(this);
// start thread
m_thread = new Thread(new TMethodJob<SocketMultiplexer>(
this, &SocketMultiplexer::serviceThread));
@ -66,16 +74,9 @@ SocketMultiplexer::~SocketMultiplexer()
delete m_jobListLocker;
delete m_jobListLockLocker;
delete m_mutex;
// clean up jobs
for (SocketJobMap::iterator i = m_socketJobMap.begin();
i != m_socketJobMap.end(); ++i) {
delete *(i->second);
}
}
void
SocketMultiplexer::addSocket(ISocket* socket, ISocketMultiplexerJob* job)
void SocketMultiplexer::addSocket(ISocket* socket, std::unique_ptr<ISocketMultiplexerJob>&& job)
{
assert(socket != NULL);
assert(job != NULL);
@ -95,16 +96,12 @@ SocketMultiplexer::addSocket(ISocket* socket, ISocketMultiplexerJob* job)
// we *must* put the job at the end so the order of jobs in
// the list continue to match the order of jobs in pfds in
// serviceThread().
JobCursor j = m_socketJobs.insert(m_socketJobs.end(), job);
JobCursor j = m_socketJobs.insert(m_socketJobs.end(), std::move(job));
m_update = true;
m_socketJobMap.insert(std::make_pair(socket, j));
}
else {
JobCursor j = i->second;
if (*j != job) {
delete *j;
*j = job;
}
*(i->second) = std::move(job);
m_update = true;
}
@ -131,9 +128,8 @@ SocketMultiplexer::removeSocket(ISocket* socket)
// to match the order of jobs in pfds in serviceThread().
SocketJobMap::iterator i = m_socketJobMap.find(socket);
if (i != m_socketJobMap.end()) {
if (*(i->second) != NULL) {
delete *(i->second);
*(i->second) = NULL;
if (*(i->second)) {
i->second->reset();
m_update = true;
}
}
@ -173,14 +169,13 @@ SocketMultiplexer::serviceThread(void*)
JobCursor cursor = newCursor();
JobCursor jobCursor = nextCursor(cursor);
while (jobCursor != m_socketJobs.end()) {
ISocketMultiplexerJob* job = *jobCursor;
if (job != NULL) {
pfd.m_socket = job->getSocket();
if (*jobCursor) {
pfd.m_socket = (*jobCursor)->getSocket();
pfd.m_events = 0;
if (job->isReadable()) {
if ((*jobCursor)->isReadable()) {
pfd.m_events |= IArchNetwork::kPOLLIN;
}
if (job->isWritable()) {
if ((*jobCursor)->isWritable()) {
pfd.m_events |= IArchNetwork::kPOLLOUT;
}
pfds.push_back(pfd);
@ -221,14 +216,15 @@ SocketMultiplexer::serviceThread(void*)
IArchNetwork::kPOLLNVAL)) != 0);
// run job
ISocketMultiplexerJob* job = *jobCursor;
ISocketMultiplexerJob* newJob = job->run(read, write, error);
MultiplexerJobStatus status = (*jobCursor)->run(read, write, error);
// save job, if different
if (newJob != job) {
if (!status.continue_servicing) {
Lock lock(m_mutex);
delete job;
*jobCursor = newJob;
jobCursor->reset();
m_update = true;
} else if (status.new_job) {
Lock lock(m_mutex);
*jobCursor = std::move(status.new_job);
m_update = true;
}
++i;
@ -262,7 +258,7 @@ SocketMultiplexer::JobCursor
SocketMultiplexer::newCursor()
{
Lock lock(m_mutex);
return m_socketJobs.insert(m_socketJobs.begin(), m_cursorMark);
return m_socketJobs.insert(m_socketJobs.begin(), std::make_unique<CursorMultiplexerJob>());
}
SocketMultiplexer::JobCursor
@ -272,7 +268,7 @@ SocketMultiplexer::nextCursor(JobCursor cursor)
JobCursor j = m_socketJobs.end();
JobCursor i = cursor;
while (++i != m_socketJobs.end()) {
if (*i != m_cursorMark) {
if (*i && !(*i)->isCursor()) {
// found a real job (as opposed to a cursor)
j = i;

View File

@ -21,6 +21,7 @@
#include "arch/IArchNetwork.h"
#include "common/stdlist.h"
#include "common/stdmap.h"
#include <memory>
template <class T>
class CondVar;
@ -41,7 +42,7 @@ public:
//! @name manipulators
//@{
void addSocket(ISocket*, ISocketMultiplexerJob*);
void addSocket(ISocket*, std::unique_ptr<ISocketMultiplexerJob>&& job);
void removeSocket(ISocket*);
@ -58,7 +59,7 @@ public:
private:
// list of jobs. we use a list so we can safely iterate over it
// while other threads modify it.
typedef std::list<ISocketMultiplexerJob*> SocketJobs;
using SocketJobs = std::list<std::unique_ptr<ISocketMultiplexerJob>>;
typedef SocketJobs::iterator JobCursor;
typedef std::map<ISocket*, JobCursor> SocketJobMap;
@ -106,6 +107,4 @@ private:
SocketJobs m_socketJobs;
SocketJobMap m_socketJobMap;
ISocketMultiplexerJob*
m_cursorMark;
};

View File

@ -69,10 +69,11 @@ TCPListenSocket::bind(const NetworkAddress& addr)
ARCH->setReuseAddrOnSocket(m_socket, true);
ARCH->bindSocket(m_socket, addr.getAddress());
ARCH->listenOnSocket(m_socket);
m_socketMultiplexer->addSocket(this,
new TSocketMultiplexerMethodJob<TCPListenSocket>(
this, &TCPListenSocket::serviceListening,
m_socket, true, false));
auto new_job = std::make_unique<TSocketMultiplexerMethodJob<TCPListenSocket>>(
this, &TCPListenSocket::serviceListening, m_socket, true, false);
m_socketMultiplexer->addSocket(this, std::move(new_job));
}
catch (XArchNetworkAddressInUse& e) {
throw XSocketAddressInUse(e.what());
@ -135,24 +136,22 @@ TCPListenSocket::accept()
void
TCPListenSocket::setListeningJob()
{
m_socketMultiplexer->addSocket(this,
new TSocketMultiplexerMethodJob<TCPListenSocket>(
this, &TCPListenSocket::serviceListening,
m_socket, true, false));
auto new_job = std::make_unique<TSocketMultiplexerMethodJob<TCPListenSocket>>(
this, &TCPListenSocket::serviceListening, m_socket, true, false);
m_socketMultiplexer->addSocket(this, std::move(new_job));
}
ISocketMultiplexerJob*
TCPListenSocket::serviceListening(ISocketMultiplexerJob* job,
MultiplexerJobStatus TCPListenSocket::serviceListening(ISocketMultiplexerJob* job,
bool read, bool, bool error)
{
if (error) {
close();
return NULL;
return {false, {}};
}
if (read) {
m_events->addEvent(Event(m_events->forIListenSocket().connecting(), this, NULL));
// stop polling on this socket until the client accepts
return NULL;
return {false, {}};
}
return job;
return {true, {}};
}

View File

@ -19,10 +19,10 @@
#pragma once
#include "net/IListenSocket.h"
#include "net/ISocketMultiplexerJob.h"
#include "arch/IArchNetwork.h"
class Mutex;
class ISocketMultiplexerJob;
class IEventQueue;
class SocketMultiplexer;
@ -48,9 +48,7 @@ protected:
void setListeningJob();
public:
ISocketMultiplexerJob*
serviceListening(ISocketMultiplexerJob*,
bool, bool, bool);
MultiplexerJobStatus serviceListening(ISocketMultiplexerJob*, bool, bool, bool);
protected:
ArchSocket m_socket;

View File

@ -388,40 +388,42 @@ TCPSocket::doWrite()
return kRetry;
}
void
TCPSocket::setJob(ISocketMultiplexerJob* job)
void TCPSocket::removeJob()
{
// multiplexer will delete the old job
if (job == NULL) {
m_socketMultiplexer->removeSocket(this);
}
else {
m_socketMultiplexer->addSocket(this, job);
}
void TCPSocket::setJob(std::unique_ptr<ISocketMultiplexerJob>&& job)
{
if (job.get() == nullptr) {
removeJob();
} else {
m_socketMultiplexer->addSocket(this, std::move(job));
}
}
ISocketMultiplexerJob*
TCPSocket::newJob()
std::unique_ptr<ISocketMultiplexerJob> TCPSocket::newJob()
{
// note -- must have m_mutex locked on entry
if (m_socket == NULL) {
return NULL;
return {};
}
else if (!m_connected) {
assert(!m_readable);
if (!(m_readable || m_writable)) {
return NULL;
return {};
}
return new TSocketMultiplexerMethodJob<TCPSocket>(
return std::make_unique<TSocketMultiplexerMethodJob<TCPSocket>>(
this, &TCPSocket::serviceConnecting,
m_socket, m_readable, m_writable);
}
else {
if (!(m_readable || (m_writable && (m_outputBuffer.getSize() > 0)))) {
return NULL;
return {};
}
return new TSocketMultiplexerMethodJob<TCPSocket>(
return std::make_unique<TSocketMultiplexerMethodJob<TCPSocket>>(
this, &TCPSocket::serviceConnected,
m_socket, m_readable,
m_writable && (m_outputBuffer.getSize() > 0));
@ -488,9 +490,7 @@ TCPSocket::onDisconnected()
m_connected = false;
}
ISocketMultiplexerJob*
TCPSocket::serviceConnecting(ISocketMultiplexerJob* job,
bool, bool write, bool error)
MultiplexerJobStatus TCPSocket::serviceConnecting(ISocketMultiplexerJob* job, bool, bool write, bool error)
{
Lock lock(&m_mutex);
@ -519,21 +519,28 @@ TCPSocket::serviceConnecting(ISocketMultiplexerJob* job,
catch (XArchNetwork& e) {
sendConnectionFailedEvent(e.what());
onDisconnected();
return newJob();
auto new_job = newJob();
if (new_job)
return {true, std::move(new_job)};
else
return {false, {}};
}
}
if (write) {
sendEvent(m_events->forIDataSocket().connected());
onConnected();
return newJob();
auto new_job = newJob();
if (new_job)
return {true, std::move(new_job)};
else
return {false, {}};
}
return job;
return {true, {}};
}
ISocketMultiplexerJob*
TCPSocket::serviceConnected(ISocketMultiplexerJob* job,
MultiplexerJobStatus TCPSocket::serviceConnected(ISocketMultiplexerJob* job,
bool read, bool write, bool error)
{
Lock lock(&m_mutex);
@ -541,7 +548,7 @@ TCPSocket::serviceConnected(ISocketMultiplexerJob* job,
if (error) {
sendEvent(m_events->forISocket().disconnected());
onDisconnected();
return newJob();
return {true, newJob()};
}
EJobResult writeResult = kRetry;
@ -594,10 +601,10 @@ TCPSocket::serviceConnected(ISocketMultiplexerJob* job,
}
if (writeResult == kBreak || readResult == kBreak) {
return NULL;
return {false, {}};
} else if (writeResult == kNew || readResult == kNew) {
return newJob();
return {true, newJob()};
} else {
return job;
return {true, {}};
}
}

View File

@ -19,14 +19,15 @@
#pragma once
#include "net/IDataSocket.h"
#include "net/ISocketMultiplexerJob.h"
#include "io/StreamBuffer.h"
#include "mt/CondVar.h"
#include "mt/Mutex.h"
#include "arch/IArchNetwork.h"
#include <memory>
class Mutex;
class Thread;
class ISocketMultiplexerJob;
class IEventQueue;
class SocketMultiplexer;
@ -59,8 +60,7 @@ public:
virtual void connect(const NetworkAddress&);
virtual ISocketMultiplexerJob*
newJob();
virtual std::unique_ptr<ISocketMultiplexerJob> newJob();
protected:
enum EJobResult {
@ -74,7 +74,8 @@ protected:
virtual EJobResult doRead();
virtual EJobResult doWrite();
void setJob(ISocketMultiplexerJob*);
void removeJob();
void setJob(std::unique_ptr<ISocketMultiplexerJob>&& job);
bool isReadable() { return m_readable; }
bool isWritable() { return m_writable; }
@ -93,12 +94,8 @@ private:
void onOutputShutdown();
void onDisconnected();
ISocketMultiplexerJob*
serviceConnecting(ISocketMultiplexerJob*,
bool, bool, bool);
ISocketMultiplexerJob*
serviceConnected(ISocketMultiplexerJob*,
bool, bool, bool);
MultiplexerJobStatus serviceConnecting(ISocketMultiplexerJob*, bool, bool, bool);
MultiplexerJobStatus serviceConnected(ISocketMultiplexerJob*, bool, bool, bool);
protected:
bool m_readable;

View File

@ -28,8 +28,7 @@ A socket multiplexer job class that invokes a member function.
template <class T>
class TSocketMultiplexerMethodJob : public ISocketMultiplexerJob {
public:
typedef ISocketMultiplexerJob*
(T::*Method)(ISocketMultiplexerJob*, bool, bool, bool);
using Method = MultiplexerJobStatus (T::*)(ISocketMultiplexerJob*, bool, bool, bool);
//! run() invokes \c object->method(arg)
TSocketMultiplexerMethodJob(T* object, Method method,
@ -37,11 +36,10 @@ public:
virtual ~TSocketMultiplexerMethodJob();
// IJob overrides
virtual ISocketMultiplexerJob*
run(bool readable, bool writable, bool error);
virtual ArchSocket getSocket() const;
virtual bool isReadable() const;
virtual bool isWritable() const;
virtual MultiplexerJobStatus run(bool readable, bool writable, bool error) override;
virtual ArchSocket getSocket() const override;
virtual bool isReadable() const override;
virtual bool isWritable() const override;
private:
T* m_object;
@ -74,14 +72,12 @@ TSocketMultiplexerMethodJob<T>::~TSocketMultiplexerMethodJob()
}
template <class T>
inline
ISocketMultiplexerJob*
TSocketMultiplexerMethodJob<T>::run(bool read, bool write, bool error)
inline MultiplexerJobStatus TSocketMultiplexerMethodJob<T>::run(bool read, bool write, bool error)
{
if (m_object != NULL) {
return (m_object->*m_method)(this, read, write, error);
}
return NULL;
return {false, {}};
}
template <class T>