attempting to fix deadlocks by going mutex crazy (this never ends well)

This commit is contained in:
Nick Bolton 2012-07-06 16:18:21 +00:00
parent 17a92f4f4c
commit 663cd28f2d
4 changed files with 84 additions and 72 deletions

View File

@ -29,10 +29,8 @@ CEvent::Type CIpcClientProxy::s_disconnectedEvent = CEvent::kUnknown;
CIpcClientProxy::CIpcClientProxy(synergy::IStream& stream) : CIpcClientProxy::CIpcClientProxy(synergy::IStream& stream) :
m_stream(stream), m_stream(stream),
m_enableLog(false),
m_clientType(kIpcClientUnknown), m_clientType(kIpcClientUnknown),
m_disconnecting(false), m_disconnecting(false)
m_socketBusy(false)
{ {
m_mutex = ARCH->newMutex(); m_mutex = ARCH->newMutex();
@ -74,6 +72,7 @@ CIpcClientProxy::~CIpcClientProxy()
void void
CIpcClientProxy::handleDisconnect(const CEvent&, void*) CIpcClientProxy::handleDisconnect(const CEvent&, void*)
{ {
CArchMutexLock lock(m_mutex);
disconnect(); disconnect();
LOG((CLOG_DEBUG "ipc client disconnected")); LOG((CLOG_DEBUG "ipc client disconnected"));
} }
@ -81,6 +80,7 @@ CIpcClientProxy::handleDisconnect(const CEvent&, void*)
void void
CIpcClientProxy::handleWriteError(const CEvent&, void*) CIpcClientProxy::handleWriteError(const CEvent&, void*)
{ {
CArchMutexLock lock(m_mutex);
disconnect(); disconnect();
LOG((CLOG_DEBUG "ipc client write error")); LOG((CLOG_DEBUG "ipc client write error"));
} }
@ -88,12 +88,7 @@ CIpcClientProxy::handleWriteError(const CEvent&, void*)
void void
CIpcClientProxy::handleData(const CEvent&, void*) CIpcClientProxy::handleData(const CEvent&, void*)
{ {
// ugh, not sure if this is needed here. the write function has it to protect
// m_socketBusy (for dropping log messages primarily). but i think i saw a
// deadlock even after adding this mechanism. my rationale here is that i don't
// want the read to deadlock the stream (if that's even possible).
CArchMutexLock lock(m_mutex); CArchMutexLock lock(m_mutex);
UInt8 code[1]; UInt8 code[1];
UInt32 n = m_stream.read(code, 1); UInt32 n = m_stream.read(code, 1);
while (n != 0) { while (n != 0) {
@ -103,9 +98,7 @@ CIpcClientProxy::handleData(const CEvent&, void*)
m->m_type = type; m->m_type = type;
m->m_source = this; m->m_source = this;
if (m_enableLog) { LOG((CLOG_DEBUG "ipc client proxy read: %d", code[0]));
LOG((CLOG_DEBUG "ipc client proxy read: %d", code[0]));
}
switch (type) { switch (type) {
case kIpcHello: case kIpcHello:
@ -132,52 +125,32 @@ CIpcClientProxy::handleData(const CEvent&, void*)
void void
CIpcClientProxy::send(const CIpcMessage& message) CIpcClientProxy::send(const CIpcMessage& message)
{ {
// discard message if we're busy writing already. this is to prevent
// deadlock, since this function can sometimes generate log messages
// through stream usage.
if (m_socketBusy) {
return;
}
CArchMutexLock lock(m_mutex); CArchMutexLock lock(m_mutex);
m_socketBusy = true; LOG((CLOG_DEBUG "ipc client proxy write: %d", message.m_type));
try {
if (m_enableLog) {
LOG((CLOG_DEBUG "ipc client proxy write: %d", message.m_type));
}
UInt8 code[1]; UInt8 code[1];
code[0] = message.m_type; code[0] = message.m_type;
m_stream.write(code, 1); m_stream.write(code, 1);
switch (message.m_type) { switch (message.m_type) {
case kIpcLogLine: { case kIpcLogLine: {
CString* s = (CString*)message.m_data; CString* s = (CString*)message.m_data;
const char* data = s->c_str(); const char* data = s->c_str();
int len = strlen(data); int len = strlen(data);
CProtocolUtil::writef(&m_stream, "%2i", len); CProtocolUtil::writef(&m_stream, "%2i", len);
m_stream.write(data, len); m_stream.write(data, len);
break; break;
}
case kIpcShutdown:
// no data.
break;
default:
if (m_enableLog) {
LOG((CLOG_ERR "message not supported: %d", message.m_type));
}
break;
}
m_socketBusy = false;
} }
catch (...) {
m_socketBusy = false; case kIpcShutdown:
throw; // no data.
break;
default:
LOG((CLOG_ERR "message not supported: %d", message.m_type));
break;
} }
} }

View File

@ -19,7 +19,7 @@
#include "CEvent.h" #include "CEvent.h"
#include "Ipc.h" #include "Ipc.h"
#include "CMutex.h" #include "IArchMultithread.h"
namespace synergy { class IStream; } namespace synergy { class IStream; }
class CIpcMessage; class CIpcMessage;
@ -48,10 +48,8 @@ private:
public: public:
synergy::IStream& m_stream; synergy::IStream& m_stream;
bool m_enableLog;
EIpcClientType m_clientType; EIpcClientType m_clientType;
bool m_disconnecting; bool m_disconnecting;
bool m_socketBusy;
private: private:
CArchMutex m_mutex; CArchMutex m_mutex;

View File

@ -23,10 +23,13 @@
#include "CEventQueue.h" #include "CEventQueue.h"
#include "TMethodEventJob.h" #include "TMethodEventJob.h"
#include "CIpcClientProxy.h" #include "CIpcClientProxy.h"
#include "CArch.h"
CIpcLogOutputter::CIpcLogOutputter(CIpcServer& ipcServer) : CIpcLogOutputter::CIpcLogOutputter(CIpcServer& ipcServer) :
m_ipcServer(ipcServer) m_ipcServer(ipcServer),
m_sending(false)
{ {
m_mutex = ARCH->newMutex();
} }
CIpcLogOutputter::~CIpcLogOutputter() CIpcLogOutputter::~CIpcLogOutputter()
@ -36,14 +39,32 @@ CIpcLogOutputter::~CIpcLogOutputter()
void void
CIpcLogOutputter::sendBuffer(CIpcClientProxy& proxy) CIpcLogOutputter::sendBuffer(CIpcClientProxy& proxy)
{ {
while (m_buffer.size() != 0) { // drop messages logged while sending over ipc, since ipc can cause
CString text = m_buffer.front(); // log messages (sending these could cause recursion or deadlocks).
m_buffer.pop(); // this has the side effect of dropping messages from other threads
// which weren't caused by ipc, but that is just the downside of
// logging this way.
if (m_sending) {
return;
}
CIpcMessage message; CArchMutexLock lock(m_mutex);
message.m_type = kIpcLogLine; m_sending = true;
message.m_data = new CString(text); try {
proxy.send(message); while (m_buffer.size() != 0) {
CString text = m_buffer.front();
m_buffer.pop();
CIpcMessage message;
message.m_type = kIpcLogLine;
message.m_data = new CString(text);
proxy.send(message);
}
m_sending = false;
}
catch (...) {
m_sending = false;
throw;
} }
} }
@ -65,15 +86,33 @@ CIpcLogOutputter::show(bool showIfEmpty)
bool bool
CIpcLogOutputter::write(ELevel level, const char* text) CIpcLogOutputter::write(ELevel level, const char* text)
{ {
if (m_ipcServer.hasClients(kIpcClientGui)) { // drop messages logged while sending over ipc, since ipc can cause
CIpcMessage message; // log messages (sending these could cause recursion or deadlocks).
message.m_type = kIpcLogLine; // this has the side effect of dropping messages from other threads
message.m_data = new CString(text); // which weren't caused by ipc, but that is just the downside of
m_ipcServer.send(message, kIpcClientGui); // logging this way.
} if (m_sending) {
else { return false;
m_buffer.push(text);
} }
return true; CArchMutexLock lock(m_mutex);
m_sending = true;
try {
if (m_ipcServer.hasClients(kIpcClientGui)) {
CIpcMessage message;
message.m_type = kIpcLogLine;
message.m_data = new CString(text);
m_ipcServer.send(message, kIpcClientGui);
}
else {
m_buffer.push(text);
}
m_sending = false;
return true;
}
catch (...) {
m_sending = false;
throw;
}
} }

View File

@ -47,4 +47,6 @@ private:
CIpcServer& m_ipcServer; CIpcServer& m_ipcServer;
CIpcLogQueue m_buffer; CIpcLogQueue m_buffer;
CArchMutex m_mutex;
bool m_sending;
}; };