diff --git a/io/CBufferedOutputStream.cpp b/io/CBufferedOutputStream.cpp index 387ce406..3acd168b 100644 --- a/io/CBufferedOutputStream.cpp +++ b/io/CBufferedOutputStream.cpp @@ -13,6 +13,7 @@ CBufferedOutputStream::CBufferedOutputStream(CMutex* mutex, IJob* closeCB) : m_mutex(mutex), m_closeCB(closeCB), + m_empty(mutex, true), m_closed(false) { assert(m_mutex != NULL); @@ -31,6 +32,9 @@ const void* CBufferedOutputStream::peek(UInt32 n) void CBufferedOutputStream::pop(UInt32 n) { m_buffer.pop(n); + if (m_buffer.getSize() == 0) { + m_empty.broadcast(); + } } UInt32 CBufferedOutputStream::getSize() const @@ -67,14 +71,8 @@ UInt32 CBufferedOutputStream::write( void CBufferedOutputStream::flush() { // wait until all data is written - while (getSizeWithLock() > 0) { - CThread::sleep(0.05); + CLock lock(m_mutex); + while (m_buffer.getSize() > 0) { + m_empty.wait(); } } - -UInt32 CBufferedOutputStream::getSizeWithLock() const -{ - CLock lock(m_mutex); - return m_buffer.getSize(); -} - diff --git a/io/CBufferedOutputStream.h b/io/CBufferedOutputStream.h index 1afbc118..c609336e 100644 --- a/io/CBufferedOutputStream.h +++ b/io/CBufferedOutputStream.h @@ -3,6 +3,7 @@ #include "CStreamBuffer.h" #include "IOutputStream.h" +#include "CCondVar.h" class CMutex; class IJob; @@ -33,12 +34,10 @@ public: virtual UInt32 write(const void*, UInt32 count); virtual void flush(); -private: - UInt32 getSizeWithLock() const; - private: CMutex* m_mutex; IJob* m_closeCB; + CCondVar m_empty; CStreamBuffer m_buffer; bool m_closed; }; diff --git a/net/CTCPSocket.cpp b/net/CTCPSocket.cpp index 989a9668..3429c9f8 100644 --- a/net/CTCPSocket.cpp +++ b/net/CTCPSocket.cpp @@ -80,25 +80,35 @@ void CTCPSocket::connect(const CNetworkAddress& addr) void CTCPSocket::close() { - // shutdown I/O thread before close - if (m_thread != NULL) { - // flush if output buffer not empty and output buffer not closed - bool doFlush; - { - CLock lock(m_mutex); - doFlush = ((m_connected & kWrite) != 0); - } - if (doFlush) { - m_output->flush(); - } + // see if buffers should be flushed + bool doFlush = false; + { + CLock lock(m_mutex); + doFlush = (m_thread != NULL && (m_connected & kWrite) != 0); + } - m_thread->cancel(); + // flush buffers + if (doFlush) { + m_output->flush(); + } + + // cause ioThread to exit + { + CLock lock(m_mutex); + if (m_fd != CNetwork::Null) { + CNetwork::shutdown(m_fd, 2); + m_connected = kClosed; + } + } + + // wait for thread + if (m_thread != NULL) { m_thread->wait(); delete m_thread; m_thread = NULL; } - CLock lock(m_mutex); + // close socket if (m_fd != CNetwork::Null) { if (CNetwork::close(m_fd) == CNetwork::Error) { throw XIOClose(); @@ -190,14 +200,7 @@ void CTCPSocket::ioService() } // check for status - CThread::testCancel(); - if (pfds[0].events == 0) { - CThread::sleep(0.05); - CThread::testCancel(); - continue; - } - const int status = CNetwork::poll(pfds, 1, 50); - CThread::testCancel(); + const int status = CNetwork::poll(pfds, 1, 10); // transfer data and handle errors if (status == 1) {