fixed bugs in handling streams.

This commit is contained in:
crs 2001-10-21 00:21:02 +00:00
parent 3cfbc0f68c
commit 8d1cf844c7
6 changed files with 74 additions and 14 deletions

View File

@ -87,6 +87,9 @@ void CBufferedInputStream::close()
} }
m_closed = true; m_closed = true;
m_hungup = true;
m_buffer.pop(m_buffer.getSize());
m_empty.broadcast();
if (m_closeCB) { if (m_closeCB) {
m_closeCB->run(); m_closeCB->run();
} }

View File

@ -46,6 +46,7 @@ void CBufferedOutputStream::close()
} }
m_closed = true; m_closed = true;
m_buffer.pop(m_buffer.getSize());
if (m_closeCB) { if (m_closeCB) {
m_closeCB->run(); m_closeCB->run();
} }

View File

@ -15,6 +15,18 @@
// FIXME -- temporary exception type // FIXME -- temporary exception type
class XThreadUnavailable { }; class XThreadUnavailable { };
#ifndef NDEBUG
#include <stdlib.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/wait.h>
static void threadDebug(int)
{
if (fork() == 0) abort();
else { wait(0); exit(1); }
}
#endif
// //
// CThreadRep // CThreadRep
// //
@ -122,21 +134,31 @@ void CThreadRep::initThreads()
#if defined(CONFIG_PTHREADS) #if defined(CONFIG_PTHREADS)
// install SIGWAKEUP handler // install SIGWAKEUP handler
struct sigaction act; struct sigaction act;
act.sa_handler = &threadCancel; sigemptyset(&act.sa_mask);
# if defined(SA_INTERRUPT) # if defined(SA_INTERRUPT)
act.sa_flags = SA_INTERRUPT; act.sa_flags = SA_INTERRUPT;
# else # else
act.sa_flags = 0; act.sa_flags = 0;
# endif # endif
sigemptyset(&act.sa_mask); act.sa_handler = &threadCancel;
sigaction(SIGWAKEUP, &act, NULL); sigaction(SIGWAKEUP, &act, NULL);
# ifndef NDEBUG
act.sa_handler = &threadDebug;
sigaction(SIGSEGV, &act, NULL);
# endif
#endif #endif
// set signal mask // set signal mask
sigset_t sigset; sigset_t sigset;
sigemptyset(&sigset); sigemptyset(&sigset);
sigaddset(&sigset, SIGWAKEUP); sigaddset(&sigset, SIGWAKEUP);
#ifndef NDEBUG
sigaddset(&sigset, SIGSEGV);
#endif
pthread_sigmask(SIG_UNBLOCK, &sigset, NULL); pthread_sigmask(SIG_UNBLOCK, &sigset, NULL);
sigemptyset(&sigset);
sigaddset(&sigset, SIGPIPE);
pthread_sigmask(SIG_BLOCK, &sigset, NULL);
} }
} }
@ -368,7 +390,13 @@ void* CThreadRep::threadFunc(void* arg)
sigset_t sigset; sigset_t sigset;
sigemptyset(&sigset); sigemptyset(&sigset);
sigaddset(&sigset, SIGWAKEUP); sigaddset(&sigset, SIGWAKEUP);
#ifndef NDEBUG
sigaddset(&sigset, SIGSEGV);
#endif
pthread_sigmask(SIG_UNBLOCK, &sigset, NULL); pthread_sigmask(SIG_UNBLOCK, &sigset, NULL);
sigemptyset(&sigset);
sigaddset(&sigset, SIGPIPE);
pthread_sigmask(SIG_BLOCK, &sigset, NULL);
// run thread // run thread
rep->doThreadFunc(); rep->doThreadFunc();

View File

