ipc send message to client finished.

This commit is contained in:
Nick Bolton 2012-07-02 15:28:23 +00:00
parent 97b2cd669d
commit 79d73bd163
10 changed files with 239 additions and 28 deletions

View File

@ -17,9 +17,11 @@
#include "CIpcClient.h" #include "CIpcClient.h"
#include "Ipc.h" #include "Ipc.h"
#include "CIpcServerProxy.h"
CIpcClient::CIpcClient() : CIpcClient::CIpcClient() :
m_serverAddress(CNetworkAddress(IPC_HOST, IPC_PORT)) m_serverAddress(CNetworkAddress(IPC_HOST, IPC_PORT)),
m_server(nullptr)
{ {
m_serverAddress.resolve(); m_serverAddress.resolve();
} }
@ -32,25 +34,12 @@ void
CIpcClient::connect() CIpcClient::connect()
{ {
m_socket.connect(m_serverAddress); m_socket.connect(m_serverAddress);
m_server = new CIpcServerProxy(m_socket);
} }
void void
CIpcClient::send(const CIpcMessage& message) CIpcClient::send(const CIpcMessage& message)
{ {
UInt8 code[1]; assert(m_server != NULL);
code[0] = message.m_type; m_server->send(message);
m_socket.write(code, 1);
switch (message.m_type) {
case kIpcCommand: {
CString* s = (CString*)message.m_data;
UInt8 len[1];
len[0] = s->size();
m_socket.write(len, 1);
m_socket.write(s->c_str(), s->size());
}
break;
}
} }

View File

