attempted to fix deadlock caused by ipc logger causing recursion.

This commit is contained in:
Nick Bolton 2012-07-06 14:46:46 +00:00
parent 12eb8efb61
commit 17a92f4f4c
4 changed files with 87 additions and 29 deletions

View File

@ -62,12 +62,14 @@ void IpcClient::read()
char lenBuf[2]; char lenBuf[2];
stream.readRawData(lenBuf, 2); stream.readRawData(lenBuf, 2);
int len = bytesToInt(lenBuf, 2); int len = bytesToInt(lenBuf, 2);
std::cout << "len: " << len << std::endl; std::cout << "told len: " << len << std::endl;
char* data = new char[len]; char* data = new char[len];
stream.readRawData(data, len); stream.readRawData(data, len);
readLogLine(QString::fromUtf8(data, len)); QString line = QString::fromUtf8(data, len);
std::cout << "actual len: " << line.size() << std::endl;
readLogLine(line);
break; break;
} }

View File

@ -22,6 +22,7 @@
#include "CLog.h" #include "CLog.h"
#include "CIpcMessage.h" #include "CIpcMessage.h"
#include "CProtocolUtil.h" #include "CProtocolUtil.h"
#include "CArch.h"
CEvent::Type CIpcClientProxy::s_messageReceivedEvent = CEvent::kUnknown; CEvent::Type CIpcClientProxy::s_messageReceivedEvent = CEvent::kUnknown;
CEvent::Type CIpcClientProxy::s_disconnectedEvent = CEvent::kUnknown; CEvent::Type CIpcClientProxy::s_disconnectedEvent = CEvent::kUnknown;
@ -30,13 +31,21 @@ CIpcClientProxy::CIpcClientProxy(synergy::IStream& stream) :
m_stream(stream), m_stream(stream),
m_enableLog(false), m_enableLog(false),
m_clientType(kIpcClientUnknown), m_clientType(kIpcClientUnknown),
m_disconnecting(false) m_disconnecting(false),
m_socketBusy(false)
{ {
m_mutex = ARCH->newMutex();
EVENTQUEUE->adoptHandler( EVENTQUEUE->adoptHandler(
m_stream.getInputReadyEvent(), stream.getEventTarget(), m_stream.getInputReadyEvent(), stream.getEventTarget(),
new TMethodEventJob<CIpcClientProxy>( new TMethodEventJob<CIpcClientProxy>(
this, &CIpcClientProxy::handleData)); this, &CIpcClientProxy::handleData));
EVENTQUEUE->adoptHandler(
m_stream.getOutputErrorEvent(), stream.getEventTarget(),
new TMethodEventJob<CIpcClientProxy>(
this, &CIpcClientProxy::handleWriteError));
EVENTQUEUE->adoptHandler( EVENTQUEUE->adoptHandler(
m_stream.getInputShutdownEvent(), stream.getEventTarget(), m_stream.getInputShutdownEvent(), stream.getEventTarget(),
new TMethodEventJob<CIpcClientProxy>( new TMethodEventJob<CIpcClientProxy>(
@ -52,6 +61,8 @@ CIpcClientProxy::~CIpcClientProxy()
{ {
EVENTQUEUE->removeHandler( EVENTQUEUE->removeHandler(
m_stream.getInputReadyEvent(), m_stream.getEventTarget()); m_stream.getInputReadyEvent(), m_stream.getEventTarget());
EVENTQUEUE->removeHandler(
m_stream.getOutputErrorEvent(), m_stream.getEventTarget());
EVENTQUEUE->removeHandler( EVENTQUEUE->removeHandler(
m_stream.getInputShutdownEvent(), m_stream.getEventTarget()); m_stream.getInputShutdownEvent(), m_stream.getEventTarget());
EVENTQUEUE->removeHandler( EVENTQUEUE->removeHandler(
@ -77,6 +88,12 @@ CIpcClientProxy::handleWriteError(const CEvent&, void*)
void void
CIpcClientProxy::handleData(const CEvent&, void*) CIpcClientProxy::handleData(const CEvent&, void*)
{ {
// ugh, not sure if this is needed here. the write function has it to protect
// m_socketBusy (for dropping log messages primarily). but i think i saw a
// deadlock even after adding this mechanism. my rationale here is that i don't
// want the read to deadlock the stream (if that's even possible).
CArchMutexLock lock(m_mutex);
UInt8 code[1]; UInt8 code[1];
UInt32 n = m_stream.read(code, 1); UInt32 n = m_stream.read(code, 1);
while (n != 0) { while (n != 0) {
@ -115,35 +132,52 @@ CIpcClientProxy::handleData(const CEvent&, void*)
void void
CIpcClientProxy::send(const CIpcMessage& message) CIpcClientProxy::send(const CIpcMessage& message)
{ {
if (m_enableLog) { // discard message if we're busy writing already. this is to prevent
LOG((CLOG_DEBUG "ipc client proxy write: %d", message.m_type)); // deadlock, since this function can sometimes generate log messages
// through stream usage.
if (m_socketBusy) {
return;
} }
UInt8 code[1]; CArchMutexLock lock(m_mutex);
code[0] = message.m_type; m_socketBusy = true;
m_stream.write(code, 1); try {
switch (message.m_type) {
case kIpcLogLine: {
CString* s = (CString*)message.m_data;
const char* data = s->c_str();
int len = strlen(data);
CProtocolUtil::writef(&m_stream, "%2i", len);
m_stream.write(data, len);
break;
}
case kIpcShutdown:
// no data.
break;
default:
if (m_enableLog) { if (m_enableLog) {
LOG((CLOG_ERR "message not supported: %d", message.m_type)); LOG((CLOG_DEBUG "ipc client proxy write: %d", message.m_type));
} }
break;
UInt8 code[1];
code[0] = message.m_type;
m_stream.write(code, 1);
switch (message.m_type) {
case kIpcLogLine: {
CString* s = (CString*)message.m_data;
const char* data = s->c_str();
int len = strlen(data);
CProtocolUtil::writef(&m_stream, "%2i", len);
m_stream.write(data, len);
break;
}
case kIpcShutdown:
// no data.
break;
default:
if (m_enableLog) {
LOG((CLOG_ERR "message not supported: %d", message.m_type));
}
break;
}
m_socketBusy = false;
}
catch (...) {
m_socketBusy = false;
throw;
} }
} }

View File

@ -19,6 +19,7 @@
#include "CEvent.h" #include "CEvent.h"
#include "Ipc.h" #include "Ipc.h"
#include "CMutex.h"
namespace synergy { class IStream; } namespace synergy { class IStream; }
class CIpcMessage; class CIpcMessage;
@ -50,8 +51,11 @@ public:
bool m_enableLog; bool m_enableLog;
EIpcClientType m_clientType; EIpcClientType m_clientType;
bool m_disconnecting; bool m_disconnecting;
bool m_socketBusy;
private: private:
CArchMutex m_mutex;
static CEvent::Type s_messageReceivedEvent; static CEvent::Type s_messageReceivedEvent;
static CEvent::Type s_disconnectedEvent; static CEvent::Type s_disconnectedEvent;
}; };

View File

@ -297,13 +297,31 @@ CDaemonApp::handleIpcMessage(const CEvent& e, void*)
CString& command = *static_cast<CString*>(m.m_data); CString& command = *static_cast<CString*>(m.m_data);
LOG((CLOG_DEBUG "got new command: %s", command.c_str())); LOG((CLOG_DEBUG "got new command: %s", command.c_str()));
CString debugArg("--debug");
int debugArgPos = command.find(debugArg);
if (debugArgPos != CString::npos) {
int from = debugArgPos + debugArg.size() + 1;
int nextSpace = command.find(" ", from);
CString logLevel(command.substr(from, nextSpace - from));
try {
// change log level based on that in the command string
// and change to that log level now.
ARCH->setting("LogLevel", logLevel);
CLOG->setFilter(logLevel.c_str());
}
catch (XArch& e) {
LOG((CLOG_ERR "failed to save LogLevel setting, %s", e.what().c_str()));
}
}
try { try {
// store command in system settings. this is used when the daemon // store command in system settings. this is used when the daemon
// next starts. // next starts.
ARCH->setting("Command", command); ARCH->setting("Command", command);
} }
catch (XArch& e) { catch (XArch& e) {
LOG((CLOG_ERR "failed to save setting, %s", e.what().c_str())); LOG((CLOG_ERR "failed to save Command setting, %s", e.what().c_str()));
} }
// tell the relauncher about the new command. this causes the // tell the relauncher about the new command. this causes the