@ -29,8 +29,7 @@ CTCPSocket::CTCPSocket()
init(); init();
} }
CTCPSocket::CTCPSocket(int fd) : CTCPSocket::CTCPSocket(int fd) : m_fd(fd)
m_fd(fd)
{ {
assert(m_fd != -1); assert(m_fd != -1);
@ -41,7 +40,7 @@ CTCPSocket::CTCPSocket(int fd) :
// start handling socket // start handling socket
m_thread = new CThread(new TMethodJob<CTCPSocket>( m_thread = new CThread(new TMethodJob<CTCPSocket>(
this, &CTCPSocket::service)); this, &CTCPSocket::ioThread));
} }
CTCPSocket::~CTCPSocket() CTCPSocket::~CTCPSocket()
@ -54,9 +53,9 @@ CTCPSocket::~CTCPSocket()
} }
// clean up // clean up
delete m_mutex;
delete m_input; delete m_input;
delete m_output; delete m_output;
delete m_mutex;
} }
void CTCPSocket::bind(const CNetworkAddress& addr) void CTCPSocket::bind(const CNetworkAddress& addr)
@ -80,7 +79,7 @@ void CTCPSocket::connect(const CNetworkAddress& addr)
// start servicing the socket // start servicing the socket
m_connected = kReadWrite; m_connected = kReadWrite;
m_thread = new CThread(new TMethodJob<CTCPSocket>( m_thread = new CThread(new TMethodJob<CTCPSocket>(
this, &CTCPSocket::service)); this, &CTCPSocket::ioThread));
} }
void CTCPSocket::close() void CTCPSocket::close()
@ -135,7 +134,21 @@ void CTCPSocket::init()
this, &CTCPSocket::closeOutput)); this, &CTCPSocket::closeOutput));
} }
void CTCPSocket::service(void*) void CTCPSocket::ioThread(void*)
{
try {
ioService();
m_input->close();
m_output->close();
}
catch (...) {
m_input->close();
m_output->close();
throw;
}
}
void CTCPSocket::ioService()
{ {
assert(m_fd != -1); assert(m_fd != -1);
@ -198,12 +211,15 @@ void CTCPSocket::service(void*)
// write data // write data
const void* buffer = m_output->peek(n); const void* buffer = m_output->peek(n);
n = write(m_fd, buffer, n); n = (UInt32)write(m_fd, buffer, n);
// discard written data // discard written data
if (n > 0) { if (n > 0) {
m_output->pop(n); m_output->pop(n);
} }
else if (n == (UInt32)-1 && errno == EPIPE) {
return;
}
} }
} }
} }

View File

@ -30,7 +30,8 @@ class CTCPSocket : public ISocket {
private: private:
void init(); void init();
void service(void*); void ioThread(void*);
void ioService();
void closeInput(void*); void closeInput(void*);
void closeOutput(void*); void closeOutput(void*);

View File

@ -20,6 +20,15 @@
#include <assert.h> #include <assert.h>
#include <memory> #include <memory>
/* XXX
#include <stdlib.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/wait.h>
if (fork() == 0) abort();
else { wait(0); exit(1); }
*/
// //
// CServer // CServer
// //
@ -647,18 +656,20 @@ void CServer::handshakeClient(void* vsocket)
std::auto_ptr<IOutputStream> output; std::auto_ptr<IOutputStream> output;
// attach the encryption layer // attach the encryption layer
bool own = false;
if (m_securityFactory != NULL) { if (m_securityFactory != NULL) {
/* FIXME -- implement ISecurityFactory /* FIXME -- implement ISecurityFactory
input.reset(m_securityFactory->createInputFilter(srcInput, false)); input.reset(m_securityFactory->createInputFilter(srcInput, own));
output.reset(m_securityFactory->createOutputFilter(srcOutput, false)); output.reset(m_securityFactory->createOutputFilter(srcOutput, own));
srcInput = input.get(); srcInput = input.get();
srcOutput = output.get(); srcOutput = output.get();
own = true;
*/ */
} }
// attach the packetizing filters // attach the packetizing filters
input.reset(new CInputPacketStream(srcInput, true)); input.reset(new CInputPacketStream(srcInput, own));
output.reset(new COutputPacketStream(srcOutput, true)); output.reset(new COutputPacketStream(srcOutput, own));
std::auto_ptr<IServerProtocol> protocol; std::auto_ptr<IServerProtocol> protocol;
std::auto_ptr<CConnectionNote> connectedNote; std::auto_ptr<CConnectionNote> connectedNote;