lib/net: Use standard mutex primitives in TCP socket code

This commit is contained in:
Povilas Kanapickas 2021-11-03 02:58:30 +02:00
parent cbf22ad589
commit 836a65cda4
5 changed files with 29 additions and 41 deletions

View File

@ -21,7 +21,6 @@
#include "net/TSocketMultiplexerMethodJob.h"
#include "base/TMethodEventJob.h"
#include "net/TCPSocket.h"
#include "mt/Lock.h"
#include "arch/XArch.h"
#include "base/Log.h"
#include "base/String.h"
@ -749,7 +748,7 @@ MultiplexerJobStatus SecureSocket::serviceConnect(ISocketMultiplexerJob* job,
{
(void) read;
Lock lock(&getMutex());
std::lock_guard<std::mutex> lock(tcp_mutex_);
int status = 0;
#ifdef SYSAPI_WIN32
@ -782,7 +781,7 @@ MultiplexerJobStatus SecureSocket::serviceAccept(ISocketMultiplexerJob* job,
bool read, bool write, bool error)
{
(void) read;
Lock lock(&getMutex());
std::lock_guard<std::mutex> lock(tcp_mutex_);
int status = 0;
#ifdef SYSAPI_WIN32

View File

@ -24,8 +24,6 @@
#include "net/TSocketMultiplexerMethodJob.h"
#include "net/XSocket.h"
#include "io/XIO.h"
#include "mt/Lock.h"
#include "mt/Mutex.h"
#include "arch/Arch.h"
#include "arch/XArch.h"
#include "base/IEventQueue.h"
@ -38,7 +36,6 @@ TCPListenSocket::TCPListenSocket(IEventQueue* events, SocketMultiplexer* socketM
m_events(events),
m_socketMultiplexer(socketMultiplexer)
{
m_mutex = new Mutex;
try {
m_socket = ARCH->newSocket(family, IArchNetwork::kSTREAM);
}
@ -58,14 +55,13 @@ TCPListenSocket::~TCPListenSocket()
catch (...) {
// ignore
}
delete m_mutex;
}
void
TCPListenSocket::bind(const NetworkAddress& addr)
{
try {
Lock lock(m_mutex);
std::lock_guard<std::mutex> lock(mutex_);
ARCH->setReuseAddrOnSocket(m_socket, true);
ARCH->bindSocket(m_socket, addr.getAddress());
ARCH->listenOnSocket(m_socket);
@ -88,7 +84,7 @@ TCPListenSocket::bind(const NetworkAddress& addr)
void
TCPListenSocket::close()
{
Lock lock(m_mutex);
std::lock_guard<std::mutex> lock(mutex_);
if (m_socket == NULL) {
throw XIOClosed();
}

View File

@ -22,7 +22,8 @@
#include "net/ISocketMultiplexerJob.h"
#include "arch/IArchNetwork.h"
class Mutex;
#include <mutex>
class IEventQueue;
class SocketMultiplexer;
@ -52,7 +53,7 @@ public:
protected:
ArchSocket m_socket;
Mutex* m_mutex;
std::mutex mutex_;
IEventQueue* m_events;
SocketMultiplexer* m_socketMultiplexer;
};

View File

@ -22,7 +22,6 @@
#include "net/SocketMultiplexer.h"
#include "net/TSocketMultiplexerMethodJob.h"
#include "net/XSocket.h"
#include "mt/Lock.h"
#include "arch/Arch.h"
#include "arch/XArch.h"
#include "base/Log.h"
@ -38,8 +37,6 @@ static const std::size_t MAX_INPUT_BUFFER_SIZE = 1024 * 1024;
TCPSocket::TCPSocket(IEventQueue* events, SocketMultiplexer* socketMultiplexer, IArchNetwork::EAddressFamily family) :
IDataSocket(events),
m_events(events),
m_mutex(),
m_flushed(&m_mutex, true),
m_socketMultiplexer(socketMultiplexer)
{
try {
@ -57,9 +54,7 @@ TCPSocket::TCPSocket(IEventQueue* events, SocketMultiplexer* socketMultiplexer,
TCPSocket::TCPSocket(IEventQueue* events, SocketMultiplexer* socketMultiplexer, ArchSocket socket) :
IDataSocket(events),
m_events(events),
m_mutex(),
m_socket(socket),
m_flushed(&m_mutex, true),
m_socketMultiplexer(socketMultiplexer)
{
assert(m_socket != NULL);
@ -104,7 +99,7 @@ TCPSocket::close()
// remove ourself from the multiplexer
setJob(NULL);
Lock lock(&m_mutex);
std::lock_guard<std::mutex> lock(tcp_mutex_);
// clear buffers and enter disconnected state
if (m_connected) {
@ -136,7 +131,7 @@ UInt32
TCPSocket::read(void* buffer, UInt32 n)
{
// copy data directly from our input buffer
Lock lock(&m_mutex);
std::lock_guard<std::mutex> lock(tcp_mutex_);
UInt32 size = m_inputBuffer.getSize();
if (n > size) {
n = size;
@ -160,7 +155,7 @@ TCPSocket::write(const void* buffer, UInt32 n)
{
bool wasEmpty;
{
Lock lock(&m_mutex);
std::lock_guard<std::mutex> lock(tcp_mutex_);
// must not have shutdown output
if (!m_writable) {
@ -178,7 +173,7 @@ TCPSocket::write(const void* buffer, UInt32 n)
m_outputBuffer.write(buffer, n);
// there's data to write
m_flushed = false;
is_flushed_ = false;
}
// make sure we're waiting to write
@ -190,10 +185,8 @@ TCPSocket::write(const void* buffer, UInt32 n)
void
TCPSocket::flush()
{
Lock lock(&m_mutex);
while (m_flushed == false) {
m_flushed.wait();
}
std::unique_lock<std::mutex> lock(tcp_mutex_);
flushed_cv_.wait(lock, [this](){ return is_flushed_; });
}
void
@ -201,7 +194,7 @@ TCPSocket::shutdownInput()
{
bool useNewJob = false;
{
Lock lock(&m_mutex);
std::lock_guard<std::mutex> lock(tcp_mutex_);
// shutdown socket for reading
try {
@ -228,7 +221,7 @@ TCPSocket::shutdownOutput()
{
bool useNewJob = false;
{
Lock lock(&m_mutex);
std::lock_guard<std::mutex> lock(tcp_mutex_);
// shutdown socket for writing
try {
@ -253,7 +246,7 @@ TCPSocket::shutdownOutput()
bool
TCPSocket::isReady() const
{
Lock lock(&m_mutex);
std::lock_guard<std::mutex> lock(tcp_mutex_);
return (m_inputBuffer.getSize() > 0);
}
@ -268,7 +261,7 @@ TCPSocket::isFatal() const
UInt32
TCPSocket::getSize() const
{
Lock lock(&m_mutex);
std::lock_guard<std::mutex> lock(tcp_mutex_);
return m_inputBuffer.getSize();
}
@ -276,7 +269,7 @@ void
TCPSocket::connect(const NetworkAddress& addr)
{
{
Lock lock(&m_mutex);
std::lock_guard<std::mutex> lock(tcp_mutex_);
// fail on attempts to reconnect
if (m_socket == NULL || m_connected) {
@ -463,8 +456,8 @@ TCPSocket::discardWrittenData(int bytesWrote)
m_outputBuffer.pop(bytesWrote);
if (m_outputBuffer.getSize() == 0) {
sendEvent(m_events->forIStream().outputFlushed());
m_flushed = true;
m_flushed.broadcast();
is_flushed_ = true;
flushed_cv_.notify_all();
}
}
@ -490,8 +483,8 @@ TCPSocket::onOutputShutdown()
m_writable = false;
// we're now flushed
m_flushed = true;
m_flushed.broadcast();
is_flushed_ = true;
flushed_cv_.notify_all();
}
void
@ -505,7 +498,7 @@ TCPSocket::onDisconnected()
MultiplexerJobStatus TCPSocket::serviceConnecting(ISocketMultiplexerJob* job, bool, bool write, bool error)
{
Lock lock(&m_mutex);
std::lock_guard<std::mutex> lock(tcp_mutex_);
// should only check for errors if error is true but checking a new
// socket (and a socket that's connecting should be new) for errors
@ -548,7 +541,7 @@ MultiplexerJobStatus TCPSocket::serviceConnecting(ISocketMultiplexerJob* job, bo
MultiplexerJobStatus TCPSocket::serviceConnected(ISocketMultiplexerJob* job,
bool read, bool write, bool error)
{
Lock lock(&m_mutex);
std::lock_guard<std::mutex> lock(tcp_mutex_);
if (error) {
sendEvent(m_events->forISocket().disconnected());

View File

@ -22,11 +22,11 @@
#include "net/ISocketMultiplexerJob.h"
#include "io/StreamBuffer.h"
#include "mt/CondVar.h"
#include "mt/Mutex.h"
#include "arch/IArchNetwork.h"
#include <condition_variable>
#include <memory>
#include <mutex>
class Mutex;
class Thread;
class IEventQueue;
class SocketMultiplexer;
@ -81,8 +81,6 @@ protected:
bool isReadable() { return m_readable; }
bool isWritable() { return m_writable; }
Mutex& getMutex() { return m_mutex; }
void sendEvent(Event::Type);
void discardWrittenData(int bytesWrote);
@ -106,9 +104,10 @@ protected:
StreamBuffer m_inputBuffer;
StreamBuffer m_outputBuffer;
mutable std::mutex tcp_mutex_;
private:
Mutex m_mutex;
ArchSocket m_socket;
CondVar<bool> m_flushed;
std::condition_variable flushed_cv_;
bool is_flushed_ = true;
SocketMultiplexer* m_socketMultiplexer;
};