From af9a6beb789f87dcdec069bcb941547c1221d9e7 Mon Sep 17 00:00:00 2001 From: Nick Bolton Date: Fri, 6 Jul 2012 22:17:26 +0000 Subject: [PATCH] made ipc reader on gui side more robuts, but deadlocking issue on ipc server still exists (caused by sending log messages). --- src/gui/gui.pro | 7 +- src/gui/src/Ipc.h | 34 +++++++++ src/gui/src/IpcClient.cpp | 51 ++----------- src/gui/src/IpcClient.h | 20 +---- src/gui/src/IpcReader.cpp | 123 +++++++++++++++++++++++++++++++ src/gui/src/IpcReader.h | 48 ++++++++++++ src/gui/src/MainWindow.h | 1 + src/gui/src/TcpSocketReader.cpp | 29 ++++++++ src/lib/ipc/CIpcClientProxy.cpp | 8 +- src/lib/ipc/CIpcLogOutputter.cpp | 36 ++------- src/lib/ipc/CIpcLogOutputter.h | 5 +- src/lib/ipc/CIpcServerProxy.cpp | 2 +- 12 files changed, 265 insertions(+), 99 deletions(-) create mode 100644 src/gui/src/Ipc.h create mode 100644 src/gui/src/IpcReader.cpp create mode 100644 src/gui/src/IpcReader.h create mode 100644 src/gui/src/TcpSocketReader.cpp diff --git a/src/gui/gui.pro b/src/gui/gui.pro index 2840e5c4..67f8901d 100644 --- a/src/gui/gui.pro +++ b/src/gui/gui.pro @@ -36,7 +36,8 @@ SOURCES += src/main.cpp \ src/QSynergyApplication.cpp \ src/VersionChecker.cpp \ src/SetupWizard.cpp \ - src/IpcClient.cpp + src/IpcClient.cpp \ + src/IpcReader.cpp HEADERS += src/MainWindow.h \ src/AboutDialog.h \ src/ServerConfig.h \ @@ -59,7 +60,9 @@ HEADERS += src/MainWindow.h \ src/QSynergyApplication.h \ src/VersionChecker.h \ src/SetupWizard.h \ - src/IpcClient.h + src/IpcClient.h \ + src/IpcReader.h \ + src/Ipc.h RESOURCES += res/Synergy.qrc RC_FILE = res/win/Synergy.rc TRANSLATIONS = res/lang/nl_NL.ts diff --git a/src/gui/src/Ipc.h b/src/gui/src/Ipc.h new file mode 100644 index 00000000..7bc0099b --- /dev/null +++ b/src/gui/src/Ipc.h @@ -0,0 +1,34 @@ +/* + * synergy -- mouse and keyboard sharing utility + * Copyright (C) 2012 Nick Bolton + * + * This package is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * found in the file COPYING that should have accompanied this file. + * + * This package is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ + +#pragma once + +#define IPC_HOST "127.0.0.1" +#define IPC_PORT 24801 + +enum qIpcMessageType { + kIpcHello, + kIpcLogLine, + kIpcCommand, + kIpcShutdown, +}; + +enum qIpcClientType { + kIpcClientUnknown, + kIpcClientGui, + kIpcClientNode, +}; diff --git a/src/gui/src/IpcClient.cpp b/src/gui/src/IpcClient.cpp index dc1e947b..f23ff9f5 100644 --- a/src/gui/src/IpcClient.cpp +++ b/src/gui/src/IpcClient.cpp @@ -20,13 +20,16 @@ #include #include #include +#include "IpcReader.h" +#include "Ipc.h" IpcClient::IpcClient() { m_Socket = new QTcpSocket(this); - connect(m_Socket, SIGNAL(readyRead()), this, SLOT(read())); + m_Reader = new IpcReader(m_Socket); connect(m_Socket, SIGNAL(connected()), this, SLOT(connected())); connect(m_Socket, SIGNAL(error(QAbstractSocket::SocketError)), this, SLOT(error(QAbstractSocket::SocketError))); + connect(m_Reader, SIGNAL(readLogLine(const QString&)), this, SLOT(handleReadLogLine(const QString&))); } IpcClient::~IpcClient() @@ -40,6 +43,8 @@ void IpcClient::connected() write(kIpcHello, 1, typeBuf); infoMessage("connection established"); + + m_Reader->start(); } void IpcClient::connectToHost() @@ -48,38 +53,6 @@ void IpcClient::connectToHost() m_Socket->connectToHost(QHostAddress(QHostAddress::LocalHost), IPC_PORT); } -void IpcClient::read() -{ - QDataStream stream(m_Socket); - - while (m_Socket->bytesAvailable() != 0) { - - char codeBuf[1]; - stream.readRawData(codeBuf, 1); - - switch (codeBuf[0]) { - case kIpcLogLine: { - char lenBuf[2]; - stream.readRawData(lenBuf, 2); - int len = bytesToInt(lenBuf, 2); - std::cout << "told len: " << len << std::endl; - - char* data = new char[len]; - stream.readRawData(data, len); - - QString line = QString::fromUtf8(data, len); - std::cout << "actual len: " << line.size() << std::endl; - readLogLine(line); - break; - } - - default: - std::cerr << "message type not supported: " << codeBuf[0] << std::endl; - break; - } - } -} - void IpcClient::error(QAbstractSocket::SocketError error) { QString text; @@ -121,17 +94,9 @@ void IpcClient::write(int code, int length, const char* data) } } -// TODO: qt must have a built in way of converting bytes to int. -int IpcClient::bytesToInt(const char *buffer, int size) +void IpcClient::handleReadLogLine(const QString& text) { - if (size == 2) { - return (((unsigned char)buffer[0]) << 8) - + (unsigned char)buffer[1]; - } - else { - // TODO: other sizes, if needed. - return 0; - } + readLogLine(text); } // TODO: qt must have a built in way of converting int to bytes. diff --git a/src/gui/src/IpcClient.h b/src/gui/src/IpcClient.h index f8f6bcc5..998cb9a7 100644 --- a/src/gui/src/IpcClient.h +++ b/src/gui/src/IpcClient.h @@ -20,9 +20,8 @@ #include #include -#define IPC_PORT 24801 - class QTcpSocket; +class IpcReader; class IpcClient : public QObject { @@ -38,13 +37,12 @@ public slots: void connectToHost(); private: - int bytesToInt(const char* buffer, int size); void intToBytes(int value, char* buffer, int size); private slots: void connected(); - void read(); void error(QAbstractSocket::SocketError error); + void handleReadLogLine(const QString& text); signals: void readLogLine(const QString& text); @@ -53,17 +51,5 @@ signals: private: QTcpSocket* m_Socket; -}; - -enum qIpcMessageType { - kIpcHello, - kIpcLogLine, - kIpcCommand, - kIpcShutdown, -}; - -enum qIpcClientType { - kIpcClientUnknown, - kIpcClientGui, - kIpcClientNode, + IpcReader* m_Reader; }; diff --git a/src/gui/src/IpcReader.cpp b/src/gui/src/IpcReader.cpp new file mode 100644 index 00000000..df0d3d0f --- /dev/null +++ b/src/gui/src/IpcReader.cpp @@ -0,0 +1,123 @@ +/* + * synergy -- mouse and keyboard sharing utility + * Copyright (C) 2012 Nick Bolton + * + * This package is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * found in the file COPYING that should have accompanied this file. + * + * This package is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ + +#include "IpcReader.h" +#include +#include "Ipc.h" +#include +#include + +IpcReader::IpcReader(QTcpSocket* socket) : +m_Socket(socket) +{ + connect(socket, SIGNAL(readyRead()), this, SLOT(readyRead())); +} + +IpcReader::~IpcReader() +{ +} + +void IpcReader::readyRead() +{ + std::cout << "ready read" << std::endl; + m_Ready.wakeAll(); +} + +void IpcReader::run() +{ + m_Socket->waitForConnected(-1); + while (true) { + + char codeBuf[1]; + readStream(codeBuf, 1); + + switch (codeBuf[0]) { + case kIpcLogLine: { + char lenBuf[4]; + readStream(lenBuf, 4); + int len = bytesToInt(lenBuf, 4); + + char* data = new char[len]; + readStream(data, len); + + QString line = QString::fromUtf8(data, len); + readLogLine(line); + break; + } + + default: + std::cerr << "aborting, message invalid: " << codeBuf[0] << std::endl; + return; + } + } +} + +void IpcReader::readStream(char* buffer, int length) +{ + QMutex mutex; + mutex.lock(); + + QDataStream stream(m_Socket); + std::cout << "reading stream" << std::endl; + + int read = 0; + while (read < length) { + int ask = length - read; + int got = stream.readRawData(buffer, ask); + + if (got == 0) { + std::cout << "end of buffer, waiting" << std::endl; + m_Ready.wait(&mutex); + } + else if (got == -1) { + std::cout << "socket ended, aborting" << std::endl; + return; + } + else { + read += got; + buffer += got; + + std::cout << "> ask=" << ask << " got=" << got + << " read=" << read << std::endl; + + if (length - read > 0) { + std::cout << "more remains" << std::endl; + } + } + } +} + +// TODO: qt must have a built in way of converting bytes to int. +int IpcReader::bytesToInt(const char *buffer, int size) +{ + if (size == 2) { + return + (((unsigned char)buffer[0]) << 8) + + (unsigned char)buffer[1]; + } + else if (size == 4) { + return + (((unsigned char)buffer[0]) << 24) + + (((unsigned char)buffer[1]) << 16) + + (((unsigned char)buffer[2]) << 8) + + (unsigned char)buffer[3]; + } + else { + // TODO: other sizes, if needed. + return 0; + } +} diff --git a/src/gui/src/IpcReader.h b/src/gui/src/IpcReader.h new file mode 100644 index 00000000..95ae6998 --- /dev/null +++ b/src/gui/src/IpcReader.h @@ -0,0 +1,48 @@ +/* + * synergy -- mouse and keyboard sharing utility + * Copyright (C) 2012 Nick Bolton + * + * This package is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * found in the file COPYING that should have accompanied this file. + * + * This package is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ + +#pragma once + +#include +#include + +class QTcpSocket; + +class IpcReader : public QThread +{ + Q_OBJECT; + +public: + IpcReader(QTcpSocket* socket); + virtual ~IpcReader(); + void run(); + void stop(); + +signals: + void readLogLine(const QString& text); + +private: + void readStream(char* buffer, int length); + int bytesToInt(const char* buffer, int size); + +private slots: + void readyRead(); + +private: + QTcpSocket* m_Socket; + QWaitCondition m_Ready; +}; diff --git a/src/gui/src/MainWindow.h b/src/gui/src/MainWindow.h index 1525d8b5..0f5d64f5 100644 --- a/src/gui/src/MainWindow.h +++ b/src/gui/src/MainWindow.h @@ -31,6 +31,7 @@ #include "AppConfig.h" #include "VersionChecker.h" #include "IpcClient.h" +#include "Ipc.h" class QAction; class QMenu; diff --git a/src/gui/src/TcpSocketReader.cpp b/src/gui/src/TcpSocketReader.cpp new file mode 100644 index 00000000..a5226a27 --- /dev/null +++ b/src/gui/src/TcpSocketReader.cpp @@ -0,0 +1,29 @@ +/* + * synergy -- mouse and keyboard sharing utility + * Copyright (C) 2012 Nick Bolton + * + * This package is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * found in the file COPYING that should have accompanied this file. + * + * This package is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ + +#include "TcpSocketReader.h" +#include + +IpcReader::IpcReader(QTcpSocket& socket) : +m_Socket(socket) +{ + connect(m_Socket, SIGNAL(readyRead()), this, SLOT(read())); +} + +IpcReader::~IpcReader() +{ +} diff --git a/src/lib/ipc/CIpcClientProxy.cpp b/src/lib/ipc/CIpcClientProxy.cpp index a796e30c..618ae48c 100644 --- a/src/lib/ipc/CIpcClientProxy.cpp +++ b/src/lib/ipc/CIpcClientProxy.cpp @@ -72,7 +72,6 @@ CIpcClientProxy::~CIpcClientProxy() void CIpcClientProxy::handleDisconnect(const CEvent&, void*) { - CArchMutexLock lock(m_mutex); disconnect(); LOG((CLOG_DEBUG "ipc client disconnected")); } @@ -80,7 +79,6 @@ CIpcClientProxy::handleDisconnect(const CEvent&, void*) void CIpcClientProxy::handleWriteError(const CEvent&, void*) { - CArchMutexLock lock(m_mutex); disconnect(); LOG((CLOG_DEBUG "ipc client write error")); } @@ -88,7 +86,6 @@ CIpcClientProxy::handleWriteError(const CEvent&, void*) void CIpcClientProxy::handleData(const CEvent&, void*) { - CArchMutexLock lock(m_mutex); UInt8 code[1]; UInt32 n = m_stream.read(code, 1); while (n != 0) { @@ -125,7 +122,10 @@ CIpcClientProxy::handleData(const CEvent&, void*) void CIpcClientProxy::send(const CIpcMessage& message) { + // don't allow other threads to write until we've finished the entire + // message. stream write is locked, but only for that single write. CArchMutexLock lock(m_mutex); + LOG((CLOG_DEBUG "ipc client proxy write: %d", message.m_type)); UInt8 code[1]; @@ -138,7 +138,7 @@ CIpcClientProxy::send(const CIpcMessage& message) const char* data = s->c_str(); int len = strlen(data); - CProtocolUtil::writef(&m_stream, "%2i", len); + CProtocolUtil::writef(&m_stream, "%4i", len); m_stream.write(data, len); break; diff --git a/src/lib/ipc/CIpcLogOutputter.cpp b/src/lib/ipc/CIpcLogOutputter.cpp index 8025ca2d..49703329 100644 --- a/src/lib/ipc/CIpcLogOutputter.cpp +++ b/src/lib/ipc/CIpcLogOutputter.cpp @@ -39,33 +39,11 @@ CIpcLogOutputter::~CIpcLogOutputter() void CIpcLogOutputter::sendBuffer(CIpcClientProxy& proxy) { - // drop messages logged while sending over ipc, since ipc can cause - // log messages (sending these could cause recursion or deadlocks). - // this has the side effect of dropping messages from other threads - // which weren't caused by ipc, but that is just the downside of - // logging this way. - if (m_sending) { - return; - } - - CArchMutexLock lock(m_mutex); - m_sending = true; - try { - while (m_buffer.size() != 0) { - CString text = m_buffer.front(); - m_buffer.pop(); - - CIpcMessage message; - message.m_type = kIpcLogLine; - message.m_data = new CString(text); - proxy.send(message); - } - m_sending = false; - } - catch (...) { - m_sending = false; - throw; - } + CIpcMessage message; + message.m_type = kIpcLogLine; + message.m_data = new CString(m_buffer); + proxy.send(message); + m_buffer.clear(); } void @@ -95,6 +73,7 @@ CIpcLogOutputter::write(ELevel level, const char* text) return false; } + // protect the value of m_sending. CArchMutexLock lock(m_mutex); m_sending = true; @@ -106,7 +85,8 @@ CIpcLogOutputter::write(ELevel level, const char* text) m_ipcServer.send(message, kIpcClientGui); } else { - m_buffer.push(text); + m_buffer.append(text); + m_buffer.append("\n"); } m_sending = false; return true; diff --git a/src/lib/ipc/CIpcLogOutputter.h b/src/lib/ipc/CIpcLogOutputter.h index b08d3f85..f2bc62e4 100644 --- a/src/lib/ipc/CIpcLogOutputter.h +++ b/src/lib/ipc/CIpcLogOutputter.h @@ -18,7 +18,6 @@ #pragma once #include "ILogOutputter.h" -#include class CIpcServer; class CEvent; @@ -43,10 +42,8 @@ public: void sendBuffer(CIpcClientProxy& proxy); private: - typedef std::queue CIpcLogQueue; - CIpcServer& m_ipcServer; - CIpcLogQueue m_buffer; + CString m_buffer; CArchMutex m_mutex; bool m_sending; }; diff --git a/src/lib/ipc/CIpcServerProxy.cpp b/src/lib/ipc/CIpcServerProxy.cpp index f073be69..cfcfa322 100644 --- a/src/lib/ipc/CIpcServerProxy.cpp +++ b/src/lib/ipc/CIpcServerProxy.cpp @@ -110,7 +110,7 @@ void* CIpcServerProxy::parseLogLine() { int len = 0; - CProtocolUtil::readf(&m_stream, "%2i", &len); + CProtocolUtil::readf(&m_stream, "%4i", &len); UInt8* buffer = new UInt8[len]; m_stream.read(buffer, len);