* fixed a shit-ton of memory leaks (lots in the ipc log handler, oops)

* added non-pod data support to events (event delete can now call dtors)
* improved cleanup of ipc message objects (because of non-pod event data support)
* moved the "message received" event up to ipc server and client (passed on from proxies)
This commit is contained in:
Nick Bolton 2012-07-10 01:51:51 +00:00
parent 21cf3f2478
commit 8bad45e8a2
37 changed files with 697 additions and 396 deletions

View File

@ -41,6 +41,9 @@ void IpcReader::read()
QMutexLocker locker(&m_Mutex); QMutexLocker locker(&m_Mutex);
std::cout << "ready read" << std::endl; std::cout << "ready read" << std::endl;
while (m_Socket->bytesAvailable()) {
std::cout << "bytes available" << std::endl;
char codeBuf[1]; char codeBuf[1];
readStream(codeBuf, 1); readStream(codeBuf, 1);
int code = bytesToInt(codeBuf, 1); int code = bytesToInt(codeBuf, 1);
@ -66,6 +69,7 @@ void IpcReader::read()
std::cerr << "aborting, message invalid: " << code << std::endl; std::cerr << "aborting, message invalid: " << code << std::endl;
return; return;
} }
}
std::cout << "read done" << std::endl; std::cout << "read done" << std::endl;
} }
@ -82,13 +86,7 @@ bool IpcReader::readStream(char* buffer, int length)
m_Socket->waitForReadyRead(-1); m_Socket->waitForReadyRead(-1);
} }
// i really don't trust qt not to copy beyond the array length. int got = m_Socket->read(buffer, ask);
// seems like a convoluted an expensive way to copy from the stream :/
char* tempBuffer = new char[ask];
int got = m_Socket->read(tempBuffer, ask);
memcpy(buffer, tempBuffer, got);
delete tempBuffer;
read += got; read += got;
std::cout << "> ask=" << ask << " got=" << got std::cout << "> ask=" << ask << " got=" << got

View File

