* made gui ipc reader more robust by getting rid of reader thread (but read wait now blocks gui thread)

* separated ipc client proxy mutex into read and write (i think read was deadlocking write)
* fixed cpu spinning on ipc log send buffer thread (now uses wait cond like before).
This commit is contained in:
Nick Bolton 2012-07-09 12:09:24 +00:00
parent e501552f24
commit 346666791e
9 changed files with 158 additions and 105 deletions

View File

@ -26,10 +26,12 @@
IpcClient::IpcClient() IpcClient::IpcClient()
{ {
m_Socket = new QTcpSocket(this); m_Socket = new QTcpSocket(this);
m_Reader = new IpcReader(m_Socket);
connect(m_Socket, SIGNAL(connected()), this, SLOT(connected())); connect(m_Socket, SIGNAL(connected()), this, SLOT(connected()));
connect(m_Socket, SIGNAL(error(QAbstractSocket::SocketError)), this, SLOT(error(QAbstractSocket::SocketError))); connect(m_Socket, SIGNAL(error(QAbstractSocket::SocketError)), this, SLOT(error(QAbstractSocket::SocketError)));
m_Reader = new IpcReader(m_Socket);
connect(m_Reader, SIGNAL(readLogLine(const QString&)), this, SLOT(handleReadLogLine(const QString&))); connect(m_Reader, SIGNAL(readLogLine(const QString&)), this, SLOT(handleReadLogLine(const QString&)));
m_Reader->start();
} }
IpcClient::~IpcClient() IpcClient::~IpcClient()
@ -43,8 +45,6 @@ void IpcClient::connected()
write(kIpcHello, 1, typeBuf); write(kIpcHello, 1, typeBuf);
infoMessage("connection established"); infoMessage("connection established");
m_Reader->start();
} }
void IpcClient::connectToHost() void IpcClient::connectToHost()

View File

@ -20,96 +20,98 @@
#include "Ipc.h" #include "Ipc.h"
#include <iostream> #include <iostream>
#include <QMutex> #include <QMutex>
#include <QByteArray>
IpcReader::IpcReader(QTcpSocket* socket) : IpcReader::IpcReader(QTcpSocket* socket) :
m_Socket(socket), m_Socket(socket)
m_ReadyRead(false)
{ {
connect(socket, SIGNAL(readyRead()), this, SLOT(readyRead()));
} }
IpcReader::~IpcReader() IpcReader::~IpcReader()
{ {
} }
void IpcReader::readyRead() void IpcReader::start()
{ {
connect(m_Socket, SIGNAL(readyRead()), this, SLOT(read()));
}
void IpcReader::read()
{
QMutexLocker locker(&m_Mutex);
std::cout << "ready read" << std::endl; std::cout << "ready read" << std::endl;
m_ReadyRead = true;
}
void IpcReader::run() char codeBuf[1];
{ readStream(codeBuf, 1);
m_Socket->waitForConnected(-1); int code = bytesToInt(codeBuf, 1);
while (true) {
char codeBuf[1]; switch (code) {
readStream(codeBuf, 1); case kIpcLogLine: {
std::cout << "reading log line" << std::endl;
switch (codeBuf[0]) { char lenBuf[4];
case kIpcLogLine: { readStream(lenBuf, 4);
char lenBuf[4]; int len = bytesToInt(lenBuf, 4);
readStream(lenBuf, 4);
int len = bytesToInt(lenBuf, 4);
char* data = new char[len]; char* data = new char[len];
readStream(data, len); readStream(data, len);
QString line = QString::fromUtf8(data, len);
delete data;
QString line = QString::fromUtf8(data, len); readLogLine(line);
readLogLine(line); break;
break;
}
default:
std::cerr << "aborting, message invalid: " << (unsigned int)codeBuf[0] << std::endl;
return;
} }
default:
std::cerr << "aborting, message invalid: " << code << std::endl;
return;
} }
std::cout << "read done" << std::endl;
} }
void IpcReader::readStream(char* buffer, int length) bool IpcReader::readStream(char* buffer, int length)
{ {
QDataStream stream(m_Socket);
std::cout << "reading stream" << std::endl; std::cout << "reading stream" << std::endl;
int read = 0; int read = 0;
while (read < length) { while (read < length) {
int ask = length - read; int ask = length - read;
int got = stream.readRawData(buffer, ask); if (m_Socket->bytesAvailable() < ask) {
std::cout << "buffer too short, waiting" << std::endl;
if (got == 0) { m_Socket->waitForReadyRead(-1);
std::cout << "end of buffer, waiting" << std::endl;
// i'd love nothing more than to use a wait condition here, but
// qt is such a fucker with mutexes (can't lock/unlock between
// threads?! wtf?!). i'd just rather not go there (patches welcome).
while (!m_ReadyRead) {
QThread::usleep(50);
}
m_ReadyRead = false;
} }
else if (got == -1) {
// i really don't trust qt not to copy beyond the array length.
// seems like a convoluted an expensive way to copy from the stream :/
char* tempBuffer = new char[ask];
int got = m_Socket->read(tempBuffer, ask);
memcpy(buffer, tempBuffer, got);
delete tempBuffer;
read += got;
std::cout << "> ask=" << ask << " got=" << got
<< " read=" << read << std::endl;
if (got == -1) {
std::cout << "socket ended, aborting" << std::endl; std::cout << "socket ended, aborting" << std::endl;
return; return false;
} }
else { else if (length - read > 0) {
read += got; std::cout << "more remains, seek to " << got << std::endl;
buffer += got; buffer += got;
std::cout << "> ask=" << ask << " got=" << got
<< " read=" << read << std::endl;
if (length - read > 0) {
std::cout << "more remains" << std::endl;
}
} }
} }
return true;
} }
// TODO: qt must have a built in way of converting bytes to int.
int IpcReader::bytesToInt(const char *buffer, int size) int IpcReader::bytesToInt(const char *buffer, int size)
{ {
if (size == 2) { if (size == 1) {
return (unsigned char)buffer[0];
}
else if (size == 2) {
return return
(((unsigned char)buffer[0]) << 8) + (((unsigned char)buffer[0]) << 8) +
(unsigned char)buffer[1]; (unsigned char)buffer[1];
@ -122,7 +124,6 @@ int IpcReader::bytesToInt(const char *buffer, int size)
(unsigned char)buffer[3]; (unsigned char)buffer[3];
} }
else { else {
// TODO: other sizes, if needed.
return 0; return 0;
} }
} }

