From 97b2cd669db37f32bb5ba29e829a1b249c8c2f2d Mon Sep 17 00:00:00 2001 From: Nick Bolton Date: Mon, 2 Jul 2012 13:45:52 +0000 Subject: [PATCH] finished ipc send to server. --- src/lib/ipc/CIpcClient.cpp | 21 +++++ src/lib/ipc/CIpcClient.h | 6 +- src/lib/ipc/CIpcClientProxy.cpp | 67 ++++++++++++++++ src/lib/ipc/CIpcClientProxy.h | 12 +++ src/lib/ipc/CIpcMessage.cpp | 27 +++++++ src/lib/ipc/CIpcMessage.h | 29 +++++++ src/lib/ipc/CIpcServer.cpp | 2 +- src/lib/ipc/CIpcServer.h | 2 +- src/lib/ipc/CMakeLists.txt | 2 + src/lib/ipc/Ipc.h | 5 ++ src/test/integtests/CIpcTests.cpp | 122 ++++++++++++++++++------------ 11 files changed, 244 insertions(+), 51 deletions(-) create mode 100644 src/lib/ipc/CIpcMessage.cpp create mode 100644 src/lib/ipc/CIpcMessage.h diff --git a/src/lib/ipc/CIpcClient.cpp b/src/lib/ipc/CIpcClient.cpp index ecb5d70f..fef8a058 100644 --- a/src/lib/ipc/CIpcClient.cpp +++ b/src/lib/ipc/CIpcClient.cpp @@ -33,3 +33,24 @@ CIpcClient::connect() { m_socket.connect(m_serverAddress); } + +void +CIpcClient::send(const CIpcMessage& message) +{ + UInt8 code[1]; + code[0] = message.m_type; + m_socket.write(code, 1); + + switch (message.m_type) { + case kIpcCommand: { + CString* s = (CString*)message.m_data; + + UInt8 len[1]; + len[0] = s->size(); + m_socket.write(len, 1); + + m_socket.write(s->c_str(), s->size()); + } + break; + } +} diff --git a/src/lib/ipc/CIpcClient.h b/src/lib/ipc/CIpcClient.h index d203c8f6..72e85a99 100644 --- a/src/lib/ipc/CIpcClient.h +++ b/src/lib/ipc/CIpcClient.h @@ -19,6 +19,7 @@ #include "CNetworkAddress.h" #include "CTCPSocket.h" +#include "CIpcMessage.h" //! IPC client for communication between daemon and GUI. /*! @@ -30,7 +31,10 @@ public: virtual ~CIpcClient(); //! Connects to the IPC server at localhost. - void connect(); + void connect(); + + //! Sends a message to the server. + void send(const CIpcMessage& message); private: CNetworkAddress m_serverAddress; diff --git a/src/lib/ipc/CIpcClientProxy.cpp b/src/lib/ipc/CIpcClientProxy.cpp index c48e4413..10599a31 100644 --- a/src/lib/ipc/CIpcClientProxy.cpp +++ b/src/lib/ipc/CIpcClientProxy.cpp @@ -17,12 +17,79 @@ #include "CIpcClientProxy.h" #include "IStream.h" +#include "TMethodEventJob.h" +#include "Ipc.h" +#include "CLog.h" +#include "CIpcMessage.h" + +CEvent::Type CIpcClientProxy::s_messageReceivedEvent = CEvent::kUnknown; CIpcClientProxy::CIpcClientProxy(IStream& stream) : m_stream(stream) { + EVENTQUEUE->adoptHandler(m_stream.getInputReadyEvent(), + stream.getEventTarget(), + new TMethodEventJob( + this, &CIpcClientProxy::handleData, nullptr)); } CIpcClientProxy::~CIpcClientProxy() { + EVENTQUEUE->removeHandler(m_stream.getInputReadyEvent(), + m_stream.getEventTarget()); +} + +void +CIpcClientProxy::handleData(const CEvent&, void*) +{ + UInt8 code[1]; + UInt32 n = m_stream.read(code, 1); + while (n != 0) { + + CIpcMessage* m = new CIpcMessage(); + m->m_type = code[1]; + + LOG((CLOG_DEBUG "ipc read message: %d", code[0])); + switch (code[0]) { + case kIpcCommand: + m->m_data = parseCommand(); + break; + + default: + delete m; + disconnect(); + return; + } + + // event deletes data. + EVENTQUEUE->addEvent(CEvent(getMessageReceivedEvent(), this, m)); + + n = m_stream.read(code, 1); + } +} + +void* +CIpcClientProxy::parseCommand() +{ + UInt8 len[1]; + m_stream.read(len, 1); + + UInt8* buffer = new UInt8[len[0]]; + m_stream.read(buffer, len[0]); + + return new CString((const char*)buffer, len[0]); +} + +void +CIpcClientProxy::disconnect() +{ + LOG((CLOG_NOTE "disconnect, closing stream")); + m_stream.close(); +} + +CEvent::Type +CIpcClientProxy::getMessageReceivedEvent() +{ + return EVENTQUEUE->registerTypeOnce( + s_messageReceivedEvent, "CIpcClientProxy::messageReceived"); } diff --git a/src/lib/ipc/CIpcClientProxy.h b/src/lib/ipc/CIpcClientProxy.h index 6722cecc..56505242 100644 --- a/src/lib/ipc/CIpcClientProxy.h +++ b/src/lib/ipc/CIpcClientProxy.h @@ -17,6 +17,8 @@ #pragma once +#include "CEvent.h" + class IStream; class CIpcClientProxy { @@ -24,6 +26,16 @@ public: CIpcClientProxy(IStream& stream); virtual ~CIpcClientProxy(); + //! Raised when the server receives a message from a client. + static CEvent::Type getMessageReceivedEvent(); + +private: + void handleData(const CEvent&, void*); + void* parseCommand(); + void disconnect(); + private: IStream& m_stream; + + static CEvent::Type s_messageReceivedEvent; }; diff --git a/src/lib/ipc/CIpcMessage.cpp b/src/lib/ipc/CIpcMessage.cpp new file mode 100644 index 00000000..198f0719 --- /dev/null +++ b/src/lib/ipc/CIpcMessage.cpp @@ -0,0 +1,27 @@ +/* + * synergy -- mouse and keyboard sharing utility + * Copyright (C) 2012 Nick Bolton + * + * This package is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * found in the file COPYING that should have accompanied this file. + * + * This package is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ + +#include "CIpcMessage.h" + +CIpcMessage::CIpcMessage() +{ +} + +CIpcMessage::~CIpcMessage() +{ + delete m_data; +} diff --git a/src/lib/ipc/CIpcMessage.h b/src/lib/ipc/CIpcMessage.h new file mode 100644 index 00000000..12a189ab --- /dev/null +++ b/src/lib/ipc/CIpcMessage.h @@ -0,0 +1,29 @@ +/* + * synergy -- mouse and keyboard sharing utility + * Copyright (C) 2012 Nick Bolton + * + * This package is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * found in the file COPYING that should have accompanied this file. + * + * This package is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ + +#pragma once + +#include "BasicTypes.h" + +class CIpcMessage { +public: + CIpcMessage(); + virtual ~CIpcMessage(); + + UInt8 m_type; + void* m_data; +}; diff --git a/src/lib/ipc/CIpcServer.cpp b/src/lib/ipc/CIpcServer.cpp index 9ab9241b..f1775899 100644 --- a/src/lib/ipc/CIpcServer.cpp +++ b/src/lib/ipc/CIpcServer.cpp @@ -35,7 +35,7 @@ m_address(CNetworkAddress(IPC_HOST, IPC_PORT)) EVENTQUEUE->adoptHandler( IListenSocket::getConnectingEvent(), &m_socket, new TMethodEventJob( - this, &CIpcServer::handleClientConnecting)); + this, &CIpcServer::handleClientConnecting)); } CIpcServer::~CIpcServer() diff --git a/src/lib/ipc/CIpcServer.h b/src/lib/ipc/CIpcServer.h index 608bdd7f..581f2f04 100644 --- a/src/lib/ipc/CIpcServer.h +++ b/src/lib/ipc/CIpcServer.h @@ -46,7 +46,7 @@ public: //! @name accessors //@{ - //! This event is raised when we have created the client proxy. + //! Raised when we have created the client proxy. static CEvent::Type getClientConnectedEvent(); //@} diff --git a/src/lib/ipc/CMakeLists.txt b/src/lib/ipc/CMakeLists.txt index fa9ea57f..44a7945d 100644 --- a/src/lib/ipc/CMakeLists.txt +++ b/src/lib/ipc/CMakeLists.txt @@ -19,6 +19,7 @@ set(inc CIpcClient.h CIpcServerProxy.h CIpcClientProxy.h + CIpcMessage.h ) set(src @@ -26,6 +27,7 @@ set(src CIpcClient.cpp CIpcServerProxy.cpp CIpcClientProxy.cpp + CIpcMessage.cpp ) if (WIN32) diff --git a/src/lib/ipc/Ipc.h b/src/lib/ipc/Ipc.h index 69c285be..d29073da 100644 --- a/src/lib/ipc/Ipc.h +++ b/src/lib/ipc/Ipc.h @@ -19,3 +19,8 @@ #define IPC_HOST "127.0.0.1" #define IPC_PORT 24801 + +enum EIpcMessage { + kIpcLog, + kIpcCommand +}; diff --git a/src/test/integtests/CIpcTests.cpp b/src/test/integtests/CIpcTests.cpp index 6ae4b99f..fdb717a4 100644 --- a/src/test/integtests/CIpcTests.cpp +++ b/src/test/integtests/CIpcTests.cpp @@ -25,79 +25,107 @@ #include "TMethodJob.h" #include "CArch.h" #include "CLog.h" +#include "CIpcClientProxy.h" +#include "Ipc.h" +#include "CString.h" class CIpcTests : public ::testing::Test { public: CIpcTests(); virtual ~CIpcTests(); - void handleClientConnected(const CEvent&, void* vclient); - void raiseQuitEvent(); + + void connectToServer_handleClientConnected(const CEvent&, void* vclient); + void sendMessageToServer_handleClientConnected(const CEvent&, void* vclient); + void sendMessageToServer_handleMessageReceived(const CEvent&, void* vclient); + void handleQuitTimeout(const CEvent&, void* vclient); + void raiseQuitEvent(); + void quitTimeout(double timeout); private: - void timeoutThread(void*); + void timeoutThread(void*); public: - bool m_quitOnClientConnect; - bool m_clientConnected; - bool m_timeoutCheck; - double m_timeout; - -private: - CThread* m_timeoutThread; + CSocketMultiplexer m_multiplexer; + CEventQueue m_events; + bool m_connectToServer_clientConnected; + CString m_sendMessageToServer_receivedString; }; TEST_F(CIpcTests, connectToServer) { - m_quitOnClientConnect = true; - - CSocketMultiplexer multiplexer; - CEventQueue events; - CIpcServer server; server.listen(); - events.adoptHandler( + m_events.adoptHandler( CIpcServer::getClientConnectedEvent(), &server, new TMethodEventJob( - this, &CIpcTests::handleClientConnected)); + this, &CIpcTests::connectToServer_handleClientConnected)); CIpcClient client; client.connect(); + + quitTimeout(2); + m_events.loop(); - m_timeoutCheck = true; - m_timeout = ARCH->time() + 5; // 5 sec timeout. - events.loop(); + EXPECT_EQ(true, m_connectToServer_clientConnected); +} - EXPECT_EQ(true, m_clientConnected); +TEST_F(CIpcTests, sendMessageToServer) +{ + CIpcServer server; + server.listen(); + + CIpcClient client; + client.connect(); + + m_events.adoptHandler( + CIpcServer::getClientConnectedEvent(), &server, + new TMethodEventJob( + this, &CIpcTests::sendMessageToServer_handleClientConnected)); + + CIpcMessage m; + m.m_type = kIpcCommand; + m.m_data = (void*)(new CString("test")); + client.send(m); + + quitTimeout(2); + m_events.loop(); + + EXPECT_EQ("test", m_sendMessageToServer_receivedString); } CIpcTests::CIpcTests() : -m_timeoutThread(nullptr), -m_quitOnClientConnect(false), -m_clientConnected(false), -m_timeoutCheck(false), -m_timeout(0) +m_connectToServer_clientConnected(false) { - m_timeoutThread = new CThread( - new TMethodJob( - this, &CIpcTests::timeoutThread, nullptr)); } CIpcTests::~CIpcTests() { - delete m_timeoutThread; } +void +CIpcTests::connectToServer_handleClientConnected(const CEvent&, void*) +{ + m_connectToServer_clientConnected = true; + raiseQuitEvent(); +} void -CIpcTests::handleClientConnected(const CEvent&, void* vclient) +CIpcTests::sendMessageToServer_handleClientConnected(const CEvent& e, void*) { - m_clientConnected = true; + m_events.adoptHandler( + CIpcClientProxy::getMessageReceivedEvent(), e.getData(), + new TMethodEventJob( + this, &CIpcTests::sendMessageToServer_handleMessageReceived)); +} - if (m_quitOnClientConnect) { - raiseQuitEvent(); - } +void +CIpcTests::sendMessageToServer_handleMessageReceived(const CEvent& e, void*) +{ + CIpcMessage* m = (CIpcMessage*)e.getData(); + m_sendMessageToServer_receivedString = *((CString*)m->m_data); + raiseQuitEvent(); } void @@ -107,18 +135,16 @@ CIpcTests::raiseQuitEvent() } void -CIpcTests::timeoutThread(void*) +CIpcTests::quitTimeout(double timeout) { - while (true) { - if (!m_timeoutCheck) { - ARCH->sleep(1); - continue; - } + CEventQueueTimer* timer = EVENTQUEUE->newOneShotTimer(timeout, NULL); + EVENTQUEUE->adoptHandler(CEvent::kTimer, timer, + new TMethodEventJob(this, &CIpcTests::handleQuitTimeout, timer)); +} - if (ARCH->time() > m_timeout) { - LOG((CLOG_ERR "timeout")); - raiseQuitEvent(); - m_timeoutCheck = false; - } - } -} \ No newline at end of file +void +CIpcTests::handleQuitTimeout(const CEvent&, void* vclient) +{ + LOG((CLOG_ERR "timeout")); + raiseQuitEvent(); +}