@ -25,16 +25,20 @@ CArch* CArch::s_instance = NULL;
CArch::CArch() CArch::CArch()
{ {
assert(s_instance == NULL);
s_instance = this;
} }
CArch::~CArch() CArch::~CArch()
{ {
#if SYSAPI_WIN32
CArchMiscWindows::cleanup();
#endif
} }
void void
CArch::init() CArch::init()
{ {
// initialization that requires ARCH is done here.
ARCH_NETWORK::init(); ARCH_NETWORK::init();
#if SYSAPI_WIN32 #if SYSAPI_WIN32
ARCH_TASKBAR::init(); ARCH_TASKBAR::init();
@ -45,10 +49,6 @@ CArch::init()
CArch* CArch*
CArch::getInstance() CArch::getInstance()
{ {
if (s_instance == NULL) { assert(s_instance != NULL);
s_instance = new CArch();
s_instance->init();
}
return s_instance; return s_instance;
} }

View File

@ -96,7 +96,15 @@ class CArch : public ARCH_CONSOLE,
public ARCH_TASKBAR, public ARCH_TASKBAR,
public ARCH_TIME { public ARCH_TIME {
public: public:
~CArch(); CArch();
virtual ~CArch();
//! Call init on other arch classes.
/*!
Some arch classes depend on others to exist first. When init is called
these clases will have ARCH available for use.
*/
virtual void init();
// //
// accessors // accessors
@ -111,10 +119,6 @@ public:
ARCH_PLUGIN& plugin() const { return (ARCH_PLUGIN&)m_plugin; } ARCH_PLUGIN& plugin() const { return (ARCH_PLUGIN&)m_plugin; }
private:
CArch();
void init();
private: private:
static CArch* s_instance; static CArch* s_instance;
ARCH_PLUGIN m_plugin; ARCH_PLUGIN m_plugin;

View File

@ -50,6 +50,12 @@ HICON CArchMiscWindows::s_largeIcon = NULL;
HICON CArchMiscWindows::s_smallIcon = NULL; HICON CArchMiscWindows::s_smallIcon = NULL;
HINSTANCE CArchMiscWindows::s_instanceWin32 = NULL; HINSTANCE CArchMiscWindows::s_instanceWin32 = NULL;
void
CArchMiscWindows::cleanup()
{
delete s_dialogs;
}
void void
CArchMiscWindows::init() CArchMiscWindows::init()
{ {

View File

@ -48,6 +48,9 @@ public:
//! Initialize //! Initialize
static void init(); static void init();
//! Delete memory
static void cleanup();
//! Test if windows 95, et al. //! Test if windows 95, et al.
/*! /*!
Returns true iff the platform is win95/98/me. Returns true iff the platform is win95/98/me.

View File

@ -109,6 +109,11 @@ CArchNetworkWinsock::~CArchNetworkWinsock()
s_networkModule = NULL; s_networkModule = NULL;
} }
ARCH->closeMutex(m_mutex); ARCH->closeMutex(m_mutex);
CEventList::iterator it;
for (it = m_unblockEvents.begin(); it != m_unblockEvents.end(); it++) {
delete *it;
}
} }
void void
@ -429,6 +434,7 @@ CArchNetworkWinsock::pollSocket(CPollEntry pe[], int num, double timeout)
ARCH->closeThread(thread); ARCH->closeThread(thread);
if (unblockEvent == NULL) { if (unblockEvent == NULL) {
unblockEvent = new WSAEVENT; unblockEvent = new WSAEVENT;
m_unblockEvents.push_back(unblockEvent);
*unblockEvent = WSACreateEvent_winsock(); *unblockEvent = WSACreateEvent_winsock();
mt->setNetworkDataForCurrentThread(unblockEvent); mt->setNetworkDataForCurrentThread(unblockEvent);
} }

View File

@ -28,6 +28,7 @@
#include "IArchMultithread.h" #include "IArchMultithread.h"
#include <windows.h> #include <windows.h>
#include <winsock2.h> #include <winsock2.h>
#include <list>
#define ARCH_NETWORK CArchNetworkWinsock #define ARCH_NETWORK CArchNetworkWinsock
@ -98,7 +99,10 @@ private:
void throwNameError(int); void throwNameError(int);
private: private:
typedef std::list<WSAEVENT> CEventList;
CArchMutex m_mutex; CArchMutex m_mutex;
CEventList m_unblockEvents;
}; };
#endif #endif

View File

@ -26,7 +26,8 @@ CEvent::CEvent() :
m_type(kUnknown), m_type(kUnknown),
m_target(NULL), m_target(NULL),
m_data(NULL), m_data(NULL),
m_flags(0) m_flags(0),
m_dataObject(nullptr)
{ {
// do nothing // do nothing
} }
@ -35,7 +36,8 @@ CEvent::CEvent(Type type, void* target, void* data, Flags flags) :
m_type(type), m_type(type),
m_target(target), m_target(target),
m_data(data), m_data(data),
m_flags(flags) m_flags(flags),
m_dataObject(nullptr)
{ {
// do nothing // do nothing
} }
@ -58,6 +60,12 @@ CEvent::getData() const
return m_data; return m_data;
} }
CEventData*
CEvent::getDataObject() const
{
return m_dataObject;
}
CEvent::Flags CEvent::Flags
CEvent::getFlags() const CEvent::getFlags() const
{ {
@ -77,7 +85,15 @@ CEvent::deleteData(const CEvent& event)
default: default:
if ((event.getFlags() & kDontFreeData) == 0) { if ((event.getFlags() & kDontFreeData) == 0) {
free(event.getData()); free(event.getData());
delete event.getDataObject();
} }
break; break;
} }
} }
void
CEvent::setDataObject(CEventData* dataObject)
{
assert(m_dataObject == nullptr);
m_dataObject = dataObject;
}

View File

@ -21,6 +21,12 @@
#include "BasicTypes.h" #include "BasicTypes.h"
#include "stdmap.h" #include "stdmap.h"
class CEventData {
public:
CEventData() { }
virtual ~CEventData() { }
};
//! Event //! Event
/*! /*!
A \c CEvent holds an event type and a pointer to event data. A \c CEvent holds an event type and a pointer to event data.
@ -45,13 +51,15 @@ public:
CEvent(); CEvent();
//! Create \c CEvent with data //! Create \c CEvent with data (POD)
/*! /*!
The \p type must have been registered using \c registerType(). The \p type must have been registered using \c registerType().
The \p data must be POD (plain old data) allocated by malloc(), The \p data must be POD (plain old data) allocated by malloc(),
which means it cannot have a constructor, destructor or be which means it cannot have a constructor, destructor or be
composed of any types that do. \p target is the intended composed of any types that do. For non-POD (normal C++ objects
recipient of the event. \p flags is any combination of \c Flags. use \c setDataObject().
\p target is the intended recipient of the event.
\p flags is any combination of \c Flags.
*/ */
CEvent(Type type, void* target = NULL, void* data = NULL, CEvent(Type type, void* target = NULL, void* data = NULL,
Flags flags = kNone); Flags flags = kNone);
@ -65,6 +73,13 @@ public:
*/ */
static void deleteData(const CEvent&); static void deleteData(const CEvent&);
//! Set data (non-POD)
/*!
Set non-POD (non plain old data), where delete is called when the event
is deleted, and the destructor is called.
*/
void setDataObject(CEventData* dataObject);
//@} //@}
//! @name accessors //! @name accessors
//@{ //@{
@ -81,12 +96,20 @@ public:
*/ */
void* getTarget() const; void* getTarget() const;
//! Get the event data //! Get the event data (POD).
/*! /*!
Returns the event data. Returns the event data (POD).
*/ */
void* getData() const; void* getData() const;
//! Get the event data (non-POD)
/*!
Returns the event data (non-POD). The difference between this and
\c getData() is that when delete is called on this data, so non-POD
(non plain old data) dtor is called.
*/
CEventData* getDataObject() const;
//! Get event flags //! Get event flags
/*! /*!
Returns the event flags. Returns the event flags.
@ -100,6 +123,7 @@ private:
void* m_target; void* m_target;
void* m_data; void* m_data;
Flags m_flags; Flags m_flags;
CEventData* m_dataObject;
}; };
#endif #endif

View File

@ -71,6 +71,8 @@ CLog* CLog::s_log = NULL;
CLog::CLog() CLog::CLog()
{ {
assert(s_log == NULL);
// create mutex for multithread safe operation // create mutex for multithread safe operation
m_mutex = ARCH->newMutex(); m_mutex = ARCH->newMutex();
@ -78,6 +80,8 @@ CLog::CLog()
m_maxPriority = g_defaultMaxPriority; m_maxPriority = g_defaultMaxPriority;
m_maxNewlineLength = 0; m_maxNewlineLength = 0;
insert(new CConsoleLogOutputter); insert(new CConsoleLogOutputter);
s_log = this;
} }
CLog::~CLog() CLog::~CLog()
@ -97,9 +101,7 @@ CLog::~CLog()
CLog* CLog*
CLog::getInstance() CLog::getInstance()
{ {
if (s_log == NULL) assert(s_log != NULL);
s_log = new CLog();
return s_log; return s_log;
} }

View File

@ -38,6 +38,7 @@ LOGC() provide convenient access.
*/ */
class CLog { class CLog {
public: public:
CLog();
~CLog(); ~CLog();
//! @name manipulators //! @name manipulators
@ -86,6 +87,8 @@ public:
then it simply returns true. then it simply returns true.
*/ */
bool setFilter(const char* name); bool setFilter(const char* name);
//! Set the minimum priority filter (by ordinal).
void setFilter(int); void setFilter(int);
//@} //@}
@ -119,7 +122,6 @@ public:
//@} //@}
private: private:
CLog();
void output(ELevel priority, char* msg); void output(ELevel priority, char* msg);
private: private:

View File

@ -19,8 +19,6 @@
#include "CStopwatch.h" #include "CStopwatch.h"
#include "CArch.h" #include "CArch.h"
class CEventQueueTimer { };
// //
// CSimpleEventQueueBuffer // CSimpleEventQueueBuffer
// //

View File

@ -49,4 +49,11 @@ private:
CEventDeque m_queue; CEventDeque m_queue;
}; };
class CEventQueueTimer
{
public:
CEventQueueTimer() { }
virtual ~CEventQueueTimer() { }
};
#endif #endif

View File

@ -22,6 +22,7 @@
#include "CIpcMessage.h" #include "CIpcMessage.h"
CEvent::Type CIpcClient::s_connectedEvent = CEvent::kUnknown; CEvent::Type CIpcClient::s_connectedEvent = CEvent::kUnknown;
CEvent::Type CIpcClient::s_messageReceivedEvent = CEvent::kUnknown;
CIpcClient::CIpcClient() : CIpcClient::CIpcClient() :
m_serverAddress(CNetworkAddress(IPC_HOST, IPC_PORT)), m_serverAddress(CNetworkAddress(IPC_HOST, IPC_PORT)),
@ -37,6 +38,8 @@ m_server(nullptr)
CIpcClient::~CIpcClient() CIpcClient::~CIpcClient()
{ {
EVENTQUEUE->removeHandler(m_socket.getConnectedEvent(), m_socket.getEventTarget());
EVENTQUEUE->removeHandler(CIpcServerProxy::getMessageReceivedEvent(), m_server);
delete m_server; delete m_server;
} }
@ -45,12 +48,25 @@ CIpcClient::connect()
{ {
m_socket.connect(m_serverAddress); m_socket.connect(m_serverAddress);
m_server = new CIpcServerProxy(m_socket); m_server = new CIpcServerProxy(m_socket);
EVENTQUEUE->adoptHandler(
CIpcServerProxy::getMessageReceivedEvent(), m_server,
new TMethodEventJob<CIpcClient>(
this, &CIpcClient::handleMessageReceived));
}
void
CIpcClient::disconnect()
{
m_server->disconnect();
delete m_server;
m_server = nullptr;
} }
void void
CIpcClient::send(const CIpcMessage& message) CIpcClient::send(const CIpcMessage& message)
{ {
assert(m_server != NULL); assert(m_server != nullptr);
m_server->send(message); m_server->send(message);
} }
@ -61,13 +77,27 @@ CIpcClient::getConnectedEvent()
s_connectedEvent, "CIpcClient::connected"); s_connectedEvent, "CIpcClient::connected");
} }
CEvent::Type
CIpcClient::getMessageReceivedEvent()
{
return EVENTQUEUE->registerTypeOnce(
s_messageReceivedEvent, "CIpcClient::messageReceived");
}
void void
CIpcClient::handleConnected(const CEvent&, void*) CIpcClient::handleConnected(const CEvent&, void*)
{ {
EVENTQUEUE->addEvent(CEvent(getConnectedEvent(), this, m_server, CEvent::kDontFreeData)); EVENTQUEUE->addEvent(CEvent(
getConnectedEvent(), this, m_server, CEvent::kDontFreeData));
CIpcMessage message; CIpcHelloMessage message(kIpcClientNode);
message.m_type = kIpcHello;
message.m_data = new UInt8(kIpcClientNode);
send(message); send(message);
} }
void
CIpcClient::handleMessageReceived(const CEvent& e, void*)
{
CEvent event(getMessageReceivedEvent(), this);
event.setDataObject(e.getDataObject());
EVENTQUEUE->addEvent(event);
}

View File

@ -38,6 +38,9 @@ public:
//! Connects to the IPC server at localhost. //! Connects to the IPC server at localhost.
void connect(); void connect();
//! Disconnects from the IPC server.
void disconnect();
//! Sends a message to the server. //! Sends a message to the server.
void send(const CIpcMessage& message); void send(const CIpcMessage& message);
@ -47,11 +50,13 @@ public:
//! Raised when the socket is connected. //! Raised when the socket is connected.
static CEvent::Type getConnectedEvent(); static CEvent::Type getConnectedEvent();
static CEvent::Type getMessageReceivedEvent();
//@} //@}
private: private:
void handleConnected(const CEvent&, void*); void handleConnected(const CEvent&, void*);
void handleMessageReceived(const CEvent&, void*);
private: private:
CNetworkAddress m_serverAddress; CNetworkAddress m_serverAddress;
@ -59,4 +64,5 @@ private:
CIpcServerProxy* m_server; CIpcServerProxy* m_server;
static CEvent::Type s_connectedEvent; static CEvent::Type s_connectedEvent;
static CEvent::Type s_messageReceivedEvent;
}; };

