implemented ipc message buffering (dequeues on gui reconnect)

This commit is contained in:
Nick Bolton 2012-07-06 12:27:22 +00:00
parent b921d9c916
commit 12eb8efb61
10 changed files with 157 additions and 21 deletions

View File

@ -24,24 +24,54 @@
#include "CProtocolUtil.h" #include "CProtocolUtil.h"
CEvent::Type CIpcClientProxy::s_messageReceivedEvent = CEvent::kUnknown; CEvent::Type CIpcClientProxy::s_messageReceivedEvent = CEvent::kUnknown;
CEvent::Type CIpcClientProxy::s_disconnectedEvent = CEvent::kUnknown;
CIpcClientProxy::CIpcClientProxy(synergy::IStream& stream) : CIpcClientProxy::CIpcClientProxy(synergy::IStream& stream) :
m_stream(stream), m_stream(stream),
m_enableLog(false), m_enableLog(false),
m_clientType(kIpcClientUnknown) m_clientType(kIpcClientUnknown),
m_disconnecting(false)
{ {
EVENTQUEUE->adoptHandler( EVENTQUEUE->adoptHandler(
m_stream.getInputReadyEvent(), stream.getEventTarget(), m_stream.getInputReadyEvent(), stream.getEventTarget(),
new TMethodEventJob<CIpcClientProxy>( new TMethodEventJob<CIpcClientProxy>(
this, &CIpcClientProxy::handleData, nullptr)); this, &CIpcClientProxy::handleData));
EVENTQUEUE->adoptHandler(
m_stream.getInputShutdownEvent(), stream.getEventTarget(),
new TMethodEventJob<CIpcClientProxy>(
this, &CIpcClientProxy::handleDisconnect));
EVENTQUEUE->adoptHandler(
m_stream.getOutputShutdownEvent(), stream.getEventTarget(),
new TMethodEventJob<CIpcClientProxy>(
this, &CIpcClientProxy::handleWriteError));
} }
CIpcClientProxy::~CIpcClientProxy() CIpcClientProxy::~CIpcClientProxy()
{ {
EVENTQUEUE->removeHandler( EVENTQUEUE->removeHandler(
m_stream.getInputReadyEvent(), m_stream.getEventTarget()); m_stream.getInputReadyEvent(), m_stream.getEventTarget());
EVENTQUEUE->removeHandler(
m_stream.getInputShutdownEvent(), m_stream.getEventTarget());
EVENTQUEUE->removeHandler(
m_stream.getOutputShutdownEvent(), m_stream.getEventTarget());
m_stream.close(); delete &m_stream;
}
void
CIpcClientProxy::handleDisconnect(const CEvent&, void*)
{
disconnect();
LOG((CLOG_DEBUG "ipc client disconnected"));
}
void
CIpcClientProxy::handleWriteError(const CEvent&, void*)
{
disconnect();
LOG((CLOG_DEBUG "ipc client write error"));
} }
void void
@ -54,6 +84,7 @@ CIpcClientProxy::handleData(const CEvent&, void*)
CIpcMessage* m = new CIpcMessage(); CIpcMessage* m = new CIpcMessage();
m->m_type = type; m->m_type = type;
m->m_source = this;
if (m_enableLog) { if (m_enableLog) {
LOG((CLOG_DEBUG "ipc client proxy read: %d", code[0])); LOG((CLOG_DEBUG "ipc client proxy read: %d", code[0]));
@ -140,10 +171,8 @@ CIpcClientProxy::parseCommand()
void void
CIpcClientProxy::disconnect() CIpcClientProxy::disconnect()
{ {
if (m_enableLog) { m_disconnecting = true;
LOG((CLOG_NOTE "disconnect, closing stream")); EVENTQUEUE->addEvent(CEvent(getDisconnectedEvent(), this));
}
m_stream.close();
} }
CEvent::Type CEvent::Type
@ -152,3 +181,10 @@ CIpcClientProxy::getMessageReceivedEvent()
return EVENTQUEUE->registerTypeOnce( return EVENTQUEUE->registerTypeOnce(
s_messageReceivedEvent, "CIpcClientProxy::messageReceived"); s_messageReceivedEvent, "CIpcClientProxy::messageReceived");
} }
CEvent::Type
CIpcClientProxy::getDisconnectedEvent()
{
return EVENTQUEUE->registerTypeOnce(
s_disconnectedEvent, "CIpcClientProxy::disconnected");
}

