* increased gui ipc read check (yes yes, i should be using wait cond... but qt mutexes suck)

* added mutex to client proxy to protect stream
* added try/catch to ipc log outputter buffer thread
* protected ipc client proxy list with mutex (saw a weird stl error, might be caused by cross-thread weirdness)
This commit is contained in:
Nick Bolton 2012-07-08 17:49:45 +00:00
parent ecf1833f36
commit e501552f24
5 changed files with 53 additions and 23 deletions

View File

@ -84,7 +84,7 @@ void IpcReader::readStream(char* buffer, int length)
// qt is such a fucker with mutexes (can't lock/unlock between // qt is such a fucker with mutexes (can't lock/unlock between
// threads?! wtf?!). i'd just rather not go there (patches welcome). // threads?! wtf?!). i'd just rather not go there (patches welcome).
while (!m_ReadyRead) { while (!m_ReadyRead) {
QThread::usleep(100); QThread::usleep(50);
} }
m_ReadyRead = false; m_ReadyRead = false;
} }

View File

@ -66,7 +66,11 @@ CIpcClientProxy::~CIpcClientProxy()
EVENTQUEUE->removeHandler( EVENTQUEUE->removeHandler(
m_stream.getOutputShutdownEvent(), m_stream.getEventTarget()); m_stream.getOutputShutdownEvent(), m_stream.getEventTarget());
// don't delete the stream while it's being used.
ARCH->lockMutex(m_mutex);
delete &m_stream; delete &m_stream;
ARCH->unlockMutex(m_mutex);
ARCH->closeMutex(m_mutex);
} }
void void
@ -86,6 +90,9 @@ CIpcClientProxy::handleWriteError(const CEvent&, void*)
void void
CIpcClientProxy::handleData(const CEvent&, void*) CIpcClientProxy::handleData(const CEvent&, void*)
{ {
// don't allow the dtor to destroy the stream while we're using it.
CArchMutexLock lock(m_mutex);
UInt8 code[1]; UInt8 code[1];
UInt32 n = m_stream.read(code, 1); UInt32 n = m_stream.read(code, 1);
while (n != 0) { while (n != 0) {
@ -124,6 +131,7 @@ CIpcClientProxy::send(const CIpcMessage& message)
{ {
// don't allow other threads to write until we've finished the entire // don't allow other threads to write until we've finished the entire
// message. stream write is locked, but only for that single write. // message. stream write is locked, but only for that single write.
// also, don't allow the dtor to destroy the stream while we're using it.
CArchMutexLock lock(m_mutex); CArchMutexLock lock(m_mutex);
LOG((CLOG_DEBUG "ipc client proxy write: %d", message.m_type)); LOG((CLOG_DEBUG "ipc client proxy write: %d", message.m_type));

View File

@ -26,6 +26,7 @@
#include "CArch.h" #include "CArch.h"
#include "CThread.h" #include "CThread.h"
#include "TMethodJob.h" #include "TMethodJob.h"
#include "XArch.h"
CIpcLogOutputter::CIpcLogOutputter(CIpcServer& ipcServer) : CIpcLogOutputter::CIpcLogOutputter(CIpcServer& ipcServer) :
m_ipcServer(ipcServer), m_ipcServer(ipcServer),
@ -39,6 +40,9 @@ m_running(true)
CIpcLogOutputter::~CIpcLogOutputter() CIpcLogOutputter::~CIpcLogOutputter()
{ {
m_running = false;
m_bufferThread->wait(5);
ARCH->closeMutex(m_bufferMutex); ARCH->closeMutex(m_bufferMutex);
delete m_bufferThread; delete m_bufferThread;
} }
@ -51,8 +55,6 @@ CIpcLogOutputter::open(const char* title)
void void
CIpcLogOutputter::close() CIpcLogOutputter::close()
{ {
m_running = false;
m_bufferThread->wait(5);
} }
void void
@ -88,19 +90,26 @@ CIpcLogOutputter::write(ELevel, const char* text, bool force)
void void
CIpcLogOutputter::bufferThread(void*) CIpcLogOutputter::bufferThread(void*)
{ {
while (m_running) { try {
while (m_running && m_buffer.size() == 0) { while (m_running) {
ARCH->sleep(.1); while (m_running && m_buffer.size() == 0) {
} ARCH->sleep(.1);
}
if (!m_running) { if (!m_running) {
break; break;
} }
if (m_ipcServer.hasClients(kIpcClientGui)) { if (m_ipcServer.hasClients(kIpcClientGui)) {
sendBuffer(); sendBuffer();
}
} }
} }
catch (XArch& e) {
LOG((CLOG_ERR "ipc log buffer thread error, %s", e.what().c_str()));
}
LOG((CLOG_DEBUG "ipc log buffer thread finished"));
} }
CString* CString*

View File

@ -31,6 +31,7 @@ CEvent::Type CIpcServer::s_clientConnectedEvent = CEvent::kUnknown;
CIpcServer::CIpcServer() : CIpcServer::CIpcServer() :
m_address(CNetworkAddress(IPC_HOST, IPC_PORT)) m_address(CNetworkAddress(IPC_HOST, IPC_PORT))
{ {
m_clientsMutex = ARCH->newMutex();
m_address.resolve(); m_address.resolve();
EVENTQUEUE->adoptHandler( EVENTQUEUE->adoptHandler(
@ -41,11 +42,14 @@ m_address(CNetworkAddress(IPC_HOST, IPC_PORT))
CIpcServer::~CIpcServer() CIpcServer::~CIpcServer()
{ {
CClientSet::iterator it; ARCH->lockMutex(m_clientsMutex);
CClientList::iterator it;
for (it = m_clients.begin(); it != m_clients.end(); it++) { for (it = m_clients.begin(); it != m_clients.end(); it++) {
delete *it; delete *it;
} }
m_clients.empty(); m_clients.empty();
ARCH->unlockMutex(m_clientsMutex);
ARCH->closeMutex(m_clientsMutex);
EVENTQUEUE->removeHandler(m_socket.getConnectingEvent(), &m_socket); EVENTQUEUE->removeHandler(m_socket.getConnectingEvent(), &m_socket);
} }
@ -65,8 +69,11 @@ CIpcServer::handleClientConnecting(const CEvent&, void*)
} }
LOG((CLOG_DEBUG "accepted ipc client connection")); LOG((CLOG_DEBUG "accepted ipc client connection"));
ARCH->lockMutex(m_clientsMutex);
CIpcClientProxy* proxy = new CIpcClientProxy(*stream); CIpcClientProxy* proxy = new CIpcClientProxy(*stream);
m_clients.insert(proxy); m_clients.push_back(proxy);
ARCH->unlockMutex(m_clientsMutex);
EVENTQUEUE->adoptHandler( EVENTQUEUE->adoptHandler(
CIpcClientProxy::getDisconnectedEvent(), proxy, CIpcClientProxy::getDisconnectedEvent(), proxy,
@ -85,20 +92,22 @@ CIpcServer::handleClientDisconnected(const CEvent& e, void*)
EVENTQUEUE->removeHandler( EVENTQUEUE->removeHandler(
CIpcClientProxy::getDisconnectedEvent(), proxy); CIpcClientProxy::getDisconnectedEvent(), proxy);
CClientSet::iterator& it = m_clients.find(proxy); CArchMutexLock lock(m_clientsMutex);
m_clients.remove(proxy);
delete proxy; delete proxy;
m_clients.erase(it);
LOG((CLOG_DEBUG "ipc client proxy removed, connected=%d", m_clients.size())); LOG((CLOG_DEBUG "ipc client proxy removed, connected=%d", m_clients.size()));
} }
bool bool
CIpcServer::hasClients(EIpcClientType clientType) const CIpcServer::hasClients(EIpcClientType clientType) const
{ {
if (m_clients.size() == 0) { CArchMutexLock lock(m_clientsMutex);
if (m_clients.empty()) {
return false; return false;
} }
CClientSet::iterator it; CClientList::const_iterator it;
for (it = m_clients.begin(); it != m_clients.end(); it++) { for (it = m_clients.begin(); it != m_clients.end(); it++) {
// at least one client is alive and type matches, there are clients. // at least one client is alive and type matches, there are clients.
CIpcClientProxy* p = *it; CIpcClientProxy* p = *it;
@ -121,7 +130,9 @@ CIpcServer::getClientConnectedEvent()
void void
CIpcServer::send(const CIpcMessage& message, EIpcClientType filterType) CIpcServer::send(const CIpcMessage& message, EIpcClientType filterType)
{ {
CClientSet::iterator it; CArchMutexLock lock(m_clientsMutex);
CClientList::iterator it;
for (it = m_clients.begin(); it != m_clients.end(); it++) { for (it = m_clients.begin(); it != m_clients.end(); it++) {
CIpcClientProxy* proxy = *it; CIpcClientProxy* proxy = *it;
if (proxy->m_clientType == filterType) { if (proxy->m_clientType == filterType) {

View File

@ -20,7 +20,8 @@
#include "CTCPListenSocket.h" #include "CTCPListenSocket.h"
#include "CNetworkAddress.h" #include "CNetworkAddress.h"
#include "Ipc.h" #include "Ipc.h"
#include <set> #include <list>
#include "CArch.h"
class CEvent; class CEvent;
class CIpcClientProxy; class CIpcClientProxy;
@ -65,11 +66,12 @@ private:
void handleClientMessage(const CEvent&, void*); void handleClientMessage(const CEvent&, void*);
private: private:
typedef std::set<CIpcClientProxy*> CClientSet; typedef std::list<CIpcClientProxy*> CClientList;
CTCPListenSocket m_socket; CTCPListenSocket m_socket;
CNetworkAddress m_address; CNetworkAddress m_address;
CClientSet m_clients; CClientList m_clients;
CArchMutex m_clientsMutex;
static CEvent::Type s_clientConnectedEvent; static CEvent::Type s_clientConnectedEvent;
}; };