@ -19,7 +19,9 @@
#include "CNetworkAddress.h" #include "CNetworkAddress.h"
#include "CTCPSocket.h" #include "CTCPSocket.h"
#include "CIpcMessage.h"
class CIpcServerProxy;
class CIpcMessage;
//! IPC client for communication between daemon and GUI. //! IPC client for communication between daemon and GUI.
/*! /*!
@ -39,4 +41,5 @@ public:
private: private:
CNetworkAddress m_serverAddress; CNetworkAddress m_serverAddress;
CTCPSocket m_socket; CTCPSocket m_socket;
CIpcServerProxy* m_server;
}; };

View File

@ -49,7 +49,7 @@ CIpcClientProxy::handleData(const CEvent&, void*)
CIpcMessage* m = new CIpcMessage(); CIpcMessage* m = new CIpcMessage();
m->m_type = code[1]; m->m_type = code[1];
LOG((CLOG_DEBUG "ipc read message: %d", code[0])); LOG((CLOG_DEBUG "ipc client proxy read: %d", code[0]));
switch (code[0]) { switch (code[0]) {
case kIpcCommand: case kIpcCommand:
m->m_data = parseCommand(); m->m_data = parseCommand();
@ -68,6 +68,33 @@ CIpcClientProxy::handleData(const CEvent&, void*)
} }
} }
void
CIpcClientProxy::send(const CIpcMessage& message)
{
LOG((CLOG_DEBUG "ipc client proxy write: %d", message.m_type));
UInt8 code[1];
code[0] = message.m_type;
m_stream.write(code, 1);
switch (message.m_type) {
case kIpcLogLine: {
CString* s = (CString*)message.m_data;
UInt8 len[1];
len[0] = s->size();
m_stream.write(len, 1);
m_stream.write(s->c_str(), s->size());
}
break;
default:
LOG((CLOG_ERR "message not supported: %d", message.m_type));
break;
}
}
void* void*
CIpcClientProxy::parseCommand() CIpcClientProxy::parseCommand()
{ {

View File

@ -20,12 +20,16 @@
#include "CEvent.h" #include "CEvent.h"
class IStream; class IStream;
class CIpcMessage;
class CIpcClientProxy { class CIpcClientProxy {
public: public:
CIpcClientProxy(IStream& stream); CIpcClientProxy(IStream& stream);
virtual ~CIpcClientProxy(); virtual ~CIpcClientProxy();
//! Send a message to the client.
void send(const CIpcMessage& message);
//! Raised when the server receives a message from a client. //! Raised when the server receives a message from a client.
static CEvent::Type getMessageReceivedEvent(); static CEvent::Type getMessageReceivedEvent();
@ -34,8 +38,9 @@ private:
void* parseCommand(); void* parseCommand();
void disconnect(); void disconnect();
private: public:
IStream& m_stream; IStream& m_stream;
private:
static CEvent::Type s_messageReceivedEvent; static CEvent::Type s_messageReceivedEvent;
}; };

View File

@ -24,6 +24,7 @@
#include "CIpcClientProxy.h" #include "CIpcClientProxy.h"
#include "IStream.h" #include "IStream.h"
#include "IDataSocket.h" #include "IDataSocket.h"
#include "CIpcMessage.h"
CEvent::Type CIpcServer::s_clientConnectedEvent = CEvent::kUnknown; CEvent::Type CIpcServer::s_clientConnectedEvent = CEvent::kUnknown;
@ -71,3 +72,12 @@ CIpcServer::getClientConnectedEvent()
return EVENTQUEUE->registerTypeOnce( return EVENTQUEUE->registerTypeOnce(
s_clientConnectedEvent, "CIpcServer::clientConnected"); s_clientConnectedEvent, "CIpcServer::clientConnected");
} }
void
CIpcServer::send(const CIpcMessage& message)
{
CClientSet::iterator it;
for (it = m_clients.begin(); it != m_clients.end(); it++) {
(*it)->send(message);
}
}

View File

@ -23,6 +23,7 @@
class CEvent; class CEvent;
class CIpcClientProxy; class CIpcClientProxy;
class CIpcMessage;
//! IPC server for communication between daemon and GUI. //! IPC server for communication between daemon and GUI.
/*! /*!
@ -42,6 +43,9 @@ public:
//! Opens a TCP socket only allowing local connections. //! Opens a TCP socket only allowing local connections.
void listen(); void listen();
//! Send a message to all clients.
void send(const CIpcMessage& message);
//@} //@}
//! @name accessors //! @name accessors
//@{ //@{

View File

@ -16,11 +16,107 @@
*/ */
#include "CIpcServerProxy.h" #include "CIpcServerProxy.h"
#include "IStream.h"
#include "TMethodEventJob.h"
#include "CLog.h"
#include "CIpcMessage.h"
#include "Ipc.h"
CIpcServerProxy::CIpcServerProxy() CEvent::Type CIpcServerProxy::s_messageReceivedEvent = CEvent::kUnknown;
CIpcServerProxy::CIpcServerProxy(IStream& stream) :
m_stream(stream)
{ {
EVENTQUEUE->adoptHandler(m_stream.getInputReadyEvent(),
stream.getEventTarget(),
new TMethodEventJob<CIpcServerProxy>(
this, &CIpcServerProxy::handleData, nullptr));
} }
CIpcServerProxy::~CIpcServerProxy() CIpcServerProxy::~CIpcServerProxy()
{ {
EVENTQUEUE->removeHandler(m_stream.getInputReadyEvent(),
m_stream.getEventTarget());
}
void
CIpcServerProxy::handleData(const CEvent&, void*)
{
UInt8 code[1];
UInt32 n = m_stream.read(code, 1);
while (n != 0) {
CIpcMessage* m = new CIpcMessage();
m->m_type = code[1];
LOG((CLOG_DEBUG "ipc server proxy read: %d", code[0]));
switch (code[0]) {
case kIpcLogLine:
m->m_data = parseLogLine();
break;
default:
delete m;
disconnect();
return;
}
// event deletes data.
EVENTQUEUE->addEvent(CEvent(getMessageReceivedEvent(), this, m));
n = m_stream.read(code, 1);
}
}
void
CIpcServerProxy::send(const CIpcMessage& message)
{
LOG((CLOG_DEBUG "ipc server proxy write: %d", message.m_type));
UInt8 code[1];
code[0] = message.m_type;
m_stream.write(code, 1);
switch (message.m_type) {
case kIpcCommand: {
CString* s = (CString*)message.m_data;
UInt8 len[1];
len[0] = s->size();
m_stream.write(len, 1);
m_stream.write(s->c_str(), s->size());
}
break;
default:
LOG((CLOG_ERR "message not supported: %d", message.m_type));
break;
}
}
void*
CIpcServerProxy::parseLogLine()
{
UInt8 len[1];
m_stream.read(len, 1);
UInt8* buffer = new UInt8[len[0]];
m_stream.read(buffer, len[0]);
return new CString((const char*)buffer, len[0]);
}
void
CIpcServerProxy::disconnect()
{
LOG((CLOG_NOTE "disconnect, closing stream"));
m_stream.close();
}
CEvent::Type
CIpcServerProxy::getMessageReceivedEvent()
{
return EVENTQUEUE->registerTypeOnce(
s_messageReceivedEvent, "CIpcServerProxy::messageReceived");
} }

View File

@ -17,8 +17,28 @@
#pragma once #pragma once
#include "CEvent.h"
class IStream;
class CIpcMessage;
class CIpcServerProxy { class CIpcServerProxy {
public: public:
CIpcServerProxy(); CIpcServerProxy(IStream& stream);
virtual ~CIpcServerProxy(); virtual ~CIpcServerProxy();
void send(const CIpcMessage& message);
//! Raised when the client receives a message from the server.
static CEvent::Type getMessageReceivedEvent();
private:
void handleData(const CEvent&, void*);
void* parseLogLine();
void disconnect();
private:
IStream& m_stream;
static CEvent::Type s_messageReceivedEvent;
}; };