View File

@ -34,8 +34,13 @@ public:
//! Raised when the server receives a message from a client. //! Raised when the server receives a message from a client.
static CEvent::Type getMessageReceivedEvent(); static CEvent::Type getMessageReceivedEvent();
//! Raised when the client disconnects from the server.
static CEvent::Type getDisconnectedEvent();
private: private:
void handleData(const CEvent&, void*); void handleData(const CEvent&, void*);
void handleDisconnect(const CEvent&, void*);
void handleWriteError(const CEvent&, void*);
void parseHello(); void parseHello();
void* parseCommand(); void* parseCommand();
void disconnect(); void disconnect();
@ -44,7 +49,9 @@ public:
synergy::IStream& m_stream; synergy::IStream& m_stream;
bool m_enableLog; bool m_enableLog;
EIpcClientType m_clientType; EIpcClientType m_clientType;
bool m_disconnecting;
private: private:
static CEvent::Type s_messageReceivedEvent; static CEvent::Type s_messageReceivedEvent;
static CEvent::Type s_disconnectedEvent;
}; };

View File

@ -19,6 +19,10 @@
#include "CIpcServer.h" #include "CIpcServer.h"
#include "CIpcMessage.h" #include "CIpcMessage.h"
#include "Ipc.h" #include "Ipc.h"
#include "CEvent.h"
#include "CEventQueue.h"
#include "TMethodEventJob.h"
#include "CIpcClientProxy.h"
CIpcLogOutputter::CIpcLogOutputter(CIpcServer& ipcServer) : CIpcLogOutputter::CIpcLogOutputter(CIpcServer& ipcServer) :
m_ipcServer(ipcServer) m_ipcServer(ipcServer)
@ -29,6 +33,20 @@ CIpcLogOutputter::~CIpcLogOutputter()
{ {
} }
void
CIpcLogOutputter::sendBuffer(CIpcClientProxy& proxy)
{
while (m_buffer.size() != 0) {
CString text = m_buffer.front();
m_buffer.pop();
CIpcMessage message;
message.m_type = kIpcLogLine;
message.m_data = new CString(text);
proxy.send(message);
}
}
void void
CIpcLogOutputter::open(const char* title) CIpcLogOutputter::open(const char* title)
{ {
@ -45,11 +63,17 @@ CIpcLogOutputter::show(bool showIfEmpty)
} }
bool bool
CIpcLogOutputter::write(ELevel level, const char* msg) CIpcLogOutputter::write(ELevel level, const char* text)
{ {
if (m_ipcServer.hasClients(kIpcClientGui)) {
CIpcMessage message; CIpcMessage message;
message.m_type = kIpcLogLine; message.m_type = kIpcLogLine;
message.m_data = new CString(msg); message.m_data = new CString(text);
m_ipcServer.send(message, kIpcClientGui); m_ipcServer.send(message, kIpcClientGui);
}
else {
m_buffer.push(text);
}
return true; return true;
} }

View File

