finished ipc send to server.

This commit is contained in:
Nick Bolton 2012-07-02 13:45:52 +00:00
parent 05775bc73d
commit 97b2cd669d
11 changed files with 244 additions and 51 deletions

View File

@ -33,3 +33,24 @@ CIpcClient::connect()
{ {
m_socket.connect(m_serverAddress); m_socket.connect(m_serverAddress);
} }
void
CIpcClient::send(const CIpcMessage& message)
{
UInt8 code[1];
code[0] = message.m_type;
m_socket.write(code, 1);
switch (message.m_type) {
case kIpcCommand: {
CString* s = (CString*)message.m_data;
UInt8 len[1];
len[0] = s->size();
m_socket.write(len, 1);
m_socket.write(s->c_str(), s->size());
}
break;
}
}

View File

@ -19,6 +19,7 @@
#include "CNetworkAddress.h" #include "CNetworkAddress.h"
#include "CTCPSocket.h" #include "CTCPSocket.h"
#include "CIpcMessage.h"
//! IPC client for communication between daemon and GUI. //! IPC client for communication between daemon and GUI.
/*! /*!
@ -30,7 +31,10 @@ public:
virtual ~CIpcClient(); virtual ~CIpcClient();
//! Connects to the IPC server at localhost. //! Connects to the IPC server at localhost.
void connect(); void connect();
//! Sends a message to the server.
void send(const CIpcMessage& message);
private: private:
CNetworkAddress m_serverAddress; CNetworkAddress m_serverAddress;

View File

@ -17,12 +17,79 @@
#include "CIpcClientProxy.h" #include "CIpcClientProxy.h"
#include "IStream.h" #include "IStream.h"
#include "TMethodEventJob.h"
#include "Ipc.h"
#include "CLog.h"
#include "CIpcMessage.h"
CEvent::Type CIpcClientProxy::s_messageReceivedEvent = CEvent::kUnknown;
CIpcClientProxy::CIpcClientProxy(IStream& stream) : CIpcClientProxy::CIpcClientProxy(IStream& stream) :
m_stream(stream) m_stream(stream)
{ {
EVENTQUEUE->adoptHandler(m_stream.getInputReadyEvent(),
stream.getEventTarget(),
new TMethodEventJob<CIpcClientProxy>(
this, &CIpcClientProxy::handleData, nullptr));
} }
CIpcClientProxy::~CIpcClientProxy() CIpcClientProxy::~CIpcClientProxy()
{ {
EVENTQUEUE->removeHandler(m_stream.getInputReadyEvent(),
m_stream.getEventTarget());
}
void
CIpcClientProxy::handleData(const CEvent&, void*)
{
UInt8 code[1];
UInt32 n = m_stream.read(code, 1);
while (n != 0) {
CIpcMessage* m = new CIpcMessage();
m->m_type = code[1];
LOG((CLOG_DEBUG "ipc read message: %d", code[0]));
switch (code[0]) {
case kIpcCommand:
m->m_data = parseCommand();
break;
default:
delete m;
disconnect();
return;
}
// event deletes data.
EVENTQUEUE->addEvent(CEvent(getMessageReceivedEvent(), this, m));
n = m_stream.read(code, 1);
}
}
void*
CIpcClientProxy::parseCommand()
{
UInt8 len[1];
m_stream.read(len, 1);
UInt8* buffer = new UInt8[len[0]];
m_stream.read(buffer, len[0]);
return new CString((const char*)buffer, len[0]);
}
void
CIpcClientProxy::disconnect()
{
LOG((CLOG_NOTE "disconnect, closing stream"));
m_stream.close();
}
CEvent::Type
CIpcClientProxy::getMessageReceivedEvent()
{
return EVENTQUEUE->registerTypeOnce(
s_messageReceivedEvent, "CIpcClientProxy::messageReceived");
} }

View File

@ -17,6 +17,8 @@
#pragma once #pragma once
#include "CEvent.h"
class IStream; class IStream;
class CIpcClientProxy { class CIpcClientProxy {
@ -24,6 +26,16 @@ public:
CIpcClientProxy(IStream& stream); CIpcClientProxy(IStream& stream);
virtual ~CIpcClientProxy(); virtual ~CIpcClientProxy();
//! Raised when the server receives a message from a client.
static CEvent::Type getMessageReceivedEvent();
private:
void handleData(const CEvent&, void*);
void* parseCommand();
void disconnect();
private: private:
IStream& m_stream; IStream& m_stream;
static CEvent::Type s_messageReceivedEvent;
}; };

View File

@ -0,0 +1,27 @@
/*
* synergy -- mouse and keyboard sharing utility
* Copyright (C) 2012 Nick Bolton
*
* This package is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* found in the file COPYING that should have accompanied this file.
*
* This package is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "CIpcMessage.h"
CIpcMessage::CIpcMessage()
{
}
CIpcMessage::~CIpcMessage()
{
delete m_data;
}

29
src/lib/ipc/CIpcMessage.h Normal file
View File

@ -0,0 +1,29 @@
/*
* synergy -- mouse and keyboard sharing utility
* Copyright (C) 2012 Nick Bolton
*
* This package is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* found in the file COPYING that should have accompanied this file.
*
* This package is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#pragma once
#include "BasicTypes.h"
class CIpcMessage {
public:
CIpcMessage();
virtual ~CIpcMessage();
UInt8 m_type;
void* m_data;
};

View File

@ -35,7 +35,7 @@ m_address(CNetworkAddress(IPC_HOST, IPC_PORT))
EVENTQUEUE->adoptHandler( EVENTQUEUE->adoptHandler(
IListenSocket::getConnectingEvent(), &m_socket, IListenSocket::getConnectingEvent(), &m_socket,
new TMethodEventJob<CIpcServer>( new TMethodEventJob<CIpcServer>(
this, &CIpcServer::handleClientConnecting)); this, &CIpcServer::handleClientConnecting));
} }
CIpcServer::~CIpcServer() CIpcServer::~CIpcServer()

View File

@ -46,7 +46,7 @@ public:
//! @name accessors //! @name accessors
//@{ //@{
//! This event is raised when we have created the client proxy. //! Raised when we have created the client proxy.
static CEvent::Type getClientConnectedEvent(); static CEvent::Type getClientConnectedEvent();
//@} //@}

View File

@ -19,6 +19,7 @@ set(inc
CIpcClient.h CIpcClient.h
CIpcServerProxy.h CIpcServerProxy.h
CIpcClientProxy.h CIpcClientProxy.h
CIpcMessage.h
) )
set(src set(src
@ -26,6 +27,7 @@ set(src
CIpcClient.cpp CIpcClient.cpp
CIpcServerProxy.cpp CIpcServerProxy.cpp
CIpcClientProxy.cpp CIpcClientProxy.cpp
CIpcMessage.cpp
) )
if (WIN32) if (WIN32)

View File

@ -19,3 +19,8 @@
#define IPC_HOST "127.0.0.1" #define IPC_HOST "127.0.0.1"
#define IPC_PORT 24801 #define IPC_PORT 24801
enum EIpcMessage {
kIpcLog,
kIpcCommand
};

View File

@ -25,79 +25,107 @@
#include "TMethodJob.h" #include "TMethodJob.h"
#include "CArch.h" #include "CArch.h"
#include "CLog.h" #include "CLog.h"
#include "CIpcClientProxy.h"
#include "Ipc.h"
#include "CString.h"
class CIpcTests : public ::testing::Test class CIpcTests : public ::testing::Test
{ {
public: public:
CIpcTests(); CIpcTests();
virtual ~CIpcTests(); virtual ~CIpcTests();
void handleClientConnected(const CEvent&, void* vclient);
void raiseQuitEvent(); void connectToServer_handleClientConnected(const CEvent&, void* vclient);
void sendMessageToServer_handleClientConnected(const CEvent&, void* vclient);
void sendMessageToServer_handleMessageReceived(const CEvent&, void* vclient);
void handleQuitTimeout(const CEvent&, void* vclient);
void raiseQuitEvent();
void quitTimeout(double timeout);
private: private:
void timeoutThread(void*); void timeoutThread(void*);
public: public:
bool m_quitOnClientConnect; CSocketMultiplexer m_multiplexer;
bool m_clientConnected; CEventQueue m_events;
bool m_timeoutCheck; bool m_connectToServer_clientConnected;
double m_timeout; CString m_sendMessageToServer_receivedString;
private:
CThread* m_timeoutThread;
}; };
TEST_F(CIpcTests, connectToServer) TEST_F(CIpcTests, connectToServer)
{ {
m_quitOnClientConnect = true;
CSocketMultiplexer multiplexer;
CEventQueue events;
CIpcServer server; CIpcServer server;
server.listen(); server.listen();
events.adoptHandler( m_events.adoptHandler(
CIpcServer::getClientConnectedEvent(), &server, CIpcServer::getClientConnectedEvent(), &server,
new TMethodEventJob<CIpcTests>( new TMethodEventJob<CIpcTests>(
this, &CIpcTests::handleClientConnected)); this, &CIpcTests::connectToServer_handleClientConnected));
CIpcClient client; CIpcClient client;
client.connect(); client.connect();
quitTimeout(2);
m_events.loop();
m_timeoutCheck = true; EXPECT_EQ(true, m_connectToServer_clientConnected);
m_timeout = ARCH->time() + 5; // 5 sec timeout. }
events.loop();
EXPECT_EQ(true, m_clientConnected); TEST_F(CIpcTests, sendMessageToServer)
{
CIpcServer server;
server.listen();
CIpcClient client;
client.connect();
m_events.adoptHandler(
CIpcServer::getClientConnectedEvent(), &server,
new TMethodEventJob<CIpcTests>(
this, &CIpcTests::sendMessageToServer_handleClientConnected));
CIpcMessage m;
m.m_type = kIpcCommand;
m.m_data = (void*)(new CString("test"));
client.send(m);
quitTimeout(2);
m_events.loop();
EXPECT_EQ("test", m_sendMessageToServer_receivedString);
} }
CIpcTests::CIpcTests() : CIpcTests::CIpcTests() :
m_timeoutThread(nullptr), m_connectToServer_clientConnected(false)
m_quitOnClientConnect(false),
m_clientConnected(false),
m_timeoutCheck(false),
m_timeout(0)
{ {
m_timeoutThread = new CThread(
new TMethodJob<CIpcTests>(
this, &CIpcTests::timeoutThread, nullptr));
} }
CIpcTests::~CIpcTests() CIpcTests::~CIpcTests()
{ {
delete m_timeoutThread;
} }
void
CIpcTests::connectToServer_handleClientConnected(const CEvent&, void*)
{
m_connectToServer_clientConnected = true;
raiseQuitEvent();
}
void void
CIpcTests::handleClientConnected(const CEvent&, void* vclient) CIpcTests::sendMessageToServer_handleClientConnected(const CEvent& e, void*)
{ {
m_clientConnected = true; m_events.adoptHandler(
CIpcClientProxy::getMessageReceivedEvent(), e.getData(),
new TMethodEventJob<CIpcTests>(
this, &CIpcTests::sendMessageToServer_handleMessageReceived));
}
if (m_quitOnClientConnect) { void
raiseQuitEvent(); CIpcTests::sendMessageToServer_handleMessageReceived(const CEvent& e, void*)
} {
CIpcMessage* m = (CIpcMessage*)e.getData();
m_sendMessageToServer_receivedString = *((CString*)m->m_data);
raiseQuitEvent();
} }
void void
@ -107,18 +135,16 @@ CIpcTests::raiseQuitEvent()
} }
void void
CIpcTests::timeoutThread(void*) CIpcTests::quitTimeout(double timeout)
{ {
while (true) { CEventQueueTimer* timer = EVENTQUEUE->newOneShotTimer(timeout, NULL);
if (!m_timeoutCheck) { EVENTQUEUE->adoptHandler(CEvent::kTimer, timer,
ARCH->sleep(1); new TMethodEventJob<CIpcTests>(this, &CIpcTests::handleQuitTimeout, timer));
continue; }
}
if (ARCH->time() > m_timeout) { void
LOG((CLOG_ERR "timeout")); CIpcTests::handleQuitTimeout(const CEvent&, void* vclient)
raiseQuitEvent(); {
m_timeoutCheck = false; LOG((CLOG_ERR "timeout"));
} raiseQuitEvent();
} }
}