View File

@ -21,6 +21,6 @@
#define IPC_PORT 24801 #define IPC_PORT 24801
enum EIpcMessage { enum EIpcMessage {
kIpcLog, kIpcLogLine,
kIpcCommand kIpcCommand
}; };

View File

@ -16,6 +16,10 @@
*/ */
#include <gtest/gtest.h> #include <gtest/gtest.h>
#define TEST_ENV
#include "Global.h"
#include "CIpcServer.h" #include "CIpcServer.h"
#include "CIpcClient.h" #include "CIpcClient.h"
#include "CSocketMultiplexer.h" #include "CSocketMultiplexer.h"
@ -28,6 +32,8 @@
#include "CIpcClientProxy.h" #include "CIpcClientProxy.h"
#include "Ipc.h" #include "Ipc.h"
#include "CString.h" #include "CString.h"
#include "CIpcServerProxy.h"
#include "CIpcMessage.h"
class CIpcTests : public ::testing::Test class CIpcTests : public ::testing::Test
{ {
@ -35,9 +41,11 @@ public:
CIpcTests(); CIpcTests();
virtual ~CIpcTests(); virtual ~CIpcTests();
void connectToServer_handleClientConnected(const CEvent&, void* vclient); void connectToServer_handleClientConnected(const CEvent&, void*);
void sendMessageToServer_handleClientConnected(const CEvent&, void* vclient); void sendMessageToServer_handleClientConnected(const CEvent&, void*);
void sendMessageToServer_handleMessageReceived(const CEvent&, void* vclient); void sendMessageToServer_handleMessageReceived(const CEvent&, void*);
void sendMessageToClient_handleConnected(const CEvent&, void*);
void sendMessageToClient_handleMessageReceived(const CEvent&, void*);
void handleQuitTimeout(const CEvent&, void* vclient); void handleQuitTimeout(const CEvent&, void* vclient);
void raiseQuitEvent(); void raiseQuitEvent();
void quitTimeout(double timeout); void quitTimeout(double timeout);
@ -50,6 +58,9 @@ public:
CEventQueue m_events; CEventQueue m_events;
bool m_connectToServer_clientConnected; bool m_connectToServer_clientConnected;
CString m_sendMessageToServer_receivedString; CString m_sendMessageToServer_receivedString;
CString m_sendMessageToClient_receivedString;
CIpcServer* m_sendMessageToClient_server;
CIpcServerProxy* m_sendMessageToClient_serverProxy;
}; };
TEST_F(CIpcTests, connectToServer) TEST_F(CIpcTests, connectToServer)
@ -95,8 +106,32 @@ TEST_F(CIpcTests, sendMessageToServer)
EXPECT_EQ("test", m_sendMessageToServer_receivedString); EXPECT_EQ("test", m_sendMessageToServer_receivedString);
} }
TEST_F(CIpcTests, sendMessageToClient)
{
CIpcServer server;
server.listen();
m_sendMessageToClient_server = &server;
CIpcClient client;
client.connect();
m_sendMessageToClient_serverProxy = client.m_server;
// event handler sends "test" log line to client.
m_events.adoptHandler(
CIpcServer::getClientConnectedEvent(), &server,
new TMethodEventJob<CIpcTests>(
this, &CIpcTests::sendMessageToClient_handleConnected));
quitTimeout(2);
m_events.loop();
EXPECT_EQ("test", m_sendMessageToClient_receivedString);
}
CIpcTests::CIpcTests() : CIpcTests::CIpcTests() :
m_connectToServer_clientConnected(false) m_connectToServer_clientConnected(false),
m_sendMessageToClient_server(nullptr),
m_sendMessageToClient_serverProxy(nullptr)
{ {
} }
@ -128,6 +163,28 @@ CIpcTests::sendMessageToServer_handleMessageReceived(const CEvent& e, void*)
raiseQuitEvent(); raiseQuitEvent();
} }
void
CIpcTests::sendMessageToClient_handleConnected(const CEvent& e, void*)
{
m_events.adoptHandler(
CIpcServerProxy::getMessageReceivedEvent(), m_sendMessageToClient_serverProxy,
new TMethodEventJob<CIpcTests>(
this, &CIpcTests::sendMessageToClient_handleMessageReceived));
CIpcMessage m;
m.m_type = kIpcLogLine;
m.m_data = (void*)(new CString("test"));
m_sendMessageToClient_server->send(m);
}
void
CIpcTests::sendMessageToClient_handleMessageReceived(const CEvent& e, void*)
{
CIpcMessage* m = (CIpcMessage*)e.getData();
m_sendMessageToClient_receivedString = *((CString*)m->m_data);
raiseQuitEvent();
}
void void
CIpcTests::raiseQuitEvent() CIpcTests::raiseQuitEvent()
{ {