Merge pull request #411 from p12tic/rewrite-memory-management

Use explicit memory ownership in SocketMultiplexer
This commit is contained in:
Adrian Lucrèce Céleste 2019-08-22 15:01:21 -04:00 committed by GitHub
commit 7bb541ea91
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 156 additions and 154 deletions

View File

@ -200,7 +200,7 @@ App::initApp(int argc, const char** argv)
void
App::initIpcClient()
{
m_ipcClient = new IpcClient(m_events, m_socketMultiplexer);
m_ipcClient = new IpcClient(m_events, m_socketMultiplexer.get());
m_ipcClient->connect();
m_events->adoptHandler(

View File

@ -23,7 +23,9 @@
#include "base/String.h"
#include "base/Log.h"
#include "base/EventQueue.h"
#include "net/SocketMultiplexer.h"
#include "common/common.h"
#include <memory>
#if SYSAPI_WIN32
#include "barrier/win32/AppUtilWindows.h"
@ -95,8 +97,8 @@ public:
virtual IEventQueue* getEvents() const { return m_events; }
void setSocketMultiplexer(SocketMultiplexer* sm) { m_socketMultiplexer = sm; }
SocketMultiplexer* getSocketMultiplexer() const { return m_socketMultiplexer; }
void setSocketMultiplexer(std::unique_ptr<SocketMultiplexer>&& sm) { m_socketMultiplexer = std::move(sm); }
SocketMultiplexer* getSocketMultiplexer() const { return m_socketMultiplexer.get(); }
void setEvents(EventQueue& events) { m_events = &events; }
@ -119,7 +121,7 @@ private:
CreateTaskBarReceiverFunc m_createTaskBarReceiver;
ARCH_APP_UTIL m_appUtil;
IpcClient* m_ipcClient;
SocketMultiplexer* m_socketMultiplexer;
std::unique_ptr<SocketMultiplexer> m_socketMultiplexer;
};
class MinimalApp : public App {

View File

@ -443,8 +443,7 @@ ClientApp::mainLoop()
{
// create socket multiplexer. this must happen after daemonization
// on unix because threads evaporate across a fork().
SocketMultiplexer multiplexer;
setSocketMultiplexer(&multiplexer);
setSocketMultiplexer(std::make_unique<SocketMultiplexer>());
// start client, etc
appUtil().startNode();

View File

@ -713,8 +713,7 @@ ServerApp::mainLoop()
{
// create socket multiplexer. this must happen after daemonization
// on unix because threads evaporate across a fork().
SocketMultiplexer multiplexer;
setSocketMultiplexer(&multiplexer);
setSocketMultiplexer(std::make_unique<SocketMultiplexer>());
// if configuration has no screens then add this system
// as the default

View File

@ -20,6 +20,19 @@
#include "arch/IArchNetwork.h"
#include "common/IInterface.h"
#include <memory>
class ISocketMultiplexerJob;
struct MultiplexerJobStatus
{
MultiplexerJobStatus(bool cont, std::unique_ptr<ISocketMultiplexerJob>&& nj) :
continue_servicing(cont), new_job(std::move(nj))
{}
bool continue_servicing = false;
std::unique_ptr<ISocketMultiplexerJob> new_job;
};
//! Socket multiplexer job
/*!
@ -32,21 +45,20 @@ public:
//! Handle socket event
/*!
Called by a socket multiplexer when the socket becomes readable,
writable, or has an error. It should return itself if the same
job can continue to service events, a new job if the socket must
be serviced differently, or NULL if the socket should no longer
be serviced. The socket is readable if \p readable is true,
writable if \p writable is true, and in error if \p error is
true.
Called by a socket multiplexer when the socket becomes readable, writable, or has an error.
The socket is readable if \p readable is true, writable if \p writable is true, and in error
if \p error is true.
The method returns false as the continue_servicing member of the returned struct if the socket
should no longer be served and true otherwise. Additionally, if the new_job member of the
returned pair is not empty, the socket should be serviced differently with the specified job.
This call must not attempt to directly change the job for this
socket by calling \c addSocket() or \c removeSocket() on the
multiplexer. It must instead return the new job. It can,
however, add or remove jobs for other sockets.
*/
virtual ISocketMultiplexerJob*
run(bool readable, bool writable, bool error) = 0;
virtual MultiplexerJobStatus run(bool readable, bool writable, bool error) = 0;
//@}
//! @name accessors
@ -72,5 +84,6 @@ public:
*/
virtual bool isWritable() const = 0;
virtual bool isCursor() const { return false; }
//@}
};

View File

@ -83,7 +83,7 @@ SecureSocket::~SecureSocket()
// take socket from multiplexer ASAP otherwise the race condition
// could cause events to get called on a dead object. TCPSocket
// will do this, too, but the double-call is harmless
setJob(NULL);
removeJob();
freeSSLResources();
// removing sleep() because I have no idea why you would want to do it
@ -125,13 +125,12 @@ SecureSocket::connect(const NetworkAddress& addr)
TCPSocket::connect(addr);
}
ISocketMultiplexerJob*
SecureSocket::newJob()
std::unique_ptr<ISocketMultiplexerJob> SecureSocket::newJob()
{
// after TCP connection is established, SecureSocket will pick up
// connected event and do secureConnect
if (m_connected && !m_secureReady) {
return NULL;
return {};
}
return TCPSocket::newJob();
@ -140,7 +139,7 @@ SecureSocket::newJob()
void
SecureSocket::secureConnect()
{
setJob(new TSocketMultiplexerMethodJob<SecureSocket>(
setJob(std::make_unique<TSocketMultiplexerMethodJob<SecureSocket>>(
this, &SecureSocket::serviceConnect,
getSocket(), isReadable(), isWritable()));
}
@ -148,7 +147,7 @@ SecureSocket::secureConnect()
void
SecureSocket::secureAccept()
{
setJob(new TSocketMultiplexerMethodJob<SecureSocket>(
setJob(std::make_unique<TSocketMultiplexerMethodJob<SecureSocket>>(
this, &SecureSocket::serviceAccept,
getSocket(), isReadable(), isWritable()));
}
@ -740,10 +739,11 @@ SecureSocket::verifyCertFingerprint()
return isValid;
}
ISocketMultiplexerJob*
SecureSocket::serviceConnect(ISocketMultiplexerJob* job,
bool, bool write, bool error)
MultiplexerJobStatus SecureSocket::serviceConnect(ISocketMultiplexerJob* job,
bool read, bool write, bool error)
{
(void) read;
Lock lock(&getMutex());
int status = 0;
@ -755,25 +755,28 @@ SecureSocket::serviceConnect(ISocketMultiplexerJob* job,
// If status < 0, error happened
if (status < 0) {
return NULL;
return {false, {}};
}
// If status > 0, success
if (status > 0) {
sendEvent(m_events->forIDataSocket().secureConnected());
return newJob();
return {true, newJob()};
}
// Retry case
return new TSocketMultiplexerMethodJob<SecureSocket>(
return {
true,
std::make_unique<TSocketMultiplexerMethodJob<SecureSocket>>(
this, &SecureSocket::serviceConnect,
getSocket(), isReadable(), isWritable());
getSocket(), isReadable(), isWritable())
};
}
ISocketMultiplexerJob*
SecureSocket::serviceAccept(ISocketMultiplexerJob* job,
bool, bool write, bool error)
MultiplexerJobStatus SecureSocket::serviceAccept(ISocketMultiplexerJob* job,
bool read, bool write, bool error)
{
(void) read;
Lock lock(&getMutex());
int status = 0;
@ -784,19 +787,19 @@ SecureSocket::serviceAccept(ISocketMultiplexerJob* job,
#endif
// If status < 0, error happened
if (status < 0) {
return NULL;
return {false, {}};
}
// If status > 0, success
if (status > 0) {
sendEvent(m_events->forClientListener().accepted());
return newJob();
return {true, newJob()};
}
// Retry case
return new TSocketMultiplexerMethodJob<SecureSocket>(
return {true, std::make_unique<TSocketMultiplexerMethodJob<SecureSocket>>(
this, &SecureSocket::serviceAccept,
getSocket(), isReadable(), isWritable());
getSocket(), isReadable(), isWritable())};
}
void

View File

@ -44,8 +44,7 @@ public:
// IDataSocket overrides
virtual void connect(const NetworkAddress&);
ISocketMultiplexerJob*
newJob();
std::unique_ptr<ISocketMultiplexerJob> newJob() override;
bool isFatal() const { return m_fatal; }
void isFatal(bool b) { m_fatal = b; }
bool isSecureReady();
@ -74,13 +73,8 @@ private:
bool separator = true);
bool verifyCertFingerprint();
ISocketMultiplexerJob*
serviceConnect(ISocketMultiplexerJob*,
bool, bool, bool);
ISocketMultiplexerJob*
serviceAccept(ISocketMultiplexerJob*,
bool, bool, bool);
MultiplexerJobStatus serviceConnect(ISocketMultiplexerJob*, bool, bool, bool);
MultiplexerJobStatus serviceAccept(ISocketMultiplexerJob*, bool, bool, bool);
void showSecureConnectInfo();
void showSecureLibInfo();

View File

@ -33,6 +33,20 @@
// SocketMultiplexer
//
class CursorMultiplexerJob : public ISocketMultiplexerJob {
public:
MultiplexerJobStatus run(bool readable, bool writable, bool error) override
{
return {false, {}};
}
ArchSocket getSocket() const override { return {}; }
bool isReadable() const override { return false; }
bool isWritable() const override { return false; }
bool isCursor() const override { return true; }
};
SocketMultiplexer::SocketMultiplexer() :
m_mutex(new Mutex),
m_thread(NULL),
@ -43,12 +57,6 @@ SocketMultiplexer::SocketMultiplexer() :
m_jobListLocker(NULL),
m_jobListLockLocker(NULL)
{
// this pointer just has to be unique and not NULL. it will
// never be dereferenced. it's used to identify cursor nodes
// in the jobs list.
// TODO: Remove this evilness
m_cursorMark = reinterpret_cast<ISocketMultiplexerJob*>(this);
// start thread
m_thread = new Thread(new TMethodJob<SocketMultiplexer>(
this, &SocketMultiplexer::serviceThread));
@ -66,16 +74,9 @@ SocketMultiplexer::~SocketMultiplexer()
delete m_jobListLocker;
delete m_jobListLockLocker;
delete m_mutex;
// clean up jobs
for (SocketJobMap::iterator i = m_socketJobMap.begin();
i != m_socketJobMap.end(); ++i) {
delete *(i->second);
}
}
void
SocketMultiplexer::addSocket(ISocket* socket, ISocketMultiplexerJob* job)
void SocketMultiplexer::addSocket(ISocket* socket, std::unique_ptr<ISocketMultiplexerJob>&& job)
{
assert(socket != NULL);
assert(job != NULL);
@ -95,16 +96,12 @@ SocketMultiplexer::addSocket(ISocket* socket, ISocketMultiplexerJob* job)
// we *must* put the job at the end so the order of jobs in
// the list continue to match the order of jobs in pfds in
// serviceThread().
JobCursor j = m_socketJobs.insert(m_socketJobs.end(), job);
JobCursor j = m_socketJobs.insert(m_socketJobs.end(), std::move(job));
m_update = true;
m_socketJobMap.insert(std::make_pair(socket, j));
}
else {
JobCursor j = i->second;
if (*j != job) {
delete *j;
*j = job;
}
*(i->second) = std::move(job);
m_update = true;
}
@ -131,9 +128,8 @@ SocketMultiplexer::removeSocket(ISocket* socket)
// to match the order of jobs in pfds in serviceThread().
SocketJobMap::iterator i = m_socketJobMap.find(socket);
if (i != m_socketJobMap.end()) {
if (*(i->second) != NULL) {
delete *(i->second);
*(i->second) = NULL;
if (*(i->second)) {
i->second->reset();
m_update = true;
}
}
@ -173,14 +169,13 @@ SocketMultiplexer::serviceThread(void*)
JobCursor cursor = newCursor();
JobCursor jobCursor = nextCursor(cursor);
while (jobCursor != m_socketJobs.end()) {
ISocketMultiplexerJob* job = *jobCursor;
if (job != NULL) {
pfd.m_socket = job->getSocket();
if (*jobCursor) {
pfd.m_socket = (*jobCursor)->getSocket();
pfd.m_events = 0;
if (job->isReadable()) {
if ((*jobCursor)->isReadable()) {
pfd.m_events |= IArchNetwork::kPOLLIN;
}
if (job->isWritable()) {
if ((*jobCursor)->isWritable()) {
pfd.m_events |= IArchNetwork::kPOLLOUT;
}
pfds.push_back(pfd);
@ -221,14 +216,15 @@ SocketMultiplexer::serviceThread(void*)
IArchNetwork::kPOLLNVAL)) != 0);
// run job
ISocketMultiplexerJob* job = *jobCursor;
ISocketMultiplexerJob* newJob = job->run(read, write, error);
MultiplexerJobStatus status = (*jobCursor)->run(read, write, error);
// save job, if different
if (newJob != job) {
if (!status.continue_servicing) {
Lock lock(m_mutex);
delete job;
*jobCursor = newJob;
jobCursor->reset();
m_update = true;
} else if (status.new_job) {
Lock lock(m_mutex);
*jobCursor = std::move(status.new_job);
m_update = true;
}
++i;
@ -262,7 +258,7 @@ SocketMultiplexer::JobCursor
SocketMultiplexer::newCursor()
{
Lock lock(m_mutex);
return m_socketJobs.insert(m_socketJobs.begin(), m_cursorMark);
return m_socketJobs.insert(m_socketJobs.begin(), std::make_unique<CursorMultiplexerJob>());
}
SocketMultiplexer::JobCursor
@ -272,7 +268,7 @@ SocketMultiplexer::nextCursor(JobCursor cursor)
JobCursor j = m_socketJobs.end();
JobCursor i = cursor;
while (++i != m_socketJobs.end()) {
if (*i != m_cursorMark) {
if (*i && !(*i)->isCursor()) {
// found a real job (as opposed to a cursor)
j = i;

View File

@ -21,6 +21,7 @@
#include "arch/IArchNetwork.h"
#include "common/stdlist.h"
#include "common/stdmap.h"
#include <memory>
template <class T>
class CondVar;
@ -41,7 +42,7 @@ public:
//! @name manipulators
//@{
void addSocket(ISocket*, ISocketMultiplexerJob*);
void addSocket(ISocket*, std::unique_ptr<ISocketMultiplexerJob>&& job);
void removeSocket(ISocket*);
@ -58,7 +59,7 @@ public:
private:
// list of jobs. we use a list so we can safely iterate over it
// while other threads modify it.
typedef std::list<ISocketMultiplexerJob*> SocketJobs;
using SocketJobs = std::list<std::unique_ptr<ISocketMultiplexerJob>>;
typedef SocketJobs::iterator JobCursor;
typedef std::map<ISocket*, JobCursor> SocketJobMap;
@ -106,6 +107,4 @@ private:
SocketJobs m_socketJobs;
SocketJobMap m_socketJobMap;
ISocketMultiplexerJob*
m_cursorMark;
};

View File

@ -69,10 +69,11 @@ TCPListenSocket::bind(const NetworkAddress& addr)
ARCH->setReuseAddrOnSocket(m_socket, true);
ARCH->bindSocket(m_socket, addr.getAddress());
ARCH->listenOnSocket(m_socket);
m_socketMultiplexer->addSocket(this,
new TSocketMultiplexerMethodJob<TCPListenSocket>(
this, &TCPListenSocket::serviceListening,
m_socket, true, false));
auto new_job = std::make_unique<TSocketMultiplexerMethodJob<TCPListenSocket>>(
this, &TCPListenSocket::serviceListening, m_socket, true, false);
m_socketMultiplexer->addSocket(this, std::move(new_job));
}
catch (XArchNetworkAddressInUse& e) {
throw XSocketAddressInUse(e.what());
@ -135,24 +136,22 @@ TCPListenSocket::accept()
void
TCPListenSocket::setListeningJob()
{
m_socketMultiplexer->addSocket(this,
new TSocketMultiplexerMethodJob<TCPListenSocket>(
this, &TCPListenSocket::serviceListening,
m_socket, true, false));
auto new_job = std::make_unique<TSocketMultiplexerMethodJob<TCPListenSocket>>(
this, &TCPListenSocket::serviceListening, m_socket, true, false);
m_socketMultiplexer->addSocket(this, std::move(new_job));
}
ISocketMultiplexerJob*
TCPListenSocket::serviceListening(ISocketMultiplexerJob* job,
MultiplexerJobStatus TCPListenSocket::serviceListening(ISocketMultiplexerJob* job,
bool read, bool, bool error)
{
if (error) {
close();
return NULL;
return {false, {}};
}
if (read) {
m_events->addEvent(Event(m_events->forIListenSocket().connecting(), this, NULL));
// stop polling on this socket until the client accepts
return NULL;
return {false, {}};
}
return job;
return {true, {}};
}

View File

@ -19,10 +19,10 @@
#pragma once
#include "net/IListenSocket.h"
#include "net/ISocketMultiplexerJob.h"
#include "arch/IArchNetwork.h"
class Mutex;
class ISocketMultiplexerJob;
class IEventQueue;
class SocketMultiplexer;
@ -48,9 +48,7 @@ protected:
void setListeningJob();
public:
ISocketMultiplexerJob*
serviceListening(ISocketMultiplexerJob*,
bool, bool, bool);
MultiplexerJobStatus serviceListening(ISocketMultiplexerJob*, bool, bool, bool);
protected:
ArchSocket m_socket;

View File

@ -388,40 +388,42 @@ TCPSocket::doWrite()
return kRetry;
}
void
TCPSocket::setJob(ISocketMultiplexerJob* job)
void TCPSocket::removeJob()
{
// multiplexer will delete the old job
if (job == NULL) {
m_socketMultiplexer->removeSocket(this);
}
else {
m_socketMultiplexer->addSocket(this, job);
}
void TCPSocket::setJob(std::unique_ptr<ISocketMultiplexerJob>&& job)
{
if (job.get() == nullptr) {
removeJob();
} else {
m_socketMultiplexer->addSocket(this, std::move(job));
}
}
ISocketMultiplexerJob*
TCPSocket::newJob()
std::unique_ptr<ISocketMultiplexerJob> TCPSocket::newJob()
{
// note -- must have m_mutex locked on entry
if (m_socket == NULL) {
return NULL;
return {};
}
else if (!m_connected) {
assert(!m_readable);
if (!(m_readable || m_writable)) {
return NULL;
return {};
}
return new TSocketMultiplexerMethodJob<TCPSocket>(
return std::make_unique<TSocketMultiplexerMethodJob<TCPSocket>>(
this, &TCPSocket::serviceConnecting,
m_socket, m_readable, m_writable);
}
else {
if (!(m_readable || (m_writable && (m_outputBuffer.getSize() > 0)))) {
return NULL;
return {};
}
return new TSocketMultiplexerMethodJob<TCPSocket>(
return std::make_unique<TSocketMultiplexerMethodJob<TCPSocket>>(
this, &TCPSocket::serviceConnected,
m_socket, m_readable,
m_writable && (m_outputBuffer.getSize() > 0));
@ -488,9 +490,7 @@ TCPSocket::onDisconnected()
m_connected = false;
}
ISocketMultiplexerJob*
TCPSocket::serviceConnecting(ISocketMultiplexerJob* job,
bool, bool write, bool error)
MultiplexerJobStatus TCPSocket::serviceConnecting(ISocketMultiplexerJob* job, bool, bool write, bool error)
{
Lock lock(&m_mutex);
@ -519,21 +519,28 @@ TCPSocket::serviceConnecting(ISocketMultiplexerJob* job,
catch (XArchNetwork& e) {
sendConnectionFailedEvent(e.what());
onDisconnected();
return newJob();
auto new_job = newJob();
if (new_job)
return {true, std::move(new_job)};
else
return {false, {}};
}
}
if (write) {
sendEvent(m_events->forIDataSocket().connected());
onConnected();
return newJob();
auto new_job = newJob();
if (new_job)
return {true, std::move(new_job)};
else
return {false, {}};
}
return job;
return {true, {}};
}
ISocketMultiplexerJob*
TCPSocket::serviceConnected(ISocketMultiplexerJob* job,
MultiplexerJobStatus TCPSocket::serviceConnected(ISocketMultiplexerJob* job,
bool read, bool write, bool error)
{
Lock lock(&m_mutex);
@ -541,7 +548,7 @@ TCPSocket::serviceConnected(ISocketMultiplexerJob* job,
if (error) {
sendEvent(m_events->forISocket().disconnected());
onDisconnected();
return newJob();
return {true, newJob()};
}
EJobResult writeResult = kRetry;
@ -594,10 +601,10 @@ TCPSocket::serviceConnected(ISocketMultiplexerJob* job,
}
if (writeResult == kBreak || readResult == kBreak) {
return NULL;
return {false, {}};
} else if (writeResult == kNew || readResult == kNew) {
return newJob();
return {true, newJob()};
} else {
return job;
return {true, {}};
}
}

View File

@ -19,14 +19,15 @@
#pragma once
#include "net/IDataSocket.h"
#include "net/ISocketMultiplexerJob.h"
#include "io/StreamBuffer.h"
#include "mt/CondVar.h"
#include "mt/Mutex.h"
#include "arch/IArchNetwork.h"
#include <memory>
class Mutex;
class Thread;
class ISocketMultiplexerJob;
class IEventQueue;
class SocketMultiplexer;
@ -59,8 +60,7 @@ public:
virtual void connect(const NetworkAddress&);
virtual ISocketMultiplexerJob*
newJob();
virtual std::unique_ptr<ISocketMultiplexerJob> newJob();
protected:
enum EJobResult {
@ -74,7 +74,8 @@ protected:
virtual EJobResult doRead();
virtual EJobResult doWrite();
void setJob(ISocketMultiplexerJob*);
void removeJob();
void setJob(std::unique_ptr<ISocketMultiplexerJob>&& job);
bool isReadable() { return m_readable; }
bool isWritable() { return m_writable; }
@ -93,12 +94,8 @@ private:
void onOutputShutdown();
void onDisconnected();
ISocketMultiplexerJob*
serviceConnecting(ISocketMultiplexerJob*,
bool, bool, bool);
ISocketMultiplexerJob*
serviceConnected(ISocketMultiplexerJob*,
bool, bool, bool);
MultiplexerJobStatus serviceConnecting(ISocketMultiplexerJob*, bool, bool, bool);
MultiplexerJobStatus serviceConnected(ISocketMultiplexerJob*, bool, bool, bool);
protected:
bool m_readable;

View File

@ -28,8 +28,7 @@ A socket multiplexer job class that invokes a member function.
template <class T>
class TSocketMultiplexerMethodJob : public ISocketMultiplexerJob {
public:
typedef ISocketMultiplexerJob*
(T::*Method)(ISocketMultiplexerJob*, bool, bool, bool);
using Method = MultiplexerJobStatus (T::*)(ISocketMultiplexerJob*, bool, bool, bool);
//! run() invokes \c object->method(arg)
TSocketMultiplexerMethodJob(T* object, Method method,
@ -37,11 +36,10 @@ public:
virtual ~TSocketMultiplexerMethodJob();
// IJob overrides
virtual ISocketMultiplexerJob*
run(bool readable, bool writable, bool error);
virtual ArchSocket getSocket() const;
virtual bool isReadable() const;
virtual bool isWritable() const;
virtual MultiplexerJobStatus run(bool readable, bool writable, bool error) override;
virtual ArchSocket getSocket() const override;
virtual bool isReadable() const override;
virtual bool isWritable() const override;
private:
T* m_object;
@ -74,14 +72,12 @@ TSocketMultiplexerMethodJob<T>::~TSocketMultiplexerMethodJob()
}
template <class T>
inline
ISocketMultiplexerJob*
TSocketMultiplexerMethodJob<T>::run(bool read, bool write, bool error)
inline MultiplexerJobStatus TSocketMultiplexerMethodJob<T>::run(bool read, bool write, bool error)
{
if (m_object != NULL) {
return (m_object->*m_method)(this, read, write, error);
}
return NULL;
return {false, {}};
}
template <class T>