@ -18,8 +18,11 @@
#pragma once #pragma once
#include "ILogOutputter.h" #include "ILogOutputter.h"
#include <queue>
class CIpcServer; class CIpcServer;
class CEvent;
class CIpcClientProxy;
//! Write log to GUI over IPC //! Write log to GUI over IPC
/*! /*!
@ -36,6 +39,12 @@ public:
virtual void show(bool showIfEmpty); virtual void show(bool showIfEmpty);
virtual bool write(ELevel level, const char* message); virtual bool write(ELevel level, const char* message);
//! Sends messages queued while no clients were connected.
void sendBuffer(CIpcClientProxy& proxy);
private: private:
typedef std::queue<CString> CIpcLogQueue;
CIpcServer& m_ipcServer; CIpcServer& m_ipcServer;
CIpcLogQueue m_buffer;
}; };

View File

@ -19,7 +19,8 @@
CIpcMessage::CIpcMessage() : CIpcMessage::CIpcMessage() :
m_type(0), m_type(0),
m_data(nullptr) m_data(nullptr),
m_source(nullptr)
{ {
} }

View File

@ -26,4 +26,5 @@ public:
UInt8 m_type; UInt8 m_type;
void* m_data; void* m_data;
void* m_source;
}; };

View File

@ -34,7 +34,7 @@ m_address(CNetworkAddress(IPC_HOST, IPC_PORT))
m_address.resolve(); m_address.resolve();
EVENTQUEUE->adoptHandler( EVENTQUEUE->adoptHandler(
m_socket.getConnectingEvent(), &m_socket, IListenSocket::getConnectingEvent(), &m_socket,
new TMethodEventJob<CIpcServer>( new TMethodEventJob<CIpcServer>(
this, &CIpcServer::handleClientConnecting)); this, &CIpcServer::handleClientConnecting));
} }
@ -65,12 +65,50 @@ CIpcServer::handleClientConnecting(const CEvent&, void*)
} }
LOG((CLOG_DEBUG "accepted ipc client connection")); LOG((CLOG_DEBUG "accepted ipc client connection"));
// TODO: delete on disconnect
CIpcClientProxy* proxy = new CIpcClientProxy(*stream); CIpcClientProxy* proxy = new CIpcClientProxy(*stream);
m_clients.insert(proxy); m_clients.insert(proxy);
EVENTQUEUE->addEvent(CEvent(getClientConnectedEvent(), this, proxy, CEvent::kDontFreeData)); EVENTQUEUE->adoptHandler(
CIpcClientProxy::getDisconnectedEvent(), proxy,
new TMethodEventJob<CIpcServer>(
this, &CIpcServer::handleClientDisconnected));
EVENTQUEUE->addEvent(CEvent(
getClientConnectedEvent(), this, proxy, CEvent::kDontFreeData));
}
void
CIpcServer::handleClientDisconnected(const CEvent& e, void*)
{
CIpcClientProxy* proxy = static_cast<CIpcClientProxy*>(e.getTarget());
EVENTQUEUE->removeHandler(
CIpcClientProxy::getDisconnectedEvent(), proxy);
CClientSet::iterator& it = m_clients.find(proxy);
delete proxy;
m_clients.erase(it);
LOG((CLOG_DEBUG "ipc client proxy removed, connected=%d", m_clients.size()));
}
bool
CIpcServer::hasClients(EIpcClientType clientType) const
{
if (m_clients.size() == 0) {
return false;
}
CClientSet::iterator it;
for (it = m_clients.begin(); it != m_clients.end(); it++) {
// at least one client is alive and type matches, there are clients.
CIpcClientProxy* p = *it;
if (!p->m_disconnecting && p->m_clientType == clientType) {
return true;
}
}
// all clients must be disconnecting, no active clients.
return false;
} }
CEvent::Type CEvent::Type

View File

@ -51,6 +51,9 @@ public:
//! @name accessors //! @name accessors
//@{ //@{
//! Returns true when there are clients of the specified type connected.
bool hasClients(EIpcClientType clientType) const;
//! Raised when we have created the client proxy. //! Raised when we have created the client proxy.
static CEvent::Type getClientConnectedEvent(); static CEvent::Type getClientConnectedEvent();
@ -58,6 +61,8 @@ public:
private: private:
void handleClientConnecting(const CEvent&, void*); void handleClientConnecting(const CEvent&, void*);
void handleClientDisconnected(const CEvent&, void*);
void handleClientMessage(const CEvent&, void*);
private: private:
typedef std::set<CIpcClientProxy*> CClientSet; typedef std::set<CIpcClientProxy*> CClientSet;

View File

@ -288,7 +288,10 @@ CMSWindowsRelauncher::mainLoop(void*)
std::string cmd = command(); std::string cmd = command();
if (cmd == "") { if (cmd == "") {
LOG((CLOG_WARN "nothing to launch, no command specified.")); // this appears on first launch when the user hasn't configured
// anything yet, so don't show it as a warning, only show it as
// debug to devs to let them know why nothing happened.
LOG((CLOG_DEBUG "nothing to launch, no command specified."));
continue; continue;
} }

View File

@ -310,7 +310,19 @@ CDaemonApp::handleIpcMessage(const CEvent& e, void*)
// relauncher to stop the existing command and start the new // relauncher to stop the existing command and start the new
// command. // command.
m_relauncher->command(command); m_relauncher->command(command);
break;
}
case kIpcHello: {
if (m.m_source != nullptr) {
CIpcClientProxy& proxy = *static_cast<CIpcClientProxy*>(m.m_source);
if (proxy.m_clientType == kIpcClientGui) {
// when a new gui client connects, send them all the
// log messages queued up while they were gone.
m_ipcLogOutputter->sendBuffer(proxy);
}
} }
break; break;
} }
} }
}