From 79d73bd163c9afe561cdbab7bddd1c734d9f7b4d Mon Sep 17 00:00:00 2001 From: Nick Bolton Date: Mon, 2 Jul 2012 15:28:23 +0000 Subject: [PATCH] ipc send message to client finished. --- src/lib/ipc/CIpcClient.cpp | 23 ++------ src/lib/ipc/CIpcClient.h | 5 +- src/lib/ipc/CIpcClientProxy.cpp | 29 ++++++++- src/lib/ipc/CIpcClientProxy.h | 9 ++- src/lib/ipc/CIpcServer.cpp | 10 ++++ src/lib/ipc/CIpcServer.h | 4 ++ src/lib/ipc/CIpcServerProxy.cpp | 98 ++++++++++++++++++++++++++++++- src/lib/ipc/CIpcServerProxy.h | 22 ++++++- src/lib/ipc/Ipc.h | 2 +- src/test/integtests/CIpcTests.cpp | 65 ++++++++++++++++++-- 10 files changed, 239 insertions(+), 28 deletions(-) diff --git a/src/lib/ipc/CIpcClient.cpp b/src/lib/ipc/CIpcClient.cpp index fef8a058..61c32389 100644 --- a/src/lib/ipc/CIpcClient.cpp +++ b/src/lib/ipc/CIpcClient.cpp @@ -17,9 +17,11 @@ #include "CIpcClient.h" #include "Ipc.h" +#include "CIpcServerProxy.h" CIpcClient::CIpcClient() : -m_serverAddress(CNetworkAddress(IPC_HOST, IPC_PORT)) +m_serverAddress(CNetworkAddress(IPC_HOST, IPC_PORT)), +m_server(nullptr) { m_serverAddress.resolve(); } @@ -32,25 +34,12 @@ void CIpcClient::connect() { m_socket.connect(m_serverAddress); + m_server = new CIpcServerProxy(m_socket); } void CIpcClient::send(const CIpcMessage& message) { - UInt8 code[1]; - code[0] = message.m_type; - m_socket.write(code, 1); - - switch (message.m_type) { - case kIpcCommand: { - CString* s = (CString*)message.m_data; - - UInt8 len[1]; - len[0] = s->size(); - m_socket.write(len, 1); - - m_socket.write(s->c_str(), s->size()); - } - break; - } + assert(m_server != NULL); + m_server->send(message); } diff --git a/src/lib/ipc/CIpcClient.h b/src/lib/ipc/CIpcClient.h index 72e85a99..7b365eb2 100644 --- a/src/lib/ipc/CIpcClient.h +++ b/src/lib/ipc/CIpcClient.h @@ -19,7 +19,9 @@ #include "CNetworkAddress.h" #include "CTCPSocket.h" -#include "CIpcMessage.h" + +class CIpcServerProxy; +class CIpcMessage; //! IPC client for communication between daemon and GUI. /*! @@ -39,4 +41,5 @@ public: private: CNetworkAddress m_serverAddress; CTCPSocket m_socket; + CIpcServerProxy* m_server; }; diff --git a/src/lib/ipc/CIpcClientProxy.cpp b/src/lib/ipc/CIpcClientProxy.cpp index 10599a31..440e0b71 100644 --- a/src/lib/ipc/CIpcClientProxy.cpp +++ b/src/lib/ipc/CIpcClientProxy.cpp @@ -49,7 +49,7 @@ CIpcClientProxy::handleData(const CEvent&, void*) CIpcMessage* m = new CIpcMessage(); m->m_type = code[1]; - LOG((CLOG_DEBUG "ipc read message: %d", code[0])); + LOG((CLOG_DEBUG "ipc client proxy read: %d", code[0])); switch (code[0]) { case kIpcCommand: m->m_data = parseCommand(); @@ -68,6 +68,33 @@ CIpcClientProxy::handleData(const CEvent&, void*) } } +void +CIpcClientProxy::send(const CIpcMessage& message) +{ + LOG((CLOG_DEBUG "ipc client proxy write: %d", message.m_type)); + + UInt8 code[1]; + code[0] = message.m_type; + m_stream.write(code, 1); + + switch (message.m_type) { + case kIpcLogLine: { + CString* s = (CString*)message.m_data; + + UInt8 len[1]; + len[0] = s->size(); + m_stream.write(len, 1); + + m_stream.write(s->c_str(), s->size()); + } + break; + + default: + LOG((CLOG_ERR "message not supported: %d", message.m_type)); + break; + } +} + void* CIpcClientProxy::parseCommand() { diff --git a/src/lib/ipc/CIpcClientProxy.h b/src/lib/ipc/CIpcClientProxy.h index 56505242..6797ea68 100644 --- a/src/lib/ipc/CIpcClientProxy.h +++ b/src/lib/ipc/CIpcClientProxy.h @@ -20,12 +20,16 @@ #include "CEvent.h" class IStream; +class CIpcMessage; class CIpcClientProxy { public: CIpcClientProxy(IStream& stream); virtual ~CIpcClientProxy(); + //! Send a message to the client. + void send(const CIpcMessage& message); + //! Raised when the server receives a message from a client. static CEvent::Type getMessageReceivedEvent(); @@ -34,8 +38,9 @@ private: void* parseCommand(); void disconnect(); -private: +public: IStream& m_stream; - + +private: static CEvent::Type s_messageReceivedEvent; }; diff --git a/src/lib/ipc/CIpcServer.cpp b/src/lib/ipc/CIpcServer.cpp index f1775899..383479ff 100644 --- a/src/lib/ipc/CIpcServer.cpp +++ b/src/lib/ipc/CIpcServer.cpp @@ -24,6 +24,7 @@ #include "CIpcClientProxy.h" #include "IStream.h" #include "IDataSocket.h" +#include "CIpcMessage.h" CEvent::Type CIpcServer::s_clientConnectedEvent = CEvent::kUnknown; @@ -71,3 +72,12 @@ CIpcServer::getClientConnectedEvent() return EVENTQUEUE->registerTypeOnce( s_clientConnectedEvent, "CIpcServer::clientConnected"); } + +void +CIpcServer::send(const CIpcMessage& message) +{ + CClientSet::iterator it; + for (it = m_clients.begin(); it != m_clients.end(); it++) { + (*it)->send(message); + } +} diff --git a/src/lib/ipc/CIpcServer.h b/src/lib/ipc/CIpcServer.h index 581f2f04..d0ccffbf 100644 --- a/src/lib/ipc/CIpcServer.h +++ b/src/lib/ipc/CIpcServer.h @@ -23,6 +23,7 @@ class CEvent; class CIpcClientProxy; +class CIpcMessage; //! IPC server for communication between daemon and GUI. /*! @@ -42,6 +43,9 @@ public: //! Opens a TCP socket only allowing local connections. void listen(); + //! Send a message to all clients. + void send(const CIpcMessage& message); + //@} //! @name accessors //@{ diff --git a/src/lib/ipc/CIpcServerProxy.cpp b/src/lib/ipc/CIpcServerProxy.cpp index 22282b13..0a7c7869 100644 --- a/src/lib/ipc/CIpcServerProxy.cpp +++ b/src/lib/ipc/CIpcServerProxy.cpp @@ -16,11 +16,107 @@ */ #include "CIpcServerProxy.h" +#include "IStream.h" +#include "TMethodEventJob.h" +#include "CLog.h" +#include "CIpcMessage.h" +#include "Ipc.h" -CIpcServerProxy::CIpcServerProxy() +CEvent::Type CIpcServerProxy::s_messageReceivedEvent = CEvent::kUnknown; + +CIpcServerProxy::CIpcServerProxy(IStream& stream) : +m_stream(stream) { + EVENTQUEUE->adoptHandler(m_stream.getInputReadyEvent(), + stream.getEventTarget(), + new TMethodEventJob( + this, &CIpcServerProxy::handleData, nullptr)); } CIpcServerProxy::~CIpcServerProxy() { + EVENTQUEUE->removeHandler(m_stream.getInputReadyEvent(), + m_stream.getEventTarget()); +} + +void +CIpcServerProxy::handleData(const CEvent&, void*) +{ + UInt8 code[1]; + UInt32 n = m_stream.read(code, 1); + while (n != 0) { + + CIpcMessage* m = new CIpcMessage(); + m->m_type = code[1]; + + LOG((CLOG_DEBUG "ipc server proxy read: %d", code[0])); + switch (code[0]) { + case kIpcLogLine: + m->m_data = parseLogLine(); + break; + + default: + delete m; + disconnect(); + return; + } + + // event deletes data. + EVENTQUEUE->addEvent(CEvent(getMessageReceivedEvent(), this, m)); + + n = m_stream.read(code, 1); + } +} + +void +CIpcServerProxy::send(const CIpcMessage& message) +{ + LOG((CLOG_DEBUG "ipc server proxy write: %d", message.m_type)); + + UInt8 code[1]; + code[0] = message.m_type; + m_stream.write(code, 1); + + switch (message.m_type) { + case kIpcCommand: { + CString* s = (CString*)message.m_data; + + UInt8 len[1]; + len[0] = s->size(); + m_stream.write(len, 1); + + m_stream.write(s->c_str(), s->size()); + } + break; + + default: + LOG((CLOG_ERR "message not supported: %d", message.m_type)); + break; + } +} + +void* +CIpcServerProxy::parseLogLine() +{ + UInt8 len[1]; + m_stream.read(len, 1); + + UInt8* buffer = new UInt8[len[0]]; + m_stream.read(buffer, len[0]); + + return new CString((const char*)buffer, len[0]); +} + +void +CIpcServerProxy::disconnect() +{ + LOG((CLOG_NOTE "disconnect, closing stream")); + m_stream.close(); +} + +CEvent::Type +CIpcServerProxy::getMessageReceivedEvent() +{ + return EVENTQUEUE->registerTypeOnce( + s_messageReceivedEvent, "CIpcServerProxy::messageReceived"); } diff --git a/src/lib/ipc/CIpcServerProxy.h b/src/lib/ipc/CIpcServerProxy.h index eb9d28e6..42895754 100644 --- a/src/lib/ipc/CIpcServerProxy.h +++ b/src/lib/ipc/CIpcServerProxy.h @@ -17,8 +17,28 @@ #pragma once +#include "CEvent.h" + +class IStream; +class CIpcMessage; + class CIpcServerProxy { public: - CIpcServerProxy(); + CIpcServerProxy(IStream& stream); virtual ~CIpcServerProxy(); + + void send(const CIpcMessage& message); + + //! Raised when the client receives a message from the server. + static CEvent::Type getMessageReceivedEvent(); + +private: + void handleData(const CEvent&, void*); + void* parseLogLine(); + void disconnect(); + +private: + IStream& m_stream; + + static CEvent::Type s_messageReceivedEvent; }; diff --git a/src/lib/ipc/Ipc.h b/src/lib/ipc/Ipc.h index d29073da..66837fd3 100644 --- a/src/lib/ipc/Ipc.h +++ b/src/lib/ipc/Ipc.h @@ -21,6 +21,6 @@ #define IPC_PORT 24801 enum EIpcMessage { - kIpcLog, + kIpcLogLine, kIpcCommand }; diff --git a/src/test/integtests/CIpcTests.cpp b/src/test/integtests/CIpcTests.cpp index fdb717a4..11c69011 100644 --- a/src/test/integtests/CIpcTests.cpp +++ b/src/test/integtests/CIpcTests.cpp @@ -16,6 +16,10 @@ */ #include + +#define TEST_ENV +#include "Global.h" + #include "CIpcServer.h" #include "CIpcClient.h" #include "CSocketMultiplexer.h" @@ -28,6 +32,8 @@ #include "CIpcClientProxy.h" #include "Ipc.h" #include "CString.h" +#include "CIpcServerProxy.h" +#include "CIpcMessage.h" class CIpcTests : public ::testing::Test { @@ -35,9 +41,11 @@ public: CIpcTests(); virtual ~CIpcTests(); - void connectToServer_handleClientConnected(const CEvent&, void* vclient); - void sendMessageToServer_handleClientConnected(const CEvent&, void* vclient); - void sendMessageToServer_handleMessageReceived(const CEvent&, void* vclient); + void connectToServer_handleClientConnected(const CEvent&, void*); + void sendMessageToServer_handleClientConnected(const CEvent&, void*); + void sendMessageToServer_handleMessageReceived(const CEvent&, void*); + void sendMessageToClient_handleConnected(const CEvent&, void*); + void sendMessageToClient_handleMessageReceived(const CEvent&, void*); void handleQuitTimeout(const CEvent&, void* vclient); void raiseQuitEvent(); void quitTimeout(double timeout); @@ -50,6 +58,9 @@ public: CEventQueue m_events; bool m_connectToServer_clientConnected; CString m_sendMessageToServer_receivedString; + CString m_sendMessageToClient_receivedString; + CIpcServer* m_sendMessageToClient_server; + CIpcServerProxy* m_sendMessageToClient_serverProxy; }; TEST_F(CIpcTests, connectToServer) @@ -95,8 +106,32 @@ TEST_F(CIpcTests, sendMessageToServer) EXPECT_EQ("test", m_sendMessageToServer_receivedString); } +TEST_F(CIpcTests, sendMessageToClient) +{ + CIpcServer server; + server.listen(); + m_sendMessageToClient_server = &server; + + CIpcClient client; + client.connect(); + m_sendMessageToClient_serverProxy = client.m_server; + + // event handler sends "test" log line to client. + m_events.adoptHandler( + CIpcServer::getClientConnectedEvent(), &server, + new TMethodEventJob( + this, &CIpcTests::sendMessageToClient_handleConnected)); + + quitTimeout(2); + m_events.loop(); + + EXPECT_EQ("test", m_sendMessageToClient_receivedString); +} + CIpcTests::CIpcTests() : -m_connectToServer_clientConnected(false) +m_connectToServer_clientConnected(false), +m_sendMessageToClient_server(nullptr), +m_sendMessageToClient_serverProxy(nullptr) { } @@ -128,6 +163,28 @@ CIpcTests::sendMessageToServer_handleMessageReceived(const CEvent& e, void*) raiseQuitEvent(); } +void +CIpcTests::sendMessageToClient_handleConnected(const CEvent& e, void*) +{ + m_events.adoptHandler( + CIpcServerProxy::getMessageReceivedEvent(), m_sendMessageToClient_serverProxy, + new TMethodEventJob( + this, &CIpcTests::sendMessageToClient_handleMessageReceived)); + + CIpcMessage m; + m.m_type = kIpcLogLine; + m.m_data = (void*)(new CString("test")); + m_sendMessageToClient_server->send(m); +} + +void +CIpcTests::sendMessageToClient_handleMessageReceived(const CEvent& e, void*) +{ + CIpcMessage* m = (CIpcMessage*)e.getData(); + m_sendMessageToClient_receivedString = *((CString*)m->m_data); + raiseQuitEvent(); +} + void CIpcTests::raiseQuitEvent() {