View File

@ -99,36 +99,36 @@ CIpcClientProxy::handleData(const CEvent&, void*)
// don't allow the dtor to destroy the stream while we're using it. // don't allow the dtor to destroy the stream while we're using it.
CArchMutexLock lock(m_readMutex); CArchMutexLock lock(m_readMutex);
UInt8 code[1]; UInt8 codeBuf[1];
UInt32 n = m_stream.read(code, 1); UInt32 n = m_stream.read(codeBuf, 1);
int code = codeBuf[0];
while (n != 0) { while (n != 0) {
UInt8 type = code[0];
CIpcMessage* m = new CIpcMessage(); LOG((CLOG_DEBUG "ipc client proxy read: %d", code));
m->m_type = type;
m->m_source = this;
LOG((CLOG_DEBUG "ipc client proxy read: %d", code[0])); CIpcMessage* m = nullptr;
switch (code) {
switch (type) {
case kIpcHello: case kIpcHello:
parseHello(); m = parseHello();
break; break;
case kIpcCommand: case kIpcCommand:
m->m_data = parseCommand(); m = parseCommand();
break; break;
default: default:
delete m;
disconnect(); disconnect();
return; return;
} }
// event deletes data. // don't delete with this event; the data is passed to a new event.
EVENTQUEUE->addEvent(CEvent(getMessageReceivedEvent(), this, m)); CEvent e(getMessageReceivedEvent(), this, NULL, CEvent::kDontFreeData);
e.setDataObject(m);
EVENTQUEUE->addEvent(e);
n = m_stream.read(code, 1); n = m_stream.read(codeBuf, 1);
code = codeBuf[0];
} }
LOG((CLOG_DEBUG "finished ipc client proxy handle data")); LOG((CLOG_DEBUG "finished ipc client proxy handle data"));
@ -142,20 +142,19 @@ CIpcClientProxy::send(const CIpcMessage& message)
// also, don't allow the dtor to destroy the stream while we're using it. // also, don't allow the dtor to destroy the stream while we're using it.
CArchMutexLock lock(m_writeMutex); CArchMutexLock lock(m_writeMutex);
LOG((CLOG_DEBUG "ipc client proxy write: %d", message.m_type)); LOG((CLOG_DEBUG "ipc client proxy write: %d", message.type()));
UInt8 code[1]; CProtocolUtil::writef(&m_stream, "%1i", message.type());
code[0] = message.m_type;
m_stream.write(code, 1);
switch (message.m_type) { switch (message.type()) {
case kIpcLogLine: { case kIpcLogLine: {
CString* s = (CString*)message.m_data; const CIpcLogLineMessage& llm = static_cast<const CIpcLogLineMessage&>(message);
const char* data = s->c_str();
CString logLine = llm.logLine();
const char* data = logLine.c_str();
int len = strlen(data); int len = strlen(data);
CProtocolUtil::writef(&m_stream, "%4i", len);
CProtocolUtil::writef(&m_stream, "%4i", len);
m_stream.write(data, len); m_stream.write(data, len);
break; break;
} }
@ -165,30 +164,35 @@ CIpcClientProxy::send(const CIpcMessage& message)
break; break;
default: default:
LOG((CLOG_ERR "message not supported: %d", message.m_type)); LOG((CLOG_ERR "message not supported: %d", message.type()));
break; break;
} }
} }
void CIpcHelloMessage*
CIpcClientProxy::parseHello() CIpcClientProxy::parseHello()
{ {
UInt8 buffer[1]; UInt8 buffer[1];
m_stream.read(buffer, 1); m_stream.read(buffer, 1);
m_clientType = static_cast<EIpcClientType>(buffer[0]); m_clientType = static_cast<EIpcClientType>(buffer[0]);
// must be deleted by event handler.
return new CIpcHelloMessage(m_clientType);
} }
void* CIpcCommandMessage*
CIpcClientProxy::parseCommand() CIpcClientProxy::parseCommand()
{ {
int len = 0; int len = 0;
CProtocolUtil::readf(&m_stream, "%2i", &len); CProtocolUtil::readf(&m_stream, "%2i", &len);
UInt8* buffer = new UInt8[len]; char* buffer = new char[len];
m_stream.read(buffer, len); m_stream.read(buffer, len);
CString s(buffer, len);
delete buffer;
// delete by event cleanup. // must be deleted by event handler.
return new CString((const char*)buffer, len); return new CIpcCommandMessage(s);
} }
void void

View File