View File

@ -17,31 +17,31 @@
#pragma once #pragma once
#include <QThread> #include <QObject>
#include <QMutex>
class QTcpSocket; class QTcpSocket;
class IpcReader : public QThread class IpcReader : public QObject
{ {
Q_OBJECT; Q_OBJECT;
public: public:
IpcReader(QTcpSocket* socket); IpcReader(QTcpSocket* socket);
virtual ~IpcReader(); virtual ~IpcReader();
void run(); void start();
void stop();
signals: signals:
void readLogLine(const QString& text); void readLogLine(const QString& text);
private: private:
void readStream(char* buffer, int length); bool readStream(char* buffer, int length);
int bytesToInt(const char* buffer, int size); int bytesToInt(const char* buffer, int size);
private slots: private slots:
void readyRead(); void read();
private: private:
QTcpSocket* m_Socket; QTcpSocket* m_Socket;
bool m_ReadyRead; QMutex m_Mutex;
}; };

View File

@ -54,15 +54,6 @@ static const char* synergyIconFiles[] =
":/res/icons/16x16/synergy-connected.png" ":/res/icons/16x16/synergy-connected.png"
}; };
class QThreadImpl : public QThread
{
public:
static void msleep(unsigned long msecs)
{
QThread::msleep(msecs);
}
};
MainWindow::MainWindow(QSettings& settings, AppConfig& appConfig) : MainWindow::MainWindow(QSettings& settings, AppConfig& appConfig) :
m_Settings(settings), m_Settings(settings),
m_AppConfig(appConfig), m_AppConfig(appConfig),

View File

