Refactor write and read into functions

This commit is contained in:
Jerry (Xinyu Hou) 2016-08-24 17:09:42 +01:00 committed by Andrew Nelless
parent 436e333f6f
commit 61b489ab3d
2 changed files with 185 additions and 162 deletions

View File

@ -323,6 +323,179 @@ TCPSocket::init()
}
}
TCPSocket::EJobResult
TCPSocket::doRead()
{
try {
static UInt8 buffer[4096];
memset(buffer, 0, sizeof(buffer));
int bytesRead = 0;
int status = 0;
if (isSecure()) {
if (isSecureReady()) {
status = secureRead(buffer, sizeof(buffer), bytesRead);
if (status < 0) {
return kBreak;
}
else if (status == 0) {
return kNew;
}
}
else {
return kRetry;
}
}
else {
bytesRead = (int) ARCH->readSocket(m_socket, buffer, sizeof(buffer));
}
if (bytesRead > 0) {
bool wasEmpty = (m_inputBuffer.getSize() == 0);
// slurp up as much as possible
do {
m_inputBuffer.write(buffer, bytesRead);
if (isSecure() && isSecureReady()) {
status = secureRead(buffer, sizeof(buffer), bytesRead);
if (status < 0) {
return kBreak;
}
}
else {
bytesRead = (int) ARCH->readSocket(m_socket, buffer, sizeof(buffer));
}
} while (bytesRead > 0 || status > 0);
// send input ready if input buffer was empty
if (wasEmpty) {
sendEvent(m_events->forIStream().inputReady());
}
}
else {
// remote write end of stream hungup. our input side
// has therefore shutdown but don't flush our buffer
// since there's still data to be read.
sendEvent(m_events->forIStream().inputShutdown());
if (!m_writable && m_inputBuffer.getSize() == 0) {
sendEvent(m_events->forISocket().disconnected());
m_connected = false;
}
m_readable = false;
return kNew;
}
}
catch (XArchNetworkDisconnected&) {
// stream hungup
sendEvent(m_events->forISocket().disconnected());
onDisconnected();
return kNew;
}
catch (XArchNetwork& e) {
// ignore other read error
LOG((CLOG_WARN "error reading socket: %s", e.what()));
}
return kRetry;
}
TCPSocket::EJobResult
TCPSocket::doWrite()
{
static bool s_retry = false;
static int s_retrySize = 0;
static void* s_staticBuffer = NULL;
try {
// write data
int bufferSize = 0;
int bytesWrote = 0;
int status = 0;
if (s_retry) {
bufferSize = s_retrySize;
}
else {
bufferSize = m_outputBuffer.getSize();
s_staticBuffer = malloc(bufferSize);
memcpy(s_staticBuffer, m_outputBuffer.peek(bufferSize), bufferSize);
}
if (bufferSize == 0) {
return kRetry;
}
if (isSecure()) {
if (isSecureReady()) {
status = secureWrite(s_staticBuffer, bufferSize, bytesWrote);
if (status > 0) {
s_retry = false;
bufferSize = 0;
free(s_staticBuffer);
s_staticBuffer = NULL;
}
else if (status < 0) {
return kBreak;
}
else if (status == 0) {
s_retry = true;
s_retrySize = bufferSize;
return kNew;
}
}
else {
return kRetry;
}
}
else {
bytesWrote = (UInt32)ARCH->writeSocket(m_socket, s_staticBuffer, bufferSize);
bufferSize = 0;
free(s_staticBuffer);
s_staticBuffer = NULL;
}
// discard written data
if (bytesWrote > 0) {
m_outputBuffer.pop(bytesWrote);
if (m_outputBuffer.getSize() == 0) {
sendEvent(m_events->forIStream().outputFlushed());
m_flushed = true;
m_flushed.broadcast();
return kNew;
}
}
}
catch (XArchNetworkShutdown&) {
// remote read end of stream hungup. our output side
// has therefore shutdown.
onOutputShutdown();
sendEvent(m_events->forIStream().outputShutdown());
if (!m_readable && m_inputBuffer.getSize() == 0) {
sendEvent(m_events->forISocket().disconnected());
m_connected = false;
}
return kNew;
}
catch (XArchNetworkDisconnected&) {
// stream hungup
onDisconnected();
sendEvent(m_events->forISocket().disconnected());
return kNew;
}
catch (XArchNetwork& e) {
// other write error
LOG((CLOG_WARN "error writing socket: %s", e.what()));
onDisconnected();
sendEvent(m_events->forIStream().outputError());
sendEvent(m_events->forISocket().disconnected());
return kNew;
}
return kRetry;
}
void
TCPSocket::setJob(ISocketMultiplexerJob* job)
{
@ -468,172 +641,14 @@ TCPSocket::serviceConnected(ISocketMultiplexerJob* job,
return newJob();
}
bool needNewJob = false;
static bool s_retry = false;
static int s_retrySize = 0;
static void* s_staticBuffer = NULL;
EJobResult result = kRetry;
if (write) {
try {
// write data
int bufferSize = 0;
int bytesWrote = 0;
int status = 0;
if (s_retry) {
bufferSize = s_retrySize;
}
else {
bufferSize = m_outputBuffer.getSize();
s_staticBuffer = malloc(bufferSize);
memcpy(s_staticBuffer, m_outputBuffer.peek(bufferSize), bufferSize);
}
if (bufferSize == 0) {
return job;
}
if (isSecure()) {
if (isSecureReady()) {
status = secureWrite(s_staticBuffer, bufferSize, bytesWrote);
if (status > 0) {
s_retry = false;
bufferSize = 0;
free(s_staticBuffer);
s_staticBuffer = NULL;
}
else if (status < 0) {
return NULL;
}
else if (status == 0) {
s_retry = true;
s_retrySize = bufferSize;
return newJob();
}
}
else {
return job;
}
}
else {
bytesWrote = (UInt32)ARCH->writeSocket(m_socket, s_staticBuffer, bufferSize);
bufferSize = 0;
free(s_staticBuffer);
s_staticBuffer = NULL;
}
// discard written data
if (bytesWrote > 0) {
m_outputBuffer.pop(bytesWrote);
if (m_outputBuffer.getSize() == 0) {
sendEvent(m_events->forIStream().outputFlushed());
m_flushed = true;
m_flushed.broadcast();
needNewJob = true;
}
}
}
catch (XArchNetworkShutdown&) {
// remote read end of stream hungup. our output side
// has therefore shutdown.
onOutputShutdown();
sendEvent(m_events->forIStream().outputShutdown());
if (!m_readable && m_inputBuffer.getSize() == 0) {
sendEvent(m_events->forISocket().disconnected());
m_connected = false;
}
needNewJob = true;
}
catch (XArchNetworkDisconnected&) {
// stream hungup
onDisconnected();
sendEvent(m_events->forISocket().disconnected());
needNewJob = true;
}
catch (XArchNetwork& e) {
// other write error
LOG((CLOG_WARN "error writing socket: %s", e.what()));
onDisconnected();
sendEvent(m_events->forIStream().outputError());
sendEvent(m_events->forISocket().disconnected());
needNewJob = true;
}
result = doWrite();
}
if (read && m_readable) {
try {
static UInt8 buffer[4096];
memset(buffer, 0, sizeof(buffer));
int bytesRead = 0;
int status = 0;
if (isSecure()) {
if (isSecureReady()) {
status = secureRead(buffer, sizeof(buffer), bytesRead);
if (status < 0) {
return NULL;
}
else if (status == 0) {
return newJob();
}
}
else {
return job;
}
}
else {
bytesRead = (int) ARCH->readSocket(m_socket, buffer, sizeof(buffer));
}
if (bytesRead > 0) {
bool wasEmpty = (m_inputBuffer.getSize() == 0);
// slurp up as much as possible
do {
m_inputBuffer.write(buffer, bytesRead);
if (isSecure() && isSecureReady()) {
status = secureRead(buffer, sizeof(buffer), bytesRead);
if (status < 0) {
return NULL;
}
}
else {
bytesRead = (int) ARCH->readSocket(m_socket, buffer, sizeof(buffer));
}
} while (bytesRead > 0 || status > 0);
// send input ready if input buffer was empty
if (wasEmpty) {
sendEvent(m_events->forIStream().inputReady());
}
}
else {
// remote write end of stream hungup. our input side
// has therefore shutdown but don't flush our buffer
// since there's still data to be read.
sendEvent(m_events->forIStream().inputShutdown());
if (!m_writable && m_inputBuffer.getSize() == 0) {
sendEvent(m_events->forISocket().disconnected());
m_connected = false;
}
m_readable = false;
needNewJob = true;
}
}
catch (XArchNetworkDisconnected&) {
// stream hungup
sendEvent(m_events->forISocket().disconnected());
onDisconnected();
needNewJob = true;
}
catch (XArchNetwork& e) {
// ignore other read error
LOG((CLOG_WARN "error reading socket: %s", e.what()));
}
result = doRead();
}
return needNewJob ? newJob() : job;
return result == kBreak ? NULL : result == kNew ? newJob() : job;
}

View File

@ -66,12 +66,20 @@ public:
virtual void setFingerprintFilename(String& f) {}
protected:
enum EJobResult {
kBreak = -1, //!< Break the Job chain
kRetry, //!< Retry the same job
kNew //!< Require a new job
};
ArchSocket getSocket() { return m_socket; }
IEventQueue* getEvents() { return m_events; }
virtual bool isSecureReady() { return false; }
virtual bool isSecure() { return false; }
virtual int secureRead(void* buffer, int, int& ) { return 0; }
virtual int secureWrite(const void*, int, int& ) { return 0; }
virtual EJobResult doRead();
virtual EJobResult doWrite();
void setJob(ISocketMultiplexerJob*);