made ipc reader on gui side more robuts, but deadlocking issue on ipc server still exists (caused by sending log messages).

This commit is contained in:
Nick Bolton 2012-07-06 22:17:26 +00:00
parent 9fbbff11b6
commit af9a6beb78
12 changed files with 265 additions and 99 deletions

View File

@ -36,7 +36,8 @@ SOURCES += src/main.cpp \
src/QSynergyApplication.cpp \
src/VersionChecker.cpp \
src/SetupWizard.cpp \
src/IpcClient.cpp
src/IpcClient.cpp \
src/IpcReader.cpp
HEADERS += src/MainWindow.h \
src/AboutDialog.h \
src/ServerConfig.h \
@ -59,7 +60,9 @@ HEADERS += src/MainWindow.h \
src/QSynergyApplication.h \
src/VersionChecker.h \
src/SetupWizard.h \
src/IpcClient.h
src/IpcClient.h \
src/IpcReader.h \
src/Ipc.h
RESOURCES += res/Synergy.qrc
RC_FILE = res/win/Synergy.rc
TRANSLATIONS = res/lang/nl_NL.ts

34
src/gui/src/Ipc.h Normal file
View File

@ -0,0 +1,34 @@
/*
* synergy -- mouse and keyboard sharing utility
* Copyright (C) 2012 Nick Bolton
*
* This package is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* found in the file COPYING that should have accompanied this file.
*
* This package is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#pragma once
#define IPC_HOST "127.0.0.1"
#define IPC_PORT 24801
enum qIpcMessageType {
kIpcHello,
kIpcLogLine,
kIpcCommand,
kIpcShutdown,
};
enum qIpcClientType {
kIpcClientUnknown,
kIpcClientGui,
kIpcClientNode,
};

View File

@ -20,13 +20,16 @@
#include <QHostAddress>
#include <iostream>
#include <QTimer>
#include "IpcReader.h"
#include "Ipc.h"
IpcClient::IpcClient()
{
m_Socket = new QTcpSocket(this);
connect(m_Socket, SIGNAL(readyRead()), this, SLOT(read()));
m_Reader = new IpcReader(m_Socket);
connect(m_Socket, SIGNAL(connected()), this, SLOT(connected()));
connect(m_Socket, SIGNAL(error(QAbstractSocket::SocketError)), this, SLOT(error(QAbstractSocket::SocketError)));
connect(m_Reader, SIGNAL(readLogLine(const QString&)), this, SLOT(handleReadLogLine(const QString&)));
}
IpcClient::~IpcClient()
@ -40,6 +43,8 @@ void IpcClient::connected()
write(kIpcHello, 1, typeBuf);
infoMessage("connection established");
m_Reader->start();
}
void IpcClient::connectToHost()
@ -48,38 +53,6 @@ void IpcClient::connectToHost()
m_Socket->connectToHost(QHostAddress(QHostAddress::LocalHost), IPC_PORT);
}
void IpcClient::read()
{
QDataStream stream(m_Socket);
while (m_Socket->bytesAvailable() != 0) {
char codeBuf[1];
stream.readRawData(codeBuf, 1);
switch (codeBuf[0]) {
case kIpcLogLine: {
char lenBuf[2];
stream.readRawData(lenBuf, 2);
int len = bytesToInt(lenBuf, 2);
std::cout << "told len: " << len << std::endl;
char* data = new char[len];
stream.readRawData(data, len);
QString line = QString::fromUtf8(data, len);
std::cout << "actual len: " << line.size() << std::endl;
readLogLine(line);
break;
}
default:
std::cerr << "message type not supported: " << codeBuf[0] << std::endl;
break;
}
}
}
void IpcClient::error(QAbstractSocket::SocketError error)
{
QString text;
@ -121,17 +94,9 @@ void IpcClient::write(int code, int length, const char* data)
}
}
// TODO: qt must have a built in way of converting bytes to int.
int IpcClient::bytesToInt(const char *buffer, int size)
void IpcClient::handleReadLogLine(const QString& text)
{
if (size == 2) {
return (((unsigned char)buffer[0]) << 8)
+ (unsigned char)buffer[1];
}
else {
// TODO: other sizes, if needed.
return 0;
}
readLogLine(text);
}
// TODO: qt must have a built in way of converting int to bytes.

View File

@ -20,9 +20,8 @@
#include <QObject>
#include <QAbstractSocket>
#define IPC_PORT 24801
class QTcpSocket;
class IpcReader;
class IpcClient : public QObject
{
@ -38,13 +37,12 @@ public slots:
void connectToHost();
private:
int bytesToInt(const char* buffer, int size);
void intToBytes(int value, char* buffer, int size);
private slots:
void connected();
void read();
void error(QAbstractSocket::SocketError error);
void handleReadLogLine(const QString& text);
signals:
void readLogLine(const QString& text);
@ -53,17 +51,5 @@ signals:
private:
QTcpSocket* m_Socket;
};
enum qIpcMessageType {
kIpcHello,
kIpcLogLine,
kIpcCommand,
kIpcShutdown,
};
enum qIpcClientType {
kIpcClientUnknown,
kIpcClientGui,
kIpcClientNode,
IpcReader* m_Reader;
};

123
src/gui/src/IpcReader.cpp Normal file
View File

@ -0,0 +1,123 @@
/*
* synergy -- mouse and keyboard sharing utility
* Copyright (C) 2012 Nick Bolton
*
* This package is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* found in the file COPYING that should have accompanied this file.
*
* This package is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "IpcReader.h"
#include <QTcpSocket>
#include "Ipc.h"
#include <iostream>
#include <QMutex>
IpcReader::IpcReader(QTcpSocket* socket) :
m_Socket(socket)
{
connect(socket, SIGNAL(readyRead()), this, SLOT(readyRead()));
}
IpcReader::~IpcReader()
{
}
void IpcReader::readyRead()
{
std::cout << "ready read" << std::endl;
m_Ready.wakeAll();
}
void IpcReader::run()
{
m_Socket->waitForConnected(-1);
while (true) {
char codeBuf[1];
readStream(codeBuf, 1);
switch (codeBuf[0]) {
case kIpcLogLine: {
char lenBuf[4];
readStream(lenBuf, 4);
int len = bytesToInt(lenBuf, 4);
char* data = new char[len];
readStream(data, len);
QString line = QString::fromUtf8(data, len);
readLogLine(line);
break;
}
default:
std::cerr << "aborting, message invalid: " << codeBuf[0] << std::endl;
return;
}
}
}
void IpcReader::readStream(char* buffer, int length)
{
QMutex mutex;
mutex.lock();
QDataStream stream(m_Socket);
std::cout << "reading stream" << std::endl;
int read = 0;
while (read < length) {
int ask = length - read;
int got = stream.readRawData(buffer, ask);
if (got == 0) {
std::cout << "end of buffer, waiting" << std::endl;
m_Ready.wait(&mutex);
}
else if (got == -1) {
std::cout << "socket ended, aborting" << std::endl;
return;
}
else {
read += got;
buffer += got;
std::cout << "> ask=" << ask << " got=" << got
<< " read=" << read << std::endl;
if (length - read > 0) {
std::cout << "more remains" << std::endl;
}
}
}
}
// TODO: qt must have a built in way of converting bytes to int.
int IpcReader::bytesToInt(const char *buffer, int size)
{
if (size == 2) {
return
(((unsigned char)buffer[0]) << 8) +
(unsigned char)buffer[1];
}
else if (size == 4) {
return
(((unsigned char)buffer[0]) << 24) +
(((unsigned char)buffer[1]) << 16) +
(((unsigned char)buffer[2]) << 8) +
(unsigned char)buffer[3];
}
else {
// TODO: other sizes, if needed.
return 0;
}
}

48
src/gui/src/IpcReader.h Normal file
View File

@ -0,0 +1,48 @@
/*
* synergy -- mouse and keyboard sharing utility
* Copyright (C) 2012 Nick Bolton
*
* This package is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* found in the file COPYING that should have accompanied this file.
*
* This package is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#pragma once
#include <QThread>
#include <QWaitCondition>
class QTcpSocket;
class IpcReader : public QThread
{
Q_OBJECT;
public:
IpcReader(QTcpSocket* socket);
virtual ~IpcReader();
void run();
void stop();
signals:
void readLogLine(const QString& text);
private:
void readStream(char* buffer, int length);
int bytesToInt(const char* buffer, int size);
private slots:
void readyRead();
private:
QTcpSocket* m_Socket;
QWaitCondition m_Ready;
};

View File

@ -31,6 +31,7 @@
#include "AppConfig.h"
#include "VersionChecker.h"
#include "IpcClient.h"
#include "Ipc.h"
class QAction;
class QMenu;

View File

@ -0,0 +1,29 @@
/*
* synergy -- mouse and keyboard sharing utility
* Copyright (C) 2012 Nick Bolton
*
* This package is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* found in the file COPYING that should have accompanied this file.
*
* This package is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "TcpSocketReader.h"
#include <QTcpSocket>
IpcReader::IpcReader(QTcpSocket& socket) :
m_Socket(socket)
{
connect(m_Socket, SIGNAL(readyRead()), this, SLOT(read()));
}
IpcReader::~IpcReader()
{
}

View File

@ -72,7 +72,6 @@ CIpcClientProxy::~CIpcClientProxy()
void
CIpcClientProxy::handleDisconnect(const CEvent&, void*)
{
CArchMutexLock lock(m_mutex);
disconnect();
LOG((CLOG_DEBUG "ipc client disconnected"));
}
@ -80,7 +79,6 @@ CIpcClientProxy::handleDisconnect(const CEvent&, void*)
void
CIpcClientProxy::handleWriteError(const CEvent&, void*)
{
CArchMutexLock lock(m_mutex);
disconnect();
LOG((CLOG_DEBUG "ipc client write error"));
}
@ -88,7 +86,6 @@ CIpcClientProxy::handleWriteError(const CEvent&, void*)
void
CIpcClientProxy::handleData(const CEvent&, void*)
{
CArchMutexLock lock(m_mutex);
UInt8 code[1];
UInt32 n = m_stream.read(code, 1);
while (n != 0) {
@ -125,7 +122,10 @@ CIpcClientProxy::handleData(const CEvent&, void*)
void
CIpcClientProxy::send(const CIpcMessage& message)
{
// don't allow other threads to write until we've finished the entire
// message. stream write is locked, but only for that single write.
CArchMutexLock lock(m_mutex);
LOG((CLOG_DEBUG "ipc client proxy write: %d", message.m_type));
UInt8 code[1];
@ -138,7 +138,7 @@ CIpcClientProxy::send(const CIpcMessage& message)
const char* data = s->c_str();
int len = strlen(data);
CProtocolUtil::writef(&m_stream, "%2i", len);
CProtocolUtil::writef(&m_stream, "%4i", len);
m_stream.write(data, len);
break;

View File

@ -39,33 +39,11 @@ CIpcLogOutputter::~CIpcLogOutputter()
void
CIpcLogOutputter::sendBuffer(CIpcClientProxy& proxy)
{
// drop messages logged while sending over ipc, since ipc can cause
// log messages (sending these could cause recursion or deadlocks).
// this has the side effect of dropping messages from other threads
// which weren't caused by ipc, but that is just the downside of
// logging this way.
if (m_sending) {
return;
}
CArchMutexLock lock(m_mutex);
m_sending = true;
try {
while (m_buffer.size() != 0) {
CString text = m_buffer.front();
m_buffer.pop();
CIpcMessage message;
message.m_type = kIpcLogLine;
message.m_data = new CString(text);
proxy.send(message);
}
m_sending = false;
}
catch (...) {
m_sending = false;
throw;
}
CIpcMessage message;
message.m_type = kIpcLogLine;
message.m_data = new CString(m_buffer);
proxy.send(message);
m_buffer.clear();
}
void
@ -95,6 +73,7 @@ CIpcLogOutputter::write(ELevel level, const char* text)
return false;
}
// protect the value of m_sending.
CArchMutexLock lock(m_mutex);
m_sending = true;
@ -106,7 +85,8 @@ CIpcLogOutputter::write(ELevel level, const char* text)
m_ipcServer.send(message, kIpcClientGui);
}
else {
m_buffer.push(text);
m_buffer.append(text);
m_buffer.append("\n");
}
m_sending = false;
return true;

View File

@ -18,7 +18,6 @@
#pragma once
#include "ILogOutputter.h"
#include <queue>
class CIpcServer;
class CEvent;
@ -43,10 +42,8 @@ public:
void sendBuffer(CIpcClientProxy& proxy);
private:
typedef std::queue<CString> CIpcLogQueue;
CIpcServer& m_ipcServer;
CIpcLogQueue m_buffer;
CString m_buffer;
CArchMutex m_mutex;
bool m_sending;
};

View File

@ -110,7 +110,7 @@ void*
CIpcServerProxy::parseLogLine()
{
int len = 0;
CProtocolUtil::readf(&m_stream, "%2i", &len);
CProtocolUtil::readf(&m_stream, "%4i", &len);
UInt8* buffer = new UInt8[len];
m_stream.read(buffer, len);