From 8d1cf844c7707c69c4a8f399ea0ce0743b2059ae Mon Sep 17 00:00:00 2001 From: crs Date: Sun, 21 Oct 2001 00:21:02 +0000 Subject: [PATCH] fixed bugs in handling streams. --- io/CBufferedInputStream.cpp | 3 +++ io/CBufferedOutputStream.cpp | 1 + mt/CThreadRep.cpp | 32 ++++++++++++++++++++++++++++++-- net/CTCPSocket.cpp | 30 +++++++++++++++++++++++------- net/CTCPSocket.h | 3 ++- synergy/CServer.cpp | 19 +++++++++++++++---- 6 files changed, 74 insertions(+), 14 deletions(-) diff --git a/io/CBufferedInputStream.cpp b/io/CBufferedInputStream.cpp index 98c8fbe9..dbcb63f7 100644 --- a/io/CBufferedInputStream.cpp +++ b/io/CBufferedInputStream.cpp @@ -87,6 +87,9 @@ void CBufferedInputStream::close() } m_closed = true; + m_hungup = true; + m_buffer.pop(m_buffer.getSize()); + m_empty.broadcast(); if (m_closeCB) { m_closeCB->run(); } diff --git a/io/CBufferedOutputStream.cpp b/io/CBufferedOutputStream.cpp index bf1603c6..387ce406 100644 --- a/io/CBufferedOutputStream.cpp +++ b/io/CBufferedOutputStream.cpp @@ -46,6 +46,7 @@ void CBufferedOutputStream::close() } m_closed = true; + m_buffer.pop(m_buffer.getSize()); if (m_closeCB) { m_closeCB->run(); } diff --git a/mt/CThreadRep.cpp b/mt/CThreadRep.cpp index cb6824fa..41380225 100644 --- a/mt/CThreadRep.cpp +++ b/mt/CThreadRep.cpp @@ -15,6 +15,18 @@ // FIXME -- temporary exception type class XThreadUnavailable { }; +#ifndef NDEBUG +#include +#include +#include +#include +static void threadDebug(int) +{ + if (fork() == 0) abort(); + else { wait(0); exit(1); } +} +#endif + // // CThreadRep // @@ -122,21 +134,31 @@ void CThreadRep::initThreads() #if defined(CONFIG_PTHREADS) // install SIGWAKEUP handler struct sigaction act; - act.sa_handler = &threadCancel; + sigemptyset(&act.sa_mask); # if defined(SA_INTERRUPT) act.sa_flags = SA_INTERRUPT; # else act.sa_flags = 0; # endif - sigemptyset(&act.sa_mask); + act.sa_handler = &threadCancel; sigaction(SIGWAKEUP, &act, NULL); +# ifndef NDEBUG + act.sa_handler = &threadDebug; + sigaction(SIGSEGV, &act, NULL); +# endif #endif // set signal mask sigset_t sigset; sigemptyset(&sigset); sigaddset(&sigset, SIGWAKEUP); +#ifndef NDEBUG + sigaddset(&sigset, SIGSEGV); +#endif pthread_sigmask(SIG_UNBLOCK, &sigset, NULL); + sigemptyset(&sigset); + sigaddset(&sigset, SIGPIPE); + pthread_sigmask(SIG_BLOCK, &sigset, NULL); } } @@ -368,7 +390,13 @@ void* CThreadRep::threadFunc(void* arg) sigset_t sigset; sigemptyset(&sigset); sigaddset(&sigset, SIGWAKEUP); +#ifndef NDEBUG + sigaddset(&sigset, SIGSEGV); +#endif pthread_sigmask(SIG_UNBLOCK, &sigset, NULL); + sigemptyset(&sigset); + sigaddset(&sigset, SIGPIPE); + pthread_sigmask(SIG_BLOCK, &sigset, NULL); // run thread rep->doThreadFunc(); diff --git a/net/CTCPSocket.cpp b/net/CTCPSocket.cpp index d38ee738..39defc3d 100644 --- a/net/CTCPSocket.cpp +++ b/net/CTCPSocket.cpp @@ -29,8 +29,7 @@ CTCPSocket::CTCPSocket() init(); } -CTCPSocket::CTCPSocket(int fd) : - m_fd(fd) +CTCPSocket::CTCPSocket(int fd) : m_fd(fd) { assert(m_fd != -1); @@ -41,7 +40,7 @@ CTCPSocket::CTCPSocket(int fd) : // start handling socket m_thread = new CThread(new TMethodJob( - this, &CTCPSocket::service)); + this, &CTCPSocket::ioThread)); } CTCPSocket::~CTCPSocket() @@ -54,9 +53,9 @@ CTCPSocket::~CTCPSocket() } // clean up - delete m_mutex; delete m_input; delete m_output; + delete m_mutex; } void CTCPSocket::bind(const CNetworkAddress& addr) @@ -80,7 +79,7 @@ void CTCPSocket::connect(const CNetworkAddress& addr) // start servicing the socket m_connected = kReadWrite; m_thread = new CThread(new TMethodJob( - this, &CTCPSocket::service)); + this, &CTCPSocket::ioThread)); } void CTCPSocket::close() @@ -135,7 +134,21 @@ void CTCPSocket::init() this, &CTCPSocket::closeOutput)); } -void CTCPSocket::service(void*) +void CTCPSocket::ioThread(void*) +{ + try { + ioService(); + m_input->close(); + m_output->close(); + } + catch (...) { + m_input->close(); + m_output->close(); + throw; + } +} + +void CTCPSocket::ioService() { assert(m_fd != -1); @@ -198,12 +211,15 @@ void CTCPSocket::service(void*) // write data const void* buffer = m_output->peek(n); - n = write(m_fd, buffer, n); + n = (UInt32)write(m_fd, buffer, n); // discard written data if (n > 0) { m_output->pop(n); } + else if (n == (UInt32)-1 && errno == EPIPE) { + return; + } } } } diff --git a/net/CTCPSocket.h b/net/CTCPSocket.h index 208774ff..c7b9ca73 100644 --- a/net/CTCPSocket.h +++ b/net/CTCPSocket.h @@ -30,7 +30,8 @@ class CTCPSocket : public ISocket { private: void init(); - void service(void*); + void ioThread(void*); + void ioService(); void closeInput(void*); void closeOutput(void*); diff --git a/synergy/CServer.cpp b/synergy/CServer.cpp index 31f6e65e..aeafedba 100644 --- a/synergy/CServer.cpp +++ b/synergy/CServer.cpp @@ -20,6 +20,15 @@ #include #include +/* XXX +#include +#include +#include +#include +if (fork() == 0) abort(); +else { wait(0); exit(1); } +*/ + // // CServer // @@ -647,18 +656,20 @@ void CServer::handshakeClient(void* vsocket) std::auto_ptr output; // attach the encryption layer + bool own = false; if (m_securityFactory != NULL) { /* FIXME -- implement ISecurityFactory - input.reset(m_securityFactory->createInputFilter(srcInput, false)); - output.reset(m_securityFactory->createOutputFilter(srcOutput, false)); + input.reset(m_securityFactory->createInputFilter(srcInput, own)); + output.reset(m_securityFactory->createOutputFilter(srcOutput, own)); srcInput = input.get(); srcOutput = output.get(); + own = true; */ } // attach the packetizing filters - input.reset(new CInputPacketStream(srcInput, true)); - output.reset(new COutputPacketStream(srcOutput, true)); + input.reset(new CInputPacketStream(srcInput, own)); + output.reset(new COutputPacketStream(srcOutput, own)); std::auto_ptr protocol; std::auto_ptr connectedNote;