Removed recursive mutexes. Simplified stream filters as a side

effect.  Removed -D_BSD_SOURCE and -D_XOPEN_SOURCE=500 from
compile since they're not longer necessary.
This commit is contained in:
crs 2004-09-29 21:59:26 +00:00
parent 376bba970b
commit 07e2a3e511
18 changed files with 148 additions and 302 deletions

View File

@ -38,7 +38,6 @@ CClientTaskBarReceiver::updateStatus(CClient* client, const CString& errorMsg)
{
{
// update our status
CLock lock(&m_mutex);
m_errorMessage = errorMsg;
if (client == NULL) {
if (m_errorMessage.empty()) {
@ -95,13 +94,13 @@ CClientTaskBarReceiver::onStatusChanged(CClient*)
void
CClientTaskBarReceiver::lock() const
{
m_mutex.lock();
// do nothing
}
void
CClientTaskBarReceiver::unlock() const
{
m_mutex.unlock();
// do nothing
}
std::string

View File

@ -15,7 +15,6 @@
#ifndef CCLIENTTASKBARRECEIVER_H
#define CCLIENTTASKBARRECEIVER_H
#include "CMutex.h"
#include "CString.h"
#include "IArchTaskBarReceiver.h"
@ -76,7 +75,6 @@ protected:
virtual void onStatusChanged(CClient* client);
private:
CMutex m_mutex;
EState m_state;
CString m_errorMessage;
};

View File

@ -38,7 +38,6 @@ CServerTaskBarReceiver::updateStatus(CServer* server, const CString& errorMsg)
{
{
// update our status
CLock lock(&m_mutex);
m_errorMessage = errorMsg;
if (server == NULL) {
if (m_errorMessage.empty()) {
@ -100,13 +99,13 @@ CServerTaskBarReceiver::onStatusChanged(CServer*)
void
CServerTaskBarReceiver::lock() const
{
m_mutex.lock();
// do nothing
}
void
CServerTaskBarReceiver::unlock() const
{
m_mutex.unlock();
// do nothing
}
std::string

View File

@ -15,7 +15,6 @@
#ifndef CSERVERTASKBARRECEIVER_H
#define CSERVERTASKBARRECEIVER_H
#include "CMutex.h"
#include "CString.h"
#include "IArchTaskBarReceiver.h"
#include "stdvector.h"
@ -81,7 +80,6 @@ protected:
virtual void onStatusChanged(CServer* server);
private:
CMutex m_mutex;
EState m_state;
CString m_errorMessage;
CClients m_clients;

View File

@ -74,16 +74,6 @@ fi
dnl check compiler
ACX_CHECK_CXX
dnl different platforms have somewhat incompatible requirements for
dnl BSD and Posix macros.
case $host in
*-*-openbsd* | *-*-freebsd*)
;;
*)
CXXFLAGS="$CXXFLAGS -D_BSD_SOURCE -D_XOPEN_SOURCE=500"
;;
esac
dnl checks for libraries
if test x"$acx_host_arch" = xUNIX; then
ACX_PTHREAD(,AC_MSG_ERROR(You must have pthreads to compile synergy))

View File

@ -301,8 +301,6 @@ CArchMultithreadPosix::newMutex()
pthread_mutexattr_t attr;
int status = pthread_mutexattr_init(&attr);
assert(status == 0);
status = pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE);
assert(status == 0);
CArchMutexImpl* mutex = new CArchMutexImpl;
status = pthread_mutex_init(&mutex->m_mutex, &attr);
assert(status == 0);

View File

@ -183,6 +183,9 @@ CEventQueue::dispatchEvent(const CEvent& event)
{
void* target = event.getTarget();
IEventJob* job = getHandler(event.getType(), target);
if (job == NULL) {
job = getHandler(CEvent::kUnknown, target);
}
if (job != NULL) {
job->run(event);
return true;
@ -273,65 +276,56 @@ CEventQueue::deleteTimer(CEventQueueTimer* timer)
m_buffer->deleteTimer(timer);
}
void
CEventQueue::adoptHandler(void* target, IEventJob* handler)
{
doAdoptHandler(CEvent::kUnknown, target, handler);
}
void
CEventQueue::adoptHandler(CEvent::Type type, void* target, IEventJob* handler)
{
assert(type != CEvent::kUnknown);
doAdoptHandler(type, target, handler);
}
IEventJob*
CEventQueue::orphanHandler(void* target)
{
return doOrphanHandler(CEvent::kUnknown, target);
}
IEventJob*
CEventQueue::orphanHandler(CEvent::Type type, void* target)
{
assert(type != CEvent::kUnknown);
return doOrphanHandler(type, target);
}
void
CEventQueue::removeHandler(void* target)
{
delete orphanHandler(target);
CArchMutexLock lock(m_mutex);
IEventJob*& job = m_handlers[target][type];
delete job;
job = handler;
}
void
CEventQueue::removeHandler(CEvent::Type type, void* target)
{
delete orphanHandler(type, target);
IEventJob* handler = NULL;
{
CArchMutexLock lock(m_mutex);
CHandlerTable::iterator index = m_handlers.find(target);
if (index != m_handlers.end()) {
CTypeHandlerTable& typeHandlers = index->second;
CTypeHandlerTable::iterator index2 = typeHandlers.find(type);
if (index2 != typeHandlers.end()) {
handler = index2->second;
typeHandlers.erase(index2);
}
}
}
delete handler;
}
void
CEventQueue::doAdoptHandler(CEvent::Type type, void* target, IEventJob* handler)
CEventQueue::removeHandlers(void* target)
{
std::vector<IEventJob*> handlers;
{
CArchMutexLock lock(m_mutex);
IEventJob*& job = m_handlers[CTypeTarget(type, target)];
delete job;
job = handler;
CHandlerTable::iterator index = m_handlers.find(target);
if (index != m_handlers.end()) {
// copy to handlers array and clear table for target
CTypeHandlerTable& typeHandlers = index->second;
for (CTypeHandlerTable::iterator index2 = typeHandlers.begin();
index2 != typeHandlers.end(); ++index2) {
handlers.push_back(index2->second);
}
typeHandlers.clear();
}
}
IEventJob*
CEventQueue::doOrphanHandler(CEvent::Type type, void* target)
{
CArchMutexLock lock(m_mutex);
CHandlerTable::iterator index = m_handlers.find(CTypeTarget(type, target));
if (index != m_handlers.end()) {
IEventJob* handler = index->second;
m_handlers.erase(index);
return handler;
}
else {
return NULL;
// delete handlers
for (std::vector<IEventJob*>::iterator index = handlers.begin();
index != handlers.end(); ++index) {
delete *index;
}
}
@ -345,14 +339,13 @@ IEventJob*
CEventQueue::getHandler(CEvent::Type type, void* target) const
{
CArchMutexLock lock(m_mutex);
CHandlerTable::const_iterator index =
m_handlers.find(CTypeTarget(type, target));
CHandlerTable::const_iterator index = m_handlers.find(target);
if (index != m_handlers.end()) {
return index->second;
const CTypeHandlerTable& typeHandlers = index->second;
CTypeHandlerTable::const_iterator index2 = typeHandlers.find(type);
if (index2 != typeHandlers.end()) {
return index2->second;
}
index = m_handlers.find(CTypeTarget(CEvent::kUnknown, target));
if (index != m_handlers.end()) {
return index->second;
}
return NULL;
}
@ -454,30 +447,6 @@ CEventQueue::getNextTimerTimeout() const
}
//
// CEventQueue::CTypeTarget
//
CEventQueue::CTypeTarget::CTypeTarget(CEvent::Type type, void* target) :
m_type(type),
m_target(target)
{
// do nothing
}
CEventQueue::CTypeTarget::~CTypeTarget()
{
// do nothing
}
bool
CEventQueue::CTypeTarget::operator<(const CTypeTarget& tt) const
{
return (m_type < tt.m_type ||
(m_type == tt.m_type && m_target < tt.m_target));
}
//
// CEventQueue::CTimer
//

View File

@ -43,13 +43,10 @@ public:
virtual CEventQueueTimer*
newOneShotTimer(double duration, void* target);
virtual void deleteTimer(CEventQueueTimer*);
virtual void adoptHandler(void* target, IEventJob* dispatcher);
virtual void adoptHandler(CEvent::Type type,
void* target, IEventJob* handler);
virtual IEventJob* orphanHandler(void* target);
virtual IEventJob* orphanHandler(CEvent::Type type, void* target);
virtual void removeHandler(void* target);
virtual void removeHandler(CEvent::Type type, void* target);
virtual void removeHandlers(void* target);
virtual CEvent::Type
registerType(const char* name);
virtual CEvent::Type
@ -59,27 +56,12 @@ public:
virtual const char* getTypeName(CEvent::Type type);
private:
void doAdoptHandler(CEvent::Type type,
void* target, IEventJob* handler);
IEventJob* doOrphanHandler(CEvent::Type type, void* target);
UInt32 saveEvent(const CEvent& event);
CEvent removeEvent(UInt32 eventID);
bool hasTimerExpired(CEvent& event);
double getNextTimerTimeout() const;
private:
class CTypeTarget {
public:
CTypeTarget(CEvent::Type type, void* target);
~CTypeTarget();
bool operator<(const CTypeTarget&) const;
private:
CEvent::Type m_type;
void* m_target;
};
class CTimer {
public:
CTimer(CEventQueueTimer*, double timeout, double initialTime,
@ -111,8 +93,9 @@ private:
typedef CPriorityQueue<CTimer> CTimerQueue;
typedef std::map<UInt32, CEvent> CEventTable;
typedef std::vector<UInt32> CEventIDList;
typedef std::map<CTypeTarget, IEventJob*> CHandlerTable;
typedef std::map<CEvent::Type, const char*> CTypeMap;
typedef std::map<CEvent::Type, IEventJob*> CTypeHandlerTable;
typedef std::map<void*, CTypeHandlerTable> CHandlerTable;
CArchMutex m_mutex;

View File

@ -114,47 +114,17 @@ public:
*/
virtual void deleteTimer(CEventQueueTimer*) = 0;
//! Register an event handler
/*!
Registers an event handler for \p target. The \p handler is
adopted. Any existing handler for the target is deleted.
\c dispatchEvent() will invoke \p handler for any event for
\p target that doesn't have a type specific handler.
*/
virtual void adoptHandler(void* target, IEventJob* handler) = 0;
//! Register an event handler for an event type
/*!
Registers an event handler for \p type and \p target. The \p handler
is adopted. Any existing handler for the type,target pair is deleted.
\c dispatchEvent() will invoke \p handler for any event for \p target
of type \p type.
of type \p type. If no such handler exists it will use the handler
for \p target and type \p kUnknown if it exists.
*/
virtual void adoptHandler(CEvent::Type type,
void* target, IEventJob* handler) = 0;
//! Unregister an event handler
/*!
Unregisters an event handler for \p target and returns it.
Returns NULL if there was no such handler. The client becomes
responsible for deleting the returned handler.
*/
virtual IEventJob* orphanHandler(void* target) = 0;
//! Unregister an event handler for an event type
/*!
Unregisters an event handler for the \p type, \p target pair and
returns it. Returns NULL if there was no such handler. The
client becomes responsible for deleting the returned handler.
*/
virtual IEventJob* orphanHandler(CEvent::Type type, void* target) = 0;
//! Unregister an event handler
/*!
Unregisters an event handler for \p target and deletes it.
*/
virtual void removeHandler(void* target) = 0;
//! Unregister an event handler for an event type
/*!
Unregisters an event handler for the \p type, \p target pair and
@ -162,6 +132,12 @@ public:
*/
virtual void removeHandler(CEvent::Type type, void* target) = 0;
//! Unregister all event handlers for an event target
/*!
Unregisters all event handlers for the \p target and deletes them.
*/
virtual void removeHandlers(void* target) = 0;
//! Creates a new event type
/*!
Returns a unique event type id.
@ -192,9 +168,8 @@ public:
//! Get an event handler
/*!
Finds and returns the event handler for the \p type, \p target pair.
If there is no such handler, returns the handler for \p target. If
that doesn't exist, returns NULL.
Finds and returns the event handler for the \p type, \p target pair
if it exists, otherwise it returns NULL.
*/
virtual IEventJob* getHandler(CEvent::Type type, void* target) const = 0;

View File

@ -13,6 +13,8 @@
*/
#include "CStreamFilter.h"
#include "IEventQueue.h"
#include "TMethodEventJob.h"
//
// CStreamFilter
@ -22,11 +24,16 @@ CStreamFilter::CStreamFilter(IStream* stream, bool adoptStream) :
m_stream(stream),
m_adopted(adoptStream)
{
// do nothing
// replace handlers for m_stream
EVENTQUEUE->removeHandlers(m_stream->getEventTarget());
EVENTQUEUE->adoptHandler(CEvent::kUnknown, m_stream->getEventTarget(),
new TMethodEventJob<CStreamFilter>(this,
&CStreamFilter::handleUpstreamEvent));
}
CStreamFilter::~CStreamFilter()
{
EVENTQUEUE->removeHandler(CEvent::kUnknown, m_stream->getEventTarget());
if (m_adopted) {
delete m_stream;
}
@ -68,16 +75,10 @@ CStreamFilter::shutdownOutput()
getStream()->shutdownOutput();
}
void
CStreamFilter::setEventFilter(IEventJob* filter)
{
getStream()->setEventFilter(filter);
}
void*
CStreamFilter::getEventTarget() const
{
return getStream()->getEventTarget();
return const_cast<void*>(reinterpret_cast<const void*>(this));
}
bool
@ -92,14 +93,21 @@ CStreamFilter::getSize() const
return getStream()->getSize();
}
IEventJob*
CStreamFilter::getEventFilter() const
{
return getStream()->getEventFilter();
}
IStream*
CStreamFilter::getStream() const
{
return m_stream;
}
void
CStreamFilter::filterEvent(const CEvent& event)
{
EVENTQUEUE->dispatchEvent(CEvent(event.getType(),
getEventTarget(), event.getData()));
}
void
CStreamFilter::handleUpstreamEvent(const CEvent& event, void*)
{
filterEvent(event);
}

View File

@ -33,19 +33,17 @@ public:
~CStreamFilter();
// IStream overrides
// These all just forward to the underlying stream. Override as
// necessary.
// These all just forward to the underlying stream except getEventTarget.
// Override as necessary. getEventTarget returns a pointer to this.
virtual void close();
virtual UInt32 read(void* buffer, UInt32 n);
virtual void write(const void* buffer, UInt32 n);
virtual void flush();
virtual void shutdownInput();
virtual void shutdownOutput();
virtual void setEventFilter(IEventJob* filter);
virtual void* getEventTarget() const;
virtual bool isReady() const;
virtual UInt32 getSize() const;
virtual IEventJob* getEventFilter() const;
protected:
//! Get the stream
@ -54,6 +52,16 @@ protected:
*/
IStream* getStream() const;
//! Handle events from source stream
/*!
Does the event filtering. The default simply dispatches an event
identical except using this object as the event target.
*/
virtual void filterEvent(const CEvent&);
private:
void handleUpstreamEvent(const CEvent&, void*);
private:
IStream* m_stream;
bool m_adopted;

View File

@ -18,8 +18,6 @@
#include "IInterface.h"
#include "CEvent.h"
class IEventJob;
//! Bidirectional stream interface
/*!
Defines the interface for all streams.
@ -77,22 +75,14 @@ public:
*/
virtual void shutdownOutput() = 0;
//! Set the event filter
/*!
If not NULL, the \p filter is passed any event that would've been
added to the queue. The filter can discard the event, modify it
and add it to the queue, and add other events. The default filter
is NULL. The caller retains ownership of the filter.
*/
virtual void setEventFilter(IEventJob* filter) = 0;
//@}
//! @name accessors
//@{
//! Get event target
/*!
Returns the event target for events generated by this stream.
Returns the event target for events generated by this stream. It
should be the source stream in a chain of stream filters.
*/
virtual void* getEventTarget() const = 0;
@ -113,12 +103,6 @@ public:
*/
virtual UInt32 getSize() const = 0;
//! Get the event filter
/*!
Returns the current event filter.
*/
virtual IEventJob* getEventFilter() const = 0;
//! Get input ready event type
/*!
Returns the input ready event type. A stream sends this event

View File

@ -31,8 +31,7 @@
CTCPSocket::CTCPSocket() :
m_mutex(),
m_flushed(&m_mutex, true),
m_eventFilter(NULL)
m_flushed(&m_mutex, true)
{
try {
m_socket = ARCH->newSocket(IArchNetwork::kINET, IArchNetwork::kSTREAM);
@ -47,8 +46,7 @@ CTCPSocket::CTCPSocket() :
CTCPSocket::CTCPSocket(CArchSocket socket) :
m_mutex(),
m_socket(socket),
m_flushed(&m_mutex, true),
m_eventFilter(NULL)
m_flushed(&m_mutex, true)
{
assert(m_socket != NULL);
@ -92,7 +90,7 @@ CTCPSocket::close()
// clear buffers and enter disconnected state
if (m_connected) {
sendSocketEvent(getDisconnectedEvent());
sendEvent(getDisconnectedEvent());
}
onDisconnected();
@ -132,7 +130,7 @@ CTCPSocket::read(void* buffer, UInt32 n)
// if no more data and we cannot read or write then send disconnected
if (n > 0 && m_inputBuffer.getSize() == 0 && !m_readable && !m_writable) {
sendSocketEvent(getDisconnectedEvent());
sendEvent(getDisconnectedEvent());
m_connected = false;
}
@ -148,7 +146,7 @@ CTCPSocket::write(const void* buffer, UInt32 n)
// must not have shutdown output
if (!m_writable) {
sendStreamEvent(getOutputErrorEvent());
sendEvent(getOutputErrorEvent());
return;
}
@ -197,7 +195,7 @@ CTCPSocket::shutdownInput()
// shutdown buffer for reading
if (m_readable) {
sendStreamEvent(getInputShutdownEvent());
sendEvent(getInputShutdownEvent());
onInputShutdown();
useNewJob = true;
}
@ -224,7 +222,7 @@ CTCPSocket::shutdownOutput()
// shutdown buffer for writing
if (m_writable) {
sendStreamEvent(getOutputShutdownEvent());
sendEvent(getOutputShutdownEvent());
onOutputShutdown();
useNewJob = true;
}
@ -234,13 +232,6 @@ CTCPSocket::shutdownOutput()
}
}
void
CTCPSocket::setEventFilter(IEventJob* filter)
{
CLock lock(&m_mutex);
m_eventFilter = filter;
}
bool
CTCPSocket::isReady() const
{
@ -255,13 +246,6 @@ CTCPSocket::getSize() const
return m_inputBuffer.getSize();
}
IEventJob*
CTCPSocket::getEventFilter() const
{
CLock lock(&m_mutex);
return m_eventFilter;
}
void
CTCPSocket::connect(const CNetworkAddress& addr)
{
@ -276,7 +260,7 @@ CTCPSocket::connect(const CNetworkAddress& addr)
try {
if (ARCH->connectSocket(m_socket, addr.getAddress())) {
sendSocketEvent(getConnectedEvent());
sendEvent(getConnectedEvent());
onConnected();
}
else {
@ -357,12 +341,6 @@ CTCPSocket::newJob()
}
}
void
CTCPSocket::sendSocketEvent(CEvent::Type type)
{
EVENTQUEUE->addEvent(CEvent(type, getEventTarget(), NULL));
}
void
CTCPSocket::sendConnectionFailedEvent(const char* msg)
{
@ -374,15 +352,10 @@ CTCPSocket::sendConnectionFailedEvent(const char* msg)
}
void
CTCPSocket::sendStreamEvent(CEvent::Type type)
CTCPSocket::sendEvent(CEvent::Type type)
{
if (m_eventFilter != NULL) {
m_eventFilter->run(CEvent(type, getEventTarget(), NULL));
}
else {
EVENTQUEUE->addEvent(CEvent(type, getEventTarget(), NULL));
}
}
void
CTCPSocket::onConnected()
@ -455,7 +428,7 @@ CTCPSocket::serviceConnecting(ISocketMultiplexerJob* job,
}
if (write) {
sendSocketEvent(getConnectedEvent());
sendEvent(getConnectedEvent());
onConnected();
return newJob();
}
@ -470,7 +443,7 @@ CTCPSocket::serviceConnected(ISocketMultiplexerJob* job,
CLock lock(&m_mutex);
if (error) {
sendSocketEvent(getDisconnectedEvent());
sendEvent(getDisconnectedEvent());
onDisconnected();
return newJob();
}
@ -488,7 +461,7 @@ CTCPSocket::serviceConnected(ISocketMultiplexerJob* job,
if (n > 0) {
m_outputBuffer.pop(n);
if (m_outputBuffer.getSize() == 0) {
sendStreamEvent(getOutputFlushedEvent());
sendEvent(getOutputFlushedEvent());
m_flushed = true;
m_flushed.broadcast();
needNewJob = true;
@ -499,9 +472,9 @@ CTCPSocket::serviceConnected(ISocketMultiplexerJob* job,
// remote read end of stream hungup. our output side
// has therefore shutdown.
onOutputShutdown();
sendStreamEvent(getOutputShutdownEvent());
sendEvent(getOutputShutdownEvent());
if (!m_readable && m_inputBuffer.getSize() == 0) {
sendSocketEvent(getDisconnectedEvent());
sendEvent(getDisconnectedEvent());
m_connected = false;
}
needNewJob = true;
@ -509,14 +482,14 @@ CTCPSocket::serviceConnected(ISocketMultiplexerJob* job,
catch (XArchNetworkDisconnected&) {
// stream hungup
onDisconnected();
sendSocketEvent(getDisconnectedEvent());
sendEvent(getDisconnectedEvent());
needNewJob = true;
}
catch (XArchNetwork&) {
// other write error
onDisconnected();
sendStreamEvent(getOutputErrorEvent());
sendSocketEvent(getDisconnectedEvent());
sendEvent(getOutputErrorEvent());
sendEvent(getDisconnectedEvent());
needNewJob = true;
}
}
@ -536,16 +509,16 @@ CTCPSocket::serviceConnected(ISocketMultiplexerJob* job,
// send input ready if input buffer was empty
if (wasEmpty) {
sendStreamEvent(getInputReadyEvent());
sendEvent(getInputReadyEvent());
}
}
else {
// remote write end of stream hungup. our input side
// has therefore shutdown but don't flush our buffer
// since there's still data to be read.
sendStreamEvent(getInputShutdownEvent());
sendEvent(getInputShutdownEvent());
if (!m_writable && m_inputBuffer.getSize() == 0) {
sendSocketEvent(getDisconnectedEvent());
sendEvent(getDisconnectedEvent());
m_connected = false;
}
m_readable = false;
@ -554,7 +527,7 @@ CTCPSocket::serviceConnected(ISocketMultiplexerJob* job,
}
catch (XArchNetworkDisconnected&) {
// stream hungup
sendSocketEvent(getDisconnectedEvent());
sendEvent(getDisconnectedEvent());
onDisconnected();
needNewJob = true;
}

View File

@ -46,10 +46,8 @@ public:
virtual void flush();
virtual void shutdownInput();
virtual void shutdownOutput();
virtual void setEventFilter(IEventJob* filter);
virtual bool isReady() const;
virtual UInt32 getSize() const;
virtual IEventJob* getEventFilter() const;
// IDataSocket overrides
virtual void connect(const CNetworkAddress&);
@ -59,9 +57,8 @@ private:
void setJob(ISocketMultiplexerJob*);
ISocketMultiplexerJob* newJob();
void sendSocketEvent(CEvent::Type);
void sendConnectionFailedEvent(const char*);
void sendStreamEvent(CEvent::Type);
void sendEvent(CEvent::Type);
void onConnected();
void onInputShutdown();
@ -84,7 +81,6 @@ private:
bool m_connected;
bool m_readable;
bool m_writable;
IEventJob* m_eventFilter;
};
#endif

View File

@ -79,10 +79,8 @@ public:
virtual void flush() = 0;
virtual void shutdownInput() = 0;
virtual void shutdownOutput() = 0;
virtual void setEventFilter(IEventJob* filter) = 0;
virtual bool isReady() const = 0;
virtual UInt32 getSize() const = 0;
virtual IEventJob* getEventFilter() const = 0;
private:
static CEvent::Type s_connectedEvent;

View File

@ -75,7 +75,12 @@ CClientListener::~CClientListener()
for (CNewClients::iterator index = m_newClients.begin();
index != m_newClients.end(); ++index) {
CClientProxyUnknown* client = *index;
EVENTQUEUE->removeHandler(client);
EVENTQUEUE->removeHandler(
CClientProxyUnknown::getSuccessEvent(), client);
EVENTQUEUE->removeHandler(
CClientProxyUnknown::getFailureEvent(), client);
EVENTQUEUE->removeHandler(
CClientProxy::getDisconnectedEvent(), client);
delete client;
}

View File

@ -24,19 +24,14 @@
CPacketStreamFilter::CPacketStreamFilter(IStream* stream, bool adoptStream) :
CStreamFilter(stream, adoptStream),
m_size(0),
m_eventFilter(NULL),
m_inputShutdown(false)
{
// install event filter
getStream()->setEventFilter(new TMethodEventJob<CPacketStreamFilter>(
this, &CPacketStreamFilter::filterEvent, NULL));
// do nothing
}
CPacketStreamFilter::~CPacketStreamFilter()
{
IEventJob* job = getStream()->getEventFilter();
getStream()->setEventFilter(NULL);
delete job;
// do nothing
}
void
@ -79,7 +74,8 @@ CPacketStreamFilter::read(void* buffer, UInt32 n)
readPacketSize();
if (m_inputShutdown && m_size == 0) {
sendEvent(CEvent(getInputShutdownEvent(), getEventTarget(), NULL));
EVENTQUEUE->addEvent(CEvent(getInputShutdownEvent(),
getEventTarget(), NULL));
}
return n;
@ -109,13 +105,6 @@ CPacketStreamFilter::shutdownInput()
CStreamFilter::shutdownInput();
}
void
CPacketStreamFilter::setEventFilter(IEventJob* filter)
{
CLock lock(&m_mutex);
m_eventFilter = filter;
}
bool
CPacketStreamFilter::isReady() const
{
@ -130,13 +119,6 @@ CPacketStreamFilter::getSize() const
return isReadyNoLock() ? m_size : 0;
}
IEventJob*
CPacketStreamFilter::getEventFilter() const
{
CLock lock(&m_mutex);
return m_eventFilter;
}
bool
CPacketStreamFilter::isReadyNoLock() const
{
@ -159,11 +141,9 @@ CPacketStreamFilter::readPacketSize()
}
}
void
bool
CPacketStreamFilter::readMore()
{
// note -- m_mutex must be locked on entry
// note if we have whole packet
bool wasReady = isReadyNoLock();
@ -184,40 +164,27 @@ CPacketStreamFilter::readMore()
// if we weren't ready before but now we are then send a
// input ready event apparently from the filtered stream.
if (wasReady != isReady) {
sendEvent(CEvent(getInputReadyEvent(), getEventTarget(), NULL));
}
return (wasReady != isReady);
}
void
CPacketStreamFilter::sendEvent(const CEvent& event)
CPacketStreamFilter::filterEvent(const CEvent& event)
{
if (m_eventFilter != NULL) {
m_eventFilter->run(event);
}
else {
EVENTQUEUE->addEvent(event);
}
}
void
CPacketStreamFilter::filterEvent(const CEvent& event, void*)
{
CLock lock(&m_mutex);
if (event.getType() == getInputReadyEvent()) {
readMore();
CLock lock(&m_mutex);
if (!readMore()) {
return;
}
}
else if (event.getType() == getInputShutdownEvent()) {
// discard this if we have buffered data
CLock lock(&m_mutex);
m_inputShutdown = true;
if (m_size == 0) {
sendEvent(CEvent(getInputShutdownEvent(), getEventTarget(), NULL));
}
if (m_size != 0) {
return;
}
}
// pass event
sendEvent(event);
CStreamFilter::filterEvent(event);
}

View File

@ -33,24 +33,22 @@ public:
virtual UInt32 read(void* buffer, UInt32 n);
virtual void write(const void* buffer, UInt32 n);
virtual void shutdownInput();
virtual void setEventFilter(IEventJob* filter);
virtual bool isReady() const;
virtual UInt32 getSize() const;
virtual IEventJob* getEventFilter() const;
protected:
// CStreamFilter overrides
virtual void filterEvent(const CEvent&);
private:
bool isReadyNoLock() const;
void readPacketSize();
void readMore();
void sendEvent(const CEvent&);
void filterEvent(const CEvent&, void*);
bool readMore();
private:
CMutex m_mutex;
UInt32 m_size;
CStreamBuffer m_buffer;
IEventJob* m_eventFilter;
bool m_inputShutdown;
};