From 663cd28f2d20a72fb4d25e88069eb5ce6eaa9322 Mon Sep 17 00:00:00 2001 From: Nick Bolton Date: Fri, 6 Jul 2012 16:18:21 +0000 Subject: [PATCH] attempting to fix deadlocks by going mutex crazy (this never ends well) --- src/lib/ipc/CIpcClientProxy.cpp | 77 +++++++++++--------------------- src/lib/ipc/CIpcClientProxy.h | 4 +- src/lib/ipc/CIpcLogOutputter.cpp | 73 +++++++++++++++++++++++------- src/lib/ipc/CIpcLogOutputter.h | 2 + 4 files changed, 84 insertions(+), 72 deletions(-) diff --git a/src/lib/ipc/CIpcClientProxy.cpp b/src/lib/ipc/CIpcClientProxy.cpp index 1656d0de..a796e30c 100644 --- a/src/lib/ipc/CIpcClientProxy.cpp +++ b/src/lib/ipc/CIpcClientProxy.cpp @@ -29,13 +29,11 @@ CEvent::Type CIpcClientProxy::s_disconnectedEvent = CEvent::kUnknown; CIpcClientProxy::CIpcClientProxy(synergy::IStream& stream) : m_stream(stream), -m_enableLog(false), m_clientType(kIpcClientUnknown), -m_disconnecting(false), -m_socketBusy(false) +m_disconnecting(false) { m_mutex = ARCH->newMutex(); - + EVENTQUEUE->adoptHandler( m_stream.getInputReadyEvent(), stream.getEventTarget(), new TMethodEventJob( @@ -74,6 +72,7 @@ CIpcClientProxy::~CIpcClientProxy() void CIpcClientProxy::handleDisconnect(const CEvent&, void*) { + CArchMutexLock lock(m_mutex); disconnect(); LOG((CLOG_DEBUG "ipc client disconnected")); } @@ -81,6 +80,7 @@ CIpcClientProxy::handleDisconnect(const CEvent&, void*) void CIpcClientProxy::handleWriteError(const CEvent&, void*) { + CArchMutexLock lock(m_mutex); disconnect(); LOG((CLOG_DEBUG "ipc client write error")); } @@ -88,12 +88,7 @@ CIpcClientProxy::handleWriteError(const CEvent&, void*) void CIpcClientProxy::handleData(const CEvent&, void*) { - // ugh, not sure if this is needed here. the write function has it to protect - // m_socketBusy (for dropping log messages primarily). but i think i saw a - // deadlock even after adding this mechanism. my rationale here is that i don't - // want the read to deadlock the stream (if that's even possible). CArchMutexLock lock(m_mutex); - UInt8 code[1]; UInt32 n = m_stream.read(code, 1); while (n != 0) { @@ -103,9 +98,7 @@ CIpcClientProxy::handleData(const CEvent&, void*) m->m_type = type; m->m_source = this; - if (m_enableLog) { - LOG((CLOG_DEBUG "ipc client proxy read: %d", code[0])); - } + LOG((CLOG_DEBUG "ipc client proxy read: %d", code[0])); switch (type) { case kIpcHello: @@ -132,52 +125,32 @@ CIpcClientProxy::handleData(const CEvent&, void*) void CIpcClientProxy::send(const CIpcMessage& message) { - // discard message if we're busy writing already. this is to prevent - // deadlock, since this function can sometimes generate log messages - // through stream usage. - if (m_socketBusy) { - return; - } - CArchMutexLock lock(m_mutex); - m_socketBusy = true; - try { - if (m_enableLog) { - LOG((CLOG_DEBUG "ipc client proxy write: %d", message.m_type)); - } + LOG((CLOG_DEBUG "ipc client proxy write: %d", message.m_type)); - UInt8 code[1]; - code[0] = message.m_type; - m_stream.write(code, 1); + UInt8 code[1]; + code[0] = message.m_type; + m_stream.write(code, 1); - switch (message.m_type) { - case kIpcLogLine: { - CString* s = (CString*)message.m_data; - const char* data = s->c_str(); + switch (message.m_type) { + case kIpcLogLine: { + CString* s = (CString*)message.m_data; + const char* data = s->c_str(); - int len = strlen(data); - CProtocolUtil::writef(&m_stream, "%2i", len); + int len = strlen(data); + CProtocolUtil::writef(&m_stream, "%2i", len); - m_stream.write(data, len); - break; - } - - case kIpcShutdown: - // no data. - break; - - default: - if (m_enableLog) { - LOG((CLOG_ERR "message not supported: %d", message.m_type)); - } - break; - } - - m_socketBusy = false; + m_stream.write(data, len); + break; } - catch (...) { - m_socketBusy = false; - throw; + + case kIpcShutdown: + // no data. + break; + + default: + LOG((CLOG_ERR "message not supported: %d", message.m_type)); + break; } } diff --git a/src/lib/ipc/CIpcClientProxy.h b/src/lib/ipc/CIpcClientProxy.h index df2c0d60..3fdf8297 100644 --- a/src/lib/ipc/CIpcClientProxy.h +++ b/src/lib/ipc/CIpcClientProxy.h @@ -19,7 +19,7 @@ #include "CEvent.h" #include "Ipc.h" -#include "CMutex.h" +#include "IArchMultithread.h" namespace synergy { class IStream; } class CIpcMessage; @@ -48,10 +48,8 @@ private: public: synergy::IStream& m_stream; - bool m_enableLog; EIpcClientType m_clientType; bool m_disconnecting; - bool m_socketBusy; private: CArchMutex m_mutex; diff --git a/src/lib/ipc/CIpcLogOutputter.cpp b/src/lib/ipc/CIpcLogOutputter.cpp index 705b44d6..8025ca2d 100644 --- a/src/lib/ipc/CIpcLogOutputter.cpp +++ b/src/lib/ipc/CIpcLogOutputter.cpp @@ -23,10 +23,13 @@ #include "CEventQueue.h" #include "TMethodEventJob.h" #include "CIpcClientProxy.h" +#include "CArch.h" CIpcLogOutputter::CIpcLogOutputter(CIpcServer& ipcServer) : -m_ipcServer(ipcServer) +m_ipcServer(ipcServer), +m_sending(false) { + m_mutex = ARCH->newMutex(); } CIpcLogOutputter::~CIpcLogOutputter() @@ -36,14 +39,32 @@ CIpcLogOutputter::~CIpcLogOutputter() void CIpcLogOutputter::sendBuffer(CIpcClientProxy& proxy) { - while (m_buffer.size() != 0) { - CString text = m_buffer.front(); - m_buffer.pop(); + // drop messages logged while sending over ipc, since ipc can cause + // log messages (sending these could cause recursion or deadlocks). + // this has the side effect of dropping messages from other threads + // which weren't caused by ipc, but that is just the downside of + // logging this way. + if (m_sending) { + return; + } + + CArchMutexLock lock(m_mutex); + m_sending = true; + try { + while (m_buffer.size() != 0) { + CString text = m_buffer.front(); + m_buffer.pop(); - CIpcMessage message; - message.m_type = kIpcLogLine; - message.m_data = new CString(text); - proxy.send(message); + CIpcMessage message; + message.m_type = kIpcLogLine; + message.m_data = new CString(text); + proxy.send(message); + } + m_sending = false; + } + catch (...) { + m_sending = false; + throw; } } @@ -65,15 +86,33 @@ CIpcLogOutputter::show(bool showIfEmpty) bool CIpcLogOutputter::write(ELevel level, const char* text) { - if (m_ipcServer.hasClients(kIpcClientGui)) { - CIpcMessage message; - message.m_type = kIpcLogLine; - message.m_data = new CString(text); - m_ipcServer.send(message, kIpcClientGui); - } - else { - m_buffer.push(text); + // drop messages logged while sending over ipc, since ipc can cause + // log messages (sending these could cause recursion or deadlocks). + // this has the side effect of dropping messages from other threads + // which weren't caused by ipc, but that is just the downside of + // logging this way. + if (m_sending) { + return false; } + + CArchMutexLock lock(m_mutex); + m_sending = true; - return true; + try { + if (m_ipcServer.hasClients(kIpcClientGui)) { + CIpcMessage message; + message.m_type = kIpcLogLine; + message.m_data = new CString(text); + m_ipcServer.send(message, kIpcClientGui); + } + else { + m_buffer.push(text); + } + m_sending = false; + return true; + } + catch (...) { + m_sending = false; + throw; + } } diff --git a/src/lib/ipc/CIpcLogOutputter.h b/src/lib/ipc/CIpcLogOutputter.h index a340b66c..b08d3f85 100644 --- a/src/lib/ipc/CIpcLogOutputter.h +++ b/src/lib/ipc/CIpcLogOutputter.h @@ -47,4 +47,6 @@ private: CIpcServer& m_ipcServer; CIpcLogQueue m_buffer; + CArchMutex m_mutex; + bool m_sending; };