@ -30,10 +30,10 @@ CEvent::Type CIpcClientProxy::s_disconnectedEvent = CEvent::kUnknown;
CIpcClientProxy::CIpcClientProxy(synergy::IStream& stream) : CIpcClientProxy::CIpcClientProxy(synergy::IStream& stream) :
m_stream(stream), m_stream(stream),
m_clientType(kIpcClientUnknown), m_clientType(kIpcClientUnknown),
m_disconnecting(false) m_disconnecting(false),
m_readMutex(ARCH->newMutex()),
m_writeMutex(ARCH->newMutex())
{ {
m_mutex = ARCH->newMutex();
EVENTQUEUE->adoptHandler( EVENTQUEUE->adoptHandler(
m_stream.getInputReadyEvent(), stream.getEventTarget(), m_stream.getInputReadyEvent(), stream.getEventTarget(),
new TMethodEventJob<CIpcClientProxy>( new TMethodEventJob<CIpcClientProxy>(
@ -67,10 +67,14 @@ CIpcClientProxy::~CIpcClientProxy()
m_stream.getOutputShutdownEvent(), m_stream.getEventTarget()); m_stream.getOutputShutdownEvent(), m_stream.getEventTarget());
// don't delete the stream while it's being used. // don't delete the stream while it's being used.
ARCH->lockMutex(m_mutex); ARCH->lockMutex(m_readMutex);
ARCH->lockMutex(m_writeMutex);
delete &m_stream; delete &m_stream;
ARCH->unlockMutex(m_mutex); ARCH->unlockMutex(m_readMutex);
ARCH->closeMutex(m_mutex); ARCH->unlockMutex(m_writeMutex);
ARCH->closeMutex(m_readMutex);
ARCH->closeMutex(m_writeMutex);
} }
void void
@ -90,8 +94,10 @@ CIpcClientProxy::handleWriteError(const CEvent&, void*)
void void
CIpcClientProxy::handleData(const CEvent&, void*) CIpcClientProxy::handleData(const CEvent&, void*)
{ {
LOG((CLOG_DEBUG "start ipc client proxy handle data"));
// don't allow the dtor to destroy the stream while we're using it. // don't allow the dtor to destroy the stream while we're using it.
CArchMutexLock lock(m_mutex); CArchMutexLock lock(m_readMutex);
UInt8 code[1]; UInt8 code[1];
UInt32 n = m_stream.read(code, 1); UInt32 n = m_stream.read(code, 1);
@ -124,6 +130,8 @@ CIpcClientProxy::handleData(const CEvent&, void*)
n = m_stream.read(code, 1); n = m_stream.read(code, 1);
} }
LOG((CLOG_DEBUG "finished ipc client proxy handle data"));
} }
void void
@ -132,7 +140,7 @@ CIpcClientProxy::send(const CIpcMessage& message)
// don't allow other threads to write until we've finished the entire // don't allow other threads to write until we've finished the entire
// message. stream write is locked, but only for that single write. // message. stream write is locked, but only for that single write.
// also, don't allow the dtor to destroy the stream while we're using it. // also, don't allow the dtor to destroy the stream while we're using it.
CArchMutexLock lock(m_mutex); CArchMutexLock lock(m_writeMutex);
LOG((CLOG_DEBUG "ipc client proxy write: %d", message.m_type)); LOG((CLOG_DEBUG "ipc client proxy write: %d", message.m_type));
@ -186,6 +194,7 @@ CIpcClientProxy::parseCommand()
void void
CIpcClientProxy::disconnect() CIpcClientProxy::disconnect()
{ {
LOG((CLOG_DEBUG "ipc client proxy disconnect"));
m_disconnecting = true; m_disconnecting = true;
EVENTQUEUE->addEvent(CEvent(getDisconnectedEvent(), this)); EVENTQUEUE->addEvent(CEvent(getDisconnectedEvent(), this));
} }

View File

@ -52,7 +52,8 @@ public:
bool m_disconnecting; bool m_disconnecting;
private: private:
CArchMutex m_mutex; CArchMutex m_readMutex;
CArchMutex m_writeMutex;
static CEvent::Type s_messageReceivedEvent; static CEvent::Type s_messageReceivedEvent;
static CEvent::Type s_disconnectedEvent; static CEvent::Type s_disconnectedEvent;

View File

