Made buffer thread wait for notify when no gui #4721

This commit is contained in:
Jerry (Xinyu Hou) 2015-07-16 15:53:53 -07:00
parent 18a6f75371
commit c3d38db053
4 changed files with 26 additions and 13 deletions

View File

@ -37,7 +37,7 @@ enum EIpcLogOutputter {
kBufferRateTimeLimit = 1 // seconds kBufferRateTimeLimit = 1 // seconds
}; };
IpcLogOutputter::IpcLogOutputter(IpcServer& ipcServer, bool useThread) : IpcLogOutputter::IpcLogOutputter(IpcServer& ipcServer, EIpcClientType clientType, bool useThread) :
m_ipcServer(ipcServer), m_ipcServer(ipcServer),
m_bufferMutex(ARCH->newMutex()), m_bufferMutex(ARCH->newMutex()),
m_sending(false), m_sending(false),
@ -50,7 +50,9 @@ IpcLogOutputter::IpcLogOutputter(IpcServer& ipcServer, bool useThread) :
m_bufferRateWriteLimit(kBufferRateWriteLimit), m_bufferRateWriteLimit(kBufferRateWriteLimit),
m_bufferRateTimeLimit(kBufferRateTimeLimit), m_bufferRateTimeLimit(kBufferRateTimeLimit),
m_bufferWriteCount(0), m_bufferWriteCount(0),
m_bufferRateStart(ARCH->time()) m_bufferRateStart(ARCH->time()),
m_clientType(clientType),
m_runningMutex(ARCH->newMutex())
{ {
if (useThread) { if (useThread) {
m_bufferThread = new Thread(new TMethodJob<IpcLogOutputter>( m_bufferThread = new Thread(new TMethodJob<IpcLogOutputter>(
@ -81,6 +83,7 @@ void
IpcLogOutputter::close() IpcLogOutputter::close()
{ {
if (m_bufferThread != nullptr) { if (m_bufferThread != nullptr) {
ArchMutexLock lock(m_runningMutex);
m_running = false; m_running = false;
notifyBuffer(); notifyBuffer();
m_bufferThread->wait(5); m_bufferThread->wait(5);
@ -103,6 +106,7 @@ IpcLogOutputter::write(ELevel, const char* text)
appendBuffer(text); appendBuffer(text);
notifyBuffer(); notifyBuffer();
return true; return true;
} }
@ -133,6 +137,13 @@ IpcLogOutputter::appendBuffer(const String& text)
m_bufferWriteCount++; m_bufferWriteCount++;
} }
bool
IpcLogOutputter::isRunning()
{
ArchMutexLock lock(m_runningMutex);
return m_running;
}
void void
IpcLogOutputter::bufferThread(void*) IpcLogOutputter::bufferThread(void*)
{ {
@ -140,8 +151,8 @@ IpcLogOutputter::bufferThread(void*)
m_running = true; m_running = true;
try { try {
while (m_running) { while (isRunning()) {
if (m_buffer.empty()) { if (m_buffer.empty() || !m_ipcServer.hasClients(m_clientType)) {
ArchMutexLock lock(m_notifyMutex); ArchMutexLock lock(m_notifyMutex);
ARCH->waitCondVar(m_notifyCond, m_notifyMutex, -1); ARCH->waitCondVar(m_notifyCond, m_notifyMutex, -1);
} }
@ -184,7 +195,7 @@ IpcLogOutputter::getChunk(size_t count)
void void
IpcLogOutputter::sendBuffer() IpcLogOutputter::sendBuffer()
{ {
if (m_buffer.empty() || !m_ipcServer.hasClients(kIpcClientGui)) { if (m_buffer.empty() || !m_ipcServer.hasClients(m_clientType)) {
return; return;
} }

View File

@ -21,6 +21,7 @@
#include "arch/Arch.h" #include "arch/Arch.h"
#include "arch/IArchMultithread.h" #include "arch/IArchMultithread.h"
#include "base/ILogOutputter.h" #include "base/ILogOutputter.h"
#include "ipc/Ipc.h"
#include <deque> #include <deque>
@ -39,7 +40,7 @@ public:
If \p useThread is \c false, then the buffer needs to be sent manually If \p useThread is \c false, then the buffer needs to be sent manually
using the \c sendBuffer() function. using the \c sendBuffer() function.
*/ */
IpcLogOutputter(IpcServer& ipcServer, bool useThread); IpcLogOutputter(IpcServer& ipcServer, EIpcClientType clientType, bool useThread);
virtual ~IpcLogOutputter(); virtual ~IpcLogOutputter();
// ILogOutputter overrides // ILogOutputter overrides
@ -92,6 +93,7 @@ private:
void bufferThread(void*); void bufferThread(void*);
String getChunk(size_t count); String getChunk(size_t count);
void appendBuffer(const String& text); void appendBuffer(const String& text);
bool isRunning();
private: private:
typedef std::deque<String> Buffer; typedef std::deque<String> Buffer;
@ -113,4 +115,6 @@ private:
UInt16 m_bufferWriteCount; UInt16 m_bufferWriteCount;
double m_bufferRateStart; double m_bufferRateStart;
bool m_useThread; bool m_useThread;
EIpcClientType m_clientType;
ArchMutex m_runningMutex;
}; };

View File

@ -211,7 +211,7 @@ DaemonApp::mainLoop(bool logToFile)
m_ipcServer = new IpcServer(m_events, &multiplexer); m_ipcServer = new IpcServer(m_events, &multiplexer);
// send logging to gui via ipc, log system adopts outputter. // send logging to gui via ipc, log system adopts outputter.
m_ipcLogOutputter = new IpcLogOutputter(*m_ipcServer, true); m_ipcLogOutputter = new IpcLogOutputter(*m_ipcServer, kIpcClientGui, true);
CLOG->insert(m_ipcLogOutputter); CLOG->insert(m_ipcLogOutputter);
#if SYSAPI_WIN32 #if SYSAPI_WIN32

View File

@ -52,11 +52,10 @@ TEST(IpcLogOutputterTests, write_threadingEnabled_bufferIsSent)
ON_CALL(mockServer, hasClients(_)).WillByDefault(Return(true)); ON_CALL(mockServer, hasClients(_)).WillByDefault(Return(true));
EXPECT_CALL(mockServer, hasClients(_)).Times(2);
EXPECT_CALL(mockServer, send(IpcLogLineMessageEq("mock 1\n"), _)).Times(1); EXPECT_CALL(mockServer, send(IpcLogLineMessageEq("mock 1\n"), _)).Times(1);
EXPECT_CALL(mockServer, send(IpcLogLineMessageEq("mock 2\n"), _)).Times(1); EXPECT_CALL(mockServer, send(IpcLogLineMessageEq("mock 2\n"), _)).Times(1);
IpcLogOutputter outputter(mockServer, true); IpcLogOutputter outputter(mockServer, kIpcClientUnknown, true);
outputter.write(kNOTE, "mock 1"); outputter.write(kNOTE, "mock 1");
mockServer.waitForSend(); mockServer.waitForSend();
outputter.write(kNOTE, "mock 2"); outputter.write(kNOTE, "mock 2");
@ -68,11 +67,10 @@ TEST(IpcLogOutputterTests, write_overBufferMaxSize_firstLineTruncated)
MockIpcServer mockServer; MockIpcServer mockServer;
ON_CALL(mockServer, hasClients(_)).WillByDefault(Return(true)); ON_CALL(mockServer, hasClients(_)).WillByDefault(Return(true));
EXPECT_CALL(mockServer, hasClients(_)).Times(1); EXPECT_CALL(mockServer, hasClients(_)).Times(1);
EXPECT_CALL(mockServer, send(IpcLogLineMessageEq("mock 2\nmock 3\n"), _)).Times(1); EXPECT_CALL(mockServer, send(IpcLogLineMessageEq("mock 2\nmock 3\n"), _)).Times(1);
IpcLogOutputter outputter(mockServer, false); IpcLogOutputter outputter(mockServer, kIpcClientUnknown, false);
outputter.bufferMaxSize(2); outputter.bufferMaxSize(2);
// log more lines than the buffer can contain // log more lines than the buffer can contain
@ -91,7 +89,7 @@ TEST(IpcLogOutputterTests, write_underBufferMaxSize_allLinesAreSent)
EXPECT_CALL(mockServer, hasClients(_)).Times(1); EXPECT_CALL(mockServer, hasClients(_)).Times(1);
EXPECT_CALL(mockServer, send(IpcLogLineMessageEq("mock 1\nmock 2\n"), _)).Times(1); EXPECT_CALL(mockServer, send(IpcLogLineMessageEq("mock 1\nmock 2\n"), _)).Times(1);
IpcLogOutputter outputter(mockServer, false); IpcLogOutputter outputter(mockServer, kIpcClientUnknown, false);
outputter.bufferMaxSize(2); outputter.bufferMaxSize(2);
// log more lines than the buffer can contain // log more lines than the buffer can contain
@ -147,7 +145,7 @@ TEST(IpcLogOutputterTests, write_underBufferRateLimit_allLinesAreSent)
EXPECT_CALL(mockServer, send(IpcLogLineMessageEq("mock 1\nmock 2\n"), _)).Times(1); EXPECT_CALL(mockServer, send(IpcLogLineMessageEq("mock 1\nmock 2\n"), _)).Times(1);
EXPECT_CALL(mockServer, send(IpcLogLineMessageEq("mock 3\nmock 4\n"), _)).Times(1); EXPECT_CALL(mockServer, send(IpcLogLineMessageEq("mock 3\nmock 4\n"), _)).Times(1);
IpcLogOutputter outputter(mockServer, false); IpcLogOutputter outputter(mockServer, kIpcClientUnknown, false);
outputter.bufferRateLimit(4, 1); // 1s (should be plenty of time) outputter.bufferRateLimit(4, 1); // 1s (should be plenty of time)
// log 1 more line than the buffer can accept in time limit. // log 1 more line than the buffer can accept in time limit.