changed buffered output stream to wait() when flush()ing instead

of polling/sleeping.  changed CTCPSocket to not use thread
cancellation but to instead use m_connected to exit the thread.
also shortened poll timeout.
This commit is contained in:
crs 2002-06-02 22:57:50 +00:00
parent 1e988b3839
commit ad15393732
3 changed files with 33 additions and 33 deletions

View File

@ -13,6 +13,7 @@
CBufferedOutputStream::CBufferedOutputStream(CMutex* mutex, IJob* closeCB) : CBufferedOutputStream::CBufferedOutputStream(CMutex* mutex, IJob* closeCB) :
m_mutex(mutex), m_mutex(mutex),
m_closeCB(closeCB), m_closeCB(closeCB),
m_empty(mutex, true),
m_closed(false) m_closed(false)
{ {
assert(m_mutex != NULL); assert(m_mutex != NULL);
@ -31,6 +32,9 @@ const void* CBufferedOutputStream::peek(UInt32 n)
void CBufferedOutputStream::pop(UInt32 n) void CBufferedOutputStream::pop(UInt32 n)
{ {
m_buffer.pop(n); m_buffer.pop(n);
if (m_buffer.getSize() == 0) {
m_empty.broadcast();
}
} }
UInt32 CBufferedOutputStream::getSize() const UInt32 CBufferedOutputStream::getSize() const
@ -67,14 +71,8 @@ UInt32 CBufferedOutputStream::write(
void CBufferedOutputStream::flush() void CBufferedOutputStream::flush()
{ {
// wait until all data is written // wait until all data is written
while (getSizeWithLock() > 0) { CLock lock(m_mutex);
CThread::sleep(0.05); while (m_buffer.getSize() > 0) {
m_empty.wait();
} }
} }
UInt32 CBufferedOutputStream::getSizeWithLock() const
{
CLock lock(m_mutex);
return m_buffer.getSize();
}

View File

@ -3,6 +3,7 @@
#include "CStreamBuffer.h" #include "CStreamBuffer.h"
#include "IOutputStream.h" #include "IOutputStream.h"
#include "CCondVar.h"
class CMutex; class CMutex;
class IJob; class IJob;
@ -33,12 +34,10 @@ public:
virtual UInt32 write(const void*, UInt32 count); virtual UInt32 write(const void*, UInt32 count);
virtual void flush(); virtual void flush();
private:
UInt32 getSizeWithLock() const;
private: private:
CMutex* m_mutex; CMutex* m_mutex;
IJob* m_closeCB; IJob* m_closeCB;
CCondVar<bool> m_empty;
CStreamBuffer m_buffer; CStreamBuffer m_buffer;
bool m_closed; bool m_closed;
}; };

View File

@ -80,25 +80,35 @@ void CTCPSocket::connect(const CNetworkAddress& addr)
void CTCPSocket::close() void CTCPSocket::close()
{ {
// shutdown I/O thread before close // see if buffers should be flushed
if (m_thread != NULL) { bool doFlush = false;
// flush if output buffer not empty and output buffer not closed {
bool doFlush; CLock lock(m_mutex);
{ doFlush = (m_thread != NULL && (m_connected & kWrite) != 0);
CLock lock(m_mutex); }
doFlush = ((m_connected & kWrite) != 0);
}
if (doFlush) {
m_output->flush();
}
m_thread->cancel(); // flush buffers
if (doFlush) {
m_output->flush();
}
// cause ioThread to exit
{
CLock lock(m_mutex);
if (m_fd != CNetwork::Null) {
CNetwork::shutdown(m_fd, 2);
m_connected = kClosed;
}
}
// wait for thread
if (m_thread != NULL) {
m_thread->wait(); m_thread->wait();
delete m_thread; delete m_thread;
m_thread = NULL; m_thread = NULL;
} }
CLock lock(m_mutex); // close socket
if (m_fd != CNetwork::Null) { if (m_fd != CNetwork::Null) {
if (CNetwork::close(m_fd) == CNetwork::Error) { if (CNetwork::close(m_fd) == CNetwork::Error) {
throw XIOClose(); throw XIOClose();
@ -190,14 +200,7 @@ void CTCPSocket::ioService()
} }
// check for status // check for status
CThread::testCancel(); const int status = CNetwork::poll(pfds, 1, 10);
if (pfds[0].events == 0) {
CThread::sleep(0.05);
CThread::testCancel();
continue;
}
const int status = CNetwork::poll(pfds, 1, 50);
CThread::testCancel();
// transfer data and handle errors // transfer data and handle errors
if (status == 1) { if (status == 1) {