@ -28,11 +28,17 @@
#include "TMethodJob.h" #include "TMethodJob.h"
#include "XArch.h" #include "XArch.h"
// limit number of log lines sent in one message.
#define MAX_SEND 100
CIpcLogOutputter::CIpcLogOutputter(CIpcServer& ipcServer) : CIpcLogOutputter::CIpcLogOutputter(CIpcServer& ipcServer) :
m_ipcServer(ipcServer), m_ipcServer(ipcServer),
m_bufferMutex(ARCH->newMutex()), m_bufferMutex(ARCH->newMutex()),
m_sending(false), m_sending(false),
m_running(true) m_running(true),
m_notifyCond(ARCH->newCondVar()),
m_notifyMutex(ARCH->newMutex()),
m_bufferWaiting(false)
{ {
m_bufferThread = new CThread(new TMethodJob<CIpcLogOutputter>( m_bufferThread = new CThread(new TMethodJob<CIpcLogOutputter>(
this, &CIpcLogOutputter::bufferThread)); this, &CIpcLogOutputter::bufferThread));
@ -41,10 +47,14 @@ m_running(true)
CIpcLogOutputter::~CIpcLogOutputter() CIpcLogOutputter::~CIpcLogOutputter()
{ {
m_running = false; m_running = false;
notifyBuffer();
m_bufferThread->wait(5); m_bufferThread->wait(5);
ARCH->closeMutex(m_bufferMutex); ARCH->closeMutex(m_bufferMutex);
delete m_bufferThread; delete m_bufferThread;
ARCH->closeCondVar(m_notifyCond);
ARCH->closeMutex(m_notifyMutex);
} }
void void
@ -71,6 +81,7 @@ CIpcLogOutputter::write(ELevel level, const char* text)
bool bool
CIpcLogOutputter::write(ELevel, const char* text, bool force) CIpcLogOutputter::write(ELevel, const char* text, bool force)
{ {
// TODO: discard based on thread id? hmm...
// sending the buffer generates log messages, which we must throw // sending the buffer generates log messages, which we must throw
// away (otherwise this would cause recursion). this is just a drawback // away (otherwise this would cause recursion). this is just a drawback
// of logging this way. there is also the risk that this could throw // of logging this way. there is also the risk that this could throw
@ -81,28 +92,32 @@ CIpcLogOutputter::write(ELevel, const char* text, bool force)
return true; return true;
} }
CArchMutexLock lock(m_bufferMutex); appendBuffer(text);
m_buffer.append(text); notifyBuffer();
m_buffer.append("\n");
return true; return true;
} }
void
CIpcLogOutputter::appendBuffer(const CString& text)
{
CArchMutexLock lock(m_bufferMutex);
m_buffer.push(text);
}
void void
CIpcLogOutputter::bufferThread(void*) CIpcLogOutputter::bufferThread(void*)
{ {
try { try {
while (m_running) { while (m_running) {
while (m_running && m_buffer.size() == 0) {
ARCH->sleep(.1);
}
if (!m_running) {
break;
}
if (m_ipcServer.hasClients(kIpcClientGui)) { if (m_ipcServer.hasClients(kIpcClientGui)) {
sendBuffer(); while (!m_buffer.empty()) {
sendBuffer();
}
} }
m_bufferWaiting = true;
ARCH->waitCondVar(m_notifyCond, m_notifyMutex, -1);
m_bufferWaiting = false;
} }
} }
catch (XArch& e) { catch (XArch& e) {
@ -112,21 +127,41 @@ CIpcLogOutputter::bufferThread(void*)
LOG((CLOG_DEBUG "ipc log buffer thread finished")); LOG((CLOG_DEBUG "ipc log buffer thread finished"));
} }
CString* void
CIpcLogOutputter::emptyBuffer() CIpcLogOutputter::notifyBuffer()
{
if (!m_bufferWaiting) {
return;
}
CArchMutexLock lock(m_notifyMutex);
ARCH->broadcastCondVar(m_notifyCond);
}
CString
CIpcLogOutputter::emptyBuffer(int count)
{ {
CArchMutexLock lock(m_bufferMutex); CArchMutexLock lock(m_bufferMutex);
CString* copy = new CString(m_buffer);
m_buffer.clear(); if (m_buffer.size() < count) {
return copy; count = m_buffer.size();
}
CString chunk;
for (int i = 0; i < count; i++) {
chunk.append(m_buffer.front());
chunk.append("\n");
m_buffer.pop();
}
return chunk;
} }
void void
CIpcLogOutputter::sendBuffer() CIpcLogOutputter::sendBuffer()
{ {
CIpcMessage message; CIpcMessage message;
message.m_type = kIpcLogLine; message.m_type = kIpcLogLine;
message.m_data = emptyBuffer(); message.m_data = new CString(emptyBuffer(MAX_SEND));
m_sending = true; m_sending = true;
m_ipcServer.send(message, kIpcClientGui); m_ipcServer.send(message, kIpcClientGui);

View File

@ -19,6 +19,7 @@
#include "ILogOutputter.h" #include "ILogOutputter.h"
#include "CArch.h" #include "CArch.h"
#include <queue>
class CIpcServer; class CIpcServer;
class CEvent; class CEvent;
@ -38,18 +39,29 @@ public:
virtual void close(); virtual void close();
virtual void show(bool showIfEmpty); virtual void show(bool showIfEmpty);
virtual bool write(ELevel level, const char* message); virtual bool write(ELevel level, const char* message);
virtual bool write(ELevel level, const char* text, bool force);
//! Same as write, but allows message to sidestep anti-recursion mechanism.
bool write(ELevel level, const char* text, bool force);
//! Notify that the buffer should be sent.
void notifyBuffer();
private: private:
void bufferThread(void*); void bufferThread(void*);
CString* emptyBuffer(); CString emptyBuffer(int count);
void sendBuffer(); void sendBuffer();
void appendBuffer(const CString& text);
private: private:
typedef std::queue<CString> CBuffer;
CIpcServer& m_ipcServer; CIpcServer& m_ipcServer;
CString m_buffer; CBuffer m_buffer;
CArchMutex m_bufferMutex; CArchMutex m_bufferMutex;
bool m_sending; bool m_sending;
CThread* m_bufferThread; CThread* m_bufferThread;
bool m_running; bool m_running;
CArchCond m_notifyCond;
CArchMutex m_notifyMutex;
bool m_bufferWaiting;
}; };

View File

@ -330,5 +330,9 @@ CDaemonApp::handleIpcMessage(const CEvent& e, void*)
m_relauncher->command(command); m_relauncher->command(command);
break; break;
} }
case kIpcHello:
m_ipcLogOutputter->notifyBuffer();
break;
} }
} }