diff --git a/src/gui/src/IpcClient.cpp b/src/gui/src/IpcClient.cpp index f23ff9f5..3b301da3 100644 --- a/src/gui/src/IpcClient.cpp +++ b/src/gui/src/IpcClient.cpp @@ -26,10 +26,12 @@ IpcClient::IpcClient() { m_Socket = new QTcpSocket(this); - m_Reader = new IpcReader(m_Socket); connect(m_Socket, SIGNAL(connected()), this, SLOT(connected())); connect(m_Socket, SIGNAL(error(QAbstractSocket::SocketError)), this, SLOT(error(QAbstractSocket::SocketError))); + + m_Reader = new IpcReader(m_Socket); connect(m_Reader, SIGNAL(readLogLine(const QString&)), this, SLOT(handleReadLogLine(const QString&))); + m_Reader->start(); } IpcClient::~IpcClient() @@ -43,8 +45,6 @@ void IpcClient::connected() write(kIpcHello, 1, typeBuf); infoMessage("connection established"); - - m_Reader->start(); } void IpcClient::connectToHost() diff --git a/src/gui/src/IpcReader.cpp b/src/gui/src/IpcReader.cpp index 9754af11..a741874a 100644 --- a/src/gui/src/IpcReader.cpp +++ b/src/gui/src/IpcReader.cpp @@ -20,96 +20,98 @@ #include "Ipc.h" #include #include +#include IpcReader::IpcReader(QTcpSocket* socket) : -m_Socket(socket), -m_ReadyRead(false) +m_Socket(socket) { - connect(socket, SIGNAL(readyRead()), this, SLOT(readyRead())); } IpcReader::~IpcReader() { } -void IpcReader::readyRead() +void IpcReader::start() { + connect(m_Socket, SIGNAL(readyRead()), this, SLOT(read())); +} + +void IpcReader::read() +{ + QMutexLocker locker(&m_Mutex); std::cout << "ready read" << std::endl; - m_ReadyRead = true; -} -void IpcReader::run() -{ - m_Socket->waitForConnected(-1); - while (true) { + char codeBuf[1]; + readStream(codeBuf, 1); + int code = bytesToInt(codeBuf, 1); - char codeBuf[1]; - readStream(codeBuf, 1); + switch (code) { + case kIpcLogLine: { + std::cout << "reading log line" << std::endl; - switch (codeBuf[0]) { - case kIpcLogLine: { - char lenBuf[4]; - readStream(lenBuf, 4); - int len = bytesToInt(lenBuf, 4); + char lenBuf[4]; + readStream(lenBuf, 4); + int len = bytesToInt(lenBuf, 4); - char* data = new char[len]; - readStream(data, len); + char* data = new char[len]; + readStream(data, len); + QString line = QString::fromUtf8(data, len); + delete data; - QString line = QString::fromUtf8(data, len); - readLogLine(line); - break; - } - - default: - std::cerr << "aborting, message invalid: " << (unsigned int)codeBuf[0] << std::endl; - return; + readLogLine(line); + break; } + + default: + std::cerr << "aborting, message invalid: " << code << std::endl; + return; } + + std::cout << "read done" << std::endl; } -void IpcReader::readStream(char* buffer, int length) +bool IpcReader::readStream(char* buffer, int length) { - QDataStream stream(m_Socket); std::cout << "reading stream" << std::endl; int read = 0; while (read < length) { int ask = length - read; - int got = stream.readRawData(buffer, ask); - - if (got == 0) { - std::cout << "end of buffer, waiting" << std::endl; - - // i'd love nothing more than to use a wait condition here, but - // qt is such a fucker with mutexes (can't lock/unlock between - // threads?! wtf?!). i'd just rather not go there (patches welcome). - while (!m_ReadyRead) { - QThread::usleep(50); - } - m_ReadyRead = false; + if (m_Socket->bytesAvailable() < ask) { + std::cout << "buffer too short, waiting" << std::endl; + m_Socket->waitForReadyRead(-1); } - else if (got == -1) { + + // i really don't trust qt not to copy beyond the array length. + // seems like a convoluted an expensive way to copy from the stream :/ + char* tempBuffer = new char[ask]; + int got = m_Socket->read(tempBuffer, ask); + memcpy(buffer, tempBuffer, got); + delete tempBuffer; + + read += got; + + std::cout << "> ask=" << ask << " got=" << got + << " read=" << read << std::endl; + + if (got == -1) { std::cout << "socket ended, aborting" << std::endl; - return; + return false; } - else { - read += got; + else if (length - read > 0) { + std::cout << "more remains, seek to " << got << std::endl; buffer += got; - - std::cout << "> ask=" << ask << " got=" << got - << " read=" << read << std::endl; - - if (length - read > 0) { - std::cout << "more remains" << std::endl; - } } } + return true; } -// TODO: qt must have a built in way of converting bytes to int. int IpcReader::bytesToInt(const char *buffer, int size) { - if (size == 2) { + if (size == 1) { + return (unsigned char)buffer[0]; + } + else if (size == 2) { return (((unsigned char)buffer[0]) << 8) + (unsigned char)buffer[1]; @@ -122,7 +124,6 @@ int IpcReader::bytesToInt(const char *buffer, int size) (unsigned char)buffer[3]; } else { - // TODO: other sizes, if needed. return 0; } } diff --git a/src/gui/src/IpcReader.h b/src/gui/src/IpcReader.h index 13684cba..5ca49127 100644 --- a/src/gui/src/IpcReader.h +++ b/src/gui/src/IpcReader.h @@ -17,31 +17,31 @@ #pragma once -#include +#include +#include class QTcpSocket; -class IpcReader : public QThread +class IpcReader : public QObject { Q_OBJECT; public: IpcReader(QTcpSocket* socket); virtual ~IpcReader(); - void run(); - void stop(); + void start(); signals: void readLogLine(const QString& text); private: - void readStream(char* buffer, int length); + bool readStream(char* buffer, int length); int bytesToInt(const char* buffer, int size); private slots: - void readyRead(); + void read(); private: QTcpSocket* m_Socket; - bool m_ReadyRead; + QMutex m_Mutex; }; diff --git a/src/gui/src/MainWindow.cpp b/src/gui/src/MainWindow.cpp index b0d8d554..f1ddba9a 100644 --- a/src/gui/src/MainWindow.cpp +++ b/src/gui/src/MainWindow.cpp @@ -54,15 +54,6 @@ static const char* synergyIconFiles[] = ":/res/icons/16x16/synergy-connected.png" }; -class QThreadImpl : public QThread -{ -public: - static void msleep(unsigned long msecs) - { - QThread::msleep(msecs); - } -}; - MainWindow::MainWindow(QSettings& settings, AppConfig& appConfig) : m_Settings(settings), m_AppConfig(appConfig), diff --git a/src/lib/ipc/CIpcClientProxy.cpp b/src/lib/ipc/CIpcClientProxy.cpp index 8c0fec73..d958144b 100644 --- a/src/lib/ipc/CIpcClientProxy.cpp +++ b/src/lib/ipc/CIpcClientProxy.cpp @@ -30,10 +30,10 @@ CEvent::Type CIpcClientProxy::s_disconnectedEvent = CEvent::kUnknown; CIpcClientProxy::CIpcClientProxy(synergy::IStream& stream) : m_stream(stream), m_clientType(kIpcClientUnknown), -m_disconnecting(false) +m_disconnecting(false), +m_readMutex(ARCH->newMutex()), +m_writeMutex(ARCH->newMutex()) { - m_mutex = ARCH->newMutex(); - EVENTQUEUE->adoptHandler( m_stream.getInputReadyEvent(), stream.getEventTarget(), new TMethodEventJob( @@ -67,10 +67,14 @@ CIpcClientProxy::~CIpcClientProxy() m_stream.getOutputShutdownEvent(), m_stream.getEventTarget()); // don't delete the stream while it's being used. - ARCH->lockMutex(m_mutex); + ARCH->lockMutex(m_readMutex); + ARCH->lockMutex(m_writeMutex); delete &m_stream; - ARCH->unlockMutex(m_mutex); - ARCH->closeMutex(m_mutex); + ARCH->unlockMutex(m_readMutex); + ARCH->unlockMutex(m_writeMutex); + + ARCH->closeMutex(m_readMutex); + ARCH->closeMutex(m_writeMutex); } void @@ -90,8 +94,10 @@ CIpcClientProxy::handleWriteError(const CEvent&, void*) void CIpcClientProxy::handleData(const CEvent&, void*) { + LOG((CLOG_DEBUG "start ipc client proxy handle data")); + // don't allow the dtor to destroy the stream while we're using it. - CArchMutexLock lock(m_mutex); + CArchMutexLock lock(m_readMutex); UInt8 code[1]; UInt32 n = m_stream.read(code, 1); @@ -124,6 +130,8 @@ CIpcClientProxy::handleData(const CEvent&, void*) n = m_stream.read(code, 1); } + + LOG((CLOG_DEBUG "finished ipc client proxy handle data")); } void @@ -132,7 +140,7 @@ CIpcClientProxy::send(const CIpcMessage& message) // don't allow other threads to write until we've finished the entire // message. stream write is locked, but only for that single write. // also, don't allow the dtor to destroy the stream while we're using it. - CArchMutexLock lock(m_mutex); + CArchMutexLock lock(m_writeMutex); LOG((CLOG_DEBUG "ipc client proxy write: %d", message.m_type)); @@ -186,6 +194,7 @@ CIpcClientProxy::parseCommand() void CIpcClientProxy::disconnect() { + LOG((CLOG_DEBUG "ipc client proxy disconnect")); m_disconnecting = true; EVENTQUEUE->addEvent(CEvent(getDisconnectedEvent(), this)); } diff --git a/src/lib/ipc/CIpcClientProxy.h b/src/lib/ipc/CIpcClientProxy.h index 3fdf8297..1f67baa0 100644 --- a/src/lib/ipc/CIpcClientProxy.h +++ b/src/lib/ipc/CIpcClientProxy.h @@ -52,7 +52,8 @@ public: bool m_disconnecting; private: - CArchMutex m_mutex; + CArchMutex m_readMutex; + CArchMutex m_writeMutex; static CEvent::Type s_messageReceivedEvent; static CEvent::Type s_disconnectedEvent; diff --git a/src/lib/ipc/CIpcLogOutputter.cpp b/src/lib/ipc/CIpcLogOutputter.cpp index 93e6fbf6..63a459c1 100644 --- a/src/lib/ipc/CIpcLogOutputter.cpp +++ b/src/lib/ipc/CIpcLogOutputter.cpp @@ -28,11 +28,17 @@ #include "TMethodJob.h" #include "XArch.h" +// limit number of log lines sent in one message. +#define MAX_SEND 100 + CIpcLogOutputter::CIpcLogOutputter(CIpcServer& ipcServer) : m_ipcServer(ipcServer), m_bufferMutex(ARCH->newMutex()), m_sending(false), -m_running(true) +m_running(true), +m_notifyCond(ARCH->newCondVar()), +m_notifyMutex(ARCH->newMutex()), +m_bufferWaiting(false) { m_bufferThread = new CThread(new TMethodJob( this, &CIpcLogOutputter::bufferThread)); @@ -41,10 +47,14 @@ m_running(true) CIpcLogOutputter::~CIpcLogOutputter() { m_running = false; + notifyBuffer(); m_bufferThread->wait(5); ARCH->closeMutex(m_bufferMutex); delete m_bufferThread; + + ARCH->closeCondVar(m_notifyCond); + ARCH->closeMutex(m_notifyMutex); } void @@ -71,6 +81,7 @@ CIpcLogOutputter::write(ELevel level, const char* text) bool CIpcLogOutputter::write(ELevel, const char* text, bool force) { + // TODO: discard based on thread id? hmm... // sending the buffer generates log messages, which we must throw // away (otherwise this would cause recursion). this is just a drawback // of logging this way. there is also the risk that this could throw @@ -81,28 +92,32 @@ CIpcLogOutputter::write(ELevel, const char* text, bool force) return true; } - CArchMutexLock lock(m_bufferMutex); - m_buffer.append(text); - m_buffer.append("\n"); + appendBuffer(text); + notifyBuffer(); return true; } +void +CIpcLogOutputter::appendBuffer(const CString& text) +{ + CArchMutexLock lock(m_bufferMutex); + m_buffer.push(text); +} + void CIpcLogOutputter::bufferThread(void*) { try { while (m_running) { - while (m_running && m_buffer.size() == 0) { - ARCH->sleep(.1); - } - - if (!m_running) { - break; - } - if (m_ipcServer.hasClients(kIpcClientGui)) { - sendBuffer(); + while (!m_buffer.empty()) { + sendBuffer(); + } } + + m_bufferWaiting = true; + ARCH->waitCondVar(m_notifyCond, m_notifyMutex, -1); + m_bufferWaiting = false; } } catch (XArch& e) { @@ -112,21 +127,41 @@ CIpcLogOutputter::bufferThread(void*) LOG((CLOG_DEBUG "ipc log buffer thread finished")); } -CString* -CIpcLogOutputter::emptyBuffer() +void +CIpcLogOutputter::notifyBuffer() +{ + if (!m_bufferWaiting) { + return; + } + CArchMutexLock lock(m_notifyMutex); + ARCH->broadcastCondVar(m_notifyCond); +} + +CString +CIpcLogOutputter::emptyBuffer(int count) { CArchMutexLock lock(m_bufferMutex); - CString* copy = new CString(m_buffer); - m_buffer.clear(); - return copy; + + if (m_buffer.size() < count) { + count = m_buffer.size(); + } + + CString chunk; + for (int i = 0; i < count; i++) { + chunk.append(m_buffer.front()); + chunk.append("\n"); + m_buffer.pop(); + } + return chunk; } + void CIpcLogOutputter::sendBuffer() { CIpcMessage message; message.m_type = kIpcLogLine; - message.m_data = emptyBuffer(); + message.m_data = new CString(emptyBuffer(MAX_SEND)); m_sending = true; m_ipcServer.send(message, kIpcClientGui); diff --git a/src/lib/ipc/CIpcLogOutputter.h b/src/lib/ipc/CIpcLogOutputter.h index 5f1ca576..f574b9c4 100644 --- a/src/lib/ipc/CIpcLogOutputter.h +++ b/src/lib/ipc/CIpcLogOutputter.h @@ -19,6 +19,7 @@ #include "ILogOutputter.h" #include "CArch.h" +#include class CIpcServer; class CEvent; @@ -38,18 +39,29 @@ public: virtual void close(); virtual void show(bool showIfEmpty); virtual bool write(ELevel level, const char* message); - virtual bool write(ELevel level, const char* text, bool force); + + //! Same as write, but allows message to sidestep anti-recursion mechanism. + bool write(ELevel level, const char* text, bool force); + + //! Notify that the buffer should be sent. + void notifyBuffer(); private: void bufferThread(void*); - CString* emptyBuffer(); + CString emptyBuffer(int count); void sendBuffer(); + void appendBuffer(const CString& text); private: + typedef std::queue CBuffer; + CIpcServer& m_ipcServer; - CString m_buffer; + CBuffer m_buffer; CArchMutex m_bufferMutex; bool m_sending; CThread* m_bufferThread; bool m_running; + CArchCond m_notifyCond; + CArchMutex m_notifyMutex; + bool m_bufferWaiting; }; diff --git a/src/lib/synergy/CDaemonApp.cpp b/src/lib/synergy/CDaemonApp.cpp index 8a07207f..4fe2b317 100644 --- a/src/lib/synergy/CDaemonApp.cpp +++ b/src/lib/synergy/CDaemonApp.cpp @@ -330,5 +330,9 @@ CDaemonApp::handleIpcMessage(const CEvent& e, void*) m_relauncher->command(command); break; } + + case kIpcHello: + m_ipcLogOutputter->notifyBuffer(); + break; } }