diff --git a/src/lib/net/SecureSocket.cpp b/src/lib/net/SecureSocket.cpp index 85e1d383..1aefbd54 100644 --- a/src/lib/net/SecureSocket.cpp +++ b/src/lib/net/SecureSocket.cpp @@ -21,7 +21,6 @@ #include "net/TSocketMultiplexerMethodJob.h" #include "base/TMethodEventJob.h" #include "net/TCPSocket.h" -#include "mt/Lock.h" #include "arch/XArch.h" #include "base/Log.h" #include "base/String.h" @@ -749,7 +748,7 @@ MultiplexerJobStatus SecureSocket::serviceConnect(ISocketMultiplexerJob* job, { (void) read; - Lock lock(&getMutex()); + std::lock_guard lock(tcp_mutex_); int status = 0; #ifdef SYSAPI_WIN32 @@ -782,7 +781,7 @@ MultiplexerJobStatus SecureSocket::serviceAccept(ISocketMultiplexerJob* job, bool read, bool write, bool error) { (void) read; - Lock lock(&getMutex()); + std::lock_guard lock(tcp_mutex_); int status = 0; #ifdef SYSAPI_WIN32 diff --git a/src/lib/net/TCPListenSocket.cpp b/src/lib/net/TCPListenSocket.cpp index 2c305fcf..cf5ef8f7 100644 --- a/src/lib/net/TCPListenSocket.cpp +++ b/src/lib/net/TCPListenSocket.cpp @@ -24,8 +24,6 @@ #include "net/TSocketMultiplexerMethodJob.h" #include "net/XSocket.h" #include "io/XIO.h" -#include "mt/Lock.h" -#include "mt/Mutex.h" #include "arch/Arch.h" #include "arch/XArch.h" #include "base/IEventQueue.h" @@ -38,7 +36,6 @@ TCPListenSocket::TCPListenSocket(IEventQueue* events, SocketMultiplexer* socketM m_events(events), m_socketMultiplexer(socketMultiplexer) { - m_mutex = new Mutex; try { m_socket = ARCH->newSocket(family, IArchNetwork::kSTREAM); } @@ -58,14 +55,13 @@ TCPListenSocket::~TCPListenSocket() catch (...) { // ignore } - delete m_mutex; } void TCPListenSocket::bind(const NetworkAddress& addr) { try { - Lock lock(m_mutex); + std::lock_guard lock(mutex_); ARCH->setReuseAddrOnSocket(m_socket, true); ARCH->bindSocket(m_socket, addr.getAddress()); ARCH->listenOnSocket(m_socket); @@ -88,7 +84,7 @@ TCPListenSocket::bind(const NetworkAddress& addr) void TCPListenSocket::close() { - Lock lock(m_mutex); + std::lock_guard lock(mutex_); if (m_socket == NULL) { throw XIOClosed(); } diff --git a/src/lib/net/TCPListenSocket.h b/src/lib/net/TCPListenSocket.h index 109c1c3f..12d0128b 100644 --- a/src/lib/net/TCPListenSocket.h +++ b/src/lib/net/TCPListenSocket.h @@ -22,7 +22,8 @@ #include "net/ISocketMultiplexerJob.h" #include "arch/IArchNetwork.h" -class Mutex; +#include + class IEventQueue; class SocketMultiplexer; @@ -52,7 +53,7 @@ public: protected: ArchSocket m_socket; - Mutex* m_mutex; + std::mutex mutex_; IEventQueue* m_events; SocketMultiplexer* m_socketMultiplexer; }; diff --git a/src/lib/net/TCPSocket.cpp b/src/lib/net/TCPSocket.cpp index fa7edcc8..ae6f5feb 100644 --- a/src/lib/net/TCPSocket.cpp +++ b/src/lib/net/TCPSocket.cpp @@ -22,7 +22,6 @@ #include "net/SocketMultiplexer.h" #include "net/TSocketMultiplexerMethodJob.h" #include "net/XSocket.h" -#include "mt/Lock.h" #include "arch/Arch.h" #include "arch/XArch.h" #include "base/Log.h" @@ -38,8 +37,6 @@ static const std::size_t MAX_INPUT_BUFFER_SIZE = 1024 * 1024; TCPSocket::TCPSocket(IEventQueue* events, SocketMultiplexer* socketMultiplexer, IArchNetwork::EAddressFamily family) : IDataSocket(events), m_events(events), - m_mutex(), - m_flushed(&m_mutex, true), m_socketMultiplexer(socketMultiplexer) { try { @@ -57,9 +54,7 @@ TCPSocket::TCPSocket(IEventQueue* events, SocketMultiplexer* socketMultiplexer, TCPSocket::TCPSocket(IEventQueue* events, SocketMultiplexer* socketMultiplexer, ArchSocket socket) : IDataSocket(events), m_events(events), - m_mutex(), m_socket(socket), - m_flushed(&m_mutex, true), m_socketMultiplexer(socketMultiplexer) { assert(m_socket != NULL); @@ -104,7 +99,7 @@ TCPSocket::close() // remove ourself from the multiplexer setJob(NULL); - Lock lock(&m_mutex); + std::lock_guard lock(tcp_mutex_); // clear buffers and enter disconnected state if (m_connected) { @@ -136,7 +131,7 @@ UInt32 TCPSocket::read(void* buffer, UInt32 n) { // copy data directly from our input buffer - Lock lock(&m_mutex); + std::lock_guard lock(tcp_mutex_); UInt32 size = m_inputBuffer.getSize(); if (n > size) { n = size; @@ -160,7 +155,7 @@ TCPSocket::write(const void* buffer, UInt32 n) { bool wasEmpty; { - Lock lock(&m_mutex); + std::lock_guard lock(tcp_mutex_); // must not have shutdown output if (!m_writable) { @@ -178,7 +173,7 @@ TCPSocket::write(const void* buffer, UInt32 n) m_outputBuffer.write(buffer, n); // there's data to write - m_flushed = false; + is_flushed_ = false; } // make sure we're waiting to write @@ -190,10 +185,8 @@ TCPSocket::write(const void* buffer, UInt32 n) void TCPSocket::flush() { - Lock lock(&m_mutex); - while (m_flushed == false) { - m_flushed.wait(); - } + std::unique_lock lock(tcp_mutex_); + flushed_cv_.wait(lock, [this](){ return is_flushed_; }); } void @@ -201,7 +194,7 @@ TCPSocket::shutdownInput() { bool useNewJob = false; { - Lock lock(&m_mutex); + std::lock_guard lock(tcp_mutex_); // shutdown socket for reading try { @@ -228,7 +221,7 @@ TCPSocket::shutdownOutput() { bool useNewJob = false; { - Lock lock(&m_mutex); + std::lock_guard lock(tcp_mutex_); // shutdown socket for writing try { @@ -253,7 +246,7 @@ TCPSocket::shutdownOutput() bool TCPSocket::isReady() const { - Lock lock(&m_mutex); + std::lock_guard lock(tcp_mutex_); return (m_inputBuffer.getSize() > 0); } @@ -268,7 +261,7 @@ TCPSocket::isFatal() const UInt32 TCPSocket::getSize() const { - Lock lock(&m_mutex); + std::lock_guard lock(tcp_mutex_); return m_inputBuffer.getSize(); } @@ -276,7 +269,7 @@ void TCPSocket::connect(const NetworkAddress& addr) { { - Lock lock(&m_mutex); + std::lock_guard lock(tcp_mutex_); // fail on attempts to reconnect if (m_socket == NULL || m_connected) { @@ -463,8 +456,8 @@ TCPSocket::discardWrittenData(int bytesWrote) m_outputBuffer.pop(bytesWrote); if (m_outputBuffer.getSize() == 0) { sendEvent(m_events->forIStream().outputFlushed()); - m_flushed = true; - m_flushed.broadcast(); + is_flushed_ = true; + flushed_cv_.notify_all(); } } @@ -490,8 +483,8 @@ TCPSocket::onOutputShutdown() m_writable = false; // we're now flushed - m_flushed = true; - m_flushed.broadcast(); + is_flushed_ = true; + flushed_cv_.notify_all(); } void @@ -505,7 +498,7 @@ TCPSocket::onDisconnected() MultiplexerJobStatus TCPSocket::serviceConnecting(ISocketMultiplexerJob* job, bool, bool write, bool error) { - Lock lock(&m_mutex); + std::lock_guard lock(tcp_mutex_); // should only check for errors if error is true but checking a new // socket (and a socket that's connecting should be new) for errors @@ -548,7 +541,7 @@ MultiplexerJobStatus TCPSocket::serviceConnecting(ISocketMultiplexerJob* job, bo MultiplexerJobStatus TCPSocket::serviceConnected(ISocketMultiplexerJob* job, bool read, bool write, bool error) { - Lock lock(&m_mutex); + std::lock_guard lock(tcp_mutex_); if (error) { sendEvent(m_events->forISocket().disconnected()); diff --git a/src/lib/net/TCPSocket.h b/src/lib/net/TCPSocket.h index 45aed07f..0076cfe8 100644 --- a/src/lib/net/TCPSocket.h +++ b/src/lib/net/TCPSocket.h @@ -22,11 +22,11 @@ #include "net/ISocketMultiplexerJob.h" #include "io/StreamBuffer.h" #include "mt/CondVar.h" -#include "mt/Mutex.h" #include "arch/IArchNetwork.h" +#include #include +#include -class Mutex; class Thread; class IEventQueue; class SocketMultiplexer; @@ -81,8 +81,6 @@ protected: bool isReadable() { return m_readable; } bool isWritable() { return m_writable; } - Mutex& getMutex() { return m_mutex; } - void sendEvent(Event::Type); void discardWrittenData(int bytesWrote); @@ -106,9 +104,10 @@ protected: StreamBuffer m_inputBuffer; StreamBuffer m_outputBuffer; + mutable std::mutex tcp_mutex_; private: - Mutex m_mutex; ArchSocket m_socket; - CondVar m_flushed; + std::condition_variable flushed_cv_; + bool is_flushed_ = true; SocketMultiplexer* m_socketMultiplexer; };