Fix infinite loop on fast TCP disconnection

The commit a841b28 changed the condition for removing job from processing.
New flag MultiplexerJobStatus::continue_servicing become used
instead of checking pointer for NULL.
However for cases when TCPSocket::newJob() returns nullptr
the behaviour changed: earlier the job was removed, but after change
it is called again, since MultiplexerJobStatus equal to {true, nullptr}
means "run this job again".

This leads to problem with eating CPU and RAM on linux
https://github.com/debauchee/barrier/issues/470

There is similar windows problem, but not sure it is related.
https://github.com/debauchee/barrier/issues/552

Since it looks that the goal of a841b28 was only clarifying
object ownership and not changing job deletion behaviour,
this commit tries to get original behaviour and fix the bugs above
by returning {false, nullptr} instead of {true, nullptr}
when TCPSocket::newJob() returns nullptr.
This commit is contained in:
Vasily Galkin 2020-02-09 23:27:26 +03:00
parent 170a271737
commit c79120c049
3 changed files with 17 additions and 15 deletions

View File

@ -761,7 +761,7 @@ MultiplexerJobStatus SecureSocket::serviceConnect(ISocketMultiplexerJob* job,
// If status > 0, success
if (status > 0) {
sendEvent(m_events->forIDataSocket().secureConnected());
return {true, newJob()};
return newJobOrStopServicing();
}
// Retry case
@ -793,7 +793,7 @@ MultiplexerJobStatus SecureSocket::serviceAccept(ISocketMultiplexerJob* job,
// If status > 0, success
if (status > 0) {
sendEvent(m_events->forClientListener().accepted());
return {true, newJob()};
return newJobOrStopServicing();
}
// Retry case

View File

@ -403,6 +403,15 @@ void TCPSocket::setJob(std::unique_ptr<ISocketMultiplexerJob>&& job)
}
}
MultiplexerJobStatus TCPSocket::newJobOrStopServicing()
{
auto new_job = newJob();
if (new_job)
return {true, std::move(new_job)};
else
return {false, {}};
}
std::unique_ptr<ISocketMultiplexerJob> TCPSocket::newJob()
{
// note -- must have m_mutex locked on entry
@ -519,22 +528,14 @@ MultiplexerJobStatus TCPSocket::serviceConnecting(ISocketMultiplexerJob* job, bo
catch (XArchNetwork& e) {
sendConnectionFailedEvent(e.what());
onDisconnected();
auto new_job = newJob();
if (new_job)
return {true, std::move(new_job)};
else
return {false, {}};
return newJobOrStopServicing();
}
}
if (write) {
sendEvent(m_events->forIDataSocket().connected());
onConnected();
auto new_job = newJob();
if (new_job)
return {true, std::move(new_job)};
else
return {false, {}};
return newJobOrStopServicing();
}
return {true, {}};
@ -548,7 +549,7 @@ MultiplexerJobStatus TCPSocket::serviceConnected(ISocketMultiplexerJob* job,
if (error) {
sendEvent(m_events->forISocket().disconnected());
onDisconnected();
return {true, newJob()};
return newJobOrStopServicing();
}
EJobResult writeResult = kRetry;
@ -603,7 +604,7 @@ MultiplexerJobStatus TCPSocket::serviceConnected(ISocketMultiplexerJob* job,
if (writeResult == kBreak || readResult == kBreak) {
return {false, {}};
} else if (writeResult == kNew || readResult == kNew) {
return {true, newJob()};
return newJobOrStopServicing();
} else {
return {true, {}};
}

View File

@ -76,7 +76,8 @@ protected:
void removeJob();
void setJob(std::unique_ptr<ISocketMultiplexerJob>&& job);
MultiplexerJobStatus newJobOrStopServicing();
bool isReadable() { return m_readable; }
bool isWritable() { return m_writable; }