@ -23,12 +23,17 @@
namespace synergy { class IStream; } namespace synergy { class IStream; }
class CIpcMessage; class CIpcMessage;
class CIpcCommandMessage;
class CIpcHelloMessage;
class CIpcClientProxy { class CIpcClientProxy {
friend class CIpcServer;
public: public:
CIpcClientProxy(synergy::IStream& stream); CIpcClientProxy(synergy::IStream& stream);
virtual ~CIpcClientProxy(); virtual ~CIpcClientProxy();
private:
//! Send a message to the client. //! Send a message to the client.
void send(const CIpcMessage& message); void send(const CIpcMessage& message);
@ -38,20 +43,17 @@ public:
//! Raised when the client disconnects from the server. //! Raised when the client disconnects from the server.
static CEvent::Type getDisconnectedEvent(); static CEvent::Type getDisconnectedEvent();
private:
void handleData(const CEvent&, void*); void handleData(const CEvent&, void*);
void handleDisconnect(const CEvent&, void*); void handleDisconnect(const CEvent&, void*);
void handleWriteError(const CEvent&, void*); void handleWriteError(const CEvent&, void*);
void parseHello(); CIpcHelloMessage* parseHello();
void* parseCommand(); CIpcCommandMessage* parseCommand();
void disconnect(); void disconnect();
public: private:
synergy::IStream& m_stream; synergy::IStream& m_stream;
EIpcClientType m_clientType; EIpcClientType m_clientType;
bool m_disconnecting; bool m_disconnecting;
private:
CArchMutex m_readMutex; CArchMutex m_readMutex;
CArchMutex m_writeMutex; CArchMutex m_writeMutex;

View File

@ -109,12 +109,21 @@ CIpcLogOutputter::bufferThread(void*)
{ {
try { try {
while (m_running) { while (m_running) {
if (m_ipcServer.hasClients(kIpcClientGui)) { if (m_ipcServer.hasClients(kIpcClientGui)) {
while (!m_buffer.empty()) {
// buffer is sent in chunks, so keep sending until it's
// empty (or the program has stopped in the meantime).
while (m_running && !m_buffer.empty()) {
sendBuffer(); sendBuffer();
} }
} }
// program may be stopping while we were in the send loop.
if (!m_running) {
break;
}
m_bufferWaiting = true; m_bufferWaiting = true;
ARCH->waitCondVar(m_notifyCond, m_notifyMutex, -1); ARCH->waitCondVar(m_notifyCond, m_notifyMutex, -1);
m_bufferWaiting = false; m_bufferWaiting = false;
@ -138,7 +147,7 @@ CIpcLogOutputter::notifyBuffer()
} }
CString CString
CIpcLogOutputter::emptyBuffer(size_t count) CIpcLogOutputter::getChunk(size_t count)
{ {
CArchMutexLock lock(m_bufferMutex); CArchMutexLock lock(m_bufferMutex);
@ -155,13 +164,10 @@ CIpcLogOutputter::emptyBuffer(size_t count)
return chunk; return chunk;
} }
void void
CIpcLogOutputter::sendBuffer() CIpcLogOutputter::sendBuffer()
{ {
CIpcMessage message; CIpcLogLineMessage message(getChunk(MAX_SEND));
message.m_type = kIpcLogLine;
message.m_data = new CString(emptyBuffer(MAX_SEND));
m_sending = true; m_sending = true;
m_ipcServer.send(message, kIpcClientGui); m_ipcServer.send(message, kIpcClientGui);

View File

@ -48,7 +48,7 @@ public:
private: private:
void bufferThread(void*); void bufferThread(void*);
CString emptyBuffer(size_t count); CString getChunk(size_t count);
void sendBuffer(); void sendBuffer();
void appendBuffer(const CString& text); void appendBuffer(const CString& text);

View File

@ -16,17 +16,52 @@
*/ */
#include "CIpcMessage.h" #include "CIpcMessage.h"
#include "Ipc.h"
CIpcMessage::CIpcMessage() : CIpcMessage::CIpcMessage(UInt8 type) :
m_type(0), m_type(type)
m_data(nullptr),
m_source(nullptr)
{ {
} }
CIpcMessage::~CIpcMessage() CIpcMessage::~CIpcMessage()
{ {
if (m_data != nullptr) {
delete m_data;
} }
CIpcHelloMessage::CIpcHelloMessage(EIpcClientType clientType) :
CIpcMessage(kIpcHello),
m_clientType(clientType)
{
}
CIpcHelloMessage::~CIpcHelloMessage()
{
}
CIpcShutdownMessage::CIpcShutdownMessage() :
CIpcMessage(kIpcShutdown)
{
}
CIpcShutdownMessage::~CIpcShutdownMessage()
{
}
CIpcLogLineMessage::CIpcLogLineMessage(const CString& logLine) :
CIpcMessage(kIpcLogLine),
m_logLine(logLine)
{
}
CIpcLogLineMessage::~CIpcLogLineMessage()
{
}
CIpcCommandMessage::CIpcCommandMessage(const CString& command) :
CIpcMessage(kIpcCommand),
m_command(command)
{
}
CIpcCommandMessage::~CIpcCommandMessage()
{
} }

View File

@ -18,13 +18,63 @@
#pragma once #pragma once
#include "BasicTypes.h" #include "BasicTypes.h"
#include "CString.h"
#include "Ipc.h"
#include "CEvent.h"
class CIpcMessage { class CIpcMessage : public CEventData {
public: public:
CIpcMessage();
virtual ~CIpcMessage(); virtual ~CIpcMessage();
//! Gets the message type ID.
UInt8 type() const { return m_type; }
protected:
CIpcMessage(UInt8 type);
private:
UInt8 m_type; UInt8 m_type;
void* m_data; };
void* m_source;
class CIpcHelloMessage : public CIpcMessage {
public:
CIpcHelloMessage(EIpcClientType clientType);
virtual ~CIpcHelloMessage();
//! Gets the message type ID.
EIpcClientType clientType() const { return m_clientType; }
private:
EIpcClientType m_clientType;
};
class CIpcShutdownMessage : public CIpcMessage {
public:
CIpcShutdownMessage();
virtual ~CIpcShutdownMessage();
};
class CIpcLogLineMessage : public CIpcMessage {
public:
CIpcLogLineMessage(const CString& logLine);
virtual ~CIpcLogLineMessage();
//! Gets the log line.
CString logLine() const { return m_logLine; }
private:
CString m_logLine;
};
class CIpcCommandMessage : public CIpcMessage {
public:
CIpcCommandMessage(const CString& command);
virtual ~CIpcCommandMessage();
//! Gets the command.
CString command() const { return m_command; }
private:
CString m_command;
}; };

View File

@ -27,6 +27,7 @@
#include "CIpcMessage.h" #include "CIpcMessage.h"
CEvent::Type CIpcServer::s_clientConnectedEvent = CEvent::kUnknown; CEvent::Type CIpcServer::s_clientConnectedEvent = CEvent::kUnknown;
CEvent::Type CIpcServer::s_messageReceivedEvent = CEvent::kUnknown;
CIpcServer::CIpcServer() : CIpcServer::CIpcServer() :
m_address(CNetworkAddress(IPC_HOST, IPC_PORT)) m_address(CNetworkAddress(IPC_HOST, IPC_PORT))
@ -45,7 +46,7 @@ CIpcServer::~CIpcServer()
ARCH->lockMutex(m_clientsMutex); ARCH->lockMutex(m_clientsMutex);
CClientList::iterator it; CClientList::iterator it;
for (it = m_clients.begin(); it != m_clients.end(); it++) { for (it = m_clients.begin(); it != m_clients.end(); it++) {
delete *it; deleteClient(*it);
} }
m_clients.empty(); m_clients.empty();
ARCH->unlockMutex(m_clientsMutex); ARCH->unlockMutex(m_clientsMutex);
@ -80,6 +81,11 @@ CIpcServer::handleClientConnecting(const CEvent&, void*)
new TMethodEventJob<CIpcServer>( new TMethodEventJob<CIpcServer>(
this, &CIpcServer::handleClientDisconnected)); this, &CIpcServer::handleClientDisconnected));
EVENTQUEUE->adoptHandler(
CIpcClientProxy::getMessageReceivedEvent(), proxy,
new TMethodEventJob<CIpcServer>(
this, &CIpcServer::handleMessageReceived));
EVENTQUEUE->addEvent(CEvent( EVENTQUEUE->addEvent(CEvent(
getClientConnectedEvent(), this, proxy, CEvent::kDontFreeData)); getClientConnectedEvent(), this, proxy, CEvent::kDontFreeData));
} }
@ -89,15 +95,29 @@ CIpcServer::handleClientDisconnected(const CEvent& e, void*)
{ {
CIpcClientProxy* proxy = static_cast<CIpcClientProxy*>(e.getTarget()); CIpcClientProxy* proxy = static_cast<CIpcClientProxy*>(e.getTarget());
EVENTQUEUE->removeHandler(
CIpcClientProxy::getDisconnectedEvent(), proxy);
CArchMutexLock lock(m_clientsMutex); CArchMutexLock lock(m_clientsMutex);
m_clients.remove(proxy); m_clients.remove(proxy);
delete proxy; deleteClient(proxy);
LOG((CLOG_DEBUG "ipc client proxy removed, connected=%d", m_clients.size())); LOG((CLOG_DEBUG "ipc client proxy removed, connected=%d", m_clients.size()));
} }
void
CIpcServer::handleMessageReceived(const CEvent& e, void*)
{
CEvent event(getMessageReceivedEvent(), this);
event.setDataObject(e.getDataObject());
EVENTQUEUE->addEvent(event);
}
void
CIpcServer::deleteClient(CIpcClientProxy* proxy)
{
EVENTQUEUE->removeHandler(CIpcClientProxy::getMessageReceivedEvent(), proxy);
EVENTQUEUE->removeHandler(CIpcClientProxy::getDisconnectedEvent(), proxy);
delete proxy;
}
bool bool
CIpcServer::hasClients(EIpcClientType clientType) const CIpcServer::hasClients(EIpcClientType clientType) const
{ {
@ -127,6 +147,13 @@ CIpcServer::getClientConnectedEvent()
s_clientConnectedEvent, "CIpcServer::clientConnected"); s_clientConnectedEvent, "CIpcServer::clientConnected");
} }
CEvent::Type
CIpcServer::getMessageReceivedEvent()
{
return EVENTQUEUE->registerTypeOnce(
s_messageReceivedEvent, "CIpcServer::messageReceived");
}
void void
CIpcServer::send(const CIpcMessage& message, EIpcClientType filterType) CIpcServer::send(const CIpcMessage& message, EIpcClientType filterType)
{ {

View File

@ -58,12 +58,16 @@ public:
//! Raised when we have created the client proxy. //! Raised when we have created the client proxy.
static CEvent::Type getClientConnectedEvent(); static CEvent::Type getClientConnectedEvent();
//! Raised when a message is received through a client proxy.
static CEvent::Type getMessageReceivedEvent();
//@} //@}
private: private:
void handleClientConnecting(const CEvent&, void*); void handleClientConnecting(const CEvent&, void*);
void handleClientDisconnected(const CEvent&, void*); void handleClientDisconnected(const CEvent&, void*);
void handleClientMessage(const CEvent&, void*); void handleMessageReceived(const CEvent&, void*);
void deleteClient(CIpcClientProxy* proxy);
private: private:
typedef std::list<CIpcClientProxy*> CClientList; typedef std::list<CIpcClientProxy*> CClientList;
@ -74,4 +78,5 @@ private:
CArchMutex m_clientsMutex; CArchMutex m_clientsMutex;
static CEvent::Type s_clientConnectedEvent; static CEvent::Type s_clientConnectedEvent;
static CEvent::Type s_messageReceivedEvent;
}; };

View File

@ -45,77 +45,88 @@ CIpcServerProxy::~CIpcServerProxy()
void void
CIpcServerProxy::handleData(const CEvent&, void*) CIpcServerProxy::handleData(const CEvent&, void*)
{ {
UInt8 code[1]; LOG((CLOG_DEBUG "start ipc server proxy handle data"));
UInt32 n = m_stream.read(code, 1);
UInt8 codeBuf[1];
UInt32 n = m_stream.read(codeBuf, 1);
int code = codeBuf[0];
while (n != 0) { while (n != 0) {
CIpcMessage* m = new CIpcMessage(); LOG((CLOG_DEBUG "ipc server proxy read: %d", code));
m->m_type = code[0];
LOG((CLOG_DEBUG "ipc server proxy read: %d", m->m_type)); CIpcMessage* m = nullptr;
switch (m->m_type) { switch (code) {
case kIpcLogLine: case kIpcLogLine:
m->m_data = parseLogLine(); m = parseLogLine();
break; break;
case kIpcShutdown: case kIpcShutdown:
// no data. m = new CIpcShutdownMessage();
break; break;
default: default:
delete m;
disconnect(); disconnect();
return; return;
} }
// event deletes data. // don't delete with this event; the data is passed to a new event.
EVENTQUEUE->addEvent(CEvent(getMessageReceivedEvent(), this, m)); CEvent e(getMessageReceivedEvent(), this, NULL, CEvent::kDontFreeData);
e.setDataObject(m);
EVENTQUEUE->addEvent(e);
n = m_stream.read(code, 1); n = m_stream.read(codeBuf, 1);
code = codeBuf[0];
} }
LOG((CLOG_DEBUG "finished ipc server proxy handle data"));
} }
void void
CIpcServerProxy::send(const CIpcMessage& message) CIpcServerProxy::send(const CIpcMessage& message)
{ {
LOG((CLOG_DEBUG "ipc server proxy write: %d", message.m_type)); LOG((CLOG_DEBUG "ipc server proxy write: %d", message.type()));
UInt8 code[1]; CProtocolUtil::writef(&m_stream, "%1i", message.type());
code[0] = message.m_type;
m_stream.write(code, 1);
switch (message.m_type) { switch (message.type()) {
case kIpcHello: case kIpcHello: {
m_stream.write(message.m_data, 1); const CIpcHelloMessage& hm = static_cast<const CIpcHelloMessage&>(message);
CProtocolUtil::writef(&m_stream, "%1i", hm.clientType());
break; break;
}
case kIpcCommand: { case kIpcCommand: {
CString* s = (CString*)message.m_data; const CIpcCommandMessage& cm = static_cast<const CIpcCommandMessage&>(message);
const char* data = s->c_str();
CString command = cm.command();
const char* data = command.c_str();
int len = strlen(data); int len = strlen(data);
CProtocolUtil::writef(&m_stream, "%2i", len);
CProtocolUtil::writef(&m_stream, "%2i", len);
m_stream.write(data, len); m_stream.write(data, len);
break; break;
} }
default: default:
LOG((CLOG_ERR "message not supported: %d", message.m_type)); LOG((CLOG_ERR "message not supported: %d", message.type()));
break; break;
} }
} }
void* CIpcLogLineMessage*
CIpcServerProxy::parseLogLine() CIpcServerProxy::parseLogLine()
{ {
int len = 0; int len = 0;
CProtocolUtil::readf(&m_stream, "%4i", &len); CProtocolUtil::readf(&m_stream, "%4i", &len);
UInt8* buffer = new UInt8[len]; char* buffer = new char[len];
m_stream.read(buffer, len); m_stream.read(buffer, len);
CString s(buffer, len);
delete buffer;
return new CString((const char*)buffer, len); // must be deleted by event handler.
return new CIpcLogLineMessage(s);
} }
void void

View File

@ -21,22 +21,25 @@
namespace synergy { class IStream; } namespace synergy { class IStream; }
class CIpcMessage; class CIpcMessage;
class CIpcLogLineMessage;
class CIpcServerProxy { class CIpcServerProxy {
friend class CIpcClient;
public: public:
CIpcServerProxy(synergy::IStream& stream); CIpcServerProxy(synergy::IStream& stream);
virtual ~CIpcServerProxy(); virtual ~CIpcServerProxy();
private:
void send(const CIpcMessage& message); void send(const CIpcMessage& message);
void handleData(const CEvent&, void*);
CIpcLogLineMessage* parseLogLine();
void disconnect();
//! Raised when the client receives a message from the server. //! Raised when the client receives a message from the server.
static CEvent::Type getMessageReceivedEvent(); static CEvent::Type getMessageReceivedEvent();
private:
void handleData(const CEvent&, void*);
void* parseLogLine();
void disconnect();
private: private:
synergy::IStream& m_stream; synergy::IStream& m_stream;

View File

@ -58,6 +58,14 @@ CMSWindowsClipboard::~CMSWindowsClipboard()
delete m_facade; delete m_facade;
} }
void
CMSWindowsClipboard::setFacade(IMSWindowsClipboardFacade& facade)
{
delete m_facade;
m_facade = &facade;
m_deleteFacade = false;
}
bool bool
CMSWindowsClipboard::emptyUnowned() CMSWindowsClipboard::emptyUnowned()
{ {

View File

@ -61,7 +61,7 @@ public:
virtual bool has(EFormat) const; virtual bool has(EFormat) const;
virtual CString get(EFormat) const; virtual CString get(EFormat) const;
void setFacade(IMSWindowsClipboardFacade& facade) { m_facade = &facade; m_deleteFacade = false; } void setFacade(IMSWindowsClipboardFacade& facade);
private: private:
void clearConverters(); void clearConverters();

View File

@ -186,7 +186,7 @@ CMSWindowsDesks::setOptions(const COptionsList& options)
for (UInt32 i = 0, n = (UInt32)options.size(); i < n; i += 2) { for (UInt32 i = 0, n = (UInt32)options.size(); i < n; i += 2) {
if (options[i] == kOptionWin32KeepForeground) { if (options[i] == kOptionWin32KeepForeground) {
m_leaveForegroundOption = (options[i + 1] != 0); m_leaveForegroundOption = (options[i + 1] != 0);
LOG((CLOG_DEBUG1 "%s the foreground window", m_leaveForegroundOption ? "Don\'t grab" : "Grab")); LOG((CLOG_DEBUG1 "%s the foreground window", m_leaveForegroundOption ? "don\'t grab" : "grab"));
} }
} }
} }

View File

@ -73,7 +73,12 @@ void
CMSWindowsRelauncher::stop() CMSWindowsRelauncher::stop()
{ {
m_running = false; m_running = false;
m_thread->wait(5); m_thread->wait(5);
delete m_thread;
m_outputThread->wait(5);
delete m_outputThread;
} }
// this still gets the physical session (the one the keyboard and // this still gets the physical session (the one the keyboard and
@ -360,6 +365,8 @@ CMSWindowsRelauncher::mainLoop(void*)
LOG((CLOG_DEBUG "terminated running process on exit")); LOG((CLOG_DEBUG "terminated running process on exit"));
shutdownProcess(pi, 10); shutdownProcess(pi, 10);
} }
LOG((CLOG_DEBUG "relauncher main thread finished"));
} }
void void
@ -403,7 +410,7 @@ CMSWindowsRelauncher::outputLoop(void*)
// +1 char for \0 // +1 char for \0
CHAR buffer[kOutputBufferSize + 1]; CHAR buffer[kOutputBufferSize + 1];
while (true) { while (m_running) {
DWORD bytesRead; DWORD bytesRead;
BOOL success = ReadFile(m_stdOutRead, buffer, kOutputBufferSize, &bytesRead, NULL); BOOL success = ReadFile(m_stdOutRead, buffer, kOutputBufferSize, &bytesRead, NULL);
@ -432,8 +439,7 @@ CMSWindowsRelauncher::shutdownProcess(const PROCESS_INFORMATION& pi, int timeout
if (exitCode != STILL_ACTIVE) if (exitCode != STILL_ACTIVE)
return; return;
CIpcMessage shutdown; CIpcShutdownMessage shutdown;
shutdown.m_type = kIpcShutdown;
m_ipcServer.send(shutdown, kIpcClientNode); m_ipcServer.send(shutdown, kIpcClientNode);
// wait for process to exit gracefully. // wait for process to exit gracefully.

View File

@ -248,6 +248,11 @@ CApp::run(int argc, char** argv)
CArchMiscWindows::setInstanceWin32(GetModuleHandle(NULL)); CArchMiscWindows::setInstanceWin32(GetModuleHandle(NULL));
#endif #endif
CArch arch;
arch.init();
CLog log;
#if MAC_OS_X_VERSION_10_7 #if MAC_OS_X_VERSION_10_7
// dock hide only supported on lion :( // dock hide only supported on lion :(
ProcessSerialNumber psn = { 0, kCurrentProcess }; ProcessSerialNumber psn = { 0, kCurrentProcess };
@ -348,28 +353,27 @@ CApp::initApp(int argc, const char** argv)
void void
CApp::initIpcClient() CApp::initIpcClient()
{ {
// TODO: delete ipc client on shutdown and the 2 event handlers.
m_ipcClient = new CIpcClient(); m_ipcClient = new CIpcClient();
m_ipcClient->connect(); m_ipcClient->connect();
EVENTQUEUE->adoptHandler( EVENTQUEUE->adoptHandler(
CIpcClient::getConnectedEvent(), m_ipcClient, CIpcClient::getMessageReceivedEvent(), m_ipcClient,
new TMethodEventJob<CApp>(this, &CApp::handleIpcConnected)); new TMethodEventJob<CApp>(this, &CApp::handleIpcMessage));
} }
void void
CApp::handleIpcConnected(const CEvent& e, void*) CApp::cleanupIpcClient()
{ {
EVENTQUEUE->adoptHandler( m_ipcClient->disconnect();
CIpcServerProxy::getMessageReceivedEvent(), e.getData(), EVENTQUEUE->removeHandler(CIpcClient::getMessageReceivedEvent(), m_ipcClient);
new TMethodEventJob<CApp>(this, &CApp::handleIpcMessage)); delete m_ipcClient;
} }
void void
CApp::handleIpcMessage(const CEvent& e, void*) CApp::handleIpcMessage(const CEvent& e, void*)
{ {
CIpcMessage* m = static_cast<CIpcMessage*>(e.getData()); CIpcMessage* m = static_cast<CIpcMessage*>(e.getDataObject());
if (m->m_type == kIpcShutdown) { if (m->type() == kIpcShutdown) {
LOG((CLOG_INFO "got ipc shutdown message")); LOG((CLOG_INFO "got ipc shutdown message"));
EVENTQUEUE->addEvent(CEvent(CEvent::kQuit)); EVENTQUEUE->addEvent(CEvent(CEvent::kQuit));
} }

View File

@ -94,13 +94,13 @@ public:
virtual void bye(int error) { m_bye(error); } virtual void bye(int error) { m_bye(error); }
private: private:
void handleIpcConnected(const CEvent&, void*);
void handleIpcMessage(const CEvent&, void*); void handleIpcMessage(const CEvent&, void*);
protected: protected:
virtual void parseArgs(int argc, const char* const* argv, int &i); virtual void parseArgs(int argc, const char* const* argv, int &i);
virtual bool parseArg(const int& argc, const char* const* argv, int& i); virtual bool parseArg(const int& argc, const char* const* argv, int& i);
void initIpcClient(); void initIpcClient();
void cleanupIpcClient();
IArchTaskBarReceiver* m_taskBarReceiver; IArchTaskBarReceiver* m_taskBarReceiver;
bool m_suspended; bool m_suspended;

View File

@ -552,6 +552,10 @@ CClientApp::mainLoop()
updateStatus(); updateStatus();
LOG((CLOG_NOTE "stopped client")); LOG((CLOG_NOTE "stopped client"));
if (argsBase().m_enableIpc) {
cleanupIpcClient();
}
return kExitSuccess; return kExitSuccess;
} }

View File

@ -43,6 +43,7 @@
#include "CIpcMessage.h" #include "CIpcMessage.h"
#include "CSocketMultiplexer.h" #include "CSocketMultiplexer.h"
#include "CIpcLogOutputter.h" #include "CIpcLogOutputter.h"
#include "CLog.h"
#define WIN32_LEAN_AND_MEAN #define WIN32_LEAN_AND_MEAN
#include <Windows.h> #include <Windows.h>
@ -90,24 +91,29 @@ CDaemonApp::~CDaemonApp()
int int
CDaemonApp::run(int argc, char** argv) CDaemonApp::run(int argc, char** argv)
{
bool uninstall = false;
try
{ {
#if SYSAPI_WIN32 #if SYSAPI_WIN32
// win32 instance needed for threading, etc. // win32 instance needed for threading, etc.
CArchMiscWindows::setInstanceWin32(GetModuleHandle(NULL)); CArchMiscWindows::setInstanceWin32(GetModuleHandle(NULL));
#endif #endif
CArch arch;
arch.init();
CLog log;
bool uninstall = false;
try
{
#if SYSAPI_WIN32 #if SYSAPI_WIN32
// sends debug messages to visual studio console window. // sends debug messages to visual studio console window.
CLOG->insert(new CMSWindowsDebugOutputter()); log.insert(new CMSWindowsDebugOutputter());
#endif #endif
// default log level to system setting. // default log level to system setting.
string logLevel = ARCH->setting("LogLevel"); string logLevel = arch.setting("LogLevel");
if (logLevel != "") if (logLevel != "")
CLOG->setFilter(logLevel.c_str()); log.setFilter(logLevel.c_str());
bool foreground = false; bool foreground = false;
@ -120,11 +126,11 @@ CDaemonApp::run(int argc, char** argv)
#if SYSAPI_WIN32 #if SYSAPI_WIN32
else if (arg == "/install") { else if (arg == "/install") {
uninstall = true; uninstall = true;
ARCH->installDaemon(); arch.installDaemon();
return kExitSuccess; return kExitSuccess;
} }
else if (arg == "/uninstall") { else if (arg == "/uninstall") {
ARCH->uninstallDaemon(); arch.uninstallDaemon();
return kExitSuccess; return kExitSuccess;
} }
#endif #endif
@ -143,9 +149,9 @@ CDaemonApp::run(int argc, char** argv)
} }
else { else {
#if SYSAPI_WIN32 #if SYSAPI_WIN32
ARCH->daemonize("Synergy", winMainLoopStatic); arch.daemonize("Synergy", winMainLoopStatic);
#elif SYSAPI_UNIX #elif SYSAPI_UNIX
ARCH->daemonize("Synergy", unixMainLoopStatic); arch.daemonize("Synergy", unixMainLoopStatic);
#endif #endif
} }
@ -203,8 +209,8 @@ CDaemonApp::mainLoop(bool logToFile)
#endif #endif
eventQueue.adoptHandler( eventQueue.adoptHandler(
CIpcServer::getClientConnectedEvent(), m_ipcServer, CIpcServer::getMessageReceivedEvent(), m_ipcServer,
new TMethodEventJob<CDaemonApp>(this, &CDaemonApp::handleIpcConnected)); new TMethodEventJob<CDaemonApp>(this, &CDaemonApp::handleIpcMessage));
m_ipcServer->listen(); m_ipcServer->listen();
@ -228,10 +234,11 @@ CDaemonApp::mainLoop(bool logToFile)
#if SYSAPI_WIN32 #if SYSAPI_WIN32
m_relauncher->stop(); m_relauncher->stop();
delete m_relauncher;
#endif #endif
eventQueue.removeHandler( eventQueue.removeHandler(
CIpcServer::getClientConnectedEvent(), m_ipcServer); CIpcServer::getMessageReceivedEvent(), m_ipcServer);
CLOG->remove(m_ipcLogOutputter); CLOG->remove(m_ipcLogOutputter);
delete m_ipcLogOutputter; delete m_ipcLogOutputter;
@ -278,23 +285,14 @@ CDaemonApp::logPath()
#endif #endif
} }
void
CDaemonApp::handleIpcConnected(const CEvent& e, void*)
{
LOG((CLOG_DEBUG "ipc client connected"));
EVENTQUEUE->adoptHandler(
CIpcClientProxy::getMessageReceivedEvent(), e.getData(),
new TMethodEventJob<CDaemonApp>(
this, &CDaemonApp::handleIpcMessage));
}
void void
CDaemonApp::handleIpcMessage(const CEvent& e, void*) CDaemonApp::handleIpcMessage(const CEvent& e, void*)
{ {
CIpcMessage& m = *static_cast<CIpcMessage*>(e.getData()); CIpcMessage* m = static_cast<CIpcMessage*>(e.getDataObject());
switch (m.m_type) { switch (m->type()) {
case kIpcCommand: { case kIpcCommand: {
CString& command = *static_cast<CString*>(m.m_data); CIpcCommandMessage* cm = static_cast<CIpcCommandMessage*>(m);
CString& command = cm->command();
LOG((CLOG_DEBUG "got new command: %s", command.c_str())); LOG((CLOG_DEBUG "got new command: %s", command.c_str()));
CString debugArg("--debug"); CString debugArg("--debug");

View File

@ -41,7 +41,6 @@ private:
void daemonize(); void daemonize();
void foregroundError(const char* message); void foregroundError(const char* message);
std::string logPath(); std::string logPath();
void handleIpcConnected(const CEvent&, void*);
void handleIpcMessage(const CEvent&, void*); void handleIpcMessage(const CEvent&, void*);
public: public:

View File

@ -831,6 +831,10 @@ CServerApp::mainLoop()
updateStatus(); updateStatus();
LOG((CLOG_NOTE "stopped server")); LOG((CLOG_NOTE "stopped server"));
if (argsBase().m_enableIpc) {
cleanupIpcClient();
}
return kExitSuccess; return kExitSuccess;
} }

View File

@ -34,6 +34,7 @@
#include "CString.h" #include "CString.h"
#include "CIpcServerProxy.h" #include "CIpcServerProxy.h"
#include "CIpcMessage.h" #include "CIpcMessage.h"
#include "CSimpleEventQueueBuffer.h"
class CIpcTests : public ::testing::Test class CIpcTests : public ::testing::Test
{ {
@ -41,14 +42,15 @@ public:
CIpcTests(); CIpcTests();
virtual ~CIpcTests(); virtual ~CIpcTests();
void connectToServer_handleClientConnected(const CEvent&, void*); void connectToServer_handleMessageReceived(const CEvent&, void*);
void sendMessageToServer_handleClientConnected(const CEvent&, void*); void sendMessageToServer_handleClientConnected(const CEvent&, void*);
void sendMessageToServer_handleMessageReceived(const CEvent&, void*); void sendMessageToServer_handleMessageReceived(const CEvent&, void*);
void sendMessageToClient_handleConnected(const CEvent&, void*); void sendMessageToClient_handleClientConnected(const CEvent&, void*);
void sendMessageToClient_handleMessageReceived(const CEvent&, void*); void sendMessageToClient_handleMessageReceived(const CEvent&, void*);
void handleQuitTimeout(const CEvent&, void* vclient); void handleQuitTimeout(const CEvent&, void* vclient);
void raiseQuitEvent(); void raiseQuitEvent();
void quitTimeout(double timeout); void initQuitTimeout(double timeout);
void cleanupQuitTimeout();
private: private:
void timeoutThread(void*); void timeoutThread(void*);
@ -56,11 +58,13 @@ private:
public: public:
CSocketMultiplexer m_multiplexer; CSocketMultiplexer m_multiplexer;
CEventQueue m_events; CEventQueue m_events;
CEventQueueTimer* m_quitTimeoutTimer;
bool m_connectToServer_clientConnected; bool m_connectToServer_clientConnected;
CString m_sendMessageToServer_receivedString; CString m_sendMessageToServer_receivedString;
CString m_sendMessageToClient_receivedString; CString m_sendMessageToClient_receivedString;
CIpcClient* m_sendMessageToServer_client; CIpcClient* m_sendMessageToServer_client;
CIpcServer* m_sendMessageToClient_server; CIpcServer* m_sendMessageToClient_server;
}; };
TEST_F(CIpcTests, connectToServer) TEST_F(CIpcTests, connectToServer)
@ -69,15 +73,17 @@ TEST_F(CIpcTests, connectToServer)
server.listen(); server.listen();
m_events.adoptHandler( m_events.adoptHandler(
CIpcServer::getClientConnectedEvent(), &server, CIpcServer::getMessageReceivedEvent(), &server,
new TMethodEventJob<CIpcTests>( new TMethodEventJob<CIpcTests>(
this, &CIpcTests::connectToServer_handleClientConnected)); this, &CIpcTests::connectToServer_handleMessageReceived));
CIpcClient client; CIpcClient client;
client.connect(); client.connect();
quitTimeout(2); initQuitTimeout(2);
m_events.loop(); m_events.loop();
m_events.removeHandler(CIpcServer::getMessageReceivedEvent(), &server);
cleanupQuitTimeout();
EXPECT_EQ(true, m_connectToServer_clientConnected); EXPECT_EQ(true, m_connectToServer_clientConnected);
} }
@ -87,18 +93,26 @@ TEST_F(CIpcTests, sendMessageToServer)
CIpcServer server; CIpcServer server;
server.listen(); server.listen();
CIpcClient client; // event handler sends "test" command to server.
client.connect();
m_sendMessageToServer_client = &client;
// event handler sends "test" log line to client.
m_events.adoptHandler( m_events.adoptHandler(
CIpcServer::getClientConnectedEvent(), &server, CIpcServer::getClientConnectedEvent(), &server,
new TMethodEventJob<CIpcTests>( new TMethodEventJob<CIpcTests>(
this, &CIpcTests::sendMessageToServer_handleClientConnected)); this, &CIpcTests::sendMessageToServer_handleClientConnected));
quitTimeout(2); m_events.adoptHandler(
CIpcServer::getMessageReceivedEvent(), &server,
new TMethodEventJob<CIpcTests>(
this, &CIpcTests::sendMessageToServer_handleMessageReceived));
CIpcClient client;
client.connect();
m_sendMessageToServer_client = &client;
initQuitTimeout(2);
m_events.loop(); m_events.loop();
m_events.removeHandler(CIpcServer::getClientConnectedEvent(), &server);
m_events.removeHandler(CIpcServer::getMessageReceivedEvent(), &server);
cleanupQuitTimeout();
EXPECT_EQ("test", m_sendMessageToServer_receivedString); EXPECT_EQ("test", m_sendMessageToServer_receivedString);
} }
@ -109,22 +123,31 @@ TEST_F(CIpcTests, sendMessageToClient)
server.listen(); server.listen();
m_sendMessageToClient_server = &server; m_sendMessageToClient_server = &server;
// event handler sends "test" log line to client.
m_events.adoptHandler(
CIpcServer::getClientConnectedEvent(), &server,
new TMethodEventJob<CIpcTests>(
this, &CIpcTests::sendMessageToClient_handleClientConnected));
CIpcClient client; CIpcClient client;
client.connect(); client.connect();
// event handler sends "test" log line to server.
m_events.adoptHandler( m_events.adoptHandler(
CIpcClient::getConnectedEvent(), &client, CIpcClient::getMessageReceivedEvent(), &client,
new TMethodEventJob<CIpcTests>( new TMethodEventJob<CIpcTests>(
this, &CIpcTests::sendMessageToClient_handleConnected)); this, &CIpcTests::sendMessageToClient_handleMessageReceived));
quitTimeout(2); initQuitTimeout(2);
m_events.loop(); m_events.loop();
m_events.removeHandler(CIpcServer::getClientConnectedEvent(), &server);
m_events.removeHandler(CIpcClient::getMessageReceivedEvent(), &client);
cleanupQuitTimeout();
EXPECT_EQ("test", m_sendMessageToClient_receivedString); EXPECT_EQ("test", m_sendMessageToClient_receivedString);
} }
CIpcTests::CIpcTests() : CIpcTests::CIpcTests() :
m_quitTimeoutTimer(nullptr),
m_connectToServer_clientConnected(false), m_connectToServer_clientConnected(false),
m_sendMessageToClient_server(nullptr), m_sendMessageToClient_server(nullptr),
m_sendMessageToServer_client(nullptr) m_sendMessageToServer_client(nullptr)
@ -136,56 +159,47 @@ CIpcTests::~CIpcTests()
} }
void void
CIpcTests::connectToServer_handleClientConnected(const CEvent&, void*) CIpcTests::connectToServer_handleMessageReceived(const CEvent& e, void*)
{ {
CIpcMessage* m = static_cast<CIpcMessage*>(e.getDataObject());
if (m->m_type == kIpcHello) {
m_connectToServer_clientConnected = true; m_connectToServer_clientConnected = true;
raiseQuitEvent(); raiseQuitEvent();
} }
}
void void
CIpcTests::sendMessageToServer_handleClientConnected(const CEvent& e, void*) CIpcTests::sendMessageToServer_handleClientConnected(const CEvent& e, void*)
{ {
m_events.adoptHandler( CIpcCommandMessage m("test");
CIpcClientProxy::getMessageReceivedEvent(), e.getData(),
new TMethodEventJob<CIpcTests>(
this, &CIpcTests::sendMessageToServer_handleMessageReceived));
CIpcMessage m;
m.m_type = kIpcCommand;
m.m_data = new CString("test");
m_sendMessageToServer_client->send(m); m_sendMessageToServer_client->send(m);
} }
void void
CIpcTests::sendMessageToServer_handleMessageReceived(const CEvent& e, void*) CIpcTests::sendMessageToServer_handleMessageReceived(const CEvent& e, void*)
{ {
CIpcMessage* m = static_cast<CIpcMessage*>(e.getData()); CIpcMessage* m = static_cast<CIpcMessage*>(e.getDataObject());
if (m->m_type == kIpcCommand) { if (m->m_type == kIpcCommand) {
m_sendMessageToServer_receivedString = *static_cast<CString*>(m->m_data); CIpcCommandMessage* cm = static_cast<CIpcCommandMessage*>(m);
m_sendMessageToServer_receivedString = cm->command();
raiseQuitEvent(); raiseQuitEvent();
} }
} }
void void
CIpcTests::sendMessageToClient_handleConnected(const CEvent& e, void*) CIpcTests::sendMessageToClient_handleClientConnected(const CEvent& e, void*)
{ {
m_events.adoptHandler( CIpcLogLineMessage m("test");
CIpcServerProxy::getMessageReceivedEvent(), e.getData(),
new TMethodEventJob<CIpcTests>(
this, &CIpcTests::sendMessageToClient_handleMessageReceived));
CIpcMessage m;
m.m_type = kIpcLogLine;
m.m_data = new CString("test");
m_sendMessageToClient_server->send(m, kIpcClientUnknown); m_sendMessageToClient_server->send(m, kIpcClientUnknown);
} }
void void
CIpcTests::sendMessageToClient_handleMessageReceived(const CEvent& e, void*) CIpcTests::sendMessageToClient_handleMessageReceived(const CEvent& e, void*)
{ {
CIpcMessage* m = static_cast<CIpcMessage*>(e.getData()); CIpcMessage* m = static_cast<CIpcMessage*>(e.getDataObject());
if (m->m_type == kIpcLogLine) { if (m->m_type == kIpcLogLine) {
m_sendMessageToClient_receivedString = *static_cast<CString*>(m->m_data); CIpcLogLineMessage* llm = static_cast<CIpcLogLineMessage*>(m);
m_sendMessageToClient_receivedString = llm->logLine();
raiseQuitEvent(); raiseQuitEvent();
} }
} }
@ -193,15 +207,25 @@ CIpcTests::sendMessageToClient_handleMessageReceived(const CEvent& e, void*)
void void
CIpcTests::raiseQuitEvent() CIpcTests::raiseQuitEvent()
{ {
EVENTQUEUE->addEvent(CEvent(CEvent::kQuit, nullptr)); EVENTQUEUE->addEvent(CEvent(CEvent::kQuit));
} }
void void
CIpcTests::quitTimeout(double timeout) CIpcTests::initQuitTimeout(double timeout)
{ {
CEventQueueTimer* timer = EVENTQUEUE->newOneShotTimer(timeout, NULL); assert(m_quitTimeoutTimer == nullptr);
EVENTQUEUE->adoptHandler(CEvent::kTimer, timer, m_quitTimeoutTimer = EVENTQUEUE->newOneShotTimer(timeout, NULL);
new TMethodEventJob<CIpcTests>(this, &CIpcTests::handleQuitTimeout, timer)); EVENTQUEUE->adoptHandler(CEvent::kTimer, m_quitTimeoutTimer,
new TMethodEventJob<CIpcTests>(
this, &CIpcTests::handleQuitTimeout));
}
void
CIpcTests::cleanupQuitTimeout()
{
EVENTQUEUE->removeHandler(CEvent::kTimer, m_quitTimeoutTimer);
delete m_quitTimeoutTimer;
m_quitTimeoutTimer = nullptr;
} }
void void

View File

@ -40,6 +40,11 @@ main(int argc, char **argv)
CArchMiscWindows::setInstanceWin32(GetModuleHandle(NULL)); CArchMiscWindows::setInstanceWin32(GetModuleHandle(NULL));
#endif #endif
CArch arch;
arch.init();
CLog log;
string lockFile; string lockFile;
for (int i = 0; i < argc; i++) { for (int i = 0; i < argc; i++) {
if (string(argv[i]).compare("--lock-file") == 0) { if (string(argv[i]).compare("--lock-file") == 0) {