From 12eb8efb61a4832e0cebe0e242607c1c24c09a2b Mon Sep 17 00:00:00 2001 From: Nick Bolton Date: Fri, 6 Jul 2012 12:27:22 +0000 Subject: [PATCH] implemented ipc message buffering (dequeues on gui reconnect) --- src/lib/ipc/CIpcClientProxy.cpp | 52 +++++++++++++++++++---- src/lib/ipc/CIpcClientProxy.h | 7 +++ src/lib/ipc/CIpcLogOutputter.cpp | 34 ++++++++++++--- src/lib/ipc/CIpcLogOutputter.h | 9 ++++ src/lib/ipc/CIpcMessage.cpp | 3 +- src/lib/ipc/CIpcMessage.h | 1 + src/lib/ipc/CIpcServer.cpp | 46 ++++++++++++++++++-- src/lib/ipc/CIpcServer.h | 7 ++- src/lib/platform/CMSWindowsRelauncher.cpp | 5 ++- src/lib/synergy/CDaemonApp.cpp | 14 +++++- 10 files changed, 157 insertions(+), 21 deletions(-) diff --git a/src/lib/ipc/CIpcClientProxy.cpp b/src/lib/ipc/CIpcClientProxy.cpp index 42e1e780..73915035 100644 --- a/src/lib/ipc/CIpcClientProxy.cpp +++ b/src/lib/ipc/CIpcClientProxy.cpp @@ -24,24 +24,54 @@ #include "CProtocolUtil.h" CEvent::Type CIpcClientProxy::s_messageReceivedEvent = CEvent::kUnknown; +CEvent::Type CIpcClientProxy::s_disconnectedEvent = CEvent::kUnknown; CIpcClientProxy::CIpcClientProxy(synergy::IStream& stream) : m_stream(stream), m_enableLog(false), -m_clientType(kIpcClientUnknown) +m_clientType(kIpcClientUnknown), +m_disconnecting(false) { EVENTQUEUE->adoptHandler( m_stream.getInputReadyEvent(), stream.getEventTarget(), new TMethodEventJob( - this, &CIpcClientProxy::handleData, nullptr)); + this, &CIpcClientProxy::handleData)); + + EVENTQUEUE->adoptHandler( + m_stream.getInputShutdownEvent(), stream.getEventTarget(), + new TMethodEventJob( + this, &CIpcClientProxy::handleDisconnect)); + + EVENTQUEUE->adoptHandler( + m_stream.getOutputShutdownEvent(), stream.getEventTarget(), + new TMethodEventJob( + this, &CIpcClientProxy::handleWriteError)); } CIpcClientProxy::~CIpcClientProxy() { EVENTQUEUE->removeHandler( m_stream.getInputReadyEvent(), m_stream.getEventTarget()); - - m_stream.close(); + EVENTQUEUE->removeHandler( + m_stream.getInputShutdownEvent(), m_stream.getEventTarget()); + EVENTQUEUE->removeHandler( + m_stream.getOutputShutdownEvent(), m_stream.getEventTarget()); + + delete &m_stream; +} + +void +CIpcClientProxy::handleDisconnect(const CEvent&, void*) +{ + disconnect(); + LOG((CLOG_DEBUG "ipc client disconnected")); +} + +void +CIpcClientProxy::handleWriteError(const CEvent&, void*) +{ + disconnect(); + LOG((CLOG_DEBUG "ipc client write error")); } void @@ -54,6 +84,7 @@ CIpcClientProxy::handleData(const CEvent&, void*) CIpcMessage* m = new CIpcMessage(); m->m_type = type; + m->m_source = this; if (m_enableLog) { LOG((CLOG_DEBUG "ipc client proxy read: %d", code[0])); @@ -140,10 +171,8 @@ CIpcClientProxy::parseCommand() void CIpcClientProxy::disconnect() { - if (m_enableLog) { - LOG((CLOG_NOTE "disconnect, closing stream")); - } - m_stream.close(); + m_disconnecting = true; + EVENTQUEUE->addEvent(CEvent(getDisconnectedEvent(), this)); } CEvent::Type @@ -152,3 +181,10 @@ CIpcClientProxy::getMessageReceivedEvent() return EVENTQUEUE->registerTypeOnce( s_messageReceivedEvent, "CIpcClientProxy::messageReceived"); } + +CEvent::Type +CIpcClientProxy::getDisconnectedEvent() +{ + return EVENTQUEUE->registerTypeOnce( + s_disconnectedEvent, "CIpcClientProxy::disconnected"); +} diff --git a/src/lib/ipc/CIpcClientProxy.h b/src/lib/ipc/CIpcClientProxy.h index 5e76e4bb..d1866b9a 100644 --- a/src/lib/ipc/CIpcClientProxy.h +++ b/src/lib/ipc/CIpcClientProxy.h @@ -34,8 +34,13 @@ public: //! Raised when the server receives a message from a client. static CEvent::Type getMessageReceivedEvent(); + //! Raised when the client disconnects from the server. + static CEvent::Type getDisconnectedEvent(); + private: void handleData(const CEvent&, void*); + void handleDisconnect(const CEvent&, void*); + void handleWriteError(const CEvent&, void*); void parseHello(); void* parseCommand(); void disconnect(); @@ -44,7 +49,9 @@ public: synergy::IStream& m_stream; bool m_enableLog; EIpcClientType m_clientType; + bool m_disconnecting; private: static CEvent::Type s_messageReceivedEvent; + static CEvent::Type s_disconnectedEvent; }; diff --git a/src/lib/ipc/CIpcLogOutputter.cpp b/src/lib/ipc/CIpcLogOutputter.cpp index fd665c13..705b44d6 100644 --- a/src/lib/ipc/CIpcLogOutputter.cpp +++ b/src/lib/ipc/CIpcLogOutputter.cpp @@ -19,6 +19,10 @@ #include "CIpcServer.h" #include "CIpcMessage.h" #include "Ipc.h" +#include "CEvent.h" +#include "CEventQueue.h" +#include "TMethodEventJob.h" +#include "CIpcClientProxy.h" CIpcLogOutputter::CIpcLogOutputter(CIpcServer& ipcServer) : m_ipcServer(ipcServer) @@ -29,6 +33,20 @@ CIpcLogOutputter::~CIpcLogOutputter() { } +void +CIpcLogOutputter::sendBuffer(CIpcClientProxy& proxy) +{ + while (m_buffer.size() != 0) { + CString text = m_buffer.front(); + m_buffer.pop(); + + CIpcMessage message; + message.m_type = kIpcLogLine; + message.m_data = new CString(text); + proxy.send(message); + } +} + void CIpcLogOutputter::open(const char* title) { @@ -45,11 +63,17 @@ CIpcLogOutputter::show(bool showIfEmpty) } bool -CIpcLogOutputter::write(ELevel level, const char* msg) +CIpcLogOutputter::write(ELevel level, const char* text) { - CIpcMessage message; - message.m_type = kIpcLogLine; - message.m_data = new CString(msg); - m_ipcServer.send(message, kIpcClientGui); + if (m_ipcServer.hasClients(kIpcClientGui)) { + CIpcMessage message; + message.m_type = kIpcLogLine; + message.m_data = new CString(text); + m_ipcServer.send(message, kIpcClientGui); + } + else { + m_buffer.push(text); + } + return true; } diff --git a/src/lib/ipc/CIpcLogOutputter.h b/src/lib/ipc/CIpcLogOutputter.h index 34ca561c..a340b66c 100644 --- a/src/lib/ipc/CIpcLogOutputter.h +++ b/src/lib/ipc/CIpcLogOutputter.h @@ -18,8 +18,11 @@ #pragma once #include "ILogOutputter.h" +#include class CIpcServer; +class CEvent; +class CIpcClientProxy; //! Write log to GUI over IPC /*! @@ -36,6 +39,12 @@ public: virtual void show(bool showIfEmpty); virtual bool write(ELevel level, const char* message); + //! Sends messages queued while no clients were connected. + void sendBuffer(CIpcClientProxy& proxy); + private: + typedef std::queue CIpcLogQueue; + CIpcServer& m_ipcServer; + CIpcLogQueue m_buffer; }; diff --git a/src/lib/ipc/CIpcMessage.cpp b/src/lib/ipc/CIpcMessage.cpp index 2376d90f..d1cdc50f 100644 --- a/src/lib/ipc/CIpcMessage.cpp +++ b/src/lib/ipc/CIpcMessage.cpp @@ -19,7 +19,8 @@ CIpcMessage::CIpcMessage() : m_type(0), -m_data(nullptr) +m_data(nullptr), +m_source(nullptr) { } diff --git a/src/lib/ipc/CIpcMessage.h b/src/lib/ipc/CIpcMessage.h index 12a189ab..bcdea76a 100644 --- a/src/lib/ipc/CIpcMessage.h +++ b/src/lib/ipc/CIpcMessage.h @@ -26,4 +26,5 @@ public: UInt8 m_type; void* m_data; + void* m_source; }; diff --git a/src/lib/ipc/CIpcServer.cpp b/src/lib/ipc/CIpcServer.cpp index 8045f48c..4616a629 100644 --- a/src/lib/ipc/CIpcServer.cpp +++ b/src/lib/ipc/CIpcServer.cpp @@ -34,7 +34,7 @@ m_address(CNetworkAddress(IPC_HOST, IPC_PORT)) m_address.resolve(); EVENTQUEUE->adoptHandler( - m_socket.getConnectingEvent(), &m_socket, + IListenSocket::getConnectingEvent(), &m_socket, new TMethodEventJob( this, &CIpcServer::handleClientConnecting)); } @@ -65,12 +65,50 @@ CIpcServer::handleClientConnecting(const CEvent&, void*) } LOG((CLOG_DEBUG "accepted ipc client connection")); - - // TODO: delete on disconnect CIpcClientProxy* proxy = new CIpcClientProxy(*stream); m_clients.insert(proxy); - EVENTQUEUE->addEvent(CEvent(getClientConnectedEvent(), this, proxy, CEvent::kDontFreeData)); + EVENTQUEUE->adoptHandler( + CIpcClientProxy::getDisconnectedEvent(), proxy, + new TMethodEventJob( + this, &CIpcServer::handleClientDisconnected)); + + EVENTQUEUE->addEvent(CEvent( + getClientConnectedEvent(), this, proxy, CEvent::kDontFreeData)); +} + +void +CIpcServer::handleClientDisconnected(const CEvent& e, void*) +{ + CIpcClientProxy* proxy = static_cast(e.getTarget()); + + EVENTQUEUE->removeHandler( + CIpcClientProxy::getDisconnectedEvent(), proxy); + + CClientSet::iterator& it = m_clients.find(proxy); + delete proxy; + m_clients.erase(it); + LOG((CLOG_DEBUG "ipc client proxy removed, connected=%d", m_clients.size())); +} + +bool +CIpcServer::hasClients(EIpcClientType clientType) const +{ + if (m_clients.size() == 0) { + return false; + } + + CClientSet::iterator it; + for (it = m_clients.begin(); it != m_clients.end(); it++) { + // at least one client is alive and type matches, there are clients. + CIpcClientProxy* p = *it; + if (!p->m_disconnecting && p->m_clientType == clientType) { + return true; + } + } + + // all clients must be disconnecting, no active clients. + return false; } CEvent::Type diff --git a/src/lib/ipc/CIpcServer.h b/src/lib/ipc/CIpcServer.h index 19a49cfa..ff58e0ac 100644 --- a/src/lib/ipc/CIpcServer.h +++ b/src/lib/ipc/CIpcServer.h @@ -51,6 +51,9 @@ public: //! @name accessors //@{ + //! Returns true when there are clients of the specified type connected. + bool hasClients(EIpcClientType clientType) const; + //! Raised when we have created the client proxy. static CEvent::Type getClientConnectedEvent(); @@ -58,6 +61,8 @@ public: private: void handleClientConnecting(const CEvent&, void*); + void handleClientDisconnected(const CEvent&, void*); + void handleClientMessage(const CEvent&, void*); private: typedef std::set CClientSet; @@ -65,6 +70,6 @@ private: CTCPListenSocket m_socket; CNetworkAddress m_address; CClientSet m_clients; - + static CEvent::Type s_clientConnectedEvent; }; diff --git a/src/lib/platform/CMSWindowsRelauncher.cpp b/src/lib/platform/CMSWindowsRelauncher.cpp index 4f4ebf0c..04780798 100644 --- a/src/lib/platform/CMSWindowsRelauncher.cpp +++ b/src/lib/platform/CMSWindowsRelauncher.cpp @@ -288,7 +288,10 @@ CMSWindowsRelauncher::mainLoop(void*) std::string cmd = command(); if (cmd == "") { - LOG((CLOG_WARN "nothing to launch, no command specified.")); + // this appears on first launch when the user hasn't configured + // anything yet, so don't show it as a warning, only show it as + // debug to devs to let them know why nothing happened. + LOG((CLOG_DEBUG "nothing to launch, no command specified.")); continue; } diff --git a/src/lib/synergy/CDaemonApp.cpp b/src/lib/synergy/CDaemonApp.cpp index 411a6e66..05ed72b3 100644 --- a/src/lib/synergy/CDaemonApp.cpp +++ b/src/lib/synergy/CDaemonApp.cpp @@ -310,7 +310,19 @@ CDaemonApp::handleIpcMessage(const CEvent& e, void*) // relauncher to stop the existing command and start the new // command. m_relauncher->command(command); + break; + } + + case kIpcHello: { + if (m.m_source != nullptr) { + CIpcClientProxy& proxy = *static_cast(m.m_source); + if (proxy.m_clientType == kIpcClientGui) { + // when a new gui client connects, send them all the + // log messages queued up while they were gone. + m_ipcLogOutputter->sendBuffer(proxy); + } + } + break; } - break; } }